diff --git a/database.sql b/database.sql index 1d632eb3fb..d13a3330c2 100644 --- a/database.sql +++ b/database.sql @@ -1,6 +1,6 @@ -- ------------------------------------------ -- Friendica 2022.05-rc (Siberian Iris) --- DB_UPDATE_VERSION 1461 +-- DB_UPDATE_VERSION 1462 -- ------------------------------------------ @@ -1126,6 +1126,8 @@ CREATE TABLE IF NOT EXISTS `post-delivery` ( `uid` mediumint unsigned COMMENT 'Delivering user', `created` datetime DEFAULT '0001-01-01 00:00:00' COMMENT '', `command` varbinary(32) COMMENT '', + `failed` tinyint DEFAULT 0 COMMENT 'Number of times the delivery has failed', + `receivers` mediumtext COMMENT 'JSON encoded array with the receiving contacts', PRIMARY KEY(`uri-id`,`inbox-id`), INDEX `inbox-id_created` (`inbox-id`,`created`), INDEX `uid` (`uid`), diff --git a/doc/database/db_post-delivery.md b/doc/database/db_post-delivery.md index 79e858193a..2a766eea56 100644 --- a/doc/database/db_post-delivery.md +++ b/doc/database/db_post-delivery.md @@ -6,13 +6,15 @@ Delivery data for posts for the batch processing Fields ------ -| Field | Description | Type | Null | Key | Default | Extra | -| -------- | --------------------------------------------------------- | ------------------ | ---- | --- | ------------------- | ----- | -| uri-id | Id of the item-uri table entry that contains the item uri | int unsigned | NO | PRI | NULL | | -| inbox-id | Item-uri id of inbox url | int unsigned | NO | PRI | NULL | | -| uid | Delivering user | mediumint unsigned | YES | | NULL | | -| created | | datetime | YES | | 0001-01-01 00:00:00 | | -| command | | varbinary(32) | YES | | NULL | | +| Field | Description | Type | Null | Key | Default | Extra | +| --------- | --------------------------------------------------------- | ------------------ | ---- | --- | ------------------- | ----- | +| uri-id | Id of the item-uri table entry that contains the item uri | int unsigned | NO | PRI | NULL | | +| inbox-id | Item-uri id of inbox url | int unsigned | NO | PRI | NULL | | +| uid | Delivering user | mediumint unsigned | YES | | NULL | | +| created | | datetime | YES | | 0001-01-01 00:00:00 | | +| command | | varbinary(32) | YES | | NULL | | +| failed | Number of times the delivery has failed | tinyint | YES | | 0 | | +| receivers | JSON encoded array with the receiving contacts | mediumtext | YES | | NULL | | Indexes ------------ diff --git a/src/Model/Post/Delivery.php b/src/Model/Post/Delivery.php index 8cd01c3a85..f8ed068fe8 100644 --- a/src/Model/Post/Delivery.php +++ b/src/Model/Post/Delivery.php @@ -24,6 +24,7 @@ namespace Friendica\Model\Post; use Friendica\Database\DBA; use BadMethodCallException; use Friendica\Database\Database; +use Friendica\DI; use Friendica\Model\ItemURI; class Delivery @@ -34,14 +35,16 @@ class Delivery * @param integer $uri_id * @param string $inbox * @param string $created + * @param array %receivers */ - public static function add(int $uri_id, int $uid, string $inbox, string $created, string $command) + public static function add(int $uri_id, int $uid, string $inbox, string $created, string $command, array $receivers) { if (empty($uri_id)) { throw new BadMethodCallException('Empty URI_id'); } - $fields = ['uri-id' => $uri_id, 'uid' => $uid, 'inbox-id' => ItemURI::getIdByURI($inbox), 'created' => $created, 'command' => $command]; + $fields = ['uri-id' => $uri_id, 'uid' => $uid, 'inbox-id' => ItemURI::getIdByURI($inbox), + 'created' => $created, 'command' => $command, 'receivers' => json_encode($receivers)]; DBA::insert('post-delivery', $fields, Database::INSERT_IGNORE); } @@ -57,8 +60,41 @@ class Delivery DBA::delete('post-delivery', ['uri-id' => $uri_id, 'inbox-id' => ItemURI::getIdByURI($inbox)]); } + /** + * Remove failed posts for an inbox + * + * @param string $inbox + */ + public static function removeFailed(string $inbox) + { + DBA::delete('post-delivery', ["`inbox-id` = ? AND `failed` >= ?", ItemURI::getIdByURI($inbox), DI::config()->get('system', 'worker_defer_limit')]); + } + + /** + * Increment "failed" counter for the given inbox and post + * + * @param integer $uri_id + * @param string $inbox + */ + public static function incrementFailed(int $uri_id, string $inbox) + { + return DBA::e('UPDATE `post-delivery` SET `failed` = `failed` + 1 WHERE `uri-id` = ? AND `inbox-id` = ?', $uri_id, ItemURI::getIdByURI($inbox)); + } + public static function selectForInbox(string $inbox) { - return DBA::selectToArray('post-delivery', [], ['inbox-id' => ItemURI::getIdByURI($inbox)], ['order' => ['created']]); + $rows = DBA::select('post-delivery', [], ["`inbox-id` = ? AND `failed` < ?", ItemURI::getIdByURI($inbox), DI::config()->get('system', 'worker_defer_limit')], ['order' => ['created']]); + $deliveries = []; + while ($row = DBA::fetch($rows)) { + if (!empty($row['receivers'])) { + $row['receivers'] = json_decode($row['receivers'], true); + } else { + $row['receivers'] = []; + } + $deliveries[] = $row; + } + DBA::close($rows); + + return $deliveries; } } diff --git a/src/Worker/APDelivery.php b/src/Worker/APDelivery.php index 37b47dd46d..df9df8128a 100644 --- a/src/Worker/APDelivery.php +++ b/src/Worker/APDelivery.php @@ -25,7 +25,6 @@ use Friendica\Core\Logger; use Friendica\Core\Worker; use Friendica\Model\Contact; use Friendica\Model\GServer; -use Friendica\Model\Item; use Friendica\Model\Post; use Friendica\Protocol\ActivityPub; use Friendica\Util\HTTPSignature; @@ -47,10 +46,21 @@ class APDelivery public static function execute(string $cmd, int $item_id, string $inbox, int $uid, array $receivers = [], int $uri_id = 0) { if (ActivityPub\Transmitter::archivedInbox($inbox)) { - Logger::info('Inbox is archived', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $item_id, 'uid' => $uid]); - if (in_array($cmd, [Delivery::POST])) { + Logger::info('Inbox is archived', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $item_id, 'uri-id' => $uri_id, 'uid' => $uid]); + if (empty($uri_id) && !empty($item_id)) { $item = Post::selectFirst(['uri-id'], ['id' => $item_id]); - Post\DeliveryData::incrementQueueFailed($item['uri-id'] ?? 0); + $uri_id = $item['uri-id'] ?? 0; + } + if (empty($uri_id)) { + $posts = Post\Delivery::selectForInbox($inbox); + $uri_ids = array_column($posts, 'uri-id'); + } else { + $uri_ids = [$uri_id]; + } + + foreach ($uri_ids as $uri_id) { + Post\Delivery::remove($uri_id, $inbox); + Post\DeliveryData::incrementQueueFailed($uri_id); } return; } @@ -61,29 +71,15 @@ class APDelivery $result = self::deliver($inbox); $success = $result['success']; $uri_ids = $result['uri_ids']; - } - - if (empty($uri_ids)) { + } else { $success = self::deliverToInbox($cmd, $item_id, $inbox, $uid, $receivers, $uri_id); + $uri_ids = [$uri_id]; } - if (!$success && !Worker::defer() && in_array($cmd, [Delivery::POST])) { - if (!empty($uri_id)) { + if (!$success && !Worker::defer() && !empty($uri_ids)) { + foreach ($uri_ids as $uri_id) { Post\Delivery::remove($uri_id, $inbox); Post\DeliveryData::incrementQueueFailed($uri_id); - } elseif (!empty($uri_ids)) { - foreach ($uri_ids as $uri_id) { - Post\Delivery::remove($uri_id, $inbox); - Post\DeliveryData::incrementQueueFailed($uri_id); - } - } - } elseif ($success && in_array($cmd, [Delivery::POST])) { - if (!empty($uri_id)) { - Post\DeliveryData::incrementQueueDone($uri_id, Post\DeliveryData::ACTIVITYPUB); - } elseif (!empty($uri_ids)) { - foreach ($uri_ids as $uri_id) { - Post\DeliveryData::incrementQueueDone($uri_id, Post\DeliveryData::ACTIVITYPUB); - } } } } @@ -91,35 +87,28 @@ class APDelivery private static function deliver(string $inbox) { $uri_ids = []; - $success = true; + $posts = Post\Delivery::selectForInbox($inbox); - $posts = Post\Delivery::selectForInbox($inbox); foreach ($posts as $post) { - $uri_ids[] = $post['uri-id']; - if ($success) { - $success = self::deliverToInbox($post['command'], 0, $inbox, $post['uid'], [], $post['uri-id']); + if (!self::deliverToInbox($post['command'], 0, $inbox, $post['uid'], $post['receivers'], $post['uri-id'])) { + $uri_ids[] = $post['uri-id']; } } - return ['success' => $success, 'uri_ids' => $uri_ids]; + Logger::debug('Inbox delivery done', ['inbox' => $inbox, 'posts' => count($posts), 'failed' => count($uri_ids)]); + return ['success' => empty($uri_ids), 'uri_ids' => $uri_ids]; } private static function deliverToInbox(string $cmd, int $item_id, string $inbox, int $uid, array $receivers, int $uri_id) { if (empty($item_id) && !empty($uri_id) && !empty($uid)) { - $item = Post::selectFirst(['id', 'parent', 'origin'], ['uri-id' => $uri_id, 'uid' => $uid]); - $item_id = $item['id'] ?? 0; - if (empty($receivers) && !empty($item)) { - $parent = Post::selectFirst(Item::DELIVER_FIELDLIST, ['id' => $item['parent']]); - - $inboxes = ActivityPub\Transmitter::fetchTargetInboxes($parent, $uid); - $receivers = $inboxes[$inbox] ?? []; - - // When we haven't fetched the receiver list, it can be a personal inbox - if (empty($receivers)) { - $inboxes = ActivityPub\Transmitter::fetchTargetInboxes($parent, $uid, true); - $receivers = $inboxes[$inbox] ?? []; - } + $item = Post::selectFirst(['id', 'parent', 'origin'], ['uri-id' => $uri_id, 'uid' => [$uid, 0]], ['order' => ['uid' => true]]); + if (empty($item['id'])) { + Logger::debug('Item not found, removing delivery', ['uri-id' => $uri_id, 'uid' => $uid, 'cmd' => $cmd, 'inbox' => $inbox]); + Post\Delivery::remove($uri_id, $inbox); + return true; + } else { + $item_id = $item['id']; } } @@ -144,15 +133,23 @@ class APDelivery $data = ActivityPub\Transmitter::createCachedActivityFromItem($item_id); if (!empty($data)) { $success = HTTPSignature::transmit($data, $inbox, $uid); - if ($success && $uri_id) { - Post\Delivery::remove($uri_id, $inbox); + if ($uri_id) { + if ($success) { + Post\Delivery::remove($uri_id, $inbox); + } else { + Post\Delivery::incrementFailed($uri_id, $inbox); + } } } } self::setSuccess($receivers, $success); - Logger::info('Delivered', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $item_id, 'uri-id' => $uri_id, 'uid' => $uid, 'success' => $success]); + Logger::info('Delivered', ['uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox, 'success' => $success]); + + if ($success && in_array($cmd, [Delivery::POST])) { + Post\DeliveryData::incrementQueueDone($uri_id, Post\DeliveryData::ACTIVITYPUB); + } return $success; } diff --git a/src/Worker/Cron.php b/src/Worker/Cron.php index 01c54fed18..e39ba24aae 100644 --- a/src/Worker/Cron.php +++ b/src/Worker/Cron.php @@ -95,6 +95,9 @@ class Cron // Clear cache entries Worker::add(PRIORITY_LOW, 'ClearCache'); + // Requeue posts from the post delivery entries + Worker::add(PRIORITY_MEDIUM, 'RequeuePosts'); + DI::config()->set('system', 'last_cron_hourly', time()); } diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index 48b81b9a99..e248c944dd 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -788,8 +788,8 @@ class Notifier if (DI::config()->get('system', 'bulk_delivery')) { $delivery_queue_count++; - Post\Delivery::add($target_item['uri-id'], $uid, $inbox, $target_item['created'], $cmd); - Worker::add(['priority' => $priority, 'dont_fork' => true], 'APDelivery', $cmd, 0, $inbox, $uid); + Post\Delivery::add($target_item['uri-id'], $uid, $inbox, $target_item['created'], $cmd, $receivers); + Worker::add(PRIORITY_HIGH, 'APDelivery', '', 0, $inbox, 0); } else { if (Worker::add(['priority' => $priority, 'created' => $created, 'dont_fork' => true], 'APDelivery', $cmd, $target_item['id'], $inbox, $uid, $receivers, $target_item['uri-id'])) { @@ -804,8 +804,8 @@ class Notifier if (DI::config()->get('system', 'bulk_delivery')) { $delivery_queue_count++; - Post\Delivery::add($target_item['uri-id'], $uid, $inbox, $target_item['created'], $cmd); - Worker::add(['priority' => $priority, 'dont_fork' => true], 'APDelivery', $cmd, 0, $inbox, $uid); + Post\Delivery::add($target_item['uri-id'], $uid, $inbox, $target_item['created'], $cmd, []); + Worker::add(PRIORITY_MEDIUM, 'APDelivery', '', 0, $inbox, 0); } else { if (Worker::add(['priority' => $priority, 'dont_fork' => true], 'APDelivery', $cmd, $target_item['id'], $inbox, $uid, [], $target_item['uri-id'])) { $delivery_queue_count++; diff --git a/src/Worker/RequeuePosts.php b/src/Worker/RequeuePosts.php new file mode 100644 index 0000000000..568595aafd --- /dev/null +++ b/src/Worker/RequeuePosts.php @@ -0,0 +1,47 @@ +. + * + */ + +namespace Friendica\Worker; + +use Friendica\Core\Logger; +use Friendica\Core\Worker; +use Friendica\Database\DBA; +use Friendica\Model\Post; + +/** + * Requeue posts that are stuck in the post-delivery table without a matching delivery job. + * This should not happen in regular situations, this is a precaution. + */ +class RequeuePosts +{ + public static function execute() + { + $deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox`"); + while ($delivery = DBA::fetch($deliveries)) { + Post\Delivery::removeFailed($delivery['inbox']); + + if (Worker::add(PRIORITY_HIGH, 'APDelivery', '', 0, $delivery['inbox'], 0)) { + Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox']]); + } + } + DBA::close($deliveries); + } +} diff --git a/static/dbstructure.config.php b/static/dbstructure.config.php index 08634290b7..44b2fc1f80 100644 --- a/static/dbstructure.config.php +++ b/static/dbstructure.config.php @@ -55,7 +55,7 @@ use Friendica\Database\DBA; if (!defined('DB_UPDATE_VERSION')) { - define('DB_UPDATE_VERSION', 1461); + define('DB_UPDATE_VERSION', 1462); } return [ @@ -1165,6 +1165,8 @@ return [ "uid" => ["type" => "mediumint unsigned", "foreign" => ["user" => "uid"], "comment" => "Delivering user"], "created" => ["type" => "datetime", "default" => DBA::NULL_DATETIME, "comment" => ""], "command" => ["type" => "varbinary(32)", "comment" => ""], + "failed" => ["type" => "tinyint", "default" => 0, "comment" => "Number of times the delivery has failed"], + "receivers" => ["type" => "mediumtext", "comment" => "JSON encoded array with the receiving contacts"], ], "indexes" => [ "PRIMARY" => ["uri-id", "inbox-id"],