From 6c7dfd6958885d21d3a1809dc1a0a215ea3bd600 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 1 Aug 2022 04:48:49 +0000 Subject: [PATCH 1/5] New table to control the fetching process --- src/Protocol/ActivityPub/Fetch.php | 84 ++++++++++++++++++++++++++ src/Protocol/ActivityPub/Processor.php | 10 ++- static/dbstructure.config.php | 16 ++++- 3 files changed, 107 insertions(+), 3 deletions(-) create mode 100644 src/Protocol/ActivityPub/Fetch.php diff --git a/src/Protocol/ActivityPub/Fetch.php b/src/Protocol/ActivityPub/Fetch.php new file mode 100644 index 0000000000..198202b1c1 --- /dev/null +++ b/src/Protocol/ActivityPub/Fetch.php @@ -0,0 +1,84 @@ +. + * + */ + +namespace Friendica\Protocol\ActivityPub; + +use Friendica\Core\Logger; +use Friendica\Database\Database; +use Friendica\Database\DBA; +use Friendica\Util\DateTimeFormat; + +/** + * This class handles the fetching of posts + */ +class Fetch +{ + public static function add(string $url): int + { + DBA::insert('fetch-entry', ['url' => $url, 'created' => DateTimeFormat::utcNow()], Database::INSERT_IGNORE); + + $fetch = DBA::selectFirst('fetch-entry', ['id'], ['url' => $url]); + Logger::debug('Added fetch entry', ['url' => $url, 'fetch' => $fetch]); + return $fetch['id'] ?? 0; + } + + /** + * Set the worker id for the queue entry + * + * @param array $activity + * @param int $wid + * @return void + */ + public static function setWorkerId(string $url, int $wid) + { + if (empty($url) || empty($wid)) { + return; + } + + DBA::update('fetch-entry', ['wid' => $wid], ['url' => $url]); + Logger::debug('Worker id set', ['url' => $url, 'wid' => $wid]); + } + + /** + * Check if there is an assigned worker task + * + * @param array $activity + * @return bool + */ + public static function hasWorker(string $url): bool + { + $fetch = DBA::selectFirst('fetch-entry', ['id', 'wid'], ['url' => $url]); + if (empty($fetch['id'])) { + Logger::debug('No entry found for url', ['url' => $url]); + return false; + } + + // We don't have a workerqueue id yet. So most likely is isn't assigned yet. + // To avoid the ramping up of another fetch request we simply claim that there is a waiting worker. + if (!empty($fetch['id']) && empty($fetch['wid'])) { + Logger::debug('Entry without worker found for url', ['url' => $url]); + return true; + } + + return DBA::exists('workerqueue', ['id' => $fetch['wid'], 'done' => false]); + } + +} diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 96308ca2cf..6d36786a22 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -333,8 +333,14 @@ class Processor if ($fetch_by_worker) { Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); $activity['recursion-depth'] = 0; - $wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); - Queue::setWorkerId($activity, $wid); + if (!Fetch::hasWorker($activity['reply-to-id'])) { + Fetch::add($activity['reply-to-id']); + $wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); + Fetch::setWorkerId($activity['reply-to-id'], $wid); + Queue::setWorkerId($activity, $wid); + } else { + Logger::debug('Activity is already in the fetching process', ['url' => $activity['reply-to-id']]); + } if (!empty($conversation)) { return []; } diff --git a/static/dbstructure.config.php b/static/dbstructure.config.php index 79da57a2bb..f72ec08020 100644 --- a/static/dbstructure.config.php +++ b/static/dbstructure.config.php @@ -55,7 +55,7 @@ use Friendica\Database\DBA; if (!defined('DB_UPDATE_VERSION')) { - define('DB_UPDATE_VERSION', 1476); + define('DB_UPDATE_VERSION', 1477); } return [ @@ -692,6 +692,20 @@ return [ "uri-id" => ["UNIQUE", "uri-id"], ] ], + "fetch-entry" => [ + "comment" => "", + "fields" => [ + "id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "sequential ID"], + "url" => ["type" => "varbinary(255)", "comment" => "url that awaiting to be fetched"], + "created" => ["type" => "datetime", "not null" => "1", "default" => DBA::NULL_DATETIME, "comment" => "Creation date of the fetch request"], + "wid" => ["type" => "int unsigned", "foreign" => ["workerqueue" => "id"], "comment" => "Workerqueue id"], ], + "indexes" => [ + "PRIMARY" => ["id"], + "url" => ["UNIQUE", "url"], + "created" => ["created"], + "wid" => ["wid"], + ] + ], "fsuggest" => [ "comment" => "friend suggestion stuff", "fields" => [ From 4be6e9a27ddace3e3e8dda470f29b60936c00519 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 1 Aug 2022 05:56:55 +0000 Subject: [PATCH 2/5] Cache if an activity has recently been fetched --- src/Protocol/ActivityPub/Processor.php | 55 +++++++++++++++++++------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 96308ca2cf..4a420490c3 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -58,6 +58,7 @@ use Friendica\Worker\Delivery; class Processor { const CACHEKEY_FETCH_ACTIVITY = 'processor:fetchMissingActivity:'; + const CACHEKEY_JUST_FETCHED = 'processor:isJustFetched:'; /** * Extracts the tag character (#, @, !) from mention links * @@ -305,27 +306,36 @@ class Processor } if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { - $recursion_depth = $activity['recursion-depth'] ?? 0; - Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); - if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) { - $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); - if (empty($result) && self::isActivityGone($activity['reply-to-id'])) { - // Recursively delete this and all depending entries - if (!empty($activity['entry-id'])) { - Queue::deleteById($activity['entry-id']); - } + if (self::hasJustBeenFetched($activity['reply-to-id'])) { + Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]); + $fetch_by_worker = false; + if (empty($conversation)) { return []; } - $fetch_by_worker = empty($result); } else { - Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); - $fetch_by_worker = true; + $recursion_depth = $activity['recursion-depth'] ?? 0; + Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); + if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) { + $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); + $fetch_by_worker = empty($result); + if (empty($result) && self::isActivityGone($activity['reply-to-id'])) { + if (!empty($activity['entry-id'])) { + Queue::deleteById($activity['entry-id']); + } + if (empty($conversation)) { + return []; + } + } + } else { + Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); + $fetch_by_worker = true; + } } if ($fetch_by_worker && Queue::hasWorker($activity)) { Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]); $fetch_by_worker = false; - if (!empty($conversation)) { + if (empty($conversation)) { return []; } } @@ -335,7 +345,7 @@ class Processor $activity['recursion-depth'] = 0; $wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); Queue::setWorkerId($activity, $wid); - if (!empty($conversation)) { + if (empty($conversation)) { return []; } } elseif (!empty($result)) { @@ -466,6 +476,23 @@ class Processor return $item; } + /** + * Check if a given activity has recently been fetched + * + * @param string $url + * @return boolean + */ + private static function hasJustBeenFetched(string $url): bool + { + $cachekey = self::CACHEKEY_JUST_FETCHED . $url; + $time = DI::cache()->get($cachekey); + if (is_null($time)) { + DI::cache()->set($cachekey, time(), Duration::FIVE_MINUTES); + return false; + } + return ($time + 300) > time(); + } + /** * Check if a given activity is no longer available * From f7ec8d5b8e39cb5284f78efb882413de37f4891c Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 1 Aug 2022 05:59:59 +0000 Subject: [PATCH 3/5] Improved log message --- src/Protocol/ActivityPub/Processor.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 4f31fe8c28..6959755a11 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -349,7 +349,7 @@ class Processor Fetch::setWorkerId($activity['reply-to-id'], $wid); Queue::setWorkerId($activity, $wid); } else { - Logger::debug('Activity is already in the fetching process', ['url' => $activity['reply-to-id']]); + Logger::debug('Activity will already be fetched via a worker.', ['url' => $activity['reply-to-id']]); } if (empty($conversation)) { return []; From 81b244f91ed1706c9a645ae8eb45254b633e031b Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 1 Aug 2022 06:59:20 +0000 Subject: [PATCH 4/5] Option to activate or deactivate the background fetching --- src/Protocol/ActivityPub/Processor.php | 2 +- static/defaults.config.php | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 6959755a11..653649aa2d 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -340,7 +340,7 @@ class Processor } } - if ($fetch_by_worker) { + if ($fetch_by_worker && DI::config()->get('system', 'fetch_by_worker')) { Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); $activity['recursion-depth'] = 0; if (!Fetch::hasWorker($activity['reply-to-id'])) { diff --git a/static/defaults.config.php b/static/defaults.config.php index 01cd24f04b..ee15ce826d 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -282,6 +282,10 @@ return [ // Priority for the expirary notification 'expire-notify-priority' => PRIORITY_LOW, + // fetch_by_worker (Boolean) + // Fetch missing posts via a background process + 'fetch_by_worker' => false, + // free_crawls (Integer) // Number of "free" searches when system => permit_crawling is enabled. 'free_crawls' => 10, @@ -421,8 +425,8 @@ return [ 'max_processes_frontend' => 20, // max_recursion_depth (Integer) - // Maximum recursion depth when fetching posts until the job is delegated to a worker task. - 'max_recursion_depth' => 10, + // Maximum recursion depth when fetching posts until the job is delegated to a worker task or finished. + 'max_recursion_depth' => 50, // maximagesize (Integer) // Maximum size in bytes of an uploaded photo. From c8d1bf4cdf1e1d0f16538b3121f8c022c17d02bf Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 1 Aug 2022 07:06:30 +0000 Subject: [PATCH 5/5] Code standards and updated database documentation --- database.sql | 17 ++++++++++++++- doc/database.md | 1 + doc/database/db_fetch-entry.md | 33 ++++++++++++++++++++++++++++++ src/Protocol/ActivityPub/Fetch.php | 1 - 4 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 doc/database/db_fetch-entry.md diff --git a/database.sql b/database.sql index a2f09e1c38..7663b71590 100644 --- a/database.sql +++ b/database.sql @@ -1,6 +1,6 @@ -- ------------------------------------------ -- Friendica 2022.09-dev (Giant Rhubarb) --- DB_UPDATE_VERSION 1476 +-- DB_UPDATE_VERSION 1477 -- ------------------------------------------ @@ -632,6 +632,21 @@ CREATE TABLE IF NOT EXISTS `fcontact` ( FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE ) DEFAULT COLLATE utf8mb4_general_ci COMMENT='Diaspora compatible contacts - used in the Diaspora implementation'; +-- +-- TABLE fetch-entry +-- +CREATE TABLE IF NOT EXISTS `fetch-entry` ( + `id` int unsigned NOT NULL auto_increment COMMENT 'sequential ID', + `url` varbinary(255) COMMENT 'url that awaiting to be fetched', + `created` datetime NOT NULL DEFAULT '0001-01-01 00:00:00' COMMENT 'Creation date of the fetch request', + `wid` int unsigned COMMENT 'Workerqueue id', + PRIMARY KEY(`id`), + UNIQUE INDEX `url` (`url`), + INDEX `created` (`created`), + INDEX `wid` (`wid`), + FOREIGN KEY (`wid`) REFERENCES `workerqueue` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE +) DEFAULT COLLATE utf8mb4_general_ci COMMENT=''; + -- -- TABLE fsuggest -- diff --git a/doc/database.md b/doc/database.md index 9932ee307a..2bd2fa5f49 100644 --- a/doc/database.md +++ b/doc/database.md @@ -24,6 +24,7 @@ Database Tables | [endpoint](help/database/db_endpoint) | ActivityPub endpoints - used in the ActivityPub implementation | | [event](help/database/db_event) | Events | | [fcontact](help/database/db_fcontact) | Diaspora compatible contacts - used in the Diaspora implementation | +| [fetch-entry](help/database/db_fetch-entry) | | | [fsuggest](help/database/db_fsuggest) | friend suggestion stuff | | [group](help/database/db_group) | privacy groups, group info | | [group_member](help/database/db_group_member) | privacy groups, member info | diff --git a/doc/database/db_fetch-entry.md b/doc/database/db_fetch-entry.md new file mode 100644 index 0000000000..4b3cba1042 --- /dev/null +++ b/doc/database/db_fetch-entry.md @@ -0,0 +1,33 @@ +Table fetch-entry +=========== + + + +Fields +------ + +| Field | Description | Type | Null | Key | Default | Extra | +| ------- | ---------------------------------- | -------------- | ---- | --- | ------------------- | -------------- | +| id | sequential ID | int unsigned | NO | PRI | NULL | auto_increment | +| url | url that awaiting to be fetched | varbinary(255) | YES | | NULL | | +| created | Creation date of the fetch request | datetime | NO | | 0001-01-01 00:00:00 | | +| wid | Workerqueue id | int unsigned | YES | | NULL | | + +Indexes +------------ + +| Name | Fields | +| ------- | ----------- | +| PRIMARY | id | +| url | UNIQUE, url | +| created | created | +| wid | wid | + +Foreign Keys +------------ + +| Field | Target Table | Target Field | +|-------|--------------|--------------| +| wid | [workerqueue](help/database/db_workerqueue) | id | + +Return to [database documentation](help/database) diff --git a/src/Protocol/ActivityPub/Fetch.php b/src/Protocol/ActivityPub/Fetch.php index 198202b1c1..54666e691e 100644 --- a/src/Protocol/ActivityPub/Fetch.php +++ b/src/Protocol/ActivityPub/Fetch.php @@ -80,5 +80,4 @@ class Fetch return DBA::exists('workerqueue', ['id' => $fetch['wid'], 'done' => false]); } - }