Public Member Functions | |
| def | __init__ |
| def | Clear |
| def | GetType |
| def | GetRetryArgs |
| def | GetJob |
| def | __repr__ |
| def | Analyse |
| def | Apply |
Private Member Functions | |
| def | __CollectData |
| def | __MakeJudgement |
| def | __WriteLoggingInfoFile |
Private Attributes | |
| __clientJob | |
| __comLevelCode | |
| __comLevelText | |
| __gangaStatus | |
| __glfTimeInterval | |
| __applStatusCode | |
| __applStatusText | |
| __judgeStatusCode | |
| __judgeStatusText | |
| __judgeRetryArgs | |
| __judgeFailCategory | |
Job termination analysis object. Based on the output returned and past history, it decides what to do next. Unlike other role-playing objects, this does not inherit from GBSObject as it has no persistable state; it is called into existence by a Job to analyse the results of a Ganga job and then discarded. It has no user-callable methods.
Definition at line 11 of file GBSJobAnalyser.py.
| def python::GBSJobAnalyser::GBSJobAnalyser::__init__ | ( | self, | ||
| name, | ||||
| parent, | ||||
| model, | ||||
| model_args | ||||
| ) |
| def python::GBSJobAnalyser::GBSJobAnalyser::Clear | ( | self | ) |
Clear out existing analysis in case we ever want to use the object again.
Definition at line 29 of file GBSJobAnalyser.py.
def Clear(self):
    """Clear out existing analysis in case we ever want to use the object again.

    Drops the reference to the client Job and resets every field that the
    data collection and judgement phases fill in.
    """

    self.__clientJob = None                  # Client Job to be analysed

    # The following are set during data collection phase:-
    self.__comLevelCode = 0                  # Communication level code of returned information
    self.__comLevelText = ""                 # Communication level text qualifying failure to reach USER level
    self.__gangaStatus = ""                  # Ganga exit status
    self.__glfTimeInterval = 0               # Time duration (minutes) as measured by GBS Log File, if available
    self.__applStatusCode = GID_JSC_UNKNOWN  # Application requested status code, if available
    self.__applStatusText = ""               # Application requested status text, if available

    # The following are set during judgement phase.
    self.__judgeStatusCode = 0               # New value of Job statusCode (JSC)
    self.__judgeStatusText = ""              # New value of Job statusText
    self.__judgeRetryArgs = ""               # New value of Job retryArgs
    self.__judgeFailCategory = 0             # Failure category code (FCC)


def GetType(self):
    """Return the type name of this object, "GBSJobAnalyser"."""
    return "GBSJobAnalyser"
| def python::GBSJobAnalyser::GBSJobAnalyser::GetType | ( | self | ) |
| def python::GBSJobAnalyser::GBSJobAnalyser::GetRetryArgs | ( | self | ) |
| def python::GBSJobAnalyser::GBSJobAnalyser::GetJob | ( | self | ) |
| def python::GBSJobAnalyser::GBSJobAnalyser::__repr__ | ( | self | ) |
Definition at line 56 of file GBSJobAnalyser.py.
def __repr__(self):
    """Return a multi-line, human-readable summary of the current analysis.

    Returns a fixed message when no client Job is attached.  Otherwise the
    text reports the communication level, the Ganga exit status, the
    recorded job interval in minutes, the application status, the failure
    category and the judgement (status code, status text and retry args).
    """
    if not self.__clientJob:
        return "No associated client Job"

    # One tuple entry per output line; the pieces are joined without any
    # separator, so each runtime string keeps its own spacing and newlines.
    pieces = (
        " Communication Level: " + GIDStringForCLC(self.__comLevelCode)
        + " [" + str(self.__comLevelText) + "]\n",
        " Ganga Exit Status: '" + self.__gangaStatus
        + "' Recorded job interval:" + str(self.__glfTimeInterval) + "mins\n",
        " Appl. Job Status Code: " + GIDStringForJSC(self.__applStatusCode)
        + " [" + self.__applStatusText + "]\n",
        " Failure category: " + GIDStringForFCC(self.__judgeFailCategory) + "\n",
        " Judgement: Status Code:" + GIDStringForJSC(self.__judgeStatusCode)
        + " [" + self.__judgeStatusText + "]" + " Retry Args:'" + self.__judgeRetryArgs + "'",
    )
    return "".join(pieces)
| def python::GBSJobAnalyser::GBSJobAnalyser::Analyse | ( | self, | ||
| job | ||||
| ) |
Analyse client job termination.
Definition at line 68 of file GBSJobAnalyser.py.
def Analyse(self, job):
    """Analyse client job termination.

    Discards any previous analysis, records `job` as the client Job, then
    runs the data collection phase followed by the judgement phase.  The
    results are only stored on this object; Apply() pushes them to the Job.
    """

    self.Clear()
    self.__clientJob = job

    self.__CollectData()
    self.__MakeJudgement()
| def python::GBSJobAnalyser::GBSJobAnalyser::Apply | ( | self | ) |
Apply results of analysis to client job.
Definition at line 78 of file GBSJobAnalyser.py.
def Apply(self):
    """Apply results of analysis to client job.

    Increments the Job's fail counter matching the judged failure category,
    copies the judged status code, status text and retry args onto the Job,
    and appends this object's text summary to the GBS Log File (GLF) in the
    Job's try output directory.
    """

    job = self.__clientJob

    # Increment fail counts as appropriate.
    if self.__judgeFailCategory == GID_FCC_EARLY:
        job._IncrementEarlyFailsCount()
    if self.__judgeFailCategory == GID_FCC_LATE_HANDLED:
        job._IncrementLateHandledFailsCount()
    if self.__judgeFailCategory == GID_FCC_LATE_UNHANDLED:
        job._IncrementLateUnhandledFailsCount()

    # Update job status.
    job._SetStatusCode(self.__judgeStatusCode)
    job._SetStatusText(self.__judgeStatusText)
    job._SetRetryArgs(self.__judgeRetryArgs)

    # Record the results to the GLF.
    jobTryDir = job._GetTryOutputDir()
    gbs_log_file_spec = jobTryDir + "/" + job._GetGbsLogFileName()
    if not os.path.isfile(gbs_log_file_spec):
        # Recreate the file directly rather than via a shell "echo ... > path",
        # which breaks on paths containing spaces or shell metacharacters.
        # split()/join() collapses runs of whitespace the way echo's word
        # splitting did, so the written line is unchanged.
        note = timestamp() + " INFO GBS_JOB_ANALYSIS Unable to find file, recreating it."
        glf = open(gbs_log_file_spec, 'w')
        try:
            glf.write(" ".join(note.split()) + "\n")
        finally:
            glf.close()

    glf = open(gbs_log_file_spec, 'a')
    try:
        # NOTE(review): there is no space between the timestamp and "INFO" here,
        # unlike the recreation message above; unless timestamp() ends in
        # whitespace, the GLF line parser will treat this line as malformed.
        # Left unchanged -- confirm against timestamp().
        glf.write(timestamp() + "INFO GBS_JOB_ANALYSIS:-\n")
        glf.write(str(self) + "\n")
    finally:
        # Close the log even if a write fails.
        glf.close()

###### Private Methods (not user callable) ######
| def python::GBSJobAnalyser::GBSJobAnalyser::__CollectData | ( | self | ) | [private] |
Collect information about the client job termination.
Definition at line 107 of file GBSJobAnalyser.py.
def __CollectData(self):
    """Collect information about the client job termination.

    Works upwards through the communication levels GANGA, GRID, BATCH,
    WORKER, APPLICATION and USER.  __comLevelCode is left at the level at
    which collection stopped and __comLevelText says why; the method
    returns as soon as a level cannot be passed.  Along the way it records
    the Ganga exit status, the job interval measured from the GBS Log File
    (minutes) and any status the application itself requested.
    """

    self.__judgeFailCategory = GID_FCC_NONE
    self.__judgeStatusCode = GID_JSC_WAITING_ANALYSIS
    self.__judgeStatusText = ""

    jobTryDir = self.__clientJob._GetTryOutputDir()

    # GANGA Level: Check for valid Ganga exit code

    self.__comLevelCode = GID_CLC_GANGA
    self.__gangaStatus = "Cannot obtain Ganga exit code"
    gangaStatusFile = jobTryDir + "/gbs_ganga.status"
    if not os.path.isfile(gangaStatusFile):
        self.__comLevelText = "Failed to find Ganga status file" + str(gangaStatusFile)
        Log(logger.ERROR, self.__comLevelText)
        return
    Log(logger.DEBUG, "Reading Ganga status file" + str(gangaStatusFile))
    self.__gangaStatus = ""
    grid_middleware = ""
    grid_id = ""
    status_file = open(gangaStatusFile)
    try:
        for line in status_file:
            # Collect the first status = ... line
            mo = re.search(r"status = '(.*)'", line)
            if mo and not self.__gangaStatus:
                self.__gangaStatus = mo.group(1)
            # For middleware and id the last matching line wins.
            mo = re.search(r"middleware = '(.*)'", line)
            if mo:
                grid_middleware = mo.group(1)
            mo = re.search(r"id = '(http.*)'", line)
            if mo:
                grid_id = mo.group(1)
    finally:
        status_file.close()
    self.__WriteLoggingInfoFile(grid_middleware, grid_id)

    known_statuses = ("aborted", "cancelled", "completed", "done", "failed", "killed")
    if self.__gangaStatus not in known_statuses:
        self.__gangaStatus = "Unknown Ganga status:'" + self.__gangaStatus + "'"
        self.__comLevelText = self.__gangaStatus
        Log(logger.ERROR, self.__gangaStatus)
        return

    # GRID Level: Check for GRID failures

    self.__comLevelCode = GID_CLC_GRID
    if self.__gangaStatus == "aborted":
        self.__comLevelText = "GRID Failure: ABORTED"
        return
    if self.__gangaStatus in ("cancelled", "killed"):
        self.__comLevelText = "GRID Failure: CANCELLED"
        return

    # BATCH, WORKER, APPLICATION and USER level: Attempt to read the GBS Log File
    # and collect information.

    self.__comLevelCode = GID_CLC_BATCH
    if self.__gangaStatus == "failed":
        self.__comLevelText = "BATCH Failure: FAILED"
        return
    gbs_log_file_spec = jobTryDir + "/" + self.__clientJob._GetGbsLogFileName()
    if not os.path.isfile(gbs_log_file_spec):
        self.__comLevelText = "Failed to find " + str(gbs_log_file_spec)
        return
    Log(logger.DEBUG, "Reading GBS Log File " + str(gbs_log_file_spec))

    has_wrapper_start = 0
    has_wrapper_term = 0
    first_time_string = ""
    last_time_string = ""
    app_exit_code = 0
    glf = open(gbs_log_file_spec)
    try:
        for line in glf:
            # Expected shape: "<YYYY-MM-DD HH:MM:SS> <keyword> <data>".
            mo = re.search(r"^(\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d)\s+(\S+)\s+(.+)\s$", line)
            if not mo:
                # line.rstrip() replaces string.rstrip(line), which no longer
                # exists in Python 3; the result is the same.
                Log(logger.WARNING, gbs_log_file_spec + ":Ignoring malformed line: " + line.rstrip())
                continue
            (date_time, keyword, data) = mo.groups()
            Log(logger.DEBUG, " Parse GBS Log File line. date_time:" + date_time + ",keyword:" + keyword + ",data:" + data + ".")
            mo = re.search(r"^GBS_JOB_WRAPPER Starting", data)
            if mo:
                has_wrapper_start = 1
                first_time_string = date_time
            mo = re.search(r"^GBS_JOB_WRAPPER Terminating.*User script returned (\d+)", data)
            # Collect end date time only until wrapper end.  This runs before the
            # flag is set below, so the terminating line's own time is included.
            if not has_wrapper_term:
                last_time_string = date_time
            if mo:
                has_wrapper_term = 1
                app_exit_code = int(mo.group(1))
            # The last SUCCEEDED/FAILED/RETRY line in the file wins.
            if keyword == "SUCCEEDED":
                self.__applStatusCode = GID_JSC_SUCCEEDED
                self.__applStatusText = data
            if keyword == "FAILED":
                self.__applStatusCode = GID_JSC_FAILED
                self.__applStatusText = data
            if keyword == "RETRY":
                self.__applStatusCode = GID_JSC_RETRY
                self.__applStatusText = data
    finally:
        glf.close()

    # If we have collected two times, compute the interval (in minutes).
    if first_time_string and last_time_string:
        try:
            t_start = time.mktime(time.strptime(first_time_string, "%Y-%m-%d %H:%M:%S"))
            t_term = time.mktime(time.strptime(last_time_string, "%Y-%m-%d %H:%M:%S"))
            self.__glfTimeInterval = (t_term - t_start) / 60.
        except ValueError:
            Log(logger.WARNING, "Bad times found in GLF: " + str(first_time_string) + ";" + str(last_time_string) + ".")

    # We now have all the information to complete data collection

    if not has_wrapper_start:
        self.__comLevelText = "GBS Log File has no job wrapper start line"
        return

    self.__comLevelCode = GID_CLC_WORKER
    if not has_wrapper_term:
        self.__comLevelText = "GBS Log File has no job wrapper end line"
        return

    self.__comLevelCode = GID_CLC_APPLICATION
    if app_exit_code:
        self.__comLevelText = "Application returned exit code:" + str(app_exit_code)
        return

    if self.__applStatusCode == GID_JSC_UNKNOWN:
        self.__comLevelText = "Application failed to record SUCCEEDED, FAILED or RETRY"
        return

    # Hurrah! we have actually managed to communicate with the application.

    self.__comLevelCode = GID_CLC_USER
    self.__comLevelText = "Achieved communication with application"
| def python::GBSJobAnalyser::GBSJobAnalyser::__MakeJudgement | ( | self | ) | [private] |
Make a judgement on what to do next with the Job, but leave applying that judgement to the Apply() method.
Definition at line 244 of file GBSJobAnalyser.py.
def __MakeJudgement(self):
    """Make judgement on what next to do with Job but leave application to Apply() method.

    Uses the communication level reached by data collection to choose a
    failure category, then chooses between RETRY and FAILED by comparing
    the Job's fail count for that category with the configured limit.
    SUCCEEDED, FAILED (as requested by the application) and HELD outcomes
    return early without consulting the limits.  Only the __judge* fields
    of this object are modified.
    """

    # Use current RetryArgs by default but replace if get RETRY signal from application
    self.__judgeRetryArgs = self.__clientJob.GetRetryArgs()

    if self.__comLevelCode == GID_CLC_GANGA:
        self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED

    elif self.__comLevelCode == GID_CLC_GRID:

        if self.__gangaStatus == "aborted":
            self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED
        else:
            self.__judgeStatusCode = GID_JSC_HELD
            self.__judgeStatusText = "Ganga returned 'cancelled' or 'killed'"
            return

    elif self.__comLevelCode == GID_CLC_BATCH:

        self.__judgeFailCategory = GID_FCC_EARLY

    elif self.__comLevelCode == GID_CLC_WORKER \
      or self.__comLevelCode == GID_CLC_APPLICATION:

        self.__judgeFailCategory = GID_FCC_EARLY
        # float() mirrors the int() wrapping of the retry limits below, so the
        # comparison is numeric even if the config value is a string.
        # NOTE(review): assumes the value is numeric or a numeric string -- confirm.
        if self.__glfTimeInterval > float(GetConfigValue("MaxTimeEarlyFails")):
            self.__judgeFailCategory = GID_FCC_LATE_UNHANDLED

    elif self.__comLevelCode == GID_CLC_USER:

        if self.__applStatusCode == GID_JSC_RETRY:
            # The application's RETRY text doubles as the new retry args.
            self.__judgeStatusText = self.__applStatusText
            self.__judgeRetryArgs = self.__applStatusText
            self.__judgeFailCategory = GID_FCC_EARLY
            if self.__glfTimeInterval > float(GetConfigValue("MaxTimeEarlyFails")):
                self.__judgeFailCategory = GID_FCC_LATE_HANDLED
        else:
            self.__judgeStatusCode = self.__applStatusCode
            self.__judgeStatusText = self.__applStatusText
            self.__judgeRetryArgs = ""
            return

    # If we reach this point then we have dealt with SUCCEEDED, FAILED and HOLD
    # All that's left are various types of failures and RETRY.
    # Make decision between FAILED and RETRY based on the error counts.

    # Should never happen, but in case I break the logic sometime!
    if self.__judgeFailCategory == GID_FCC_NONE:
        Log(logger.ERROR, "GBSJobAnalyser: Program error: dealing with failure but category not set!")
        self.__judgeFailCategory = GID_FCC_EARLY  # Have to pick something

    self.__judgeStatusCode = GID_JSC_RETRY

    fc = self.__judgeFailCategory
    max_early = int(GetConfigValue("MaxRetryEarlyFails"))
    max_late_handled = int(GetConfigValue("MaxRetryLateFailsHandled"))
    max_late_unhandled = int(GetConfigValue("MaxRetryLateFailsUnhandled"))
    job = self.__clientJob
    job_early = job.GetEarlyFailsCount()
    job_late_handled = job.GetLateHandledFailsCount()
    job_late_unhandled = job.GetLateUnhandledFailsCount()

    # The counts read here do not yet include the current failure; Apply()
    # increments them afterwards.
    if fc == GID_FCC_EARLY and job_early >= max_early:
        self.__judgeStatusCode = GID_JSC_FAILED
        self.__judgeStatusText = "EARLY fails limit of " + str(max_early) + " exceeded"

    if fc == GID_FCC_LATE_HANDLED and job_late_handled >= max_late_handled:
        self.__judgeStatusCode = GID_JSC_FAILED
        self.__judgeStatusText = "LATE_HANDLED fails limit of " + str(max_late_handled) + " exceeded"

    if fc == GID_FCC_LATE_UNHANDLED and job_late_unhandled >= max_late_unhandled:
        self.__judgeStatusCode = GID_JSC_FAILED
        self.__judgeStatusText = "LATE_UNHANDLED fails limit of " + str(max_late_unhandled) + " exceeded"
| def python::GBSJobAnalyser::GBSJobAnalyser::__WriteLoggingInfoFile | ( | self, | ||
| grid_middleware, | ||||
| grid_id | ||||
| ) | [private] |
For EDG/GLITE submissions, produce a logging summary into 'gbs_grid_info.log'. Update __judgeStatusText for some classic GRID errors.
Definition at line 320 of file GBSJobAnalyser.py.
def __WriteLoggingInfoFile(self, grid_middleware, grid_id):
    """For EDG/GLITE submissions, produce a logging summary into 'gbs_grid_info.log'.

    Update __judgeStatusText for some classic GRID errors.

    Does nothing when either argument is empty.  Otherwise runs the
    middleware's logging-info command for `grid_id` (the EDG command when
    `grid_middleware` is "EDG", the gLite command for anything else) and
    writes one summary line per event to the log file in the Job's try
    output directory.
    """

    if not grid_id or not grid_middleware:
        return
    log_cmd = "glite-wms-job-logging-info -v 2 " + grid_id
    if grid_middleware == "EDG":
        log_cmd = "edg-job-get-logging-info -v 1 " + grid_id
    jobTryDir = self.__clientJob._GetTryOutputDir()
    grid_log_file_spec = jobTryDir + "/gbs_grid_info.log"

    # NOTE(review): grid_id is passed to a shell unquoted.  The caller takes it
    # from the Ganga status file (anything matching "http.*"); confirm that file
    # is trusted, or switch to an argument-list subprocess call.
    inp = os.popen(log_cmd, "r")
    try:
        out = open(grid_log_file_spec, 'w')
        try:
            event = ""
            host = ""
            reason = ""
            result = ""
            # Named event_time so the local does not shadow the module-level
            # timestamp() helper used elsewhere in this class.
            event_time = ""
            for line in inp:
                mo = re.search(r"Event:\s+(\S+)", line, re.IGNORECASE)
                if mo:
                    event = mo.group(1)
                mo = re.search(r"- host\s+=\s+(.*)", line, re.IGNORECASE)
                if mo:
                    host = mo.group(1)
                mo = re.search(r"- reason\s+=\s+(.*)", line, re.IGNORECASE)
                if mo:
                    reason = mo.group(1)
                    # Look for a few classic errors and record the first
                    if not self.__judgeStatusText and re.search(r"(expire|maradona)", line, re.IGNORECASE):
                        self.__judgeStatusText = "GRID error: " + reason
                mo = re.search(r"- (result|status_code)\s+=\s+(.*)", line, re.IGNORECASE)
                if mo:
                    result = mo.group(2)
                mo = re.search(r"- timestamp\s+=\s+(.*)", line, re.IGNORECASE)
                if mo:
                    # A timestamp line closes the current event: write the summary
                    # line and reset the fields for the next event.
                    event_time = mo.group(1)
                    out.write("%s %-12s %-10s %15s %s\n" % (event_time, event, result, host, reason))
                    event = ""
                    host = ""
                    reason = ""
                    result = ""
                    event_time = ""
        finally:
            out.close()
    finally:
        # Always close the pipe so the child process is waited for, even if
        # writing the summary fails.
        inp.close()
Definition at line 33 of file GBSJobAnalyser.py.
Definition at line 36 of file GBSJobAnalyser.py.
Definition at line 37 of file GBSJobAnalyser.py.
Definition at line 38 of file GBSJobAnalyser.py.
Definition at line 39 of file GBSJobAnalyser.py.
Definition at line 40 of file GBSJobAnalyser.py.
Definition at line 41 of file GBSJobAnalyser.py.
Definition at line 44 of file GBSJobAnalyser.py.
Definition at line 45 of file GBSJobAnalyser.py.
Definition at line 46 of file GBSJobAnalyser.py.
Definition at line 47 of file GBSJobAnalyser.py.
1.5.4