python::GBSJob::GBSJob Class Reference

Public Member Functions

def __init__
def GetType
def __repr__
def AsString
def CanClear
def CanKill
def CanSubmit
def GetEarlyFailsCount
def GetGangaJobId
def GetGangaJob
def GetLateHandledFailsCount
def GetLateUnhandledFailsCount
def GetLocalEnvironment
def GetLocalInputSandbox
def GetLocalOutputSandbox
def GetPhaseCode
def GetRetryArgs
def GetScriptLocalArgs
def GetStatusCode
def GetStatusText
def GetStatusTime
def GetTryNumber
def IsComplete
def IsFailure
def IsHeld
def IsNotReady
def IsReady
def IsRunning
def IsSubmitted
def IsSuccessful
def Analyse
def ClearErrorCounts
def ClearHistory
def Hold
def Kill
def Release
def Remove
def SetLocalEnvironment
def SetLocalInputSandbox
def SetLocalOutputSandbox
def SetScriptLocalArgs
def Submit
def UpdateStatus

Private Member Functions

def _DoMemberIO
def _GetTryOutputDir
def _GetGbsLogFileName
def _IncrementEarlyFailsCount
def _IncrementLateHandledFailsCount
def _IncrementLateUnhandledFailsCount
def _SetRetryArgs
def _SetStatusCode
def _SetStatusText

Private Attributes

 __tryNumber
 __earlyFails
 __lateFailsHandled
 __lateFailsUnhandled
 __scriptLocalArgs
 __localEnvironment
 __localInputSandbox
 __localOutputSandbox
 __statusCode
 __statusText
 __statusTime
 __retryArgs
 __gangaJobId


Detailed Description

Object to submit, and if necessary resubmit, a job until it succeeds or needs user intervention.

This class is responsible for submitting and, if necessary,
resubmitting a job until it is successful or user intervention is
unavoidable.
  

Definition at line 17 of file GBSJob.py.


Member Function Documentation

def python::GBSJob::GBSJob::__init__ (   self,
  name,
  parent,
  model,
  model_args 
)

Definition at line 30 of file GBSJob.py.

00030                                                    :
00031         self.__tryNumber          = 0  # 0 before first try
00032         self.__earlyFails         = 0  # Symptomatic of a system failure
00033         self.__lateFailsHandled   = 0  # Job signalling retry
00034         self.__lateFailsUnhandled = 0  # Job aborting without signal
00035         self.__scriptLocalArgs    = "" # Application script local args
00036         self.__localEnvironment   = "" # The environment that is local to this job.
00037         self.__localInputSandbox  = "" # The list of input sandbox files that are local to this job.
00038         self.__localOutputSandbox = "" # The list of output sandbox files that are local to this job.
00039         
00040         # statusCode:  See GBSIdCodes
00041         self.__statusCode         = GID_JSC_NEW
00042         # Associated text to qualify statusCode. Has specific meaning when
00043         # __statusCode has the following values which correspond to the values
00044         # passed back from the application script via the GBS Log File:-
00045         #
00046         #  GID_JSC_SUCCEEDED  The list of output files
00047         #  GID_JSC_FAILED     Diagnostic message about the failure
00048         #  GID_JSC_RETRY      The retry args for next attempt.
00049         self.__statusText         = "Ready to run"
00050         self.__statusTime         = timestamp()
00051         # The current retry args i.e. as determined from previous try (or empty for first try)
00052         self.__retryArgs   = ""
00053         self.__gangaJobId  = -1
00054         
00055         
00056         GBSObject.__init__(self,name,parent,model)
00057 
    def _DoMemberIO(self,ioh):

def python::GBSJob::GBSJob::_DoMemberIO (   self,
  ioh 
) [private]

Definition at line 58 of file GBSJob.py.

00058                              :
00059         self.__tryNumber           = ioh("Try Number","i",self.__tryNumber)
00060         self.__retryArgs           = ioh("Current Retry Args","s",self.__retryArgs)
00061         self.__earlyFails          = ioh("Early Fail Count","i",self.__earlyFails)
00062         self.__lateFailsHandled    = ioh("Late Handled Fails Count","i",self.__lateFailsHandled)
00063         self.__lateFailsUnhandled  = ioh("Late Unhandled Fails Count","i",self.__lateFailsUnhandled)
00064         self.__scriptLocalArgs     = ioh("Script Local Args","s",self.__scriptLocalArgs)
00065         self.__localEnvironment    = ioh("+Local environment","s",self.__localEnvironment)
00066         self.__localInputSandbox   = ioh("+Local Input Sandbox List",    "s",self.__localInputSandbox)
00067         self.__localOutputSandbox  = ioh("+Local Output Sandbox List",   "s",self.__localOutputSandbox)
00068         self.__statusCode          = ioh("Status Code","i",self.__statusCode)
00069         self.__statusText          = ioh("Status Text","s",self.__statusText)
00070         self.__statusTime          = ioh("+Status Time Stamp","s",self.__statusTime)
00071         self.__gangaJobId          = ioh("+Ganga Job Id","i",self.__gangaJobId)
00072 
00073         # During transition may need to infer __gangaJobId from __statusCode
00074         if self.__statusCode >= GID_JSC_SUBMITTED: self.__gangaJobId = self.__statusCode
00075 
00076         GBSObject._DoMemberIO(self,ioh)
00077 
    def GetType(self): return "GBSJob"
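
The `ioh` calls in `_DoMemberIO` pass every persistent member through a single callable, so the same statements can serve both saving and restoring state. The following is a minimal sketch of that idea, not the GBS implementation: `WriteHandler`, `ReadHandler` and `JobState` are hypothetical names, and the real handler's behaviour (including the meaning of the `+` prefix on some labels) is not shown in this listing.

```python
class WriteHandler(object):
    """Hypothetical handler that records each member and returns it unchanged."""

    def __init__(self):
        self.store = {}

    def __call__(self, label, type_code, value):
        # Store everything as text, as a flat state file might.
        self.store[label] = str(value)
        return value


class ReadHandler(object):
    """Hypothetical handler that returns stored values, converted by type code."""

    def __init__(self, store):
        self.store = store

    def __call__(self, label, type_code, current):
        if label not in self.store:
            # Assumption: a label missing from the store keeps its current value.
            return current
        raw = self.store[label]
        return int(raw) if type_code == "i" else raw


class JobState(object):
    """Stand-in for a class with persistent members."""

    def __init__(self):
        self.try_number = 0
        self.status_text = "Ready to run"

    def do_member_io(self, ioh):
        # The same statements run for both saving and loading.
        self.try_number = ioh("Try Number", "i", self.try_number)
        self.status_text = ioh("Status Text", "s", self.status_text)


saved = JobState()
saved.try_number = 3
writer = WriteHandler()
saved.do_member_io(writer)

restored = JobState()
restored.do_member_io(ReadHandler(writer.store))
```

Because each member is listed once, adding a new persistent member in this scheme means adding one line rather than separate read and write code.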

def python::GBSJob::GBSJob::GetType (   self  ) 

Definition at line 78 of file GBSJob.py.

00078                      : return "GBSJob"
00079 

def python::GBSJob::GBSJob::__repr__ (   self  ) 

Definition at line 80 of file GBSJob.py.

00080                       : return self.AsString()
00081 
00082 
00083 #-----------------------------------------------------------------------------------------------
00084 #
00085 #  User Callable Methods Getters
00086 #
00087 #-----------------------------------------------------------------------------------------------
00088 

def python::GBSJob::GBSJob::AsString (   self,
  level = "Full" 
)

Return string description.

 Return string description at the following levels:-
"Brief"    one line summary suitable for rows in tables
"Heading"  one line summary suitable as heading for "Brief"
"Full"     full description including value of every data member

Definition at line 89 of file GBSJob.py.

00089                                      :
00090 
00091         """Return string description.
00092 
00093          Return string description at the following levels:-
00094         "Brief"    one line summary suitable for rows in tables
00095         "Heading"  one line summary suitable as heading for "Brief"
00096         "Full"     full description including value of every data member"""
00097 
00098         if ( level == "Heading"):
00099             s  = "Name".ljust(20)
00100             s += "Status".ljust(23)
00101             s += "Input".ljust(40)
00102             s += "Status Details"
00103             return s
00104         if ( level == "Brief"):
00105             s  = self.GetName().ljust(20)
00106             s += GIDStringForJSC(self.__statusCode).ljust(23)
00107             s += (self.__scriptLocalArgs + ";" + self.__localEnvironment).ljust(40)
00108             s += "[" + self.__statusText + "]"
00109             return s
00110 
00111         s  = GBSObject.__repr__(self) + "\n\n"
00112         s += "Status: " + GIDStringForJSC(self.__statusCode) +  " [" + self.__statusText + "] at " + self.__statusTime + "\n"
00113         s += "  Associated Ganga Job ID: " + str(self.__gangaJobId)  + "\n\n"
00114         s += "Job Definition\n"
00115         s += "  Script local args: '" + self.__scriptLocalArgs + "'\n"
00116         s += "  Local environment: '" + self.__localEnvironment + "'\n"
00117         s += "  Input Sandbox:     '"  + self.__localInputSandbox + "'\n" 
00118         s += "  Output Sandbox:    '"  + self.__localOutputSandbox + "'\n\n" 
00119         s += "Retry Status\n"
00120         s += "  Try:                  "  + str(self.__tryNumber) + "\n"
00121         s += "  Retry Args:           '" + str(self.__retryArgs) + "'\n"
00122         s += "  Early Fails:          "  + str(self.__earlyFails) + "\n"
00123         s += "  Late Handled Fails:   "  + str(self.__lateFailsHandled) + "\n"
00124         s += "  Late Unhandled Fails: "  + str(self.__lateFailsUnhandled) + "\n"
00125 
00126         # If there exist any output, give a summary, either for the current try, if
00127         # complete, or the previous one if currently submitted.
00128 
00129         display_try = self.__tryNumber
00130         if self.__statusCode >= GID_JSC_SUBMITTED: display_try -= 1
00131         if display_try < 1: return s
00132         output_dir = self._GetTryOutputDir(display_try)
00133         s += "\nThe output for try %d can be found in\n\n   %s\n\n and consists of:-\n\n" % (display_try,output_dir)
00134         list_dir = output_dir + '/../listing.tmp'
00135         os.system("cd " + output_dir + ";ls -l > " + list_dir)
00136         f = open(list_dir)
00137         for line in f: s += "  " + line
00138         f.close()
00139         os.remove(list_dir)
00140         gbs_log_file_name = self._GetGbsLogFileName(display_try)
00141         gbs_log_file_spec =  str(output_dir) + "/" + str(gbs_log_file_name)
00142         if not os.path.isfile(gbs_log_file_spec): return s
00143         s += "\nThe GLF (GBS Log File) %s contains:-\n\n" % gbs_log_file_name
00144         f = open(gbs_log_file_spec)
00145         for line in f: s += "  " + line
00146         f.close()
00147         return s
00148 
    def CanClear(self):
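
The "Heading" and "Brief" levels of `AsString` line up as a table because both pad their fields to the same widths with `str.ljust`. A small sketch of that layout follows; the function names and the sample values (including the status string "NEW") are illustrative assumptions, not output captured from GBS.

```python
def heading_row():
    """Column titles, padded to the widths used by the 'Heading' level."""
    return "Name".ljust(20) + "Status".ljust(23) + "Input".ljust(40) + "Status Details"


def brief_row(name, status, local_args, local_env, status_text):
    """One table row, padded to the same widths as the heading."""
    row = name.ljust(20)
    row += status.ljust(23)
    row += (local_args + ";" + local_env).ljust(40)
    row += "[" + status_text + "]"
    return row


head = heading_row()
row = brief_row("job_001", "NEW", "123,456", "var1=1", "Ready to run")
print(head)
print(row)
```

Note that `ljust` pads but never truncates, so a name longer than 20 characters pushes the remaining columns of that row to the right.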

def python::GBSJob::GBSJob::CanClear (   self  ) 

Return true if can ClearErrorCounts, ClearHistory and Remove

Definition at line 149 of file GBSJob.py.

00149                       :
00150         """Return true if can ClearErrorCounts, ClearHistory and Remove"""
00151         return not self.IsSubmitted()
00152 
    def CanKill(self):

def python::GBSJob::GBSJob::CanKill (   self  ) 

Return true if can Kill

Definition at line 153 of file GBSJob.py.

00153                      :
00154         """Return true if can Kill"""
00155         return self.IsSubmitted()
00156    
    def CanSubmit(self):

def python::GBSJob::GBSJob::CanSubmit (   self  ) 

Return true if can submit Ganga job

Definition at line 157 of file GBSJob.py.

00157                        :
00158 
00159         """Return true if can submit Ganga job"""
00160 
00161         if      self.__statusCode < GID_JSC_SUBMITTED \
00162             and self.__statusCode > GID_JSC_CANNOT_SUBMIT \
00163             and self.GetParent().GetScriptFileSpec(): return True
00164         return False
00165 
    def GetEarlyFailsCount(self):

def python::GBSJob::GBSJob::GetEarlyFailsCount (   self  ) 

Return Early Fails Count

Definition at line 166 of file GBSJob.py.

00166                                 :
00167         """Return Early Fails Count"""
00168         return self.__earlyFails
00169 
    def GetGangaJobId(self):

def python::GBSJob::GBSJob::GetGangaJobId (   self  ) 

Return associated Ganga job Id (if any) or -1 

Definition at line 170 of file GBSJob.py.

00170                            :
00171         """Return associated Ganga job Id (if any) or -1 """
00172         return self.__gangaJobId
00173 
    def GetGangaJob(self):

def python::GBSJob::GBSJob::GetGangaJob (   self  ) 

Return associated Ganga job (if any)

Definition at line 174 of file GBSJob.py.

00174                          :
00175         """Return associated Ganga job (if any)"""
00176         if self.__gangaJobId < 0: return None
00177         try:
00178             gj = Ganga.GPI.jobs(self.__gangaJobId)
00179         # I need to catch JobAccessError, but it isn't Ganga.GPI.JobAccessError so just
00180         # catch everything and hope it is O.K.!
00181         except: gj = None
00182         return gj
00183         
    def GetLateHandledFailsCount(self):
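
`GetGangaJob` uses a bare `except:` because the specific Ganga exception could not be named. One alternative, sketched here under assumptions, is to catch `Exception` instead, which still covers lookup errors but lets `KeyboardInterrupt` and `SystemExit` propagate. `lookup_job` and `fake_registry` are hypothetical stand-ins; the real lookup is `Ganga.GPI.jobs(...)` as shown in the listing.

```python
def lookup_job(registry, job_id):
    """Return the job for job_id, or None if there is no such job."""
    if job_id < 0:
        return None
    try:
        return registry(job_id)
    except Exception:
        # Unlike a bare 'except:', this does not swallow KeyboardInterrupt
        # or SystemExit, which do not derive from Exception.
        return None


def fake_registry(job_id):
    """Hypothetical stand-in for the Ganga job registry."""
    jobs = {7: "ganga-job-7"}
    return jobs[job_id]  # raises KeyError for unknown ids


found = lookup_job(fake_registry, 7)
missing = lookup_job(fake_registry, 8)
unset = lookup_job(fake_registry, -1)
```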

def python::GBSJob::GBSJob::GetLateHandledFailsCount (   self  ) 

Return Late Handled Fails Count

Definition at line 184 of file GBSJob.py.

00184                                       :
00185         """Return Late Handled Fails Count"""
00186         return self.__lateFailsHandled
00187     
    def GetLateUnhandledFailsCount(self):

def python::GBSJob::GBSJob::GetLateUnhandledFailsCount (   self  ) 

Return Late Unhandled Fails Count

Definition at line 188 of file GBSJob.py.

00188                                         :
00189         """Return Late Unhandled Fails Count"""
00190         return self.__lateFailsUnhandled
00191     
    def GetLocalEnvironment(self):

def python::GBSJob::GBSJob::GetLocalEnvironment (   self  ) 

Return, as a comma separated list string, the environment that is local to this job.

Definition at line 192 of file GBSJob.py.

00192                                  :
00193         """Return, as a comma separated list string, the environment that is local to this job."""
00194         return self.__localEnvironment
00195     
    def GetLocalInputSandbox(self):

def python::GBSJob::GBSJob::GetLocalInputSandbox (   self  ) 

Return, as a comma separated list string, the input sandbox file list that is local to this job.

Definition at line 196 of file GBSJob.py.

00196                                   :
00197         """Return, as a comma separated list string, the input sandbox file list that is local to this job."""
00198         return self.__localInputSandbox
00199     
    def GetLocalOutputSandbox(self):

def python::GBSJob::GBSJob::GetLocalOutputSandbox (   self  ) 

Return, as a comma separated list string, the output sandbox file list that is local to this job.

Definition at line 200 of file GBSJob.py.

00200                                    :
00201         """Return, as a comma separated list string, the output sandbox file list that is local to this job."""
00202         return self.__localOutputSandbox
00203 
    def GetPhaseCode(self):

def python::GBSJob::GBSJob::GetPhaseCode (   self  ) 

Return phase code. These are broad categories of status code used by task for job statistics.

Definition at line 204 of file GBSJob.py.

00204                           :
00205         """Return phase code. These are broad categories of status code used by task for job statistics."""
00206         pc = 0
00207         if self.IsComplete():
00208             pc = GID_JPC_DONE_NFAIL
00209             if not self.IsSuccessful(): pc = GID_JPC_DONE_FAIL
00210         elif self.IsReady():
00211             pc = GID_JPC_READY_NRETRY 
00212             if self.GetTryNumber() > 0: pc = GID_JPC_READY_RETRY
00213         elif self.IsSubmitted():
00214             pc = GID_JPC_SUBMIT_RUN
00215             if not self.IsRunning(): pc = GID_JPC_SUBMIT_NRUN
00216         else:
00217             pc = GID_JPC_NREADY_HOLD
00218             if not self.IsHeld(): pc = GID_JPC_NREADY_NHOLD
00219         return pc
00220 
00221         
    def GetRetryArgs(self):
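
`GetPhaseCode` relies on the status codes being ordered so that range comparisons separate complete, not ready, ready and submitted jobs. The sketch below reproduces that decision tree with made-up numbers: every constant value is hypothetical (the real ones live in GBSIdCodes and are not shown here), only their relative order is taken from the comparisons in the listings, and phase names are returned as strings for readability.

```python
# Hypothetical numeric values; only their relative order matters here.
JSC_FAILED = 1
JSC_SUCCEEDED = 2
JSC_COMPLETE = 2        # codes <= this count as complete
JSC_HELD = 3
JSC_CANNOT_SUBMIT = 4   # codes above this and below SUBMITTED are ready
JSC_NEW = 5
JSC_RETRY = 6
JSC_SUBMITTED = 100     # codes >= this count as submitted


def phase(status_code, try_number=0, running=False):
    """Classify a status code into one of the eight broad phases."""
    if status_code <= JSC_COMPLETE:
        return "DONE_NFAIL" if status_code == JSC_SUCCEEDED else "DONE_FAIL"
    if JSC_CANNOT_SUBMIT < status_code < JSC_SUBMITTED:
        return "READY_RETRY" if try_number > 0 else "READY_NRETRY"
    if status_code >= JSC_SUBMITTED:
        return "SUBMIT_RUN" if running else "SUBMIT_NRUN"
    return "NREADY_HOLD" if status_code == JSC_HELD else "NREADY_NHOLD"
```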

def python::GBSJob::GBSJob::GetRetryArgs (   self  ) 

Return, as a string, the current retry args, i.e. as determined from the previous try (or empty for the first try)

Definition at line 222 of file GBSJob.py.

00222                           :
00223         """Return, as a string, the current retry args, i.e. as determined from the previous try (or empty for the first try)"""
00224         return self.__retryArgs
00225     
    def GetScriptLocalArgs(self):

def python::GBSJob::GBSJob::GetScriptLocalArgs (   self  ) 

Return (as a string) the comma list of application script args that are local to this job

Definition at line 226 of file GBSJob.py.

00226                                 :
00227         """Return (as a string) the comma list of application script args that are local to this job"""
00228         return self.__scriptLocalArgs
00229     
    def GetStatusCode(self):

def python::GBSJob::GBSJob::GetStatusCode (   self  ) 

Return status code

Definition at line 230 of file GBSJob.py.

00230                            :
00231         """Return status code"""
00232         return self.__statusCode
00233     
    def GetStatusText(self):

def python::GBSJob::GBSJob::GetStatusText (   self  ) 

Return status text which qualifies the Status Code

Definition at line 234 of file GBSJob.py.

00234                            :
00235         """Return status text which qualifies the Status Code"""
00236         return self.__statusText
00237     
    def GetStatusTime(self):

def python::GBSJob::GBSJob::GetStatusTime (   self  ) 

Return date time when current status code and text were achieved

Definition at line 238 of file GBSJob.py.

00238                            :
00239         """Return date time when current status code and text were achieved"""
00240         return self.__statusTime
00241     
    def GetTryNumber(self):

def python::GBSJob::GBSJob::GetTryNumber (   self  ) 

Return Try Number (0 before first try)

Definition at line 242 of file GBSJob.py.

00242                           :
00243         """Return Try Number (0 before first try)"""
00244         return self.__tryNumber
00245     
    def IsComplete(self):

def python::GBSJob::GBSJob::IsComplete (   self  ) 

Return true if job is complete (Successful or Failed)

Definition at line 246 of file GBSJob.py.

00246                         :
00247         """Return true if job is complete (Successful or Failed)"""
00248         return self.__statusCode <= GID_JSC_COMPLETE
00249     
    def IsFailure(self):

def python::GBSJob::GBSJob::IsFailure (   self  ) 

Return true if job is failure

Definition at line 250 of file GBSJob.py.

00250                        :
00251         """Return true if job is failure"""
00252         return self.__statusCode == GID_JSC_FAILED
00253 
    def IsHeld(self):

def python::GBSJob::GBSJob::IsHeld (   self  ) 

Return true if job is Held

Definition at line 254 of file GBSJob.py.

00254                     :
00255         """Return true if job is Held"""
00256         return self.__statusCode == GID_JSC_HELD
00257      
    def IsNotReady(self):

def python::GBSJob::GBSJob::IsNotReady (   self  ) 

Return true if job is not ready to submit

Definition at line 258 of file GBSJob.py.

00258                         :
00259         """Return true if job is not ready to submit"""
00260         return not self.IsReady() 
00261      
    def IsReady(self):

def python::GBSJob::GBSJob::IsReady (   self  ) 

Return true if job is ready to submit

Definition at line 262 of file GBSJob.py.

00262                      :
00263         """Return true if job is ready to submit"""
00264         return self.__statusCode > GID_JSC_CANNOT_SUBMIT and self.__statusCode < GID_JSC_SUBMITTED
00265     
    def IsRunning(self):

def python::GBSJob::GBSJob::IsRunning (   self  ) 

Return true if job is submitted and associated Ganga Job status is running

Definition at line 266 of file GBSJob.py.

00266                        :
00267         """Return true if job is submitted and associated Ganga Job status is running"""
00268         return self.__statusCode >= GID_JSC_SUBMITTED and re.search(r"Ganga status:running",self.__statusText)
00269   
    def IsSubmitted(self):

def python::GBSJob::GBSJob::IsSubmitted (   self  ) 

Return true if job is submitted

Definition at line 270 of file GBSJob.py.

00270                          :
00271         """Return true if job is submitted"""
00272         return self.__statusCode >= GID_JSC_SUBMITTED
00273    
    def IsSuccessful(self):

def python::GBSJob::GBSJob::IsSuccessful (   self  ) 

Return true if job is successful

Definition at line 274 of file GBSJob.py.

00274                           :
00275         """Return true if job is successful"""
00276         return self.__statusCode == GID_JSC_SUCCEEDED
00277 
00278 #-----------------------------------------------------------------------------------------------
00279 #
00280 #  User Callable Methods Setters
00281 #
00282 #-----------------------------------------------------------------------------------------------
00283 
00284 
    def Analyse(self,update = True):

def python::GBSJob::GBSJob::Analyse (   self,
  update = True 
)

Perform job termination analysis and optionally apply the results.

Definition at line 285 of file GBSJob.py.

00285                                    :
00286 
00287         """Perform job termination analysis and optionally apply the results."""
00288 
00289         # Quit if nothing to analyse.
00290         if self.__statusCode != GID_JSC_WAITING_ANALYSIS: return
00291         
00292         analyser = GetModelRegistry().CreateObject(self.GetModel(),"JobAnalyser","Solomon",self)
00293         analyser.Analyse(self)
00294         if update:
00295             analyser.Apply()
00296             gj = self.GetGangaJob()
00297             if gj: gj.remove()
00298             self.__gangaJobId  = -1
00299             self.GetParent().RefreshJobStats()
00300             self.Write()
00301 
    def ClearErrorCounts(self,warn=True):

def python::GBSJob::GBSJob::ClearErrorCounts (   self,
  warn = True 
)

If allowed, clear error counts, but leave retry history intact 

Can only be applied to jobs that are ready to be submitted or that have failed;
in the latter case it has the side effect of setting the status back to RETRY

Definition at line 302 of file GBSJob.py.

00302                                         :
00303 
00304         """If allowed, clear error counts, but leave retry history intact 
00305 
00306         Can only be applied to jobs that are ready to be submitted or that have failed;
00307         in the latter case it has the side effect of setting the status back to RETRY"""
00308 
00309         if self.CanClear():
00310             self.__earlyFails         = 0
00311             self.__lateFailsHandled   = 0
00312             self.__lateFailsUnhandled = 0
00313             if self.__statusCode == GID_JSC_FAILED:
00314                self.__statusText = "Retrying after users cleared errors"
00315                self._SetStatusCode(GID_JSC_RETRY)
00316             self.GetParent().RefreshJobStats()
00317             self.Write()
00318             return
00319         if warn: print "Cannot ClearErrorCounts on job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00320         
    def ClearHistory(self,confirm=True,warn=True):

def python::GBSJob::GBSJob::ClearHistory (   self,
  confirm = True,
  warn = True 
)

Completely clear all processing history so that processing begins again from scratch.

The only processing state that is retained is that if job was held it will still be.

Definition at line 321 of file GBSJob.py.

00321                                                  :
00322 
00323         """Completely clear all processing history so that processing begins again from scratch.
00324 
00325         The only processing state that is retained is that if job was held it will still be."""
00326         
00327         if not self.CanClear():
00328             if warn: print "Cannot ClearHistory on job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00329             return
00330         if confirm:
00331             ans = raw_input("Are you sure you want to clear the history of job " + self.GetName() + "  ?y[n]")
00332             if not re.search(r"^y",ans,re.I):
00333                 print "History not cleared."
00334                 return
00335         child_dir = self.GetStoreLocation("child_dir")
00336         if os.path.isdir(child_dir):
00337             if shutil.rmtree(child_dir):
00338                 Log(logger.ERROR,"Failed to remove obsolete job history directory:" + child_dir)
00339         self.__tryNumber     = 0
00340         self.__retryArgs     = ""
00341         self.__gangaJobId    = -1
00342         if not self.IsHeld():
00343             self.__statusText = "Ready to run"
00344             self._SetStatusCode(GID_JSC_NEW)
00345         self.ClearErrorCounts()
00346         return
00347         
    def Hold(self,warn=True):
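
A note on the directory clean-up in `ClearHistory`: `shutil.rmtree` returns `None` and signals failure by raising an exception, so a test of the form `if shutil.rmtree(...)` never takes its error branch. A minimal sketch of one way to detect the failure is shown below; `remove_tree` is a hypothetical helper, and returning a message rather than calling the GBS `Log` function is a simplification made for this example.

```python
import os
import shutil
import tempfile


def remove_tree(path):
    """Remove a directory tree; return None on success or an error message."""
    try:
        shutil.rmtree(path)
    except OSError as err:
        return "Failed to remove directory %s: %s" % (path, err)
    return None


scratch = tempfile.mkdtemp()
os.mkdir(os.path.join(scratch, "try_001"))
first = remove_tree(scratch)    # directory exists, so removal succeeds
second = remove_tree(scratch)   # directory is gone, so rmtree raises OSError
```

The same consideration applies to `os.remove` and `os.mkdir`, which also return `None` and raise `OSError` on failure.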

def python::GBSJob::GBSJob::Hold (   self,
  warn = True 
)

Hold job, so that it won't be submitted.  Warn, if requested, if job not suitable for holding

Definition at line 348 of file GBSJob.py.

00348                             :
00349 
00350         """Hold job, so that it won't be submitted.  Warn, if requested, if job not suitable for holding"""
00351 
00352         if self.IsReady():
00353              self.__statusText = "Held by user"
00354              self._SetStatusCode(GID_JSC_HELD)
00355              self.GetParent().RefreshJobStats()
00356              self.Write()
00357              return
00358         if warn: print "Cannot HOLD job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00359 
    def Kill(self,warn=True):

def python::GBSJob::GBSJob::Kill (   self,
  warn = True 
)

Kill job that has been submitted to Ganga.  Warn, if requested, if job not suitable for killing

Definition at line 360 of file GBSJob.py.

00360                             :
00361 
00362        """Kill job that has been submitted to Ganga.  Warn, if requested, if job not suitable for killing"""
00363 
00364        if not self.CanKill():
00365            if warn: print "Cannot KILL job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00366            return
00367        try:
00368             gj = Ganga.GPI.jobs(self.__statusCode)
00369         # I need to catch JobAccessError, but it isn't Ganga.GPI.JobAccessError so just
00370         # catch everything and hope it is O.K.!
00371        except:
00372              Log(logger.ERROR,"Have lost Ganga job with ID " + str(self.__statusCode)\
00373                 + " for job " + self.GetName())
00374              return         
00375        # Kill job
00376        gj.kill()
00377        self.UpdateStatus()
00378 
    def Release(self,warn=True):

def python::GBSJob::GBSJob::Release (   self,
  warn = True 
)

Release job, so that it can be submitted.  Warn, if requested, if job not suitable for releasing

Definition at line 379 of file GBSJob.py.

00379                                :
00380 
00381         """Release job, so that it can be submitted.  Warn, if requested, if job not suitable for releasing"""
00382 
00383         if self.IsHeld():
00384              self.__statusCode = GID_JSC_NEW
00385              if  self.__tryNumber: self.__statusCode = GID_JSC_RETRY
00386              self._SetStatusText("Ready to run")
00387              self.GetParent().RefreshJobStats()
00388              self.Write()
00389              return
00390         if warn: print "Cannot RELEASE job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00391 
    def Remove(self,confirm=True,warn=True):
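
The `Hold` and `Release` listings together form a small state machine: only a ready job can be held, and a released job returns to NEW or RETRY depending on whether it has been tried before. The sketch below illustrates just those transitions. `MiniJob` and the numeric constants are hypothetical, and persistence, warnings and the parent task's statistics refresh are deliberately left out.

```python
# Hypothetical status values, chosen only to be distinct.
JSC_HELD, JSC_NEW, JSC_RETRY, JSC_SUBMITTED = 3, 5, 6, 100


class MiniJob(object):
    """Stand-in job that tracks only status and try number."""

    def __init__(self, status=JSC_NEW, try_number=0):
        self.status = status
        self.try_number = try_number

    def is_ready(self):
        return self.status in (JSC_NEW, JSC_RETRY)

    def hold(self):
        """Hold a ready job; return False if the job cannot be held."""
        if not self.is_ready():
            return False
        self.status = JSC_HELD
        return True

    def release(self):
        """Release a held job back to NEW, or to RETRY if it has run before."""
        if self.status != JSC_HELD:
            return False
        self.status = JSC_RETRY if self.try_number else JSC_NEW
        return True


fresh = MiniJob()
fresh.hold()
fresh.release()

retried = MiniJob(status=JSC_RETRY, try_number=2)
retried.hold()
retried.release()
```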

def python::GBSJob::GBSJob::Remove (   self,
  confirm = True,
  warn = True 
)

Completely remove job.

Definition at line 392 of file GBSJob.py.

00392                                            :
00393 
00394         """Completely remove job."""
00395         
00396         if not self.CanClear():
00397             if warn: print "Cannot Remove job %s; its status is %s" % (self.GetName(),GIDStringForJSC(self.__statusCode))
00398             return
00399         if confirm:
00400             ans = raw_input("Are you sure you want to remove job " + self.GetName() + "  ?y[n]")
00401             if not re.search(r"^y",ans,re.I):
00402                 print "Not removed."
00403                 return
00404         child_dir = self.GetStoreLocation("child_dir")
00405         if os.path.isdir(child_dir):
00406             if shutil.rmtree(child_dir):
00407                 Log(logger.ERROR,"Failed to remove obsolete job history directory:" + child_dir)
00408         state_file = self.GetStoreLocation("self")
00409         if os.remove(state_file):
00410             Log(logger.ERROR,"Failed to remove state file:" + state_file)
00411         # Sneak into parent task's job list and remove entry.
00412         del self.GetParent()._jobManagers[self.GetName()]
00413         self.GetParent().RefreshJobStats()
00414         return
00415         
    def SetLocalEnvironment(self,env_str):
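
Both `Remove` and `ClearHistory` treat the reply as a yes only if it starts with the letter y in either case, so an empty reply defaults to no. A small sketch of that check, factored into hypothetical helpers `is_yes` and `confirm`, is given below. The `ask` parameter is an assumption added so that the prompt function (`raw_input` in Python 2, `input` in Python 3) can be supplied by the caller.

```python
import re


def is_yes(answer):
    """Return True if the answer starts with 'y' or 'Y'."""
    return re.search(r"^y", answer, re.I) is not None


def confirm(question, ask):
    """Put the question to the user via 'ask'; anything but yes means no."""
    return is_yes(ask(question + "  ?y[n]"))


# Example with canned replies in place of an interactive prompt.
accepted = confirm("Are you sure you want to remove job job_001", lambda prompt: "Yes")
declined = confirm("Are you sure you want to remove job job_001", lambda prompt: "")
```

Because the pattern is anchored at the start of the string, a reply with leading whitespace such as " yes" counts as no.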

def python::GBSJob::GBSJob::SetLocalEnvironment (   self,
  env_str 
)

Set, as a comma separated list string, the environment that is local to this job.

      e.g.  job.SetLocalEnvironment('var1=123,var2=a string with spaces,var3=456')

Definition at line 416 of file GBSJob.py.

00416                                          :
00417 
00418         """Set, as a comma separated list string, the environment that is local to this job.
00419 
00420       e.g.  job.SetLocalEnvironment('var1=123,var2=a string with spaces,var3=456')"""
00421 
00422         d =  GBSUtilities.ParseEnvStr(env_str)
00423         if env_str and not d: print "Cannot parse environment string: '" + env_str + "'"
00424         else:
00425             self.__localEnvironment = env_str
00426             self.Write()
00427 
    def SetLocalInputSandbox(self,in_sbox_str):
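
`SetLocalEnvironment` rejects a non-empty string whose parse result is empty. The parser itself, `GBSUtilities.ParseEnvStr`, is not part of this listing, so the following is only a guess at compatible behaviour: `parse_env_str` is a hypothetical stand-in that returns a dictionary, or an empty dictionary if any item lacks a `name=value` form.

```python
def parse_env_str(env_str):
    """Parse 'a=1,b=two words' into a dict; return {} if any item is malformed."""
    result = {}
    if not env_str:
        return result
    for item in env_str.split(","):
        name, sep, value = item.partition("=")
        name = name.strip()
        if not sep or not name:
            # Assumption: one bad item invalidates the whole string.
            return {}
        result[name] = value
    return result


good = parse_env_str("var1=123,var2=a string with spaces,var3=456")
bad = parse_env_str("var1=123,novalue")
```

This sketch does not support values that themselves contain commas.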

def python::GBSJob::GBSJob::SetLocalInputSandbox (   self,
  in_sbox_str 
)

Set, as a comma separated list string, the input sandbox file list that is local to this job.

      e.g.  job.SetLocalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')

Definition at line 428 of file GBSJob.py.

00428                                               :
00429 
00430         """Set, as a comma separated list string, the input sandbox file list that is local to this job.
00431 
00432       e.g.  job.SetLocalInputSandbox('/home/users/west/my_input_data.dat,../my_script.sh')"""
00433 
00434         (ok,str) = GBSUtilities.ProcessSandboxSetup(self,in_sbox_str,"input")
00435         if ok:
00436             self.__localInputSandbox = str
00437             self.Write()
00438 
    def SetLocalOutputSandbox(self,out_sbox_str):

def python::GBSJob::GBSJob::SetLocalOutputSandbox (   self,
  out_sbox_str 
)

Set, as a comma separated list string, the output sandbox file list that is local to this job.

      e.g.  job.SetLocalOutputSandbox('my_output_data.dat,my_output.log')

Definition at line 439 of file GBSJob.py.

00439                                                 :
00440 
00441         """Set, as a comma separated list string, the output sandbox file list that is local to this job.
00442 
00443       e.g.  job.SetLocalOutputSandbox('my_output_data.dat,my_output.log')"""
00444 
00445         (ok,str) = GBSUtilities.ProcessSandboxSetup(self,out_sbox_str,"output")
00446         if ok:
00447             self.__localOutputSandbox = str
00448             self.Write()
00449 
    def SetScriptLocalArgs(self,arg_str):

def python::GBSJob::GBSJob::SetScriptLocalArgs (   self,
  arg_str 
)

Set (as a string) the comma separated list of application script args that are local to this job.

      e.g.  job.SetScriptLocalArgs('123,a string with spaces,456')

Definition at line 450 of file GBSJob.py.

00450                                         :
00451 
00452         """Set (as a string) the comma separated list of application script args that are local to this job.
00453 
00454       e.g.  job.SetScriptLocalArgs('123,a string with spaces,456')"""
00455 
00456         self.__scriptLocalArgs = arg_str
00457         self.Write()
00458 
    def Submit(self,Perusable=False):
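
The script args set by `SetScriptLocalArgs` are a comma separated list, and `Submit` splits them with `GBSUtilities.ParseCommaSepList` while taking account of escaped commas. That function is not shown here, so the sketch below is an illustration only: `parse_comma_sep_list` is a hypothetical stand-in, and the backslash as escape character is an assumption rather than documented GBS behaviour.

```python
def parse_comma_sep_list(text, out):
    """Split text on unescaped commas and append the items to the list 'out'."""
    item = []
    i = 0
    while i < len(text):
        ch = text[i]
        # Assumption: a backslash directly before a comma escapes it.
        if ch == "\\" and i + 1 < len(text) and text[i + 1] == ",":
            item.append(",")
            i += 2
            continue
        if ch == ",":
            out.append("".join(item))
            item = []
        else:
            item.append(ch)
        i += 1
    out.append("".join(item))
    return out


script_args = []
parse_comma_sep_list("global1,global2", script_args)
parse_comma_sep_list("123,a string with spaces,456", script_args)
escaped = parse_comma_sep_list(r"a\,b,c", [])
```

Appending to a caller-supplied list mirrors how the listing accumulates global and local args into one `script_args` list.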

def python::GBSJob::GBSJob::Submit (   self,
  Perusable = False 
)

Submit job if permitted and return True if successful.

Definition at line 459 of file GBSJob.py.

00459                                     :
00460 
00461         """Submit job if permitted and return True if successful."""
00462 
00463         my_manager = self.GetParent()
00464 
00465         #  Get the user application script
00466         script_spec = my_manager.GetScriptFileSpec()
00467         if not script_spec:
00468             print "Cannot submit job, no user application script assigned to Task '" + my_manager.GetName() + "'"
00469             return False
00470         script_name = my_manager.GetScriptFileName()
00471 
00472         if not self.CanSubmit():
00473             print "Cannot submit job " + self.GetName() + ":" \
00474                   " Status: " + GIDStringForJSC(self.__statusCode) \
00475                   + " [" + self.GetStatusText() + "]"
00476             return False
00477 
00478         if not my_manager.IsAuthorisedToSubmit(): return False
00479 
00480         # Increment try number and prepare output directory to receive results
00481         
00482         self.__tryNumber += 1
00483         self.MakeChildDirectory()
00484         output_dir = self._GetTryOutputDir()
00485         # It ought not to exist already, but in case it does, remove it first.
00486         if os.path.isdir(output_dir):
00487             Log(logger.SYNOPSIS,"Removing obsolete directory for job output:" + str(output_dir))
00488             try: shutil.rmtree(output_dir)
00489             except OSError: Log(logger.ERROR,"Failed to remove obsolete directory for job output:" + str(output_dir))
00490         Log(logger.SYNOPSIS,"Creating directory for job output:" + str(output_dir))
00491         try: os.mkdir(output_dir,0755)
00492         except OSError: Log(logger.ERROR,"Failed to create directory for job output:" + str(output_dir))
00493 
00494         # Prepare GBS Log File in output directory
00495         gbs_log_file_name = self._GetGbsLogFileName()
00496         gbs_log_file_spec =  str(output_dir) + "/" + str(gbs_log_file_name)
00497         os.system("echo " + timestamp() + " INFO GBS_JOB_SUBMIT submitting job > " + gbs_log_file_spec)
00498 
00499         # Prepare arg list: GLF file, appl. script, global args, local args.
00500         args = []
00501         args.append(gbs_log_file_name)
00502         args.append(script_name)
00503 
00504         # Collect up the application script args, taking account of escaped commas.
00505         script_args = []
00506         for arglist in [my_manager.GetScriptGlobalArgs(),self.GetScriptLocalArgs()]:
00507             if arglist: GBSUtilities.ParseCommaSepList(arglist,script_args)
00508 
00509         # Passing args with anything but alphanumeric data into the
00510         # application script is a pain.  The logical approach, placing
00511         # each arg as a separate element in 'args' works for the local
00512         # back-end but is broken for LCG where all the args are first
00513         # interpolated and then re-parsed to define the argument list
00514         # that gets passed into the script.  Any string containing
00515         # white space gets broken apart and anything containing "(" causes
00516         # a syntax error! So instead we manufacture a wrapper script
00517         # on the fly that calls the application script, which gives
00518         # us total control over the way arguments get passed in.
00519 
00520         wrapper_file_spec =  str(output_dir) + "/" + "gbs_job_wrapper.sh"
00521 
00522         #Copy the leading part of the wrapper
00523         os.system("cp $GBS_HOME/python/gbs_job_wrapper_part_1.sh " + wrapper_file_spec)
00524 
00525         # Manufacture the bit where the arguments get passed in
00526         wrapper  = file(wrapper_file_spec,'a')
00527         num_arg  = 0
00528         arglist  = ""
00529         for arg in script_args:
00530             num_arg += 1
00531             wrapper.write("arg" + str(num_arg) + "=\"" + arg + "\"\n")
00532             arglist += "\"$arg" + str(num_arg) +"\" "
00533         wrapper.write("/usr/bin/time --format 'CPU User: %U sec, System: %S sec.  Elapse: %E. ' --output=$GBS_HOME/time_output_$$ "\
00534                       + "./$user_script " + arglist + "\n")
00535         wrapper.close()
00536 
00537         #Copy the trailing part of the wrapper
00538         os.system("cat $GBS_HOME/python/gbs_job_wrapper_part_2.sh >>" + wrapper_file_spec)
00539 
00540         # Prepare the GBS environment
00541         env = {}
00542         env['GBS_MODE'] = my_manager.GetMode()
00543         env['GBS_RETRY_COUNT'] = str(self.__tryNumber -1)
00544         retry_arg_no = 0
00545         for retry_arg in self.__retryArgs.split():
00546             retry_arg_no += 1
00547             env['GBS_RETRY_ARG_' + str(retry_arg_no)] = retry_arg
00548         env['GBS_NUM_RETRY_ARGS'] = str(retry_arg_no)
00549 
00550         # Prepare the user environment
00551         for env_str in [my_manager.GetGlobalEnvironment(),self.GetLocalEnvironment()]:
00552             if not env_str: continue
00553             for var,val in GBSUtilities.ParseEnvStr(env_str).iteritems():
00554                 env[var] = val
00555         
00556         # Prepare the executable
00557         exe = Ganga.GPI.Executable(exe = Ganga.GPI.File(wrapper_file_spec), env = env,args = args)
00558 
00559         # Prepare sandboxes
00560         inputsandbox = []
00561         inputsandbox.append(script_spec)
00562         inputsandbox.append(gbs_log_file_spec)
00563         env_cmds = ""
00564         for e_name,e_value in env.iteritems():
00565            env_cmds += e_name + "=" + str(e_value) + ";" 
00566         for obj in [my_manager,self]:
00567             sb_file_list = []
00568             if obj is self: GBSUtilities.ParseCommaSepList(self.GetLocalInputSandbox(),sb_file_list)
00569             else:           GBSUtilities.ParseCommaSepList(my_manager.GetGlobalInputSandbox(),sb_file_list)
00570             input_dir = obj.GetStoreLocation("child_dir") + "/InputSandbox"
00571             for sb_file in sb_file_list: inputsandbox.append(input_dir + "/" + sb_file)
00572         outputsandbox = ['gbs_output_sandbox.tar.gz']
00573         outputsandbox.append(gbs_log_file_name)
00574         for obj in [my_manager,self]:
00575             sb_file_list = []
00576             if obj is self: GBSUtilities.ParseCommaSepList(self.GetLocalOutputSandbox(),sb_file_list)
00577             else:           GBSUtilities.ParseCommaSepList(my_manager.GetGlobalOutputSandbox(),sb_file_list)
00578             for sb_file in sb_file_list:
00579                 if sb_file[0] == "$":
00580                     inp = os.popen(env_cmds + sb_file[1:])
00581                     sb_file =  inp.read()
00582                     inp.close()
00583                 outputsandbox.append(sb_file)
00584 
00585         # Prepare backend
00586         backend = my_manager.GetBackend()
00587         queue   = ""
00588         mo = re.search(r"(.*?):(.*)",backend)
00589         if mo:
00590             (backend,queue) = mo.groups()
00591             if backend == "Local": queue   = ""
00592         
00593         # Prepare the job
00594         gj               = Ganga.GPI.Job(backend=backend)
00595         Ganga.GPI.jobtree.add(gj,my_manager.GetGangaTreeDir())
00596         if queue :
00597             if backend == "PBS": gj.backend.queue = queue
00598             if backend == "LCG": gj.backend.CE    = queue
00599         # If GLITE_ENABLE is enabled select the correct middleware
00600         if backend == "LCG" and Ganga.GPI.config['LCG']['GLITE_ENABLE']:
00601             gj.backend.middleware = 'GLITE'
00602             if Perusable: gj.backend.perusable = True
00603         elif Perusable: print "Cannot select perusable; backend not LCG/GLITE"
00604         gj.application   = exe
00605         gj.inputsandbox  = inputsandbox
00606         gj.outputsandbox = outputsandbox
00607         if GetLoggerThreshold() <= logger.SYNOPSIS: print "Contents of Ganga job:-\n\n" + str(gj)
00608         try:
00609             print "Submitting ",self.GetName()
00610             timeout_call(gj.submit,2*60)
00611         except Exception,inst:
00612              Log(logger.ERROR,"Caught exception:" + str(inst) +" Failed to complete Ganga job submit")
00613              self.__tryNumber -= 1
00614              return False
00615         self.__gangaJobId = gj.id
00616         self.__statusText = "Ganga status:" + gj.status
00617         self._SetStatusCode(gj.id)
00618         self.GetParent().RefreshJobStats()
00619         self.Write()
00620         return True
00621    
00622 
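The argument-passing part of the wrapper script built in Submit can be sketched in isolation. The Python 3 function below mirrors the loop in the listing: each argument becomes a numbered shell variable assignment, and the application script is then called with quoted references to those variables. The function name build_wrapper_arg_lines is hypothetical and is not part of GBS.

```python
def build_wrapper_arg_lines(script_args):
    """Return (assignment_lines, quoted_arglist) for a list of argument strings.

    Hypothetical helper for illustration; it mirrors the loop in Submit.
    """
    lines = []
    refs = []
    for num_arg, arg in enumerate(script_args, start=1):
        # One shell assignment per argument, e.g. arg1="123"
        lines.append('arg%d="%s"\n' % (num_arg, arg))
        # Quoted reference so that white space inside the value is preserved.
        refs.append('"$arg%d"' % num_arg)
    return lines, " ".join(refs)


lines, arglist = build_wrapper_arg_lines(["123", "a string with spaces"])
print("".join(lines) + "./$user_script " + arglist)
```

As in the listing, values are placed inside double quotes without further escaping, so an argument that itself contains a double quote or a dollar sign would still need extra handling.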
    def UpdateStatus(self):

def python::GBSJob::GBSJob::UpdateStatus (   self  ) 

Get latest status.  May involve checking Ganga job and retrieving output.

Definition at line 623 of file GBSJob.py.

00623                           :
00624         
00625         """Get latest status.  May involve checking Ganga job and retrieving output."""
00626 
00627         if not self.IsSubmitted(): return
00628         try:
00629             gj = Ganga.GPI.jobs(self.__statusCode)
00630         # I need to catch JobAccessError, but it isn't Ganga.GPI.JobAccessError so just
00631         # catch everything and hope it is O.K.!
00632         except:
00633             gj = None
00634 
00635         # Deal with lost Ganga job, ought not to happen, but then lots of things in life ought not to happen.
00636         if not gj:
00637             Log(logger.ERROR,"Have lost Ganga job with ID " + str(self.__statusCode)\
00638                 + " for job " + self.GetName())
00639             # Treat as if status has reverted to "new"
00640             g_status = 'new'
00641         else:
00642             g_status = gj.status
00643 
00644         # Deal with cases when job hasn't finished
00645         if     g_status == "completing" \
00646            or  g_status == "ready"      \
00647            or  g_status == "running"    \
00648            or  g_status == "scheduled"  \
00649            or  g_status == "submitted"  \
00650            or  g_status == "waiting":
00651             self._SetStatusText("Ganga status:" + g_status)
00652 
00653             # If Ganga job does not appear stalled, just record any change in status.
00654             t_stall_hours = (time.time() \
00655                              - time.mktime(time.strptime(self.__statusTime.strip(),"%Y-%m-%d %H:%M:%S")))/3600.
00656             # Set a 3 hour time limit except for scheduled and running where 3 days is allowed.
00657             t_timeout_hours = 3.
00658             if g_status == "scheduled" or  g_status == "running": t_timeout_hours = 3.*24.
00659             if t_stall_hours < t_timeout_hours:
00660                 self.GetParent().RefreshJobStats()
00661                 self.Write()
00662                 return
00663             # Ganga job has stalled; kill it and treat as new.
00664             self._SetStatusText("Ganga job %d stalled for %5.1f hours; killed" % (gj.id,t_stall_hours))
00665             Log(logger.ERROR,self.__statusText)
00666             gj.remove()
00667             g_status = 'new'
00668 
00669         # If status has reverted to "new", the submit never took place
00670         if g_status == 'new':
00671             self.__tryNumber -= 1
00672             self.__statusCode = GID_JSC_NEW
00673             self.__gangaJobId  = -1
00674             if self.__tryNumber > 1: self.__statusCode = GID_JSC_RETRY
00675             self._SetStatusText("Ready to run")
00676             self.GetParent().RefreshJobStats()
00677             self.Write()
00678             return
00679 
00680 
00681         # For all other cases move over output data, unpack gbs_output_sandbox.tar.gz (if present) and save Ganga status
00682         output_dir = self._GetTryOutputDir()
00683         Log(logger.SYNOPSIS,"Retrieving output from " + str(gj.outputdir) + " into " + str(output_dir))
00684         if not os.listdir(gj.outputdir):
00685            Log(logger.ERROR,"No output files in " + str(gj.outputdir))
00686         elif os.system("cp " + str(gj.outputdir) + "* " + str(output_dir)):
00687             Log(logger.ERROR,"Failed to retrieve output from " + str(gj.outputdir) + " into " + str(output_dir))
00688         if gj.backend.perusable:
00689             Log(logger.INFO,"Job is perusable, attempting to recover stdout, as stdout.perusable")
00690             cmd = "glite-wms-job-perusal --get -f stdout --all --noint " \
00691                   + gj.backend.id + " > " + output_dir + "/stdout.perusable"
00692             if os.system(cmd):
00693                 Log(logger.ERROR,"Failed to retrieve perusable output using " + cmd)
00694         gbs_output_sandbox = output_dir + "/gbs_output_sandbox.tar.gz"
00695         if os.path.isfile(gbs_output_sandbox):
00696             Log(logger.SYNOPSIS,"Unpacking " + gbs_output_sandbox)
00697             if os.system("cd " + output_dir + "; tar zxf gbs_output_sandbox.tar.gz"):
00698                 Log(logger.ERROR,"Failed to unpack " + gbs_output_sandbox)
00699             else:
00700                 os.remove(gbs_output_sandbox)
00701         gangaStatusFile = output_dir + "/gbs_ganga.status"
00702         f = open(gangaStatusFile,"w")
00703         f.write(str(gj))
00704         f.close()
00705         self.__statusCode = GID_JSC_WAITING_ANALYSIS
00706         self._SetStatusText("Waiting to analyse.  Ganga exit status was " + g_status)
00707 
00708         # Now analyse the results and decide what happens next
00709         self.Analyse()
00710         # No need to write self, the Analyse() call will have done that.
00711 
00712         
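The stall check in UpdateStatus compares the time since the last status change with a limit that depends on the Ganga status: 3 hours in general, 3 days for scheduled and running jobs. A minimal Python 3 sketch of that rule follows. The function names are hypothetical, and the time stamp format is taken from the strptime call in the listing.

```python
import time

STATUS_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def stall_timeout_hours(g_status):
    """Allowed hours without a status change for the given Ganga status."""
    if g_status in ("scheduled", "running"):
        return 3.0 * 24.0
    return 3.0


def is_stalled(status_time_str, g_status, now=None):
    """Return True if the last status change is older than the allowed limit."""
    if now is None:
        now = time.time()
    last_change = time.mktime(time.strptime(status_time_str.strip(), STATUS_TIME_FORMAT))
    stalled_hours = (now - last_change) / 3600.0
    return stalled_hours >= stall_timeout_hours(g_status)
```

Passing now explicitly keeps the check easy to exercise without waiting for real time to elapse.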
    ######  Private Methods (not user callable)  ######

def python::GBSJob::GBSJob::_GetTryOutputDir (   self,
  try_req = 0 
) [private]

Return the file spec for the current try (or supplied try) output directory

Definition at line 715 of file GBSJob.py.

00715                                            :
00716 
00717         """Return the file spec for the current try (or supplied try) output directory"""
00718 
00719         try_use = self.__tryNumber
00720         if try_req > 0: try_use = try_req
00721         return self.GetStoreLocation("child_dir") + "/try_" + str(try_use).zfill(3)
00722 
00723 
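The directory naming used by _GetTryOutputDir can be shown with a small Python 3 sketch. The function try_output_dir and the example path are hypothetical; only the "/try_" prefix and the zero padding to three digits come from the listing.

```python
def try_output_dir(child_dir, try_number):
    """Build the per-try output directory name, zero padded to three digits."""
    return child_dir + "/try_" + str(try_number).zfill(3)


print(try_output_dir("/path/to/job_dir", 3))
```

Zero padding keeps the directories in numerical order when listed alphabetically, at least up to try 999.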
    def _GetGbsLogFileName(self, try_req = 0):

def python::GBSJob::GBSJob::_GetGbsLogFileName (   self,
  try_req = 0 
) [private]

Return the name of the GBS Log File for the current try (or supplied try).

Definition at line 724 of file GBSJob.py.

00724                                              :
00725         
00726         """Return the name of the GBS Log File for the current try (or supplied try)."""
00727         try_use = self.__tryNumber
00728         if try_req > 0: try_use = try_req
00729         return "gbs_" + self.GetParent().GetName() + "_" + self.GetName() +  "_" + str(try_use) + ".log"
00730 
00731     #  Methods used by JobAnalyser to update failure counts and retry args
    def _IncrementEarlyFailsCount(self):         self.__earlyFails += 1

def python::GBSJob::GBSJob::_IncrementEarlyFailsCount (   self  )  [private]

Definition at line 732 of file GBSJob.py.

00732 :         self.__earlyFails += 1

def python::GBSJob::GBSJob::_IncrementLateHandledFailsCount (   self  )  [private]

Definition at line 733 of file GBSJob.py.

def python::GBSJob::GBSJob::_IncrementLateUnhandledFailsCount (   self  )  [private]

Definition at line 734 of file GBSJob.py.

def python::GBSJob::GBSJob::_SetRetryArgs (   self,
  value 
) [private]

Definition at line 735 of file GBSJob.py.

00735                                  :               self.__retryArgs = value
00736 
00737     # Methods that record state changes.  These also update state time stamp and add
00738     # entry to GLF if it exists.

def python::GBSJob::GBSJob::_SetStatusCode (   self,
  value 
) [private]

Definition at line 739 of file GBSJob.py.

00739                                   :
00740         if self.__statusCode == value: return
00741         self.__statusCode = value
00742         self.__statusTime = timestamp()
00743         gbs_log_file_spec = self._GetTryOutputDir() + "/" + self._GetGbsLogFileName()
00744         if os.path.isfile(gbs_log_file_spec):
00745             os.system("echo '" + timestamp() + "INFO State change " + GIDStringForJSC(self.__statusCode) \
00746                       +  " [" + self.__statusText + "]' >> " + gbs_log_file_spec)
    def _SetStatusText(self,value):

def python::GBSJob::GBSJob::_SetStatusText (   self,
  value 
) [private]

Definition at line 747 of file GBSJob.py.

00747                                   :
00748         if self.__statusText == value: return
00749         self.__statusText = value
00750         self.__statusTime = timestamp()
00751         gbs_log_file_spec = self._GetTryOutputDir() + "/" + self._GetGbsLogFileName()
00752         if os.path.isfile(gbs_log_file_spec):
00753             os.system("echo '" + timestamp() + "INFO State change " + GIDStringForJSC(self.__statusCode) \
00754                       +  " [" + self.__statusText + "]' >> " + gbs_log_file_spec)
00755 
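Both _SetStatusCode and _SetStatusText append a state-change entry to the GBS Log File through a shell echo command, which would misbehave if the status text contained a single quote. The Python 3 sketch below shows one possible alternative that writes the line directly. The function name append_state_change is hypothetical, and the single space between the time stamp and INFO is an assumption about the intended log format.

```python
import os


def append_state_change(log_file_spec, time_stamp, status_name, status_text):
    """Append a state-change line to the log file if it already exists.

    Returns True if a line was written, False if the file does not exist.
    """
    if not os.path.isfile(log_file_spec):
        return False
    line = "%s INFO State change %s [%s]\n" % (time_stamp.strip(), status_name, status_text)
    # Writing directly avoids shell quoting problems with the status text.
    with open(log_file_spec, "a") as log_file:
        log_file.write(line)
    return True
```

As in the listing, nothing is written when the log file for the current try has not yet been created.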


Member Data Documentation

python::GBSJob::GBSJob::__tryNumber [private]

Definition at line 31 of file GBSJob.py.

python::GBSJob::GBSJob::__earlyFails [private]

Definition at line 32 of file GBSJob.py.

python::GBSJob::GBSJob::__lateFailsHandled [private]

Definition at line 33 of file GBSJob.py.

python::GBSJob::GBSJob::__lateFailsUnhandled [private]

Definition at line 34 of file GBSJob.py.

python::GBSJob::GBSJob::__scriptLocalArgs [private]

Definition at line 35 of file GBSJob.py.

python::GBSJob::GBSJob::__localEnvironment [private]

Definition at line 36 of file GBSJob.py.

python::GBSJob::GBSJob::__localInputSandbox [private]

Definition at line 37 of file GBSJob.py.

python::GBSJob::GBSJob::__localOutputSandbox [private]

Definition at line 38 of file GBSJob.py.

python::GBSJob::GBSJob::__statusCode [private]

Definition at line 41 of file GBSJob.py.

python::GBSJob::GBSJob::__statusText [private]

Definition at line 49 of file GBSJob.py.

python::GBSJob::GBSJob::__statusTime [private]

Definition at line 50 of file GBSJob.py.

python::GBSJob::GBSJob::__retryArgs [private]

Definition at line 52 of file GBSJob.py.

python::GBSJob::GBSJob::__gangaJobId [private]

Definition at line 53 of file GBSJob.py.


The documentation for this class was generated from the following file:
GBSJob.py
Generated on Mon Feb 18 14:42:03 2008 for gbs by  doxygen 1.5.4