Cron parallelization

# Conflicts:
#	webservices/cron.php
This commit is contained in:
Eric Espie
2022-07-01 18:00:38 +02:00
parent 63e40fc0a2
commit 66383ddaac
2 changed files with 75 additions and 57 deletions

View File

@@ -585,14 +585,6 @@ class Config
'source_of_value' => '', 'source_of_value' => '',
'show_in_conf_sample' => true, 'show_in_conf_sample' => true,
], ],
'cron_task_max_execution_time' => [
'type' => 'integer',
'description' => 'Background tasks will use this value (integer) multiplicated by its periodicity (in seconds) as max duration per cron execution. 0 is unlimited time',
'default' => 0,
'value' => 0,
'source_of_value' => '',
'show_in_conf_sample' => false,
],
'cron_sleep' => [ 'cron_sleep' => [
'type' => 'integer', 'type' => 'integer',
'description' => 'Duration (seconds) before cron.php checks again if something must be done', 'description' => 'Duration (seconds) before cron.php checks again if something must be done',
@@ -601,6 +593,14 @@ class Config
'source_of_value' => '', 'source_of_value' => '',
'show_in_conf_sample' => false, 'show_in_conf_sample' => false,
], ],
'cron.max_process' => [
'type' => 'integer',
'description' => 'Maximum number of cron process to run',
'default' => 10,
'value' => 10,
'source_of_value' => '',
'show_in_conf_sample' => true,
],
'async_task_retries' => [ 'async_task_retries' => [
'type' => 'array', 'type' => 'array',
'description' => 'Automatic retries of asynchronous tasks in case of failure (per class)', 'description' => 'Automatic retries of asynchronous tasks in case of failure (per class)',

View File

@@ -91,7 +91,6 @@ function RunTask(BackgroundTask $oTask, $iTimeLimit)
$oProcess = new $TaskClass(); $oProcess = new $TaskClass();
$oRefClass = new ReflectionClass(get_class($oProcess)); $oRefClass = new ReflectionClass(get_class($oProcess));
$oDateStarted = new DateTime(); $oDateStarted = new DateTime();
$oDatePlanned = new DateTime($oTask->Get('next_run_date'));
$fStart = microtime(true); $fStart = microtime(true);
$oCtx = new ContextTag('CRON:Task:'.$TaskClass); $oCtx = new ContextTag('CRON:Task:'.$TaskClass);
@@ -99,25 +98,34 @@ function RunTask(BackgroundTask $oTask, $iTimeLimit)
$oExceptionToThrow = null; $oExceptionToThrow = null;
try { try {
// Record (when starting) that this task was started, just in case it crashes during the execution // Record (when starting) that this task was started, just in case it crashes during the execution
if ($oTask->Get('total_exec_count') == 0) {
// First execution
$oTask->Set('first_run_date', $oDateStarted->format('Y-m-d H:i:s'));
}
$oTask->Set('latest_run_date', $oDateStarted->format('Y-m-d H:i:s')); $oTask->Set('latest_run_date', $oDateStarted->format('Y-m-d H:i:s'));
// Record the current user running the cron // Record the current user running the cron
$oTask->Set('system_user', utils::GetCurrentUserName()); $oTask->Set('system_user', utils::GetCurrentUserName());
$oTask->Set('running', 1); $oTask->Set('running', 1);
// Compute the next run date
if ($oRefClass->implementsInterface('iScheduledProcess')) {
// Schedules process do repeat at specific moments
$oPlannedStart = $oProcess->GetNextOccurrence();
} else {
// Background processes do repeat periodically
$oDatePlanned = new DateTime($oTask->Get('next_run_date'));
$oPlannedStart = clone $oDatePlanned;
// Let's schedule from the previous planned date of execution to avoid shift
$oPlannedStart->modify('+'.$oProcess->GetPeriodicity().' seconds');
$oNow = new DateTime();
while ($oPlannedStart->format('U') <= $oNow->format('U')) {
// Next planned start is already in the past, increase it again by a period
$oPlannedStart = $oPlannedStart->modify('+'.$oProcess->GetPeriodicity().' seconds');
}
}
$oTask->Set('next_run_date', $oPlannedStart->format('Y-m-d H:i:s'));
$oTask->DBUpdate(); $oTask->DBUpdate();
// Time in seconds allowed to the task
$iCurrTimeLimit = $iTimeLimit; $sMessage = $oProcess->Process($iTimeLimit);
// Compute allowed time
if ($oRefClass->implementsInterface('iScheduledProcess') === false) {
// Periodic task, allow only X times ($iMaxTaskExecutionTime) its periodicity (GetPeriodicity())
$iMaxTaskExecutionTime = MetaModel::GetConfig()->Get('cron_task_max_execution_time');
$iTaskLimit = time() + $oProcess->GetPeriodicity() * $iMaxTaskExecutionTime;
// If our proposed time limit is less than cron limit, and cron_task_max_execution_time is > 0
if ($iTaskLimit < $iTimeLimit && $iMaxTaskExecutionTime > 0) {
$iCurrTimeLimit = $iTaskLimit;
}
}
$sMessage = $oProcess->Process($iCurrTimeLimit);
$oTask->Set('running', 0);
} catch (MySQLHasGoneAwayException $e) { } catch (MySQLHasGoneAwayException $e) {
throw $e; throw $e;
} catch (ProcessFatalException $e) { } catch (ProcessFatalException $e) {
@@ -128,35 +136,12 @@ function RunTask(BackgroundTask $oTask, $iTimeLimit)
} else { } else {
$sMessage = 'Processing failed with message: '.$e->getMessage(); $sMessage = 'Processing failed with message: '.$e->getMessage();
} }
} } finally {
$oTask->Set('running', 0);
$fDuration = microtime(true) - $fStart; $fDuration = microtime(true) - $fStart;
if ($oTask->Get('total_exec_count') == 0) {
// First execution
$oTask->Set('first_run_date', $oDateStarted->format('Y-m-d H:i:s'));
}
$oTask->ComputeDurations($fDuration); // does increment the counter and compute statistics $oTask->ComputeDurations($fDuration); // does increment the counter and compute statistics
// Update the timestamp since we want to be able to re-order the tasks based on the time they finished
$oDateEnded = new DateTime();
$oTask->Set('latest_run_date', $oDateEnded->format('Y-m-d H:i:s'));
if ($oRefClass->implementsInterface('iScheduledProcess')) {
// Schedules process do repeat at specific moments
$oPlannedStart = $oProcess->GetNextOccurrence();
} else {
// Background processes do repeat periodically
$oPlannedStart = clone $oDatePlanned;
// Let's schedule from the previous planned date of execution to avoid shift
$oPlannedStart->modify($oProcess->GetPeriodicity().' seconds');
$oEnd = new DateTime();
while ($oPlannedStart->format('U') < $oEnd->format('U')) {
// Next planned start is already in the past, increase it again by a period
$oPlannedStart = $oPlannedStart->modify('+'.$oProcess->GetPeriodicity().' seconds');
}
}
$oTask->Set('next_run_date', $oPlannedStart->format('Y-m-d H:i:s'));
$oTask->DBUpdate(); $oTask->DBUpdate();
}
if ($oExceptionToThrow) { if ($oExceptionToThrow) {
throw $oExceptionToThrow; throw $oExceptionToThrow;
@@ -190,6 +175,7 @@ function CronExec($oP, $bVerbose, $bDebug = false)
$iMaxDuration = MetaModel::GetConfig()->Get('cron_max_execution_time'); $iMaxDuration = MetaModel::GetConfig()->Get('cron_max_execution_time');
$iTimeLimit = $iStarted + $iMaxDuration; $iTimeLimit = $iStarted + $iMaxDuration;
$iCronSleep = MetaModel::GetConfig()->Get('cron_sleep'); $iCronSleep = MetaModel::GetConfig()->Get('cron_sleep');
$iMaxCronProcess = MetaModel::GetConfig()->Get('cron.max_process');
if ($bVerbose) { if ($bVerbose) {
$oP->p("Planned duration = $iMaxDuration seconds"); $oP->p("Planned duration = $iMaxDuration seconds");
@@ -235,6 +221,19 @@ function CronExec($oP, $bVerbose, $bDebug = false)
$aRunTasks = []; $aRunTasks = [];
foreach ($aTasks as $oTask) { foreach ($aTasks as $oTask) {
$sTaskClass = $oTask->Get('class_name'); $sTaskClass = $oTask->Get('class_name');
// Check if the current task is running
$oTaskMutex = new iTopMutex("cron_$sTaskClass");
if (!$oTaskMutex->TryLock()) {
// Task is already running, try next one
continue;
}
// Just to indicate to Itop that the cron is running for setup
// The mutex will be released when the process dies
$oCronMutex = new iTopMutex('cron');
$oCronMutex->TryLock();
$aRunTasks[] = $sTaskClass; $aRunTasks[] = $sTaskClass;
// N°3219 for each process will use a specific CMDBChange object with a specific track info // N°3219 for each process will use a specific CMDBChange object with a specific track info
@@ -255,6 +254,9 @@ function CronExec($oP, $bVerbose, $bDebug = false)
$oP->p("ERROR : an exception was thrown when processing '$sTaskClass' (".$e->getInfoLog().")"); $oP->p("ERROR : an exception was thrown when processing '$sTaskClass' (".$e->getInfoLog().")");
IssueLog::Error("Cron.php error : an exception was thrown when processing '$sTaskClass' (".$e->getInfoLog().')'); IssueLog::Error("Cron.php error : an exception was thrown when processing '$sTaskClass' (".$e->getInfoLog().')');
} }
finally {
$oTaskMutex->Unlock();
}
if ($bVerbose) { if ($bVerbose) {
if (!empty($sMessage)) { if (!empty($sMessage)) {
$oP->p("$sTaskClass: $sMessage"); $oP->p("$sTaskClass: $sMessage");
@@ -267,6 +269,10 @@ function CronExec($oP, $bVerbose, $bDebug = false)
break 2; break 2;
} }
CheckMaintenanceMode($oP); CheckMaintenanceMode($oP);
if ($iMaxCronProcess > 1) {
// Reindex tasks every time
break;
}
} }
// Tasks to run later // Tasks to run later
@@ -498,16 +504,28 @@ try {
exit(EXIT_CODE_FATAL); exit(EXIT_CODE_FATAL);
} }
try { try
$oMutex = new iTopMutex('cron'); {
if (!MetaModel::DBHasAccess(ACCESS_ADMIN_WRITE)) { if (!MetaModel::DBHasAccess(ACCESS_ADMIN_WRITE))
{
$oP->p("A maintenance is ongoing"); $oP->p("A maintenance is ongoing");
} else { }
else
{
// Limit the number of cron process to run in parallel
$iMaxCronProcess = MetaModel::GetConfig()->Get('cron.max_process');
$bCanRun = false;
for ($i = 0; $i < $iMaxCronProcess; $i++) {
$oMutex = new iTopMutex("cron#$i");
if ($oMutex->TryLock()) { if ($oMutex->TryLock()) {
$bCanRun = true;
break;
}
}
if ($bCanRun) {
CronExec($oP, $bVerbose, $bDebug); CronExec($oP, $bVerbose, $bDebug);
} else { } else {
// Exit silently $oP->p("Already $iMaxCronProcess are running...");
$oP->p("Already running...");
} }
} }
} catch (Exception $e) { } catch (Exception $e) {