[project @ Work toward pipeline monitor code (WARNING: testing version)]
authorBrandon King <kingb@caltech.edu>
Thu, 15 Nov 2007 21:16:47 +0000 (21:16 +0000)
committerBrandon King <kingb@caltech.edu>
Thu, 15 Nov 2007 21:16:47 +0000 (21:16 +0000)
 * Warning, this code is a steping stone to monitoring a running pipeline.
   * It only runs with tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104
   * There are also important FIXME that should be looked at before using this version of the code.
 * This patch is needed for all future patches to work.

bin/config_pipeline.py

index 1d49a0168107b067f18a4b00a3037035d14dc295..22d028002dcc4f20f93edbbd6a0a60e6825ed26c 100644 (file)
@@ -1,8 +1,10 @@
 #!/usr/bin/python
 import subprocess
+import logging
+import time
 import re
 import os
-import logging
+
 
 logging.basicConfig(level=logging.DEBUG,
                     format='%(asctime)s %(levelname)-8s %(message)s',
@@ -19,12 +21,17 @@ class ConfigInfo:
 
 #FLAGS
 RUN_ABORT = 'abort'
+RUN_FAILED = 'failed'
+
 
+#####################################
+# Configure Step (goat_pipeline.py)
 #Info
 s_start = re.compile('Starting Genome Analyzer Pipeline')
 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
 s_generating = re.compile('Generating journals, Makefiles and parameter files')
 s_seq_folder = re.compile('^Sequence folder: ')
+s_seq_folder_sub = re.compile('want to make ')
 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
 
 #Errors
@@ -36,6 +43,50 @@ s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
 #Ignore
 s_skip = re.compile('s_[0-8]_[0-9]+')
 
+##########################################
+# Pipeline Run Step (make -j8 recursive)
+
+##Info
+
+
+##Errors
+s_make_error = re.compile('^make[\S\s]+Error')
+s_no_gnuplot = re.compile('gnuplot: command not found')
+s_no_convert = re.compile('^Can\'t exec "convert"')
+
+##Ignore
+PL_STDERR_IGNORE_LIST = []
+# Info: PF 11802
+PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
+# About to analyse intensity file s_4_0101_sig2.txt
+PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
+# Will send output to standard output
+PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
+# Found 31877 clusters
+PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
+# Will use quality criterion ((CHASTITY>=0.6)
+PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
+# Quality criterion translated to (($F[5]>=0.6))
+PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
+# opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
+#  AND
+# opened s_4_0103_qhg.txt
+PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
+
+
+def pl_stderr_ignore(line):
+  """
+  Searches lines for lines to ignore (i.e. not to log)
+
+  returns True if line should be ignored
+  returns False if line should NOT be ignored
+  """
+  for s in PL_STDERR_IGNORE_LIST:
+    if s.search(line):
+      return True
+  return False
+
+
 def config_stdout_handler(line, conf_info):
   """
   Processes each line of output from GOAT
@@ -47,24 +98,52 @@ def config_stdout_handler(line, conf_info):
   returns True if found condition that signifies success.
   """
 
-  # Irrelevant line
+  # Skip irrelevant line (without logging)
   if s_skip.search(line):
     pass
+
+  # Detect invalid command-line arguments
   elif s_invalid_cmdline.search(line):
     logging.error("Invalid commandline options!")
+
+  # Detect starting of configuration
   elif s_start.search(line):
     logging.info('START: Configuring pipeline')
+
+  # Detect it made it past invalid arguments
   elif s_gerald.search(line):
     logging.info('Running make now')
+
+  # Detect that make files have been generated (based on output)
   elif s_generating.search(line):
     logging.info('Make files generted')
     return True
+
+  # Capture run directory
   elif s_seq_folder.search(line):
-    mo = s_seq_folder.search(line)
-    conf_info.bustard_path = line[mo.end():]
-    conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
+    mo = s_seq_folder_sub.search(line)
+    #Output changed when using --tiles=<tiles>
+    # at least in pipeline v0.3.0b2
+    if mo:
+      firecrest_bustard_gerald_makefile = line[mo.end():]
+      firecrest_bustard_gerald, junk = \
+                                os.path.split(firecrest_bustard_gerald_makefile)
+      firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
+      firecrest, junk = os.path.split(firecrest_bustard)
+
+      conf_info.bustard_path = firecrest_bustard
+      conf_info.run_path = firecrest
+    
+    #Standard output handling
+    else:
+      print 'Sequence line:', line
+      mo = s_seq_folder.search(line)
+      conf_info.bustard_path = line[mo.end():]
+      conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
+
+  # Log all other output for debugging purposes
   else:
-    logging.warning('How to handle: %s' % (line))
+    logging.warning('CONF:?: %s' % (line))
 
   return False
 
@@ -78,28 +157,39 @@ def config_stderr_handler(line, conf_info):
   Loads useful information into conf_info as well, for future
   use outside the function.
 
-  returns RUN_ABORT upon detecting failure; True on success message
+  returns RUN_ABORT upon detecting failure;
+          True on success message;
+          False if neutral message
+            (i.e. doesn't signify failure or success)
   """
 
+  # Detect invalid species directory error
   if s_species_dir_err.search(line):
     logging.error(line)
     return RUN_ABORT
+  # Detect goat_pipeline.py traceback
   elif s_goat_traceb.search(line):
     logging.error("Goat config script died, traceback in debug output")
     return RUN_ABORT
+  # Detect indication of successful configuration (from stderr; odd, but ok)
   elif s_stderr_taskcomplete.search(line):
     logging.info('Configure step successful (from: stderr)')
     return True
+  # Log all other output as debug output
   else:
-    logging.debug('STDERR: How to handle: %s' % (line))
+    logging.debug('CONF:STDERR:?: %s' % (line))
 
+  # Neutral (not failure; nor success)
   return False
 
+
 #FIXME: Temperary hack
-f = open('pipeline_run.log', 'w')
-ferr = open('pipeline_err.log', 'w')
+f = open('pipeline_run.log.1', 'w')
+#ferr = open('pipeline_err.log.1', 'w')
+
 
-def pipeline_handler(line, conf_info):
+
+def pipeline_stdout_handler(line, conf_info):
   """
   Processes each line of output from running the pipeline
   and stores useful information using the logging module
@@ -115,6 +205,28 @@ def pipeline_handler(line, conf_info):
   return True
 
 
+
+def pipeline_stderr_handler(line, conf_info):
+  """
+  """
+
+  if pl_stderr_ignore(line):
+    pass
+  elif s_make_error.search(line):
+    logging.error("make error detected; run failed")
+    return RUN_FAILED
+  elif s_no_gnuplot.search(line):
+    logging.error("gnuplot not found")
+    return RUN_FAILED
+  elif s_no_convert.search(line):
+    logging.error("imagemagick's convert command not found")
+    return RUN_FAILED
+  else:
+    logging.debug('PIPE:STDERR:?: %s' % (line))
+
+  return False
+
+
 def configure(conf_info):
   """
   Attempts to configure the GA pipeline using goat.
@@ -140,25 +252,27 @@ def configure(conf_info):
   #                        stdout=subprocess.PIPE,
   #                        stderr=subprocess.PIPE)
 
-  #Not a test; actual run attempt.
-  pipe = subprocess.Popen(['goat_pipeline.py',
-                    '--GERALD=%s' % (conf_info.config_filepath),
-                           '--make',
-                           '.'],
-                          stdout=subprocess.PIPE,
-                          stderr=subprocess.PIPE)
-
-  # CONTINUE HERE
-  #FIXME: this only does a run on 5 tiles on lane 4
+  ##########################
+  # Run configuration step
+  #   Not a test; actual configure attempt.
   #pipe = subprocess.Popen(['goat_pipeline.py',
   #                  '--GERALD=%s' % (conf_info.config_filepath),
-  #                         '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_103,s_4_104',
   #                         '--make',
   #                         '.'],
   #                        stdout=subprocess.PIPE,
   #                        stderr=subprocess.PIPE)
 
-  #Process stdout
+  # CONTINUE HERE
+  #FIXME: this only does a run on 5 tiles on lane 4
+  pipe = subprocess.Popen(['goat_pipeline.py',
+                    '--GERALD=%s' % (conf_info.config_filepath),
+                           '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
+                           '--make',
+                           '.'],
+                          stdout=subprocess.PIPE,
+                          stderr=subprocess.PIPE)
+  ##################
+  # Process stdout
   stdout_line = pipe.stdout.readline()
 
   complete = False
@@ -173,7 +287,7 @@ def configure(conf_info):
   if error_code:
     logging.error('Recieved error_code: %s' % (error_code))
   else:
-    logging.info ('We are go for launch!')
+    logging.info('We are go for launch!')
 
   #Process stderr
   stderr_line = pipe.stderr.readline()
@@ -211,7 +325,9 @@ def configure(conf_info):
 
 
 def run_pipeline(conf_info):
-
+  """
+  Run the pipeline and monitor status.
+  """
   # Fail if the run_path doesn't actually exist
   if not os.path.exists(conf_info.run_path):
     logging.error('Run path does not exist: %s' \
@@ -221,6 +337,9 @@ def run_pipeline(conf_info):
   # Change cwd to run_path
   os.chdir(conf_info.run_path)
 
+  # Log pipeline starting
+  logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
+  
   # Start the pipeline (and hide!)
   pipe = subprocess.Popen(['make',
                            '-j8',
@@ -232,14 +351,41 @@ def run_pipeline(conf_info):
 
   complete = False
   while line != '':
-    if pipeline_handler(line, conf_info):
+    if pipeline_stdout_handler(line, conf_info):
       complete = True
     line = pipe.stdout.readline()
 
   error_code = pipe.wait()
 
-  ferr.write(pipe.stderr.read())
-  ferr.close()
+  #ferr.write(pipe.stderr.read())
+  #ferr.close()
+
+  stderr_line = pipe.stderr.readline()
+
+  run_succeded = False
+  run_failed = False
+  while stderr_line != '':
+    stderr_status = pipeline_stderr_handler(stderr_line, conf_info)
+    if stderr_status is True:
+      run_succeded = True
+    if stderr_status == RUN_FAILED:
+      run_failed = True
+    stderr_line = pipe.stderr.readline()
+
+  ###DEBUG###
+  print 'RUN STATUS: expect: True, True, True, True'
+  print '            Status:',
+  print complete is True, error_code == 0,
+  print run_succeded is True, run_failed is False
+  ###END_DEBUG###
+  
+  status = complete is True and \
+           error_code == 0 and \
+           run_succeded is True and \
+           run_failed is False
+
+  return status
+
 
 
 if __name__ == '__main__':