Fetching of missing posts is reworked

This commit is contained in:
Michael 2022-07-21 05:16:14 +00:00
parent 7dcd02938d
commit 1d13574225
10 changed files with 275 additions and 264 deletions

View File

@ -3410,9 +3410,7 @@ class Item
return is_numeric($hookData['item_id']) ? $hookData['item_id'] : 0; return is_numeric($hookData['item_id']) ? $hookData['item_id'] : 0;
} }
$fetchQueue = new ActivityPub\FetchQueue(); $fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri);
$fetched_uri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $uri);
$fetchQueue->process();
if ($fetched_uri) { if ($fetched_uri) {
$item_id = self::searchByLink($fetched_uri, $uid); $item_id = self::searchByLink($fetched_uri, $uid);

View File

@ -123,7 +123,7 @@ class ActivityPubConversion extends BaseModule
'content' => visible_whitespace(var_export($object_data, true)) 'content' => visible_whitespace(var_export($object_data, true))
]; ];
$item = ActivityPub\Processor::createItem(new ActivityPub\FetchQueue(), $object_data); $item = ActivityPub\Processor::createItem($object_data);
$results[] = [ $results[] = [
'title' => DI::l10n()->t('Result Item'), 'title' => DI::l10n()->t('Result Item'),

View File

@ -25,7 +25,6 @@ use Friendica\Core\Logger;
use Friendica\Core\Protocol; use Friendica\Core\Protocol;
use Friendica\Model\APContact; use Friendica\Model\APContact;
use Friendica\Model\User; use Friendica\Model\User;
use Friendica\Protocol\ActivityPub\FetchQueue;
use Friendica\Util\HTTPSignature; use Friendica\Util\HTTPSignature;
use Friendica\Util\JsonLD; use Friendica\Util\JsonLD;
@ -224,14 +223,10 @@ class ActivityPub
$items = []; $items = [];
} }
$fetchQueue = new FetchQueue();
foreach ($items as $activity) { foreach ($items as $activity) {
$ldactivity = JsonLD::compact($activity); $ldactivity = JsonLD::compact($activity);
ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, '', $uid, true); ActivityPub\Receiver::processActivity($ldactivity, '', $uid, true);
} }
$fetchQueue->process();
} }
/** /**

View File

@ -1,57 +0,0 @@
<?php
/**
* @copyright Copyright (C) 2010-2022, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
namespace Friendica\Protocol\ActivityPub;
/**
* This class prevents maximum function nesting errors by flattening recursive calls to Processor::fetchMissingActivity
*/
class FetchQueue
{
/** @var FetchQueueItem[] */
protected $queue = [];
public function push(FetchQueueItem $item)
{
array_push($this->queue, $item);
}
/**
* Processes missing activities one by one. It is possible that a processing call will add additional missing
* activities, they will be processed in subsequent iterations of the loop.
*
* Since this process is self-contained, it isn't suitable to retrieve the URI of a single activity.
*
* The simplest way to get the URI of the first activity and ensures all the parents are fetched is this way:
*
* $fetchQueue = new ActivityPub\FetchQueue();
* $fetchedUri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $activityUri);
* $fetchQueue->process();
*/
public function process()
{
while (count($this->queue)) {
$fetchQueueItem = array_pop($this->queue);
call_user_func_array([Processor::class, 'fetchMissingActivity'], array_merge([$this], $fetchQueueItem->toParameters()));
}
}
}

View File

@ -1,62 +0,0 @@
<?php
/**
* @copyright Copyright (C) 2010-2022, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
namespace Friendica\Protocol\ActivityPub;
class FetchQueueItem
{
/** @var string */
private $url;
/** @var array */
private $child;
/** @var string */
private $relay_actor;
/** @var int */
private $completion;
/**
* This constructor matches the signature of Processor::fetchMissingActivity except for the default $completion value
*
* @param string $url
* @param array $child
* @param string $relay_actor
* @param int $completion
*/
public function __construct(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_AUTO)
{
$this->url = $url;
$this->child = $child;
$this->relay_actor = $relay_actor;
$this->completion = $completion;
}
/**
* Array meant to be used in call_user_function_array([Processor::class, 'fetchMissingActivity']). Caller needs to
* provide an instance of a FetchQueue that isn't included in these parameters.
*
* @see FetchQueue::process()
* @return array
*/
public function toParameters(): array
{
return [$this->url, $this->child, $this->relay_actor, $this->completion];
}
}

View File

@ -189,17 +189,16 @@ class Processor
/** /**
* Updates a message * Updates a message
* *
* @param FetchQueue $fetchQueue
* @param array $activity Activity array * @param array $activity Activity array
* @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException * @throws \ImagickException
*/ */
public static function updateItem(FetchQueue $fetchQueue, array $activity) public static function updateItem(array $activity)
{ {
$item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]); $item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]);
if (!DBA::isResult($item)) { if (!DBA::isResult($item)) {
Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]); Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]);
$item = self::createItem($fetchQueue, $activity); $item = self::createItem($activity);
if (empty($item)) { if (empty($item)) {
return; return;
} }
@ -223,7 +222,7 @@ class Processor
Post\History::add($item['uri-id'], $item); Post\History::add($item['uri-id'], $item);
Item::update($item, ['uri' => $activity['id']]); Item::update($item, ['uri' => $activity['id']]);
Receiver::removeFromQueue($activity); Queue::remove($activity);
if ($activity['object_type'] == 'as:Event') { if ($activity['object_type'] == 'as:Event') {
$posts = Post::select(['event-id', 'uid'], ["`uri` = ? AND `event-id` > ?", $activity['id'], 0]); $posts = Post::select(['event-id', 'uid'], ["`uri` = ? AND `event-id` > ?", $activity['id'], 0]);
@ -262,13 +261,12 @@ class Processor
/** /**
* Prepares data for a message * Prepares data for a message
* *
* @param FetchQueue $fetchQueue
* @param array $activity Activity array * @param array $activity Activity array
* @return array Internal item * @return array Internal item
* @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException * @throws \ImagickException
*/ */
public static function createItem(FetchQueue $fetchQueue, array $activity): array public static function createItem(array $activity): array
{ {
$item = []; $item = [];
$item['verb'] = Activity::POST; $item['verb'] = Activity::POST;
@ -283,13 +281,15 @@ class Processor
} }
if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) { if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id']]); $recursion_depth = $activity['recursion-depth'] ?? 0;
/** Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
* Instead of calling recursively self::fetchMissingActivity which can hit PHP's default function nesting if ($recursion_depth < 10) {
* limit of 256 recursive calls, we push the parent activity fetch parameters in this queue. The initial self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
* caller is responsible for processing the remaining queue once the original activity has been processed. } else {
*/ Logger::notice('Recursion level is too high, fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
$fetchQueue->push(new FetchQueueItem($activity['reply-to-id'], $activity)); Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
return [];
}
} }
$item['diaspora_signed_text'] = $activity['diaspora:comment'] ?? ''; $item['diaspora_signed_text'] = $activity['diaspora:comment'] ?? '';
@ -428,7 +428,7 @@ class Processor
Logger::info('Deleting item', ['object' => $activity['object_id'], 'owner' => $owner]); Logger::info('Deleting item', ['object' => $activity['object_id'], 'owner' => $owner]);
Item::markForDeletion(['uri' => $activity['object_id'], 'owner-id' => $owner]); Item::markForDeletion(['uri' => $activity['object_id'], 'owner-id' => $owner]);
Receiver::removeFromQueue($activity); Queue::remove($activity);
} }
/** /**
@ -464,15 +464,14 @@ class Processor
/** /**
* Prepare the item array for an activity * Prepare the item array for an activity
* *
* @param FetchQueue $fetchQueue
* @param array $activity Activity array * @param array $activity Activity array
* @param string $verb Activity verb * @param string $verb Activity verb
* @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException * @throws \ImagickException
*/ */
public static function createActivity(FetchQueue $fetchQueue, array $activity, string $verb) public static function createActivity(array $activity, string $verb)
{ {
$item = self::createItem($fetchQueue, $activity); $item = self::createItem($activity);
if (empty($item)) { if (empty($item)) {
return; return;
} }
@ -546,7 +545,7 @@ class Processor
Logger::debug('Add post to featured collection', ['uri-id' => $uriid]); Logger::debug('Add post to featured collection', ['uri-id' => $uriid]);
Post\Collection::add($uriid, Post\Collection::FEATURED); Post\Collection::add($uriid, Post\Collection::FEATURED);
Receiver::removeFromQueue($activity); Queue::remove($activity);
} }
/** /**
@ -564,7 +563,7 @@ class Processor
Logger::debug('Remove post from featured collection', ['uri-id' => $uriid]); Logger::debug('Remove post from featured collection', ['uri-id' => $uriid]);
Post\Collection::remove($uriid, Post\Collection::FEATURED); Post\Collection::remove($uriid, Post\Collection::FEATURED);
Receiver::removeFromQueue($activity); Queue::remove($activity);
} }
/** /**
@ -652,13 +651,12 @@ class Processor
$item['body'] = Item::improveSharedDataInBody($item); $item['body'] = Item::improveSharedDataInBody($item);
} else { } else {
if (empty($activity['directmessage']) && ($item['thr-parent'] != $item['uri']) && ($item['gravity'] == GRAVITY_COMMENT)) { if (empty($activity['directmessage']) && ($item['thr-parent'] != $item['uri']) && ($item['gravity'] == GRAVITY_COMMENT)) {
$item_private = !in_array(0, $activity['item_receiver']);
$parent = Post::selectFirst(['id', 'uri-id', 'private', 'author-link', 'alias'], ['uri' => $item['thr-parent']]); $parent = Post::selectFirst(['id', 'uri-id', 'private', 'author-link', 'alias'], ['uri' => $item['thr-parent']]);
if (!DBA::isResult($parent)) { if (!DBA::isResult($parent)) {
Logger::warning('Unknown parent item.', ['uri' => $item['thr-parent']]); Logger::warning('Unknown parent item.', ['uri' => $item['thr-parent']]);
return false; return false;
} }
if ($item_private && ($parent['private'] != Item::PRIVATE)) { if (($parent['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) {
Logger::warning('Item is private but the parent is not. Dropping.', ['item-uri' => $item['uri'], 'thr-parent' => $item['thr-parent']]); Logger::warning('Item is private but the parent is not. Dropping.', ['item-uri' => $item['uri'], 'thr-parent' => $item['thr-parent']]);
return false; return false;
} }
@ -783,6 +781,7 @@ class Processor
} }
$stored = false; $stored = false;
$success = false;
ksort($activity['receiver']); ksort($activity['receiver']);
if (!self::isSolicitedMessage($activity, $item)) { if (!self::isSolicitedMessage($activity, $item)) {
@ -895,7 +894,7 @@ class Processor
$item_id = Item::insert($item); $item_id = Item::insert($item);
if ($item_id) { if ($item_id) {
Logger::info('Item insertion successful', ['user' => $item['uid'], 'item_id' => $item_id]); Logger::info('Item insertion successful', ['user' => $item['uid'], 'item_id' => $item_id]);
Receiver::removeFromQueue($activity); $success = true;
} else { } else {
Logger::notice('Item insertion aborted', ['user' => $item['uid']]); Logger::notice('Item insertion aborted', ['user' => $item['uid']]);
} }
@ -905,6 +904,11 @@ class Processor
} }
} }
if ($success) {
Queue::remove($activity);
Queue::processReplyByUri($item['uri']);
}
// Store send a follow request for every reshare - but only when the item had been stored // Store send a follow request for every reshare - but only when the item had been stored
if ($stored && ($item['private'] != Item::PRIVATE) && ($item['gravity'] == GRAVITY_PARENT) && !empty($item['author-link']) && ($item['author-link'] != $item['owner-link'])) { if ($stored && ($item['private'] != Item::PRIVATE) && ($item['gravity'] == GRAVITY_PARENT) && !empty($item['author-link']) && ($item['author-link'] != $item['owner-link'])) {
$author = APContact::getByURL($item['owner-link'], false); $author = APContact::getByURL($item['owner-link'], false);
@ -1121,7 +1125,6 @@ class Processor
/** /**
* Fetches missing posts * Fetches missing posts
* *
* @param FetchQueue $fetchQueue
* @param string $url message URL * @param string $url message URL
* @param array $child activity array with the child of this message * @param array $child activity array with the child of this message
* @param string $relay_actor Relay actor * @param string $relay_actor Relay actor
@ -1130,7 +1133,7 @@ class Processor
* @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException * @throws \ImagickException
*/ */
public static function fetchMissingActivity(FetchQueue $fetchQueue, string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string
{ {
if (!empty($child['receiver'])) { if (!empty($child['receiver'])) {
$uid = ActivityPub\Receiver::getFirstUserFromReceivers($child['receiver']); $uid = ActivityPub\Receiver::getFirstUserFromReceivers($child['receiver']);
@ -1140,7 +1143,7 @@ class Processor
$object = ActivityPub::fetchContent($url, $uid); $object = ActivityPub::fetchContent($url, $uid);
if (empty($object)) { if (empty($object)) {
Logger::notice('Activity was not fetchable, aborting.', ['url' => $url]); Logger::notice('Activity was not fetchable, aborting.', ['url' => $url, 'uid' => $uid]);
return ''; return '';
} }
@ -1192,6 +1195,8 @@ class Processor
$ldactivity = JsonLD::compact($activity); $ldactivity = JsonLD::compact($activity);
$ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 1;
if (!empty($relay_actor)) { if (!empty($relay_actor)) {
$ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor); $ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor);
$ldactivity['completion-mode'] = Receiver::COMPLETION_RELAY; $ldactivity['completion-mode'] = Receiver::COMPLETION_RELAY;
@ -1211,7 +1216,7 @@ class Processor
return ''; return '';
} }
ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, json_encode($activity), $uid, true, false, $signer); ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer);
Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'object' => $activity['id']]); Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'object' => $activity['id']]);
@ -1354,7 +1359,7 @@ class Processor
Logger::info('Updating profile', ['object' => $activity['object_id']]); Logger::info('Updating profile', ['object' => $activity['object_id']]);
Contact::updateFromProbeByURL($activity['object_id']); Contact::updateFromProbeByURL($activity['object_id']);
Receiver::removeFromQueue($activity); Queue::remove($activity);
} }
/** /**
@ -1383,7 +1388,7 @@ class Processor
DBA::close($contacts); DBA::close($contacts);
Logger::info('Deleted contact', ['object' => $activity['object_id']]); Logger::info('Deleted contact', ['object' => $activity['object_id']]);
Receiver::removeFromQueue($activity); Queue::remove($activity);
} }
/** /**
@ -1466,7 +1471,7 @@ class Processor
$condition = ['id' => $cid]; $condition = ['id' => $cid];
Contact::update($fields, $condition); Contact::update($fields, $condition);
Logger::info('Accept contact request', ['contact' => $cid, 'user' => $uid]); Logger::info('Accept contact request', ['contact' => $cid, 'user' => $uid]);
Receiver::removeFromQueue($activity); Queue::remove($activity);
} }
/** /**
@ -1500,7 +1505,7 @@ class Processor
} else { } else {
Logger::info('Rejected contact request', ['contact' => $cid, 'user' => $uid]); Logger::info('Rejected contact request', ['contact' => $cid, 'user' => $uid]);
} }
Receiver::removeFromQueue($activity); Queue::remove($activity);
} }
/** /**
@ -1526,7 +1531,7 @@ class Processor
} }
Item::markForDeletion(['uri' => $activity['object_id'], 'author-id' => $author_id, 'gravity' => GRAVITY_ACTIVITY]); Item::markForDeletion(['uri' => $activity['object_id'], 'author-id' => $author_id, 'gravity' => GRAVITY_ACTIVITY]);
Receiver::removeFromQueue($activity); Queue::remove($activity);
} }
/** /**
@ -1563,7 +1568,7 @@ class Processor
Contact::removeFollower($contact); Contact::removeFollower($contact);
Logger::info('Undo following request', ['contact' => $cid, 'user' => $uid]); Logger::info('Undo following request', ['contact' => $cid, 'user' => $uid]);
Receiver::removeFromQueue($activity); Queue::remove($activity);
} }
/** /**

View File

@ -0,0 +1,114 @@
<?php
/**
* @copyright Copyright (C) 2010-2022, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
namespace Friendica\Protocol\ActivityPub;
use Friendica\Core\Logger;
use Friendica\Database\Database;
use Friendica\Database\DBA;
use Friendica\Util\DateTimeFormat;
/**
* This class handles the processing of incoming posts
*/
class Queue
{
public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push): array
{
$fields = [
'activity-id' => $activity['id'],
'object-id' => $activity['object_id'],
'type' => $type,
'object-type' => $activity['object_type'],
'activity' => json_encode($activity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
'received' => DateTimeFormat::utcNow(),
'push' => $push,
];
if (!empty($activity['reply-to-id'])) {
$fields['in-reply-to-id'] = $activity['reply-to-id'];
}
if (!empty($activity['object_object_type'])) {
$fields['object-object-type'] = $activity['object_object_type'];
}
if (!empty($http_signer)) {
$fields['signer'] = $http_signer;
}
DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE);
$queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $activity['id']]);
if (!empty($queue['id'])) {
$activity['entry-id'] = $queue['id'];
DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE);
}
return $activity;
}
public static function remove(array $activity = [])
{
if (empty($activity['entry-id'])) {
return;
}
DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
//echo "Delete ".$activity['entry-id']."\n";
}
public static function process(int $id)
{
$entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
if (empty($entry)) {
return;
}
Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);
$activity = json_decode($entry['activity'], true);
$type = $entry['type'];
$push = $entry['push'];
$activity['entry-id'] = $entry['id'];
if (!Receiver::routeActivities($activity, $type, $push)) {
self::remove($activity);
}
}
public static function processAll()
{
$entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]);
while ($entry = DBA::fetch($entries)) {
echo $entry['id'] . "\t" . $entry['type'] . "\t" . $entry['object-type'] . "\n";
self::process($entry['id']);
}
}
public static function processReplyByUri(string $uri)
{
$entries = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $uri], ['order' => ['id' => true]]);
while ($entry = DBA::fetch($entries)) {
self::process($entry['id']);
}
}
}

View File

@ -28,7 +28,6 @@ use Friendica\Content\Text\Markdown;
use Friendica\Core\Logger; use Friendica\Core\Logger;
use Friendica\Core\Protocol; use Friendica\Core\Protocol;
use Friendica\Core\System; use Friendica\Core\System;
use Friendica\Database\Database;
use Friendica\DI; use Friendica\DI;
use Friendica\Model\Contact; use Friendica\Model\Contact;
use Friendica\Model\APContact; use Friendica\Model\APContact;
@ -37,7 +36,6 @@ use Friendica\Model\Post;
use Friendica\Model\User; use Friendica\Model\User;
use Friendica\Protocol\Activity; use Friendica\Protocol\Activity;
use Friendica\Protocol\ActivityPub; use Friendica\Protocol\ActivityPub;
use Friendica\Util\DateTimeFormat;
use Friendica\Util\HTTPSignature; use Friendica\Util\HTTPSignature;
use Friendica\Util\JsonLD; use Friendica\Util\JsonLD;
use Friendica\Util\LDSignature; use Friendica\Util\LDSignature;
@ -155,46 +153,7 @@ class Receiver
$trust_source = false; $trust_source = false;
} }
$fetchQueue = new FetchQueue(); self::processActivity($ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer);
self::processActivity($fetchQueue, $ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer);
$fetchQueue->process();
}
private static function enqueuePost(array $ldactivity = [], string $type, int $uid, string $http_signer): array
{
$fields = [
'activity-id' => $ldactivity['id'],
'object-id' => $ldactivity['object_id'],
'type' => $type,
'object-type' => $ldactivity['object_type'],
'activity' => json_encode($ldactivity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
'received' => DateTimeFormat::utcNow(),
];
if (!empty($ldactivity['object_object_type'])) {
$fields['object-object-type'] = $ldactivity['object_object_type'];
}
if (!empty($http_signer)) {
$fields['signer'] = $http_signer;
}
DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE);
$queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $ldactivity['id']]);
if (!empty($queue['id'])) {
$ldactivity['entry-id'] = $queue['id'];
DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE);
}
return $ldactivity;
}
public static function removeFromQueue(array $activity = [])
{
if (empty($activity['entry-id'])) {
return;
}
DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
} }
/** /**
@ -242,16 +201,12 @@ class Receiver
return; return;
} }
$fetchQueue = new FetchQueue(); $id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY);
$id = Processor::fetchMissingActivity($fetchQueue, $object_id, [], $actor, self::COMPLETION_RELAY);
if (empty($id)) { if (empty($id)) {
Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor]); Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor]);
return; return;
} }
$fetchQueue->process();
$item_id = Item::searchByLink($object_id); $item_id = Item::searchByLink($object_id);
if ($item_id) { if ($item_id) {
Logger::info('Relayed message had been fetched and stored', ['id' => $object_id, 'item' => $item_id, 'actor' => $actor]); Logger::info('Relayed message had been fetched and stored', ['id' => $object_id, 'item' => $item_id, 'actor' => $actor]);
@ -518,7 +473,6 @@ class Receiver
/** /**
* Processes the activity object * Processes the activity object
* *
* @param FetchQueue $fetchQueue
* @param array $activity Array with activity data * @param array $activity Array with activity data
* @param string $body The unprocessed body * @param string $body The unprocessed body
* @param int|null $uid User ID * @param int|null $uid User ID
@ -528,7 +482,7 @@ class Receiver
* @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException * @throws \ImagickException
*/ */
public static function processActivity(FetchQueue $fetchQueue, array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '') public static function processActivity(array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '')
{ {
$type = JsonLD::fetchElement($activity, '@type'); $type = JsonLD::fetchElement($activity, '@type');
if (!$type) { if (!$type) {
@ -597,36 +551,56 @@ class Receiver
$object_data['thread-children-type'] = $activity['thread-children-type']; $object_data['thread-children-type'] = $activity['thread-children-type'];
} }
if (!empty($activity['recursion-depth'])) {
$object_data['recursion-depth'] = $activity['recursion-depth'];
}
// Internal flag for posts that arrived via relay // Internal flag for posts that arrived via relay
if (!empty($activity['from-relay'])) { if (!empty($activity['from-relay'])) {
$object_data['from-relay'] = $activity['from-relay']; $object_data['from-relay'] = $activity['from-relay'];
} }
$object_data = self::enqueuePost($object_data, $type, $uid, $http_signer); if ($type == 'as:Announce') {
$object_data['object_activity'] = $activity;
}
$object_data = Queue::add($object_data, $type, $uid, $http_signer, $push);
if (in_array('as:Question', [$object_data['object_type'] ?? '', $object_data['object_object_type'] ?? ''])) { if (in_array('as:Question', [$object_data['object_type'] ?? '', $object_data['object_object_type'] ?? ''])) {
self::storeUnhandledActivity(false, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); self::storeUnhandledActivity(false, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
} }
if (!self::routeActivities($object_data, $type, $push)) {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
//if (!DI::config()->get('debug', 'ap_log_unknown')) {
// Queue::remove($object_data);
//}
}
}
public static function routeActivities($object_data, $type, $push)
{
$activity = $object_data['object_activity'] ?? [];
switch ($type) { switch ($type) {
case 'as:Create': case 'as:Create':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
$item = ActivityPub\Processor::createItem($fetchQueue, $object_data); $item = ActivityPub\Processor::createItem($object_data);
ActivityPub\Processor::postItem($object_data, $item); ActivityPub\Processor::postItem($object_data, $item);
} elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) { } elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) {
// Unhandled Peertube activity // Unhandled Peertube activity
self::removeFromQueue($object_data); Queue::remove($object_data);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
case 'as:Invite': case 'as:Invite':
if (in_array($object_data['object_type'], ['as:Event'])) { if (in_array($object_data['object_type'], ['as:Event'])) {
$item = ActivityPub\Processor::createItem($fetchQueue, $object_data); $item = ActivityPub\Processor::createItem($object_data);
ActivityPub\Processor::postItem($object_data, $item); ActivityPub\Processor::postItem($object_data, $item);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
@ -637,17 +611,19 @@ class Receiver
ActivityPub\Processor::addToFeaturedCollection($object_data); ActivityPub\Processor::addToFeaturedCollection($object_data);
} elseif ($object_data['object_type'] == '') { } elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
Queue::remove($object_data);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
case 'as:Announce': case 'as:Announce':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
$actor = JsonLD::fetchElement($activity, 'as:actor', '@id');
$object_data['thread-completion'] = Contact::getIdForURL($actor); $object_data['thread-completion'] = Contact::getIdForURL($actor);
$object_data['completion-mode'] = self::COMPLETION_ANNOUCE; $object_data['completion-mode'] = self::COMPLETION_ANNOUCE;
$item = ActivityPub\Processor::createItem($fetchQueue, $object_data); $item = ActivityPub\Processor::createItem($object_data);
if (empty($item)) { if (empty($item)) {
return; return;
} }
@ -655,61 +631,64 @@ class Receiver
$item['post-reason'] = Item::PR_ANNOUNCEMENT; $item['post-reason'] = Item::PR_ANNOUNCEMENT;
ActivityPub\Processor::postItem($object_data, $item); ActivityPub\Processor::postItem($object_data, $item);
if (!empty($activity)) {
$announce_object_data = self::processObject($activity); $announce_object_data = self::processObject($activity);
$announce_object_data['name'] = $type; $announce_object_data['name'] = $type;
$announce_object_data['author'] = JsonLD::fetchElement($activity, 'as:actor', '@id'); $announce_object_data['author'] = $actor;
$announce_object_data['object_id'] = $object_data['object_id']; $announce_object_data['object_id'] = $object_data['object_id'];
$announce_object_data['object_type'] = $object_data['object_type']; $announce_object_data['object_type'] = $object_data['object_type'];
$announce_object_data['push'] = $push; $announce_object_data['push'] = $push;
if (!empty($body)) { if (!empty($object_data['raw'])) {
$announce_object_data['raw'] = $body; $announce_object_data['raw'] = $object_data['raw'];
} }
ActivityPub\Processor::createActivity($announce_object_data, Activity::ANNOUNCE);
ActivityPub\Processor::createActivity($fetchQueue, $announce_object_data, Activity::ANNOUNCE); } else echo "\n***************************\n";
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
case 'as:Like': case 'as:Like':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::LIKE); ActivityPub\Processor::createActivity($object_data, Activity::LIKE);
} elseif ($object_data['object_type'] == '') { } elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
Queue::remove($object_data);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
case 'as:Dislike': case 'as:Dislike':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::DISLIKE); ActivityPub\Processor::createActivity($object_data, Activity::DISLIKE);
} elseif ($object_data['object_type'] == '') { } elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
Queue::remove($object_data);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
case 'as:TentativeAccept': case 'as:TentativeAccept':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDMAYBE); ActivityPub\Processor::createActivity($object_data, Activity::ATTENDMAYBE);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
case 'as:Update': case 'as:Update':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::updateItem($fetchQueue, $object_data); ActivityPub\Processor::updateItem($object_data);
} elseif (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) { } elseif (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) {
ActivityPub\Processor::updatePerson($object_data); ActivityPub\Processor::updatePerson($object_data);
} elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) { } elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) {
// Unhandled Peertube activity // Unhandled Peertube activity
self::removeFromQueue($object_data); Queue::remove($object_data);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
@ -720,8 +699,9 @@ class Receiver
ActivityPub\Processor::deletePerson($object_data); ActivityPub\Processor::deletePerson($object_data);
} elseif ($object_data['object_type'] == '') { } elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. Most likely we don't have it here. We ignore this activity. // The object type couldn't be determined. Most likely we don't have it here. We ignore this activity.
Queue::remove($object_data);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
@ -729,7 +709,7 @@ class Receiver
if (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) { if (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) {
ActivityPub\Processor::blockAccount($object_data); ActivityPub\Processor::blockAccount($object_data);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
@ -738,8 +718,9 @@ class Receiver
ActivityPub\Processor::removeFromFeaturedCollection($object_data); ActivityPub\Processor::removeFromFeaturedCollection($object_data);
} elseif ($object_data['object_type'] == '') { } elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
Queue::remove($object_data);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
@ -748,9 +729,9 @@ class Receiver
ActivityPub\Processor::followUser($object_data); ActivityPub\Processor::followUser($object_data);
} elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) { } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
$object_data['reply-to-id'] = $object_data['object_id']; $object_data['reply-to-id'] = $object_data['object_id'];
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::FOLLOW); ActivityPub\Processor::createActivity($object_data, Activity::FOLLOW);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
@ -758,9 +739,9 @@ class Receiver
if ($object_data['object_type'] == 'as:Follow') { if ($object_data['object_type'] == 'as:Follow') {
ActivityPub\Processor::acceptFollowUser($object_data); ActivityPub\Processor::acceptFollowUser($object_data);
} elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) { } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTEND); ActivityPub\Processor::createActivity($object_data, Activity::ATTEND);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
@ -768,9 +749,9 @@ class Receiver
if ($object_data['object_type'] == 'as:Follow') { if ($object_data['object_type'] == 'as:Follow') {
ActivityPub\Processor::rejectFollowUser($object_data); ActivityPub\Processor::rejectFollowUser($object_data);
} elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) { } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDNO); ActivityPub\Processor::createActivity($object_data, Activity::ATTENDNO);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
@ -793,42 +774,42 @@ class Receiver
} elseif (in_array($object_data['object_type'], array_merge(self::ACTIVITY_TYPES, ['as:Announce', 'as:Create', ''])) && } elseif (in_array($object_data['object_type'], array_merge(self::ACTIVITY_TYPES, ['as:Announce', 'as:Create', ''])) &&
empty($object_data['object_object_type'])) { empty($object_data['object_object_type'])) {
// We cannot detect the target object. So we can ignore it. // We cannot detect the target object. So we can ignore it.
self::removeFromQueue($object_data);
} elseif (in_array($object_data['object_type'], ['as:Create']) && } elseif (in_array($object_data['object_type'], ['as:Create']) &&
in_array($object_data['object_object_type'], ['pt:CacheFile'])) { in_array($object_data['object_object_type'], ['pt:CacheFile'])) {
// Unhandled Peertube activity // Unhandled Peertube activity
self::removeFromQueue($object_data); Queue::remove($object_data);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
case 'as:View': case 'as:View':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::VIEW); ActivityPub\Processor::createActivity($object_data, Activity::VIEW);
} elseif ($object_data['object_type'] == '') { } elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. Most likely we don't have it here. We ignore this activity. // The object type couldn't be determined. Most likely we don't have it here. We ignore this activity.
self::removeFromQueue($object_data); Queue::remove($object_data);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
case 'litepub:EmojiReact': case 'litepub:EmojiReact':
if (in_array($object_data['object_type'], self::CONTENT_TYPES)) { if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::EMOJIREACT); ActivityPub\Processor::createActivity($object_data, Activity::EMOJIREACT);
} elseif ($object_data['object_type'] == '') { } elseif ($object_data['object_type'] == '') {
// The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity. // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
Queue::remove($object_data);
} else { } else {
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
} }
break; break;
default: default:
Logger::info('Unknown activity: ' . $type . ' ' . $object_data['object_type']); Logger::info('Unknown activity: ' . $type . ' ' . $object_data['object_type']);
self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); return false;
break;
} }
return true;
} }
/** /**
@ -847,11 +828,6 @@ class Receiver
*/ */
private static function storeUnhandledActivity(bool $unknown, string $type, array $object_data, array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = []) private static function storeUnhandledActivity(bool $unknown, string $type, array $object_data, array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [])
{ {
if (!DI::config()->get('debug', 'ap_log_unknown')) {
self::removeFromQueue($activity);
return;
}
$file = ($unknown ? 'unknown-' : 'unhandled-') . str_replace(':', '-', $type) . '-'; $file = ($unknown ? 'unknown-' : 'unhandled-') . str_replace(':', '-', $type) . '-';
if (!empty($object_data['object_type'])) { if (!empty($object_data['object_type'])) {

View File

@ -0,0 +1,40 @@
<?php
/**
* @copyright Copyright (C) 2010-2022, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
namespace Friendica\Worker;
use Friendica\Core\Logger;
use Friendica\Protocol\ActivityPub;
use Friendica\Protocol\ActivityPub\Receiver;
class FetchMissingActivity
{
/**
* Fetch missing activities
* @param string $url Contact URL
*/
public static function execute(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL)
{
Logger::info('Start fetching missing activity', ['url' => $url]);
$result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion);
Logger::info('Finished fetching missing activity', ['url' => $url, 'result' => $result]);
}
}

View File

@ -790,12 +790,14 @@ return [
"id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "sequential ID"], "id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "sequential ID"],
"activity-id" => ["type" => "varbinary(255)", "comment" => "id of the incoming activity"], "activity-id" => ["type" => "varbinary(255)", "comment" => "id of the incoming activity"],
"object-id" => ["type" => "varbinary(255)", "comment" => ""], "object-id" => ["type" => "varbinary(255)", "comment" => ""],
"in-reply-to-id" => ["type" => "varbinary(255)", "comment" => ""],
"type" => ["type" => "varchar(64)", "comment" => "Type of the activity"], "type" => ["type" => "varchar(64)", "comment" => "Type of the activity"],
"object-type" => ["type" => "varchar(64)", "comment" => "Type of the object activity"], "object-type" => ["type" => "varchar(64)", "comment" => "Type of the object activity"],
"object-object-type" => ["type" => "varchar(64)", "comment" => "Type of the object's object activity"], "object-object-type" => ["type" => "varchar(64)", "comment" => "Type of the object's object activity"],
"received" => ["type" => "datetime", "comment" => "Receiving date"], "received" => ["type" => "datetime", "comment" => "Receiving date"],
"activity" => ["type" => "mediumtext", "comment" => "The JSON activity"], "activity" => ["type" => "mediumtext", "comment" => "The JSON activity"],
"signer" => ["type" => "varchar(255)", "comment" => ""], "signer" => ["type" => "varchar(255)", "comment" => ""],
"push" => ["type" => "boolean", "not null" => "1", "default" => "0", "comment" => ""],
], ],
"indexes" => [ "indexes" => [
"PRIMARY" => ["id"], "PRIMARY" => ["id"],