Simplified the code
This commit is contained in:
@@ -943,8 +943,7 @@ class Worker
|
|||||||
|
|
||||||
if (DI::config()->get('system', 'worker_multiple_fetch')) {
|
if (DI::config()->get('system', 'worker_multiple_fetch')) {
|
||||||
$pids = [];
|
$pids = [];
|
||||||
$worker_pids = self::getWorkerPIDList();
|
foreach (self::getWorkerPIDList() as $pid => $count) {
|
||||||
foreach ($worker_pids as $pid => $count) {
|
|
||||||
if ($count <= $fetch_limit) {
|
if ($count <= $fetch_limit) {
|
||||||
$pids[] = $pid;
|
$pids[] = $pid;
|
||||||
}
|
}
|
||||||
@@ -979,27 +978,28 @@ class Worker
|
|||||||
DBA::close($tasks);
|
DBA::close($tasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!empty($ids)) {
|
if (empty($ids)) {
|
||||||
$worker = [];
|
return;
|
||||||
foreach (array_unique($ids) as $id) {
|
|
||||||
$pid = next($pids);
|
|
||||||
if (!$pid) {
|
|
||||||
$pid = reset($pids);
|
|
||||||
}
|
|
||||||
$worker[$pid][] = $id;
|
|
||||||
}
|
|
||||||
|
|
||||||
$stamp = (float)microtime(true);
|
|
||||||
foreach ($worker as $worker_pid => $worker_ids) {
|
|
||||||
Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
|
|
||||||
DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
|
|
||||||
['id' => $worker_ids, 'done' => false, 'pid' => 0]);
|
|
||||||
}
|
|
||||||
self::$db_duration += (microtime(true) - $stamp);
|
|
||||||
self::$db_duration_write += (microtime(true) - $stamp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return !empty($ids);
|
// Assign the task ids to the workers
|
||||||
|
$worker = [];
|
||||||
|
foreach (array_unique($ids) as $id) {
|
||||||
|
$pid = next($pids);
|
||||||
|
if (!$pid) {
|
||||||
|
$pid = reset($pids);
|
||||||
|
}
|
||||||
|
$worker[$pid][] = $id;
|
||||||
|
}
|
||||||
|
|
||||||
|
$stamp = (float)microtime(true);
|
||||||
|
foreach ($worker as $worker_pid => $worker_ids) {
|
||||||
|
Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
|
||||||
|
DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
|
||||||
|
['id' => $worker_ids, 'done' => false, 'pid' => 0]);
|
||||||
|
}
|
||||||
|
self::$db_duration += (microtime(true) - $stamp);
|
||||||
|
self::$db_duration_write += (microtime(true) - $stamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -1022,17 +1022,11 @@ class Worker
|
|||||||
}
|
}
|
||||||
self::$lock_duration += (microtime(true) - $stamp);
|
self::$lock_duration += (microtime(true) - $stamp);
|
||||||
|
|
||||||
$found = self::findWorkerProcesses();
|
self::findWorkerProcesses();
|
||||||
|
|
||||||
DI::lock()->release(self::LOCK_PROCESS);
|
DI::lock()->release(self::LOCK_PROCESS);
|
||||||
|
|
||||||
if ($found) {
|
return self::getWaitingJobForPID();
|
||||||
$stamp = (float)microtime(true);
|
|
||||||
$r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
|
|
||||||
self::$db_duration += (microtime(true) - $stamp);
|
|
||||||
return DBA::toArray($r);
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user