More prevention of double processing of the same content

This commit is contained in:
Michael 2022-08-06 17:06:55 +00:00
parent 8b698b183d
commit 87a945b295
8 changed files with 141 additions and 52 deletions

View File

@ -1,6 +1,6 @@
-- ------------------------------------------ -- ------------------------------------------
-- Friendica 2022.09-dev (Giant Rhubarb) -- Friendica 2022.09-dev (Giant Rhubarb)
-- DB_UPDATE_VERSION 1477 -- DB_UPDATE_VERSION 1478
-- ------------------------------------------ -- ------------------------------------------
@ -1716,6 +1716,24 @@ CREATE TABLE IF NOT EXISTS `user-contact` (
FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE
) DEFAULT COLLATE utf8mb4_general_ci COMMENT='User specific public contact data'; ) DEFAULT COLLATE utf8mb4_general_ci COMMENT='User specific public contact data';
--
-- TABLE arrived-activity
--
-- Short-lived lookup table keyed by the object id of an incoming activity;
-- `received` stores when that id was recorded.
-- NOTE(review): uses the MEMORY engine, so rows are presumably not expected to
-- survive a database server restart - confirm this is intended.
CREATE TABLE IF NOT EXISTS `arrived-activity` (
	`object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity',
	`received` datetime COMMENT 'Receiving date',
	 PRIMARY KEY(`object-id`)
) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of arrived activities';
--
-- TABLE processed-activity
--
-- Short-lived lookup table keyed by the object id of an activity that has been
-- processed; `received` stores when that id was recorded.
-- NOTE(review): uses the MEMORY engine, so rows are presumably not expected to
-- survive a database server restart - confirm this is intended.
CREATE TABLE IF NOT EXISTS `processed-activity` (
	`object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity',
	`received` datetime COMMENT 'Receiving date',
	 PRIMARY KEY(`object-id`)
) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of processed activities';
-- --
-- TABLE worker-ipc -- TABLE worker-ipc
-- --

View File

@ -13,6 +13,7 @@ Database Tables
| [application](help/database/db_application) | OAuth application | | [application](help/database/db_application) | OAuth application |
| [application-marker](help/database/db_application-marker) | Timeline marker | | [application-marker](help/database/db_application-marker) | Timeline marker |
| [application-token](help/database/db_application-token) | OAuth user token | | [application-token](help/database/db_application-token) | OAuth user token |
| [arrived-activity](help/database/db_arrived-activity) | Id of arrived activities |
| [attach](help/database/db_attach) | file attachments | | [attach](help/database/db_attach) | file attachments |
| [cache](help/database/db_cache) | Stores temporary data | | [cache](help/database/db_cache) | Stores temporary data |
| [config](help/database/db_config) | main configuration storage | | [config](help/database/db_config) | main configuration storage |
@ -67,6 +68,7 @@ Database Tables
| [post-user](help/database/db_post-user) | User specific post data | | [post-user](help/database/db_post-user) | User specific post data |
| [post-user-notification](help/database/db_post-user-notification) | User post notifications | | [post-user-notification](help/database/db_post-user-notification) | User post notifications |
| [process](help/database/db_process) | Currently running system processes | | [process](help/database/db_process) | Currently running system processes |
| [processed-activity](help/database/db_processed-activity) | Id of processed activities |
| [profile](help/database/db_profile) | user profiles data | | [profile](help/database/db_profile) | user profiles data |
| [profile_field](help/database/db_profile_field) | Custom profile fields | | [profile_field](help/database/db_profile_field) | Custom profile fields |
| [push_subscriber](help/database/db_push_subscriber) | Used for OStatus: Contains feed subscribers | | [push_subscriber](help/database/db_push_subscriber) | Used for OStatus: Contains feed subscribers |

View File

@ -0,0 +1,22 @@
Table arrived-activity
===========
Id of arrived activities
Fields
------
| Field | Description | Type | Null | Key | Default | Extra |
| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- |
| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | |
| received | Receiving date | datetime | YES | | NULL | |
Indexes
------------
| Name | Fields |
| ------- | --------- |
| PRIMARY | object-id |
Return to [database documentation](help/database)

View File

@ -0,0 +1,22 @@
Table processed-activity
===========
Id of processed activities
Fields
------
| Field | Description | Type | Null | Key | Default | Extra |
| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- |
| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | |
| received | Receiving date | datetime | YES | | NULL | |
Indexes
------------
| Name | Fields |
| ------- | --------- |
| PRIMARY | object-id |
Return to [database documentation](help/database)

View File

@ -22,15 +22,10 @@
namespace Friendica\Module\Debug; namespace Friendica\Module\Debug;
use Friendica\BaseModule; use Friendica\BaseModule;
use Friendica\Content\Text;
use Friendica\Core\Logger;
use Friendica\Core\Renderer; use Friendica\Core\Renderer;
use Friendica\DI; use Friendica\DI;
use Friendica\Model\Item;
use Friendica\Model\Tag;
use Friendica\Protocol\ActivityPub; use Friendica\Protocol\ActivityPub;
use Friendica\Util\JsonLD; use Friendica\Util\JsonLD;
use Friendica\Util\XML;
class ActivityPubConversion extends BaseModule class ActivityPubConversion extends BaseModule
{ {
@ -123,7 +118,7 @@ class ActivityPubConversion extends BaseModule
'content' => visible_whitespace(var_export($object_data, true)) 'content' => visible_whitespace(var_export($object_data, true))
]; ];
$item = ActivityPub\Processor::createItem($object_data); $item = ActivityPub\Processor::createItem($object_data, true);
$results[] = [ $results[] = [
'title' => DI::l10n()->t('Result Item'), 'title' => DI::l10n()->t('Result Item'),

View File

@ -60,33 +60,29 @@ class Processor
const CACHEKEY_FETCH_ACTIVITY = 'processor:fetchMissingActivity:'; const CACHEKEY_FETCH_ACTIVITY = 'processor:fetchMissingActivity:';
const CACHEKEY_JUST_FETCHED = 'processor:isJustFetched:'; const CACHEKEY_JUST_FETCHED = 'processor:isJustFetched:';
static $processed = [];
/** /**
* Add an activity id to the list of processed ids * Add an object id to the list of processed ids
* *
* @param string $id * @param string $id
* *
* @return void * @return void
*/ */
public static function addActivityId(string $id) private static function addActivityId(string $id)
{ {
self::$processed[] = $id; DBA::delete('processed-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]);
if (count(self::$processed) > 100) { DBA::insert('processed-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]);
self::$processed = array_slice(self::$processed, 1);
}
} }
/** /**
* Checks if the given has just been processed * Checks if the given object id has just been processed
* *
* @param string $id * @param string $id
* *
* @return boolean * @return boolean
*/ */
public static function isProcessed(string $id): bool private static function isProcessed(string $id): bool
{ {
return in_array($id, self::$processed); return DBA::exists('processed-activity', ['object-id' => $id]);
} }
/** /**
@ -233,7 +229,7 @@ class Processor
$item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]); $item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]);
if (!DBA::isResult($item)) { if (!DBA::isResult($item)) {
Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]); Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]);
$item = self::createItem($activity); $item = self::createItem($activity, false);
if (empty($item)) { if (empty($item)) {
return; return;
} }
@ -303,7 +299,7 @@ class Processor
* @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \Friendica\Network\HTTPException\InternalServerErrorException
* @throws \ImagickException * @throws \ImagickException
*/ */
public static function createItem(array $activity, bool $fetch_parents = true): array public static function createItem(array $activity, bool $fetch_parents): array
{ {
if (self::isProcessed($activity['id'])) { if (self::isProcessed($activity['id'])) {
Logger::info('Id is already processed', ['id' => $activity['id']]); Logger::info('Id is already processed', ['id' => $activity['id']]);
@ -324,10 +320,10 @@ class Processor
$item['object-type'] = Activity\ObjectType::COMMENT; $item['object-type'] = Activity\ObjectType::COMMENT;
} }
if (!empty($activity['context'])) { if (!empty($activity['conversation'])) {
$item['conversation'] = $activity['context'];
} elseif (!empty($activity['conversation'])) {
$item['conversation'] = $activity['conversation']; $item['conversation'] = $activity['conversation'];
} elseif (!empty($activity['context'])) {
$item['conversation'] = $activity['context'];
} }
if (!empty($item['conversation'])) { if (!empty($item['conversation'])) {
@ -340,6 +336,7 @@ class Processor
$conversation = []; $conversation = [];
} }
Logger::debug('Create Item', ['id' => $activity['id'], 'conversation' => $item['conversation'] ?? '']);
if (empty($activity['author']) && empty($activity['actor'])) { if (empty($activity['author']) && empty($activity['actor'])) {
Logger::notice('Missing author and actor. We quit here.', ['activity' => $activity]); Logger::notice('Missing author and actor. We quit here.', ['activity' => $activity]);
return []; return [];
@ -490,11 +487,6 @@ class Processor
*/ */
private static function fetchParent(array $activity): string private static function fetchParent(array $activity): string
{ {
if (self::hasJustBeenFetched($activity['reply-to-id'])) {
Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]);
return '';
}
$recursion_depth = $activity['recursion-depth'] ?? 0; $recursion_depth = $activity['recursion-depth'] ?? 0;
if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) { if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) {
@ -552,23 +544,6 @@ class Processor
return ''; return '';
} }
/**
* Check if a given activity has recently been fetched
*
* @param string $url
* @return boolean
*/
private static function hasJustBeenFetched(string $url): bool
{
$cachekey = self::CACHEKEY_JUST_FETCHED . $url;
$time = DI::cache()->get($cachekey);
if (is_null($time)) {
DI::cache()->set($cachekey, time(), Duration::FIVE_MINUTES);
return false;
}
return ($time + 300) > time();
}
/** /**
* Check if a given activity is no longer available * Check if a given activity is no longer available
* *
@ -656,7 +631,7 @@ class Processor
public static function createActivity(array $activity, string $verb) public static function createActivity(array $activity, string $verb)
{ {
$activity['reply-to-id'] = $activity['object_id']; $activity['reply-to-id'] = $activity['object_id'];
$item = self::createItem($activity); $item = self::createItem($activity, false);
if (empty($item)) { if (empty($item)) {
return; return;
} }
@ -1419,7 +1394,7 @@ class Processor
$ldactivity = JsonLD::compact($activity); $ldactivity = JsonLD::compact($activity);
$ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 1; $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 0;
if (!empty($relay_actor)) { if (!empty($relay_actor)) {
$ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor); $ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor);

View File

@ -586,10 +586,18 @@ class Receiver
$object_data['object_activity'] = $activity; $object_data['object_activity'] = $activity;
} }
if (($type == 'as:Create') && Queue::exists($object_data['object_id'], $type)) { if (($type == 'as:Create') && $trust_source) {
if (self::hasArrived($object_data['object_id'])) {
Logger::info('The activity already arrived.', ['id' => $object_data['object_id']]);
return true;
}
self::addArrivedId($object_data['object_id']);
if (Queue::exists($object_data['object_id'], $type)) {
Logger::info('The activity is already added.', ['id' => $object_data['object_id']]); Logger::info('The activity is already added.', ['id' => $object_data['object_id']]);
return true; return true;
} }
}
if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) { if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) {
$object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source); $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source);
@ -1883,4 +1891,29 @@ class Receiver
return $object_data; return $object_data;
} }
/**
 * Add an object id to the list of arrived activities
 *
 * Entries older than five minutes are removed before the new id is stored.
 * The insert ignores duplicate keys: hasArrived() and this method are not
 * executed atomically, so a concurrent request may already have stored the
 * same object id in the meantime.
 *
 * @param string $id Object id of the incoming activity
 *
 * @return void
 */
private static function addArrivedId(string $id)
{
	// Expire stale entries so the table only holds recently arrived ids
	DBA::delete('arrived-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]);

	// Fully-qualified constant: avoids depending on a "use" statement for the Database class
	DBA::insert(
		'arrived-activity',
		['object-id' => $id, 'received' => DateTimeFormat::utcNow()],
		\Friendica\Database\Database::INSERT_IGNORE
	);
}
/**
 * Tells whether an object id is present in the list of arrived activities
 *
 * @param string $id Object id to look up
 *
 * @return boolean True when a matching "arrived-activity" row exists
 */
private static function hasArrived(string $id): bool
{
	$condition = ['object-id' => $id];

	return DBA::exists('arrived-activity', $condition);
}
} }

View File

@ -55,7 +55,7 @@
use Friendica\Database\DBA; use Friendica\Database\DBA;
if (!defined('DB_UPDATE_VERSION')) { if (!defined('DB_UPDATE_VERSION')) {
define('DB_UPDATE_VERSION', 1477); define('DB_UPDATE_VERSION', 1478);
} }
return [ return [
@ -1723,6 +1723,28 @@ return [
"uri-id_uid" => ["UNIQUE", "uri-id", "uid"], "uri-id_uid" => ["UNIQUE", "uri-id", "uid"],
] ]
], ],
"arrived-activity" => [
"comment" => "Id of arrived activities",
"fields" => [
"object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"],
"received" => ["type" => "datetime", "comment" => "Receiving date"],
],
"indexes" => [
"PRIMARY" => ["object-id"],
],
"engine" => "MEMORY",
],
"processed-activity" => [
"comment" => "Id of processed activities",
"fields" => [
"object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"],
"received" => ["type" => "datetime", "comment" => "Receiving date"],
],
"indexes" => [
"PRIMARY" => ["object-id"],
],
"engine" => "MEMORY",
],
"worker-ipc" => [ "worker-ipc" => [
"comment" => "Inter process communication between the frontend and the worker", "comment" => "Inter process communication between the frontend and the worker",
"fields" => [ "fields" => [