From 30b0a035f984e0ba7241e4902b90f2bdee10a189 Mon Sep 17 00:00:00 2001
From: Michael <heluecht@pirati.ca>
Date: Thu, 6 Jul 2017 05:48:02 +0000
Subject: [PATCH] Split expire.php in several processes / small worker changes

---
 include/cron.php          |  2 +-
 include/discover_poco.php | 13 +++++-----
 include/expire.php        | 52 +++++++++++++++++++++++++++++----------
 include/gprobe.php        |  2 +-
 include/identity.php      |  2 +-
 include/poller.php        | 32 ++++++++++++++++++------
 include/queue.php         |  2 +-
 include/socgraph.php      | 12 ++++-----
 8 files changed, 79 insertions(+), 38 deletions(-)

diff --git a/include/cron.php b/include/cron.php
index 0e58778440..eda88dbcd3 100644
--- a/include/cron.php
+++ b/include/cron.php
@@ -253,7 +253,7 @@ function cron_poll_contacts($argc, $argv) {
 			} else {
 				$priority = PRIORITY_LOW;
 			}
-			proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/onepoll.php', intval($contact['id']));
+			proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/onepoll.php', (int)$contact['id']);
 		}
 	}
 }
diff --git a/include/discover_poco.php b/include/discover_poco.php
index 6bb296c9c1..dd208d492f 100644
--- a/include/discover_poco.php
+++ b/include/discover_poco.php
@@ -47,13 +47,12 @@ function discover_poco_run(&$argv, &$argc) {
 	logger('start '.$search);
 
 	if ($mode == 8) {
-		$profile_url = base64_decode($argv[2]);
-		if ($profile_url != "") {
-			poco_last_updated($profile_url, true);
+		if ($argv[2] != "") {
+			poco_last_updated($argv[2], true);
 		}
 	} elseif ($mode == 7) {
 		if ($argc == 6) {
-			$url = base64_decode($argv[5]);
+			$url = $argv[5];
 		} else {
 			$url = '';
 		}
@@ -63,7 +62,7 @@ function discover_poco_run(&$argv, &$argc) {
 	} elseif ($mode == 5) {
 		update_server();
 	} elseif ($mode == 4) {
-		$server_url = base64_decode($argv[2]);
+		$server_url = $argv[2];
 		if ($server_url == "") {
 			return;
 		}
@@ -119,7 +118,7 @@ function update_server() {
 		}
 		logger('Update server status for server '.$server["url"], LOGGER_DEBUG);
 
-		proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", base64_encode($server["url"]));
+		proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", $server["url"]);
 
 		if (++$updated > 250) {
 			return;
@@ -178,7 +177,7 @@ function discover_users() {
 
 		if ((($server_url == "") && ($user["network"] == NETWORK_FEED)) || $force_update || poco_check_server($server_url, $user["network"])) {
 			logger('Check profile '.$user["url"]);
-			proc_run(PRIORITY_LOW, "include/discover_poco.php", "check_profile", base64_encode($user["url"]));
+			proc_run(PRIORITY_LOW, "include/discover_poco.php", "check_profile", $user["url"]);
 
 			if (++$checked > 100) {
 				return;
diff --git a/include/expire.php b/include/expire.php
index aa34cbaa5b..b8295c2ef0 100644
--- a/include/expire.php
+++ b/include/expire.php
@@ -9,14 +9,32 @@ function expire_run(&$argv, &$argc){
 	require_once('include/items.php');
 	require_once('include/Contact.php');
 
+	load_hooks();
+
+	if (($argc == 2) && (intval($argv[1]) > 0)) { // single-user mode: expire the items of the given uid
+		$user = dba::select('user', array('uid', 'username', 'expire'), array('uid' => $argv[1]), array('limit' => 1));
+		if (dbm::is_result($user)) {
+			logger('Expire items for user '.$user['uid'].' ('.$user['username'].') - interval: '.$user['expire'], LOGGER_DEBUG);
+			item_expire($user['uid'], $user['expire']);
+			logger('Expire items for user '.$user['uid'].' ('.$user['username'].') - done ', LOGGER_DEBUG);
+		}
+		return;
+	} elseif (($argc == 3) && ($argv[1] == 'hook') && is_array($a->hooks) && array_key_exists("expire", $a->hooks)) {
+		foreach ($a->hooks["expire"] as $hook) {
+			if ($hook[1] == $argv[2]) { // hook mode: only run the hook whose identifier was passed in $argv[2]
+				logger("Calling expire hook '" . $hook[1] . "'", LOGGER_DEBUG);
+				call_single_hook($a, 'expire', $hook, $data); // the hook name is the 'expire' key iterated above
+			}
+		}
+		return;
+	}
+
 	// physically remove anything that has been deleted for more than two months
 	$r = dba::p("SELECT `id` FROM `item` WHERE `deleted` AND `changed` < UTC_TIMESTAMP() - INTERVAL 60 DAY");
-	if (dbm::is_result($r)) {
-		while ($row = dba::fetch($r)) {
-			dba::delete('item', array('id' => $row['id']));
-		}
-		dba::close($r);
+	while ($row = dba::fetch($r)) {
+		dba::delete('item', array('id' => $row['id']));
 	}
+	dba::close($r);
 
 	// make this optional as it could have a performance impact on large sites
 	if (intval(get_config('system', 'optimize_items'))) {
@@ -25,17 +43,25 @@ function expire_run(&$argv, &$argc){
 
 	logger('expire: start');
 
-	$r = q("SELECT `uid`, `username`, `expire` FROM `user` WHERE `expire` != 0");
-	if (dbm::is_result($r)) {
-		foreach ($r as $rr) {
-			logger('Expire: ' . $rr['username'] . ' interval: ' . $rr['expire'], LOGGER_DEBUG);
-			item_expire($rr['uid'], $rr['expire']);
+	$r = dba::p("SELECT `uid`, `username` FROM `user` WHERE `expire` != 0");
+	while ($row = dba::fetch($r)) {
+		logger('Calling expiry for user '.$row['uid'].' ('.$row['username'].')', LOGGER_DEBUG);
+		proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true),
+				'include/expire.php', (int)$row['uid']);
+	}
+	dba::close($r);
+
+	logger('expire: calling hooks');
+
+	if (is_array($a->hooks) && array_key_exists('expire', $a->hooks)) {
+		foreach ($a->hooks['expire'] as $hook) {
+			logger("Calling expire hook for '" . $hook[1] . "'", LOGGER_DEBUG);
+			proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true),
+					'include/expire.php', 'hook', $hook[1]);
 		}
 	}
 
-	load_hooks();
-
-	call_hooks('expire');
+	logger('expire: end');
 
 	return;
 }
diff --git a/include/gprobe.php b/include/gprobe.php
index 788a9eb044..9fb93e9596 100644
--- a/include/gprobe.php
+++ b/include/gprobe.php
@@ -10,7 +10,7 @@ function gprobe_run(&$argv, &$argc){
 	if ($argc != 2) {
 		return;
 	}
-	$url = hex2bin($argv[1]);
+	$url = $argv[1];
 
 	$r = q("SELECT `id`, `url`, `network` FROM `gcontact` WHERE `nurl` = '%s' ORDER BY `id` LIMIT 1",
 		dbesc(normalise_link($url))
diff --git a/include/identity.php b/include/identity.php
index c733bea31b..a6db963ddf 100644
--- a/include/identity.php
+++ b/include/identity.php
@@ -888,7 +888,7 @@ function zrl_init(App $a) {
 			return;
 		}
 
-		proc_run(PRIORITY_LOW, 'include/gprobe.php', bin2hex($tmp_str));
+		proc_run(PRIORITY_LOW, 'include/gprobe.php', $tmp_str);
 		$arr = array('zrl' => $tmp_str, 'url' => $a->cmd);
 		call_hooks('zrl_init', $arr);
 	}
diff --git a/include/poller.php b/include/poller.php
index 4c0f665963..89a17fa0c7 100644
--- a/include/poller.php
+++ b/include/poller.php
@@ -88,9 +88,11 @@ function poller_run($argv, $argc){
 	$starttime = time();
 
 	// We fetch the next queue entry that is about to be executed
-	while ($r = poller_worker_process()) {
+	while ($r = poller_worker_process($passing_slow)) {
 
-		$refetched = false;
+		// When we are processing jobs with a lower priority, we don't refetch new jobs
+		// Otherwise fast jobs could wait behind slow ones and could be blocked.
+		$refetched = $passing_slow;
 
 		foreach ($r AS $entry) {
 			// Assure that the priority is an integer value
@@ -105,7 +107,7 @@ function poller_run($argv, $argc){
 			// If possible we will fetch new jobs for this worker
 			if (!$refetched && Lock::set('poller_worker_process', 0)) {
 				$stamp = (float)microtime(true);
-				$refetched = find_worker_processes();
+				$refetched = find_worker_processes($passing_slow);
 				$poller_db_duration += (microtime(true) - $stamp);
 				Lock::remove('poller_worker_process');
 			}
@@ -666,19 +668,31 @@ function poller_passing_slow(&$highest_priority) {
 /**
  * @brief Find and claim the next worker process for us
  *
+ * @param boolean $passing_slow Output parameter (by reference): set to true if jobs were found while passing low priority processes
  * @return boolean Have we found something?
  */
-function find_worker_processes() {
+function find_worker_processes(&$passing_slow) {
 
 	$mypid = getmypid();
 
 	// Check if we should pass some low priority process
 	$highest_priority = 0;
 	$found = false;
+	$passing_slow = false;
 
 	// The higher the number of parallel workers, the more we prefetch to prevent concurring access
-	$limit = Config::get("system", "worker_queues", 4);
-	$limit = Config::get('system', 'worker_fetch_limit', $limit);
+	// We decrease the limit with the number of entries left in the queue
+	$worker_queues = Config::get("system", "worker_queues", 4);
+	$queue_length = Config::get('system', 'worker_fetch_limit', $worker_queues);
+	$lower_job_limit = $worker_queues * $queue_length * 2;
+	$jobs = poller_total_entries();
+
+	// Let the fetch limit grow quadratically with the number of queued jobs, capped at $queue_length
+	$exponent = 2;
+	$slope = $queue_length / pow($lower_job_limit, $exponent);
+	$limit = min($queue_length, ceil($slope * pow($jobs, $exponent)));
+
+	logger('Total: '.$jobs.' - Maximum: '.$queue_length.' - jobs per queue: '.$limit, LOGGER_DEBUG);
 
 	if (poller_passing_slow($highest_priority)) {
 		// Are there waiting processes with a higher priority than the currently highest?
@@ -707,6 +721,7 @@ function find_worker_processes() {
 			dba::close($result);
 
 			$found = (count($ids) > 0);
+			$passing_slow = $found;
 		}
 	}
 
@@ -734,9 +749,10 @@ function find_worker_processes() {
 /**
  * @brief Returns the next worker process
  *
+ * @param boolean $passing_slow Output parameter (by reference): whether low priority processes were passed, as set by find_worker_processes()
  * @return string SQL statement
  */
-function poller_worker_process() {
+function poller_worker_process(&$passing_slow) {
 	global $poller_db_duration, $poller_lock_duration;
 
 	$stamp = (float)microtime(true);
@@ -755,7 +771,7 @@ function poller_worker_process() {
 	$poller_lock_duration = (microtime(true) - $stamp);
 
 	$stamp = (float)microtime(true);
-	$found = find_worker_processes();
+	$found = find_worker_processes($passing_slow);
 	$poller_db_duration += (microtime(true) - $stamp);
 
 	Lock::remove('poller_worker_process');
diff --git a/include/queue.php b/include/queue.php
index f721326f06..865d6903a2 100644
--- a/include/queue.php
+++ b/include/queue.php
@@ -52,7 +52,7 @@ function queue_run(&$argv, &$argc) {
 		if (dbm::is_result($r)) {
 			foreach ($r as $q_item) {
 				logger('Call queue for id '.$q_item['id']);
-				proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/queue.php", $q_item['id']);
+				proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/queue.php", (int)$q_item['id']);
 			}
 		}
 		return;
diff --git a/include/socgraph.php b/include/socgraph.php
index f055aebdf8..ce6cdcdef6 100644
--- a/include/socgraph.php
+++ b/include/socgraph.php
@@ -38,7 +38,7 @@ require_once 'include/Photo.php';
  */
 function poco_load($cid, $uid = 0, $zcid = 0, $url = null) {
 	// Call the function "poco_load_worker" via the worker
-	proc_run(PRIORITY_LOW, "include/discover_poco.php", "poco_load", intval($cid), intval($uid), intval($zcid), base64_encode($url));
+	proc_run(PRIORITY_LOW, "include/discover_poco.php", "poco_load", (int)$cid, (int)$uid, (int)$zcid, $url);
 }
 
 /**
@@ -1668,7 +1668,7 @@ function poco_fetch_serverlist($poco) {
 		$r = q("SELECT `nurl` FROM `gserver` WHERE `nurl` = '%s'", dbesc(normalise_link($server_url)));
 		if (!dbm::is_result($r)) {
 			logger("Call server check for server ".$server_url, LOGGER_DEBUG);
-			proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", base64_encode($server_url));
+			proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", $server_url);
 		}
 	}
 }
@@ -1690,7 +1690,7 @@ function poco_discover_federation() {
 		$servers = json_decode($serverdata);
 
 		foreach ($servers->pods as $server) {
-			proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", base64_encode("https://".$server->host));
+			proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", "https://".$server->host);
 		}
 	}
 
@@ -1703,7 +1703,7 @@ function poco_discover_federation() {
 
 			foreach ($servers as $server) {
 				$url = (is_null($server->https_score) ? 'http' : 'https').'://'.$server->name;
-				proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", base64_encode($url));
+				proc_run(PRIORITY_LOW, "include/discover_poco.php", "server", $url);
 			}
 		}
 	}
@@ -1813,7 +1813,7 @@ function poco_discover($complete = false) {
 			}
 
 			logger('Update directory from server '.$server['url'].' with ID '.$server['id'], LOGGER_DEBUG);
-			proc_run(PRIORITY_LOW, "include/discover_poco.php", "update_server_directory", intval($server['id']));
+			proc_run(PRIORITY_LOW, "include/discover_poco.php", "update_server_directory", (int)$server['id']);
 
 			if (!$complete && (--$no_of_queries == 0)) {
 				break;
@@ -2091,7 +2091,7 @@ function get_gcontact_id($contact) {
 
 	if ($doprobing) {
 		logger("Last Contact: ". $last_contact_str." - Last Failure: ".$last_failure_str." - Checking: ".$contact["url"], LOGGER_DEBUG);
-		proc_run(PRIORITY_LOW, 'include/gprobe.php', bin2hex($contact["url"]));
+		proc_run(PRIORITY_LOW, 'include/gprobe.php', $contact["url"]);
 	}
 
 	return $gcontact_id;