Cron parallelization

* Setup wait for multiple cron processes
 * Avoid waiting while tasks must be run
This commit is contained in:
Eric Espie
2022-07-04 12:26:26 +02:00
parent 66383ddaac
commit 605512e0e4
2 changed files with 42 additions and 28 deletions

View File

@@ -1992,28 +1992,39 @@ JS
if (is_null($oConfig) || ContextTag::Check(ContextTag::TAG_CRON)) {
return;
}
// Use mutex to check if cron is running
// Limit the number of cron process to run in parallel
$iMaxCronProcess = $oConfig->Get('cron.max_process');
$iCount = 1;
$iMaxDuration = $oConfig->Get('cron_max_execution_time');
$iTimeLimit = time() + $iMaxDuration;
do {
$bIsRunning = false;
// Use all mutexes to check if cron is running
for ($i = 0; $i < $iMaxCronProcess; $i++) {
$sName = "cron#$i";
$oMutex = new iTopMutex(
'cron'.$oConfig->Get('db_name').$oConfig->Get('db_subname'),
$sName.$oConfig->Get('db_name').$oConfig->Get('db_subname'),
$oConfig->Get('db_host'),
$oConfig->Get('db_user'),
$oConfig->Get('db_pwd'),
$oConfig->Get('db_tls.enabled'),
$oConfig->Get('db_tls.ca')
);
$iCount = 1;
$iStarted = time();
$iMaxDuration = $oConfig->Get('cron_max_execution_time');
$iTimeLimit = $iStarted + $iMaxDuration;
while ($oMutex->IsLocked()) {
if ($oMutex->IsLocked()) {
$bIsRunning = true;
SetupLog::Info("Waiting for cron to stop ($iCount)");
$iCount++;
sleep(1);
if (time() > $iTimeLimit) {
throw new Exception("Cannot enter $sMode mode, consider stopping the cron temporarily");
}
break;
}
}
} catch (Exception $e) {
} while ($bIsRunning);
}
catch (Exception $e) {
// Ignore errors
}
}

View File

@@ -193,12 +193,12 @@ function CronExec($oP, $bVerbose, $bDebug = false)
$oSearch->AddCondition('next_run_date', $sNow, '<=');
$oSearch->AddCondition('status', 'active');
$oTasks = new DBObjectSet($oSearch, ['next_run_date' => true]);
$bWorkDone = false;
if ($oTasks->CountExceeds(0)) {
$bWorkDone = true;
$aTasks = [];
if ($bVerbose) {
$aTasks = [];
if ($oTasks->CountExceeds(0))
{
if ($bVerbose)
{
$sCount = $oTasks->Count();
$oP->p("$sCount Tasks planned to run now ($sNow):");
$oP->p('+---------------------------+---------+---------------------+---------------------+');
@@ -206,9 +206,15 @@ function CronExec($oP, $bVerbose, $bDebug = false)
$oP->p('+---------------------------+---------+---------------------+---------------------+');
}
while ($oTask = $oTasks->Fetch()) {
$aTasks[$oTask->Get('class_name')] = $oTask;
if ($bVerbose) {
$sTaskName = $oTask->Get('class_name');
$sTaskName = $oTask->Get('class_name');
$oTaskMutex = new iTopMutex("cron_$sTaskName");
if ($oTaskMutex->IsLocked()) {
// Already running, ignore
continue;
}
$aTasks[] = $oTask;
if ($bVerbose)
{
$sStatus = $oTask->Get('status');
$sLastRunDate = $oTask->Get('latest_run_date');
$sNextRunDate = $oTask->Get('next_run_date');
@@ -219,7 +225,8 @@ function CronExec($oP, $bVerbose, $bDebug = false)
$oP->p('+---------------------------+---------+---------------------+---------------------+');
}
$aRunTasks = [];
foreach ($aTasks as $oTask) {
while ($aTasks != []) {
$oTask = array_shift($aTasks);
$sTaskClass = $oTask->Get('class_name');
// Check if the current task is running
@@ -229,11 +236,6 @@ function CronExec($oP, $bVerbose, $bDebug = false)
continue;
}
// Just to indicate to Itop that the cron is running for setup
// The mutex will be released when the process dies
$oCronMutex = new iTopMutex('cron');
$oCronMutex->TryLock();
$aRunTasks[] = $sTaskClass;
// N°3219 for each process will use a specific CMDBChange object with a specific track info
@@ -246,7 +248,7 @@ function CronExec($oP, $bVerbose, $bDebug = false)
$oP->p(">> === ".$oNow->format('Y-m-d H:i:s').sprintf(" Starting:%-'=49s", ' '.$sTaskClass.' '));
}
try {
$sMessage = RunTask($aTasks[$sTaskClass], $iTimeLimit);
$sMessage = RunTask($oTask, $iTimeLimit);
} catch (MySQLHasGoneAwayException $e) {
$oP->p("ERROR : 'MySQL has gone away' thrown when processing $sTaskClass (error_code=".$e->getCode().")");
exit(EXIT_CODE_FATAL);
@@ -276,7 +278,7 @@ function CronExec($oP, $bVerbose, $bDebug = false)
}
// Tasks to run later
if ($bVerbose) {
if ($bVerbose && $aTasks == []) {
$oP->p('--');
$oSearch = new DBObjectSearch('BackgroundTask');
$oSearch->AddCondition('next_run_date', $sNow, '>');
@@ -289,11 +291,12 @@ function CronExec($oP, $bVerbose, $bDebug = false)
}
}
}
if ($bVerbose && $bWorkDone) {
$oP->p("Sleeping...\n");
if ($aTasks == []) {
if ($bVerbose) {
$oP->p("Sleeping...\n");
}
sleep($iCronSleep);
}
sleep($iCronSleep);
}
if ($bVerbose) {
$oP->p('');