From f9152ce140cf3bc7dcbbb767e85dbf4d8a7e1d41 Mon Sep 17 00:00:00 2001
From: Michael <heluecht@pirati.ca>
Date: Sat, 29 Aug 2020 09:03:50 +0000
Subject: [PATCH 1/4] Worker: Fetch jobs for multiple workers

---
 src/Core/Worker.php        | 71 ++++++++++++++++++++++++++++++++------
 static/defaults.config.php |  5 +++
 2 files changed, 65 insertions(+), 11 deletions(-)

diff --git a/src/Core/Worker.php b/src/Core/Worker.php
index 1cde61351c..04cd4cc608 100644
--- a/src/Core/Worker.php
+++ b/src/Core/Worker.php
@@ -785,6 +785,29 @@ class Worker
 		return $count;
 	}
 
+	/**
+	 * Returns the number of active worker processes
+	 *
+	 * @return array List of worker process ids
+	 * @throws \Exception
+	 */
+	private static function getWorkerPIDList()
+	{
+		$ids = [];
+		$stamp = (float)microtime(true);
+
+		$queues = DBA::p("SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process`
+			LEFT JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `workerqueue`.`done` 
+			GROUP BY `process`.`pid`");
+		while ($queue = DBA::fetch($queues)) {
+			$ids[$queue['pid']] = $queue['entries'];
+		}
+		DBA::close($queues);
+
+		self::$db_duration += (microtime(true) - $stamp);
+		return $ids;
+	}
+
 	/**
 	 * Returns waiting jobs for the current process id
 	 *
@@ -806,11 +829,11 @@ class Worker
 
 	/**
 	 * Returns the next jobs that should be executed
-	 *
+	 * @param int $limit
 	 * @return array array with next jobs
 	 * @throws \Exception
 	 */
-	private static function nextProcess()
+	private static function nextProcess(int $limit)
 	{
 		$priority = self::nextPriority();
 		if (empty($priority)) {
@@ -818,8 +841,6 @@ class Worker
 			return [];
 		}
 
-		$limit = DI::config()->get('system', 'worker_fetch_limit', 1);
-
 		$ids = [];
 		$stamp = (float)microtime(true);
 		$condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
@@ -918,14 +939,30 @@ class Worker
 	 */
 	private static function findWorkerProcesses()
 	{
-		$mypid = getmypid();
+		$fetch_limit = DI::config()->get('system', 'worker_fetch_limit', 1);
 
-		$ids = self::nextProcess();
+		if (DI::config()->get('system', 'worker_multiple_fetch')) {
+			$pids = [];
+			$worker_pids = self::getWorkerPIDList();
+			foreach ($worker_pids as $pid => $count) {
+				if ($count <= $fetch_limit) {
+					$pids[] = $pid;
+				}
+			}
+			if (empty($pids)) {
+				return;
+			}
+			$limit = $fetch_limit * count($pids);
+		} else {
+			$pids = [getmypid()];
+			$limit = $fetch_limit;
+		}
 
-		// If there is no result we check without priority limit
-		if (empty($ids)) {
-			$limit = DI::config()->get('system', 'worker_fetch_limit', 1);
+		$ids = self::nextProcess($limit);
+		$limit -= count($ids);
 
+		// If there is not enough results we check without priority limit
+		if ($limit > 0) {
 			$stamp = (float)microtime(true);
 			$condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
 			$tasks = DBA::select('workerqueue', ['id', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'created']]);
@@ -943,9 +980,21 @@ class Worker
 		}
 
 		if (!empty($ids)) {
+			$worker = [];
+			foreach (array_unique($ids) as $id) {
+				$pid = next($pids);
+				if (!$pid) {
+					$pid = reset($pids);
+				}
+				$worker[$pid][] = $id;
+			}
+
 			$stamp = (float)microtime(true);
-			$condition = ['id' => $ids, 'done' => false, 'pid' => 0];
-			DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $condition);
+			foreach ($worker as $worker_pid => $worker_ids) {
+				Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
+				DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
+					['id' => $worker_ids, 'done' => false, 'pid' => 0]);
+			}
 			self::$db_duration += (microtime(true) - $stamp);
 			self::$db_duration_write += (microtime(true) - $stamp);
 		}
diff --git a/static/defaults.config.php b/static/defaults.config.php
index 947ac1dc01..4d595010ce 100644
--- a/static/defaults.config.php
+++ b/static/defaults.config.php
@@ -499,6 +499,11 @@ return [
 		// Setting 0 would allow maximum worker queues at all times, which is not recommended.
 		'worker_load_exponent' => 3,
 
+		// worker_multiple_fetch (Boolean)
+		// When activated, the worker fetches jobs for multiple workers (not only for itself).
+		// This is an experimental setting without knowing the performance impact.
+		'worker_multiple_fetch' => false,
+		
 		// worker_defer_limit (Integer)
 		// Per default the systems tries delivering for 15 times before dropping it.
 		'worker_defer_limit' => 15,

From 069786cd7ff859bb3cc842cf6947160f1b7faba2 Mon Sep 17 00:00:00 2001
From: Michael <heluecht@pirati.ca>
Date: Sat, 29 Aug 2020 10:44:38 +0000
Subject: [PATCH 2/4] Simplified the code

---
 src/Core/Worker.php | 52 ++++++++++++++++++++-------------------------
 1 file changed, 23 insertions(+), 29 deletions(-)

diff --git a/src/Core/Worker.php b/src/Core/Worker.php
index 04cd4cc608..f4df322492 100644
--- a/src/Core/Worker.php
+++ b/src/Core/Worker.php
@@ -943,8 +943,7 @@ class Worker
 
 		if (DI::config()->get('system', 'worker_multiple_fetch')) {
 			$pids = [];
-			$worker_pids = self::getWorkerPIDList();
-			foreach ($worker_pids as $pid => $count) {
+			foreach (self::getWorkerPIDList() as $pid => $count) {
 				if ($count <= $fetch_limit) {
 					$pids[] = $pid;
 				}
@@ -979,27 +978,28 @@ class Worker
 			DBA::close($tasks);
 		}
 
-		if (!empty($ids)) {
-			$worker = [];
-			foreach (array_unique($ids) as $id) {
-				$pid = next($pids);
-				if (!$pid) {
-					$pid = reset($pids);
-				}
-				$worker[$pid][] = $id;
-			}
-
-			$stamp = (float)microtime(true);
-			foreach ($worker as $worker_pid => $worker_ids) {
-				Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
-				DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
-					['id' => $worker_ids, 'done' => false, 'pid' => 0]);
-			}
-			self::$db_duration += (microtime(true) - $stamp);
-			self::$db_duration_write += (microtime(true) - $stamp);
+		if (empty($ids)) {
+			return;
 		}
 
-		return !empty($ids);
+		// Assign the task ids to the workers
+		$worker = [];
+		foreach (array_unique($ids) as $id) {
+			$pid = next($pids);
+			if (!$pid) {
+				$pid = reset($pids);
+			}
+			$worker[$pid][] = $id;
+		}
+
+		$stamp = (float)microtime(true);
+		foreach ($worker as $worker_pid => $worker_ids) {
+			Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
+			DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
+				['id' => $worker_ids, 'done' => false, 'pid' => 0]);
+		}
+		self::$db_duration += (microtime(true) - $stamp);
+		self::$db_duration_write += (microtime(true) - $stamp);
 	}
 
 	/**
@@ -1022,17 +1022,11 @@ class Worker
 		}
 		self::$lock_duration += (microtime(true) - $stamp);
 
-		$found = self::findWorkerProcesses();
+		self::findWorkerProcesses();
 
 		DI::lock()->release(self::LOCK_PROCESS);
 
-		if ($found) {
-			$stamp = (float)microtime(true);
-			$r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
-			self::$db_duration += (microtime(true) - $stamp);
-			return DBA::toArray($r);
-		}
-		return false;
+		return self::getWaitingJobForPID();
 	}
 
 	/**

From 8148d9dc9db8599ed92e6698930e9020854e455e Mon Sep 17 00:00:00 2001
From: Michael <heluecht@pirati.ca>
Date: Sat, 29 Aug 2020 11:26:40 +0000
Subject: [PATCH 3/4] Unclaim on end / don't refetch

---
 src/Core/Worker.php | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/Core/Worker.php b/src/Core/Worker.php
index f4df322492..bbb6f57b2a 100644
--- a/src/Core/Worker.php
+++ b/src/Core/Worker.php
@@ -96,7 +96,8 @@ class Worker
 
 		// We fetch the next queue entry that is about to be executed
 		while ($r = self::workerProcess()) {
-			$refetched = false;
+			// Don't refetch when a worker fetches tasks for multiple workers
+			$refetched = DI::config()->get('system', 'worker_multiple_fetch');
 			foreach ($r as $entry) {
 				// Assure that the priority is an integer value
 				$entry['priority'] = (int)$entry['priority'];
@@ -143,6 +144,7 @@ class Worker
 			// Quit the worker once every cron interval
 			if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
 				Logger::info('Process lifetime reached, respawning.');
+				self::unclaimProcess();
 				self::spawnWorker();
 				return;
 			}

From d8d2cdc6ef64daf83a23ee1eabe95e27e6bb051b Mon Sep 17 00:00:00 2001
From: Michael <heluecht@pirati.ca>
Date: Sat, 29 Aug 2020 13:01:58 +0000
Subject: [PATCH 4/4] Only c heck every 5 seconds for the system health

---
 src/Core/Worker.php | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/Core/Worker.php b/src/Core/Worker.php
index bbb6f57b2a..7758e06a90 100644
--- a/src/Core/Worker.php
+++ b/src/Core/Worker.php
@@ -91,7 +91,7 @@ class Worker
 			self::runCron();
 		}
 
-		$starttime = time();
+		$last_check = $starttime = time();
 		self::$state = self::STATE_STARTUP;
 
 		// We fetch the next queue entry that is about to be executed
@@ -120,7 +120,7 @@ class Worker
 			}
 
 			// To avoid the quitting of multiple workers only one worker at a time will execute the check
-			if (!self::getWaitingJobForPID()) {
+			if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) {
 				self::$state = self::STATE_LONG_LOOP;
 
 				if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
@@ -139,6 +139,7 @@ class Worker
 					}
 					DI::lock()->release(self::LOCK_WORKER);
 				}
+				$last_check = time();
 			}
 
 			// Quit the worker once every cron interval
@@ -784,6 +785,7 @@ class Worker
 		$stamp = (float)microtime(true);
 		$count = DBA::count('process', ['command' => 'Worker.php']);
 		self::$db_duration += (microtime(true) - $stamp);
+		self::$db_duration_count += (microtime(true) - $stamp);
 		return $count;
 	}
 
@@ -807,6 +809,7 @@ class Worker
 		DBA::close($queues);
 
 		self::$db_duration += (microtime(true) - $stamp);
+		self::$db_duration_count += (microtime(true) - $stamp);
 		return $ids;
 	}