Merge pull request #11809 from annando/fetch-cache
Repeated fetch requests are now prohibited
This commit is contained in:
commit
64894f9d6f
17
database.sql
17
database.sql
|
@ -1,6 +1,6 @@
|
||||||
-- ------------------------------------------
|
-- ------------------------------------------
|
||||||
-- Friendica 2022.09-dev (Giant Rhubarb)
|
-- Friendica 2022.09-dev (Giant Rhubarb)
|
||||||
-- DB_UPDATE_VERSION 1476
|
-- DB_UPDATE_VERSION 1477
|
||||||
-- ------------------------------------------
|
-- ------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@ -632,6 +632,21 @@ CREATE TABLE IF NOT EXISTS `fcontact` (
|
||||||
FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE
|
FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE
|
||||||
) DEFAULT COLLATE utf8mb4_general_ci COMMENT='Diaspora compatible contacts - used in the Diaspora implementation';
|
) DEFAULT COLLATE utf8mb4_general_ci COMMENT='Diaspora compatible contacts - used in the Diaspora implementation';
|
||||||
|
|
||||||
|
--
|
||||||
|
-- TABLE fetch-entry
|
||||||
|
--
|
||||||
|
CREATE TABLE IF NOT EXISTS `fetch-entry` (
|
||||||
|
`id` int unsigned NOT NULL auto_increment COMMENT 'sequential ID',
|
||||||
|
`url` varbinary(255) COMMENT 'url that awaiting to be fetched',
|
||||||
|
`created` datetime NOT NULL DEFAULT '0001-01-01 00:00:00' COMMENT 'Creation date of the fetch request',
|
||||||
|
`wid` int unsigned COMMENT 'Workerqueue id',
|
||||||
|
PRIMARY KEY(`id`),
|
||||||
|
UNIQUE INDEX `url` (`url`),
|
||||||
|
INDEX `created` (`created`),
|
||||||
|
INDEX `wid` (`wid`),
|
||||||
|
FOREIGN KEY (`wid`) REFERENCES `workerqueue` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE
|
||||||
|
) DEFAULT COLLATE utf8mb4_general_ci COMMENT='';
|
||||||
|
|
||||||
--
|
--
|
||||||
-- TABLE fsuggest
|
-- TABLE fsuggest
|
||||||
--
|
--
|
||||||
|
|
|
@ -24,6 +24,7 @@ Database Tables
|
||||||
| [endpoint](help/database/db_endpoint) | ActivityPub endpoints - used in the ActivityPub implementation |
|
| [endpoint](help/database/db_endpoint) | ActivityPub endpoints - used in the ActivityPub implementation |
|
||||||
| [event](help/database/db_event) | Events |
|
| [event](help/database/db_event) | Events |
|
||||||
| [fcontact](help/database/db_fcontact) | Diaspora compatible contacts - used in the Diaspora implementation |
|
| [fcontact](help/database/db_fcontact) | Diaspora compatible contacts - used in the Diaspora implementation |
|
||||||
|
| [fetch-entry](help/database/db_fetch-entry) | |
|
||||||
| [fsuggest](help/database/db_fsuggest) | friend suggestion stuff |
|
| [fsuggest](help/database/db_fsuggest) | friend suggestion stuff |
|
||||||
| [group](help/database/db_group) | privacy groups, group info |
|
| [group](help/database/db_group) | privacy groups, group info |
|
||||||
| [group_member](help/database/db_group_member) | privacy groups, member info |
|
| [group_member](help/database/db_group_member) | privacy groups, member info |
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
Table fetch-entry
|
||||||
|
===========
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Fields
|
||||||
|
------
|
||||||
|
|
||||||
|
| Field | Description | Type | Null | Key | Default | Extra |
|
||||||
|
| ------- | ---------------------------------- | -------------- | ---- | --- | ------------------- | -------------- |
|
||||||
|
| id | sequential ID | int unsigned | NO | PRI | NULL | auto_increment |
|
||||||
|
| url | url that awaiting to be fetched | varbinary(255) | YES | | NULL | |
|
||||||
|
| created | Creation date of the fetch request | datetime | NO | | 0001-01-01 00:00:00 | |
|
||||||
|
| wid | Workerqueue id | int unsigned | YES | | NULL | |
|
||||||
|
|
||||||
|
Indexes
|
||||||
|
------------
|
||||||
|
|
||||||
|
| Name | Fields |
|
||||||
|
| ------- | ----------- |
|
||||||
|
| PRIMARY | id |
|
||||||
|
| url | UNIQUE, url |
|
||||||
|
| created | created |
|
||||||
|
| wid | wid |
|
||||||
|
|
||||||
|
Foreign Keys
|
||||||
|
------------
|
||||||
|
|
||||||
|
| Field | Target Table | Target Field |
|
||||||
|
|-------|--------------|--------------|
|
||||||
|
| wid | [workerqueue](help/database/db_workerqueue) | id |
|
||||||
|
|
||||||
|
Return to [database documentation](help/database)
|
|
@ -0,0 +1,83 @@
|
||||||
|
<?php
|
||||||
|
/**
|
||||||
|
* @copyright Copyright (C) 2010-2022, the Friendica project
|
||||||
|
*
|
||||||
|
* @license GNU AGPL version 3 or any later version
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as
|
||||||
|
* published by the Free Software Foundation, either version 3 of the
|
||||||
|
* License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Friendica\Protocol\ActivityPub;
|
||||||
|
|
||||||
|
use Friendica\Core\Logger;
|
||||||
|
use Friendica\Database\Database;
|
||||||
|
use Friendica\Database\DBA;
|
||||||
|
use Friendica\Util\DateTimeFormat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class handles the fetching of posts
|
||||||
|
*/
|
||||||
|
class Fetch
|
||||||
|
{
|
||||||
|
public static function add(string $url): int
|
||||||
|
{
|
||||||
|
DBA::insert('fetch-entry', ['url' => $url, 'created' => DateTimeFormat::utcNow()], Database::INSERT_IGNORE);
|
||||||
|
|
||||||
|
$fetch = DBA::selectFirst('fetch-entry', ['id'], ['url' => $url]);
|
||||||
|
Logger::debug('Added fetch entry', ['url' => $url, 'fetch' => $fetch]);
|
||||||
|
return $fetch['id'] ?? 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the worker id for the fetch entry of the given url
|
||||||
|
*
|
||||||
|
* @param string $url
|
||||||
|
* @param int $wid
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public static function setWorkerId(string $url, int $wid)
|
||||||
|
{
|
||||||
|
if (empty($url) || empty($wid)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
DBA::update('fetch-entry', ['wid' => $wid], ['url' => $url]);
|
||||||
|
Logger::debug('Worker id set', ['url' => $url, 'wid' => $wid]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if there is an assigned worker task
|
||||||
|
*
|
||||||
|
* @param string $url
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public static function hasWorker(string $url): bool
|
||||||
|
{
|
||||||
|
$fetch = DBA::selectFirst('fetch-entry', ['id', 'wid'], ['url' => $url]);
|
||||||
|
if (empty($fetch['id'])) {
|
||||||
|
Logger::debug('No entry found for url', ['url' => $url]);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We don't have a workerqueue id yet. So most likely it isn't assigned yet.
|
||||||
|
// To avoid the ramping up of another fetch request we simply claim that there is a waiting worker.
|
||||||
|
if (!empty($fetch['id']) && empty($fetch['wid'])) {
|
||||||
|
Logger::debug('Entry without worker found for url', ['url' => $url]);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return DBA::exists('workerqueue', ['id' => $fetch['wid'], 'done' => false]);
|
||||||
|
}
|
||||||
|
}
|
|
@ -58,6 +58,7 @@ use Friendica\Worker\Delivery;
|
||||||
class Processor
|
class Processor
|
||||||
{
|
{
|
||||||
const CACHEKEY_FETCH_ACTIVITY = 'processor:fetchMissingActivity:';
|
const CACHEKEY_FETCH_ACTIVITY = 'processor:fetchMissingActivity:';
|
||||||
|
const CACHEKEY_JUST_FETCHED = 'processor:isJustFetched:';
|
||||||
/**
|
/**
|
||||||
* Extracts the tag character (#, @, !) from mention links
|
* Extracts the tag character (#, @, !) from mention links
|
||||||
*
|
*
|
||||||
|
@ -305,37 +306,52 @@ class Processor
|
||||||
}
|
}
|
||||||
|
|
||||||
if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
|
if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
|
||||||
|
if (self::hasJustBeenFetched($activity['reply-to-id'])) {
|
||||||
|
Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]);
|
||||||
|
$fetch_by_worker = false;
|
||||||
|
if (empty($conversation)) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
} else {
|
||||||
$recursion_depth = $activity['recursion-depth'] ?? 0;
|
$recursion_depth = $activity['recursion-depth'] ?? 0;
|
||||||
Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
||||||
if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) {
|
if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) {
|
||||||
$result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
|
$result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
|
||||||
|
$fetch_by_worker = empty($result);
|
||||||
if (empty($result) && self::isActivityGone($activity['reply-to-id'])) {
|
if (empty($result) && self::isActivityGone($activity['reply-to-id'])) {
|
||||||
// Recursively delete this and all depending entries
|
|
||||||
if (!empty($activity['entry-id'])) {
|
if (!empty($activity['entry-id'])) {
|
||||||
Queue::deleteById($activity['entry-id']);
|
Queue::deleteById($activity['entry-id']);
|
||||||
}
|
}
|
||||||
|
if (empty($conversation)) {
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
$fetch_by_worker = empty($result);
|
}
|
||||||
} else {
|
} else {
|
||||||
Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
||||||
$fetch_by_worker = true;
|
$fetch_by_worker = true;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if ($fetch_by_worker && Queue::hasWorker($activity)) {
|
if ($fetch_by_worker && Queue::hasWorker($activity)) {
|
||||||
Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]);
|
Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]);
|
||||||
$fetch_by_worker = false;
|
$fetch_by_worker = false;
|
||||||
if (!empty($conversation)) {
|
if (empty($conversation)) {
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($fetch_by_worker) {
|
if ($fetch_by_worker && DI::config()->get('system', 'fetch_by_worker')) {
|
||||||
Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
|
||||||
$activity['recursion-depth'] = 0;
|
$activity['recursion-depth'] = 0;
|
||||||
|
if (!Fetch::hasWorker($activity['reply-to-id'])) {
|
||||||
|
Fetch::add($activity['reply-to-id']);
|
||||||
$wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
|
$wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
|
||||||
|
Fetch::setWorkerId($activity['reply-to-id'], $wid);
|
||||||
Queue::setWorkerId($activity, $wid);
|
Queue::setWorkerId($activity, $wid);
|
||||||
if (!empty($conversation)) {
|
} else {
|
||||||
|
Logger::debug('Activity will already be fetched via a worker.', ['url' => $activity['reply-to-id']]);
|
||||||
|
}
|
||||||
|
if (empty($conversation)) {
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
} elseif (!empty($result)) {
|
} elseif (!empty($result)) {
|
||||||
|
@ -466,6 +482,23 @@ class Processor
|
||||||
return $item;
|
return $item;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a given activity has recently been fetched
|
||||||
|
*
|
||||||
|
* @param string $url
|
||||||
|
* @return boolean
|
||||||
|
*/
|
||||||
|
private static function hasJustBeenFetched(string $url): bool
|
||||||
|
{
|
||||||
|
$cachekey = self::CACHEKEY_JUST_FETCHED . $url;
|
||||||
|
$time = DI::cache()->get($cachekey);
|
||||||
|
if (is_null($time)) {
|
||||||
|
DI::cache()->set($cachekey, time(), Duration::FIVE_MINUTES);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return ($time + 300) > time();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if a given activity is no longer available
|
* Check if a given activity is no longer available
|
||||||
*
|
*
|
||||||
|
|
|
@ -55,7 +55,7 @@
|
||||||
use Friendica\Database\DBA;
|
use Friendica\Database\DBA;
|
||||||
|
|
||||||
if (!defined('DB_UPDATE_VERSION')) {
|
if (!defined('DB_UPDATE_VERSION')) {
|
||||||
define('DB_UPDATE_VERSION', 1476);
|
define('DB_UPDATE_VERSION', 1477);
|
||||||
}
|
}
|
||||||
|
|
||||||
return [
|
return [
|
||||||
|
@ -692,6 +692,20 @@ return [
|
||||||
"uri-id" => ["UNIQUE", "uri-id"],
|
"uri-id" => ["UNIQUE", "uri-id"],
|
||||||
]
|
]
|
||||||
],
|
],
|
||||||
|
"fetch-entry" => [
|
||||||
|
"comment" => "",
|
||||||
|
"fields" => [
|
||||||
|
"id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "sequential ID"],
|
||||||
|
"url" => ["type" => "varbinary(255)", "comment" => "url that awaiting to be fetched"],
|
||||||
|
"created" => ["type" => "datetime", "not null" => "1", "default" => DBA::NULL_DATETIME, "comment" => "Creation date of the fetch request"],
|
||||||
|
"wid" => ["type" => "int unsigned", "foreign" => ["workerqueue" => "id"], "comment" => "Workerqueue id"], ],
|
||||||
|
"indexes" => [
|
||||||
|
"PRIMARY" => ["id"],
|
||||||
|
"url" => ["UNIQUE", "url"],
|
||||||
|
"created" => ["created"],
|
||||||
|
"wid" => ["wid"],
|
||||||
|
]
|
||||||
|
],
|
||||||
"fsuggest" => [
|
"fsuggest" => [
|
||||||
"comment" => "friend suggestion stuff",
|
"comment" => "friend suggestion stuff",
|
||||||
"fields" => [
|
"fields" => [
|
||||||
|
|
|
@ -282,6 +282,10 @@ return [
|
||||||
// Priority for the expirary notification
|
// Priority for the expirary notification
|
||||||
'expire-notify-priority' => PRIORITY_LOW,
|
'expire-notify-priority' => PRIORITY_LOW,
|
||||||
|
|
||||||
|
// fetch_by_worker (Boolean)
|
||||||
|
// Fetch missing posts via a background process
|
||||||
|
'fetch_by_worker' => false,
|
||||||
|
|
||||||
// free_crawls (Integer)
|
// free_crawls (Integer)
|
||||||
// Number of "free" searches when system => permit_crawling is enabled.
|
// Number of "free" searches when system => permit_crawling is enabled.
|
||||||
'free_crawls' => 10,
|
'free_crawls' => 10,
|
||||||
|
@ -421,8 +425,8 @@ return [
|
||||||
'max_processes_frontend' => 20,
|
'max_processes_frontend' => 20,
|
||||||
|
|
||||||
// max_recursion_depth (Integer)
|
// max_recursion_depth (Integer)
|
||||||
// Maximum recursion depth when fetching posts until the job is delegated to a worker task.
|
// Maximum recursion depth when fetching posts until the job is delegated to a worker task or finished.
|
||||||
'max_recursion_depth' => 10,
|
'max_recursion_depth' => 50,
|
||||||
|
|
||||||
// maximagesize (Integer)
|
// maximagesize (Integer)
|
||||||
// Maximum size in bytes of an uploaded photo.
|
// Maximum size in bytes of an uploaded photo.
|
||||||
|
|
Loading…
Reference in New Issue
Block a user