From 9ac4f84e22e86d85666357a14ed3dfbc8ece865e Mon Sep 17 00:00:00 2001
From: Romain Quetiez ".print_r($aLightTrace, true)."
The synchronization has been executed, $iErrors errors have been encountered. Click here to see the records being currently in error.
".$sStatistics); - } - else - { - //$this->SendNotification('success', 'The synchronization has been successfully executed.
'); - } - } - catch (SynchroExceptionNotStarted $e) - { - // Set information for reporting... but delete the object in DB - $oStatLog->Set('end_date', time()); - $oStatLog->Set('status', 'error'); - $oStatLog->Set('last_error', $e->getMessage()); - $oStatLog->DBDeleteTracked($oMyChange); - $this->SendNotification('fatal error', 'The synchronization could not start: \''.$e->getMessage().'\'
Please check its configuration
'); - } - catch (Exception $e) - { - $oStatLog->Set('end_date', time()); - $oStatLog->Set('status', 'error'); - $oStatLog->Set('last_error', $e->getMessage()); - $oStatLog->DBUpdateTracked($oMyChange); - $this->SendNotification('exception', 'The synchronization has been interrupted: \''.$e->getMessage().'\'
Please contact the application support team
'); - } - self::$m_oCurrentTask = null; - - return $oStatLog; - } - - protected function DoSynchronize($oLastFullLoadStartDate, $oMyChange, &$oStatLog) - { - if ($this->Get('status') == 'obsolete') - { - throw new SynchroExceptionNotStarted(Dict::S('Core:SyncDataSourceObsolete')); - } - if (!UserRights::IsAdministrator() && $this->Get('user_id') != UserRights::GetUserId()) - { - throw new SynchroExceptionNotStarted(Dict::S('Core:SyncDataSourceAccessRestriction')); - } - - // Get the list of SQL columns - $sClass = $this->GetTargetClass(); - $aAttCodesExpected = array(); - $aAttCodesToReconcile = array(); - $aAttCodesToUpdate = array(); - $sSelectAtt = "SELECT SynchroAttribute WHERE sync_source_id = :source_id AND (update = 1 OR reconcile = 1)"; - $oSetAtt = new DBObjectSet(DBObjectSearch::FromOQL($sSelectAtt), array() /* order by*/, array('source_id' => $this->GetKey()) /* aArgs */); - while ($oSyncAtt = $oSetAtt->Fetch()) - { - if ($oSyncAtt->Get('update')) - { - $aAttCodesToUpdate[$oSyncAtt->Get('attcode')] = $oSyncAtt; - } - if ($oSyncAtt->Get('reconcile')) - { - $aAttCodesToReconcile[$oSyncAtt->Get('attcode')] = $oSyncAtt; - } - $aAttCodesExpected[$oSyncAtt->Get('attcode')] = $oSyncAtt; - } - $aColumns = $this->GetSQLColumns(array_keys($aAttCodesExpected)); - $aExtDataFields = array_keys($aColumns); - $aExtDataFields[] = 'primary_key'; - $aExtDataSpec = array( - 'table' => $this->GetDataTable(), - 'join_key' => 'id', - 'fields' => $aExtDataFields - ); - - // Get the list of attributes, determine reconciliation keys and update targets - // - if ($this->Get('reconciliation_policy') == 'use_attributes') - { - $aReconciliationKeys = $aAttCodesToReconcile; - } - elseif ($this->Get('reconciliation_policy') == 'use_primary_key') - { - // Override the settings made at the attribute level ! - $aReconciliationKeys = array("primary_key" => null); - } - - $oStatLog->AddTrace("Update of: {".implode(', ', array_keys($aAttCodesToUpdate))."}"); - $oStatLog->AddTrace("Reconciliation on: {".implode(', ', array_keys($aReconciliationKeys))."}"); - - if (count($aAttCodesToUpdate) == 0) - { - $oStatLog->AddTrace("No attribute to update"); - throw new SynchroExceptionNotStarted('There is no attribute to update'); - } - if (count($aReconciliationKeys) == 0) - { - $oStatLog->AddTrace("No attribute for reconciliation"); - throw new SynchroExceptionNotStarted('No attribute for reconciliation'); - } - - $aAttributes = array(); - foreach($aAttCodesToUpdate as $sAttCode => $oSyncAtt) - { - $oAttDef = MetaModel::GetAttributeDef($this->GetTargetClass(), $sAttCode); - if ($oAttDef->IsWritable()) - { - $aAttributes[$sAttCode] = $oSyncAtt; - } - } - - // Count the replicas - $sSelectAll = "SELECT SynchroReplica WHERE sync_source_id = :source_id"; - $oSetAll = new DBObjectSet(DBObjectSearch::FromOQL($sSelectAll), array() /* order by*/, array('source_id' => $this->GetKey())); - $iCountAllReplicas = $oSetAll->Count(); - $oStatLog->Set('stats_nb_replica_total', $iCountAllReplicas); - - // Get all the replicas that were not seen in the last import and mark them as obsolete - if ($oLastFullLoadStartDate == null) - { - // No previous import known, use the full_load_periodicity value... and the current date - $oLastFullLoadStartDate = new DateTime(); // Now - $iLoadPeriodicity = $this->Get('full_load_periodicity'); // Duration in seconds - if ($iLoadPeriodicity > 0) - { - $sInterval = "-$iLoadPeriodicity seconds"; - $oLastFullLoadStartDate->Modify($sInterval); - } - else - { - $oLastFullLoadStartDate = new DateTime('1970-01-01'); - } - } - $sLimitDate = $oLastFullLoadStartDate->Format('Y-m-d H:i:s'); - $oStatLog->AddTrace("Limit Date: $sLimitDate"); - - $sDeletePolicy = $this->Get('delete_policy'); - - if ($sDeletePolicy != 'ignore') - { - $sSelectToObsolete = "SELECT SynchroReplica WHERE sync_source_id = :source_id AND status IN ('new', 'synchronized', 'modified', 'orphan') AND status_last_seen < :last_import"; - $oSetToObsolete = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToObsolete), array() /* order by*/, array('source_id' => $this->GetKey(), 'last_import' => $sLimitDate)); - if (($iCountAllReplicas > 10) && ($iCountAllReplicas == $oSetToObsolete->Count())) - { - throw new SynchroExceptionNotStarted(Dict::S('Core:SyncTooManyMissingReplicas')); - } - while($oReplica = $oSetToObsolete->Fetch()) - { - switch ($sDeletePolicy) - { - case 'update': - case 'update_then_delete': - $oStatLog->AddTrace("Destination object to be updated", $oReplica); - $aToUpdate = array(); - $aToUpdateSpec = explode(';', $this->Get('delete_policy_update')); //ex: 'status:obsolete;description:stopped', - foreach($aToUpdateSpec as $sUpdateSpec) - { - $aUpdateSpec = explode(':', $sUpdateSpec); - if (count($aUpdateSpec) == 2) - { - $sAttCode = $aUpdateSpec[0]; - $sValue = $aUpdateSpec[1]; - $aToUpdate[$sAttCode] = $sValue; - } - } - $oReplica->Set('status_last_error', ''); - if ($oReplica->Get('dest_id') == '') - { - $oReplica->Set('status', 'obsolete'); - $oStatLog->Inc('stats_nb_replica_disappeared_no_action'); - } - else - { - $oReplica->UpdateDestObject($aToUpdate, $oMyChange, $oStatLog); - if ($oReplica->Get('status_last_error') == '') - { - // Change the status of the replica IIF - $oReplica->Set('status', 'obsolete'); - } - } - $oReplica->DBUpdateTracked($oMyChange); - break; - - case 'delete': - default: - $oStatLog->AddTrace("Destination object to be DELETED", $oReplica); - $oReplica->DeleteDestObject($oMyChange, $oStatLog); - } - } - } // if ($sDeletePolicy != 'ignore' - - //Count "seen" objects - $sSelectSeen = "SELECT SynchroReplica WHERE sync_source_id = :source_id AND status IN ('new', 'synchronized', 'modified', 'orphan') AND status_last_seen >= :last_import"; - $oSetSeen = new DBObjectSet(DBObjectSearch::FromOQL($sSelectSeen), array() /* order by*/, array('source_id' => $this->GetKey(), 'last_import' => $sLimitDate)); - $oStatLog->Set('stats_nb_replica_seen', $oSetSeen->Count()); - - // Get all the replicas that are 'new' or modified or synchronized with a warning - // - $sSelectToSync = "SELECT SynchroReplica WHERE (status = 'new' OR status = 'modified' OR (status = 'synchronized' AND status_last_warning != '')) AND sync_source_id = :source_id AND status_last_seen >= :last_import"; - $oSetToSync = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToSync), array() /* order by*/, array('source_id' => $this->GetKey(), 'last_import' => $sLimitDate) /* aArgs */, $aExtDataSpec, 0 /* limitCount */, 0 /* limitStart */); - - while($oReplica = $oSetToSync->Fetch()) - { - $oReplica->Synchro($this, $aReconciliationKeys, $aAttributes, $oMyChange, $oStatLog); - $oReplica->DBUpdateTracked($oMyChange); - } - - // Get all the replicas that are to be deleted - // - if ($sDeletePolicy == 'update_then_delete') - { - $oDeletionDate = $oLastFullLoadStartDate; - $iDeleteRetention = $this->Get('delete_policy_retention'); // Duration in seconds - if ($iDeleteRetention > 0) - { - $sInterval = "-$iDeleteRetention seconds"; - $oDeletionDate->Modify($sInterval); - } - $sDeletionDate = $oDeletionDate->Format('Y-m-d H:i:s'); - $oStatLog->AddTrace("Deletion date: $sDeletionDate"); - $sSelectToDelete = "SELECT SynchroReplica WHERE sync_source_id = :source_id AND status IN ('obsolete') AND status_last_seen < :last_import"; - $oSetToDelete = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToDelete), array() /* order by*/, array('source_id' => $this->GetKey(), 'last_import' => $sDeletionDate)); - while($oReplica = $oSetToDelete->Fetch()) - { - $oStatLog->AddTrace("Destination object to be DELETED", $oReplica); - $oReplica->DeleteDestObject($oMyChange, $oStatLog); - } - } - } - /** * Get the list of attributes eligible to the synchronization */ @@ -1527,6 +1206,8 @@ class SynchroLog extends DBObject MetaModel::Init_AddAttribute(new AttributeDateTime("start_date", array("allowed_values"=>null, "sql"=>"start_date", "default_value"=>"", "is_null_allowed"=>true, "depends_on"=>array()))); MetaModel::Init_AddAttribute(new AttributeDateTime("end_date", array("allowed_values"=>null, "sql"=>"end_date", "default_value"=>"", "is_null_allowed"=>true, "depends_on"=>array()))); MetaModel::Init_AddAttribute(new AttributeEnum("status", array("allowed_values"=>new ValueSetEnum('running,completed,error'), "sql"=>"status", "default_value"=>"running", "is_null_allowed"=>false, "depends_on"=>array()))); + MetaModel::Init_AddAttribute(new AttributeInteger("status_curr_job", array("allowed_values"=>null, "sql"=>"status_curr_job", "default_value"=>0, "is_null_allowed"=>true, "depends_on"=>array()))); + MetaModel::Init_AddAttribute(new AttributeInteger("status_curr_pos", array("allowed_values"=>null, "sql"=>"status_curr_pos", "default_value"=>0, "is_null_allowed"=>true, "depends_on"=>array()))); MetaModel::Init_AddAttribute(new AttributeInteger("stats_nb_replica_seen", array("allowed_values"=>null, "sql"=>"stats_nb_replica_seen", "default_value"=>0, "is_null_allowed"=>false, "depends_on"=>array()))); MetaModel::Init_AddAttribute(new AttributeInteger("stats_nb_replica_total", array("allowed_values"=>null, "sql"=>"stats_nb_replica_total", "default_value"=>0, "is_null_allowed"=>false, "depends_on"=>array()))); @@ -1626,14 +1307,26 @@ class SynchroLog extends DBObject return; } + $sPrevTrace = $this->Get('traces'); + $oAttDef = MetaModel::GetAttributeDef(get_class($this), 'traces'); $iMaxSize = $oAttDef->GetMaxSize(); - $sTrace = implode("\n", $this->m_aTraces); + if (strlen($sPrevTrace) > 0) + { + $sTrace = $sPrevTrace."\n".implode("\n", $this->m_aTraces); + } + else + { + $sTrace = implode("\n", $this->m_aTraces); + } if (strlen($sTrace) >= $iMaxSize) { $sTrace = substr($sTrace, 0, $iMaxSize - 255)."...\nTruncated (size: ".strlen($sTrace).')'; } $this->Set('traces', $sTrace); + + //DBUpdate may be called many times... the operation should not be repeated + $this->m_aTraces = array(); } protected function OnInsert() @@ -2330,6 +2023,626 @@ class SynchroReplica extends DBObject implements iDisplay } } +/** + * Context of an ongoing synchronization + * Two usages: + * 1) Public usage: execute the synchronization + * $oSynchroExec = new SynchroExecution($oDataSource[, $iLastFullLoad]); + * $oSynchroExec->Process($iMaxChunkSize); + * + * 2) Internal usage: continue the synchronization (split into chunks, each performed in a separate process) + * This is implemented in the page priv_sync_chunk.php + * $oSynchroExec = SynchroExecution::Resume($oDataSource, $iLastFullLoad, $iSynchroLog, $iChange, $iMaxToProcess, $iJob, $iNextInJob); + * $oSynchroExec->Process() + */ +class SynchroExecution +{ + protected $m_oDataSource = null; + protected $m_oLastFullLoadStartDate = null; + + protected $m_oChange = null; + protected $m_oStatLog = null; + + // Context computed one for optimization and report inconsistencies ASAP + protected $m_aExtDataSpec = array(); + protected $m_aReconciliationKeys = array(); + protected $m_aAttributes = array(); + protected $m_iCountAllReplicas = 0; + + /** + * Constructor + * @param SynchroDataSource $oDataSource Synchronization task + * @param DateTime $oLastFullLoadStartDate Date of the last full load (start date/time), if known + * @return void + */ + public function __construct($oDataSource, $oLastFullLoadStartDate = null) + { + $this->m_oDataSource = $oDataSource; + $this->m_oLastFullLoadStartDate = $oLastFullLoadStartDate; + } + + /** + * Create the persistant information records, for the current synchronization + * In fact, those records ARE defining what is the "current" synchronization + */ + protected function PrepareLogs() + { + if (!is_null($this->m_oChange)) + { + return; + } + + // Create a change used for logging all the modifications/creations happening during the synchro + $this->m_oChange = MetaModel::NewObject("CMDBChange"); + $this->m_oChange->Set("date", time()); + $sUserString = CMDBChange::GetCurrentUserName(); + $this->m_oChange->Set("userinfo", $sUserString.' '.Dict::S('Core:SyncDataExchangeComment')); + $iChangeId = $this->m_oChange->DBInsert(); + + // Start logging this execution (stats + protection against reentrance) + // + $this->m_oStatLog = new SynchroLog(); + $this->m_oStatLog->Set('sync_source_id', $this->m_oDataSource->GetKey()); + $this->m_oStatLog->Set('start_date', time()); + $this->m_oStatLog->Set('status', 'running'); + $this->m_oStatLog->Set('stats_nb_replica_seen', 0); + $this->m_oStatLog->Set('stats_nb_replica_total', 0); + $this->m_oStatLog->Set('stats_nb_obj_deleted', 0); + $this->m_oStatLog->Set('stats_nb_obj_deleted_errors', 0); + $this->m_oStatLog->Set('stats_nb_obj_obsoleted', 0); + $this->m_oStatLog->Set('stats_nb_obj_obsoleted_errors', 0); + $this->m_oStatLog->Set('stats_nb_obj_created', 0); + $this->m_oStatLog->Set('stats_nb_obj_created_errors', 0); + $this->m_oStatLog->Set('stats_nb_obj_created_warnings', 0); + $this->m_oStatLog->Set('stats_nb_obj_updated', 0); + $this->m_oStatLog->Set('stats_nb_obj_updated_warnings', 0); + $this->m_oStatLog->Set('stats_nb_obj_updated_errors', 0); + $this->m_oStatLog->Set('stats_nb_obj_unchanged_warnings', 0); + // $this->m_oStatLog->Set('stats_nb_replica_reconciled', 0); + $this->m_oStatLog->Set('stats_nb_replica_reconciled_errors', 0); + $this->m_oStatLog->Set('stats_nb_replica_disappeared_no_action', 0); + $this->m_oStatLog->Set('stats_nb_obj_new_updated', 0); + $this->m_oStatLog->Set('stats_nb_obj_new_updated_warnings', 0); + $this->m_oStatLog->Set('stats_nb_obj_new_unchanged',0); + $this->m_oStatLog->Set('stats_nb_obj_new_unchanged_warnings',0); + + $sSelectTotal = "SELECT SynchroReplica WHERE sync_source_id = :source_id"; + $oSetTotal = new DBObjectSet(DBObjectSearch::FromOQL($sSelectTotal), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey())); + $this->m_oStatLog->Set('stats_nb_replica_total', $oSetTotal->Count()); + + $this->m_oStatLog->DBInsertTracked($this->m_oChange); + } + + /** + * Prevent against the reentrance... or allow the current task to do things forbidden by the others ! + */ + public static $m_oCurrentTask = null; + public static function GetCurrentTaskId() + { + if (is_object(self::$m_oCurrentTask)) + { + return self::$m_oCurrentTask->GetKey(); + } + else + { + return null; + } + } + + /** + * Prepare structures in memory, to speedup the processing of a given replica + */ + public function PrepareProcessing($bFirstPass = true) + { + if ($this->m_oDataSource->Get('status') == 'obsolete') + { + throw new SynchroExceptionNotStarted(Dict::S('Core:SyncDataSourceObsolete')); + } + if (!UserRights::IsAdministrator() && $this->m_oDataSource->Get('user_id') != UserRights::GetUserId()) + { + throw new SynchroExceptionNotStarted(Dict::S('Core:SyncDataSourceAccessRestriction')); + } + + // Get the list of SQL columns + $sClass = $this->m_oDataSource->GetTargetClass(); + $aAttCodesExpected = array(); + $aAttCodesToReconcile = array(); + $aAttCodesToUpdate = array(); + $sSelectAtt = "SELECT SynchroAttribute WHERE sync_source_id = :source_id AND (update = 1 OR reconcile = 1)"; + $oSetAtt = new DBObjectSet(DBObjectSearch::FromOQL($sSelectAtt), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey()) /* aArgs */); + while ($oSyncAtt = $oSetAtt->Fetch()) + { + if ($oSyncAtt->Get('update')) + { + $aAttCodesToUpdate[$oSyncAtt->Get('attcode')] = $oSyncAtt; + } + if ($oSyncAtt->Get('reconcile')) + { + $aAttCodesToReconcile[$oSyncAtt->Get('attcode')] = $oSyncAtt; + } + $aAttCodesExpected[$oSyncAtt->Get('attcode')] = $oSyncAtt; + } + $aColumns = $this->m_oDataSource->GetSQLColumns(array_keys($aAttCodesExpected)); + $aExtDataFields = array_keys($aColumns); + $aExtDataFields[] = 'primary_key'; + + $this->m_aExtDataSpec = array( + 'table' => $this->m_oDataSource->GetDataTable(), + 'join_key' => 'id', + 'fields' => $aExtDataFields + ); + + // Get the list of attributes, determine reconciliation keys and update targets + // + if ($this->m_oDataSource->Get('reconciliation_policy') == 'use_attributes') + { + $this->m_aReconciliationKeys = $aAttCodesToReconcile; + } + elseif ($this->m_oDataSource->Get('reconciliation_policy') == 'use_primary_key') + { + // Override the settings made at the attribute level ! + $this->m_aReconciliationKeys = array("primary_key" => null); + } + + if ($bFirstPass) + { + $this->m_oStatLog->AddTrace("Update of: {".implode(', ', array_keys($aAttCodesToUpdate))."}"); + $this->m_oStatLog->AddTrace("Reconciliation on: {".implode(', ', array_keys($this->m_aReconciliationKeys))."}"); + } + + if (count($aAttCodesToUpdate) == 0) + { + $this->m_oStatLog->AddTrace("No attribute to update"); + throw new SynchroExceptionNotStarted('There is no attribute to update'); + } + if (count($this->m_aReconciliationKeys) == 0) + { + $this->m_oStatLog->AddTrace("No attribute for reconciliation"); + throw new SynchroExceptionNotStarted('No attribute for reconciliation'); + } + + $this->m_aAttributes = array(); + foreach($aAttCodesToUpdate as $sAttCode => $oSyncAtt) + { + $oAttDef = MetaModel::GetAttributeDef($this->m_oDataSource->GetTargetClass(), $sAttCode); + if ($oAttDef->IsWritable()) + { + $this->m_aAttributes[$sAttCode] = $oSyncAtt; + } + } + + // Count the replicas + $sSelectAll = "SELECT SynchroReplica WHERE sync_source_id = :source_id"; + $oSetAll = new DBObjectSet(DBObjectSearch::FromOQL($sSelectAll), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey())); + $this->m_iCountAllReplicas = $oSetAll->Count(); + $this->m_oStatLog->Set('stats_nb_replica_total', $this->m_iCountAllReplicas); + + // Compute and keep track of the limit date taken into account for obsoleting replicas + // + if ($this->m_oLastFullLoadStartDate == null) + { + // No previous import known, use the full_load_periodicity value... and the current date + $this->m_oLastFullLoadStartDate = new DateTime(); // Now + $iLoadPeriodicity = $this->m_oDataSource->Get('full_load_periodicity'); // Duration in seconds + if ($iLoadPeriodicity > 0) + { + $sInterval = "-$iLoadPeriodicity seconds"; + $this->m_oLastFullLoadStartDate->Modify($sInterval); + } + else + { + $this->m_oLastFullLoadStartDate = new DateTime('1970-01-01'); + } + } + if ($bFirstPass) + { + $this->m_oStatLog->AddTrace("Limit Date: ".$this->m_oLastFullLoadStartDate->Format('Y-m-d H:i:s')); + } + } + + + /** + * Perform a synchronization between the data stored in the replicas (&synchro_data_xxx_xx table) + * and the iTop objects. If the lastFullLoadStartDate is NOT specified then the full_load_periodicity + * is used to determine which records are obsolete. + * @return void + */ + public function Process() + { + $this->PrepareLogs(); + + self::$m_oCurrentTask = $this->m_oDataSource; + try + { + $this->DoSynchronize(); + + $this->m_oStatLog->Set('end_date', time()); + $this->m_oStatLog->Set('status', 'completed'); + $this->m_oStatLog->DBUpdateTracked($this->m_oChange); + + $iErrors = $this->m_oStatLog->GetErrorCount(); + if ($iErrors > 0) + { + $sIssuesOQL = "SELECT SynchroReplica WHERE sync_source_id=".$this->m_oDataSource->GetKey()." AND status_last_error!=''"; + $sAbsoluteUrl = utils::GetAbsoluteUrlAppRoot(); + $sIssuesURL = "{$sAbsoluteUrl}synchro/replica.php?operation=oql&datasource=".$this->m_oDataSource->GetKey()."&oql=".urlencode($sIssuesOQL); + $sSeeIssues = ""; + + $sStatistics = "The synchronization has been executed, $iErrors errors have been encountered. Click here to see the records being currently in error.
".$sStatistics); + } + else + { + //$this->m_oDataSource->SendNotification('success', 'The synchronization has been successfully executed.
'); + } + } + catch (SynchroExceptionNotStarted $e) + { + // Set information for reporting... but delete the object in DB + $this->m_oStatLog->Set('end_date', time()); + $this->m_oStatLog->Set('status', 'error'); + $this->m_oStatLog->Set('last_error', $e->getMessage()); + $this->m_oStatLog->DBDeleteTracked($this->m_oChange); + $this->m_oDataSource->SendNotification('fatal error', 'The synchronization could not start: \''.$e->getMessage().'\'
Please check its configuration
'); + } + catch (Exception $e) + { + $this->m_oStatLog->Set('end_date', time()); + $this->m_oStatLog->Set('status', 'error'); + $this->m_oStatLog->Set('last_error', $e->getMessage()); + $this->m_oStatLog->DBUpdateTracked($this->m_oChange); + $this->m_oDataSource->SendNotification('exception', 'The synchronization has been interrupted: \''.$e->getMessage().'\'
Please contact the application support team
'); + } + self::$m_oCurrentTask = null; + + return $this->m_oStatLog; + } + + /** + * Do the entire synchronization job + */ + protected function DoSynchronize() + { + $this->m_oStatLog->Set('status_curr_job', 1); + $this->m_oStatLog->Set('status_curr_pos', -1); + + $iMaxChunkSize = utils::ReadParam('max_chunk_size', 0, true /* allow CLI */); + if ($iMaxChunkSize > 0) + { + // Split the execution into several processes + // Each process will call DoSynchronizeChunk() + // The loop will end when a process does not reply "continue" on the last line of its output + if (!utils::IsModeCLI()) + { + throw new SynchroExceptionNotStarted(Dict::S('Core:SyncSplitModeCLIOnly')); + } + $aArguments = array(); + $aArguments['source'] = $this->m_oDataSource->GetKey(); + $aArguments['log'] = $this->m_oStatLog->GetKey(); + $aArguments['change'] = $this->m_oChange->GetKey(); + $aArguments['chunk'] = $iMaxChunkSize; + if ($this->m_oLastFullLoadStartDate) + { + $aArguments['last_full_load'] = $this->m_oLastFullLoadStartDate->Format('Y-m-d H:i:s'); + } + else + { + $aArguments['last_full_load'] = ''; + } + + $this->m_oStatLog->DBUpdate($this->m_oChange); + + $iStepCount = 0; + do + { + $aArguments['step_count'] = $iStepCount; + $iStepCount++; + + list ($iRes, $aOut) = utils::ExecITopScript('synchro/priv_sync_chunk.php', $aArguments); + + $sLastRes = strtolower(trim(end($aOut))); + $bContinue = ($sLastRes == 'continue'); + } + while ($bContinue); + + // Reload the log that has been modified by the processes + $this->m_oStatLog->Reload(); + } + else + { + $this->PrepareProcessing(/* first pass */); + $this->DoJob1(); + $this->DoJob2(); + $this->DoJob3(); + } + } + + /** + * Do the synchronization job, limited to some amount of work + * This verb has been designed to be called from within a separate process + * @return true if the process has to be continued + */ + public function DoSynchronizeChunk($oLog, $oChange, $iMaxChunkSize) + { + // Initialize the structures... + self::$m_oCurrentTask = $this->m_oDataSource; + $this->m_oStatLog = $oLog; + $this->m_oChange = $oChange; + + // Prepare internal structures (not the first pass) + $this->PrepareProcessing(false); + + $iCurrJob = $this->m_oStatLog->Get('status_curr_job'); + $iCurrPos = $this->m_oStatLog->Get('status_curr_pos'); + + $this->m_oStatLog->AddTrace("Synchronizing chunk - curr_job:$iCurrJob, curr_pos:$iCurrPos, max_chunk_size:$iMaxChunkSize"); + + $bContinue = false; + switch ($iCurrJob) + { + case 1: + default: + $this->DoJob1($iMaxChunkSize, $iCurrPos); + $bContinue = true; + break; + + case 2: + $this->DoJob2($iMaxChunkSize, $iCurrPos); + $bContinue = true; + break; + + case 3: + $bContinue = $this->DoJob3($iMaxChunkSize, $iCurrPos); + break; + } + $this->m_oStatLog->DBUpdate($this->m_oChange); + self::$m_oCurrentTask = null; + return $bContinue; + } + + /** + * Do the synchronization job #1: Obsolete replica "untouched" for some time + * @param integer $iMaxReplica Limit the number of replicas to process + * @param integer $iCurrPos Current position where to resume the processing + * @return true if the process must be continued + */ + protected function DoJob1($iMaxReplica = null, $iCurrPos = -1) + { + $sLimitDate = $this->m_oLastFullLoadStartDate->Format('Y-m-d H:i:s'); + + // Get all the replicas that were not seen in the last import and mark them as obsolete + $sDeletePolicy = $this->m_oDataSource->Get('delete_policy'); + if ($sDeletePolicy != 'ignore') + { + $sSelectToObsolete = "SELECT SynchroReplica WHERE id > :curr_pos AND sync_source_id = :source_id AND status IN ('new', 'synchronized', 'modified', 'orphan') AND status_last_seen < :last_import"; + $oSetScope = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToObsolete), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sLimitDate, 'curr_pos' => $iCurrPos)); + $iCountScope = $oSetScope->Count(); + if (($this->m_iCountAllReplicas > 10) && ($this->m_iCountAllReplicas == $iCountScope)) + { + throw new SynchroExceptionNotStarted(Dict::S('Core:SyncTooManyMissingReplicas')); + } + + if ($iMaxReplica) + { + // Re-build the object set and set a LIMIT + $oSetToProcess = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToObsolete), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sLimitDate, 'curr_pos' => $iCurrPos)); + $oSetToProcess->SetLimit($iMaxReplica); + } + else + { + $oSetToProcess = $oSetScope; + } + + $iLastReplicaProcessed = -1; + while($oReplica = $oSetToProcess->Fetch()) + { + $iLastReplicaProcessed = $oReplica->GetKey(); + switch ($sDeletePolicy) + { + case 'update': + case 'update_then_delete': + $this->m_oStatLog->AddTrace("Destination object to be updated", $oReplica); + $aToUpdate = array(); + $aToUpdateSpec = explode(';', $this->m_oDataSource->Get('delete_policy_update')); //ex: 'status:obsolete;description:stopped', + foreach($aToUpdateSpec as $sUpdateSpec) + { + $aUpdateSpec = explode(':', $sUpdateSpec); + if (count($aUpdateSpec) == 2) + { + $sAttCode = $aUpdateSpec[0]; + $sValue = $aUpdateSpec[1]; + $aToUpdate[$sAttCode] = $sValue; + } + } + $oReplica->Set('status_last_error', ''); + if ($oReplica->Get('dest_id') == '') + { + $oReplica->Set('status', 'obsolete'); + $this->m_oStatLog->Inc('stats_nb_replica_disappeared_no_action'); + } + else + { + $oReplica->UpdateDestObject($aToUpdate, $this->m_oChange, $this->m_oStatLog); + if ($oReplica->Get('status_last_error') == '') + { + // Change the status of the replica IIF + $oReplica->Set('status', 'obsolete'); + } + } + $oReplica->DBUpdateTracked($this->m_oChange); + break; + + case 'delete': + default: + $this->m_oStatLog->AddTrace("Destination object to be DELETED", $oReplica); + $oReplica->DeleteDestObject($this->m_oChange, $this->m_oStatLog); + } + } + if ($iMaxReplica) + { + if ($iMaxReplica < $iCountScope) + { + // Continue with this job! + $this->m_oStatLog->Set('status_curr_pos', $iLastReplicaProcessed); + return true; + } + } + } // if ($sDeletePolicy != 'ignore' + + //Count "seen" objects + $sSelectSeen = "SELECT SynchroReplica WHERE sync_source_id = :source_id AND status IN ('new', 'synchronized', 'modified', 'orphan') AND status_last_seen >= :last_import"; + $oSetSeen = new DBObjectSet(DBObjectSearch::FromOQL($sSelectSeen), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sLimitDate)); + $this->m_oStatLog->Set('stats_nb_replica_seen', $oSetSeen->Count()); + + + // Job complete! + $this->m_oStatLog->Set('status_curr_job', 2); + $this->m_oStatLog->Set('status_curr_pos', -1); + return false; + } + + /** + * Do the synchronization job #2: Create and modify object for new/modified replicas + * @param integer $iMaxReplica Limit the number of replicas to process + * @param integer $iCurrPos Current position where to resume the processing + * @return true if the process must be continued + */ + protected function DoJob2($iMaxReplica = null, $iCurrPos = -1) + { + $sLimitDate = $this->m_oLastFullLoadStartDate->Format('Y-m-d H:i:s'); + + // Get all the replicas that are 'new' or modified or synchronized with a warning + // + $sSelectToSync = "SELECT SynchroReplica WHERE id > :curr_pos AND (status = 'new' OR status = 'modified' OR (status = 'synchronized' AND status_last_warning != '')) AND sync_source_id = :source_id AND status_last_seen >= :last_import"; + $oSetScope = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToSync), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sLimitDate, 'curr_pos' => $iCurrPos), $this->m_aExtDataSpec); + $iCountScope = $oSetScope->Count(); + + if ($iMaxReplica) + { + // Re-build the object set and set a LIMIT + $oSetToProcess = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToSync), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sLimitDate, 'curr_pos' => $iCurrPos), $this->m_aExtDataSpec); + $oSetToProcess->SetLimit($iMaxReplica); + } + else + { + $oSetToProcess = $oSetScope; + } + + $iLastReplicaProcessed = -1; + while($oReplica = $oSetToProcess->Fetch()) + { + $iLastReplicaProcessed = $oReplica->GetKey(); + $oReplica->Synchro($this->m_oDataSource, $this->m_aReconciliationKeys, $this->m_aAttributes, $this->m_oChange, $this->m_oStatLog); + $oReplica->DBUpdateTracked($this->m_oChange); + } + + if ($iMaxReplica) + { + if ($iMaxReplica < $iCountScope) + { + // Continue with this job! + $this->m_oStatLog->Set('status_curr_pos', $iLastReplicaProcessed); + return true; + } + } + + // Job complete! + $this->m_oStatLog->Set('status_curr_job', 3); + $this->m_oStatLog->Set('status_curr_pos', -1); + return false; + } + + /** + * Do the synchronization job #3: Delete replica depending on the obsolescence scheme + * @param integer $iMaxReplica Limit the number of replicas to process + * @param integer $iCurrPos Current position where to resume the processing + * @return true if the process must be continued + */ + protected function DoJob3($iMaxReplica = null, $iCurrPos = -1) + { + $sDeletePolicy = $this->m_oDataSource->Get('delete_policy'); + if ($sDeletePolicy != 'update_then_delete') + { + // Job complete! + $this->m_oStatLog->Set('status_curr_job', 0); + $this->m_oStatLog->Set('status_curr_pos', -1); + return false; + } + + $bFirstPass = ($iCurrPos == -1); + + // Get all the replicas that are to be deleted + // + $oDeletionDate = $this->m_oLastFullLoadStartDate; + $iDeleteRetention = $this->m_oDataSource->Get('delete_policy_retention'); // Duration in seconds + if ($iDeleteRetention > 0) + { + $sInterval = "-$iDeleteRetention seconds"; + $oDeletionDate->Modify($sInterval); + } + $sDeletionDate = $oDeletionDate->Format('Y-m-d H:i:s'); + if ($bFirstPass) + { + $this->m_oStatLog->AddTrace("Deletion date: $sDeletionDate"); + } + $sSelectToDelete = "SELECT SynchroReplica WHERE id > :curr_pos AND sync_source_id = :source_id AND status IN ('obsolete') AND status_last_seen < :last_import"; + $oSetScope = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToDelete), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sDeletionDate, 'curr_pos' => $iCurrPos)); + $iCountScope = $oSetScope->Count(); + + if ($iMaxReplica) + { + // Re-build the object set and set a LIMIT + $oSetToProcess = new DBObjectSet(DBObjectSearch::FromOQL($sSelectToDelete), array() /* order by*/, array('source_id' => $this->m_oDataSource->GetKey(), 'last_import' => $sDeletionDate, 'curr_pos' => $iCurrPos)); + $oSetToProcess->SetLimit($iMaxReplica); + } + else + { + $oSetToProcess = $oSetScope; + } + + $iLastReplicaProcessed = -1; + while($oReplica = $oSetToProcess->Fetch()) + { + $iLastReplicaProcessed = $oReplica->GetKey(); + $this->m_oStatLog->AddTrace("Destination object to be DELETED", $oReplica); + $oReplica->DeleteDestObject($this->m_oChange, $this->m_oStatLog); + } + + if ($iMaxReplica) + { + if ($iMaxReplica < $iCountScope) + { + // Continue with this job! + $this->m_oStatLog->Set('status_curr_pos', $iLastReplicaProcessed); + return true; + } + } + // Job complete! + $this->m_oStatLog->Set('status_curr_job', 0); + $this->m_oStatLog->Set('status_curr_pos', -1); + return false; + } +} + $oAdminMenu = new MenuGroup('AdminTools', 80 /* fRank */, 'SynchroDataSource', UR_ACTION_MODIFY, UR_ALLOWED_YES); new OQLMenuNode('DataSources', 'SELECT SynchroDataSource', $oAdminMenu->GetIndex(), 12 /* fRank */, true, 'SynchroDataSource', UR_ACTION_MODIFY, UR_ALLOWED_YES); // new OQLMenuNode('Replicas', 'SELECT SynchroReplica', $oAdminMenu->GetIndex(), 12 /* fRank */, true, 'SynchroReplica', UR_ACTION_MODIFY, UR_ALLOWED_YES); diff --git a/test/testlist.inc.php b/test/testlist.inc.php index d5ab84829..ce62839bf 100644 --- a/test/testlist.inc.php +++ b/test/testlist.inc.php @@ -2271,8 +2271,8 @@ class TestDataExchange extends TestBizModel ), ), ), - //); - //$aXXXXScenarios = array( +// ); +// $aXXXXScenarios = array( array( 'desc' => 'Update then delete with retention (to complete with manual testing) and reconciliation on org/name', 'login' => 'admin', @@ -2361,7 +2361,7 @@ class TestDataExchange extends TestBizModel ), array( array('obj_C', 2, 'obj_C', 'production'), - ), + ), ), 'target_data' => array( array('org_id', 'name', 'status'), @@ -2403,7 +2403,7 @@ class TestDataExchange extends TestBizModel array(2, 'obj_B', 'implementation'), array(2, 'obj_C', 'production'), array(2, 'obj_D', 'obsolete'), - ), + ), ), 'attributes' => array( 'org_id' => array(