From: Brandon King
Date: Sat, 17 Nov 2007 01:17:59 +0000 (+0000)
Subject: [project @ Proof of concept!!!]
X-Git-Tag: 0.1.0~65
X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=95fb343367ce9089f42e0ff92ed50081562a3b6f

[project @ Proof of concept!!!]

 * Now uses inotify to confirm the gerald, bustard, and firecrest
   "finished.txt" files have been created! If not, fail!
 * Also goes back and reads the stderr log for known errors and
   reports failure if one is detected there.
 * Just needs to be generalized and hooked up to the config generator,
   as well as inserted into the run daemon!
---

diff --git a/bin/config_pipeline2.py b/bin/config_pipeline2.py
index 6391956..d421892 100644
--- a/bin/config_pipeline2.py
+++ b/bin/config_pipeline2.py
@@ -22,19 +22,44 @@ class ConfigInfo:
     self.config_filepath = None
 
 
+####################################
+# inotify event processor
+
+s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
+s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
+s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
+
 class RunEvent(ProcessEvent):
 
+  def __init__(self):
+
+    self.run_status_dict = {'firecrest': False,
+                            'bustard': False,
+                            'gerald': False}
+
+    ProcessEvent.__init__(self)
+
   def process_IN_CREATE(self, event):
     fullpath = os.path.join(event.path, event.name)
     if s_finished.search(fullpath):
       logging.info("File Found: %s" % (fullpath))
+
+      if s_firecrest_finished.search(fullpath):
+        self.run_status_dict['firecrest'] = True
+      elif s_bustard_finished.search(fullpath):
+        self.run_status_dict['bustard'] = True
+      elif s_gerald_finished.search(fullpath):
+        self.run_status_dict['gerald'] = True
+
     print "Create: %s" % (os.path.join(event.path, event.name))
 
   def process_IN_DELETE(self, event):
     print "Remove %s" % (os.path.join(event.path, event.name))
 
 #FLAGS
+# Config Step Error
 RUN_ABORT = 'abort'
+# Run Step Error
 RUN_FAILED = 'failed'
@@ -208,30 +233,41 @@ def config_stderr_handler(line, conf_info):
 
 
 #FIXME: Temperary hack
-f = open('pipeline_run.log', 'w')
+#f = open('pipeline_run.log', 'w')
 #ferr = open('pipeline_err.log', 'w')
 
-def pipeline_stdout_handler(line, conf_info):
-  """
-  Processes each line of output from running the pipeline
-  and stores useful information using the logging module
-
-  Loads useful information into conf_info as well, for future
-  use outside the function.
-
-  returns True if found condition that signifies success.
-  """
-
-  f.write(line + '\n')
-
-  return True
+#def pipeline_stdout_handler(line, conf_info):
+#  """
+#  Processes each line of output from running the pipeline
+#  and stores useful information using the logging module
+#
+#  Loads useful information into conf_info as well, for future
+#  use outside the function.
+#
+#  returns True if found condition that signifies success.
+#  """
+#
+#  f.write(line + '\n')
+#
+#  return True
 
 
 def pipeline_stderr_handler(line, conf_info):
   """
+  Processes each line of stderr from pipelien run
+  and stores useful information using the logging module
+
+  ##FIXME: Future feature (doesn't actually do this yet)
+  #Loads useful information into conf_info as well, for future
+  #use outside the function.
+
+  returns RUN_FAILED upon detecting failure;
+          #True on success message; (no clear success state)
+          False if neutral message
+          (i.e. doesn't signify failure or success)
   """
 
   if pl_stderr_ignore(line):
@@ -363,11 +399,14 @@ def run_pipeline(conf_info):
 
   # Change cwd to run_path
   os.chdir(conf_info.run_path)
+  stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
+  stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
 
   # Monitor file creation
   wm = WatchManager()
   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
-  notifier = ThreadedNotifier(wm, RunEvent())
+  event = RunEvent()
+  notifier = ThreadedNotifier(wm, event)
   notifier.start()
   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
@@ -381,8 +420,8 @@
   #                        stdout=subprocess.PIPE,
   #                        stderr=subprocess.PIPE)
 
-  fout = open('pipeline_run_stdout.txt', 'w')
-  ferr = open('pipeline_run_stderr.txt', 'w')
+  fout = open(stdout_filepath, 'w')
+  ferr = open(stderr_filepath, 'w')
 
   pipe = subprocess.Popen(['make',
                            '-j8',
@@ -390,16 +429,39 @@
                            stdout=fout,
                            stderr=ferr)
                            #shell=True)
 
+  # Wait for run to finish
   retcode = pipe.wait()
 
-  notifier.stop()
+  # Clean up
+  notifier.stop()
   fout.close()
   ferr.close()
 
-  #print ': %s' % (sts)
-
-  status = (retcode == 0)
+  # Process stderr
+  ferr = open(stderr_filepath, 'r')
+
+  run_failed_stderr = False
+  for line in ferr:
+    err_status = pipeline_stderr_handler(line, conf_info)
+    if err_status == RUN_FAILED:
+      run_failed_stderr = True
+
+  ferr.close()
+
+  # Finished file check!
+  print 'RUN SUCCESS CHECK:'
+  for key, value in event.run_status_dict.items():
+    print '  %s: %s' % (key, value)
+
+  dstatus = event.run_status_dict
+
+  # Success or failure check
+  status = (retcode == 0) and \
+           run_failed_stderr is False and \
+           dstatus['firecrest'] is True and \
+           dstatus['bustard'] is True and \
+           dstatus['gerald'] is True
 
   return status
@@ -427,4 +489,4 @@ if __name__ == '__main__':
     print 'Pipeline run failed.'
 
   #FIXME: Temperary hack
-  f.close()
+  #f.close()
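
For reference, a minimal standalone sketch of the inotify-based "finished.txt" check described above, assuming Python 2 and the same pyinotify calls that appear in the diff (WatchManager, ThreadedNotifier, EventsCodes, ProcessEvent). The wait_for_finished() helper and its polling timeout are illustrative additions only; the commit itself reads the status dict once after the make run exits.

import os
import re
import time

from pyinotify import WatchManager, ThreadedNotifier, EventsCodes, ProcessEvent

# Per-stage "finished.txt" patterns, as in the commit.
s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')


class FinishedEvent(ProcessEvent):
    """Flags each pipeline stage as done when its finished.txt appears."""
    def __init__(self):
        self.run_status_dict = {'firecrest': False,
                                'bustard': False,
                                'gerald': False}
        ProcessEvent.__init__(self)

    def process_IN_CREATE(self, event):
        fullpath = os.path.join(event.path, event.name)
        if s_firecrest_finished.search(fullpath):
            self.run_status_dict['firecrest'] = True
        elif s_bustard_finished.search(fullpath):
            self.run_status_dict['bustard'] = True
        elif s_gerald_finished.search(fullpath):
            self.run_status_dict['gerald'] = True


def wait_for_finished(run_path, timeout=3600):
    """Hypothetical helper: watch run_path until all three stages have
    produced finished.txt or timeout seconds pass; returns the status dict."""
    wm = WatchManager()
    mask = EventsCodes.IN_CREATE | EventsCodes.IN_DELETE
    event = FinishedEvent()
    notifier = ThreadedNotifier(wm, event)
    notifier.start()
    wm.add_watch(run_path, mask, rec=True)
    try:
        waited = 0
        # Poll the dict the event handler fills in; run_pipeline() instead
        # checks it once after pipe.wait() returns.
        while False in event.run_status_dict.values() and waited < timeout:
            time.sleep(1)
            waited += 1
    finally:
        notifier.stop()
    return event.run_status_dict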