From dad74e965085a29589f1f6f9c9ba33df493e889e Mon Sep 17 00:00:00 2001 From: Michael Vogel Date: Thu, 4 Aug 2016 15:15:43 +0200 Subject: [PATCH 1/3] pubsubpublish is now split into separate calls per entry. --- include/notifier.php | 4 +- include/pubsubpublish.php | 101 ++++++++++++++++++++++---------------- 2 files changed, 60 insertions(+), 45 deletions(-) diff --git a/include/notifier.php b/include/notifier.php index cfe4e18412..0610a4e398 100644 --- a/include/notifier.php +++ b/include/notifier.php @@ -642,8 +642,8 @@ function notifier_run(&$argv, &$argc){ if ($h === '[internal]') { // Set push flag for PuSH subscribers to this topic, // they will be notified in queue.php - q("UPDATE `push_subscriber` SET `push` = 1 " . - "WHERE `nickname` = '%s'", dbesc($owner['nickname'])); + q("UPDATE `push_subscriber` SET `push` = 1 ". + "WHERE `nickname` = '%s' AND `push` = 0", dbesc($owner['nickname'])); logger('Activating internal PuSH for item '.$item_id, LOGGER_DEBUG); diff --git a/include/pubsubpublish.php b/include/pubsubpublish.php index 625eefc261..85637facb2 100644 --- a/include/pubsubpublish.php +++ b/include/pubsubpublish.php @@ -2,60 +2,57 @@ require_once("boot.php"); require_once("include/ostatus.php"); -function handle_pubsubhubbub() { +use \Friendica\Core\Config; +use \Friendica\Core\PConfig; + +function handle_pubsubhubbub($id) { global $a, $db; - logger('start'); + $r = q("SELECT * FROM `push_subscriber` WHERE `id` = %d", intval($id)); + if (!$r) + return; + else + $rr = $r[0]; - // We'll push to each subscriber that has push > 0, - // i.e. there has been an update (set in notifier.php). + logger("Generate feed of user ".$rr['nickname']." to ".$rr['callback_url']." - last updated ".$rr['last_update'], LOGGER_DEBUG); - $r = q("SELECT * FROM `push_subscriber` WHERE `push` > 0"); + $params = ostatus::feed($a, $rr['nickname'], $rr['last_update']); + $hmac_sig = hash_hmac("sha1", $params, $rr['secret']); - foreach($r as $rr) { + $headers = array("Content-type: application/atom+xml", + sprintf("Link: <%s>;rel=hub,<%s>;rel=self", + $a->get_baseurl().'/pubsubhubbub', + $rr['topic']), + "X-Hub-Signature: sha1=".$hmac_sig); - logger("Generate feed for user ".$rr['nickname']." - last updated ".$rr['last_update'], LOGGER_DEBUG); + logger('POST '.print_r($headers, true)."\n".$params, LOGGER_DEBUG); - $params = ostatus::feed($a, $rr['nickname'], $rr['last_update']); - $hmac_sig = hash_hmac("sha1", $params, $rr['secret']); + post_url($rr['callback_url'], $params, $headers); + $ret = $a->get_curl_code(); - $headers = array("Content-type: application/atom+xml", - sprintf("Link: <%s>;rel=hub,<%s>;rel=self", - $a->get_baseurl().'/pubsubhubbub', - $rr['topic']), - "X-Hub-Signature: sha1=".$hmac_sig); + if ($ret >= 200 && $ret <= 299) { + logger('successfully pushed to '.$rr['callback_url']); - logger('POST '.print_r($headers, true)."\n".$params, LOGGER_DEBUG); + // set last_update to "now", and reset push=0 + $date_now = datetime_convert('UTC','UTC','now','Y-m-d H:i:s'); + q("UPDATE `push_subscriber` SET `push` = 0, last_update = '%s' WHERE id = %d", + dbesc($date_now), + intval($rr['id'])); - post_url($rr['callback_url'], $params, $headers); - $ret = $a->get_curl_code(); + } else { + logger('error when pushing to '.$rr['callback_url'].' HTTP: '.$ret); - if ($ret >= 200 && $ret <= 299) { - logger('successfully pushed to '.$rr['callback_url']); + // we use the push variable also as a counter, if we failed we + // increment this until some upper limit where we give up + $new_push = intval($rr['push']) + 1; - // set last_update to "now", and reset push=0 - $date_now = datetime_convert('UTC','UTC','now','Y-m-d H:i:s'); - q("UPDATE `push_subscriber` SET `push` = 0, last_update = '%s' WHERE id = %d", - dbesc($date_now), - intval($rr['id'])); + if ($new_push > 30) // OK, let's give up + $new_push = 0; - } else { - logger('error when pushing to '.$rr['callback_url'].' HTTP: '.$ret); - - // we use the push variable also as a counter, if we failed we - // increment this until some upper limit where we give up - $new_push = intval($rr['push']) + 1; - - if ($new_push > 30) // OK, let's give up - $new_push = 0; - - q("UPDATE `push_subscriber` SET `push` = %d WHERE id = %d", - $new_push, - intval($rr['id'])); - } + q("UPDATE `push_subscriber` SET `push` = %d WHERE id = %d", + $new_push, + intval($rr['id'])); } - - logger('done'); } @@ -89,10 +86,28 @@ function pubsubpublish_run(&$argv, &$argc){ if($argc > 1) $pubsubpublish_id = intval($argv[1]); - else - $pubsubpublish_id = 0; + else { + // We'll push to each subscriber that has push > 0, + // i.e. there has been an update (set in notifier.php). + $r = q("SELECT `id`, `callback_url` FROM `push_subscriber` WHERE `push` > 0"); - handle_pubsubhubbub(); + // Use the delivery interval that is also used for the notifier + $interval = Config::get("system", "delivery_interval", 2); + + // If we are using the worker we don't need a delivery interval + if (get_config("system", "worker")) + $interval = false; + + foreach($r as $rr) { + logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG); + proc_run(PRIORITY_HIGH, 'include/pubsubpublish.php', $rr["id"]); + + if($interval) + @time_sleep_until(microtime(true) + (float) $interval); + } + } + + handle_pubsubhubbub($pubsubpublish_id); return; From 074ae59f49ddaae9d699a1e32537cb71c2c7590e Mon Sep 17 00:00:00 2001 From: Michael Vogel Date: Thu, 4 Aug 2016 15:33:15 +0200 Subject: [PATCH 2/3] Reschedule killed processes at the beginning of the queue. --- include/poller.php | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/include/poller.php b/include/poller.php index dfc3b90cf0..578f1424eb 100644 --- a/include/poller.php +++ b/include/poller.php @@ -235,9 +235,10 @@ function poller_kill_stale_workers() { logger("Worker process ".$pid["pid"]." took more than 3 hours. It will be killed now."); posix_kill($pid["pid"], SIGTERM); - // Question: If a process is stale: Should we remove it or should we reschedule it? - // By now we rescheduling it. It's maybe not the wisest decision? - q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `pid` = 0 WHERE `pid` = %d", + // We killed the stale process. + // To avoid a blocking situation we reschedule the process at the beginning of the queue. + q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `created` = '%s', `pid` = 0 WHERE `pid` = %d", + dbesc(datetime_convert()), intval($pid["pid"])); } else logger("Worker process ".$pid["pid"]." now runs for ".round($duration)." minutes. That's okay.", LOGGER_DEBUG); From 0411eb289e967dcc0dbd17f4dcba512c5903082e Mon Sep 17 00:00:00 2001 From: Michael Vogel Date: Thu, 4 Aug 2016 15:41:32 +0200 Subject: [PATCH 3/3] Lower the priority for stale processes --- include/poller.php | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/include/poller.php b/include/poller.php index 578f1424eb..73950e35b2 100644 --- a/include/poller.php +++ b/include/poller.php @@ -237,8 +237,11 @@ function poller_kill_stale_workers() { // We killed the stale process. // To avoid a blocking situation we reschedule the process at the beginning of the queue. - q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `created` = '%s', `pid` = 0 WHERE `pid` = %d", + // Additionally we are lowering the priority. + q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `created` = '%s', + `priority` = %d, `pid` = 0 WHERE `pid` = %d", dbesc(datetime_convert()), + intval(PRIORITY_LOW), intval($pid["pid"])); } else logger("Worker process ".$pid["pid"]." now runs for ".round($duration)." minutes. That's okay.", LOGGER_DEBUG);