#489 Run the synchro with parameter max_chunk_size to split the execution into several processes, each loading at most the given count of replica (note: the master process will continue to run while child processes are being forked one by one)

SVN:trunk[1732]
This commit is contained in:
Romain Quetiez
2011-12-20 15:55:07 +00:00
parent d74f562dba
commit 9ac4f84e22
9 changed files with 912 additions and 336 deletions

View File

@@ -46,6 +46,7 @@ class utils
// Parameters loaded from a file, parameters of the page/command line still have precedence
private static $m_aParamsFromFile = null;
private static $m_aParamSource = array();
protected static function LoadParamFile($sParamFile)
{
@@ -82,6 +83,7 @@ class utils
$sParam = $aMatches[1];
$value = trim($aMatches[2]);
self::$m_aParamsFromFile[$sParam] = $value;
self::$m_aParamSource[$sParam] = $sParamFile;
}
}
}
@@ -99,6 +101,25 @@ class utils
}
}
/**
* Return the source file from which the parameter has been found,
* usefull when it comes to pass user credential to a process executed
* in the background
* @param $sName Parameter name
* @return The file name if any, or null
*/
public static function GetParamSourceFile($sName)
{
if (array_key_exists($sName, self::$m_aParamSource))
{
return self::$m_aParamSource[$sName];
}
else
{
return null;
}
}
public static function IsModeCLI()
{
$sSAPIName = php_sapi_name();
@@ -152,10 +173,18 @@ class utils
public static function Sanitize($value, $defaultValue, $sSanitizationFilter)
{
$retValue = self::Sanitize_Internal($value, $sSanitizationFilter);
if ($retValue === false)
if ($value === $defaultValue)
{
$retValue = $defaultValue;
// Preserve the real default value (can be used to detect missing mandatory parameters)
$retValue = $value;
}
else
{
$retValue = self::Sanitize_Internal($value, $sSanitizationFilter);
if ($retValue === false)
{
$retValue = $defaultValue;
}
}
return $retValue;
}
@@ -601,5 +630,86 @@ class utils
}
echo "<p><pre>".print_r($aLightTrace, true)."</pre></p>\n";
}
/**
* Execute the given iTop PHP script, passing it the current credentials
* Only CLI mode is supported, because of the need to hand the credentials over to the next process
* Throws an exception if the execution fails or could not be attempted (config issue)
* @param string $sScript Name and relative path to the file (relative to the iTop root dir)
* @param hash $aArguments Associative array of 'arg' => 'value'
* @return array(iCode, array(output lines))
*/
/**
*/
static function ExecITopScript($sScriptName, $aArguments)
{
$aDisabled = explode(', ', ini_get('disable_functions'));
if (in_array('exec', $aDisabled))
{
throw new Exception("The PHP exec() function has been disabled on this server");
}
$sPHPExec = trim(MetaModel::GetConfig()->Get('php_path'));
if (strlen($sPHPExec) == 0)
{
throw new Exception("The path to php must not be empty. Please set a value for 'php_path' in your configuration file.");
}
$sAuthUser = self::ReadParam('auth_user', '', 'raw_data');
$sAuthPwd = self::ReadParam('auth_pwd', '', 'raw_data');
$sParamFile = self::GetParamSourceFile('auth_user');
if (is_null($sParamFile))
{
$aArguments['auth_user'] = $sAuthUser;
$aArguments['auth_pwd'] = $sAuthPwd;
}
else
{
$aArguments['param_file'] = $sParamFile;
}
$aArgs = array();
foreach($aArguments as $sName => $value)
{
// Note: See comment from the 23-Apr-2004 03:30 in the PHP documentation
// It suggests to rely on pctnl_* function instead of using escapeshellargs
$aArgs[] = "--$sName=".escapeshellarg($value);
}
$sArgs = implode(' ', $aArgs);
$sScript = realpath(APPROOT.$sScriptName);
if (!file_exists($sScript))
{
throw new Exception("Could not find the script file '$sScriptName' from the directory '".APPROOT."'");
}
$sCommand = '"'.$sPHPExec.'" '.escapeshellarg($sScript).' -- '.$sArgs;
if (version_compare(phpversion(), '5.3.0', '<'))
{
if (substr(PHP_OS,0,3) == 'WIN')
{
// Under Windows, and for PHP 5.2.x, the whole command has to be quoted
// Cf PHP doc: http://php.net/manual/fr/function.exec.php, comment from the 27-Dec-2010
$sCommand = '"'.$sCommand.'"';
}
}
$sLastLine = exec($sCommand, $aOutput, $iRes);
if ($iRes == 1)
{
throw new Exception(Dict::S('Core:ExecProcess:Code1')." - ".$sCommand);
}
elseif ($iRes == 255)
{
$sErrors = implode("\n", $aOutput);
throw new Exception(Dict::S('Core:ExecProcess:Code255')." - ".$sCommand.":\n".$sErrors);
}
//$aOutput[] = $sCommand;
return array($iRes, $aOutput);
}
}
?>

View File

@@ -124,6 +124,14 @@ class Config
'source_of_value' => '',
'show_in_conf_sample' => false,
),
'php_path' => array(
'type' => 'string',
'description' => 'Path to the php executable in CLI mode',
'default' => 'php',
'value' => 'php',
'source_of_value' => '',
'show_in_conf_sample' => true,
),
'session_name' => array(
'type' => 'string',
'description' => 'The name of the cookie used to store the PHP session id',

View File

@@ -931,7 +931,7 @@ abstract class DBObject
$oDeletionPlan->AddToDelete($oReplica, DEL_SILENT);
if ($oDataSource->GetKey() == SynchroDataSource::GetCurrentTaskId())
if ($oDataSource->GetKey() == SynchroExecution::GetCurrentTaskId())
{
// The current task has the right to delete the object
continue;
@@ -1860,7 +1860,7 @@ abstract class DBObject
$oSet = $this->GetMasterReplica();
while($aData = $oSet->FetchAssoc())
{
if ($aData['datasource']->GetKey() == SynchroDataSource::GetCurrentTaskId())
if ($aData['datasource']->GetKey() == SynchroExecution::GetCurrentTaskId())
{
// Ignore the current task (check to write => ok)
continue;

View File

@@ -639,6 +639,7 @@ Dict::Add('EN US', 'English', 'English', array(
'Core:SyncDataSourceObsolete' => 'The data source is marked as obsolete. Operation cancelled.',
'Core:SyncDataSourceAccessRestriction' => 'Only adminstrators or the user specified in the data source can execute this operation. Operation cancelled.',
'Core:SyncTooManyMissingReplicas' => 'All records have been untouched for some time (all of the objects could be deleted). Please check that the process that writes into the synchronization table is still running. Operation cancelled.',
'Core:SyncSplitModeCLIOnly' => 'The synchronization can be executed in chunks only if run in mode CLI',
'Core:Synchro:ListReplicas_AllReplicas_Errors_Warnings' => '%1$s replicas, %2$s error(s), %3$s warning(s).',
'Core:SynchroReplica:TargetObject' => 'Synchronized Object: %1$s',
'Class:AsyncSendEmail' => 'Email (asynchronous)',
@@ -733,6 +734,8 @@ Dict::Add('EN US', 'English', 'English', array(
'Class:appUserPreferences' => 'User Preferences',
'Class:appUserPreferences/Attribute:userid' => 'User',
'Class:appUserPreferences/Attribute:preferences' => 'Prefs',
'Core:ExecProcess:Code1' => 'Wrong command or command finished with errors (e.g. wrong script name)',
'Core:ExecProcess:Code255' => 'PHP Error (parsing, or runtime)',
));
//

133
synchro/priv_sync_chunk.php Normal file
View File

@@ -0,0 +1,133 @@
<?php
// Copyright (C) 2010 Combodo SARL
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; version 3 of the License.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
/**
* Internal: synchronize part of the records - cannot be invoked separately
*
* @author Erwan Taloc <erwan.taloc@combodo.com>
* @author Romain Quetiez <romain.quetiez@combodo.com>
* @author Denis Flaven <denis.flaven@combodo.com>
* @license http://www.opensource.org/licenses/gpl-3.0.html LGPL
*/
if (!defined('__DIR__')) define('__DIR__', dirname(__FILE__));
require_once(__DIR__.'/../approot.inc.php');
require_once(APPROOT.'/application/application.inc.php');
require_once(APPROOT.'/application/webpage.class.inc.php');
require_once(APPROOT.'/application/csvpage.class.inc.php');
require_once(APPROOT.'/application/clipage.class.inc.php');
require_once(APPROOT.'/application/startup.inc.php');
function ReadMandatoryParam($oP, $sParam, $sSanitizationFilter = 'parameter')
{
$sValue = utils::ReadParam($sParam, null, true /* Allow CLI */, $sSanitizationFilter);
if (is_null($sValue))
{
$oP->p("ERROR: Missing argument '$sParam'\n");
exit(29);
}
return trim($sValue);
}
/////////////////////////////////
// Main program
if (!utils::IsModeCLI())
{
$oP = new WebPage(Dict::S("TitleSynchroExecution"));
$oP->p("This page is used internally by iTop");
$oP->output();
exit -2;
}
$oP = new CLIPage(Dict::S("TitleSynchroExecution"));
try
{
utils::UseParamFile();
}
catch(Exception $e)
{
$oP->p("Error: ".$e->GetMessage());
$oP->output();
exit -2;
}
// Next steps:
// specific arguments: 'csvfile'
//
$sAuthUser = ReadMandatoryParam($oP, 'auth_user', 'raw_data');
$sAuthPwd = ReadMandatoryParam($oP, 'auth_pwd', 'raw_data');
if (UserRights::CheckCredentials($sAuthUser, $sAuthPwd))
{
UserRights::Login($sAuthUser); // Login & set the user's language
}
else
{
$oP->p("Access restricted or wrong credentials ('$sAuthUser')");
$oP->output();
exit -1;
}
$iStepCount = ReadMandatoryParam($oP, 'step_count');
$oP->p('Executing a partial synchro - step '.$iStepCount);
$iSource = ReadMandatoryParam($oP, 'source');
$iStatLog = ReadMandatoryParam($oP, 'log');
$iChange = ReadMandatoryParam($oP, 'change');
$sLastFullLoad = ReadMandatoryParam($oP, 'last_full_load', 'raw_data');
$iChunkSize = ReadMandatoryParam($oP, 'chunk');
$oP->p('Last full load: '.$sLastFullLoad);
$oP->p('Chunk size: '.$iChunkSize);
$oP->p('Source: '.$iSource);
try
{
$oSynchroDataSource = MetaModel::GetObject('SynchroDataSource', $iSource);
$oLog = MetaModel::GetObject('SynchroLog', $iStatLog);
$oChange = MetaModel::GetObject('CMDBChange', $iChange);
if (strlen($sLastFullLoad) > 0)
{
$oLastFullLoad = new DateTime($sLastFullLoad);
$oSynchroExec = new SynchroExecution($oSynchroDataSource, $oLastFullLoad);
}
else
{
$oSynchroExec = new SynchroExecution($oSynchroDataSource);
}
if ($oSynchroExec->DoSynchronizeChunk($oLog, $oChange, $iChunkSize))
{
$oP->p("continue");
}
else
{
$oP->p("done!");
}
$oP->output();
}
catch(Exception $e)
{
$oP->p("Error: ".$e->GetMessage());
$oP->add($e->getTraceAsString());
$oP->output();
exit(28);
}
?>

View File

@@ -51,7 +51,7 @@ function UsageAndExit($oP)
if ($bModeCLI)
{
$oP->p("USAGE:\n");
$oP->p("php -q synchro_exec.php --auth_user=<login> --auth_pwd=<password> --data_sources=<comma_separated_list_of_data_sources>\n");
$oP->p("php -q synchro_exec.php --auth_user=<login> --auth_pwd=<password> --data_sources=<comma_separated_list_of_data_sources> [max_chunk_size=<limit the count of replica loaded in a single pass>]\n");
}
else
{
@@ -147,7 +147,8 @@ foreach(explode(',', $sDataSourcesList) as $iSDS)
}
try
{
$oStatLog = $oSynchroDataSource->Synchronize(null);
$oSynchroExec = new SynchroExecution($oSynchroDataSource);
$oStatLog = $oSynchroExec->Process();
if ($bSimulate)
{
CMDBSource::Query('ROLLBACK');

View File

@@ -120,6 +120,13 @@ $aPageParams = array
'default' => 'summary',
'description' => '[retcode] to return the count of lines in error, [summary] to return a concise report, [details] to get a detailed report (each line listed)',
),
'max_chunk_size' => array
(
'mandatory' => false,
'modes' => 'cli',
'default' => '0',
'description' => 'Limit on the count of records that can be loaded at once while performing the synchronization',
),
/*
'reportlevel' => array
(
@@ -621,7 +628,8 @@ try
//
if ($bSynchronize)
{
$oStatLog = $oDataSource->Synchronize($oLoadStartDate);
$oSynchroExec = new SynchroExecution($oDataSource, $oLoadStartDate);
$oStatLog = $oSynchroExec->Process();
$oP->add_comment('Synchronization---');
$oP->add_comment('------------------');
if ($sOutput == 'details')

File diff suppressed because it is too large Load Diff

View File

@@ -2271,8 +2271,8 @@ class TestDataExchange extends TestBizModel
),
),
),
//);
//$aXXXXScenarios = array(
// );
// $aXXXXScenarios = array(
array(
'desc' => 'Update then delete with retention (to complete with manual testing) and reconciliation on org/name',
'login' => 'admin',
@@ -2361,7 +2361,7 @@ class TestDataExchange extends TestBizModel
),
array(
array('obj_C', 2, 'obj_C', 'production'),
),
),
),
'target_data' => array(
array('org_id', 'name', 'status'),
@@ -2403,7 +2403,7 @@ class TestDataExchange extends TestBizModel
array(2, 'obj_B', 'implementation'),
array(2, 'obj_C', 'production'),
array(2, 'obj_D', 'obsolete'),
),
),
),
'attributes' => array(
'org_id' => array(