[project @ Work toward pipeline monitor code (WARNING: testing version)]
author Brandon King <kingb@caltech.edu>
Thu, 15 Nov 2007 21:16:47 +0000 (21:16 +0000)
committer Brandon King <kingb@caltech.edu>
Thu, 15 Nov 2007 21:16:47 +0000 (21:16 +0000)
 * Warning: this code is a stepping stone toward monitoring a running pipeline.
   * It only runs with tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104
   * There are also important FIXMEs that should be looked at before using this version of the code.
 * This patch is needed for all future patches to work.

bin/config_pipeline.py

index 1d49a0168107b067f18a4b00a3037035d14dc295..22d028002dcc4f20f93edbbd6a0a60e6825ed26c 100644 (file)
@@ -1,8 +1,10 @@
 #!/usr/bin/python
 import subprocess
 #!/usr/bin/python
 import subprocess
+import logging
+import time
 import re
 import os
 import re
 import os
-import logging
+
 
 logging.basicConfig(level=logging.DEBUG,
                     format='%(asctime)s %(levelname)-8s %(message)s',
 
 logging.basicConfig(level=logging.DEBUG,
                     format='%(asctime)s %(levelname)-8s %(message)s',
@@ -19,12 +21,17 @@ class ConfigInfo:
 
 #FLAGS
 RUN_ABORT = 'abort'
 
 #FLAGS
 RUN_ABORT = 'abort'
+RUN_FAILED = 'failed'
+
 
 
+#####################################
+# Configure Step (goat_pipeline.py)
 #Info
 s_start = re.compile('Starting Genome Analyzer Pipeline')
 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
 s_generating = re.compile('Generating journals, Makefiles and parameter files')
 s_seq_folder = re.compile('^Sequence folder: ')
 #Info
 s_start = re.compile('Starting Genome Analyzer Pipeline')
 s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+")
 s_generating = re.compile('Generating journals, Makefiles and parameter files')
 s_seq_folder = re.compile('^Sequence folder: ')
+s_seq_folder_sub = re.compile('want to make ')
 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
 
 #Errors
 s_stderr_taskcomplete = re.compile('^Task complete, exiting')
 
 #Errors
@@ -36,6 +43,50 @@ s_goat_traceb = re.compile("^Traceback \(most recent call last\):")
 #Ignore
 s_skip = re.compile('s_[0-8]_[0-9]+')
 
 #Ignore
 s_skip = re.compile('s_[0-8]_[0-9]+')
 
+##########################################
+# Pipeline Run Step (make -j8 recursive)
+
+##Info
+
+
+##Errors
+s_make_error = re.compile('^make[\S\s]+Error')
+s_no_gnuplot = re.compile('gnuplot: command not found')
+s_no_convert = re.compile('^Can\'t exec "convert"')
+
+##Ignore
+PL_STDERR_IGNORE_LIST = []
+# Info: PF 11802
+PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') )
+# About to analyse intensity file s_4_0101_sig2.txt
+PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') )
+# Will send output to standard output
+PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') )
+# Found 31877 clusters
+PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') )
+# Will use quality criterion ((CHASTITY>=0.6)
+PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') )
+# Quality criterion translated to (($F[5]>=0.6))
+PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') )
+# opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt
+#  AND
+# opened s_4_0103_qhg.txt
+PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') )
+
+
+def pl_stderr_ignore(line):
+  """
+  Searches lines for lines to ignore (i.e. not to log)
+
+  returns True if line should be ignored
+  returns False if line should NOT be ignored
+  """
+  for s in PL_STDERR_IGNORE_LIST:
+    if s.search(line):
+      return True
+  return False
+
+
 def config_stdout_handler(line, conf_info):
   """
   Processes each line of output from GOAT
 def config_stdout_handler(line, conf_info):
   """
   Processes each line of output from GOAT
@@ -47,24 +98,52 @@ def config_stdout_handler(line, conf_info):
   returns True if found condition that signifies success.
   """
 
   returns True if found condition that signifies success.
   """
 
-  # Irrelevant line
+  # Skip irrelevant line (without logging)
   if s_skip.search(line):
     pass
   if s_skip.search(line):
     pass
+
+  # Detect invalid command-line arguments
   elif s_invalid_cmdline.search(line):
     logging.error("Invalid commandline options!")
   elif s_invalid_cmdline.search(line):
     logging.error("Invalid commandline options!")
+
+  # Detect starting of configuration
   elif s_start.search(line):
     logging.info('START: Configuring pipeline')
   elif s_start.search(line):
     logging.info('START: Configuring pipeline')
+
+  # Detect it made it past invalid arguments
   elif s_gerald.search(line):
     logging.info('Running make now')
   elif s_gerald.search(line):
     logging.info('Running make now')
+
+  # Detect that make files have been generated (based on output)
   elif s_generating.search(line):
     logging.info('Make files generted')
     return True
   elif s_generating.search(line):
     logging.info('Make files generted')
     return True
+
+  # Capture run directory
   elif s_seq_folder.search(line):
   elif s_seq_folder.search(line):
-    mo = s_seq_folder.search(line)
-    conf_info.bustard_path = line[mo.end():]
-    conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
+    mo = s_seq_folder_sub.search(line)
+    #Output changed when using --tiles=<tiles>
+    # at least in pipeline v0.3.0b2
+    if mo:
+      firecrest_bustard_gerald_makefile = line[mo.end():]
+      firecrest_bustard_gerald, junk = \
+                                os.path.split(firecrest_bustard_gerald_makefile)
+      firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald)
+      firecrest, junk = os.path.split(firecrest_bustard)
+
+      conf_info.bustard_path = firecrest_bustard
+      conf_info.run_path = firecrest
+    
+    #Standard output handling
+    else:
+      print 'Sequence line:', line
+      mo = s_seq_folder.search(line)
+      conf_info.bustard_path = line[mo.end():]
+      conf_info.run_path, temp = os.path.split(conf_info.bustard_path)
+
+  # Log all other output for debugging purposes
   else:
   else:
-    logging.warning('How to handle: %s' % (line))
+    logging.warning('CONF:?: %s' % (line))
 
   return False
 
 
   return False
 
@@ -78,28 +157,39 @@ def config_stderr_handler(line, conf_info):
   Loads useful information into conf_info as well, for future
   use outside the function.
 
   Loads useful information into conf_info as well, for future
   use outside the function.
 
-  returns RUN_ABORT upon detecting failure; True on success message
+  returns RUN_ABORT upon detecting failure;
+          True on success message;
+          False if neutral message
+            (i.e. doesn't signify failure or success)
   """
 
   """
 
+  # Detect invalid species directory error
   if s_species_dir_err.search(line):
     logging.error(line)
     return RUN_ABORT
   if s_species_dir_err.search(line):
     logging.error(line)
     return RUN_ABORT
+  # Detect goat_pipeline.py traceback
   elif s_goat_traceb.search(line):
     logging.error("Goat config script died, traceback in debug output")
     return RUN_ABORT
   elif s_goat_traceb.search(line):
     logging.error("Goat config script died, traceback in debug output")
     return RUN_ABORT
+  # Detect indication of successful configuration (from stderr; odd, but ok)
   elif s_stderr_taskcomplete.search(line):
     logging.info('Configure step successful (from: stderr)')
     return True
   elif s_stderr_taskcomplete.search(line):
     logging.info('Configure step successful (from: stderr)')
     return True
+  # Log all other output as debug output
   else:
   else:
-    logging.debug('STDERR: How to handle: %s' % (line))
+    logging.debug('CONF:STDERR:?: %s' % (line))
 
 
+  # Neutral (not failure; nor success)
   return False
 
   return False
 
+
 #FIXME: Temperary hack
 #FIXME: Temperary hack
-f = open('pipeline_run.log', 'w')
-ferr = open('pipeline_err.log', 'w')
+f = open('pipeline_run.log.1', 'w')
+#ferr = open('pipeline_err.log.1', 'w')
+
 
 
-def pipeline_handler(line, conf_info):
+
+def pipeline_stdout_handler(line, conf_info):
   """
   Processes each line of output from running the pipeline
   and stores useful information using the logging module
   """
   Processes each line of output from running the pipeline
   and stores useful information using the logging module
@@ -115,6 +205,28 @@ def pipeline_handler(line, conf_info):
   return True
 
 
   return True
 
 
+
+def pipeline_stderr_handler(line, conf_info):
+  """
+  """
+
+  if pl_stderr_ignore(line):
+    pass
+  elif s_make_error.search(line):
+    logging.error("make error detected; run failed")
+    return RUN_FAILED
+  elif s_no_gnuplot.search(line):
+    logging.error("gnuplot not found")
+    return RUN_FAILED
+  elif s_no_convert.search(line):
+    logging.error("imagemagick's convert command not found")
+    return RUN_FAILED
+  else:
+    logging.debug('PIPE:STDERR:?: %s' % (line))
+
+  return False
+
+
 def configure(conf_info):
   """
   Attempts to configure the GA pipeline using goat.
 def configure(conf_info):
   """
   Attempts to configure the GA pipeline using goat.
@@ -140,25 +252,27 @@ def configure(conf_info):
   #                        stdout=subprocess.PIPE,
   #                        stderr=subprocess.PIPE)
 
   #                        stdout=subprocess.PIPE,
   #                        stderr=subprocess.PIPE)
 
-  #Not a test; actual run attempt.
-  pipe = subprocess.Popen(['goat_pipeline.py',
-                    '--GERALD=%s' % (conf_info.config_filepath),
-                           '--make',
-                           '.'],
-                          stdout=subprocess.PIPE,
-                          stderr=subprocess.PIPE)
-
-  # CONTINUE HERE
-  #FIXME: this only does a run on 5 tiles on lane 4
+  ##########################
+  # Run configuration step
+  #   Not a test; actual configure attempt.
   #pipe = subprocess.Popen(['goat_pipeline.py',
   #                  '--GERALD=%s' % (conf_info.config_filepath),
   #pipe = subprocess.Popen(['goat_pipeline.py',
   #                  '--GERALD=%s' % (conf_info.config_filepath),
-  #                         '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_103,s_4_104',
   #                         '--make',
   #                         '.'],
   #                        stdout=subprocess.PIPE,
   #                        stderr=subprocess.PIPE)
 
   #                         '--make',
   #                         '.'],
   #                        stdout=subprocess.PIPE,
   #                        stderr=subprocess.PIPE)
 
-  #Process stdout
+  # CONTINUE HERE
+  #FIXME: this only does a run on 5 tiles on lane 4
+  pipe = subprocess.Popen(['goat_pipeline.py',
+                    '--GERALD=%s' % (conf_info.config_filepath),
+                           '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
+                           '--make',
+                           '.'],
+                          stdout=subprocess.PIPE,
+                          stderr=subprocess.PIPE)
+  ##################
+  # Process stdout
   stdout_line = pipe.stdout.readline()
 
   complete = False
   stdout_line = pipe.stdout.readline()
 
   complete = False
@@ -173,7 +287,7 @@ def configure(conf_info):
   if error_code:
     logging.error('Recieved error_code: %s' % (error_code))
   else:
   if error_code:
     logging.error('Recieved error_code: %s' % (error_code))
   else:
-    logging.info ('We are go for launch!')
+    logging.info('We are go for launch!')
 
   #Process stderr
   stderr_line = pipe.stderr.readline()
 
   #Process stderr
   stderr_line = pipe.stderr.readline()
@@ -211,7 +325,9 @@ def configure(conf_info):
 
 
 def run_pipeline(conf_info):
 
 
 def run_pipeline(conf_info):
-
+  """
+  Run the pipeline and monitor status.
+  """
   # Fail if the run_path doesn't actually exist
   if not os.path.exists(conf_info.run_path):
     logging.error('Run path does not exist: %s' \
   # Fail if the run_path doesn't actually exist
   if not os.path.exists(conf_info.run_path):
     logging.error('Run path does not exist: %s' \
@@ -221,6 +337,9 @@ def run_pipeline(conf_info):
   # Change cwd to run_path
   os.chdir(conf_info.run_path)
 
   # Change cwd to run_path
   os.chdir(conf_info.run_path)
 
+  # Log pipeline starting
+  logging.info('STARTING PIPELINE @ %s' % (time.ctime()))
+  
   # Start the pipeline (and hide!)
   pipe = subprocess.Popen(['make',
                            '-j8',
   # Start the pipeline (and hide!)
   pipe = subprocess.Popen(['make',
                            '-j8',
@@ -232,14 +351,41 @@ def run_pipeline(conf_info):
 
   complete = False
   while line != '':
 
   complete = False
   while line != '':
-    if pipeline_handler(line, conf_info):
+    if pipeline_stdout_handler(line, conf_info):
       complete = True
     line = pipe.stdout.readline()
 
   error_code = pipe.wait()
 
       complete = True
     line = pipe.stdout.readline()
 
   error_code = pipe.wait()
 
-  ferr.write(pipe.stderr.read())
-  ferr.close()
+  #ferr.write(pipe.stderr.read())
+  #ferr.close()
+
+  stderr_line = pipe.stderr.readline()
+
+  run_succeded = False
+  run_failed = False
+  while stderr_line != '':
+    stderr_status = pipeline_stderr_handler(stderr_line, conf_info)
+    if stderr_status is True:
+      run_succeded = True
+    if stderr_status == RUN_FAILED:
+      run_failed = True
+    stderr_line = pipe.stderr.readline()
+
+  ###DEBUG###
+  print 'RUN STATUS: expect: True, True, True, True'
+  print '            Status:',
+  print complete is True, error_code == 0,
+  print run_succeded is True, run_failed is False
+  ###END_DEBUG###
+  
+  status = complete is True and \
+           error_code == 0 and \
+           run_succeded is True and \
+           run_failed is False
+
+  return status
+
 
 
 if __name__ == '__main__':
 
 
 if __name__ == '__main__':