[project @ Proof of concept!!!]
authorBrandon King <kingb@caltech.edu>
Sat, 17 Nov 2007 01:17:59 +0000 (01:17 +0000)
committerBrandon King <kingb@caltech.edu>
Sat, 17 Nov 2007 01:17:59 +0000 (01:17 +0000)
 * Now uses inotify to confirm gerald, bustard, and firecrest
   "finished.txt" files have been created! if not, fail!
 * Also goes back an reads the stderr log for known errors and
   reports failure if one is detected here.
 * Just needs to be generalized an hooked up to config generator
   as well as inserted into the run daemon!

bin/config_pipeline2.py

index 63919562aa567f931d6b493d9b473770c4b5131b..d421892cb095279d7cb4d141901dde3be114acb6 100644 (file)
@@ -22,19 +22,44 @@ class ConfigInfo:
     self.config_filepath = None
 
 
+####################################
+# inotify event processor
+
+s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
+s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
+s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
+
 class RunEvent(ProcessEvent):
 
+  def __init__(self):
+
+    self.run_status_dict = {'firecrest': False,
+                            'bustard': False,
+                            'gerald': False}
+
+    ProcessEvent.__init__(self)
+
   def process_IN_CREATE(self, event):
     fullpath = os.path.join(event.path, event.name)
     if s_finished.search(fullpath):
       logging.info("File Found: %s" % (fullpath))
+
+      if s_firecrest_finished.search(fullpath):
+        self.run_status_dict['firecrest'] = True
+      elif s_bustard_finished.search(fullpath):
+        self.run_status_dict['bustard'] = True
+      elif s_gerald_finished.search(fullpath):
+        self.run_status_dict['gerald'] = True
+      
     print "Create: %s" % (os.path.join(event.path, event.name))
 
   def process_IN_DELETE(self, event):
     print "Remove %s" % (os.path.join(event.path, event.name))
 
 #FLAGS
+# Config Step Error
 RUN_ABORT = 'abort'
+# Run Step Error
 RUN_FAILED = 'failed'
 
 
@@ -208,30 +233,41 @@ def config_stderr_handler(line, conf_info):
 
 
 #FIXME: Temperary hack
-f = open('pipeline_run.log', 'w')
+#f = open('pipeline_run.log', 'w')
 #ferr = open('pipeline_err.log', 'w')
 
 
 
-def pipeline_stdout_handler(line, conf_info):
-  """
-  Processes each line of output from running the pipeline
-  and stores useful information using the logging module
-
-  Loads useful information into conf_info as well, for future
-  use outside the function.
-
-  returns True if found condition that signifies success.
-  """
-
-  f.write(line + '\n')
-
-  return True
+#def pipeline_stdout_handler(line, conf_info):
+#  """
+#  Processes each line of output from running the pipeline
+#  and stores useful information using the logging module
+#
+#  Loads useful information into conf_info as well, for future
+#  use outside the function.
+#
+#  returns True if found condition that signifies success.
+#  """
+#
+#  f.write(line + '\n')
+#
+#  return True
 
 
 
 def pipeline_stderr_handler(line, conf_info):
   """
+  Processes each line of stderr from pipelien run
+  and stores useful information using the logging module
+
+  ##FIXME: Future feature (doesn't actually do this yet)
+  #Loads useful information into conf_info as well, for future
+  #use outside the function.
+
+  returns RUN_FAILED upon detecting failure;
+          #True on success message; (no clear success state)
+          False if neutral message
+            (i.e. doesn't signify failure or success)
   """
 
   if pl_stderr_ignore(line):
@@ -363,11 +399,14 @@ def run_pipeline(conf_info):
 
   # Change cwd to run_path
   os.chdir(conf_info.run_path)
+  stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
+  stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
 
   # Monitor file creation
   wm = WatchManager()
   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
-  notifier = ThreadedNotifier(wm, RunEvent())
+  event = RunEvent()
+  notifier = ThreadedNotifier(wm, event)
   notifier.start()
   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
 
@@ -381,8 +420,8 @@ def run_pipeline(conf_info):
   #                        stdout=subprocess.PIPE,
   #                        stderr=subprocess.PIPE)
 
-  fout = open('pipeline_run_stdout.txt', 'w')
-  ferr = open('pipeline_run_stderr.txt', 'w')
+  fout = open(stdout_filepath, 'w')
+  ferr = open(stderr_filepath, 'w')
 
   pipe = subprocess.Popen(['make',
                              '-j8',
@@ -390,16 +429,39 @@ def run_pipeline(conf_info):
                              stdout=fout,
                              stderr=ferr)
                              #shell=True)
+  # Wait for run to finish
   retcode = pipe.wait()
 
-  notifier.stop()
 
+  # Clean up
+  notifier.stop()
   fout.close()
   ferr.close()
 
-  #print ': %s' % (sts)
-    
-  status = (retcode == 0)
+  # Process stderr
+  ferr = open(stderr_filepath, 'r')
+
+  run_failed_stderr = False
+  for line in ferr:
+    err_status = pipeline_stderr_handler(line, conf_info)
+    if err_status == RUN_FAILED:
+      run_failed_stderr = True
+
+  ferr.close()
+
+  # Finished file check!
+  print 'RUN SUCCESS CHECK:'
+  for key, value in event.run_status_dict.items():
+    print '  %s: %s' % (key, value)
+
+  dstatus = event.run_status_dict
+
+  # Success or failure check
+  status = (retcode == 0) and \
+           run_failed_stderr is False and \
+           dstatus['firecrest'] is True and \
+           dstatus['bustard'] is True and \
+           dstatus['gerald'] is True
 
   return status
 
@@ -427,4 +489,4 @@ if __name__ == '__main__':
       print 'Pipeline run failed.'
 
   #FIXME: Temperary hack
-  f.close()
+  #f.close()