Object to submit and, if necessary, resubmit a job until it succeeds or needs user intervention. This class is responsible for submitting and, if necessary, resubmitting a job until it is successful or user intervention is unavoidable.
Definition at line 17 of file GBSJob.py.
| def python::GBSJob::GBSJob::__init__ | ( | self, | ||
| name, | ||||
| parent, | ||||
| model, | ||||
| model_args | ||||
| ) |
Definition at line 30 of file GBSJob.py.
00030 : 00031 self.__tryNumber = 0 # 0 before first try 00032 self.__earlyFails = 0 # Symptomatic of a system failure 00033 self.__lateFailsHandled = 0 # Job signalling retry 00034 self.__lateFailsUnhandled = 0 # Job aborting without signal 00035 self.__scriptLocalArgs = "" # Application script local args 00036 self.__localEnvironment = "" # The environment that are local to this job. 00037 self.__localInputSandbox = "" # The list of input sandbox files that are local to this job. 00038 self.__localOutputSandbox = "" # The list of output sandbox files that are local to this job. 00039 00040 # statusCode: See GBSIdCodes 00041 self.__statusCode = GID_JSC_NEW 00042 # Associated text to qualify statusCode. Has specific meaning when 00043 # __statusCode has the following values which correspond to the values 00044 # passed back from the application script via the GBS Log File:- 00045 # 00046 # GID_JSC_SUCCEEDED The list of output files 00047 # GID_JSC_FAILED Diagnostic message about the failure 00048 # GID_JSC_RETRY The retry args for next attempt. 00049 self.__statusText = "Ready to run" 00050 self.__statusTime = timestamp() 00051 # The current retry args i.e. as determined from previous try (or empty for first try) 00052 self.__retryArgs = "" 00053 self.__gangaJobId = -1 00054 00055 00056 GBSObject.__init__(self,name,parent,model) 00057 def _DoMemberIO(self,ioh):
| def python::GBSJob::GBSJob::_DoMemberIO | ( | self, | ||
| ioh | ||||
| ) | [private] |
Definition at line 58 of file GBSJob.py.
00058 : 00059 self.__tryNumber = ioh("Try Number","i",self.__tryNumber) 00060 self.__retryArgs = ioh("Current Retry Args","s",self.__retryArgs) 00061 self.__earlyFails = ioh("Early Fail Count","i",self.__earlyFails) 00062 self.__lateFailsHandled = ioh("Late Handled Fails Count","i",self.__lateFailsHandled) 00063 self.__lateFailsUnhandled = ioh("Late Unhandled Fails Count","i",self.__lateFailsUnhandled) 00064 self.__scriptLocalArgs = ioh("Script Local Args","s",self.__scriptLocalArgs) 00065 self.__localEnvironment = ioh("+Local environment","s",self.__localEnvironment) 00066 self.__localInputSandbox = ioh("+Local Input Sandbox List", "s",self.__localInputSandbox) 00067 self.__localOutputSandbox = ioh("+Local Output Sandbox List", "s",self.__localOutputSandbox) 00068 self.__statusCode = ioh("Status Code","i",self.__statusCode) 00069 self.__statusText = ioh("Status Text","s",self.__statusText) 00070 self.__statusTime = ioh("+Status Time Stamp","s",self.__statusTime) 00071 self.__gangaJobId = ioh("+Ganga Job Id","i",self.__gangaJobId) 00072 00073 # During transition may need to infer __gangaJobId from __statusCode 00074 if self.__statusCode >= GID_JSC_SUBMITTED: self.__gangaJobId = self.__statusCode 00075 00076 GBSObject._DoMemberIO(self,ioh) 00077 def GetType(self): return "GBSJob"
| def python::GBSJob::GBSJob::GetType | ( | self | ) |
| def python::GBSJob::GBSJob::__repr__ | ( | self | ) |
Definition at line 80 of file GBSJob.py.
00080 : return self.AsString() 00081 00082 00083 #----------------------------------------------------------------------------------------------- 00084 # 00085 # User Callable Methods Setters 00086 # 00087 #----------------------------------------------------------------------------------------------- 00088
| def python::GBSJob::GBSJob::AsString | ( | self, | ||
level = "Full" | ||||
| ) |
Return a string description. The description is available at the following levels: "Brief" — a one-line summary suitable for rows in tables; "Heading" — a one-line summary suitable as a heading for "Brief" rows; "Full" — a full description including the value of every data member.
Definition at line 89 of file GBSJob.py.
00089 : 00090 00091 """Return string description. 00092 00093 Return string description at the following levels:- 00094 "Brief" one line summary suitable for rows in tables 00095 "Heading" one line summary suitable as heading for "Brief" 00096 "Full" full desciption including value of every data member""" 00097 00098 if ( level == "Heading"): 00099 s = "Name".ljust(20) 00100 s += "Status".ljust(23) 00101 s += "Input".ljust(40) 00102 s += "Status Details" 00103 return s 00104 if ( level == "Brief"): 00105 s = self.GetName().ljust(20) 00106 s += GIDStringForJSC(self.__statusCode).ljust(23) 00107 s += (self.__scriptLocalArgs + ";" + self.__localEnvironment).ljust(40) 00108 s += "[" + self.__statusText + "]" 00109 return s 00110 00111 s = GBSObject.__repr__(self) + "\n\n" 00112 s += "Status: " + GIDStringForJSC(self.__statusCode) + " [" + self.__statusText + "] at " + self.__statusTime + "\n" 00113 s += " Associated Ganga Job ID: " + str(self.__gangaJobId) + "\n\n" 00114 s += "Job Definition\n" 00115 s += " Script local args: '" + self.__scriptLocalArgs + "'\n" 00116 s += " Local environment: '" + self.__localEnvironment + "'\n" 00117 s += " Input Sandbox: '" + self.__localInputSandbox + "'\n" 00118 s += " Output Sandbox: '" + self.__localOutputSandbox + "'\n\n" 00119 s += "Retry Status\n" 00120 s += " Try: " + str(self.__tryNumber) + "\n" 00121 s += " Retry Args: '" + str(self.__retryArgs) + "'\n" 00122 s += " Early Fails: " + str(self.__earlyFails) + "\n" 00123 s += " Late Handled Fails: " + str(self.__lateFailsHandled) + "\n" 00124 s += " Late Unhandled Fails: " + str(self.__lateFailsUnhandled) + "\n" 00125 00126 # If there exist any output, give a summary, either for the current try, if 00127 # complete, or the previous one if currently submitted. 
00128 00129 display_try = self.__tryNumber 00130 if self.__statusCode >= GID_JSC_SUBMITTED: display_try -= 1 00131 if display_try < 1: return s 00132 output_dir = self._GetTryOutputDir(display_try) 00133 s += "\nThe output for try %d can be found in\n\n %s\n\n and consists of:-\n\n" % (display_try,output_dir) 00134 list_dir = output_dir + '/../listing.tmp' 00135 os.system("cd " + output_dir + ";ls -l > " + list_dir) 00136 f = open(list_dir) 00137 for line in f: s += " " + line 00138 f.close() 00139 os.remove(list_dir) 00140 gbs_log_file_name = self._GetGbsLogFileName(display_try) 00141 gbs_log_file_spec = str(output_dir) + "/" + str(gbs_log_file_name) 00142 if not os.path.isfile(gbs_log_file_spec): return s 00143 s += "\nThe GLF (GBS Log File) %s contains:-\n\n" % gbs_log_file_name 00144 f = open(gbs_log_file_spec) 00145 for line in f: s += " " + line 00146 f.close() 00147 return s 00148 def CanClear(self):
| def python::GBSJob::GBSJob::CanClear | ( | self | ) |
Return True if ClearErrorCounts, ClearHistory and Remove can be applied to this job (i.e. it is not currently submitted).
Definition at line 149 of file GBSJob.py.
00149 : 00150 """Return true if can ClearErrorCounts, ClearHistory and Remove""" 00151 return not self.IsSubmitted() 00152 def CanKill(self):
| def python::GBSJob::GBSJob::CanKill | ( | self | ) |
Return True if the job can be killed (i.e. it is currently submitted).
Definition at line 153 of file GBSJob.py.
00153 : 00154 """Return true if can Kill""" 00155 return self.IsSubmitted() 00156 def CanSubmit(self):
| def python::GBSJob::GBSJob::CanSubmit | ( | self | ) |
Return True if a Ganga job can be submitted for this job.
Definition at line 157 of file GBSJob.py.
00157 : 00158 00159 """Return true if can submit Ganga job""" 00160 00161 if self.__statusCode < GID_JSC_SUBMITTED \ 00162 and self.__statusCode > GID_JSC_CANNOT_SUBMIT \ 00163 and self.GetParent().GetScriptFileSpec(): return True 00164 return False 00165 def GetEarlyFailsCount(self):
| def python::GBSJob::GBSJob::GetEarlyFailsCount | ( | self | ) |
Return Early Fails Count
Definition at line 166 of file GBSJob.py.
00166 : 00167 """Return Early Fails Count""" 00168 return self.__earlyFails 00169 def GetGangaJobId(self):
| def python::GBSJob::GBSJob::GetGangaJobId | ( | self | ) |
Return associated Ganga job Id (if any) or -1
Definition at line 170 of file GBSJob.py.
00170 : 00171 """Return associated Ganga job Id (if any) or -1 """ 00172 return self.__gangaJobId 00173 def GetGangaJob(self):
| def python::GBSJob::GBSJob::GetGangaJob | ( | self | ) |
Return associated Ganga job (if any)
Definition at line 174 of file GBSJob.py.
00174 : 00175 """Return associated Ganga job (if any)""" 00176 if self.__gangaJobId < 0: return None 00177 try: 00178 gj = Ganga.GPI.jobs(self.__gangaJobId) 00179 # I need to catch JobAccessError, but it isn't Ganga.GPI.JobAccessError so just 00180 # catch everything and hope it is O.K.! 00181 except: gj = None 00182 return gj 00183 def GetLateHandledFailsCount(self):
| def python::GBSJob::GBSJob::GetLateHandledFailsCount | ( | self | ) |
Return Late Handled Fails Count
Definition at line 184 of file GBSJob.py.
00184 : 00185 """Return Late Handled Fails Count""" 00186 return self.__lateFailsHandled 00187 def GetLateUnhandledFailsCount(self):
| def python::GBSJob::GBSJob::GetLateUnhandledFailsCount | ( | self | ) |
Return Late Unhandled Fails Count
Definition at line 188 of file GBSJob.py.
00188 : 00189 """Return Late Unhandled Fails Count""" 00190 return self.__lateFailsUnhandled 00191 def GetLocalEnvironment(self):
| def python::GBSJob::GBSJob::GetLocalEnvironment | ( | self | ) |
Return, as a comma separated list string, the environment that is local to this job.
Definition at line 192 of file GBSJob.py.
00192 : 00193 """Return, as a comma separated list string, the environment that is local to this job.""" 00194 return self.__localEnvironment 00195 def GetLocalInputSandbox(self):
| def python::GBSJob::GBSJob::GetLocalInputSandbox | ( | self | ) |
Return, as a comma separated list string, the input sandbox file list that is local to this job.
Definition at line 196 of file GBSJob.py.
00196 : 00197 """Return, as a comma separated list string, the input sandbox file list that is local to this job.""" 00198 return self.__localInputSandbox 00199 def GetLocalOutputSandbox(self):
| def python::GBSJob::GBSJob::GetLocalOutputSandbox | ( | self | ) |
Return, as a comma separated list string, the output sandbox file list that is local to this job.
Definition at line 200 of file GBSJob.py.
00200 : 00201 """Return, as a comma separated list string, the ouput sandbox file list that is local to this job.""" 00202 return self.__localOutputSandbox 00203 def GetPhaseCode(self):
| def python::GBSJob::GBSJob::GetPhaseCode | ( | self | ) |
Return the phase code. Phase codes are broad categories of status code, used by the task for job statistics.
Definition at line 204 of file GBSJob.py.
00204 : 00205 """Return phase code. These are broad categories of status code used by task for job statistics.""" 00206 pc = 0 00207 if self.IsComplete(): 00208 pc = GID_JPC_DONE_NFAIL 00209 if not self.IsSuccessful(): pc = GID_JPC_DONE_FAIL 00210 elif self.IsReady(): 00211 pc = GID_JPC_READY_NRETRY 00212 if self.GetTryNumber() > 0: pc = GID_JPC_READY_RETRY 00213 elif self.IsSubmitted(): 00214 pc = GID_JPC_SUBMIT_RUN 00215 if not self.IsRunning(): pc = GID_JPC_SUBMIT_NRUN 00216 else: 00217 pc = GID_JPC_NREADY_HOLD 00218 if not self.IsHeld(): pc = GID_JPC_NREADY_NHOLD 00219 return pc 00220 00221 def GetRetryArgs(self):
| def python::GBSJob::GBSJob::GetRetryArgs | ( | self | ) |
Return, as a string, the current retry args, i.e. as determined from the previous try (or empty for the first try).
Definition at line 222 of file GBSJob.py.
00222 : 00223 """Returm as a string current retry args i.e. as determined from previous try (or empty for first try)""" 00224 return self.__retryArgs 00225 def GetScriptLocalArgs(self):
| def python::GBSJob::GBSJob::GetScriptLocalArgs | ( | self | ) |
Return, as a string, the comma-separated list of application script args that are local to this job.
Definition at line 226 of file GBSJob.py.
00226 : 00227 """Return (as a string) the comma list of application script args that are local to this job""" 00228 return self.__scriptLocalArgs 00229 def GetStatusCode(self):
| def python::GBSJob::GBSJob::GetStatusCode | ( | self | ) |
Return status code
Definition at line 230 of file GBSJob.py.
00230 : 00231 """Return status code""" 00232 return self.__statusCode 00233 def GetStatusText(self):
| def python::GBSJob::GBSJob::GetStatusText | ( | self | ) |
Return status text which qualifies the Status Code
Definition at line 234 of file GBSJob.py.
00234 : 00235 """Return status text which qualifies the Status Code""" 00236 return self.__statusText 00237 def GetStatusTime(self):
| def python::GBSJob::GBSJob::GetStatusTime | ( | self | ) |
Return the date and time at which the current status code and text were set.
Definition at line 238 of file GBSJob.py.
00238 : 00239 """Return date time when current status code and text were achieved""" 00240 return self.__statusTime 00241 def GetTryNumber(self):
| def python::GBSJob::GBSJob::GetTryNumber | ( | self | ) |
Return Try Number (0 before first try)
Definition at line 242 of file GBSJob.py.
00242 : 00243 """Return Try Number (0 before first try)""" 00244 return self.__tryNumber 00245 def IsComplete(self):
| def python::GBSJob::GBSJob::IsComplete | ( | self | ) |
Return true if job is complete (Successful or Failed)
Definition at line 246 of file GBSJob.py.
00246 : 00247 """Return true if job is complete (Successful or Failed)""" 00248 return self.__statusCode <= GID_JSC_COMPLETE 00249 def IsFailure(self):
| def python::GBSJob::GBSJob::IsFailure | ( | self | ) |
Return True if the job has failed.
Definition at line 250 of file GBSJob.py.
00250 : 00251 """Return true if job is failure""" 00252 return self.__statusCode == GID_JSC_FAILED 00253 def IsHeld(self):
| def python::GBSJob::GBSJob::IsHeld | ( | self | ) |
Return true if job is Held
Definition at line 254 of file GBSJob.py.
00254 : 00255 """Return true if job is Held""" 00256 return self.__statusCode == GID_JSC_HELD 00257 def IsNotReady(self):
| def python::GBSJob::GBSJob::IsNotReady | ( | self | ) |
| def python::GBSJob::GBSJob::IsReady | ( | self | ) |
Return true if job is ready to submit
Definition at line 262 of file GBSJob.py.
00262 : 00263 """Return true if job is ready to submit""" 00264 return self.__statusCode > GID_JSC_CANNOT_SUBMIT and self.__statusCode < GID_JSC_SUBMITTED 00265 def IsRunning(self):
| def python::GBSJob::GBSJob::IsRunning | ( | self | ) |
Return true if job is submitted and associated Ganga Job status is running
Definition at line 266 of file GBSJob.py.
00266 : 00267 """Return true if job is submitted and associated Ganga Job status is running""" 00268 return self.__statusCode >= GID_JSC_SUBMITTED and re.search(r"Ganga status:running",self.__statusText) 00269 def IsSubmitted(self):
| def python::GBSJob::GBSJob::IsSubmitted | ( | self | ) |
Return true if job is submitted
Definition at line 270 of file GBSJob.py.
00270 : 00271 """Return true if job is submitted""" 00272 return self.__statusCode >= GID_JSC_SUBMITTED 00273 def IsSuccessful(self):
| def python::GBSJob::GBSJob::IsSuccessful | ( | self | ) |
Return true if job is successful
Definition at line 274 of file GBSJob.py.
00274 : 00275 """Return true if job is successful""" 00276 return self.__statusCode == GID_JSC_SUCCEEDED 00277 00278 #----------------------------------------------------------------------------------------------- 00279 # 00280 # User Callable Methods Setters 00281 # 00282 #----------------------------------------------------------------------------------------------- 00283 00284 def Analyse(self,update = True):
| def python::GBSJob::GBSJob::Analyse | ( | self, | ||
update = True | ||||
| ) |
Perform job termination analysis and optionally apply the results.
Definition at line 285 of file GBSJob.py.
00285 : 00286 00287 """Perform job termination analysis and optionally apply the results.""" 00288 00289 # Quit if nothing to analyse. 00290 if self.__statusCode != GID_JSC_WAITING_ANALYSIS: return 00291 00292 analyser = GetModelRegistry().CreateObject(self.GetModel(),"JobAnalyser","Solomon",self) 00293 analyser.Analyse(self) 00294 if update: 00295 analyser.Apply() 00296 gj = self.GetGangaJob() 00297 if gj: gj.remove() 00298 self.__gangaJobId = -1 00299 self.GetParent().RefreshJobStats() 00300 self.Write() 00301 def ClearErrorCounts(self,warn=True):
| def python::GBSJob::GBSJob::ClearErrorCounts | ( | self, | ||
warn = True | ||||
| ) |
If allowed, clear the error counts but leave the retry history intact. This can only be applied to jobs that are not currently submitted; if the job has failed, it has the side effect of setting the status back to RETRY.
Definition at line 302 of file GBSJob.py.
00302 : 00303 00304 """If allowed, clear error counts, but leave retry history intact 00305 00306 Can only applied to jobs that are ready to be submitted or that have failed and 00307 in this case has the side effect of setting the status back to RETRY""" 00308 00309 if self.CanClear(): 00310 self.__earlyFails = 0 00311 self.__lateFailsHandled = 0 00312 self.__lateFailsUnhandled = 0 00313 if self.__statusCode == GID_JSC_FAILED: 00314 self.__statusText = "Retrying after users cleared errors" 00315 self._SetStatusCode(GID_JSC_RETRY) 00316 self.GetParent().RefreshJobStats() 00317 self.Write() 00318 return 00319 if warn: print "Cannot ClearErrorCounts on job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00320 def ClearHistory(self,confirm=True,warn=True):
| def python::GBSJob::GBSJob::ClearHistory | ( | self, | ||
confirm = True, |
||||
warn = True | ||||
| ) |
Completely clear all processing history so that processing begins again from scratch. The only processing state that is retained is that, if the job was held, it remains held.
Definition at line 321 of file GBSJob.py.
00321 : 00322 00323 """Completely clear all processing history so that processing begins again from scratch. 00324 00325 The only processing state that is retained is that if job was held it will still be.""" 00326 00327 if not self.CanClear(): 00328 if warn: print "Cannot ClearHistory on job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00329 return 00330 if confirm: 00331 ans = raw_input("Are you sure you want to clear the history of job " + self.GetName() + " ?y[n]") 00332 if not re.search(r"^y",ans,re.I): 00333 print "History not cleared." 00334 return 00335 child_dir = self.GetStoreLocation("child_dir") 00336 if os.path.isdir(child_dir): 00337 if shutil.rmtree(child_dir): 00338 Log(logger.ERROR,"Failed to remove obsolete job history directory:" + child_dir) 00339 self.__tryNumber = 0 00340 self.__retryArgs = "" 00341 self.__gangaJobId = -1 00342 if not self.IsHeld(): 00343 self.__statusText = "Ready to run" 00344 self._SetStatusCode(GID_JSC_NEW) 00345 self.ClearErrorCounts() 00346 return 00347 def Hold(self,warn=True):
| def python::GBSJob::GBSJob::Hold | ( | self, | ||
warn = True | ||||
| ) |
Hold the job so that it won't be submitted. If requested, warn when the job is not suitable for holding.
Definition at line 348 of file GBSJob.py.
00348 : 00349 00350 """Hold job, so that it won't be submitted. Warn, if requested, if job not suitable for holding""" 00351 00352 if self.IsReady(): 00353 self.__statusText = "Held by user" 00354 self._SetStatusCode(GID_JSC_HELD) 00355 self.GetParent().RefreshJobStats() 00356 self.Write() 00357 return 00358 if warn: print "Cannot HOLD job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00359 def Kill(self,warn=True):
| def python::GBSJob::GBSJob::Kill | ( | self, | ||
warn = True | ||||
| ) |
Kill a job that has been submitted to Ganga. If requested, warn when the job is not suitable for killing.
Definition at line 360 of file GBSJob.py.
00360 : 00361 00362 """Kill job that has been submitted to Ganga. Warn, if requested, if job not suitable for killing""" 00363 00364 if not self.CanKill(): 00365 if warn: print "Cannot KILL job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00366 return 00367 try: 00368 gj = Ganga.GPI.jobs(self.__statusCode) 00369 # I need to catch JobAccessError, but it isn't Ganga.GPI.JobAccessError so just 00370 # catch everything and hope it is O.K.! 00371 except: 00372 Log(logger.ERROR,"Have lost Ganga job with ID " + str(self.__statusCode)\ 00373 + " for job " + self.GetName()) 00374 return 00375 # Kill job 00376 gj.kill() 00377 self.UpdateStatus() 00378 def Release(self,warn=True):
| def python::GBSJob::GBSJob::Release | ( | self, | ||
warn = True | ||||
| ) |
Release the job so that it can be submitted. If requested, warn when the job is not suitable for releasing.
Definition at line 379 of file GBSJob.py.
00379 : 00380 00381 """Release job, so that it can be submitted. Warn, if requested, if job not suitable for releasing""" 00382 00383 if self.IsHeld(): 00384 self.__statusCode = GID_JSC_NEW 00385 if self.__tryNumber: self.__statusCode = GID_JSC_RETRY 00386 self._SetStatusText("Ready to run") 00387 self.GetParent().RefreshJobStats() 00388 self.Write() 00389 return 00390 if warn: print "Cannot RELEASE job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00391 def Remove(self,confirm=True,warn=True):
| def python::GBSJob::GBSJob::Remove | ( | self, | ||
confirm = True, |
||||
warn = True | ||||
| ) |
Completely remove job.
Definition at line 392 of file GBSJob.py.
00392 : 00393 00394 """Completely remove job.""" 00395 00396 if not self.CanClear(): 00397 if warn: print "Cannot Remove job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode)) 00398 return 00399 if confirm: 00400 ans = raw_input("Are you sure you want to remove job " + self.GetName() + " ?y[n]") 00401 if not re.search(r"^y",ans,re.I): 00402 print "Not removed." 00403 return 00404 child_dir = self.GetStoreLocation("child_dir") 00405 if os.path.isdir(child_dir): 00406 if shutil.rmtree(child_dir): 00407 Log(logger.ERROR,"Failed to remove obsolete job history directory:" + child_dir) 00408 state_file = self.GetStoreLocation("self") 00409 if os.remove(state_file): 00410 Log(logger.ERROR,"Failed to remove state file:" + state_file) 00411 # Sneak into parent task's job list and remove entry. 00412 del self.GetParent()._jobManagers[self.GetName()] 00413 self.GetParent().RefreshJobStats() 00414 return 00415 def SetLocalEnvironment(self,env_str):
| def python::GBSJob::GBSJob::SetLocalEnvironment | ( | self, | ||
| env_str | ||||
| ) |
Set, as a comma separated list string, the environment that is local to this job.
e.g. job.SetLocalEnvironment('var1=123,var2=a string with spaces,var3=456')
Definition at line 416 of file GBSJob.py.
00416 : 00417 00418 """Set, as a comma separated list string, the environment that local to this job.. 00419 00420 e.g. job.SetLocalEnvironment('var1=123,var2=a string with spaces,var3=456')""" 00421 00422 d = GBSUtilities.ParseEnvStr(env_str) 00423 if env_str and not d: print "Cannot parse environment string: '" + env_str + "'" 00424 else: 00425 self.__localEnvironment = env_str 00426 self.Write() 00427 def SetLocalInputSandbox(self,in_sbox_str):
| def python::GBSJob::GBSJob::SetLocalInputSandbox | ( | self, | ||
| in_sbox_str | ||||
| ) |
Set, as a comma separated list string, the input sandbox file list that is local to this job.
e.g. job.SetLocalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')
Definition at line 428 of file GBSJob.py.
00428 : 00429 00430 """Set, as a comma separated list string, the input sandbox file list that is local to this job. 00431 00432 e.g. task.SetLocalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')""" 00433 00434 (ok,str) = GBSUtilities.ProcessSandboxSetup(self,in_sbox_str,"input") 00435 if ok: 00436 self.__localInputSandbox = str 00437 self.Write() 00438 def SetLocalOutputSandbox(self,out_sbox_str):
| def python::GBSJob::GBSJob::SetLocalOutputSandbox | ( | self, | ||
| out_sbox_str | ||||
| ) |
Set, as a comma separated list string, the output sandbox file list that is local to this job.
e.g. job.SetLocalOutputSandbox('my_output_data.dat,my_output.log')
Definition at line 439 of file GBSJob.py.
00439 : 00440 00441 """Set, as a comma separated list string, the output sandbox file list that is local to this job. 00442 00443 e.g. task.SetLocalOutputSandbox('my_output_data.dat,my_output.log')""" 00444 00445 (ok,str) = GBSUtilities.ProcessSandboxSetup(self,out_sbox_str,"output") 00446 if ok: 00447 self.__localOutputSandbox = str 00448 self.Write() 00449 def SetScriptLocalArgs(self,arg_str):
| def python::GBSJob::GBSJob::SetScriptLocalArgs | ( | self, | ||
| arg_str | ||||
| ) |
Set, as a string, the comma-separated list of application script args that are local to this job.
e.g. job.SetScriptLocalArgs('123,a string with spaces,456')
Definition at line 450 of file GBSJob.py.
00450 : 00451 00452 """Set (as a string) the comma separated list of application script args that are local to this job. 00453 00454 e.g. job.SetScriptLocalArgs('123,a string with spaces,456')""" 00455 00456 self.__scriptLocalArgs = arg_str 00457 self.Write() 00458 def Submit(self,Perusable=False):
| def python::GBSJob::GBSJob::Submit | ( | self, | ||
Perusable = False | ||||
| ) |
Submit job if permitted and return True if successful.
Definition at line 459 of file GBSJob.py.
00459 : 00460 00461 """Submit job if permitted and return True if successful.""" 00462 00463 my_manager = self.GetParent() 00464 00465 # Get the user application script 00466 script_spec = my_manager.GetScriptFileSpec() 00467 if not script_spec: 00468 print "Cannot submit job, no user application script assigned to Task '" + my_manager.GetName() + "'" 00469 return False 00470 script_name = my_manager.GetScriptFileName() 00471 00472 if not self.CanSubmit(): 00473 print "Cannot submit job " + self.GetName() + ":" \ 00474 " Status: " + GIDStringForJSC(self.__statusCode) \ 00475 + " [" + self.GetStatusText() + "]" 00476 return False 00477 00478 if not my_manager.IsAuthorisedToSubmit(): return False 00479 00480 # Increment try number and prepare output directory to receive results 00481 00482 self.__tryNumber += 1 00483 self.MakeChildDirectory() 00484 output_dir = self._GetTryOutputDir() 00485 # It ought not to exist already, but in case it is, remove it first. 00486 if os.path.isdir(output_dir): 00487 Log(logger.SYNOPSIS,"Removing obsolete directory for job output:" + str(output_dir)) 00488 if shutil.rmtree(output_dir): 00489 Log(logger.ERROR,"Failed to remove obsolete directory for job output:" + str(output_dir)) 00490 Log(logger.SYNOPSIS,"Creating directory for job output:" + str(output_dir)) 00491 if os.mkdir(output_dir,0755): 00492 Log(logger.ERROR,"Failed to create directory for job output:" + str(output_dir)) 00493 00494 # Prepare GBS Log File in output directory 00495 gbs_log_file_name = self._GetGbsLogFileName() 00496 gbs_log_file_spec = str(output_dir) + "/" + str(gbs_log_file_name) 00497 os.system("echo " + timestamp() + " INFO GBS_JOB_SUBMIT submitting job > " + gbs_log_file_spec) 00498 00499 # Prepare arg list: GLF file, appl. script, global args, local args. 00500 args = [] 00501 args.append(gbs_log_file_name) 00502 args.append(script_name) 00503 00504 # Collect up the application script args, taking account of ecaped commas. 
00505 script_args = [] 00506 for arglist in [my_manager.GetScriptGlobalArgs(),self.GetScriptLocalArgs()]: 00507 if arglist: GBSUtilities.ParseCommaSepList(arglist,script_args) 00508 00509 # Passing args with anything but alphanumeric data into the 00510 # application script is a pain. The logical approach, placing 00511 # each arg as a separate element in 'args' works for the local 00512 # back-end but is broken for LCG where all the args are first 00513 # interpolated and then re-parsed to define the argument list 00514 # that gets passed into the script. Any string containing 00515 # white space gets broken apart and anything containing "(" causes 00516 # a syntax error! So, instead we manufacture the wrapper 00517 # script on the fly that will call the application script 00518 # which gives us total control over the way arguments get passed in. 00519 00520 wrapper_file_spec = str(output_dir) + "/" + "gbs_job_wrapper.sh" 00521 00522 #Copy the leading part of the wrapper 00523 os.system("cp $GBS_HOME/python/gbs_job_wrapper_part_1.sh " + wrapper_file_spec) 00524 00525 # Manufacture the bit where the arguments get passed in 00526 wrapper = file(wrapper_file_spec,'a') 00527 num_arg = 0 00528 arglist = "" 00529 for arg in script_args: 00530 num_arg += 1 00531 wrapper.write("arg" + str(num_arg) + "=\"" + arg + "\"\n") 00532 arglist += "\"$arg" + str(num_arg) +"\" " 00533 wrapper.write("/usr/bin/time --format 'CPU User: %U sec, System: %S sec. Elapse: %E. 
' --output=$GBS_HOME/time_output_$$ "\ 00534 + "./$user_script " + arglist + "\n") 00535 wrapper.close() 00536 00537 #Copy the trailing part of the wrapper 00538 os.system("cat $GBS_HOME/python/gbs_job_wrapper_part_2.sh >>" + wrapper_file_spec) 00539 00540 # Prepare the GBS environment 00541 env = {} 00542 env['GBS_MODE'] = my_manager.GetMode() 00543 env['GBS_RETRY_COUNT'] = str(self.__tryNumber -1) 00544 retry_arg_no = 0 00545 for retry_arg in self.__retryArgs.split(): 00546 retry_arg_no += 1 00547 env['GBS_RETRY_ARG_' + str(retry_arg_no)] = retry_arg 00548 env['GBS_NUM_RETRY_ARGS'] = str(retry_arg_no) 00549 00550 # Prepare the user environment 00551 for env_str in [my_manager.GetGlobalEnvironment(),self.GetLocalEnvironment()]: 00552 if not env_str: continue 00553 for var,val in GBSUtilities.ParseEnvStr(env_str).iteritems(): 00554 env[var] = val 00555 00556 # Prepare the executable 00557 exe = Ganga.GPI.Executable(exe = Ganga.GPI.File(wrapper_file_spec), env = env,args = args) 00558 00559 # Prepare sandboxes 00560 inputsandbox = [] 00561 inputsandbox.append(script_spec) 00562 inputsandbox.append(gbs_log_file_spec) 00563 env_cmds = "" 00564 for e_name,e_value in env.iteritems(): 00565 env_cmds += e_name + "=" + str(e_value) + ";" 00566 for obj in [my_manager,self]: 00567 sb_file_list = [] 00568 if obj is self: GBSUtilities.ParseCommaSepList(self.GetLocalInputSandbox(),sb_file_list) 00569 else: GBSUtilities.ParseCommaSepList(my_manager.GetGlobalInputSandbox(),sb_file_list) 00570 input_dir = obj.GetStoreLocation("child_dir") + "/InputSandbox" 00571 for sb_file in sb_file_list: inputsandbox.append(input_dir + "/" + sb_file) 00572 outputsandbox = ['gbs_output_sandbox.tar.gz'] 00573 outputsandbox.append(gbs_log_file_name) 00574 for obj in [my_manager,self]: 00575 sb_file_list = [] 00576 if obj is self: GBSUtilities.ParseCommaSepList(self.GetLocalOutputSandbox(),sb_file_list) 00577 else: GBSUtilities.ParseCommaSepList(my_manager.GetGlobalOutputSandbox(),sb_file_list) 
00578 for sb_file in sb_file_list: 00579 if sb_file[0] == "$": 00580 inp = os.popen(env_cmds + sb_file[1:]) 00581 sb_file = inp.read() 00582 inp.close() 00583 outputsandbox.append(sb_file) 00584 00585 # Prepare backend 00586 backend = my_manager.GetBackend() 00587 queue = "" 00588 mo = re.search(r"(.*?):(.*)",backend) 00589 if mo: 00590 (backend,queue) = mo.groups() 00591 if backend == "Local": queue = "" 00592 00593 # Prepare the job 00594 gj = Ganga.GPI.Job(backend=backend) 00595 Ganga.GPI.jobtree.add(gj,my_manager.GetGangaTreeDir()) 00596 if queue : 00597 if backend == "PBS": gj.backend.queue = queue 00598 if backend == "LCG": gj.backend.CE = queue 00599 # If GLITE_ENABLE is enabled select the correct middleware 00600 if backend == "LCG" and Ganga.GPI.config['LCG']['GLITE_ENABLE']: 00601 gj.backend.middleware = 'GLITE' 00602 if Perusable: gj.backend.perusable = True 00603 elif Perusable: print "Cannot select perusable; backend not LCG/GLITE" 00604 gj.application = exe 00605 gj.inputsandbox = inputsandbox 00606 gj.outputsandbox = outputsandbox 00607 if GetLoggerThreshold() <= logger.SYNOPSIS: print "Contents of Ganga job:-\n\n" + str(gj) 00608 try: 00609 print "Submitting ",self.GetName() 00610 timeout_call(gj.submit,2*60) 00611 except Exception,inst: 00612 Log(logger.ERROR,"Caught exception:" + str(inst) +" Failed to complete Ganga job submit") 00613 self.__tryNumber -= 1 00614 return False 00615 self.__gangaJobId = gj.id 00616 self.__statusText = "Ganga status:" + gj.status 00617 self._SetStatusCode(gj.id) 00618 self.GetParent().RefreshJobStats() 00619 self.Write() 00620 return True 00621 00622 def UpdateStatus(self):
| def python::GBSJob::GBSJob::UpdateStatus | ( | self | ) |
Get latest status. May involve checking Ganga job and retrieving output.
Definition at line 623 of file GBSJob.py.
def UpdateStatus(self):

    """Get latest status. May involve checking Ganga job and retrieving output.

    Does nothing unless the job is submitted. Otherwise the Ganga job whose id
    is held in the status code (Submit records gj.id via _SetStatusCode) is
    looked up and:

    - if it cannot be found, or has sat unfinished since the last status
      change for longer than the time limit (3 hours; 3 days when
      "scheduled" or "running"), the job is reset as if the submit never
      took place (a stalled Ganga job is also removed);
    - if it is unfinished but within the limit, the Ganga status is recorded;
    - otherwise its output is copied into the try output directory,
      gbs_output_sandbox.tar.gz is unpacked, the Ganga job description is
      saved to gbs_ganga.status and Analyse() decides what happens next.
    """

    if not self.IsSubmitted(): return
    try:
        gj = Ganga.GPI.jobs(self.__statusCode)
    # I need to catch JobAccessError, but it isn't Ganga.GPI.JobAccessError so just
    # catch everything and hope it is O.K.!
    except:
        gj = None

    # Deal with lost Ganga job, ought not to happen, but then lots of things in life ought not to happen.
    if not gj:
        Log(logger.ERROR, "Have lost Ganga job with ID " + str(self.__statusCode)
            + " for job " + self.GetName())
        # Treat as if status has reverted to "new"
        g_status = 'new'
    else:
        g_status = gj.status

    # Deal with cases when job hasn't finished
    if g_status in ("completing", "ready", "running", "scheduled", "submitted", "waiting"):
        self._SetStatusText("Ganga status:" + g_status)

        # If Ganga job does not appear stalled, just record any change in status.
        # _SetStatusText/_SetStatusCode refresh __statusTime whenever the status
        # changes, so this is the time since the last status change.
        t_stall_hours = (time.time()
                         - time.mktime(time.strptime(self.__statusTime.strip(), "%Y-%m-%d %H:%M:%S"))) / 3600.
        # Set a 3 hour time limit except for scheduled and running where 3 days is allowed.
        t_timeout_hours = 3.
        if g_status in ("scheduled", "running"):
            t_timeout_hours = 3. * 24.
        if t_stall_hours < t_timeout_hours:
            self.GetParent().RefreshJobStats()
            self.Write()
            return
        # Ganga job has stalled; kill it and treat as new.
        # (gj cannot be None here: a lost job was already mapped to 'new'.)
        self._SetStatusText("Ganga job %d stalled for %5.1f hours; killed" % (gj.id, t_stall_hours))
        Log(logger.ERROR, self.__statusText)
        gj.remove()
        g_status = 'new'

    # If status has reverted to "new" submit never took place
    if g_status == 'new':
        self.__tryNumber -= 1
        self.__statusCode = GID_JSC_NEW
        self.__gangaJobId = -1
        # NOTE(review): after the decrement __tryNumber counts earlier tries, so
        # ">= 1" might be intended here rather than "> 1" -- confirm.
        if self.__tryNumber > 1: self.__statusCode = GID_JSC_RETRY
        self._SetStatusText("Ready to run")
        self.GetParent().RefreshJobStats()
        self.Write()
        return

    # For all other cases move over output data, unpack gbs_output_sandbox.tar.gz (if present) and save Ganga status
    output_dir = self._GetTryOutputDir()
    Log(logger.SYNOPSIS, "Retrieving output from " + str(gj.outputdir) + " into " + str(output_dir))
    # A missing output directory is reported like an empty one instead of
    # letting os.listdir raise.
    if not os.path.isdir(gj.outputdir) or not os.listdir(gj.outputdir):
        Log(logger.ERROR, "No output files in " + str(gj.outputdir))
    elif os.system("cp " + str(gj.outputdir) + "* " + str(output_dir)):
        Log(logger.ERROR, "Failed to retrieve output from " + str(gj.outputdir) + " into " + str(output_dir))
    # Submit only sets "perusable" on LCG backends; other backends may not
    # define the attribute (TODO confirm), so look it up defensively.
    # NOTE(review): nesting of this check relative to the copy above was
    # reconstructed from a flattened listing -- confirm it runs unconditionally.
    if getattr(gj.backend, "perusable", False):
        Log(logger.INFO, "Job is perusable, attempting to recover stdout, as stdout.perusable")
        cmd = "glite-wms-job-perusal --get -f stdout --all --noint " \
            + gj.backend.id + " > " + output_dir + "/stdout.perusable"
        if os.system(cmd):
            Log(logger.ERROR, "Failed to retrieve persuable output using " + cmd)
    gbs_output_sandbox = output_dir + "/gbs_output_sandbox.tar.gz"
    if os.path.isfile(gbs_output_sandbox):
        Log(logger.SYNOPSIS, "Unpacking " + gbs_output_sandbox)
        if os.system("cd " + output_dir + "; tar zxf gbs_output_sandbox.tar.gz"):
            Log(logger.ERROR, "Failed to unpack " + gbs_output_sandbox)
        else:
            # Unpacked successfully; the tarball is no longer needed.
            os.remove(gbs_output_sandbox)
    gangaStatusFile = output_dir + "/gbs_ganga.status"
    f = open(gangaStatusFile, "w")
    try:
        f.write(str(gj))
    finally:
        f.close()
    self.__statusCode = GID_JSC_WAITING_ANALYSIS
    self._SetStatusText("Waiting to analyse. Ganga exist status was " + g_status)

    # Now analyse the results and decide what happens next
    self.Analyse()
    # No need to write self, the Analyse() call will have done that.

###### Private Methods (not user callable) ######
| def python::GBSJob::GBSJob::_GetTryOutputDir | ( | self, | ||
try_req = 0 | ||||
| ) | [private] |
Return the file spec for the current try (or supplied try) output directory
Definition at line 715 of file GBSJob.py.
def _GetTryOutputDir(self, try_req = 0):
    """Return the file spec for the current try (or supplied try) output directory.

    try_req -- try number to use; any value that is not > 0 (the default 0)
               selects the current try.

    The result is <"child_dir" store location>/try_<N>, where N is the try
    number zero-padded to at least three digits.
    """
    if try_req > 0:
        chosen_try = try_req
    else:
        chosen_try = self.__tryNumber
    child_dir = self.GetStoreLocation("child_dir")
    return child_dir + "/try_" + str(chosen_try).zfill(3)
| def python::GBSJob::GBSJob::_GetGbsLogFileName | ( | self, | ||
try_req = 0 | ||||
| ) | [private] |
Return the name of the GBS Log File for the current try (or supplied try).
Definition at line 724 of file GBSJob.py.
def _GetGbsLogFileName(self, try_req = 0):
    """Return the name of the GBS Log File for the current try (or supplied try).

    try_req -- try number to use; any value that is not > 0 (the default 0)
               selects the current try.

    The name has the form gbs_<parent name>_<job name>_<try>.log.
    """
    if try_req > 0:
        try_num = try_req
    else:
        try_num = self.__tryNumber
    name_parts = ["gbs", self.GetParent().GetName(), self.GetName(), str(try_num)]
    return "_".join(name_parts) + ".log"

# Methods used by JobAnalyser to update this job's state.
def _IncrementEarlyFailsCount(self):
    """Add one to the count of early failures."""
    self.__earlyFails = self.__earlyFails + 1
| def python::GBSJob::GBSJob::_IncrementEarlyFailsCount | ( | self | ) | [private] |
| def python::GBSJob::GBSJob::_IncrementLateHandledFailsCount | ( | self | ) | [private] |
| def python::GBSJob::GBSJob::_IncrementLateUnhandledFailsCount | ( | self | ) | [private] |
| def python::GBSJob::GBSJob::_SetRetryArgs | ( | self, | ||
| value | ||||
| ) | [private] |
Definition at line 735 of file GBSJob.py.
def _SetRetryArgs(self, value):
    """Store the retry args for the next attempt.

    value -- whitespace-separated retry args string (empty for none). On the
             next submit each word is exported as GBS_RETRY_ARG_<n>.
    """
    self.__retryArgs = value

# Methods that record state changes. These also update state time stamp and add
# entry to GLF if it exists.
| def python::GBSJob::GBSJob::_SetStatusCode | ( | self, | ||
| value | ||||
| ) | [private] |
Definition at line 739 of file GBSJob.py.
def _SetStatusCode(self, value):
    """Record a new status code.

    Does nothing if value equals the current status code. Otherwise stores
    it, refreshes the status time stamp and, if the GBS Log File (GLF) for the
    current try exists, appends a "State change" line to it. Failure to append
    is logged, not raised.
    """
    if self.__statusCode == value: return
    self.__statusCode = value
    self.__statusTime = timestamp()
    gbs_log_file_spec = self._GetTryOutputDir() + "/" + self._GetGbsLogFileName()
    if os.path.isfile(gbs_log_file_spec):
        # Append directly instead of running "echo '...' >> file" in a shell:
        # the status text may hold diagnostic text passed back from the
        # application script, and a single quote in it would break (or inject
        # into) the shell command.
        # NOTE(review): no separator between timestamp() and "INFO", kept as
        # before -- confirm timestamp() supplies trailing whitespace.
        entry = timestamp() + "INFO State change " + GIDStringForJSC(self.__statusCode) \
            + " [" + self.__statusText + "]\n"
        try:
            glf = open(gbs_log_file_spec, "a")
            try:
                glf.write(entry)
            finally:
                glf.close()
        except (IOError, OSError):
            # Best effort, as before, but no longer silent.
            Log(logger.ERROR, "Failed to append state change to " + gbs_log_file_spec)
| def python::GBSJob::GBSJob::_SetStatusText | ( | self, | ||
| value | ||||
| ) | [private] |
Definition at line 747 of file GBSJob.py.
def _SetStatusText(self, value):
    """Record new status text.

    Does nothing if value equals the current status text. Otherwise stores
    it, refreshes the status time stamp and, if the GBS Log File (GLF) for the
    current try exists, appends a "State change" line to it. Failure to append
    is logged, not raised.
    """
    if self.__statusText == value: return
    self.__statusText = value
    self.__statusTime = timestamp()
    gbs_log_file_spec = self._GetTryOutputDir() + "/" + self._GetGbsLogFileName()
    if os.path.isfile(gbs_log_file_spec):
        # Append directly instead of running "echo '...' >> file" in a shell:
        # the status text may hold diagnostic text passed back from the
        # application script, and a single quote in it would break (or inject
        # into) the shell command.
        # NOTE(review): no separator between timestamp() and "INFO", kept as
        # before -- confirm timestamp() supplies trailing whitespace.
        entry = timestamp() + "INFO State change " + GIDStringForJSC(self.__statusCode) \
            + " [" + self.__statusText + "]\n"
        try:
            glf = open(gbs_log_file_spec, "a")
            try:
                glf.write(entry)
            finally:
                glf.close()
        except (IOError, OSError):
            # Best effort, as before, but no longer silent.
            Log(logger.ERROR, "Failed to append state change to " + gbs_log_file_spec)
python::GBSJob::GBSJob::__tryNumber [private] |
python::GBSJob::GBSJob::__earlyFails [private] |
python::GBSJob::GBSJob::__statusCode [private] |
python::GBSJob::GBSJob::__statusText [private] |
python::GBSJob::GBSJob::__statusTime [private] |
python::GBSJob::GBSJob::__retryArgs [private] |
python::GBSJob::GBSJob::__gangaJobId [private] |
1.5.4