From 7e89bf5af89c18c1934b5af0d162897eda3ad7e7 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 5 Jan 2021 10:18:25 +0000 Subject: [PATCH] Wait for child being ready --- src/Core/Worker.php | 182 ++++++++++++++++--------------------- static/defaults.config.php | 4 + 2 files changed, 82 insertions(+), 104 deletions(-) diff --git a/src/Core/Worker.php b/src/Core/Worker.php index bb17e430c1..f84dd29475 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -94,101 +94,70 @@ class Worker $last_check = $starttime = time(); self::$state = self::STATE_STARTUP; - $wait_interval = self::isDaemonMode() ? 360 : 10; - $start = time(); - do { - // We fetch the next queue entry that is about to be executed - while ($r = self::workerProcess()) { - // Don't refetch when a worker fetches tasks for multiple workers - $refetched = DI::config()->get('system', 'worker_multiple_fetch'); - foreach ($r as $entry) { - // Assure that the priority is an integer value - $entry['priority'] = (int)$entry['priority']; + // We fetch the next queue entry that is about to be executed + while ($r = self::workerProcess()) { + if (self::IPCJobsExists(getmypid())) { + self::IPCSetJobState(false, getmypid()); + } + // Don't refetch when a worker fetches tasks for multiple workers + $refetched = DI::config()->get('system', 'worker_multiple_fetch'); + foreach ($r as $entry) { + // Assure that the priority is an integer value + $entry['priority'] = (int)$entry['priority']; - // The work will be done - if (!self::execute($entry)) { - Logger::notice('Process execution failed, quitting.'); + // The work will be done + if (!self::execute($entry)) { + Logger::notice('Process execution failed, quitting.'); + return; + } + + // Trying to fetch new processes - but only once when successful + if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) { + self::findWorkerProcesses(); + DI::lock()->release(self::LOCK_PROCESS); + self::$state = self::STATE_REFETCH; + $refetched = true; + } else { + self::$state = self::STATE_SHORT_LOOP; + } + } + + // To avoid the quitting of multiple workers only one worker at a time will execute the check + if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) { + self::$state = self::STATE_LONG_LOOP; + + if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { + // Count active workers and compare them with a maximum value that depends on the load + if (self::tooMuchWorkers()) { + Logger::notice('Active worker limit reached, quitting.'); + DI::lock()->release(self::LOCK_WORKER); return; } - // Trying to fetch new processes - but only once when successful - if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) { - self::findWorkerProcesses(); - DI::lock()->release(self::LOCK_PROCESS); - self::$state = self::STATE_REFETCH; - $refetched = true; - } else { - self::$state = self::STATE_SHORT_LOOP; - } - } - - // To avoid the quitting of multiple workers only one worker at a time will execute the check - if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) { - self::$state = self::STATE_LONG_LOOP; - - if (DI::lock()->acquire(self::LOCK_WORKER, 0)) { - // Count active workers and compare them with a maximum value that depends on the load - if (self::tooMuchWorkers()) { - Logger::notice('Active worker limit reached, quitting.'); - DI::lock()->release(self::LOCK_WORKER); - return; - } - - // Check free memory - if (DI::process()->isMinMemoryReached()) { - Logger::warning('Memory limit reached, quitting.'); - DI::lock()->release(self::LOCK_WORKER); - return; - } + // Check free memory + if (DI::process()->isMinMemoryReached()) { + Logger::warning('Memory limit reached, quitting.'); DI::lock()->release(self::LOCK_WORKER); + return; } - $last_check = time(); + DI::lock()->release(self::LOCK_WORKER); } - - // Quit the worker once every cron interval - if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { - Logger::info('Process lifetime reached, respawning.'); - self::unclaimProcess(); - if (self::isDaemonMode()) { - self::IPCSetJobState(true); - } else { - self::spawnWorker(); - } - return; - } - $start = time(); + $last_check = time(); } - $seconds = (time() - $start); - - // logarithmic wait time calculation. - $arg = (($seconds + 1) / ($wait_interval / 9)) + 1; - $sleep = min(1000000, round(log10($arg) * 1000000, 0)); - usleep($sleep); - - $timeout = ($seconds >= $wait_interval); - Logger::info('Timeout', ['timeout' => $timeout, 'seconds' => $seconds, 'sleep' => $sleep]); - - if (!$timeout) { - if (DI::process()->isMaxLoadReached()) { - Logger::notice('maximum load reached, quitting.'); - return; + // Quit the worker once every cron interval + if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { + Logger::info('Process lifetime reached, respawning.'); + self::unclaimProcess(); + if (self::isDaemonMode()) { + self::IPCSetJobState(true); + } else { + self::spawnWorker(); } - - // Kill stale processes every 5 minutes - $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0); - if (time() > ($last_cleanup + 300)) { - DI::config()->set('system', 'worker_last_cleaned', time()); - self::killStaleWorkers(); - } - - // Check if the system is ready - if (!self::isReady()) { - return; - } + return; } - } while (!$timeout); + } // Cleaning up. Possibly not needed, but it doesn't harm anything. if (self::isDaemonMode()) { @@ -1248,33 +1217,36 @@ class Worker } elseif ($pid) { // The parent process continues here DBA::connect(); - Logger::info('Spawned new worker', ['cron' => $do_cron, 'pid' => $pid]); + Logger::info('Spawned new worker', ['pid' => $pid]); + self::IPCSetJobState(true, $pid); + + $cycles = 0; + while (self::IPCJobsExists($pid) && (++$cycles < 100)) { + usleep(10000); + } + + Logger::info('Spawned worker is ready', ['pid' => $pid, 'wait_cycles' => $cycles]); return; } // We now are in the new worker DBA::connect(); - Logger::info('Worker spawned', ['cron' => $do_cron, 'pid' => getmypid()]); + Logger::info('Worker spawned', ['pid' => getmypid()]); - DI::process()->start(); + $cycles = 0; + while (!self::IPCJobsExists($pid) && (++$cycles < 100)) { + usleep(10000); + } + + Logger::info('Parent is ready', ['pid' => getmypid(), 'wait_cycles' => $cycles]); self::processQueue($do_cron); self::unclaimProcess(); + self::IPCSetJobState(false, getmypid()); DI::process()->end(); - Logger::info('Worker ended', ['cron' => $do_cron, 'pid' => getmypid()]); - - DBA::disconnect(); -/* - $php = '/usr/bin/php'; - $param = ['bin/worker.php']; - if ($do_cron) { - $param[] = 'no_cron'; - } - pcntl_exec($php, $param); - Logger::warning('Error calling worker', ['cron' => $do_cron, 'pid' => getmypid()]); -*/ + Logger::info('Worker ended', ['pid' => getmypid()]); exit(); } @@ -1287,14 +1259,16 @@ class Worker */ public static function spawnWorker($do_cron = false) { - if (self::isDaemonMode()) { + if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) { self::forkProcess($do_cron); - self::IPCSetJobState(false); } else { $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), DI::app()->getBasePath(), getmypid()); $process->run('bin/worker.php', ['no_cron' => !$do_cron]); } + if (self::isDaemonMode()) { + self::IPCSetJobState(false); + } } /** @@ -1505,10 +1479,10 @@ class Worker * @param boolean $jobs Is there a waiting job? * @throws \Exception */ - public static function IPCSetJobState($jobs) + public static function IPCSetJobState(bool $jobs, int $key = 0) { $stamp = (float)microtime(true); - DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true); + DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } @@ -1519,10 +1493,10 @@ class Worker * @return bool * @throws \Exception */ - public static function IPCJobsExists() + public static function IPCJobsExists(int $key = 0) { $stamp = (float)microtime(true); - $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]); + $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]); self::$db_duration += (microtime(true) - $stamp); // When we don't have a row, no job is running diff --git a/static/defaults.config.php b/static/defaults.config.php index 310d1ea08e..ed0f8f871a 100644 --- a/static/defaults.config.php +++ b/static/defaults.config.php @@ -538,6 +538,10 @@ return [ // Number of worker tasks that are fetched in a single query. 'worker_fetch_limit' => 1, + // worker_fork (Boolean) + // Experimental setting. use pcntl_fork to spawn a new worker process + 'worker_fork' => false, + // worker_jpm (Boolean) // If enabled, it prints out the jobs per minute. 'worker_jpm' => false,