Merge pull request #5926 from annando/defer-worker
Poddibility to defer worker execution
This commit is contained in:
commit
664afbcafb
2
boot.php
2
boot.php
|
@ -41,7 +41,7 @@ define('FRIENDICA_PLATFORM', 'Friendica');
|
||||||
define('FRIENDICA_CODENAME', 'The Tazmans Flax-lily');
|
define('FRIENDICA_CODENAME', 'The Tazmans Flax-lily');
|
||||||
define('FRIENDICA_VERSION', '2018.12-dev');
|
define('FRIENDICA_VERSION', '2018.12-dev');
|
||||||
define('DFRN_PROTOCOL_VERSION', '2.23');
|
define('DFRN_PROTOCOL_VERSION', '2.23');
|
||||||
define('DB_UPDATE_VERSION', 1285);
|
define('DB_UPDATE_VERSION', 1286);
|
||||||
define('NEW_UPDATE_ROUTINE_VERSION', 1170);
|
define('NEW_UPDATE_ROUTINE_VERSION', 1170);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1298,14 +1298,16 @@
|
||||||
"created": {"type": "datetime", "not null": "1", "default": "0001-01-01 00:00:00", "comment": "Creation date"},
|
"created": {"type": "datetime", "not null": "1", "default": "0001-01-01 00:00:00", "comment": "Creation date"},
|
||||||
"pid": {"type": "int unsigned", "not null": "1", "default": "0", "comment": "Process id of the worker"},
|
"pid": {"type": "int unsigned", "not null": "1", "default": "0", "comment": "Process id of the worker"},
|
||||||
"executed": {"type": "datetime", "not null": "1", "default": "0001-01-01 00:00:00", "comment": "Execution date"},
|
"executed": {"type": "datetime", "not null": "1", "default": "0001-01-01 00:00:00", "comment": "Execution date"},
|
||||||
|
"next_try": {"type": "datetime", "not null": "1", "default": "0001-01-01 00:00:00", "comment": "Next retrial date"},
|
||||||
|
"retrial": {"type": "tinyint", "not null": "1", "default": "0", "comment": "Retrial counter"},
|
||||||
"done": {"type": "boolean", "not null": "1", "default": "0", "comment": "Marked 1 when the task was done - will be deleted later"}
|
"done": {"type": "boolean", "not null": "1", "default": "0", "comment": "Marked 1 when the task was done - will be deleted later"}
|
||||||
},
|
},
|
||||||
"indexes": {
|
"indexes": {
|
||||||
"PRIMARY": ["id"],
|
"PRIMARY": ["id"],
|
||||||
"pid": ["pid"],
|
"pid": ["pid"],
|
||||||
"parameter": ["parameter(64)"],
|
"parameter": ["parameter(64)"],
|
||||||
"priority_created": ["priority", "created"],
|
"priority_created_next_try": ["priority", "created", "next_try"],
|
||||||
"done_executed": ["done", "executed"]
|
"done_executed_next_try": ["done", "executed", "next_try"]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ use Friendica\Database\DBA;
|
||||||
use Friendica\Model\Process;
|
use Friendica\Model\Process;
|
||||||
use Friendica\Util\DateTimeFormat;
|
use Friendica\Util\DateTimeFormat;
|
||||||
use Friendica\Util\Network;
|
use Friendica\Util\Network;
|
||||||
|
use Friendica\BaseObject;
|
||||||
|
|
||||||
require_once 'include/dba.php';
|
require_once 'include/dba.php';
|
||||||
|
|
||||||
|
@ -152,7 +153,8 @@ class Worker
|
||||||
*/
|
*/
|
||||||
private static function totalEntries()
|
private static function totalEntries()
|
||||||
{
|
{
|
||||||
return DBA::count('workerqueue', ["`executed` <= ? AND NOT `done`", NULL_DATE]);
|
return DBA::count('workerqueue', ["`executed` <= ? AND NOT `done` AND `next_try` < ?",
|
||||||
|
NULL_DATE, DateTimeFormat::utcNow()]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -162,7 +164,7 @@ class Worker
|
||||||
*/
|
*/
|
||||||
private static function highestPriority()
|
private static function highestPriority()
|
||||||
{
|
{
|
||||||
$condition = ["`executed` <= ? AND NOT `done`", NULL_DATE];
|
$condition = ["`executed` <= ? AND NOT `done` AND `next_try` < ?", NULL_DATE, DateTimeFormat::utcNow()];
|
||||||
$workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]);
|
$workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]);
|
||||||
if (DBA::isResult($workerqueue)) {
|
if (DBA::isResult($workerqueue)) {
|
||||||
return $workerqueue["priority"];
|
return $workerqueue["priority"];
|
||||||
|
@ -180,7 +182,8 @@ class Worker
|
||||||
*/
|
*/
|
||||||
private static function processWithPriorityActive($priority)
|
private static function processWithPriorityActive($priority)
|
||||||
{
|
{
|
||||||
$condition = ["`priority` <= ? AND `executed` > ? AND NOT `done`", $priority, NULL_DATE];
|
$condition = ["`priority` <= ? AND `executed` > ? AND NOT `done` AND `next_try` < ?",
|
||||||
|
$priority, NULL_DATE, DateTimeFormat::utcNow()];
|
||||||
return DBA::exists('workerqueue', $condition);
|
return DBA::exists('workerqueue', $condition);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,7 +243,7 @@ class Worker
|
||||||
self::execFunction($queue, $include, $argv, true);
|
self::execFunction($queue, $include, $argv, true);
|
||||||
|
|
||||||
$stamp = (float)microtime(true);
|
$stamp = (float)microtime(true);
|
||||||
if (DBA::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) {
|
if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) {
|
||||||
Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow());
|
Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow());
|
||||||
}
|
}
|
||||||
self::$db_duration = (microtime(true) - $stamp);
|
self::$db_duration = (microtime(true) - $stamp);
|
||||||
|
@ -805,7 +808,8 @@ class Worker
|
||||||
$result = DBA::select(
|
$result = DBA::select(
|
||||||
'workerqueue',
|
'workerqueue',
|
||||||
['id'],
|
['id'],
|
||||||
["`executed` <= ? AND `priority` < ? AND NOT `done`", NULL_DATE, $highest_priority],
|
["`executed` <= ? AND `priority` < ? AND NOT `done` AND `next_try` < ?",
|
||||||
|
NULL_DATE, $highest_priority, DateTimeFormat::utcNow()],
|
||||||
['limit' => $limit, 'order' => ['priority', 'created']]
|
['limit' => $limit, 'order' => ['priority', 'created']]
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -821,7 +825,8 @@ class Worker
|
||||||
$result = DBA::select(
|
$result = DBA::select(
|
||||||
'workerqueue',
|
'workerqueue',
|
||||||
['id'],
|
['id'],
|
||||||
["`executed` <= ? AND `priority` > ? AND NOT `done`", NULL_DATE, $highest_priority],
|
["`executed` <= ? AND `priority` > ? AND NOT `done` AND `next_try` < ?",
|
||||||
|
NULL_DATE, $highest_priority, DateTimeFormat::utcNow()],
|
||||||
['limit' => $limit, 'order' => ['priority', 'created']]
|
['limit' => $limit, 'order' => ['priority', 'created']]
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -840,7 +845,8 @@ class Worker
|
||||||
$result = DBA::select(
|
$result = DBA::select(
|
||||||
'workerqueue',
|
'workerqueue',
|
||||||
['id'],
|
['id'],
|
||||||
["`executed` <= ? AND NOT `done`", NULL_DATE],
|
["`executed` <= ? AND NOT `done` AND `next_try` < ?",
|
||||||
|
NULL_DATE, DateTimeFormat::utcNow()],
|
||||||
['limit' => $limit, 'order' => ['priority', 'created']]
|
['limit' => $limit, 'order' => ['priority', 'created']]
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -1116,6 +1122,35 @@ class Worker
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Defers the current worker entry
|
||||||
|
*/
|
||||||
|
public static function defer()
|
||||||
|
{
|
||||||
|
if (empty(BaseObject::getApp()->queue)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$queue = BaseObject::getApp()->queue;
|
||||||
|
|
||||||
|
$retrial = $queue['retrial'];
|
||||||
|
$id = $queue['id'];
|
||||||
|
|
||||||
|
if ($retrial > 14) {
|
||||||
|
logger('Id ' . $id . ' had been tried 14 times, it will be deleted now.', LOGGER_DEBUG);
|
||||||
|
DBA::delete('workerqueue', ['id' => $id]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate the delay until the next trial
|
||||||
|
$delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1));
|
||||||
|
$next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
|
||||||
|
|
||||||
|
logger('Defer execution ' . $retrial . ' of id ' . $id . ' to ' . $next, LOGGER_DEBUG);
|
||||||
|
|
||||||
|
$fields = ['retrial' => $retrial + 1, 'next_try' => $next, 'executed' => NULL_DATE, 'pid' => 0];
|
||||||
|
DBA::update('workerqueue', $fields, ['id' => $id]);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Log active processes into the "process" table
|
* Log active processes into the "process" table
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue
Block a user