Use a centralized function to check the priority

This commit is contained in:
Michael 2022-05-01 09:29:31 +00:00
parent f7b6507438
commit 13e4144ba6

View File

@ -22,7 +22,6 @@
namespace Friendica\Core; namespace Friendica\Core;
use Friendica\App\Mode; use Friendica\App\Mode;
use Friendica\Core;
use Friendica\Core\Worker\Entity\Process; use Friendica\Core\Worker\Entity\Process;
use Friendica\Database\DBA; use Friendica\Database\DBA;
use Friendica\DI; use Friendica\DI;
@ -105,12 +104,7 @@ class Worker
// Don't refetch when a worker fetches tasks for multiple workers // Don't refetch when a worker fetches tasks for multiple workers
$refetched = DI::config()->get('system', 'worker_multiple_fetch'); $refetched = DI::config()->get('system', 'worker_multiple_fetch');
foreach ($r as $entry) { foreach ($r as $entry) {
// Assure that the priority is an integer value $entry = self::checkPriority($entry);
$entry['priority'] = (int)$entry['priority'];
if (!in_array($entry['priority'], PRIORITIES)) {
Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
$entry['priority'] = PRIORITY_MEDIUM;
}
// The work will be done // The work will be done
if (!self::execute($entry)) { if (!self::execute($entry)) {
@ -172,6 +166,24 @@ class Worker
Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]); Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
} }
/**
* Check and fix the priority of a worker task
* @param array $entry
* @return array
*/
private static function checkPriority(array $entry)
{
$entry['priority'] = (int)$entry['priority'];
if (!in_array($entry['priority'], PRIORITIES)) {
Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]);
$entry['priority'] = PRIORITY_MEDIUM;
}
return $entry;
}
/** /**
* Checks if the system is ready. * Checks if the system is ready.
* *
@ -484,11 +496,6 @@ class Worker
// For this reason the variables have to be initialized. // For this reason the variables have to be initialized.
DI::profiler()->reset(); DI::profiler()->reset();
if (!in_array($queue['priority'], PRIORITIES)) {
Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]);
$queue['priority'] = PRIORITY_MEDIUM;
}
$a->setQueue($queue); $a->setQueue($queue);
$up_duration = microtime(true) - self::$up_start; $up_duration = microtime(true) - self::$up_start;
@ -653,6 +660,8 @@ class Worker
self::$db_duration += (microtime(true) - $stamp); self::$db_duration += (microtime(true) - $stamp);
while ($entry = DBA::fetch($entries)) { while ($entry = DBA::fetch($entries)) {
$entry = self::checkPriority($entry);
if (!posix_kill($entry["pid"], 0)) { if (!posix_kill($entry["pid"], 0)) {
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
DBA::update( DBA::update(
@ -664,11 +673,6 @@ class Worker
self::$db_duration_write += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp);
} else { } else {
// Kill long running processes // Kill long running processes
// Check if the priority is in a valid range
if (!in_array($entry['priority'], PRIORITIES)) {
Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]);
$entry['priority'] = PRIORITY_MEDIUM;
}
// Define the maximum durations // Define the maximum durations
$max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720]; $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
@ -1393,14 +1397,11 @@ class Worker
return false; return false;
} }
$queue = self::checkPriority($queue);
$id = $queue['id']; $id = $queue['id'];
$priority = $queue['priority']; $priority = $queue['priority'];
if (!in_array($priority, PRIORITIES)) {
Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]);
$priority = PRIORITY_MEDIUM;
}
$max_level = DI::config()->get('system', 'worker_defer_limit'); $max_level = DI::config()->get('system', 'worker_defer_limit');
$new_retrial = self::getNextRetrial($queue, $max_level); $new_retrial = self::getNextRetrial($queue, $max_level);