From 76f18f2817abb613b9056488568db6f3746f75e0 Mon Sep 17 00:00:00 2001 From: Brandon King Date: Thu, 15 Nov 2007 21:16:47 +0000 Subject: [PATCH] [project @ Work toward pipeline monitor code (WARNING: testing version)] * Warning, this code is a stepping stone to monitoring a running pipeline. * It only runs with tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104 * There are also important FIXMEs that should be looked at before using this version of the code. * This patch is needed for all future patches to work. --- bin/config_pipeline.py | 202 +++++++++++++++++++++++++++++++++++------ 1 file changed, 174 insertions(+), 28 deletions(-) diff --git a/bin/config_pipeline.py b/bin/config_pipeline.py index 1d49a01..22d0280 100644 --- a/bin/config_pipeline.py +++ b/bin/config_pipeline.py @@ -1,8 +1,10 @@ #!/usr/bin/python import subprocess +import logging +import time import re import os -import logging + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-8s %(message)s', @@ -19,12 +21,17 @@ class ConfigInfo: #FLAGS RUN_ABORT = 'abort' +RUN_FAILED = 'failed' + +##################################### +# Configure Step (goat_pipeline.py) #Info s_start = re.compile('Starting Genome Analyzer Pipeline') s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+") s_generating = re.compile('Generating journals, Makefiles and parameter files') s_seq_folder = re.compile('^Sequence folder: ') +s_seq_folder_sub = re.compile('want to make ') s_stderr_taskcomplete = re.compile('^Task complete, exiting') #Errors @@ -36,6 +43,50 @@ s_goat_traceb = re.compile("^Traceback \(most recent call last\):") #Ignore s_skip = re.compile('s_[0-8]_[0-9]+') +########################################## +# Pipeline Run Step (make -j8 recursive) + +##Info + + +##Errors +s_make_error = re.compile('^make[\S\s]+Error') +s_no_gnuplot = re.compile('gnuplot: command not found') +s_no_convert = re.compile('^Can\'t exec "convert"') + +##Ignore +PL_STDERR_IGNORE_LIST = [] +# Info: PF 11802 
+PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') ) +# About to analyse intensity file s_4_0101_sig2.txt +PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') ) +# Will send output to standard output +PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') ) +# Found 31877 clusters +PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') ) +# Will use quality criterion ((CHASTITY>=0.6) +PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') ) +# Quality criterion translated to (($F[5]>=0.6)) +PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') ) +# opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt +# AND +# opened s_4_0103_qhg.txt +PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') ) + + +def pl_stderr_ignore(line): + """ + Searches lines for lines to ignore (i.e. not to log) + + returns True if line should be ignored + returns False if line should NOT be ignored + """ + for s in PL_STDERR_IGNORE_LIST: + if s.search(line): + return True + return False + + def config_stdout_handler(line, conf_info): """ Processes each line of output from GOAT @@ -47,24 +98,52 @@ def config_stdout_handler(line, conf_info): returns True if found condition that signifies success. 
""" - # Irrelevant line + # Skip irrelevant line (without logging) if s_skip.search(line): pass + + # Detect invalid command-line arguments elif s_invalid_cmdline.search(line): logging.error("Invalid commandline options!") + + # Detect starting of configuration elif s_start.search(line): logging.info('START: Configuring pipeline') + + # Detect it made it past invalid arguments elif s_gerald.search(line): logging.info('Running make now') + + # Detect that make files have been generated (based on output) elif s_generating.search(line): logging.info('Make files generted') return True + + # Capture run directory elif s_seq_folder.search(line): - mo = s_seq_folder.search(line) - conf_info.bustard_path = line[mo.end():] - conf_info.run_path, temp = os.path.split(conf_info.bustard_path) + mo = s_seq_folder_sub.search(line) + #Output changed when using --tiles= + # at least in pipeline v0.3.0b2 + if mo: + firecrest_bustard_gerald_makefile = line[mo.end():] + firecrest_bustard_gerald, junk = \ + os.path.split(firecrest_bustard_gerald_makefile) + firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald) + firecrest, junk = os.path.split(firecrest_bustard) + + conf_info.bustard_path = firecrest_bustard + conf_info.run_path = firecrest + + #Standard output handling + else: + print 'Sequence line:', line + mo = s_seq_folder.search(line) + conf_info.bustard_path = line[mo.end():] + conf_info.run_path, temp = os.path.split(conf_info.bustard_path) + + # Log all other output for debugging purposes else: - logging.warning('How to handle: %s' % (line)) + logging.warning('CONF:?: %s' % (line)) return False @@ -78,28 +157,39 @@ def config_stderr_handler(line, conf_info): Loads useful information into conf_info as well, for future use outside the function. - returns RUN_ABORT upon detecting failure; True on success message + returns RUN_ABORT upon detecting failure; + True on success message; + False if neutral message + (i.e. 
doesn't signify failure or success) """ + # Detect invalid species directory error if s_species_dir_err.search(line): logging.error(line) return RUN_ABORT + # Detect goat_pipeline.py traceback elif s_goat_traceb.search(line): logging.error("Goat config script died, traceback in debug output") return RUN_ABORT + # Detect indication of successful configuration (from stderr; odd, but ok) elif s_stderr_taskcomplete.search(line): logging.info('Configure step successful (from: stderr)') return True + # Log all other output as debug output else: - logging.debug('STDERR: How to handle: %s' % (line)) + logging.debug('CONF:STDERR:?: %s' % (line)) + # Neutral (not failure; nor success) return False + #FIXME: Temperary hack -f = open('pipeline_run.log', 'w') -ferr = open('pipeline_err.log', 'w') +f = open('pipeline_run.log.1', 'w') +#ferr = open('pipeline_err.log.1', 'w') + -def pipeline_handler(line, conf_info): + +def pipeline_stdout_handler(line, conf_info): """ Processes each line of output from running the pipeline and stores useful information using the logging module @@ -115,6 +205,28 @@ def pipeline_handler(line, conf_info): return True + +def pipeline_stderr_handler(line, conf_info): + """ + """ + + if pl_stderr_ignore(line): + pass + elif s_make_error.search(line): + logging.error("make error detected; run failed") + return RUN_FAILED + elif s_no_gnuplot.search(line): + logging.error("gnuplot not found") + return RUN_FAILED + elif s_no_convert.search(line): + logging.error("imagemagick's convert command not found") + return RUN_FAILED + else: + logging.debug('PIPE:STDERR:?: %s' % (line)) + + return False + + def configure(conf_info): """ Attempts to configure the GA pipeline using goat. @@ -140,25 +252,27 @@ def configure(conf_info): # stdout=subprocess.PIPE, # stderr=subprocess.PIPE) - #Not a test; actual run attempt. 
- pipe = subprocess.Popen(['goat_pipeline.py', - '--GERALD=%s' % (conf_info.config_filepath), - '--make', - '.'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - # CONTINUE HERE - #FIXME: this only does a run on 5 tiles on lane 4 + ########################## + # Run configuration step + # Not a test; actual configure attempt. #pipe = subprocess.Popen(['goat_pipeline.py', # '--GERALD=%s' % (conf_info.config_filepath), - # '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_103,s_4_104', # '--make', # '.'], # stdout=subprocess.PIPE, # stderr=subprocess.PIPE) - #Process stdout + # CONTINUE HERE + #FIXME: this only does a run on 5 tiles on lane 4 + pipe = subprocess.Popen(['goat_pipeline.py', + '--GERALD=%s' % (conf_info.config_filepath), + '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104', + '--make', + '.'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + ################## + # Process stdout stdout_line = pipe.stdout.readline() complete = False @@ -173,7 +287,7 @@ def configure(conf_info): if error_code: logging.error('Recieved error_code: %s' % (error_code)) else: - logging.info ('We are go for launch!') + logging.info('We are go for launch!') #Process stderr stderr_line = pipe.stderr.readline() @@ -211,7 +325,9 @@ def configure(conf_info): def run_pipeline(conf_info): - + """ + Run the pipeline and monitor status. + """ # Fail if the run_path doesn't actually exist if not os.path.exists(conf_info.run_path): logging.error('Run path does not exist: %s' \ @@ -221,6 +337,9 @@ def run_pipeline(conf_info): # Change cwd to run_path os.chdir(conf_info.run_path) + # Log pipeline starting + logging.info('STARTING PIPELINE @ %s' % (time.ctime())) + # Start the pipeline (and hide!) 
pipe = subprocess.Popen(['make', '-j8', @@ -232,14 +351,41 @@ def run_pipeline(conf_info): complete = False while line != '': - if pipeline_handler(line, conf_info): + if pipeline_stdout_handler(line, conf_info): complete = True line = pipe.stdout.readline() error_code = pipe.wait() - ferr.write(pipe.stderr.read()) - ferr.close() + #ferr.write(pipe.stderr.read()) + #ferr.close() + + stderr_line = pipe.stderr.readline() + + run_succeded = False + run_failed = False + while stderr_line != '': + stderr_status = pipeline_stderr_handler(stderr_line, conf_info) + if stderr_status is True: + run_succeded = True + if stderr_status == RUN_FAILED: + run_failed = True + stderr_line = pipe.stderr.readline() + + ###DEBUG### + print 'RUN STATUS: expect: True, True, True, True' + print ' Status:', + print complete is True, error_code == 0, + print run_succeded is True, run_failed is False + ###END_DEBUG### + + status = complete is True and \ + error_code == 0 and \ + run_succeded is True and \ + run_failed is False + + return status + if __name__ == '__main__': -- 2.30.2