Top level object to manage a single task. This object manages a set of Jobs, each of which looks after a single job, submitting and resubmitting it until it succeeds or user intervention is required. ProtoJobs ========= Batches of new jobs are introduced into the system initially as "ProtoJobs". The following methods are used to manipulate them:- AddProtoJob(job_name,args_str="",env_str="") PromoteProtoJobs(job_name_pattern = ".*") RemoveProtoJobs(job_name_pattern = ".*")
Definition at line 14 of file GBSTask.py.
| def python::GBSTask::GBSTask::__init__ | ( | self, | ||
| name, | ||||
| parent, | ||||
| model, | ||||
| model_args | ||||
| ) |
Definition at line 36 of file GBSTask.py.
00036 : 00037 self.__scriptFileName = "" 00038 self.__scriptGlobalArgs = "" 00039 self.__globalEnvironment = "" 00040 self.__globalInputSandbox = "" 00041 self.__globalOutputSandbox = "" 00042 self.__backend = GetConfigValue("DefaultBackend") 00043 self.__submitEnabled = 0 00044 self.__maxGangaJobs = int(GetConfigValue("DefaultMaxGangaJobs")) 00045 self.__maxSubmitJobs = int(GetConfigValue("DefaultMaxSubmitJobs")) 00046 self.__mode = "Test" 00047 00048 # The remaining members are volatile i.e. not persisted but derived from other state. 00049 # Note that _jobManagers is not private so that MinosRSMTask can rename jobs. 00050 self._jobManagers = {} # Volatile: Hash name -> Job 00051 self.__gangaTreeDir = 'gbs/' + name # Ganga JobTree directory that holds jobs. 00052 00053 self.__jstatAll = 0 # Job stats: All 00054 self.__jstatNReadyAll = 0 # Job stats: Not ready - all 00055 self.__jstatNReadyOther = 0 # Job stats: Not ready - other than placed on hold 00056 self.__jstatReadyAll = 0 # Job stats: Ready to run - all 00057 self.__jstatReadyRetry = 0 # Job stats: Ready to run - retries 00058 self.__jstatSubmitAll = 0 # Job stats: Submitted to Ganga - all 00059 self.__jstatSubmitNRun = 0 # Job stats: Submitted to Ganga - but not running 00060 self.__jstatDoneAll = 0 # Job stats: Done - all 00061 self.__jstatDoneFail = 0 # Job stats: Done - Failed 00062 00063 GBSObject.__init__(self,name,parent,model) 00064 00065 # Create Ganga JobTree directory if necessary 00066 if not Ganga.GPI.jobtree.exists(self.__gangaTreeDir): Ganga.GPI.jobtree.mkdir(self.__gangaTreeDir) 00067 00068 self.MakeChildDirectory() 00069 self.__ReloadChildren() 00070 self.RefreshJobStats() 00071 00072 def _DoMemberIO(self,ioh):
| def python::GBSTask::GBSTask::_DoMemberIO | ( | self, | ||
| ioh | ||||
| ) | [private] |
Definition at line 73 of file GBSTask.py.
00073 : 00074 self.__scriptFileName = ioh("Script file name", "s",self.__scriptFileName) 00075 self.__scriptGlobalArgs = ioh("Script Global Args", "s",self.__scriptGlobalArgs) 00076 self.__globalEnvironment = ioh("+Global environment", "s",self.__globalEnvironment) 00077 self.__globalInputSandbox = ioh("+Global Input Sandbox List", "s",self.__globalInputSandbox) 00078 self.__globalOutputSandbox = ioh("+Global Output Sandbox List", "s",self.__globalOutputSandbox) 00079 self.__backend = ioh("Backend", "s",self.__backend) 00080 self.__submitEnabled = ioh("Submit Enabled", "i",self.__submitEnabled) 00081 self.__maxGangaJobs = ioh("Max Ganga Jobs", "i",self.__maxGangaJobs) 00082 self.__maxSubmitJobs = ioh("Max Submit Jobs", "i",self.__maxSubmitJobs) 00083 self.__mode = ioh("+Test/Production Mode", "s",self.__mode) 00084 GBSObject._DoMemberIO(self,ioh) 00085 def WriteFamily(self):
| def python::GBSTask::GBSTask::WriteFamily | ( | self | ) |
Definition at line 86 of file GBSTask.py.
00086 : 00087 self.Write() 00088 for job_name,job in self._jobManagers.iteritems():job.WriteFamily() 00089 def GetType(self): return "GBSTask"
| def python::GBSTask::GBSTask::GetType | ( | self | ) |
| def python::GBSTask::GBSTask::__repr__ | ( | self | ) |
Definition at line 92 of file GBSTask.py.
00092 : return self.AsString() 00093 00094 00095 #----------------------------------------------------------------------------------------------- 00096 # 00097 # User Callable Methods Getters 00098 # 00099 #----------------------------------------------------------------------------------------------- 00100 00101
| def python::GBSTask::GBSTask::AsString | ( | self, | ||
level = "Full" | ||||
| ) |
Return string description. Return string description at the following levels:- "Brief" one line summary suitable for rows in tables "Heading" one line summary suitable as heading for "Brief" "Full" full description including value of every data member
Definition at line 102 of file GBSTask.py.
00102 : 00103 00104 """Return string description. 00105 00106 Return string description at the following levels:- 00107 "Brief" one line summary suitable for rows in tables 00108 "Heading" one line summary suitable as heading for "Brief" 00109 "Full" full desciption including value of every data member""" 00110 00111 if ( level == "Heading"): 00112 s = "Name".ljust(20) 00113 s += "Model".ljust(15) 00114 s += "!Ready(other) Ready(retry) Sub(!run) Done(fail)".rjust(51) 00115 s += "Backend".rjust(10) + " " 00116 s += "Script file".ljust(40) 00117 return s 00118 if ( level == "Brief"): 00119 s = self.GetName().ljust(20) 00120 s += self.GetModel().ljust(15) 00121 s += str(self.__jstatNReadyAll).rjust(8) + "("+ str(self.__jstatNReadyOther).rjust(3) + ")" 00122 s += str(self.__jstatReadyAll).rjust(9) + "("+ str(self.__jstatReadyRetry).rjust(3) + ")" 00123 s += str(self.__jstatSubmitAll).rjust(8) + "("+ str(self.__jstatSubmitNRun).rjust(3) + ")" 00124 s += str(self.__jstatDoneAll).rjust(8) + "("+ str(self.__jstatDoneFail).rjust(3) + ")" 00125 backend = self.__backend 00126 mo = re.search(r"(.*?):",backend) 00127 if mo: backend = mo.group(1) 00128 s += backend.rjust(10) + " " 00129 s += self.__scriptFileName.ljust(40) 00130 return s 00131 00132 s = GBSObject.__repr__(self) + "\n\n" 00133 s += "Model: " + self.GetModel() + "\n\n" 00134 s += "Job Definition\n" 00135 s += " Script file: " + self.__scriptFileName + "\n" 00136 s += " Global args: '" + self.__scriptGlobalArgs + "'\n" 00137 s += " Global env: '" + self.__globalEnvironment + "'\n" 00138 s += " Input Sandbox: '" + self.__globalInputSandbox + "'\n" 00139 s += " Output Sandbox: '" + self.__globalOutputSandbox + "'\n\n" 00140 s += "Job Submission\n" 00141 s += " " 00142 if self.__submitEnabled: s += "Enabled\n" 00143 else: s += "Disabled\n" 00144 s += " Backend: " + self.__backend + "\n" 00145 s += " Limits: " + str(self.__maxSubmitJobs) + "(single submit) " + str(self.__maxGangaJobs) + "(maximum)\n" 00146 s += " Mode: 
" + self.__mode + "\n\n" 00147 s += "Managing " + str(self.__jstatAll) + " jobs\n" 00148 s += " Holding: " + str(self.__jstatNReadyAll).rjust(5) + " ("+ str(self.__jstatNReadyOther) + " other)\n" 00149 s += " Waiting: " + str(self.__jstatReadyAll).rjust(5) + " ("+ str(self.__jstatReadyRetry) + " retries)\n" 00150 s += " Submitted: " + str(self.__jstatSubmitAll).rjust(5) + " ("+ str(self.__jstatSubmitNRun) + " not running)\n" 00151 s += " Done: " + str(self.__jstatDoneAll).rjust(5) + " ("+ str(self.__jstatDoneFail) + " failed)\n" 00152 return s 00153 00154 def GetBackend(self):
| def python::GBSTask::GBSTask::GetBackend | ( | self | ) |
Return backend for future jobs
Definition at line 155 of file GBSTask.py.
00155 : 00156 """Return backend for future jobs""" 00157 return self.__backend 00158 def GetGangaTreeDir(self):
| def python::GBSTask::GBSTask::GetGangaTreeDir | ( | self | ) |
Return Ganga JobTree Directory that holds all Ganga jobs
Definition at line 159 of file GBSTask.py.
00159 : 00160 """Return Ganga JobTree Directory that holds all Ganga jobs""" 00161 return self.__gangaTreeDir 00162 def GetGlobalEnvironment(self):
| def python::GBSTask::GBSTask::GetGlobalEnvironment | ( | self | ) |
Return, as a comma separated list string, the environment that is global to all jobs
Definition at line 163 of file GBSTask.py.
00163 : 00164 """Return, as a comma separated list string, the environment that is global to all jobs""" 00165 return self.__globalEnvironment 00166 def GetGlobalInputSandbox(self):
| def python::GBSTask::GBSTask::GetGlobalInputSandbox | ( | self | ) |
Return, as a comma separated list string, the input sandbox file list that is global to all jobs
Definition at line 167 of file GBSTask.py.
00167 : 00168 """Return, as a comma separated list string, the input sandbox file list that is global to all jobs""" 00169 return self.__globalInputSandbox 00170 def GetGlobalOutputSandbox(self):
| def python::GBSTask::GBSTask::GetGlobalOutputSandbox | ( | self | ) |
Return, as a comma separated list string, the output sandbox file list that is global to all jobs
Definition at line 171 of file GBSTask.py.
00171 : 00172 """Return, as a comma separated list string, the ouput sandbox file list that is global to all jobs""" 00173 return self.__globalOutputSandbox 00174 def GetMaxGangaJobs(self):
| def python::GBSTask::GBSTask::GetMaxGangaJobs | ( | self | ) |
Return the maximum number of jobs that can be submitted to Ganga at any one time
Definition at line 175 of file GBSTask.py.
00175 : 00176 """Return the maximum number of jobs that can be submitted to Ganga at any one time """ 00177 return self.__maxGangaJobs 00178 def GetMaxSubmitJobs(self):
| def python::GBSTask::GBSTask::GetMaxSubmitJobs | ( | self | ) |
Return the maximum number of jobs that can be submitted by a single call SubmitJobs()
Definition at line 179 of file GBSTask.py.
00179 : 00180 """Return the maximum number of jobs that can be submitted by a single call SubmitJobs() """ 00181 return self.__maxSubmitJobs 00182 def GetMode(self):
| def python::GBSTask::GBSTask::GetMode | ( | self | ) |
Return the Test/Production Mode
Definition at line 183 of file GBSTask.py.
00183 : 00184 """Return the Test/Production Mode """ 00185 return self.__mode 00186 def GetScriptFileName(self):
| def python::GBSTask::GBSTask::GetScriptFileName | ( | self | ) |
Returns the current user application script file name (or "" if none).
Definition at line 187 of file GBSTask.py.
00187 : 00188 """Returns the current user application script file name (or "" if none). """ 00189 return self.__scriptFileName 00190 def GetScriptFileSpec(self):
| def python::GBSTask::GBSTask::GetScriptFileSpec | ( | self | ) |
Definition at line 191 of file GBSTask.py.
00191 : 00192 if not self.__scriptFileName : return "" 00193 return self.GetStoreLocation("child_dir") + "/" + self.__scriptFileName 00194 def GetJob(self,name,warn=True) :
| def python::GBSTask::GBSTask::GetJob | ( | self, | ||
| name, | ||||
warn = True | ||||
| ) |
Return the Job called name, e.g. job = task.GetJob("job_00123"). If there is no such Job, return None (after printing a message when warn is True). Definition at line 195 of file GBSTask.py.
00195 : 00196 """Return Job called job_name e.g. job = task.GetJob("job_00123")""" 00197 if not self._jobManagers.has_key(name): 00198 if warn: print "Sorry, there is no Job named " + str(name) 00199 return None 00200 else : return self._jobManagers[name] 00201 def GetJobs(self,job_name_pattern = ".*"):
| def python::GBSTask::GBSTask::GetJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
| ) |
Return a dictionary: dict[name] job for all jobs matching pattern
Definition at line 202 of file GBSTask.py.
00202 : 00203 """Return a dictionary: dict[name] job for all jobs matching pattern""" 00204 00205 job_dict = {} 00206 for job_name,job in self._jobManagers.iteritems(): 00207 if re.search(job_name_pattern,job_name): job_dict[job_name] = job 00208 return job_dict 00209 def GetScriptGlobalArgs(self):
| def python::GBSTask::GBSTask::GetScriptGlobalArgs | ( | self | ) |
Returns (as a string) the comma separated list of application script args that are global to all jobs
Definition at line 210 of file GBSTask.py.
00210 : 00211 """Returns (as a string) the comma separated list of application script args that are global to all jobs """ 00212 return self.__scriptGlobalArgs 00213 def GetTestOnlyMethods(self):
| def python::GBSTask::GBSTask::GetTestOnlyMethods | ( | self | ) |
Return a list of methods that are only available in 'Test' mode.
Definition at line 214 of file GBSTask.py.
00214 : 00215 """Return a list of methods that are only available in 'Test' mode.""" 00216 return ["SetGlobalEnvironment", \ 00217 "SetGlobalInputSandbox", \ 00218 "SetGlobalOutputSandbox", \ 00219 "SetScriptFileName", \ 00220 "SetScriptGlobalArgs"] \ 00221 def IsAuthorisedToSubmit(self,warn=True):
| def python::GBSTask::GBSTask::IsAuthorisedToSubmit | ( | self, | ||
warn = True | ||||
| ) |
Returns True if authorised to submit jobs. Always True unless the backend is LCG, in which case the grid proxy is checked.
Definition at line 222 of file GBSTask.py.
00222 : 00223 """Returns True is authorised to submit jobs. 00224 00225 Always True except if backend is LCG, then checks proxy""" 00226 if self.GetBackend()[0:3] != "LCG" : return True 00227 ok = False 00228 try: ok = Ganga.GPI.gridProxy.isValid() 00229 except: ok = False 00230 if warn and not ok: print "Cannot submit jobs; backend is LCG but cannot find valid proxy" 00231 return ok 00232 def IsDisabled(self,method):
| def python::GBSTask::GBSTask::IsDisabled | ( | self, | ||
| method | ||||
| ) |
Return True if method is disabled.
Definition at line 233 of file GBSTask.py.
00233 : 00234 """Return True if method is disabled.""" 00235 00236 if self.GetMode() == 'Test': return False 00237 for disabled_method in self.GetTestOnlyMethods(): 00238 if method == disabled_method: 00239 print "Method " + method + " is disabled in Production mode" 00240 return True 00241 return False 00242 def ListJobs(self,job_name_pattern = ".*"):
| def python::GBSTask::GBSTask::ListJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
| ) |
List existing Jobs matching pattern
Definition at line 243 of file GBSTask.py.
00243 : 00244 00245 """List existing Jobs matching pattern""" 00246 00247 print self.GetName() + " has the following jobs", 00248 if job_name_pattern != ".*": print "that match the pattern: '" + str(job_name_pattern) + "'", 00249 print ":-" 00250 first = 1 00251 job_names = self._jobManagers.keys() 00252 job_names.sort() 00253 for job_name in job_names: 00254 if not re.search(job_name_pattern,job_name): continue 00255 job = self._jobManagers[job_name] 00256 if first: 00257 print job.AsString("Heading") + "\n" 00258 first = 0 00259 print job.AsString("Brief") 00260 if first: print " no jobs found" 00261 00262 00263 #----------------------------------------------------------------------------------------------- 00264 # 00265 # User Callable Methods Setters 00266 # 00267 #----------------------------------------------------------------------------------------------- 00268 00269 def AddJob(self,job_name,args_str = "",env_str = ""):
| def python::GBSTask::GBSTask::AddJob | ( | self, | ||
| job_name, | ||||
args_str = "", |
||||
env_str = "" | ||||
| ) |
Create a new named Job and optionally assign its application script local args and environment.
e.g. job = task.AddJob("job_00034621_0002","F00034621_0002.mdaq.root","mini_flux=no")
Note that the job_name must begin job_ (if omitted it will be added automatically) and
will be rejected if it contains anything beyond alphanumerics, '_' and '-', as it
will be used to name a directory. Definition at line 270 of file GBSTask.py.
00270 : 00271 00272 """Create a new named Job and optionally assign its application script local args and environment. 00273 00274 e.g. job = task.AddJob("job_00034621_0002","F00034621_0002.mdaq.root","mini_flux=no") 00275 00276 Note that the job_name must begin job_ (if omitted it will be added automatically) and 00277 will be rejected if it contains anything beyond alphanumeric, '_' and '-'.for as it 00278 will be used to name a directory.""" 00279 00280 return self._AddJobOrProtoJob(job_name,args_str,env_str,"Job") 00281 def ClearErrorCountsJobs(self,job_name_pattern= ".*"):
| def python::GBSTask::GBSTask::ClearErrorCountsJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
| ) |
After seeking confirmation, clear error counts on all suitable jobs matching pattern
Definition at line 282 of file GBSTask.py.
00282 : 00283 """After seeking confirmation, clear error counts on all suitable jobs matching pattern""" 00284 self.__ApplyActionToJobs("CLEAR ERROR COUNTS",job_name_pattern) 00285 def ClearHistoryJobs(self,job_name_pattern= ".*"):
| def python::GBSTask::GBSTask::ClearHistoryJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
| ) |
After seeking confirmation, clear histories on all suitable jobs matching pattern
Definition at line 286 of file GBSTask.py.
00286 : 00287 """After seeking confirmation, clear histories on all suitable jobs matching pattern""" 00288 self.__ApplyActionToJobs("CLEAR HISTORY",job_name_pattern) 00289 def EnableSubmit(self,enable=True):
| def python::GBSTask::GBSTask::EnableSubmit | ( | self, | ||
enable = True | ||||
| ) |
Enable or disable submission of jobs via the SubmitJobs() method.
The function returns the previous value of the switch. Definition at line 290 of file GBSTask.py.
00290 : 00291 """Enable or disable submission of jobs via the SubmitJobs() method. 00292 The function returns the previous value of the switch.""" 00293 00294 old_val = self.__submitEnabled 00295 self.__submitEnabled = 0 00296 if enable: self.__submitEnabled = 1 00297 self.Write() 00298 return old_val 00299 def HoldJobs(self,job_name_pattern= ".*"):
| def python::GBSTask::GBSTask::HoldJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
| ) |
After seeking confirmation, hold all suitable jobs matching pattern
Definition at line 300 of file GBSTask.py.
00300 : 00301 """After seeking confirmation, hold all suitable jobs matching pattern""" 00302 self.__ApplyActionToJobs("HOLD",job_name_pattern) 00303 def KillJobs(self,job_name_pattern= ".*"):
| def python::GBSTask::GBSTask::KillJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
| ) |
After seeking confirmation, kill all suitable jobs matching pattern
Definition at line 304 of file GBSTask.py.
00304 : 00305 """After seeking confirmation, kill all suitable jobs matching pattern""" 00306 self.__ApplyActionToJobs("KILL",job_name_pattern) 00307 def RefreshJobStats(self):
| def python::GBSTask::GBSTask::RefreshJobStats | ( | self | ) |
Bring job statistics up to date. It should not be necessary (but is harmless) for the user to call this, as child jobs call it when their state changes. Note: unlike UpdateJobsStatus, this simply asks jobs for their current state without checking Ganga.
Definition at line 308 of file GBSTask.py.
00308 : 00309 00310 """ Bring job statistics up to date. 00311 00312 Should not be necessary (but harmless) for user to call this 00313 as child jobs call it when their state changes. Note: unlike 00314 UpdateJobsStatus this simply asks jobs what their current 00315 state without checking Ganga """ 00316 self.__jstatAll = 0 00317 self.__jstatNReadyAll = 0 00318 self.__jstatNReadyOther = 0 00319 self.__jstatReadyAll = 0 00320 self.__jstatReadyRetry = 0 00321 self.__jstatSubmitAll = 0 00322 self.__jstatSubmitNRun = 0 00323 self.__jstatDoneAll = 0 00324 self.__jstatDoneFail = 0 00325 for job_name,job in self._jobManagers.iteritems(): 00326 self.__jstatAll += 1 00327 pc = job.GetPhaseCode() 00328 if pc == GID_JPC_DONE_FAIL or pc == GID_JPC_DONE_NFAIL: self.__jstatDoneAll += 1 00329 if pc == GID_JPC_DONE_FAIL: self.__jstatDoneFail += 1 00330 if pc == GID_JPC_READY_RETRY or pc == GID_JPC_READY_NRETRY: self.__jstatReadyAll += 1 00331 if pc == GID_JPC_READY_RETRY: self.__jstatReadyRetry += 1 00332 if pc == GID_JPC_SUBMIT_RUN or pc == GID_JPC_SUBMIT_NRUN: self.__jstatSubmitAll += 1 00333 if pc == GID_JPC_SUBMIT_NRUN: self.__jstatSubmitNRun += 1 00334 if pc == GID_JPC_NREADY_HOLD or pc == GID_JPC_NREADY_NHOLD: self.__jstatNReadyAll += 1 00335 if pc == GID_JPC_NREADY_NHOLD: self.__jstatNReadyOther += 1 00336 def ReleaseJobs(self,job_name_pattern = ".*"):
| def python::GBSTask::GBSTask::ReleaseJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
| ) |
After seeking confirmation, release all suitable jobs matching pattern
Definition at line 337 of file GBSTask.py.
00337 : 00338 """After seeking confirmation, release all suitable jobs matching pattern""" 00339 self.__ApplyActionToJobs("RELEASE",job_name_pattern) 00340 def RemoveJobs(self,job_name_pattern= ".*"):
| def python::GBSTask::GBSTask::RemoveJobs | ( | self, | ||
job_name_pattern = ".*" | ||||
| ) |
After seeking confirmation, remove all suitable jobs matching pattern
Definition at line 341 of file GBSTask.py.
00341 : 00342 """After seeking confirmation, remove all suitable jobs matching pattern""" 00343 self.__ApplyActionToJobs("REMOVE",job_name_pattern) 00344 def SetBackend(self,backend):
| def python::GBSTask::GBSTask::SetBackend | ( | self, | ||
| backend | ||||
| ) |
Define backend for future jobs: One of: Local, PBS:queue, LCG:queue
Definition at line 345 of file GBSTask.py.
00345 : 00346 00347 """Define backend for future jobs: One of: Local, PBS:queue, LCG:queue""" 00348 00349 # Check value before accepting 00350 if not re.search(r"^(Local|PBS|LCG)(:.+)?$",backend): 00351 print "Sorry, '" + str(backend) + "' is not an acceptible backend. It should be one of:-\n"\ 00352 + " Local, PBS{:queue} or LCG{:queue}" 00353 return 00354 self.__backend = backend 00355 self.Write() 00356 def SetGlobalEnvironment(self,env_str):
| def python::GBSTask::GBSTask::SetGlobalEnvironment | ( | self, | ||
| env_str | ||||
| ) |
Set, as a comma separated list string, the environment that is global to all jobs.
e.g. task.SetGlobalEnvironment('var1=123,var2=a string with spaces,var3=456')Definition at line 357 of file GBSTask.py.
00357 : 00358 00359 """Set, as a comma separated list string, the environment that is global to all jobs. 00360 00361 e.g. task.SetGlobalEnvironment('var1=123,var2=a string with spaces,var3=456')""" 00362 00363 if self.IsDisabled("SetGlobalEnvironment"): return 00364 d = GBSUtilities.ParseEnvStr(env_str) 00365 if env_str and not d: print "Cannot parse environment string: '" + env_str + "'" 00366 else: 00367 self.__globalEnvironment = env_str 00368 self.Write() 00369 def SetGlobalInputSandbox(self,in_sbox_str):
| def python::GBSTask::GBSTask::SetGlobalInputSandbox | ( | self, | ||
| in_sbox_str | ||||
| ) |
Set, as a comma separated list string, the input sandbox file list that is global to all jobs.
e.g. task.SetGlobalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')Definition at line 370 of file GBSTask.py.
00370 : 00371 00372 """Set, as a comma separated list string, the input sandbox file list that is global to all jobs. 00373 00374 e.g. task.SetGlobalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')""" 00375 00376 if self.IsDisabled("SetGlobalInputSandbox"): return 00377 (ok,str) = GBSUtilities.ProcessSandboxSetup(self,in_sbox_str,"input") 00378 if ok: 00379 self.__globalInputSandbox = str 00380 self.Write() 00381 def SetGlobalOutputSandbox(self,out_sbox_str):
| def python::GBSTask::GBSTask::SetGlobalOutputSandbox | ( | self, | ||
| out_sbox_str | ||||
| ) |
Set, as a comma separated list string, the output sandbox file list that is global to all jobs.
e.g. task.SetGlobalOutputSandbox('my_output_data.dat,my_output.log')Definition at line 382 of file GBSTask.py.
00382 : 00383 00384 """Set, as a comma separated list string, the output sandbox file list that is global to all jobs. 00385 00386 e.g. task.SetGlobalOutputSandbox('my_output_data.dat,my_output.log')""" 00387 00388 if self.IsDisabled("SetGlobalOutputSandbox"): return 00389 (ok,str) = GBSUtilities.ProcessSandboxSetup(self,out_sbox_str,"output") 00390 if ok: 00391 self.__globalOutputSandbox = str 00392 self.Write() 00393 def SetMaxGangaJobs(self,n):
| def python::GBSTask::GBSTask::SetMaxGangaJobs | ( | self, | ||
| n | ||||
| ) |
Set the maximum number of jobs that can be submitted to Ganga at any one time
Definition at line 394 of file GBSTask.py.
00394 : 00395 """Set the maximum number of jobs that can be submitted to Ganga at any one time""" 00396 00397 self.__maxGangaJobs = n 00398 self.Write() 00399 def SetMaxSubmitJobs(self,n):
| def python::GBSTask::GBSTask::SetMaxSubmitJobs | ( | self, | ||
| n | ||||
| ) |
Set the maximum number of jobs that can be submitted by a single call SubmitJobs()
Definition at line 400 of file GBSTask.py.
00400 : 00401 """Set the maximum number of jobs that can be submitted by a single call SubmitJobs()""" 00402 00403 self.__maxSubmitJobs = n 00404 self.Write() 00405 def SetMode(self,mode):
| def python::GBSTask::GBSTask::SetMode | ( | self, | ||
| mode | ||||
| ) |
Set the Test/Production Mode
Definition at line 406 of file GBSTask.py.
00406 : 00407 """Set the Test/Production Mode """ 00408 if mode != "Test" and mode != "Production": 00409 print "Illegal mode: '" + mode + "', only 'Test' and 'Production' permitted" 00410 return 00411 if self.__mode == mode: return 00412 if mode == 'Production': 00413 print "Caution: Switching to 'Production' mode will disable the following methods:-\n" 00414 print str(self.GetTestOnlyMethods()) 00415 print "\n but should be done before moving into full production." 00416 else: 00417 if not self.GetScriptFileName(): 00418 print "You cannot switch to 'Production' mode; no global script file has been defined." 00419 return 00420 print "Caution: Switching back to 'Test' mode is potentially dangerous as it re-enables the following methods:-\n" 00421 print str(self.GetTestOnlyMethods()) 00422 print "\n and changes using these methods could results that are not homogeneous" 00423 ans = raw_input("Do you want to proceed ?y[n]") 00424 if not re.search(r"^y",ans,re.I): 00425 print "Mode not changed." 00426 return 00427 print "Mode changed to " + mode 00428 self.__mode = mode 00429 self.Write() 00430 def SetScriptFileName(self,ext_file_spec):
| def python::GBSTask::GBSTask::SetScriptFileName | ( | self, | ||
| ext_file_spec | ||||
| ) |
Pass in the script file to be used by jobs. GBS makes a local copy,
so there is no need to keep the original once it has been passed.
e.g. task.SetScriptFileName("my/directory/my_script_file.sh") Definition at line 431 of file GBSTask.py.
00431 : 00432 00433 """Pass in script file to be used by jobs. GBS make a local copy 00434 so no need to keep once passed. 00435 00436 e.g. task.SetScriptFileName("my/directory/my_script_file.sh")""" 00437 00438 if self.IsDisabled("SetScriptFileName"): return 00439 import os 00440 if not os.path.isfile(ext_file_spec): 00441 print "Cannot find script file " + str(ext_file_spec) 00442 return 00443 self.__scriptFileName = os.path.basename(ext_file_spec) 00444 int_file_spec = self.GetScriptFileSpec() 00445 print "Copying " + str(ext_file_spec) + " -> " + str(int_file_spec) 00446 if os.system("cp " + str(ext_file_spec) + " " + str(int_file_spec)): 00447 print "Failed to copy file!" 00448 self.__scriptFileName = "" 00449 self.Write() 00450 def SetScriptGlobalArgs(self,arg_str):
| def python::GBSTask::GBSTask::SetScriptGlobalArgs | ( | self, | ||
| arg_str | ||||
| ) |
Set (as a string) the comma separated list of application script args that are global to all jobs.
Definition at line 451 of file GBSTask.py.
00451 : 00452 00453 """Set (as a string) the comma separated list of application script args that are global to all jobs.""" 00454 00455 if self.IsDisabled("SetScriptGlobalArgs"): return 00456 self.__scriptGlobalArgs = arg_str 00457 self.Write() 00458 def SubmitJobs(self,num_req = 0):
| def python::GBSTask::GBSTask::SubmitJobs | ( | self, | ||
num_req = 0 | ||||
| ) |
If enabled (see EnableSubmit()), submit up to the limit (see GetMaxSubmitJobs()) jobs, but not to exceed the maximum (see GetMaxGangaJobs()) first choosing ones scheduled for retry and then new jobs. Function returns number submitted.
Definition at line 459 of file GBSTask.py.
00459 : 00460 00461 """If enabled (see EnableSubmit()), 00462 submit up to the limit (see GetMaxSubmitJobs()) jobs, 00463 but not to exceed the maximum (see GetMaxGangaJobs()) 00464 first choosing ones scheduled for retry and 00465 then new jobs. Function returns number submitted.""" 00466 00467 num_sub = 0 00468 00469 # Begin by making sure that submit is authorised, enabled and that there is a global script. 00470 if not self.IsAuthorisedToSubmit(): return num_sub 00471 if not self.__submitEnabled: 00472 print "Sorry, cannot submit; submit not enabled." 00473 return num_sub 00474 if not self.__scriptFileName: 00475 print "Sorry, cannot submit; no user application script assigned." 00476 return num_sub 00477 00478 # Make sure all jobs are up to date and that limits are not exceeded. 00479 self.UpdateJobsStatus() 00480 if self.__maxGangaJobs <= self.__jstatSubmitAll: 00481 print "Sorry, cannot submit; already have %d (limit %d)" % (self.__jstatSubmitAll,self.__maxGangaJobs) 00482 return num_sub 00483 if num_req <= 0: num_req = self.__maxSubmitJobs 00484 max_job = min(num_req,self.__maxGangaJobs - self.__jstatSubmitAll) 00485 if max_job < num_req: print "Sorry, can only submit %d, close to the limit of %d" % (max_job,self.__maxGangaJobs) 00486 00487 # First look for retries and then look for new jobs. 00488 for jsc in [GID_JSC_RETRY,GID_JSC_NEW]: 00489 job_names = self._jobManagers.keys() 00490 job_names.sort() 00491 for job_name in job_names: 00492 job = self._jobManagers[job_name] 00493 if job.GetStatusCode() != jsc: continue 00494 job.Submit() 00495 num_sub += 1 00496 if num_sub >= max_job: return num_sub 00497 return num_sub 00498 def UpdateJobsStatus(self):
| def python::GBSTask::GBSTask::UpdateJobsStatus | ( | self | ) |
Send all jobs their UpdateStatus(). May involve checking Ganga job and retrieving output. As a by product, also remove any orphaned Ganga jobs i.e. Ganga jobs in Task's jobtree folder but not owned by any Job
Definition at line 499 of file GBSTask.py.
00499 : 00500 00501 """Send all jobs their UpdateStatus(). May involve checking Ganga job and retrieving output. 00502 00503 As a by product, also remove any orphaned Ganga jobs i.e. Ganga jobs in Task's jobtree folder 00504 but not owned by any Job""" 00505 00506 owned_ganga_ids = {} 00507 for job_name,job in self._jobManagers.iteritems(): 00508 job.UpdateStatus() 00509 owned_ganga_ids[job.GetGangaJobId()] = 1 00510 for gj in Ganga.GPI.jobtree.getjobs(self.__gangaTreeDir): 00511 if owned_ganga_ids.has_key(gj.id): continue 00512 Log(logger.WARNING,"Removing orphaned Ganga job %d" % gj.id) 00513 gj.remove() 00514 00515 00516 #----------------------------------------------------------------------------------------------- 00517 def WriteHtmlReport(self,task_dir):
| def python::GBSTask::GBSTask::WriteHtmlReport | ( | self, | ||
| task_dir | ||||
| ) |
Write HTML status report <task_dir>/<task-name>.html. Reports of individual jobs will be created in <task_dir>/<task-name>/<job-name>.html.
Definition at line 518 of file GBSTask.py.
00518 : 00519 """Write HTML status report <task_dir>/<task-name>.html. 00520 00521 Reports of individual jobs will be created in <task_dir>/<task-name>./<job-name>.html.""" 00522 00523 if not os.path.isdir(task_dir): 00524 print "Unable to find directory: " + str(task_dir) 00525 return 00526 task_name = self.GetName() 00527 00528 # Create or wipe directory for job reports if necessary 00529 job_dir = task_dir + "/" + task_name 00530 if not os.path.isdir(job_dir): 00531 try: 00532 os.mkdir(job_dir,0755) 00533 except OSError: 00534 print "Unable to create directory: " + str(job_dir) 00535 return 00536 else: 00537 os.system("rm -f " + job_dir + "/*") 00538 00539 # Make sure that job statistics are right up to date 00540 self.RefreshJobStats() 00541 00542 # Create file for task report and record task data 00543 task_file = task_dir + "/" + task_name + '.html' 00544 try: 00545 f_task = open(task_file, 'w') 00546 except OSError: 00547 print "Unable to create file: " + str(task_file) 00548 return 00549 f_task.write("<html><body>\n") 00550 f_task.write("<h1>Status report for Task " + task_name + " produced on " + GBSTimeStamp() + "</h1>\n<pre>\n") 00551 f_task.write(str(self)) 00552 f_task.write("\n</pre>\n") 00553 00554 # Define colour codes 00555 red = "#ff6666" 00556 green = "#66ff66" 00557 blue = "#66aaff" 00558 purple = "#ff44ff" 00559 grey_light = "#999999" 00560 grey_dark = "#777777" 00561 00562 # Construct the colour table with links to start points in table sorted by job phase. 
00563 f_task.write("<h2>Task Status Table</h2>\n") 00564 f_task.write("<p><table border=1>\n") 00565 00566 f_task.write("<tr><td colspan=2 align=center bgcolor=" + grey_light + ">Not ready HELD (") 00567 num_nready_held = self.__jstatNReadyAll - self.__jstatNReadyOther 00568 if num_nready_held: 00569 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_NREADY_HOLD,num_nready_held)) 00570 else: f_task.write('0)\n') 00571 00572 f_task.write("<tr><td colspan=2 align=center bgcolor=" + grey_dark + ">Not ready Not HELD (") 00573 if self.__jstatNReadyOther: 00574 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_NREADY_NHOLD,self.__jstatNReadyOther)) 00575 else: f_task.write('0)\n') 00576 00577 f_task.write("<tr><td rowspan=2 align=center bgcolor=" + blue + \ 00578 ">Ready (" + str(self.__jstatReadyAll) + ")\n") 00579 00580 f_task.write("<td align=center bgcolor=" + green + ">First attempt (") 00581 num_ready_nretry = self.__jstatReadyAll - self.__jstatReadyRetry 00582 if num_ready_nretry: 00583 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_READY_NRETRY,num_ready_nretry)) 00584 else: f_task.write('0)\n') 00585 00586 f_task.write("<tr><td align=center bgcolor=" + red + ">Retry (") 00587 if self.__jstatReadyRetry: 00588 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_READY_RETRY,self.__jstatReadyRetry)) 00589 else: f_task.write('0)\n') 00590 00591 f_task.write("<tr><td rowspan=2 align=center bgcolor=" + purple + \ 00592 ">Submitted (" + str(self.__jstatSubmitAll) + ")\n") 00593 00594 f_task.write("<td align=center bgcolor=" + green + ">Running (") 00595 num_submit_run = self.__jstatSubmitAll - self.__jstatSubmitNRun 00596 if num_submit_run: 00597 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_SUBMIT_RUN,num_submit_run)) 00598 else: f_task.write('0)\n') 00599 00600 f_task.write("<tr><td align=center bgcolor=" + red + ">Not running (\n") 00601 if self.__jstatSubmitNRun: 00602 f_task.write('<a href="#phase_%d">%d</a>)\n' % 
(GID_JPC_SUBMIT_NRUN,self.__jstatSubmitNRun)) 00603 else: f_task.write('0)\n') 00604 00605 f_task.write("<tr><td colspan=2 align=center bgcolor=" + green + ">Succeeded (") 00606 num_done_nfail = self.__jstatDoneAll - self.__jstatDoneFail 00607 if num_done_nfail: 00608 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_DONE_NFAIL,num_done_nfail)) 00609 else: f_task.write('0)\n') 00610 00611 f_task.write("<tr><td colspan=2 align=center bgcolor=" + red + ">Failed (") 00612 if self.__jstatDoneFail: 00613 f_task.write('<a href="#phase_%d">%d</a>)\n' % (GID_JPC_DONE_FAIL,self.__jstatDoneFail)) 00614 else: f_task.write('0)\n') 00615 00616 f_task.write("</table>\n") 00617 00618 # Prepare a hah of lists: job phase -> list of table rows 00619 phase_rows = {} 00620 phase_rows[GID_JPC_NREADY_HOLD] = [] 00621 phase_rows[GID_JPC_NREADY_NHOLD] = [] 00622 phase_rows[GID_JPC_READY_NRETRY] = [] 00623 phase_rows[GID_JPC_READY_RETRY] = [] 00624 phase_rows[GID_JPC_SUBMIT_RUN] = [] 00625 phase_rows[GID_JPC_SUBMIT_NRUN] = [] 00626 phase_rows[GID_JPC_DONE_NFAIL] = [] 00627 phase_rows[GID_JPC_DONE_FAIL] = [] 00628 00629 ## Loop over all jobs, recording summary in a table in the parent task file with 00630 ## links to a file holding job details and associate Ganga job (if any). 
00631 f_task.write('<h2><a name="table_by_name">Job Table ordered by Name</a></h2\n') 00632 f_task.write('<p>See also <a href="#table_by_phase">Job Table ordered by Phase</a><p\n') 00633 table_header = "<p><table border=1>\n<tr><th>Job Name<th>Status<br>Date Time<th>Ganga<br>Job<th>Status\n" 00634 f_task.write(table_header) 00635 job_names = self._jobManagers.keys() 00636 job_names.sort() 00637 for job_name in job_names: 00638 job = self._jobManagers[job_name] 00639 job_name = job.GetName() 00640 00641 # Decide colours for table entry 00642 pc = job.GetPhaseCode() 00643 colour_1,colour_2 = grey_dark, grey_dark 00644 if pc == GID_JPC_NREADY_HOLD: colour_1,colour_2 = grey_light,grey_light 00645 if pc == GID_JPC_NREADY_NHOLD: colour_1,colour_2 = grey_dark, grey_dark 00646 if pc == GID_JPC_READY_NRETRY: colour_1,colour_2 = blue, green 00647 if pc == GID_JPC_READY_RETRY: colour_1,colour_2 = blue, red 00648 if pc == GID_JPC_SUBMIT_RUN: colour_1,colour_2 = purple, green 00649 if pc == GID_JPC_SUBMIT_NRUN: colour_1,colour_2 = purple, red 00650 if pc == GID_JPC_DONE_NFAIL: colour_1,colour_2 = green, green 00651 if pc == GID_JPC_DONE_FAIL: colour_1,colour_2 = red, red 00652 00653 # Initialise string that will hold a single row 00654 table_row = "" 00655 00656 # Create job file and link table entry to it. 
00657 job_file = job_dir + "/" + job_name + '.html' 00658 f_job = open(job_file, 'w') 00659 f_job.write("<html><body>\n") 00660 f_job.write("<h1>Status report for Job " + job_name + " produced on " + GBSTimeStamp() + "</h1>\n") 00661 f_job.write("<p>This job is part of the <a href=\"../" + task_name + ".html>") 00662 f_job.write(task_name + "</a> task<p>\n<pre>\n") 00663 f_job.write(str(job)) 00664 f_job.write("</pre></body></html>\n") 00665 f_job.close() 00666 table_row += "<tr><td bgcolor=" + colour_1 + "><a href=\"" + task_name + "/" \ 00667 + job_name + ".html\">" + job_name + "</a>" 00668 00669 # Chop off year and second to keep date more compact 00670 table_row += "<td bgcolor=" + colour_1 + ">" + job.GetStatusTime()[6:-4] 00671 table_row += "<td bgcolor=" + colour_1 + ">" 00672 gj_id = "unknown" 00673 gj_stat_file = "" 00674 gj_obj = job.GetGangaJob() 00675 if gj_obj: 00676 gj_id = gj_obj.id 00677 else : 00678 gj_stat_file = job._GetTryOutputDir() + "/gbs_ganga.status" 00679 if os.path.isfile(gj_stat_file): 00680 f = os.popen('grep "id " ' + gj_stat_file) 00681 line = f.readline() 00682 f.close() 00683 mo = re.search(r"id = (\d+)",line) 00684 if mo: gj_id = mo.group(1) 00685 gj_obj = open(gj_stat_file) 00686 if not gj_obj: 00687 table_row += "n/a" 00688 else: 00689 00690 # Create Ganga job file 00691 gj_file = job_dir + "/" + job_name + '_ganga_job.html' 00692 f_gj = open(gj_file, 'w') 00693 f_gj.write("<html><body>\n") 00694 f_gj.write("<p>This job is part of the <a href=\"../" + task_name + ".html>") 00695 f_gj.write(task_name + "</a> task<p>\n<pre>\n") 00696 if gj_stat_file: 00697 f_gj.write("Dump of expired Ganga Job\n\n") 00698 for line in gj_obj: f_gj.write(line) 00699 gj_obj.close() 00700 else: 00701 f_gj.write("Dump of active Ganga Job\n\n") 00702 f_gj.write(str(gj_obj)) 00703 f_gj.write("</pre></body></html>\n") 00704 f_gj.close() 00705 table_row += "<a href=\"" + task_name + "/" + job_name + "_ganga_job.html\">" + str(gj_id) + "</a>" 00706 00707 
table_row += "<td bgcolor=" + colour_2 + ">" + GIDStringForJSC(job.GetStatusCode()).ljust(23) \ 00708 + "[" + job.GetStatusText() + "]\n" 00709 # Record row in table ordered by name and and in hash ready forr table ordered by phase. 00710 f_task.write(table_row) 00711 phase_rows[pc].append(table_row) 00712 00713 f_task.write("</table>\n") 00714 00715 ## Construct table ordered by phase 00716 f_task.write('<h2><a name="table_by_phase">Job Table ordered by Phase</a></h2>\n') 00717 f_task.write('<p>See also <a href="#table_by_name">Job Table ordered by Name</a><p>\n') 00718 f_task.write(table_header) 00719 for pc,rows in phase_rows.iteritems(): 00720 f_task.write('<a name="phase_%d"></a>\n' % pc) 00721 for row in rows: f_task.write(row) 00722 f_task.write("</table>\n") 00723 f_task.write("</body></html>\n") 00724 f_task.close() 00725 print "HTML Status report written to " + task_file 00726 00727 00728 00729 #----------------------------------------------------------------------------------------------- 00730 # 00731 # ProtoJob Methods (user callable) 00732 # 00733 #----------------------------------------------------------------------------------------------- 00734 00735 def AddProtoJob(self,job_name,args_str="",env_str=""):
def python::GBSTask::GBSTask::AddProtoJob(self, job_name, args_str = "", env_str = "")
Add a ProtoJob with the name job_name and assign its local args the string args_str and environment env_str.
Definition at line 736 of file GBSTask.py.
00736 : 00737 00738 """Add a ProtJob with the name job_name and assign its local args the string args_str 00739 and environment env_str.""" 00740 00741 if env_str: 00742 d = GBSUtilities.ParseEnvStr(env_str) 00743 if not d: 00744 print "Cannot parse environment string: " + env_str 00745 return None 00746 return self._AddJobOrProtoJob(job_name,args_str,env_str,"GBSProtoJob") 00747 def PromoteProtoJobs(self,job_name_pattern = ".*"):
def python::GBSTask::GBSTask::PromoteProtoJobs(self, job_name_pattern = ".*")
Promote all ProtoJobs whose name matches the supplied pattern. For example ".*" promotes all.
Definition at line 748 of file GBSTask.py.
00748 : 00749 00750 """Promote all ProtoJobs whose name matches the supplied pattern. For example ".*" promotes all.""" 00751 00752 pro_list = [] # List of names to be promoted (give user a last chance to abort) 00753 for job_name,job in self._jobManagers.iteritems(): 00754 if job.GetType() != "GBSProtoJob": continue 00755 if re.search(job_name_pattern,job_name): pro_list.append(job_name) 00756 num_pjobs = len(pro_list) 00757 if not num_pjobs: 00758 print "No ProtoJobs to promote" 00759 return 00760 ans = raw_input(str(num_pjobs) + " ProtoJobs are due for promotion. Do you want to proceed ?y[n]") 00761 if not re.search(r"^y",ans,re.I): 00762 print "Promotion aborted" 00763 return 00764 for job_name in pro_list: 00765 pjob = self._jobManagers[job_name] 00766 job = GetModelRegistry().CreateObject(self.GetModel(),"Job",job_name,self) 00767 job.SetScriptLocalArgs(pjob.GetScriptLocalArgs()) 00768 job.SetLocalEnvironment(pjob.GetLocalEnvironment()) 00769 job.SetLocalInputSandbox(pjob.GetLocalInputSandbox()) 00770 job.SetLocalOutputSandbox(pjob.GetLocalOutputSandbox()) 00771 self._jobManagers[job_name] = job 00772 print str(len(pro_list)) + " ProtoJobs promoted" 00773 def RemoveProtoJobs(self,job_name_pattern = ".*"):
def python::GBSTask::GBSTask::RemoveProtoJobs(self, job_name_pattern = ".*")
Remove all ProtoJobs whose name matches the supplied pattern. For example ".*" removes all.
Definition at line 774 of file GBSTask.py.
00774 : 00775 00776 """Remove all ProtoJobs whose name matches the supplied pattern. For example ".*" removes all""" 00777 00778 del_list = [] # List of names to be deleted (cannot delete elements while iterating) 00779 for job_name,job in self._jobManagers.iteritems(): 00780 if job.GetType() != "GBSProtoJob": continue 00781 if re.search(job_name_pattern,job_name): del_list.append(job_name) 00782 for job_name in del_list: del self._jobManagers[job_name] 00783 print str(len(del_list)) + " ProtoJobs deleted" 00784 00785 00786 00787 #----------------------------------------------------------------------------------------------- 00788 # 00789 # Private Methods (not user callable) 00790 # 00791 #----------------------------------------------------------------------------------------------- 00792 00793 def _AddJobOrProtoJob(self,job_name,args_str,env_str,type):
def python::GBSTask::GBSTask::_AddJobOrProtoJob(self, job_name, args_str, env_str, type) [private]
Create a new named Job or ProtoJob and optionally assign its application script local args and local environment.
Definition at line 794 of file GBSTask.py.
00794 : 00795 00796 """Create a new named Job or ProtoJob and optionally assign its application script local args.""" 00797 00798 # Check that the name is legal and not already in use. 00799 if re.search(r"[^a-zA-Z0-9_\-]",job_name): 00800 print "Sorry, '" + str(job_name) + "' is an illegal job name (characters other than alphanumeric, '_' and '-')" 00801 return None 00802 if not re.search(r"^job_",job_name): job_name = 'job_' + job_name 00803 if self._jobManagers.has_key(job_name): 00804 print "Sorry, there already is a Job named '" + str(job_name) + "'" 00805 return None 00806 00807 # Name O.K., go create the Job. 00808 job = None 00809 if ( type == "GBSProtoJob" ): job = GBSProtoJob(job_name) 00810 else: job = GetModelRegistry().CreateObject(self.GetModel(),"Job",job_name,self) 00811 if args_str: job.SetScriptLocalArgs(args_str) 00812 if env_str: job.SetLocalEnvironment(env_str) 00813 self._jobManagers[job_name] = job 00814 return job 00815 def __ApplyActionToJobs(self,action,job_name_pattern):
def python::GBSTask::GBSTask::__ApplyActionToJobs(self, action, job_name_pattern) [private]
Apply an action (CLEAR ERROR COUNTS, CLEAR HISTORY, HOLD, KILL, RELEASE, REMOVE) to all jobs matching pattern.
Definition at line 816 of file GBSTask.py.
00816 : 00817 00818 """Apply an action (CLEAR ERROR COUNTS, CLEAR HISTORY, HOLD, KILL, RELEASE) to all jobs matching pattern""" 00819 job_list = [] # List of job names to which action can be applied 00820 for job_name,job in self._jobManagers.iteritems(): 00821 if not re.search(job_name_pattern,job_name): continue 00822 if ( action == "CLEAR ERROR COUNTS" and job.CanClear() ) \ 00823 or ( action == "CLEAR HISTORY" and job.CanClear() ) \ 00824 or ( action == "HOLD" and job.IsReady() ) \ 00825 or ( action == "KILL" and job.CanKill() ) \ 00826 or ( action == "RELEASE" and job.IsHeld() ) \ 00827 or ( action == "REMOVE" and job.CanClear() ): 00828 job_list.append(job_name) 00829 num_pjobs = len(job_list) 00830 if not num_pjobs: 00831 print "No jobs suitable for action " + action 00832 return 00833 print str(num_pjobs) + " jobs suitable for action " + action + ":-" 00834 for job_name in job_list: print " " + job_name 00835 ans = raw_input("Do you want to proceed ?y[n]") 00836 if not re.search(r"^y",ans,re.I): 00837 print "Action " + action + " aborted" 00838 return 00839 if action == "CLEAR HISTORY" or action == "REMOVE": 00840 warn = "this wipes all output for the job(s)" 00841 if action == "REMOVE": warn = "this entirely removes the job(s)" 00842 ans = raw_input("Are you REALLY sure - " + warn + "! ?y[n]") 00843 if not re.search(r"^y",ans,re.I): 00844 print "Action " + action + " aborted" 00845 return 00846 for job_name in job_list: 00847 job = self._jobManagers[job_name] 00848 if action == "CLEAR ERROR COUNTS": job.ClearErrorCounts() 00849 if action == "CLEAR HISTORY": job.ClearHistory(False) 00850 if action == "HOLD": job.Hold() 00851 if action == "KILL": job.Kill() 00852 if action == "RELEASE": job.Release() 00853 if action == "REMOVE": job.Remove(False) 00854 print "Action " + action + " applied to " + str(len(job_list)) + " jobs" 00855 00856 def __ReloadChildren(self):
def python::GBSTask::GBSTask::__ReloadChildren(self) [private]
Reload child Jobs from disk. Loop over all job_*.state files and recreate the corresponding Jobs. Make sure any associated Ganga job is in the associated JobTree directory.
Definition at line 857 of file GBSTask.py.
00857 : 00858 00859 """Reload child Jobs from disk. 00860 00861 Loop over all job_*.state files and recreate. 00862 Make sure any associated Ganga job is in the associated JobTree directory""" 00863 00864 import os, re 00865 00866 child_dir = self.GetStoreLocation("child_dir") 00867 child_state_files = os.listdir(child_dir) 00868 for child_state_file in child_state_files: 00869 00870 # Only look at job_*.state files 00871 child_state_file_spec = child_dir + "/" + child_state_file 00872 if not os.path.isfile(child_state_file_spec): continue 00873 mo = re.search(r"^(job_.*)\.state$",child_state_file) 00874 if not mo: continue 00875 child_name = mo.group(1) 00876 job = self.AddJob(child_name) 00877 # Ignore errors while trying to move associated Ganga job into correct JobTree directory 00878 # There may not be any Ganga job 00879 try: 00880 g_job = job.GetGangaJob() 00881 Ganga.GPI.jobtree.add(g_job,self.__gangaTreeDir) 00882 except: pass 00883 00884 class GBSProtoJob:
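python::GBSTask::GBSTask::__scriptFileName [private]
Definition at line 37 of file GBSTask.py.
python::GBSTask::GBSTask::__scriptGlobalArgs [private]
Definition at line 38 of file GBSTask.py.
python::GBSTask::GBSTask::__globalEnvironment [private]
Definition at line 39 of file GBSTask.py.
python::GBSTask::GBSTask::__globalInputSandbox [private]
Definition at line 40 of file GBSTask.py.
python::GBSTask::GBSTask::__globalOutputSandbox [private]
Definition at line 41 of file GBSTask.py.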
python::GBSTask::GBSTask::__backend [private] |
Definition at line 42 of file GBSTask.py.
python::GBSTask::GBSTask::__submitEnabled [private]
Definition at line 43 of file GBSTask.py.
python::GBSTask::GBSTask::__maxGangaJobs [private] |
Definition at line 44 of file GBSTask.py.
python::GBSTask::GBSTask::__maxSubmitJobs [private]
Definition at line 45 of file GBSTask.py.
python::GBSTask::GBSTask::__mode [private] |
Definition at line 46 of file GBSTask.py.
python::GBSTask::GBSTask::_jobManagers [private] |
Definition at line 50 of file GBSTask.py.
python::GBSTask::GBSTask::__gangaTreeDir [private] |
Definition at line 51 of file GBSTask.py.
python::GBSTask::GBSTask::__jstatAll [private] |
Definition at line 53 of file GBSTask.py.
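python::GBSTask::GBSTask::__jstatNReadyAll [private]
Definition at line 54 of file GBSTask.py.
python::GBSTask::GBSTask::__jstatNReadyOther [private]
Definition at line 55 of file GBSTask.py.
python::GBSTask::GBSTask::__jstatReadyAll [private]
Definition at line 56 of file GBSTask.py.
python::GBSTask::GBSTask::__jstatReadyRetry [private]
Definition at line 57 of file GBSTask.py.
python::GBSTask::GBSTask::__jstatSubmitAll [private]
Definition at line 58 of file GBSTask.py.
python::GBSTask::GBSTask::__jstatSubmitNRun [private]
Definition at line 59 of file GBSTask.py.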
python::GBSTask::GBSTask::__jstatDoneAll [private] |
Definition at line 60 of file GBSTask.py.
python::GBSTask::GBSTask::__jstatDoneFail [private]
Definition at line 61 of file GBSTask.py.
1.5.4