2017-11-05 05:33:46 -05:00
< ? php
2017-11-19 17:04:40 -05:00
/**
2022-01-02 02:27:47 -05:00
* @ copyright Copyright ( C ) 2010 - 2022 , the Friendica project
2020-02-09 09:45:36 -05:00
*
* @ license GNU AGPL version 3 or any later version
*
* This program is free software : you can redistribute it and / or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation , either version 3 of the
* License , or ( at your option ) any later version .
*
* This program is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
* GNU Affero General Public License for more details .
*
* You should have received a copy of the GNU Affero General Public License
* along with this program . If not , see < https :// www . gnu . org / licenses />.
*
2017-11-19 17:04:40 -05:00
*/
2020-02-09 09:45:36 -05:00
2017-11-05 05:33:46 -05:00
namespace Friendica\Core ;
2021-01-01 14:35:29 -05:00
use Friendica\App\Mode ;
2019-08-16 05:06:37 -04:00
use Friendica\Core ;
2021-10-24 14:43:59 -04:00
use Friendica\Core\Worker\Entity\Process ;
2018-07-20 08:19:26 -04:00
use Friendica\Database\DBA ;
2019-12-15 16:34:11 -05:00
use Friendica\DI ;
2018-01-26 21:38:34 -05:00
use Friendica\Util\DateTimeFormat ;
2017-11-05 05:33:46 -05:00
/**
2020-01-19 01:05:23 -05:00
* Contains the class for the worker background job processing
2017-11-05 05:33:46 -05:00
*/
2017-11-19 17:04:40 -05:00
class Worker
{
2019-02-27 04:49:26 -05:00
// Worker states. The current one is kept in self::$state and reported as 'state' in the performance log.
const STATE_STARTUP = 1 ; // Worker is in startup. This takes most time.
2019-03-08 15:39:58 -05:00
const STATE_LONG_LOOP = 2 ; // Worker is processing the whole - long - loop.
2019-02-27 04:49:26 -05:00
const STATE_REFETCH = 3 ; // Worker had refetched jobs in the execution loop.
2019-03-08 15:39:58 -05:00
const STATE_SHORT_LOOP = 4 ; // Worker is processing preassigned jobs, thus saving much time.
2019-02-27 01:55:04 -05:00
2020-10-24 04:05:03 -04:00
// NOTE(review): not referenced in the visible part of this file - presumably the commands that are treated as "fast" jobs; confirm against the fetching code.
const FAST_COMMANDS = [ 'APDelivery' , 'Delivery' ];
2019-03-23 02:08:18 -04:00
2020-08-06 14:53:45 -04:00
// Lock names that processQueue() passes to DI::lock()->acquire() and DI::lock()->release()
const LOCK_PROCESS = 'worker_process' ;
const LOCK_WORKER = 'worker' ;
2019-03-23 02:08:18 -04:00
2017-11-05 05:33:46 -05:00
// microtime(true) value taken when processQueue() starts and again after each performance log
private static $up_start ;
2019-02-09 18:10:15 -05:00
// Time (in seconds) spent in database calls since the last performance log: the total and the parts for count and write queries.
// NOTE(review): $db_duration_stat and $lock_duration are only read and reset in the visible code - they are presumably increased elsewhere in the class.
private static $db_duration = 0 ;
private static $db_duration_count = 0 ;
private static $db_duration_write = 0 ;
private static $db_duration_stat = 0 ;
private static $lock_duration = 0 ;
2017-11-05 05:33:46 -05:00
// Unix timestamp of the last time execute() checked whether the "executed" date has to be refreshed
private static $last_update ;
2019-02-27 01:55:04 -05:00
// One of the STATE_* constants
private static $state ;
2021-01-02 03:43:55 -05:00
// NOTE(review): not used in the visible code - presumably a cache for the result of isDaemonMode(); confirm.
private static $daemon_mode = null ;
2021-11-01 02:53:34 -04:00
/** @var Process */
2021-10-24 14:43:59 -04:00
// Set by processQueue() once the initial load check has passed
private static $process ;
2017-11-05 05:33:46 -05:00
/**
 * Main loop of a worker process: fetches entries of the workerqueue table and executes them
 *
 * The loop ends when no further entries can be fetched, when an execution
 * signals that the worker has to stop, when the worker or memory limit is
 * hit, or when the process has been running longer than the cron interval.
 *
 * @param boolean $run_cron Should the cron processes be executed?
 * @param Process $process  The current running process
 * @return void
 * @throws \Friendica\Network\HTTPException\InternalServerErrorException
 */
public static function processQueue($run_cron, Process $process)
{
    self::$up_start = microtime(true);

    // Never start working while the system is already at its maximum load
    if (DI::system()->isMaxLoadReached()) {
        Logger::notice('Pre check: maximum load reached, quitting.');
        return;
    }

    // The process is only taken over after the load check, since starting it could increase the load
    self::$process = $process;

    // Stale workers are cleaned up at most once every 300 seconds
    $cleaned_at = DI::config()->get('system', 'worker_last_cleaned', 0);
    if (($cleaned_at + 300) < time()) {
        DI::config()->set('system', 'worker_last_cleaned', time());
        self::killStaleWorkers();
    }

    // Worker count, memory, connections and processes have to allow further work
    if (!self::isReady()) {
        return;
    }

    // Additional cron processes are only started on request
    if ($run_cron) {
        self::runCron();
    }

    $starttime  = time();
    $last_check = $starttime;

    self::$state = self::STATE_STARTUP;

    // Each iteration handles one batch of fetched queue entries
    while ($jobs = self::workerProcess()) {
        $pid = getmypid();
        if (self::IPCJobsExists($pid)) {
            self::IPCDeleteJobState($pid);
        }

        // Don't refetch when a worker fetches tasks for multiple workers
        $refetched = DI::config()->get('system', 'worker_multiple_fetch');

        foreach ($jobs as $job) {
            // The priority has to be an integer value
            $job['priority'] = (int)$job['priority'];

            // Do the work; a "false" result means that this worker has to stop
            if (!self::execute($job)) {
                Logger::notice('Process execution failed, quitting.');
                return;
            }

            // New entries are only looked for once per batch, and only when the lock is free
            if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) {
                self::findWorkerProcesses();
                DI::lock()->release(self::LOCK_PROCESS);
                self::$state = self::STATE_REFETCH;
                $refetched   = true;
            } else {
                self::$state = self::STATE_SHORT_LOOP;
            }
        }

        // To avoid the quitting of multiple workers only one worker at a time will execute the check
        if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) {
            self::$state = self::STATE_LONG_LOOP;

            if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
                $quit = false;

                if (self::tooMuchWorkers()) {
                    // The number of active workers exceeds the load dependent maximum
                    Logger::notice('Active worker limit reached, quitting.');
                    $quit = true;
                } elseif (DI::system()->isMinMemoryReached()) {
                    // Not enough free memory
                    Logger::warning('Memory limit reached, quitting.');
                    $quit = true;
                }

                DI::lock()->release(self::LOCK_WORKER);

                if ($quit) {
                    return;
                }
            }

            $last_check = time();
        }

        // Quit the worker once every cron interval
        $lifetime = DI::config()->get('system', 'cron_interval') * 60;
        if (time() > ($starttime + $lifetime)) {
            Logger::info('Process lifetime reached, respawning.');
            self::unclaimProcess($process);

            if (self::isDaemonMode()) {
                self::IPCSetJobState(true);
            } else {
                self::spawnWorker();
            }

            return;
        }
    }

    // Cleaning up. Possibly not needed, but it doesn't harm anything.
    if (self::isDaemonMode()) {
        self::IPCSetJobState(false);
    }

    Logger::info(" Couldn't select a workerqueue entry, quitting process ", ['pid' => getmypid()]);
}
2020-08-19 14:21:40 -04:00
/**
 * Checks if the system is ready to process worker tasks.
 *
 * The checks are done in this order: number of active workers, free memory,
 * database connections and database processes. The first failing check is
 * logged and ends the evaluation.
 *
 * @return boolean "true" when none of the limits is reached
 */
public static function isReady()
{
    if (self::tooMuchWorkers()) {
        // The number of active workers exceeds the load dependent maximum
        Logger::notice('Active worker limit reached, quitting.');
    } elseif (DI::system()->isMinMemoryReached()) {
        // There is too few free memory
        Logger::warning('Memory limit reached, quitting.');
    } elseif (self::maxConnectionsReached()) {
        // There are too many database connections
        Logger::warning('Maximum connections reached, quitting.');
    } elseif (DI::system()->isMaxProcessesReached()) {
        // There are too many database processes that block the system
        Logger::warning('Maximum processes reached, quitting.');
    } else {
        return true;
    }

    return false;
}
2019-02-09 18:10:15 -05:00
/**
 * Checks whether the worker queue contains entries that are waiting for execution
 *
 * An entry counts when it is not done, not assigned to a process (pid is 0)
 * and its "next_try" date lies in the past.
 *
 * @return boolean Returns "true" if tasks are existing
 * @throws \Exception
 */
public static function entriesExists()
{
    $started = (float)microtime(true);

    $condition = [" NOT `done` AND `pid` = 0 AND `next_try` < ? ", DateTimeFormat::utcNow()];
    $pending   = DBA::exists('workerqueue', $condition);

    // The time of the query is added to the database time of the performance statistics
    self::$db_duration += (microtime(true) - $started);

    return $pending;
}
2018-10-23 16:38:28 -04:00
/**
 * Counts the deferred entries in the worker queue
 *
 * Deferred entries are not done, not assigned to a process (pid is 0)
 * and have a "retrial" value above 0.
 *
 * @return integer Number of deferred entries in the worker queue
 * @throws \Exception
 */
private static function deferredEntries()
{
    $started = (float)microtime(true);

    $condition = [" NOT `done` AND `pid` = 0 AND `retrial` > ? ", 0];
    $deferred  = DBA::count('workerqueue', $condition);

    // The query time is part of the total database time and of the time for count queries
    self::$db_duration       += (microtime(true) - $started);
    self::$db_duration_count += (microtime(true) - $started);

    return $deferred;
}
2017-11-05 05:33:46 -05:00
/**
 * Counts the entries in the worker queue that are neither done nor assigned to a process
 *
 * @return integer Number of non executed entries in the worker queue
 * @throws \Exception
 */
private static function totalEntries()
{
    $started = (float)microtime(true);

    $condition = ['done' => false, 'pid' => 0];
    $total     = DBA::count('workerqueue', $condition);

    // The query time is part of the total database time and of the time for count queries
    self::$db_duration       += (microtime(true) - $started);
    self::$db_duration_count += (microtime(true) - $started);

    return $total;
}
/**
 * Returns the highest priority in the worker queue that isn't executed
 *
 * Only entries are regarded that are not done, not assigned to a process
 * (pid is 0) and whose "next_try" date lies in the past. The entries are
 * ordered ascending by "priority" and the value of the first one is used.
 *
 * @return integer Priority value of the first matching entry, 0 if there is none
 * @throws \Exception
 */
private static function highestPriority()
{
    $stamp = (float)microtime(true);
    $condition = [" `pid` = 0 AND NOT `done` AND `next_try` < ? ", DateTimeFormat::utcNow()];
    $workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]);
    self::$db_duration += (microtime(true) - $stamp);

    if (!DBA::isResult($workerqueue)) {
        return 0;
    }

    // The key has to match the selected column name exactly
    return $workerqueue['priority'];
}
/**
 * Checks whether a queue entry with the given priority value or a lower one is currently assigned to a process
 *
 * An entry matches when its priority value is at most $priority, its pid
 * is not 0 and it is not done.
 *
 * @param integer $priority The priority that should be checked
 *
 * @return boolean Is there a process running with that priority?
 * @throws \Exception
 */
private static function processWithPriorityActive($priority)
{
    return DBA::exists(
        'workerqueue',
        [" `priority` <= ? AND `pid` != 0 AND NOT `done` ", $priority]
    );
}
2021-11-04 16:29:59 -04:00
/**
 * Checks if the given file is valid to be included
 *
 * A file is accepted when all of the following conditions are met:
 * - it exists (realpath() can resolve it),
 * - it is located below the current working directory,
 * - the given path is already the normalized path relative to the working directory,
 * - it is located in the "include/" or the "addon/" folder.
 *
 * @param mixed $file Path relative to the working directory. The variable is overwritten:
 *                    with "false" when the path can't be resolved, otherwise with the
 *                    resolved path (made relative again once the working directory
 *                    prefix has been removed).
 * @return bool "true" when the file may be included
 */
private static function validateInclude(&$file)
{
    $orig_file = $file;

    $file = realpath($file);

    // realpath() returns "false" for paths that don't exist
    if ($file === false) {
        return false;
    }

    // The resolved path has to start with the working directory
    if (strpos($file, getcwd()) !== 0) {
        return false;
    }

    // Make the path relative again. The working directory has to occur exactly once.
    $file = str_replace(getcwd() . '/', '', $file, $count);
    if ($count != 1) {
        return false;
    }

    // The given path must not differ from its normalized form (no "..", no symbolic links, ...)
    if ($orig_file !== $file) {
        return false;
    }

    // Only these two folders may contain worker scripts
    return (strpos($file, 'include/') === 0) || (strpos($file, 'addon/') === 0);
}
2017-11-05 05:33:46 -05:00
/**
 * Execute a worker entry
 *
 * The command is taken from the "command" field, or - if that is empty - from
 * the first element of the JSON encoded "parameter" field. When the class
 * Friendica\Worker\<command> has an "execute" method, that method is called.
 * Otherwise the command is treated as a script in the "include/" or "addon/"
 * folder that provides a function named "<script name>_run".
 *
 * @param array $queue Workerqueue entry
 *
 * @return boolean "false" if the worker has to stop processing (maintenance mode, process or
 *                 connection limit reached, empty parameter), otherwise "true". "true" is
 *                 also returned when the entry was deleted because its include file or
 *                 function is invalid.
 * @throws \Friendica\Network\HTTPException\InternalServerErrorException
 */
public static function execute($queue)
{
    $mypid = getmypid();

    // Quit when in maintenance
    if (DI::config()->get('system', 'maintenance', false, true)) {
        Logger::notice(" Maintenance mode - quit process ", ['pid' => $mypid]);
        return false;
    }

    // Constantly check the number of parallel database processes
    if (DI::system()->isMaxProcessesReached()) {
        Logger::warning(" Max processes reached for process ", ['pid' => $mypid]);
        return false;
    }

    // Constantly check the number of available database connections to let the frontend be accessible at any time
    if (self::maxConnectionsReached()) {
        Logger::warning(" Max connection reached for process ", ['pid' => $mypid]);
        return false;
    }

    $argv = json_decode($queue['parameter'], true);
    if (!is_array($argv)) {
        $argv = [];
    }

    if (!empty($queue['command'])) {
        array_unshift($argv, $queue['command']);
    }

    if (empty($argv)) {
        Logger::warning('Parameter is empty', ['queue' => $queue]);
        return false;
    }

    // Check for existence and validity of the include file
    $include = $argv[0];

    if (method_exists(sprintf('Friendica\Worker\%s', $include), 'execute')) {
        // We constantly update the "executed" date every minute to avoid being killed too soon
        if (!isset(self::$last_update)) {
            self::$last_update = strtotime($queue['executed']);
        }

        $age = (time() - self::$last_update) / 60;
        self::$last_update = time();

        if ($age > 1) {
            $stamp = (float)microtime(true);
            DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow()], ['pid' => $mypid, 'done' => false]);
            self::$db_duration += (microtime(true) - $stamp);
            self::$db_duration_write += (microtime(true) - $stamp);
        }

        // The class name is not part of the arguments of the called method
        array_shift($argv);

        self::execFunction($queue, $include, $argv, true);

        // The entry is only marked as done when its "next_try" date lies in the past
        $stamp = (float)microtime(true);
        $condition = [" `id` = ? AND `next_try` < ? ", $queue['id'], DateTimeFormat::utcNow()];
        if (DBA::update('workerqueue', ['done' => true], $condition)) {
            DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
        }
        // The time is added (not assigned) so that the sum stays in sync with the write time
        self::$db_duration += (microtime(true) - $stamp);
        self::$db_duration_write += (microtime(true) - $stamp);

        return true;
    }

    // The script could be provided as full path or only with the function name
    if ($include == basename($include)) {
        $include = 'include/' . $include . '.php';
    }

    if (!self::validateInclude($include)) {
        Logger::warning(" Include file is not valid ", ['file' => $argv[0]]);

        // Entries with an invalid include file are removed from the queue
        $stamp = (float)microtime(true);
        DBA::delete('workerqueue', ['id' => $queue['id']]);
        // No job was executed here, so the previously collected database time must be kept
        self::$db_duration += (microtime(true) - $stamp);
        self::$db_duration_write += (microtime(true) - $stamp);

        return true;
    }

    require_once $include;

    $funcname = str_replace('.php', '', basename($argv[0])) . '_run';

    if (function_exists($funcname)) {
        // We constantly update the "executed" date every minute to avoid being killed too soon
        if (!isset(self::$last_update)) {
            self::$last_update = strtotime($queue['executed']);
        }

        $age = (time() - self::$last_update) / 60;
        self::$last_update = time();

        if ($age > 1) {
            $stamp = (float)microtime(true);
            DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow()], ['pid' => $mypid, 'done' => false]);
            self::$db_duration += (microtime(true) - $stamp);
            self::$db_duration_write += (microtime(true) - $stamp);
        }

        self::execFunction($queue, $funcname, $argv, false);

        $stamp = (float)microtime(true);
        if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) {
            DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow());
        }
        self::$db_duration += (microtime(true) - $stamp);
        self::$db_duration_write += (microtime(true) - $stamp);
    } else {
        Logger::warning(" Function does not exist ", ['function' => $funcname]);

        // Entries without an executable function are removed from the queue
        $stamp = (float)microtime(true);
        DBA::delete('workerqueue', ['id' => $queue['id']]);
        // No job was executed here, so the previously collected database time must be kept
        self::$db_duration += (microtime(true) - $stamp);
        self::$db_duration_write += (microtime(true) - $stamp);
    }

    return true;
}
/**
 * Execute a function from the queue
 *
 * The call is surrounded by the optional cooldown (config value "system",
 * "worker_cooldown", in seconds), by the switch to the worker logger and by
 * the performance measurement. After the call the performance counters of
 * this class are written to the log and reset.
 *
 * @param array   $queue       Workerqueue entry
 * @param string  $funcname    Name of the function, or - for method calls - name of the class
 *                             in the namespace Friendica\Worker
 * @param array   $argv        Array of values to be passed to the function
 * @param boolean $method_call "true": call Friendica\Worker\<funcname>::execute with the values of
 *                             $argv as arguments. "false": call <funcname>($argv, count($argv)).
 * @return void
 * @throws \Friendica\Network\HTTPException\InternalServerErrorException
 */
private static function execFunction($queue, $funcname, $argv, $method_call)
{
    $a = DI::app();

    $cooldown = DI::config()->get('system', 'worker_cooldown', 0);
    if ($cooldown > 0) {
        Logger::info('Pre execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'cooldown' => $cooldown]);
        sleep($cooldown);
    }

    // Set the workerLogger as new default logger
    Logger::enableWorker($funcname);

    Logger::info(" Process start. ", ['priority' => $queue['priority'], 'id' => $queue['id']]);

    $stamp = (float)microtime(true);

    // We use the callstack here to analyze the performance of executed worker entries.
    // For this reason the variables have to be initialized.
    DI::profiler()->reset();

    // Unknown priorities are logged and replaced by the medium priority
    if (!in_array($queue['priority'], PRIORITIES)) {
        Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]);
        $queue['priority'] = PRIORITY_MEDIUM;
    }

    $a->setQueue($queue);

    // Time since the start of the worker respectively since the previous performance log
    $up_duration = microtime(true) - self::$up_start;

    // Reset global data to avoid interferences
    unset($_SESSION);

    if ($method_call) {
        call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv);
    } else {
        $funcname($argv, count($argv));
    }

    Logger::disableWorker();

    $a->setQueue([]);

    $duration = (microtime(true) - $stamp);

    /* With these values we can analyze how effective the worker is.
     * The database and rest time should be low since this is the unproductive time.
     * The execution time is the productive time.
     * By changing parameters like the maximum number of workers we can check the effectiveness.
     */
    $dbtotal = round(self::$db_duration, 2);
    $dbcount = round(self::$db_duration_count, 2);
    $dbstat  = round(self::$db_duration_stat, 2);
    $dbwrite = round(self::$db_duration_write, 2);
    $dblock  = round(self::$lock_duration, 2);
    $rest    = round(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 2);
    $exec    = round($duration, 2);

    Logger::info('Performance:', ['state' => self::$state, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]);

    // Start a new measurement period
    self::$up_start = microtime(true);
    self::$db_duration = 0;
    self::$db_duration_count = 0;
    self::$db_duration_stat = 0;
    self::$db_duration_write = 0;
    self::$lock_duration = 0;

    // Long running jobs get an additional log entry. Only the highest exceeded threshold is reported.
    $thresholds = [
        3600 => 'Longer than 1 hour.',
        600  => 'Longer than 10 minutes.',
        300  => 'Longer than 5 minutes.',
        120  => 'Longer than 2 minutes.',
    ];
    foreach ($thresholds as $seconds => $message) {
        if ($duration > $seconds) {
            Logger::info($message, ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration / 60, 3)]);
            break;
        }
    }

    Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);

    DI::profiler()->saveLog(DI::logger(), " ID " . $queue['id'] . " : " . $funcname);

    if ($cooldown > 0) {
        Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue['id'], 'cooldown' => $cooldown]);
        sleep($cooldown);
    }
}
/**
 * Checks if the number of database connections has reached a critical limit.
 *
 * Two limits are checked one after the other:
 * 1. The limit for the database user. It is taken from the config value "system",
 *    "max_connections" or - when that is not set - from the "max_user_connections"
 *    server variable respectively from the grants of the user. The current usage is
 *    the number of rows of the process list.
 * 2. The "max_connections" server variable, compared with the "Threads_connected" status value.
 *
 * The critical level is the config value "system", "max_connections_level" (in percent, default 75).
 *
 * @return bool Has the usage of one of the limits reached the critical level?
 * @throws \Friendica\Network\HTTPException\InternalServerErrorException
 */
private static function maxConnectionsReached()
{
    // Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
    $max = intval(DI::config()->get('system', 'max_connections'));

    // Fetch the percentage level where the worker will get active
    $maxlevel = DI::config()->get('system', 'max_connections_level', 75);

    if ($max == 0) {
        // the maximum number of possible user connections can be a system variable
        $r = DBA::fetchFirst(" SHOW VARIABLES WHERE `variable_name` = 'max_user_connections' ");
        if (DBA::isResult($r)) {
            $max = intval($r['Value']);
        }

        // Or it can be granted. This overrides the system variable
        $stamp = (float)microtime(true);
        $r = DBA::p('SHOW GRANTS');
        self::$db_duration += (microtime(true) - $stamp);

        while ($grants = DBA::fetch($r)) {
            $grant = array_pop($grants);
            if (stristr($grant, 'GRANT USAGE ON')) {
                // At least one digit is required, so that $max is never overwritten with an empty value
                if (preg_match('/WITH MAX_USER_CONNECTIONS (\d+)/', $grant, $match)) {
                    $max = intval($match[1]);
                }
            }
        }
        DBA::close($r);
    }

    // If $max is set we will use the processlist to determine the current number of connections
    // The processlist only shows entries of the current user
    if ($max != 0) {
        $stamp = (float)microtime(true);
        $r = DBA::p('SHOW PROCESSLIST');
        self::$db_duration += (microtime(true) - $stamp);
        $used = DBA::numRows($r);
        DBA::close($r);

        Logger::info(" Connection usage (user values) ", ['usage' => $used, 'max' => $max]);

        $level = ($used / $max) * 100;

        if ($level >= $maxlevel) {
            Logger::warning(" Maximum level ( " . $maxlevel . " %) of user connections reached: " . $used . " / " . $max);
            return true;
        }
    }

    // We will now check for the system values.
    // This limit could be reached although the user limits are fine.
    $r = DBA::fetchFirst(" SHOW VARIABLES WHERE `variable_name` = 'max_connections' ");
    if (!DBA::isResult($r)) {
        return false;
    }

    $max = intval($r['Value']);
    if ($max == 0) {
        return false;
    }

    $r = DBA::fetchFirst(" SHOW STATUS WHERE `variable_name` = 'Threads_connected' ");
    if (!DBA::isResult($r)) {
        return false;
    }

    $used = intval($r['Value']);
    if ($used == 0) {
        return false;
    }

    Logger::info(" Connection usage (system values) ", ['used' => $used, 'max' => $max]);

    $level = $used / $max * 100;

    if ($level < $maxlevel) {
        return false;
    }

    Logger::warning(" Maximum level ( " . $level . " %) of system connections reached: " . $used . " / " . $max);
    return true;
}
/**
2020-01-19 01:05:23 -05:00
* fix the queue entry if the worker process died
2020-01-19 04:51:37 -05:00
*
2017-11-19 17:04:40 -05:00
* @ return void
2019-01-06 16:06:53 -05:00
* @ throws \Exception
2017-11-05 05:33:46 -05:00
*/
2017-11-19 17:04:40 -05:00
private static function killStaleWorkers ()
{
2019-02-09 18:10:15 -05:00
$stamp = ( float ) microtime ( true );
2018-07-20 08:19:26 -04:00
$entries = DBA :: select (
2017-11-19 17:04:40 -05:00
'workerqueue' ,
2020-12-03 10:47:50 -05:00
[ 'id' , 'pid' , 'executed' , 'priority' , 'command' , 'parameter' ],
2019-02-08 16:48:41 -05:00
[ 'NOT `done` AND `pid` != 0' ],
2020-10-24 04:05:03 -04:00
[ 'order' => [ 'priority' , 'retrial' , 'created' ]]
2017-11-19 17:04:40 -05:00
);
2019-02-09 18:10:15 -05:00
self :: $db_duration += ( microtime ( true ) - $stamp );
2017-11-19 17:04:40 -05:00
2018-07-20 08:19:26 -04:00
while ( $entry = DBA :: fetch ( $entries )) {
2017-11-05 05:33:46 -05:00
if ( ! posix_kill ( $entry [ " pid " ], 0 )) {
2019-02-09 18:10:15 -05:00
$stamp = ( float ) microtime ( true );
2018-07-20 08:19:26 -04:00
DBA :: update (
2017-11-20 11:29:55 -05:00
'workerqueue' ,
2018-10-21 01:53:47 -04:00
[ 'executed' => DBA :: NULL_DATETIME , 'pid' => 0 ],
2018-01-15 08:05:12 -05:00
[ 'id' => $entry [ " id " ]]
2017-11-20 11:29:55 -05:00
);
2019-02-09 18:10:15 -05:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 05:33:46 -05:00
} else {
// Kill long running processes
// Check if the priority is in a valid range
2018-01-15 08:05:12 -05:00
if ( ! in_array ( $entry [ " priority " ], [ PRIORITY_CRITICAL , PRIORITY_HIGH , PRIORITY_MEDIUM , PRIORITY_LOW , PRIORITY_NEGLIGIBLE ])) {
2017-11-05 05:33:46 -05:00
$entry [ " priority " ] = PRIORITY_MEDIUM ;
}
// Define the maximum durations
2018-01-15 08:05:12 -05:00
$max_duration_defaults = [ PRIORITY_CRITICAL => 720 , PRIORITY_HIGH => 10 , PRIORITY_MEDIUM => 60 , PRIORITY_LOW => 180 , PRIORITY_NEGLIGIBLE => 720 ];
2017-11-05 05:33:46 -05:00
$max_duration = $max_duration_defaults [ $entry [ " priority " ]];
2020-12-03 10:47:50 -05:00
$argv = json_decode ( $entry [ 'parameter' ], true );
if ( ! empty ( $entry [ 'command' ])) {
$command = $entry [ 'command' ];
} elseif ( ! empty ( $argv )) {
$command = array_shift ( $argv );
} else {
2020-06-01 09:51:58 -04:00
return ;
}
2020-12-03 10:47:50 -05:00
$command = basename ( $command );
2017-11-05 05:33:46 -05:00
// How long is the process already running?
2019-02-12 01:42:45 -05:00
$duration = ( time () - strtotime ( $entry [ " executed " ])) / 60 ;
2017-11-05 05:33:46 -05:00
if ( $duration > $max_duration ) {
2020-12-03 10:47:50 -05:00
Logger :: notice ( 'Worker process took too much time - killed' , [ 'duration' => number_format ( $duration , 3 ), 'max' => $max_duration , 'id' => $entry [ " id " ], 'pid' => $entry [ " pid " ], 'command' => $command ]);
2017-11-05 05:33:46 -05:00
posix_kill ( $entry [ " pid " ], SIGTERM );
// We killed the stale process.
// To avoid a blocking situation we reschedule the process at the beginning of the queue.
// Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
2018-02-14 00:05:00 -05:00
$new_priority = $entry [ " priority " ];
2017-11-05 05:33:46 -05:00
if ( $entry [ " priority " ] == PRIORITY_HIGH ) {
$new_priority = PRIORITY_MEDIUM ;
} elseif ( $entry [ " priority " ] == PRIORITY_MEDIUM ) {
$new_priority = PRIORITY_LOW ;
} elseif ( $entry [ " priority " ] != PRIORITY_CRITICAL ) {
$new_priority = PRIORITY_NEGLIGIBLE ;
}
2019-02-09 18:10:15 -05:00
$stamp = ( float ) microtime ( true );
2018-07-20 08:19:26 -04:00
DBA :: update (
2017-11-19 17:04:40 -05:00
'workerqueue' ,
2018-10-21 01:53:47 -04:00
[ 'executed' => DBA :: NULL_DATETIME , 'created' => DateTimeFormat :: utcNow (), 'priority' => $new_priority , 'pid' => 0 ],
2018-01-15 08:05:12 -05:00
[ 'id' => $entry [ " id " ]]
2017-11-19 17:04:40 -05:00
);
2019-02-09 18:10:15 -05:00
self :: $db_duration += ( microtime ( true ) - $stamp );
self :: $db_duration_write += ( microtime ( true ) - $stamp );
2017-11-05 05:33:46 -05:00
} else {
2020-12-03 10:47:50 -05:00
Logger :: info ( 'Process runtime is okay' , [ 'duration' => number_format ( $duration , 3 ), 'max' => $max_duration , 'id' => $entry [ " id " ], 'pid' => $entry [ " pid " ], 'command' => $command ]);
2017-11-05 05:33:46 -05:00
}
}
}
2020-04-28 01:55:17 -04:00
DBA :: close ( $entries );
2017-11-05 05:33:46 -05:00
}
/**
 * Checks if the number of active workers exceeds the given limits
 *
 * When the system reports a load, the allowed number of queues is scaled
 * down with the load, a status line is logged and - if there is room for
 * more workers and there are entries to process - a new worker is requested.
 *
 * @return bool Are there too many workers running?
 * @throws \Friendica\Network\HTTPException\InternalServerErrorException
 */
private static function tooMuchWorkers()
{
	$queues = DI::config()->get(" system ", " worker_queues ", 10);
	$maxqueues = $queues;

	$active = self::activeWorkers();

	// Decrease the number of workers at higher load
	$load = System::currentLoad();
	if ($load) {
		$maxsysload = intval(DI::config()->get(" system ", " maxloadavg ", 20));

		/* Default exponent 3 causes queues to rapidly decrease as load increases.
		 * If you have 20 max queues at idle, then you get only 5 queues at 37.1% of $maxsysload.
		 * For some environments, this rapid decrease is not needed.
		 * With exponent 1, you could have 20 max queues at idle and 13 at 37% of $maxsysload.
		 */
		$exponent = intval(DI::config()->get('system', 'worker_load_exponent', 3));

		// A maximum load of zero (or less) must not end in a division by zero.
		// No queues are granted in that case.
		if ($maxsysload > 0) {
			$slope = pow(max(0, $maxsysload - $load) / $maxsysload, $exponent);
		} else {
			$slope = 0;
		}
		$queues = intval(ceil($slope * $maxqueues));

		$processlist = '';

		if (DI::config()->get('system', 'worker_jpm')) {
			$intervals = explode(',', (string)DI::config()->get('system', 'worker_jpm_range'));
			$jobs_per_minute = [];
			foreach ($intervals as $interval) {
				// Cast before checking: empty or non numeric entries become 0 and
				// would otherwise be used as divisor below.
				$interval = (int)$interval;
				if ($interval <= 0) {
					continue;
				}

				$stamp = (float)microtime(true);
				$jobs = DBA::count('workerqueue', [" `done` AND `executed` > ? ", DateTimeFormat::utc('now - ' . $interval . ' minute')]);
				self::$db_duration += (microtime(true) - $stamp);
				self::$db_duration_stat += (microtime(true) - $stamp);
				$jobs_per_minute[$interval] = number_format($jobs / $interval, 0);
			}
			$processlist = ' - jpm: ' . implode('/', $jobs_per_minute);
		}

		// Create a list of queue entries grouped by their priority
		$listitem = [0 => ''];

		$idle_workers = $active;

		$deferred = self::deferredEntries();

		if (DI::config()->get('system', 'worker_debug')) {
			$waiting_processes = 0;
			// Now adding all processes with workerqueue entries
			$stamp = (float)microtime(true);
			$jobs = DBA::p(" SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` GROUP BY `priority` ");
			self::$db_duration += (microtime(true) - $stamp);
			self::$db_duration_stat += (microtime(true) - $stamp);
			while ($entry = DBA::fetch($jobs)) {
				$stamp = (float)microtime(true);
				$running = DBA::count('workerqueue-view', ['priority' => $entry[" priority "]]);
				self::$db_duration += (microtime(true) - $stamp);
				self::$db_duration_stat += (microtime(true) - $stamp);
				$idle_workers -= $running;
				$waiting_processes += $entry[" entries "];
				$listitem[$entry[" priority "]] = $entry[" priority "] . " : " . $running . " / " . $entry[" entries "];
			}
			DBA::close($jobs);
		} else {
			$waiting_processes = self::totalEntries();
			$stamp = (float)microtime(true);
			$jobs = DBA::p(" SELECT COUNT(*) AS `running`, `priority` FROM `workerqueue-view` GROUP BY `priority` ORDER BY `priority` ");
			self::$db_duration += (microtime(true) - $stamp);
			self::$db_duration_stat += (microtime(true) - $stamp);

			while ($entry = DBA::fetch($jobs)) {
				$idle_workers -= $entry[" running "];
				$listitem[$entry[" priority "]] = $entry[" priority "] . " : " . $entry[" running "];
			}
			DBA::close($jobs);
		}

		$waiting_processes -= $deferred;

		$listitem[0] = " 0: " . max(0, $idle_workers);
		$processlist .= ' (' . implode(', ', $listitem) . ')';

		// Fastlane: when all queues are in use but nothing of the top priority is
		// running, allow one additional worker.
		if (DI::config()->get(" system ", " worker_fastlane ", false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) {
			$top_priority = self::highestPriority();
			$high_running = self::processWithPriorityActive($top_priority);
			if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
				Logger::info(" Jobs with a higher priority are waiting but none is executed. Open a fastlane. ", ['priority' => $top_priority]);
				$queues = $active + 1;
			}
		}

		Logger::notice(" Load: " . $load . " / " . $maxsysload . " - processes: " . $deferred . " / " . $active . " / " . $waiting_processes . $processlist . " - maximum: " . $queues . " / " . $maxqueues);

		// Are there fewer workers running than possible? Then fork a new one.
		if (!DI::config()->get(" system ", " worker_dont_fork ", false) && ($queues > ($active + 1)) && self::entriesExists()) {
			Logger::info(" There are fewer workers as possible, fork a new worker. ", ['active' => $active, 'queues' => $queues]);
			if (self::isDaemonMode()) {
				self::IPCSetJobState(true);
			} else {
				self::spawnWorker();
			}
		}
	}

	// If there are too many workers, we don't spawn a new one.
	if (self::isDaemonMode() && ($active > $queues)) {
		self::IPCSetJobState(false);
	}

	return $active > $queues;
}
/**
 * Returns the number of active worker processes
 *
 * The number is taken from the process repository by counting the
 * entries for the 'Worker.php' command. The time spent is added to the
 * database duration statistics.
 *
 * @return integer Number of active worker processes
 * @throws \Exception
 */
private static function activeWorkers()
{
	$started = (float)microtime(true);

	$workers = DI::process()->countCommand('Worker.php');

	self::$db_duration       += (microtime(true) - $started);
	self::$db_duration_count += (microtime(true) - $started);

	return $workers;
}
2020-08-29 05:03:50 -04:00
/**
 * Returns the known process ids together with their open queue entries
 *
 * Every row of the `process` table is listed; the value is the number of
 * `workerqueue` rows that carry the same pid and are not done yet.
 *
 * @return array Number of unfinished workerqueue entries, indexed by process id
 * @throws \Exception
 */
private static function getWorkerPIDList()
{
	$entries_by_pid = [];

	$started = (float)microtime(true);

	$rows = DBA::p(" SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process`
LEFT JOIN `workerqueue` ON `workerqueue` . `pid` = `process` . `pid` AND NOT `workerqueue` . `done`
GROUP BY `process` . `pid` ");

	while ($row = DBA::fetch($rows)) {
		$entries_by_pid[$row['pid']] = $row['entries'];
	}
	DBA::close($rows);

	// The measured time covers the query as well as fetching the rows
	self::$db_duration       += (microtime(true) - $started);
	self::$db_duration_count += (microtime(true) - $started);

	return $entries_by_pid;
}
2019-02-17 14:20:24 -05:00
/**
 * Returns waiting jobs for the current process id
 *
 * Selects all workerqueue rows that are assigned to the pid of the
 * running PHP process and that are not done.
 *
 * @return array|false waiting workerqueue jobs, "false" when there is no result
 * @throws \Exception
 */
private static function getWaitingJobForPID()
{
	$started = (float)microtime(true);
	$jobs = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
	self::$db_duration += (microtime(true) - $started);

	if (!DBA::isResult($jobs)) {
		DBA::close($jobs);
		return false;
	}

	return DBA::toArray($jobs);
}
2019-02-17 14:20:24 -05:00
/**
 * Returns the next jobs that should be executed
 *
 * The priority is chosen by nextPriority(). Unassigned, unfinished and due
 * tasks of that priority are collected - ordered by retrial and creation
 * date - until either the limit is reached or a task is found whose command
 * is not listed in FAST_COMMANDS. That task is still part of the result.
 *
 * @param int $limit Maximum number of tasks to fetch
 * @return array array with the ids of the next jobs
 * @throws \Exception
 */
private static function nextProcess(int $limit)
{
	$priority = self::nextPriority();
	if (empty($priority)) {
		Logger::info('No tasks found');
		return [];
	}

	$ids = [];
	$stamp = (float)microtime(true);
	$condition = [" `priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ? ", $priority, DateTimeFormat::utcNow()];
	$tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['retrial', 'created']]);
	self::$db_duration += (microtime(true) - $stamp);
	while ($task = DBA::fetch($tasks)) {
		$ids[] = $task['id'];

		// Only continue that loop while we are storing commands that can be processed quickly
		if (!empty($task['command'])) {
			$command = $task['command'];
		} else {
			// Rows without a command carry it as first parameter.
			// Decoding to an array and checking the result avoids warnings (or
			// errors on JSON objects) when the parameter is empty or malformed.
			$parameters = json_decode((string)$task['parameter'], true);
			$command = is_array($parameters) ? ($parameters[0] ?? null) : null;
		}

		if (empty($command) || !in_array($command, self::FAST_COMMANDS)) {
			break;
		}
	}
	DBA::close($tasks);

	Logger::info('Found:', ['priority' => $priority, 'id' => $ids]);
	return $ids;
}
2019-02-17 14:20:24 -05:00
/**
 * Returns the priority of the next workerqueue job
 *
 * The decision is taken in this order:
 * - a waiting critical task always wins,
 * - then the first priority that has waiting tasks but no running worker,
 * - then the first running priority that uses fewer workers than its share,
 * - finally the first priority that has waiting tasks at all.
 *
 * @return int|false priority, "false" when nothing is waiting
 * @throws \Exception
 */
private static function nextPriority()
{
	$ordered = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE];

	// Which priorities have unassigned tasks that are due?
	$waiting = [];
	foreach ($ordered as $candidate) {
		$stamp = (float)microtime(true);
		if (DBA::exists('workerqueue', [" `priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ? ", $candidate, DateTimeFormat::utcNow()])) {
			$waiting[$candidate] = true;
		}
		self::$db_duration += (microtime(true) - $stamp);
	}

	if (!empty($waiting[PRIORITY_CRITICAL])) {
		return PRIORITY_CRITICAL;
	}

	// How many distinct processes are working on each priority?
	$running = [];
	$running_total = 0;
	$stamp = (float)microtime(true);
	$processes = DBA::p(" SELECT COUNT(DISTINCT(`pid`)) AS `running`, `priority` FROM `workerqueue-view` GROUP BY `priority` ");
	self::$db_duration += (microtime(true) - $stamp);
	while ($process = DBA::fetch($processes)) {
		$running[$process['priority']] = $process['running'];
		$running_total += $process['running'];
	}
	DBA::close($processes);

	foreach ($ordered as $candidate) {
		if (!empty($waiting[$candidate]) && empty($running[$candidate])) {
			Logger::info('No running worker found with priority {priority} - assigning it.', ['priority' => $candidate]);
			return $candidate;
		}
	}

	$active = max(self::activeWorkers(), $running_total);
	$levels = max(count($waiting), count($running));
	$exponent = 2;

	// Weight of every level: 1^2, 2^2, ... - the sum is the divisor for the shares
	$weights = [];
	for ($i = 1; $i <= $levels; ++$i) {
		$weights[$i] = pow($i, $exponent);
	}
	$total = array_sum($weights);

	// The biggest share goes to position 0, every position gets at least one worker
	$limit = [];
	for ($i = 1; $i <= $levels; ++$i) {
		$limit[$levels - $i] = max(1, round($active * ($weights[$i] / $total)));
	}

	// The shares are matched by position with the rows of the running list
	$position = 0;
	foreach ($running as $priority => $workers) {
		$allowed = $limit[$position];
		++$position;
		if ($workers < $allowed) {
			Logger::info('Priority {priority} has got {workers} workers out of a limit of {limit}', ['priority' => $priority, 'workers' => $workers, 'limit' => $allowed]);
			return $priority;
		}
	}

	if (!empty($waiting)) {
		$priority = array_keys($waiting)[0];
		Logger::info('No underassigned priority found, now taking the highest priority.', ['priority' => $priority]);
		return $priority;
	}

	return false;
}
2017-11-05 05:33:46 -05:00
/**
 * Find and claim the next worker process for us
 *
 * Task ids are fetched via nextProcess() first. When this doesn't use up the
 * limit, more tasks are fetched regardless of their priority. The ids are
 * then distributed among the collected process ids and the workerqueue rows
 * are marked with the pid and the execution time.
 *
 * With "worker_multiple_fetch" enabled, tasks are assigned to every process
 * that currently has no more than "worker_fetch_limit" unfinished entries,
 * otherwise only to the current process.
 *
 * @return void
 * @throws \Friendica\Network\HTTPException\InternalServerErrorException
 */
private static function findWorkerProcesses()
{
	$fetch_limit = DI::config()->get('system', 'worker_fetch_limit', 1);

	if (DI::config()->get('system', 'worker_multiple_fetch')) {
		$pids = [];
		foreach (self::getWorkerPIDList() as $pid => $count) {
			if ($count <= $fetch_limit) {
				$pids[] = $pid;
			}
		}
		if (empty($pids)) {
			return;
		}
		$limit = $fetch_limit * count($pids);
	} else {
		$pids = [getmypid()];
		$limit = $fetch_limit;
	}

	$ids = self::nextProcess($limit);
	$limit -= count($ids);

	// If there is not enough results we check without priority limit
	if ($limit > 0) {
		$stamp = (float)microtime(true);
		$condition = [" `pid` = 0 AND NOT `done` AND `next_try` < ? ", DateTimeFormat::utcNow()];
		$tasks = DBA::select('workerqueue', ['id', 'command', 'parameter'], $condition, ['limit' => $limit, 'order' => ['priority', 'retrial', 'created']]);
		self::$db_duration += (microtime(true) - $stamp);

		while ($task = DBA::fetch($tasks)) {
			$ids[] = $task['id'];

			// Only continue that loop while we are storing commands that can be processed quickly
			if (!empty($task['command'])) {
				$command = $task['command'];
			} else {
				// Rows without a command carry it as first parameter.
				// Decoding to an array and checking the result avoids warnings (or
				// errors on JSON objects) when the parameter is empty or malformed.
				$parameters = json_decode((string)$task['parameter'], true);
				$command = is_array($parameters) ? ($parameters[0] ?? null) : null;
			}

			if (empty($command) || !in_array($command, self::FAST_COMMANDS)) {
				break;
			}
		}
		DBA::close($tasks);
	}

	if (empty($ids)) {
		return;
	}

	// Assign the task ids to the workers.
	// The array pointer is advanced before each assignment and wraps around at
	// the end, so the ids are handed out in turns.
	$worker = [];
	foreach (array_unique($ids) as $id) {
		$pid = next($pids);
		if (!$pid) {
			$pid = reset($pids);
		}
		$worker[$pid][] = $id;
	}

	$stamp = (float)microtime(true);
	foreach ($worker as $worker_pid => $worker_ids) {
		Logger::info('Set queue entry', ['pid' => $worker_pid, 'ids' => $worker_ids]);
		// The condition on "pid" and "done" keeps rows untouched that have been claimed or finished meanwhile
		DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $worker_pid],
			['id' => $worker_ids, 'done' => false, 'pid' => 0]);
	}
	self::$db_duration += (microtime(true) - $stamp);
	self::$db_duration_write += (microtime(true) - $stamp);
}
/**
 * Returns the next worker process
 *
 * Jobs that are already assigned to the current pid are returned directly.
 * Otherwise new jobs are claimed under the process lock and the jobs
 * assigned to the current pid are returned afterwards.
 *
 * @return array|false worker processes, "false" when nothing is waiting or the lock couldn't be acquired
 * @throws \Friendica\Network\HTTPException\InternalServerErrorException
 */
public static function workerProcess()
{
	// There can already be jobs for us in the queue.
	$waiting = self::getWaitingJobForPID();
	if (!empty($waiting)) {
		return $waiting;
	}

	$stamp = (float)microtime(true);
	if (!DI::lock()->acquire(self::LOCK_PROCESS)) {
		return false;
	}
	self::$lock_duration += (microtime(true) - $stamp);

	try {
		self::findWorkerProcesses();
	} finally {
		// Release the lock even when claiming the jobs failed with an exception,
		// otherwise it would stay held by this process.
		DI::lock()->release(self::LOCK_PROCESS);
	}

	return self::getWaitingJobForPID();
}
/**
 * Removes a workerqueue entry from the current process
 *
 * All unfinished workerqueue rows of the given process get their pid
 * and execution date reset.
 *
 * @param Process $process the process behind the workerqueue
 *
 * @return void
 * @throws \Exception
 */
public static function unclaimProcess(Process $process)
{
	$started = (float)microtime(true);

	$reset     = ['executed' => DBA::NULL_DATETIME, 'pid' => 0];
	$condition = ['pid' => $process->pid, 'done' => false];
	DBA::update('workerqueue', $reset, $condition);

	self::$db_duration       += (microtime(true) - $started);
	self::$db_duration_write += (microtime(true) - $started);
}
/**
 * Runs the cron processes
 *
 * Queues the "SpoolPost" and "Cron" tasks with a forced priority and
 * cleans up stale workers afterwards.
 *
 * @return void
 * @throws \Friendica\Network\HTTPException\InternalServerErrorException
 */
private static function runCron()
{
	Logger::info('Add cron entries');

	// Check for spooled items
	$spool_options = ['priority' => PRIORITY_HIGH, 'force_priority' => true];
	self::add($spool_options, 'SpoolPost');

	// Run the cron job that calls all other jobs
	$cron_options = ['priority' => PRIORITY_MEDIUM, 'force_priority' => true];
	self::add($cron_options, 'Cron');

	// Cleaning dead processes
	self::killStaleWorkers();
}
2021-01-01 14:35:29 -05:00
/**
 * Fork a child process
 *
 * The parent sets the IPC job state for the child and waits (at most 100
 * cycles of 10 ms) until the child has picked it up. The child registers
 * itself as process, works on the queue and ends the script when done.
 *
 * @param boolean $do_cron
 * @return void
 */
private static function forkProcess(bool $do_cron)
{
	if (DI::system()->isMinMemoryReached()) {
		Logger::warning('Memory limit reached - quitting');
		return;
	}

	// Children inherit their parent's database connection.
	// To avoid problems we disconnect before the fork and reconnect
	// afterwards - in the parent as well as in the child.
	DBA::disconnect();
	$pid = pcntl_fork();
	DBA::connect();

	if ($pid == -1) {
		Logger::warning('Could not spawn worker');
		return;
	}

	if ($pid) {
		// The parent process continues here
		self::IPCSetJobState(true, $pid);
		Logger::info('Spawned new worker', ['pid' => $pid]);

		$waited = 0;
		while (self::IPCJobsExists($pid) && (++$waited < 100)) {
			usleep(10000);
		}

		Logger::info('Spawned worker is ready', ['pid' => $pid, 'wait_cycles' => $waited]);
		return;
	}

	// We now are in the new worker
	DI::flushLogger();
	$process = DI::process()->create(getmypid(), basename(__FILE__));

	$waited = 0;
	while (!self::IPCJobsExists($process->pid) && (++$waited < 100)) {
		usleep(10000);
	}

	Logger::info('Worker spawned', ['pid' => $process->pid, 'wait_cycles' => $waited]);

	self::processQueue($do_cron, $process);

	self::unclaimProcess($process);

	self::IPCSetJobState(false, $process->pid);
	DI::process()->delete($process);
	Logger::info('Worker ended', ['pid' => $process->pid]);
	exit();
}
2017-11-19 17:33:07 -05:00
/**
 * Spawns a new worker
 *
 * In daemon mode with "worker_fork" enabled the worker is forked from the
 * current process, otherwise "bin/worker.php" is started. In daemon mode
 * the IPC job state is reset afterwards.
 *
 * @param bool $do_cron
 * @return void
 * @throws \Friendica\Network\HTTPException\InternalServerErrorException
 */
public static function spawnWorker($do_cron = false)
{
	$use_fork = self::isDaemonMode() && DI::config()->get('system', 'worker_fork');

	if (!$use_fork) {
		DI::system()->run('bin/worker.php', ['no_cron' => !$do_cron]);
	} else {
		self::forkProcess($do_cron);
	}

	if (self::isDaemonMode()) {
		self::IPCSetJobState(false);
	}
}
2017-11-05 05:33:46 -05:00
/**
 * Adds tasks to the worker queue
 *
 * The first argument is either the priority (integer) or an array with
 * options: 'priority', 'created', 'delayed', 'dont_fork', 'force_priority'.
 * The second argument is the command, all further arguments are stored as
 * its parameters.
 *
 * Examples:
 *    Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id);
 *    Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "Delivery", $post_id);
 *
 * @param (integer|array) priority or parameter array, other types are rejected with an exception
 *
 * @return int "0" if worker queue entry already existed or there had been an error,
 *             "1" if a 'proc_run' hook disabled the execution,
 *             otherwise the ID of the worker task
 * @throws \InvalidArgumentException when the first argument is neither an integer nor an array
 * @throws \Friendica\Network\HTTPException\InternalServerErrorException
 *
 * @hooks 'proc_run'
 *    array $arr
 *
 */
public static function add(...$args)
{
	if (!count($args)) {
		return 0;
	}

	$arr = ['args' => $args, 'run_cmd' => true];

	Hook::callAll(" proc_run ", $arr);
	if (!$arr['run_cmd'] || !count($args)) {
		return 1;
	}

	// Defaults, may be overwritten by the parameter array
	$priority = PRIORITY_MEDIUM;
	// Don't fork from frontend tasks by default
	$dont_fork = DI::config()->get(" system ", " worker_dont_fork ", false) || !DI::mode()->isBackend();
	$created = DateTimeFormat::utcNow();
	$delayed = DBA::NULL_DATETIME;
	$force_priority = false;

	$run_parameter = array_shift($args);

	if (is_int($run_parameter)) {
		$priority = $run_parameter;
	} elseif (is_array($run_parameter)) {
		$delayed        = $run_parameter['delayed'] ?? $delayed;
		$priority       = $run_parameter['priority'] ?? $priority;
		$created        = $run_parameter['created'] ?? $created;
		$dont_fork      = $run_parameter['dont_fork'] ?? $dont_fork;
		$force_priority = $run_parameter['force_priority'] ?? $force_priority;
	} else {
		throw new \InvalidArgumentException('Priority number or task parameter array expected as first argument');
	}

	$command = array_shift($args);
	$parameters = json_encode($args);
	$found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]);
	$added = 0;

	if (!in_array($priority, PRIORITIES)) {
		Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]);
		$priority = PRIORITY_MEDIUM;
	}

	// Quit if there was a database error - a precaution for the update process to 3.5.3
	if (DBA::errorNo() != 0) {
		return 0;
	}

	if (!$found) {
		$fields = [
			'command'   => $command,
			'parameter' => $parameters,
			'created'   => $created,
			'priority'  => $priority,
			'next_try'  => $delayed,
		];
		if (!DBA::insert('workerqueue', $fields)) {
			return 0;
		}
		$added = DBA::lastInsertId();
	} elseif ($force_priority) {
		// The task exists already, only the priority of the unassigned entries is changed
		DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]);
	}

	// Set the IPC flag to ensure an immediate process execution via daemon
	if (self::isDaemonMode()) {
		self::IPCSetJobState(true);
	}

	self::checkDaemonState();

	// Should we quit and wait for the worker to be called as a cronjob?
	if ($dont_fork) {
		return $added;
	}

	// If there is a lock then we don't have to check for too many workers
	if (!DI::lock()->acquire(self::LOCK_WORKER, 0)) {
		return $added;
	}

	// If there are already enough workers running, don't fork another one
	$quit = self::tooMuchWorkers();
	DI::lock()->release(self::LOCK_WORKER);

	if ($quit) {
		return $added;
	}

	// Quit on daemon mode
	if (self::isDaemonMode()) {
		return $added;
	}

	// Now call the worker to execute the jobs that we just added to the queue
	self::spawnWorker();

	return $added;
}
2018-01-15 19:08:28 -05:00
2020-12-03 10:47:50 -05:00
/**
 * Counts the unassigned queue entries for a given command
 *
 * Only workerqueue rows are counted that are not done and that
 * have got no process id assigned (pid = 0).
 *
 * @param string $command Name of the worker command
 * @return int Number of matching workerqueue entries
 */
public static function countWorkersByCommand(string $command)
{
	$condition = ['done' => false, 'pid' => 0, 'command' => $command];

	return DBA::count('workerqueue', $condition);
}
2019-08-11 04:28:52 -04:00
/**
 * Returns the next retrial level for worker jobs.
 * This function will skip levels when jobs are older.
 *
 * The delays of all levels up to $max_level + 1 are summed up. The result
 * is the highest level above the current one whose accumulated delay is
 * still below the age of the job - or the current level plus one when
 * there is no such level.
 *
 * @param array   $queue     Worker queue entry ('id', 'created' and 'retrial' are read)
 * @param integer $max_level maximum retrial level
 * @return integer the next retrial level value
 */
private static function getNextRetrial($queue, $max_level)
{
	$created_at = strtotime($queue['created']);
	$age = time() - $created_at;

	$next_level = $queue['retrial'] + 1;

	$accumulated = 0;
	$level = 0;
	while ($level <= $max_level + 1) {
		// Same delay formula per level: a fixed part plus a random part
		$accumulated += (($level + 3) ** 4) + (rand(1, 30) * ($level + 1));

		if (($accumulated < $age) && ($level > $queue['retrial'])) {
			$next_level = $level;
		}

		++$level;
	}

	Logger::notice('New retrial for task', ['id' => $queue['id'], 'created' => $queue['created'], 'old' => $queue['retrial'], 'new' => $next_level]);

	return $next_level;
}
2018-10-15 01:19:35 -04:00
/**
 * Reschedules the worker entry that is currently being processed.
 *
 * The entry is taken from DI::app()->getQueue(). Its retrial level is raised,
 * a new "next_try" time is calculated, the priority may be lowered after
 * several retrials and the entry is released again ("pid" = 0).
 *
 * @return boolean true when the entry had been deferred, false when there is
 *                 no current entry or the retrial limit is exceeded
 * @throws \Exception
 */
public static function defer(): bool
{
	$queue = DI::app()->getQueue();
	if (empty($queue)) {
		// Nothing is being processed, so there is nothing to defer
		return false;
	}

	$id       = $queue['id'];
	$priority = $queue['priority'];

	$max_level   = DI::config()->get('system', 'worker_defer_limit');
	$new_retrial = self::getNextRetrial($queue, $max_level);

	if ($new_retrial > $max_level) {
		Logger::notice('The task exceeded the maximum retry count', ['id' => $id, 'created' => $queue['created'], 'old_prio' => $queue['priority'], 'old_retrial' => $queue['retrial'], 'max_level' => $max_level, 'retrial' => $new_retrial]);
		return false;
	}

	// Calculate the delay until the next trial (grows with the retrial level, plus jitter)
	$delay = (($new_retrial + 2) ** 4) + (rand(1, 30) * ($new_retrial));
	$next  = DateTimeFormat::utc("now + {$delay} seconds");

	// Lower the priority one step at most, depending on how often the task had been retried
	if (($new_retrial > 3) && ($priority < PRIORITY_MEDIUM)) {
		$priority = PRIORITY_MEDIUM;
	} elseif (($new_retrial > 6) && ($priority < PRIORITY_LOW)) {
		$priority = PRIORITY_LOW;
	} elseif (($new_retrial > 8) && ($priority < PRIORITY_NEGLIGIBLE)) {
		$priority = PRIORITY_NEGLIGIBLE;
	}

	Logger::info('Deferred task', ['id' => $id, 'retrial' => $new_retrial, 'created' => $queue['created'], 'next_execution' => $next, 'old_prio' => $queue['priority'], 'new_prio' => $priority]);

	// Write the new schedule and account the time spent in the database
	$started = (float)microtime(true);
	$fields  = [
		'retrial'  => $new_retrial,
		'next_try' => $next,
		'executed' => DBA::NULL_DATETIME,
		'pid'      => 0,
		'priority' => $priority,
	];
	DBA::update('workerqueue', $fields, ['id' => $id]);
	self::$db_duration       += (microtime(true) - $started);
	self::$db_duration_write += (microtime(true) - $started);

	return true;
}
2018-06-02 01:03:23 -04:00
/**
 * Stores the "jobs are waiting" flag for a key in the "worker-ipc" table
 *
 * @param boolean $jobs Is there a waiting job?
 * @param int     $key  Key number
 * @throws \Exception
 */
public static function IPCSetJobState(bool $jobs, int $key = 0)
{
	$started = (float)microtime(true);

	$row = [
		'jobs' => $jobs,
		'key'  => $key,
	];
	DBA::replace('worker-ipc', $row);

	// Account the time spent for this write access
	self::$db_duration       += (microtime(true) - $started);
	self::$db_duration_write += (microtime(true) - $started);
}
2021-01-05 11:01:05 -05:00
/**
 * Removes the entry of a key from the "worker-ipc" table
 *
 * @param int $key Key number
 * @throws \Exception
 */
public static function IPCDeleteJobState(int $key)
{
	$started = (float)microtime(true);

	$condition = ['key' => $key];
	DBA::delete('worker-ipc', $condition);

	// Account the time spent for this write access
	self::$db_duration       += (microtime(true) - $started);
	self::$db_duration_write += (microtime(true) - $started);
}
2018-06-02 01:03:23 -04:00
/**
 * Reads the "jobs are waiting" flag of a key from the "worker-ipc" table
 *
 * @param int $key Key number
 * @return bool The stored flag, or false when there is no row for the key
 * @throws \Exception
 */
public static function IPCJobsExists(int $key = 0)
{
	$started = (float)microtime(true);
	$row     = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]);
	self::$db_duration += (microtime(true) - $started);

	// A missing row is treated like a cleared flag
	return DBA::isResult($row) ? (bool)$row['jobs'] : false;
}
2020-10-17 08:39:42 -04:00
2020-11-20 14:50:08 -05:00
/**
 * Checks if the worker is running in the daemon mode.
 *
 * The checks are done in this order:
 * - the value cached in self::$daemon_mode
 * - the executor reported by DI::mode()
 * - the config value system.worker_daemon_mode
 * - the process named in the pid file (system.pidfile), probed with signal 0
 *
 * Only the results of the pid file checks are cached.
 *
 * @return boolean
 */
public static function isDaemonMode()
{
	if (!is_null(self::$daemon_mode)) {
		return self::$daemon_mode;
	}

	if (DI::mode()->getExecutor() == Mode::DAEMON) {
		return true;
	}

	$daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true);
	if ($daemon_mode) {
		return $daemon_mode;
	}

	// posix_kill() lives in the posix extension, not in pcntl, so both have
	// to be checked before the pid file can be evaluated.
	if (!function_exists('pcntl_fork') || !function_exists('posix_kill')) {
		self::$daemon_mode = false;
		return false;
	}

	$pidfile = DI::config()->get('system', 'pidfile');
	if (empty($pidfile)) {
		// No pid file, no daemon
		self::$daemon_mode = false;
		return false;
	}

	if (!is_readable($pidfile)) {
		// No pid file. We assume that the daemon had been intentionally stopped.
		self::$daemon_mode = false;
		return false;
	}

	$pid = intval(file_get_contents($pidfile));
	if ($pid <= 0) {
		// An empty or invalid pid file. posix_kill(0, 0) would address our own
		// process group and succeed, which would wrongly report a running daemon.
		self::$daemon_mode = false;
		return false;
	}

	// Signal 0 doesn't send anything, it only checks if the process can be signalled
	$running = posix_kill($pid, 0);

	self::$daemon_mode = $running;
	return $running;
}
/**
 * Test if the daemon is running. If not, it will be started
 *
 * The check is only done when system.daemon_watchdog is enabled, the system
 * runs in normal mode and the last check is at least a minute ago.
 *
 * @return void
 */
private static function checkDaemonState()
{
	if (!DI::config()->get('system', 'daemon_watchdog', false)) {
		return;
	}

	if (!DI::mode()->isNormal()) {
		return;
	}

	// Check every minute if the daemon is running
	if (DI::config()->get('system', 'last_daemon_check', 0) + 60 > time()) {
		return;
	}

	DI::config()->set('system', 'last_daemon_check', time());

	$pidfile = DI::config()->get('system', 'pidfile');
	if (empty($pidfile)) {
		// No pid file, no daemon
		return;
	}

	if (!is_readable($pidfile)) {
		// No pid file. We assume that the daemon had been intentionally stopped.
		return;
	}

	if (!function_exists('posix_kill')) {
		// Without the posix extension the process cannot be probed
		Logger::warning('The posix extension is not available, the daemon state cannot be checked');
		return;
	}

	$pid = intval(file_get_contents($pidfile));

	// A pid of 0 (empty or invalid pid file) must not be probed: posix_kill(0, 0)
	// addresses our own process group and would always report a running daemon.
	if (($pid > 0) && posix_kill($pid, 0)) {
		Logger::info('Daemon process is running', ['pid' => $pid]);
		return;
	}

	Logger::warning('Daemon process is not running', ['pid' => $pid]);

	self::spawnDaemon();
}
/**
 * Spawn a new daemon process
 *
 * Runs "bin/daemon.php start" via DI::system()->run() and logs before and
 * after the call.
 *
 * @return void
 */
private static function spawnDaemon()
{
	Logger::notice('Starting new daemon process');
	$command = 'bin/daemon.php';
	DI::system()->run($command, ['start']);
	Logger::notice('New daemon process started');
}
2020-10-17 08:39:42 -04:00
/**
 * Check if the system is inside the defined maintenance window
 *
 * The window is defined by the config values system.maintenance_start and
 * system.maintenance_end. Both are reduced to seconds of the day, so the
 * window may wrap around midnight (start > end).
 *
 * @param boolean $check_last_execution When true, false is returned as well when
 *                                      system.last_cron_daily is less than one
 *                                      window duration ago
 * @return boolean
 */
public static function isInMaintenanceWindow(bool $check_last_execution = false)
{
	// Calculate the seconds of the start and end of the maintenance window
	$start = strtotime(DI::config()->get('system', 'maintenance_start')) % 86400;
	$end   = strtotime(DI::config()->get('system', 'maintenance_end')) % 86400;

	Logger::info('Maintenance window', ['start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);

	if ($check_last_execution) {
		// Calculate the window duration. When the window wraps around midnight
		// (start > end) the plain difference would be the time outside of the
		// window, so the duration is calculated modulo one day.
		$duration = ($end - $start + 86400) % 86400;

		// Quit when the last cron execution had been after the previous window
		$last_cron = DI::config()->get('system', 'last_cron_daily');
		if ($last_cron + $duration > time()) {
			Logger::info('The Daily cron had been executed recently', ['last' => date(DateTimeFormat::MYSQL, $last_cron), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);
			return false;
		}
	}

	$current = time() % 86400;

	if ($start < $end) {
		// Execute if we are inside the window
		$execute = ($current >= $start) && ($current <= $end);
	} else {
		// The window wraps around midnight: don't execute if we are in the gap between end and start
		$execute = !(($current > $end) && ($current < $start));
	}

	if ($execute) {
		Logger::info('We are inside the maintenance window', ['current' => date('H:i:s', $current), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);
	} else {
		Logger::info('We are outside the maintenance window', ['current' => date('H:i:s', $current), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]);
	}

	return $execute;
}
2017-11-05 05:33:46 -05:00
}