Merge pull request #3559 from annando/some-more-poller

The endless story "poller" just continues ;-)
This commit is contained in:
Tobias Diekershoff 2017-07-04 08:38:38 +02:00 committed by GitHub
commit fd36e4e8b0
3 changed files with 107 additions and 61 deletions

View File

@ -627,6 +627,12 @@ class dba {
self::$dbo->errorno = mysql_errno(self::$dbo->db); self::$dbo->errorno = mysql_errno(self::$dbo->db);
} else { } else {
self::$dbo->affected_rows = mysql_affected_rows($retval); self::$dbo->affected_rows = mysql_affected_rows($retval);
// Due to missing mysql_* support this here wasn't tested at all
// See here: http://php.net/manual/en/function.mysql-num-rows.php
if (self::$dbo->affected_rows <= 0) {
self::$dbo->affected_rows = mysql_num_rows($retval);
}
} }
break; break;
} }
@ -1038,7 +1044,7 @@ class dba {
$sql = "DELETE FROM `".$command['table']."` WHERE `". $sql = "DELETE FROM `".$command['table']."` WHERE `".
implode("` = ? AND `", array_keys($command['param']))."` = ?"; implode("` = ? AND `", array_keys($command['param']))."` = ?";
logger(dba::replace_parameters($sql, $command['param']), LOGGER_DATA); logger(self::replace_parameters($sql, $command['param']), LOGGER_DATA);
if (!self::e($sql, $command['param'])) { if (!self::e($sql, $command['param'])) {
if ($do_transaction) { if ($do_transaction) {
@ -1068,7 +1074,7 @@ class dba {
$sql = "DELETE FROM `".$table."` WHERE `".$field."` IN (". $sql = "DELETE FROM `".$table."` WHERE `".$field."` IN (".
substr(str_repeat("?, ", count($field_values)), 0, -2).");"; substr(str_repeat("?, ", count($field_values)), 0, -2).");";
logger(dba::replace_parameters($sql, $field_values), LOGGER_DATA); logger(self::replace_parameters($sql, $field_values), LOGGER_DATA);
if (!self::e($sql, $field_values)) { if (!self::e($sql, $field_values)) {
if ($do_transaction) { if ($do_transaction) {

View File

@ -2051,7 +2051,7 @@ function item_expire($uid, $days, $network = "", $force = false) {
$expire_photos = get_pconfig($uid, 'expire', 'photos'); $expire_photos = get_pconfig($uid, 'expire', 'photos');
$expire_photos = (($expire_photos === false) ? 0 : intval($expire_photos)); // default if not set: 0 $expire_photos = (($expire_photos === false) ? 0 : intval($expire_photos)); // default if not set: 0
logger('expire: # items=' . count($r). "; expire items: $expire_items, expire notes: $expire_notes, expire starred: $expire_starred, expire photos: $expire_photos"); logger('User '.$uid.': expire: # items=' . count($r). "; expire items: $expire_items, expire notes: $expire_notes, expire starred: $expire_starred, expire photos: $expire_photos");
foreach ($r as $item) { foreach ($r as $item) {

View File

@ -18,7 +18,7 @@ if (!file_exists("boot.php") && (sizeof($_SERVER["argv"]) != 0)) {
require_once("boot.php"); require_once("boot.php");
function poller_run($argv, $argc){ function poller_run($argv, $argc){
global $a, $db, $poller_up_start; global $a, $db, $poller_up_start, $poller_db_duration;
$poller_up_start = microtime(true); $poller_up_start = microtime(true);
@ -85,12 +85,13 @@ function poller_run($argv, $argc){
poller_run_cron(); poller_run_cron();
} }
$refetched = false;
$starttime = time(); $starttime = time();
// We fetch the next queue entry that is about to be executed // We fetch the next queue entry that is about to be executed
while ($r = poller_worker_process()) { while ($r = poller_worker_process()) {
$refetched = false;
foreach ($r AS $entry) { foreach ($r AS $entry) {
// Assure that the priority is an integer value // Assure that the priority is an integer value
$entry['priority'] = (int)$entry['priority']; $entry['priority'] = (int)$entry['priority'];
@ -100,10 +101,19 @@ function poller_run($argv, $argc){
logger('Process execution failed, quitting.', LOGGER_DEBUG); logger('Process execution failed, quitting.', LOGGER_DEBUG);
return; return;
} }
// If possible we will fetch new jobs for this worker
if (!$refetched && Lock::set('poller_worker_process', 0)) {
$stamp = (float)microtime(true);
$refetched = find_worker_processes();
$poller_db_duration += (microtime(true) - $stamp);
Lock::remove('poller_worker_process');
}
} }
// To avoid the quitting of multiple pollers only one poller at a time will execute the check // To avoid the quitting of multiple pollers only one poller at a time will execute the check
if (Lock::set('poller_worker', 0)) { if (Lock::set('poller_worker', 0)) {
$stamp = (float)microtime(true);
// Count active workers and compare them with a maximum value that depends on the load // Count active workers and compare them with a maximum value that depends on the load
if (poller_too_much_workers()) { if (poller_too_much_workers()) {
logger('Active worker limit reached, quitting.', LOGGER_DEBUG); logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
@ -116,6 +126,7 @@ function poller_run($argv, $argc){
return; return;
} }
Lock::remove('poller_worker'); Lock::remove('poller_worker');
$poller_db_duration += (microtime(true) - $stamp);
} }
// Quit the poller once every 5 minutes // Quit the poller once every 5 minutes
@ -123,13 +134,6 @@ function poller_run($argv, $argc){
logger('Process lifetime reached, quitting.', LOGGER_DEBUG); logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
return; return;
} }
// If possible we will fetch new jobs for this worker
if (!$refetched && Lock::set('poller_worker_process', 0)) {
$refetched = find_worker_processes();
Lock::remove('poller_worker_process');
}
} }
logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG); logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
} }
@ -183,7 +187,7 @@ function poller_process_with_priority_active($priority) {
* @return boolean "true" if further processing should be stopped * @return boolean "true" if further processing should be stopped
*/ */
function poller_execute($queue) { function poller_execute($queue) {
global $poller_db_duration; global $poller_db_duration, $poller_last_update;
$a = get_app(); $a = get_app();
@ -223,6 +227,21 @@ function poller_execute($queue) {
$funcname = str_replace(".php", "", basename($argv[0]))."_run"; $funcname = str_replace(".php", "", basename($argv[0]))."_run";
if (function_exists($funcname)) { if (function_exists($funcname)) {
// We constantly update the "executed" date every minute to avoid being killed too soon
if (!isset($poller_last_update)) {
$poller_last_update = strtotime($queue["executed"]);
}
$age = (time() - $poller_last_update) / 60;
$poller_last_update = time();
if ($age > 1) {
$stamp = (float)microtime(true);
dba::update('workerqueue', array('executed' => datetime_convert()), array('pid' => $mypid, 'done' => false));
$poller_db_duration += (microtime(true) - $stamp);
}
poller_exec_function($queue, $funcname, $argv); poller_exec_function($queue, $funcname, $argv);
$stamp = (float)microtime(true); $stamp = (float)microtime(true);
@ -451,46 +470,47 @@ function poller_max_connections_reached() {
* *
*/ */
function poller_kill_stale_workers() { function poller_kill_stale_workers() {
$r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s' AND NOT `done`", dbesc(NULL_DATE)); $entries = dba::p("SELECT `id`, `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > ? AND NOT `done` AND `pid` != 0 ORDER BY `priority`, `created`", NULL_DATE);
if (!dbm::is_result($r)) { while ($entry = dba::fetch($entries)) {
// No processing here needed if (!posix_kill($entry["pid"], 0)) {
return;
}
foreach ($r AS $pid) {
if (!posix_kill($pid["pid"], 0)) {
dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
array('pid' => $pid["pid"], 'done' => false)); array('id' => $entry["id"]));
} else { } else {
// Kill long running processes // Kill long running processes
// Check if the priority is in a valid range // Check if the priority is in a valid range
if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) { if (!in_array($entry["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) {
$pid["priority"] = PRIORITY_MEDIUM; $entry["priority"] = PRIORITY_MEDIUM;
} }
// Define the maximum durations // Define the maximum durations
$max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360); $max_duration_defaults = array(PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720);
$max_duration = $max_duration_defaults[$pid["priority"]]; $max_duration = $max_duration_defaults[$entry["priority"]];
$argv = json_decode($pid["parameter"]); $argv = json_decode($entry["parameter"]);
$argv[0] = basename($argv[0]); $argv[0] = basename($argv[0]);
// How long is the process already running? // How long is the process already running?
$duration = (time() - strtotime($pid["executed"])) / 60; $duration = (time() - strtotime($entry["executed"])) / 60;
if ($duration > $max_duration) { if ($duration > $max_duration) {
logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now."); logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now.");
posix_kill($pid["pid"], SIGTERM); posix_kill($entry["pid"], SIGTERM);
// We killed the stale process. // We killed the stale process.
// To avoid a blocking situation we reschedule the process at the beginning of the queue. // To avoid a blocking situation we reschedule the process at the beginning of the queue.
// Additionally we are lowering the priority. // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
if ($entry["priority"] == PRIORITY_HIGH) {
$new_priority = PRIORITY_MEDIUM;
} elseif ($entry["priority"] == PRIORITY_MEDIUM) {
$new_priority = PRIORITY_LOW;
} elseif ($entry["priority"] != PRIORITY_CRITICAL) {
$new_priority = PRIORITY_NEGLIGIBLE;
}
dba::update('workerqueue', dba::update('workerqueue',
array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0), array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => $new_priority, 'pid' => 0),
array('pid' => $pid["pid"], 'done' => false)); array('id' => $entry["id"]));
} else { } else {
logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG); logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
} }
} }
} }
@ -544,15 +564,16 @@ function poller_too_much_workers() {
} }
dba::close($entries); dba::close($entries);
$jobs_per_minute = 0; $intervals = array(1, 10, 60);
$jobs_per_minute = array();
$jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE"); foreach ($intervals AS $interval) {
if ($job = dba::fetch($jobs)) { $jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ".intval($interval)." MINUTE");
$jobs_per_minute = number_format($job['jobs'] / 10, 0); if ($job = dba::fetch($jobs)) {
$jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0);
}
dba::close($jobs);
} }
dba::close($jobs); $processlist = ' - jpm: '.implode('/', $jobs_per_minute).' ('.implode(', ', $listitem).')';
$processlist = ' - jpm: '.$jobs_per_minute.' ('.implode(', ', $listitem).')';
} }
$entries = poller_total_entries(); $entries = poller_total_entries();
@ -573,8 +594,7 @@ function poller_too_much_workers() {
if (!Config::get("system", "worker_dont_fork") && ($queues > ($active + 1)) && ($entries > 1)) { if (!Config::get("system", "worker_dont_fork") && ($queues > ($active + 1)) && ($entries > 1)) {
logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG); logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
$args = array("include/poller.php", "no_cron"); $args = array("include/poller.php", "no_cron");
$a = get_app(); get_app()->proc_run($args);
$a->proc_run($args);
} }
} }
@ -649,44 +669,65 @@ function poller_passing_slow(&$highest_priority) {
* @return boolean Have we found something? * @return boolean Have we found something?
*/ */
function find_worker_processes() { function find_worker_processes() {
$mypid = getmypid();
// Check if we should pass some low priority process // Check if we should pass some low priority process
$highest_priority = 0; $highest_priority = 0;
$found = false; $found = false;
// The higher the number of parallel workers, the more we prefetch to prevent concurring access // The higher the number of parallel workers, the more we prefetch to prevent concurring access
$limit = Config::get("system", "worker_queues", 4) * 2; $limit = Config::get("system", "worker_queues", 4);
$limit = Config::get('system', 'worker_fetch_limit', $limit); $limit = Config::get('system', 'worker_fetch_limit', $limit);
if (poller_passing_slow($highest_priority)) { if (poller_passing_slow($highest_priority)) {
// Are there waiting processes with a higher priority than the currently highest? // Are there waiting processes with a higher priority than the currently highest?
$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? $result = dba::p("SELECT `id` FROM `workerqueue`
WHERE `executed` <= ? AND `priority` < ? AND NOT `done` WHERE `executed` <= ? AND `priority` < ? AND NOT `done`
ORDER BY `priority`, `created` LIMIT ".intval($limit), ORDER BY `priority`, `created` LIMIT ".intval($limit),
datetime_convert(), getmypid(), NULL_DATE, $highest_priority); NULL_DATE, $highest_priority);
if ($result) {
$found = (dba::affected_rows() > 0); while ($id = dba::fetch($result)) {
$ids[] = $id["id"];
} }
dba::close($result);
$found = (count($ids) > 0);
if (!$found) { if (!$found) {
// Give slower processes some processing time // Give slower processes some processing time
$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? $result = dba::p("SELECT `id` FROM `workerqueue`
WHERE `executed` <= ? AND `priority` > ? AND NOT `done` WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
ORDER BY `priority`, `created` LIMIT ".intval($limit), ORDER BY `priority`, `created` LIMIT ".intval($limit),
datetime_convert(), getmypid(), NULL_DATE, $highest_priority); NULL_DATE, $highest_priority);
if ($result) {
$found = (dba::affected_rows() > 0); while ($id = dba::fetch($result)) {
$ids[] = $id["id"];
} }
dba::close($result);
$found = (count($ids) > 0);
} }
} }
// If there is no result (or we shouldn't pass lower processes) we check without priority limit // If there is no result (or we shouldn't pass lower processes) we check without priority limit
if (!$found) { if (!$found) {
$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), $result = dba::p("SELECT `id` FROM `workerqueue` WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), NULL_DATE);
datetime_convert(), getmypid(), NULL_DATE);
if ($result) { while ($id = dba::fetch($result)) {
$found = (dba::affected_rows() > 0); $ids[] = $id["id"];
} }
dba::close($result);
$found = (count($ids) > 0);
} }
if ($found) {
$sql = "UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`;";
array_unshift($ids, datetime_convert(), $mypid);
dba::e($sql, $ids);
}
return $found; return $found;
} }
@ -778,8 +819,7 @@ function call_worker_if_idle() {
logger('Call poller', LOGGER_DEBUG); logger('Call poller', LOGGER_DEBUG);
$args = array("include/poller.php", "no_cron"); $args = array("include/poller.php", "no_cron");
$a = get_app(); get_app()->proc_run($args);
$a->proc_run($args);
return; return;
} }