Cron parallelization

* Setup wait for multiple cron processes
 * Avoid waiting while tasks must be run
This commit is contained in:
Eric Espie
2022-07-04 12:26:26 +02:00
parent 3ab8bc71fe
commit 43bc77784a
2 changed files with 37 additions and 28 deletions

View File

@@ -2044,21 +2044,27 @@ JS
if (is_null($oConfig) || ContextTag::Check(ContextTag::TAG_CRON)) {
return;
}
// Use mutex to check if cron is running
// Limit the number of cron process to run in parallel
$iMaxCronProcess = $oConfig->Get('cron.max_process');
$iCount = 1;
$iMaxDuration = $oConfig->Get('cron_max_execution_time');
$iTimeLimit = time() + $iMaxDuration;
do {
$bIsRunning = false;
// Use all mutexes to check if cron is running
for ($i = 0; $i < $iMaxCronProcess; $i++) {
$sName = "cron#$i";
$oMutex = new iTopMutex(
'cron'.$oConfig->Get('db_name').$oConfig->Get('db_subname'),
$sName.$oConfig->Get('db_name').$oConfig->Get('db_subname'),
$oConfig->Get('db_host'),
$oConfig->Get('db_user'),
$oConfig->Get('db_pwd'),
$oConfig->Get('db_tls.enabled'),
$oConfig->Get('db_tls.ca')
);
$iCount = 1;
$iStarted = time();
$iMaxDuration = $oConfig->Get('cron_max_execution_time');
$iTimeLimit = $iStarted + $iMaxDuration;
while ($oMutex->IsLocked())
{
if ($oMutex->IsLocked()) {
$bIsRunning = true;
SetupLog::Info("Waiting for cron to stop ($iCount)");
$iCount++;
sleep(1);
@@ -2066,8 +2072,12 @@ JS
{
throw new Exception("Cannot enter $sMode mode, consider stopping the cron temporarily");
}
break;
}
}
} catch (Exception $e) {
} while ($bIsRunning);
}
catch (Exception $e) {
// Ignore errors
}
}

View File

@@ -211,12 +211,10 @@ function CronExec($oP, $bVerbose, $bDebug=false)
$oSearch->AddCondition('next_run_date', $sNow, '<=');
$oSearch->AddCondition('status', 'active');
$oTasks = new DBObjectSet($oSearch, ['next_run_date' => true]);
$bWorkDone = false;
$aTasks = [];
if ($oTasks->CountExceeds(0))
{
$bWorkDone = true;
$aTasks = array();
if ($bVerbose)
{
$sCount = $oTasks->Count();
@@ -227,10 +225,15 @@ function CronExec($oP, $bVerbose, $bDebug=false)
}
while ($oTask = $oTasks->Fetch())
{
$aTasks[$oTask->Get('class_name')] = $oTask;
$sTaskName = $oTask->Get('class_name');
$oTaskMutex = new iTopMutex("cron_$sTaskName");
if ($oTaskMutex->IsLocked()) {
// Already running, ignore
continue;
}
$aTasks[] = $oTask;
if ($bVerbose)
{
$sTaskName = $oTask->Get('class_name');
$sStatus = $oTask->Get('status');
$sLastRunDate = $oTask->Get('latest_run_date');
$sNextRunDate = $oTask->Get('next_run_date');
@@ -242,8 +245,9 @@ function CronExec($oP, $bVerbose, $bDebug=false)
$oP->p('+---------------------------+---------+---------------------+---------------------+');
}
$aRunTasks = [];
foreach ($aTasks as $oTask)
{
while ($aTasks != []) {
$oTask = array_shift($aTasks);
$sTaskClass = $oTask->Get('class_name');
// Check if the current task is running
@@ -253,11 +257,6 @@ function CronExec($oP, $bVerbose, $bDebug=false)
continue;
}
// Just to indicate to Itop that the cron is running for setup
// The mutex will be released when the process dies
$oCronMutex = new iTopMutex('cron');
$oCronMutex->TryLock();
$aRunTasks[] = $sTaskClass;
// N°3219 for each process will use a specific CMDBChange object with a specific track info
@@ -272,7 +271,7 @@ function CronExec($oP, $bVerbose, $bDebug=false)
}
try
{
$sMessage = RunTask($aTasks[$sTaskClass], $iTimeLimit);
$sMessage = RunTask($oTask, $iTimeLimit);
} catch (MySQLHasGoneAwayException $e)
{
$oP->p("ERROR : 'MySQL has gone away' thrown when processing $sTaskClass (error_code=".$e->getCode().")");
@@ -307,7 +306,7 @@ function CronExec($oP, $bVerbose, $bDebug=false)
}
// Tasks to run later
if ($bVerbose)
if ($bVerbose && $aTasks == [])
{
$oP->p('--');
$oSearch = new DBObjectSearch('BackgroundTask');
@@ -323,12 +322,12 @@ function CronExec($oP, $bVerbose, $bDebug=false)
}
}
}
if ($bVerbose && $bWorkDone)
{
$oP->p("Sleeping...\n");
if ($aTasks == []) {
if ($bVerbose) {
$oP->p("Sleeping...\n");
}
sleep($iCronSleep);
}
sleep($iCronSleep);
}
if ($bVerbose)
{