From f2eec66240f674ac56ef57350606aab6cfda8aa8 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 3 Aug 2022 03:38:03 +0000 Subject: [PATCH 1/4] Hopefully fixes loops during message processing --- src/Model/Contact.php | 2 +- src/Protocol/ActivityPub/Processor.php | 136 +++++++++++++++---------- src/Protocol/ActivityPub/Queue.php | 44 ++++++-- src/Protocol/ActivityPub/Receiver.php | 28 +++-- src/Worker/Cron.php | 4 +- static/defaults.config.php | 4 + 6 files changed, 142 insertions(+), 76 deletions(-) diff --git a/src/Model/Contact.php b/src/Model/Contact.php index 2c1ca2d4ce..c873c3941e 100644 --- a/src/Model/Contact.php +++ b/src/Model/Contact.php @@ -2449,7 +2449,7 @@ class Contact $new_pubkey = $ret['pubkey'] ?? ''; - if ($uid == 0) { + if ($uid == 0 && DI::config()->get('system', 'fetch_featured_posts')) { if ($ret['network'] == Protocol::ACTIVITYPUB) { $apcontact = APContact::getByURL($ret['url'], false); if (!empty($apcontact['featured'])) { diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 653649aa2d..a409fec115 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -306,58 +306,13 @@ class Processor } if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { - if (self::hasJustBeenFetched($activity['reply-to-id'])) { - Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]); - $fetch_by_worker = false; - if (empty($conversation)) { - return []; - } - } else { - $recursion_depth = $activity['recursion-depth'] ?? 0; - Logger::notice('Parent not found. 
Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); - if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) { - $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); - $fetch_by_worker = empty($result); - if (empty($result) && self::isActivityGone($activity['reply-to-id'])) { - if (!empty($activity['entry-id'])) { - Queue::deleteById($activity['entry-id']); - } - if (empty($conversation)) { - return []; - } - } - } else { - Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); - $fetch_by_worker = true; - } - } - - if ($fetch_by_worker && Queue::hasWorker($activity)) { - Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]); - $fetch_by_worker = false; - if (empty($conversation)) { - return []; - } - } - - if ($fetch_by_worker && DI::config()->get('system', 'fetch_by_worker')) { - Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); - $activity['recursion-depth'] = 0; - if (!Fetch::hasWorker($activity['reply-to-id'])) { - Fetch::add($activity['reply-to-id']); - $wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); - Fetch::setWorkerId($activity['reply-to-id'], $wid); - Queue::setWorkerId($activity, $wid); - } else { - Logger::debug('Activity will already be fetched via a worker.', ['url' => $activity['reply-to-id']]); - } - if (empty($conversation)) { - return []; - } - } elseif (!empty($result)) { + $result = self::fetchParent($activity); + if (!empty($result)) { if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) { $item['thr-parent'] = $result; } + } elseif (empty($conversation)) { + return []; } } @@ -482,6 +437,77 @@ class 
Processor return $item; } + /** + * Fetch and process parent posts for the given activity + * + * @param array $activity + * + * @return string + */ + private static function fetchParent(array $activity): string + { + if (self::hasJustBeenFetched($activity['reply-to-id'])) { + Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]); + return ''; + } + + $recursion_depth = $activity['recursion-depth'] ?? 0; + + if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) { + Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); + $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); + if (empty($result) && self::isActivityGone($activity['reply-to-id'])) { + Logger::notice('The activity is gone, the queue entry will be deleted', ['parent' => $activity['reply-to-id']]); + if (!empty($activity['entry-id'])) { + Queue::deleteById($activity['entry-id']); + } + return ''; + } elseif (!empty($result)) { + $exists = Post::exists(['uri' => [$result, $activity['reply-to-id']]]); + if ($exists) { + Logger::notice('The activity has been fetched and created.', ['parent' => $result]); + return $result; + } elseif (DI::config()->get('system', 'fetch_by_worker') || DI::config()->get('system', 'decoupled_receiver')) { + Logger::notice('The activity has been fetched and will hopefully be created later.', ['parent' => $result]); + } else { + Logger::notice('The activity exists but has not been created, the queue entry will be deleted.', ['parent' => $result]); + if (!empty($activity['entry-id'])) { + Queue::deleteById($activity['entry-id']); + } + } + return ''; + } + if (empty($result) && !DI::config()->get('system', 'fetch_by_worker')) { + return ''; + } + } elseif (self::isActivityGone($activity['reply-to-id'])) { + Logger::notice('The activity is gone. 
We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]); + if (!empty($activity['entry-id'])) { + Queue::deleteById($activity['entry-id']); + } + return ''; + } else { + Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); + } + + if (Queue::hasWorker($activity['worker-id'] ?? 0)) { + Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]); + return ''; + } + + if (!Fetch::hasWorker($activity['reply-to-id'])) { + Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]); + Fetch::add($activity['reply-to-id']); + $activity['recursion-depth'] = 0; + $wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO); + Fetch::setWorkerId($activity['reply-to-id'], $wid); + } else { + Logger::debug('Activity will already be fetched via a worker.', ['url' => $activity['reply-to-id']]); + } + + return ''; + } + /** * Check if a given activity has recently been fetched * @@ -1022,7 +1048,7 @@ class Processor Queue::remove($activity); if ($success && Queue::hasChildren($item['uri'])) { - Worker::add(PRIORITY_HIGH, 'ProcessReplyByUri', $item['uri']); + Queue::processReplyByUri($item['uri']); } // Store send a follow request for every reshare - but only when the item had been stored @@ -1366,9 +1392,13 @@ class Processor return ''; } - ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer); - - Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'object' => $activity['id']]); + if (($completion == Receiver::COMPLETION_RELAY) && Queue::exists($url, 'as:Create')) { + Logger::notice('Activity has already been queued.', ['url' => $url, 'object' => $activity['id']]); + } elseif 
(ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer, '', $completion)) { + Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]); + } else { + Logger::notice('Activity had been fetched and will be processed later.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]); + } return $activity['id']; } diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php index 321bd7f432..87d491bd99 100644 --- a/src/Protocol/ActivityPub/Queue.php +++ b/src/Protocol/ActivityPub/Queue.php @@ -84,6 +84,18 @@ class Queue return $activity; } + /** + * Checks if an entryy for a given url and type already exists + * + * @param string $url + * @param string $type + * @return boolean + */ + public static function exists(string $url, string $type): bool + { + return DBA::exists('inbox-entry', ['type' => $type, 'object-id' => $url]); + } + /** * Remove activity from the queue * @@ -132,30 +144,31 @@ class Queue /** * Set the worker id for the queue entry * - * @param array $activity - * @param int $wid + * @param int $entryid + * @param int $wid * @return void */ - public static function setWorkerId(array $activity, int $wid) + public static function setWorkerId(int $entryid, int $wid) { - if (empty($activity['entry-id']) || empty($wid)) { + if (empty($entryid) || empty($wid)) { return; } - DBA::update('inbox-entry', ['wid' => $wid], ['id' => $activity['entry-id']]); + DBA::update('inbox-entry', ['wid' => $wid], ['id' => $entryid]); } /** * Check if there is an assigned worker task * - * @param array $activity + * @param int $wid + * * @return bool */ - public static function hasWorker(array $activity = []): bool + public static function hasWorker(int $wid): bool { - if (empty($activity['worker-id'])) { + if (empty($wid)) { return false; } - 
return DBA::exists('workerqueue', ['id' => $activity['worker-id'], 'done' => false]); + return DBA::exists('workerqueue', ['id' => $wid, 'done' => false]); } /** @@ -172,6 +185,18 @@ class Queue return false; } + if (!empty($entry['wid'])) { + $worker = DI::app()->getQueue(); + $wid = $worker['id'] ?? 0; + if ($entry['wid'] != $wid) { + $workerqueue = DBA::selectFirst('workerqueue', ['pid'], ['id' => $entry['wid'], 'done' => false]); + if (!empty($workerqueue['pid']) && posix_kill($workerqueue['pid'], 0)) { + Logger::notice('Entry is already processed via another process.', ['current' => $wid, 'processor' => $entry['wid']]); + return false; + } + } + } + Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]); $activity = json_decode($entry['activity'], true); @@ -314,6 +339,5 @@ class Queue } } DBA::close($entries); - } } diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index 1d337f28af..b3d67b2af7 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -507,26 +507,29 @@ class Receiver * @param boolean $trust_source Do we trust the source? 
* @param boolean $push Message had been pushed to our system * @param array $signer The signer of the post + * + * @return bool + * * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function processActivity(array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '') + public static function processActivity(array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '', int $completion = Receiver::COMPLETION_AUTO): bool { $type = JsonLD::fetchElement($activity, '@type'); if (!$type) { Logger::info('Empty type', ['activity' => $activity]); - return; + return true; } if (!JsonLD::fetchElement($activity, 'as:object', '@id')) { Logger::info('Empty object', ['activity' => $activity]); - return; + return true; } $actor = JsonLD::fetchElement($activity, 'as:actor', '@id'); if (empty($actor)) { Logger::info('Empty actor', ['activity' => $activity]); - return; + return true; } if (is_array($activity['as:object'])) { @@ -548,7 +551,7 @@ class Receiver $object_data = self::prepareObjectData($activity, $uid, $push, $trust_source); if (empty($object_data)) { Logger::info('No object data found', ['activity' => $activity]); - return; + return true; } // Lemmy is announcing activities. 
@@ -583,21 +586,27 @@ class Receiver $object_data['object_activity'] = $activity; } + if (($type == 'as:Create') && Queue::exists($object_data['object_id'], $type)) { + Logger::info('The activity is already added.', ['id' => $object_data['object_id']]); + return true; + } + if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) { $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source); } if (!$trust_source) { Logger::info('Activity trust could not be achieved.', ['id' => $object_data['object_id'], 'type' => $type, 'signer' => $signer, 'actor' => $actor, 'attributedTo' => $attributed_to]); - return; + return true; } - if (!empty($object_data['entry-id']) && DI::config()->get('system', 'decoupled_receiver') && ($push || ($activity['completion-mode'] == self::COMPLETION_RELAY))) { + if (!empty($object_data['entry-id']) && DI::config()->get('system', 'decoupled_receiver') && ($push || ($completion == self::COMPLETION_RELAY))) { // We delay by 5 seconds to allow to accumulate all receivers $delayed = date(DateTimeFormat::MYSQL, time() + 5); Logger::debug('Initiate processing', ['id' => $object_data['entry-id'], 'uri' => $object_data['object_id']]); - Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']); - return; + $wid = Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']); + Queue::setWorkerId($object_data['entry-id'], $wid); + return false; } if (!empty($activity['recursion-depth'])) { @@ -612,6 +621,7 @@ class Receiver self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); Queue::remove($object_data); } + return true; } /** diff --git a/src/Worker/Cron.php b/src/Worker/Cron.php index 2c724366da..12ea6e60a3 100644 --- a/src/Worker/Cron.php +++ b/src/Worker/Cron.php @@ -93,9 +93,7 @@ class Cron 
Queue::clear(); // Process all unprocessed entries - if (DI::config()->get('system', 'decoupled_receiver')) { - Queue::processAll(); - } + Queue::processAll(); // Search for new contacts in the directory if (DI::config()->get('system', 'synchronize_directory')) { diff --git a/static/defaults.config.php b/static/defaults.config.php index ee15ce826d..44c80b7b0c 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -286,6 +286,10 @@ return [ // Fetch missing posts via a background process 'fetch_by_worker' => false, + // fetch_featured_posts (Boolean) + // Fetch featured posts from all contacts + 'fetch_featured_posts' => false, + // free_crawls (Integer) // Number of "free" searches when system => permit_crawling is enabled. 'free_crawls' => 10, From 23ef4a99bb9e612e62b7627b1b4d6a5238837343 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 3 Aug 2022 04:31:56 +0000 Subject: [PATCH 2/4] Changes after code review --- src/Protocol/ActivityPub/Queue.php | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php index 87d491bd99..751ee93bda 100644 --- a/src/Protocol/ActivityPub/Queue.php +++ b/src/Protocol/ActivityPub/Queue.php @@ -85,7 +85,7 @@ class Queue } /** - * Checks if an entryy for a given url and type already exists + * Checks if an entry for a given url and type already exists * * @param string $url * @param string $type @@ -144,16 +144,16 @@ class Queue /** * Set the worker id for the queue entry * - * @param int $entryid + * @param int $entry_id * @param int $wid * @return void */ - public static function setWorkerId(int $entryid, int $wid) + public static function setWorkerId(int $entry_id, int $wid) { - if (empty($entryid) || empty($wid)) { + if (empty($entry_id) || empty($wid)) { return; } - DBA::update('inbox-entry', ['wid' => $wid], ['id' => $entryid]); + DBA::update('inbox-entry', ['wid' => $wid], ['id' => $entry_id]); } /** From 
3463e346932c27c4eb8b38fb3d1ecd0e83c519ad Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 3 Aug 2022 04:51:57 +0000 Subject: [PATCH 3/4] Don't always fetch parent posts --- src/Protocol/ActivityPub/Processor.php | 9 ++++++--- src/Protocol/ActivityPub/Queue.php | 9 +++++---- src/Protocol/ActivityPub/Receiver.php | 15 ++++++++------- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index a409fec115..73f414d73c 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -265,12 +265,15 @@ class Processor /** * Prepares data for a message * - * @param array $activity Activity array + * @param array $activity Activity array + * @param bool $fetch_parents + * * @return array Internal item + * * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function createItem(array $activity): array + public static function createItem(array $activity, bool $fetch_parents = true): array { $item = []; $item['verb'] = Activity::POST; @@ -305,7 +308,7 @@ class Processor return []; } - if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { + if ($fetch_parents && empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { $result = self::fetchParent($activity); if (!empty($result)) { if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) { diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php index 751ee93bda..98c3b4d497 100644 --- a/src/Protocol/ActivityPub/Queue.php +++ b/src/Protocol/ActivityPub/Queue.php @@ -175,10 +175,11 @@ class Queue * Process the activity with the given id * * @param integer $id + * @param bool $fetch_parents * * @return bool */ - public static 
function process(int $id): bool + public static function process(int $id, bool $fetch_parents = true): bool { $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]); if (empty($entry)) { @@ -215,7 +216,7 @@ class Queue } DBA::close($receivers); - if (!Receiver::routeActivities($activity, $type, $push)) { + if (!Receiver::routeActivities($activity, $type, $push, $fetch_parents)) { self::remove($activity); } @@ -236,7 +237,7 @@ class Queue continue; } Logger::debug('Process leftover entry', $entry); - self::process($entry['id']); + self::process($entry['id'], false); } DBA::close($entries); } @@ -272,7 +273,7 @@ class Queue $entries = DBA::select('inbox-entry', ['id'], ["`in-reply-to-id` = ? AND `object-id` != ?", $uri, $uri]); while ($entry = DBA::fetch($entries)) { $count += 1; - self::process($entry['id']); + self::process($entry['id'], false); } DBA::close($entries); return $count; diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index b3d67b2af7..1f7946af22 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -627,20 +627,21 @@ class Receiver /** * Route activities * - * @param array $object_data - * @param string $type - * @param boolean $push + * @param array $object_data + * @param string $type + * @param bool $push + * @param bool $fetch_parents * * @return boolean Could the activity be routed? */ - public static function routeActivities(array $object_data, string $type, bool $push): bool + public static function routeActivities(array $object_data, string $type, bool $push, bool $fetch_parents = true): bool { $activity = $object_data['object_activity'] ?? 
[]; switch ($type) { case 'as:Create': if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { - $item = ActivityPub\Processor::createItem($object_data); + $item = ActivityPub\Processor::createItem($object_data, $fetch_parents); ActivityPub\Processor::postItem($object_data, $item); } elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) { // Unhandled Peertube activity @@ -652,7 +653,7 @@ class Receiver case 'as:Invite': if (in_array($object_data['object_type'], ['as:Event'])) { - $item = ActivityPub\Processor::createItem($object_data); + $item = ActivityPub\Processor::createItem($object_data, $fetch_parents); ActivityPub\Processor::postItem($object_data, $item); } else { return false; @@ -678,7 +679,7 @@ class Receiver $object_data['thread-completion'] = Contact::getIdForURL($actor); $object_data['completion-mode'] = self::COMPLETION_ANNOUCE; - $item = ActivityPub\Processor::createItem($object_data); + $item = ActivityPub\Processor::createItem($object_data, $fetch_parents); if (empty($item)) { return false; } From e82ef8890b60da42e47b8a4439309f7311e05141 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 3 Aug 2022 05:14:07 +0000 Subject: [PATCH 4/4] Only process entries for existing posts --- src/Protocol/ActivityPub/Processor.php | 2 +- src/Protocol/ActivityPub/Queue.php | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index 73f414d73c..d58628ff2a 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -1050,7 +1050,7 @@ class Processor Queue::remove($activity); - if ($success && Queue::hasChildren($item['uri'])) { + if ($success && Queue::hasChildren($item['uri']) && Post::exists(['uri' => $item['uri']])) { Queue::processReplyByUri($item['uri']); } diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php index 98c3b4d497..2fa95897c6 100644 --- 
a/src/Protocol/ActivityPub/Queue.php +++ b/src/Protocol/ActivityPub/Queue.php @@ -25,6 +25,7 @@ use Friendica\Core\Logger; use Friendica\Database\Database; use Friendica\Database\DBA; use Friendica\DI; +use Friendica\Model\Post; use Friendica\Util\DateTimeFormat; use Friendica\Util\JsonLD; @@ -232,6 +233,10 @@ class Queue { $entries = DBA::select('inbox-entry', ['id', 'type', 'object-type', 'object-id', 'in-reply-to-id'], ["`trust` AND `wid` IS NULL"], ['order' => ['id' => true]]); while ($entry = DBA::fetch($entries)) { + // Don't process entries that are replies to posts that do not exist + if (!empty($entry['in-reply-to-id']) && !Post::exists(['uri' => $entry['in-reply-to-id']])) { + continue; + } // We don't need to process entries that depend on already existing entries. if (!empty($entry['in-reply-to-id']) && DBA::exists('inbox-entry', ["`id` != ? AND `object-id` = ?", $entry['id'], $entry['in-reply-to-id']])) { continue;