From: Diane Trout Date: Tue, 20 Nov 2007 09:50:58 +0000 (+0000) Subject: [project @ move brandon's pipeline handling code into gaworkflow.pipeline] X-Git-Tag: 0.1.0~56 X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=c6ad3af76120d3aa6c17c910a1700ecbcd262d1d [project @ move brandon's pipeline handling code into gaworkflow.pipeline] the code that was in the if __name__ == "__main__" got moved into similarly named scripts in the scripts directory. Those import everything from their corresponding gaworkflow.pipeline module. I still wish the names were shorter, and yet still descriptive. Other refactoring ideas, break configure_run up, make a single module to hold all the exceptions from all the various parts of the pipeline. And: I (still) find our lack of tests disturbing. --- diff --git a/bin/config_pipeline2.py b/bin/config_pipeline2.py deleted file mode 100644 index 2739cce..0000000 --- a/bin/config_pipeline2.py +++ /dev/null @@ -1,551 +0,0 @@ -#!/usr/bin/python -import subprocess -import logging -import time -import re -import os - -from retrieve_eland_config import getCombinedOptions, saveConfigFile -from retrieve_eland_config import FlowCellNotFound, WebError404 -from genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict - -from pyinotify import WatchManager, ThreadedNotifier -from pyinotify import EventsCodes, ProcessEvent - -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s %(levelname)-8s %(message)s', - datefmt='%a, %d %b %Y %H:%M:%S', - filename='pipeline_main.log', - filemode='w') - -class ConfigInfo: - - def __init__(self): - self.run_path = None - self.bustard_path = None - self.config_filepath = None - - -#################################### -# inotify event processor - -s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt') -s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt') -s_gerald_finished = 
re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt') - -class RunEvent(ProcessEvent): - - def __init__(self): - - self.run_status_dict = {'firecrest': False, - 'bustard': False, - 'gerald': False} - - ProcessEvent.__init__(self) - - def process_IN_CREATE(self, event): - fullpath = os.path.join(event.path, event.name) - if s_finished.search(fullpath): - logging.info("File Found: %s" % (fullpath)) - - if s_firecrest_finished.search(fullpath): - self.run_status_dict['firecrest'] = True - elif s_bustard_finished.search(fullpath): - self.run_status_dict['bustard'] = True - elif s_gerald_finished.search(fullpath): - self.run_status_dict['gerald'] = True - - print "Create: %s" % (os.path.join(event.path, event.name)) - - def process_IN_DELETE(self, event): - print "Remove %s" % (os.path.join(event.path, event.name)) - -#FLAGS -# Config Step Error -RUN_ABORT = 'abort' -# Run Step Error -RUN_FAILED = 'failed' - - -##################################### -# Configure Step (goat_pipeline.py) -#Info -s_start = re.compile('Starting Genome Analyzer Pipeline') -s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+") -s_generating = re.compile('Generating journals, Makefiles and parameter files') -s_seq_folder = re.compile('^Sequence folder: ') -s_seq_folder_sub = re.compile('want to make ') -s_stderr_taskcomplete = re.compile('^Task complete, exiting') - -#Errors -s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py') -s_species_dir_err = re.compile('Error: Lane [1-8]:') -s_goat_traceb = re.compile("^Traceback \(most recent call last\):") - - -##Ignore - Example of out above each ignore regex. -#NOTE: Commenting out an ignore will cause it to be -# logged as DEBUG with the logging module. 
-#CF_STDERR_IGNORE_LIST = [] -s_skip = re.compile('s_[0-8]_[0-9]+') - - -########################################## -# Pipeline Run Step (make -j8 recursive) - -##Info -s_finished = re.compile('finished') - -##Errors -s_make_error = re.compile('^make[\S\s]+Error') -s_no_gnuplot = re.compile('gnuplot: command not found') -s_no_convert = re.compile('^Can\'t exec "convert"') -s_no_ghostscript = re.compile('gs: command not found') - -##Ignore - Example of out above each ignore regex. -#NOTE: Commenting out an ignore will cause it to be -# logged as DEBUG with the logging module. -# -PL_STDERR_IGNORE_LIST = [] -# Info: PF 11802 -PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') ) -# About to analyse intensity file s_4_0101_sig2.txt -PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') ) -# Will send output to standard output -PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') ) -# Found 31877 clusters -PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') ) -# Will use quality criterion ((CHASTITY>=0.6) -PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') ) -# Quality criterion translated to (($F[5]>=0.6)) -PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') ) -# opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt -# AND -# opened s_4_0103_qhg.txt -PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') ) -# 81129 sequences out of 157651 passed filter criteria -PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') ) - - -def pl_stderr_ignore(line): - """ - Searches lines for lines to ignore (i.e. 
not to log) - - returns True if line should be ignored - returns False if line should NOT be ignored - """ - for s in PL_STDERR_IGNORE_LIST: - if s.search(line): - return True - return False - - -def config_stdout_handler(line, conf_info): - """ - Processes each line of output from GOAT - and stores useful information using the logging module - - Loads useful information into conf_info as well, for future - use outside the function. - - returns True if found condition that signifies success. - """ - - # Skip irrelevant line (without logging) - if s_skip.search(line): - pass - - # Detect invalid command-line arguments - elif s_invalid_cmdline.search(line): - logging.error("Invalid commandline options!") - - # Detect starting of configuration - elif s_start.search(line): - logging.info('START: Configuring pipeline') - - # Detect it made it past invalid arguments - elif s_gerald.search(line): - logging.info('Running make now') - - # Detect that make files have been generated (based on output) - elif s_generating.search(line): - logging.info('Make files generted') - return True - - # Capture run directory - elif s_seq_folder.search(line): - mo = s_seq_folder_sub.search(line) - #Output changed when using --tiles= - # at least in pipeline v0.3.0b2 - if mo: - firecrest_bustard_gerald_makefile = line[mo.end():] - firecrest_bustard_gerald, junk = \ - os.path.split(firecrest_bustard_gerald_makefile) - firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald) - firecrest, junk = os.path.split(firecrest_bustard) - - conf_info.bustard_path = firecrest_bustard - conf_info.run_path = firecrest - - #Standard output handling - else: - print 'Sequence line:', line - mo = s_seq_folder.search(line) - conf_info.bustard_path = line[mo.end():] - conf_info.run_path, temp = os.path.split(conf_info.bustard_path) - - # Log all other output for debugging purposes - else: - logging.warning('CONF:?: %s' % (line)) - - return False - - - -def config_stderr_handler(line, conf_info): - """ 
- Processes each line of output from GOAT - and stores useful information using the logging module - - Loads useful information into conf_info as well, for future - use outside the function. - - returns RUN_ABORT upon detecting failure; - True on success message; - False if neutral message - (i.e. doesn't signify failure or success) - """ - - # Detect invalid species directory error - if s_species_dir_err.search(line): - logging.error(line) - return RUN_ABORT - # Detect goat_pipeline.py traceback - elif s_goat_traceb.search(line): - logging.error("Goat config script died, traceback in debug output") - return RUN_ABORT - # Detect indication of successful configuration (from stderr; odd, but ok) - elif s_stderr_taskcomplete.search(line): - logging.info('Configure step successful (from: stderr)') - return True - # Log all other output as debug output - else: - logging.debug('CONF:STDERR:?: %s' % (line)) - - # Neutral (not failure; nor success) - return False - - -#def pipeline_stdout_handler(line, conf_info): -# """ -# Processes each line of output from running the pipeline -# and stores useful information using the logging module -# -# Loads useful information into conf_info as well, for future -# use outside the function. -# -# returns True if found condition that signifies success. -# """ -# -# #f.write(line + '\n') -# -# return True - - - -def pipeline_stderr_handler(line, conf_info): - """ - Processes each line of stderr from pipelien run - and stores useful information using the logging module - - ##FIXME: Future feature (doesn't actually do this yet) - #Loads useful information into conf_info as well, for future - #use outside the function. - - returns RUN_FAILED upon detecting failure; - #True on success message; (no clear success state) - False if neutral message - (i.e. 
doesn't signify failure or success) - """ - - if pl_stderr_ignore(line): - pass - elif s_make_error.search(line): - logging.error("make error detected; run failed") - return RUN_FAILED - elif s_no_gnuplot.search(line): - logging.error("gnuplot not found") - return RUN_FAILED - elif s_no_convert.search(line): - logging.error("imagemagick's convert command not found") - return RUN_FAILED - elif s_no_ghostscript.search(line): - logging.error("ghostscript not found") - return RUN_FAILED - else: - logging.debug('PIPE:STDERR:?: %s' % (line)) - - return False - - -def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir): - """ - Gets the config file from server... - requires config file in: - /etc/ga_frontend/ga_frontend.conf - or - ~/.ga_frontend.conf - - with: - [config_file_server] - base_host_url: http://host:port - - return True if successful, False is failure - """ - options = getCombinedOptions() - - if options.url is None: - logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf" \ - " missing base_host_url option") - return False - - try: - saveConfigFile(flowcell, options.url, cfg_filepath) - conf_info.config_filepath = cfg_filepath - except FlowCellNotFound, e: - logging.error(e) - return False - except WebError404, e: - logging.error(e) - return False - except IOError, e: - logging.error(e) - return False - except Exception, e: - logging.error(e) - return False - - f = open(cfg_filepath, 'r') - data = f.read() - f.close() - - genome_dict = getAvailableGenomes(genome_dir) - mapper_dict = constructMapperDict(genome_dict) - - f = open(cfg_filepath, 'w') - f.write(data % (mapper_dict)) - f.close() - - return True - - - -def configure(conf_info): - """ - Attempts to configure the GA pipeline using goat. - - Uses logging module to store information about status. - - returns True if configuration successful, otherwise False. 
- """ - #ERROR Test: - #pipe = subprocess.Popen(['goat_pipeline.py', - # '--GERALD=config32bk.txt', - # '--make .',], - # #'.'], - # stdout=subprocess.PIPE, - # stderr=subprocess.PIPE) - - #ERROR Test (2), causes goat_pipeline.py traceback - #pipe = subprocess.Popen(['goat_pipeline.py', - # '--GERALD=%s' % (conf_info.config_filepath), - # '--tiles=s_4_100,s_4_101,s_4_102,s_4_103,s_4_104', - # '--make', - # '.'], - # stdout=subprocess.PIPE, - # stderr=subprocess.PIPE) - - ########################## - # Run configuration step - # Not a test; actual configure attempt. - #pipe = subprocess.Popen(['goat_pipeline.py', - # '--GERALD=%s' % (conf_info.config_filepath), - # '--make', - # '.'], - # stdout=subprocess.PIPE, - # stderr=subprocess.PIPE) - - # CONTINUE HERE - #FIXME: this only does a run on 5 tiles on lane 4 - pipe = subprocess.Popen(['goat_pipeline.py', - '--GERALD=%s' % (conf_info.config_filepath), - '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104', - '--make', - '.'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - ################## - # Process stdout - stdout_line = pipe.stdout.readline() - - complete = False - while stdout_line != '': - # Handle stdout - if config_stdout_handler(stdout_line, conf_info): - complete = True - stdout_line = pipe.stdout.readline() - - - error_code = pipe.wait() - if error_code: - logging.error('Recieved error_code: %s' % (error_code)) - else: - logging.info('We are go for launch!') - - #Process stderr - stderr_line = pipe.stderr.readline() - - abort = 'NO!' 
- stderr_success = False - while stderr_line != '': - stderr_status = config_stderr_handler(stderr_line, conf_info) - if stderr_status == RUN_ABORT: - abort = RUN_ABORT - elif stderr_status is True: - stderr_success = True - stderr_line = pipe.stderr.readline() - - - #Success requirements: - # 1) The stdout completed without error - # 2) The program exited with status 0 - # 3) No errors found in stdout - print '#Expect: True, False, True, True' - print complete, bool(error_code), abort != RUN_ABORT, stderr_success is True - status = complete is True and \ - bool(error_code) is False and \ - abort != RUN_ABORT and \ - stderr_success is True - - # If everything was successful, but for some reason - # we didn't retrieve the path info, log it. - if status is True: - if conf_info.bustard_path is None or conf_info.run_path is None: - logging.error("Failed to retrieve run_path") - return False - - return status - - -def run_pipeline(conf_info): - """ - Run the pipeline and monitor status. - """ - # Fail if the run_path doesn't actually exist - if not os.path.exists(conf_info.run_path): - logging.error('Run path does not exist: %s' \ - % (conf_info.run_path)) - return False - - # Change cwd to run_path - os.chdir(conf_info.run_path) - stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt') - stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt') - - # Monitor file creation - wm = WatchManager() - mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE - event = RunEvent() - notifier = ThreadedNotifier(wm, event) - notifier.start() - wdd = wm.add_watch(conf_info.run_path, mask, rec=True) - - # Log pipeline starting - logging.info('STARTING PIPELINE @ %s' % (time.ctime())) - - # Start the pipeline (and hide!) 
- #pipe = subprocess.Popen(['make', - # '-j8', - # 'recursive'], - # stdout=subprocess.PIPE, - # stderr=subprocess.PIPE) - - fout = open(stdout_filepath, 'w') - ferr = open(stderr_filepath, 'w') - - pipe = subprocess.Popen(['make', - '-j8', - 'recursive'], - stdout=fout, - stderr=ferr) - #shell=True) - # Wait for run to finish - retcode = pipe.wait() - - - # Clean up - notifier.stop() - fout.close() - ferr.close() - - # Process stderr - ferr = open(stderr_filepath, 'r') - - run_failed_stderr = False - for line in ferr: - err_status = pipeline_stderr_handler(line, conf_info) - if err_status == RUN_FAILED: - run_failed_stderr = True - - ferr.close() - - # Finished file check! - print 'RUN SUCCESS CHECK:' - for key, value in event.run_status_dict.items(): - print ' %s: %s' % (key, value) - - dstatus = event.run_status_dict - - # Success or failure check - status = (retcode == 0) and \ - run_failed_stderr is False and \ - dstatus['firecrest'] is True and \ - dstatus['bustard'] is True and \ - dstatus['gerald'] is True - - return status - - - -if __name__ == '__main__': - ci = ConfigInfo() - - flowcell = 'FC12150' - cfg_filepath = 'config32auto.txt' - genome_dir = '/home/king/trog_drive/' - - status_retrieve_cfg = retrieve_config(ci, flowcell, cfg_filepath, genome_dir) - if status_retrieve_cfg: - print "Retrieve config file successful" - else: - print "Failed to retrieve config file" - #ci.config_filepath = 'config32bk.txt' - - if status_retrieve_cfg: - status = configure(ci) - if status: - print "Configure success" - else: - print "Configure failed" - - print 'Run Dir:', ci.run_path - print 'Bustard Dir:', ci.bustard_path - - if status: - print 'Running pipeline now!' - run_status = run_pipeline(ci) - if run_status is True: - print 'Pipeline ran successfully.' - else: - print 'Pipeline run failed.' 
- diff --git a/bin/genome_mapper.py b/bin/genome_mapper.py deleted file mode 100644 index aacc068..0000000 --- a/bin/genome_mapper.py +++ /dev/null @@ -1,94 +0,0 @@ -#!/usr/bin/python -import glob -import sys -import os - -import logging - -class DuplicateGenome(Exception): pass - - -def _has_metainfo(genome_dir): - metapath = os.path.join(genome_dir, '_metainfo_') - if os.path.isfile(metapath): - return True - else: - return False - -def getAvailableGenomes(genome_base_dir): - """ - raises IOError (on genome_base_dir not found) - raises DuplicateGenome on duplicate genomes found. - - returns a double dictionary (i.e. d[species][build] = path) - """ - - # Need valid directory - if not os.path.exists(genome_base_dir): - msg = "Directory does not exist: %s" % (genome_base_dir) - raise IOError, msg - - # Find all subdirectories - filepath_list = glob.glob(os.path.join(genome_base_dir, '*')) - potential_genome_dirs = \ - [ filepath for filepath in filepath_list if os.path.isdir(filepath)] - - # Get list of metadata files - genome_dir_list = \ - [ dirpath \ - for dirpath in potential_genome_dirs \ - if _has_metainfo(dirpath) ] - - # Genome double dictionary - d = {} - - for genome_dir in genome_dir_list: - line = open(os.path.join(genome_dir, '_metainfo_'), 'r').readline().strip() - - # Get species, build... log and skip on failure - try: - species, build = line.split('|') - except: - logging.warning('Skipping: Invalid metafile (%s) line: %s' \ - % (metafile, line)) - continue - - build_dict = d.setdefault(species, {}) - if build in build_dict: - msg = "Duplicate genome for %s|%s" % (species, build) - raise DuplicateGenome, msg - - build_dict[build] = genome_dir - - return d - - -def constructMapperDict(genome_dict): - """ - Creates a dictionary which can map the genome - in the eland config generator output to a local - genome path - - ie. 
'Homo sapiens|hg18' -> - """ - mapper_dict = {} - for species in genome_dict.keys(): - for build in genome_dict[species]: - mapper_dict[species+'|'+build] = genome_dict[species][build] - - return mapper_dict - - -if __name__ == '__main__': - - if len(sys.argv) != 2: - print 'useage: %s ' % (sys.argv[0]) - sys.exit(1) - - d = getAvailableGenomes(sys.argv[1]) - d2 = constructMapperDict(d) - - for k,v in d2.items(): - print '%s: %s' % (k,v) - - diff --git a/bin/retrieve_eland_config.py b/bin/retrieve_eland_config.py deleted file mode 100644 index cb4fa84..0000000 --- a/bin/retrieve_eland_config.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python - -from optparse import OptionParser, IndentedHelpFormatter -from ConfigParser import SafeConfigParser - -import os -import sys -import urllib - -CONFIG_SYSTEM = '/etc/ga_frontend/ga_frontend.conf' -CONFIG_USER = os.path.expanduser('~/.ga_frontend.conf') - - -class FlowCellNotFound(Exception): pass -class WebError404(Exception): pass - - -class PreformattedDescriptionFormatter(IndentedHelpFormatter): - - #def format_description(self, description): - # - # if description: - # return description + "\n" - # else: - # return "" - - def format_epilog(self, epilog): - """ - It was removing my preformated epilog, so this should override - that behavior! Muhahaha! - """ - if epilog: - return "\n" + epilog + "\n" - else: - return "" - - -def constructOptionParser(): - """ - returns a pre-setup optparser - """ - parser = OptionParser(formatter=PreformattedDescriptionFormatter()) - - parser.set_description('Retrieves eland config file from ga_frontend web frontend.') - - parser.epilog = """ -Config File: - * %s (System wide) - * %s (User specific; overrides system) - * command line overrides all config file options - - Example Config File: - - [server_info] - base_host_url=http://somewhere.domain:port -""" % (CONFIG_SYSTEM, CONFIG_USER) - - #Special formatter for allowing preformatted description. 
- ##parser.format_epilog(PreformattedDescriptionFormatter()) - - parser.add_option("-u", "--url", - action="store", type="string", dest="url") - - parser.add_option("-o", "--output", - action="store", type="string", dest="output_filepath") - - parser.add_option("-f", "--flowcell", - action="store", type="string", dest="flowcell") - - #parser.set_default("url", "default") - - return parser - -def constructConfigParser(): - """ - returns a pre-setup config parser - """ - parser = SafeConfigParser() - parser.read([CONFIG_SYSTEM, CONFIG_USER]) - if not parser.has_section('config_file_server'): - parser.add_section('config_file_server') - - return parser - - -def getCombinedOptions(): - """ - Returns optparse options after it has be updated with ConfigParser - config files and merged with parsed commandline options. - """ - cl_parser = constructOptionParser() - conf_parser = constructConfigParser() - - options, args = cl_parser.parse_args() - - if options.url is None: - if conf_parser.has_option('config_file_server', 'base_host_url'): - options.url = conf_parser.get('config_file_server', 'base_host_url') - - print 'USING OPTIONS:' - print ' URL:', options.url - print ' OUT:', options.output_filepath - print ' FC:', options.flowcell - print '' - - return options - - -def saveConfigFile(flowcell, base_host_url, output_filepath): - """ - retrieves the flowcell eland config file, give the base_host_url - (i.e. 
http://sub.domain.edu:port) - """ - url = base_host_url + '/eland_config/%s/' % (flowcell) - - f = open(output_filepath, 'w') - #try: - web = urllib.urlopen(url) - #except IOError, msg: - # if str(msg).find("Connection refused") >= 0: - # print 'Error: Connection refused for: %s' % (url) - # f.close() - # sys.exit(1) - # elif str(msg).find("Name or service not known") >= 0: - # print 'Error: Invalid domain or ip address for: %s' % (url) - # f.close() - # sys.exit(2) - # else: - # raise IOError, msg - - data = web.read() - - if data.find('Hmm, config file for') >= 0: - msg = "Flowcell (%s) not found in DB; full url(%s)" % (flowcell, url) - raise FlowCellNotFound, msg - - if data.find('404 - Not Found') >= 0: - msg = "404 - Not Found: Flowcell (%s); base_host_url (%s);\n full url(%s)\n " \ - "Did you get right port #?" % (flowcell, base_host_url, url) - raise FlowCellNotFound, msg - - f.write(data) - web.close() - f.close() - print 'Wrote config file to %s' % (output_filepath) - - -if __name__ == '__main__': - #Display help if no args are presented - if len(sys.argv) == 1: - sys.argv.append('-h') - - options = getCombinedOptions() - msg_list = ['ERROR MESSAGES:'] - if options.output_filepath is None: - msg_list.append(" Output filepath argument required. -o or --output=") - - if options.flowcell is None: - msg_list.append(" Flow cell argument required. 
-f or --flowcell=") - - if options.url is None: - msg_list.append(" URL argument required (-u or --url=), or entry\n" \ - " in /etc/elandifier/elandifer.conf or ~/.elandifier.conf") - - if len(msg_list) > 1: - print '\n'.join(msg_list) - sys.exit(0) - - saveConfigFile(options.flowcell, options.url, options.output_filepath) - diff --git a/gaworkflow/pipeline/configure_run.py b/gaworkflow/pipeline/configure_run.py new file mode 100644 index 0000000..f71bfc5 --- /dev/null +++ b/gaworkflow/pipeline/configure_run.py @@ -0,0 +1,518 @@ +#!/usr/bin/python +import subprocess +import logging +import time +import re +import os + +from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile +from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404 +from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict + +from pyinotify import WatchManager, ThreadedNotifier +from pyinotify import EventsCodes, ProcessEvent + +logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s %(levelname)-8s %(message)s', + datefmt='%a, %d %b %Y %H:%M:%S', + filename='pipeline_main.log', + filemode='w') + +class ConfigInfo: + + def __init__(self): + self.run_path = None + self.bustard_path = None + self.config_filepath = None + + +#################################### +# inotify event processor + +s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt') +s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt') +s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt') + +class RunEvent(ProcessEvent): + + def __init__(self): + + self.run_status_dict = {'firecrest': False, + 'bustard': False, + 'gerald': False} + + ProcessEvent.__init__(self) + + def process_IN_CREATE(self, event): + fullpath = os.path.join(event.path, event.name) + if s_finished.search(fullpath): + logging.info("File Found: %s" % (fullpath)) + + if s_firecrest_finished.search(fullpath): + 
self.run_status_dict['firecrest'] = True + elif s_bustard_finished.search(fullpath): + self.run_status_dict['bustard'] = True + elif s_gerald_finished.search(fullpath): + self.run_status_dict['gerald'] = True + + print "Create: %s" % (os.path.join(event.path, event.name)) + + def process_IN_DELETE(self, event): + print "Remove %s" % (os.path.join(event.path, event.name)) + +#FLAGS +# Config Step Error +RUN_ABORT = 'abort' +# Run Step Error +RUN_FAILED = 'failed' + + +##################################### +# Configure Step (goat_pipeline.py) +#Info +s_start = re.compile('Starting Genome Analyzer Pipeline') +s_gerald = re.compile("[\S\s]+--GERALD[\S\s]+--make[\S\s]+") +s_generating = re.compile('Generating journals, Makefiles and parameter files') +s_seq_folder = re.compile('^Sequence folder: ') +s_seq_folder_sub = re.compile('want to make ') +s_stderr_taskcomplete = re.compile('^Task complete, exiting') + +#Errors +s_invalid_cmdline = re.compile('Usage:[\S\s]*goat_pipeline.py') +s_species_dir_err = re.compile('Error: Lane [1-8]:') +s_goat_traceb = re.compile("^Traceback \(most recent call last\):") + + +##Ignore - Example of out above each ignore regex. +#NOTE: Commenting out an ignore will cause it to be +# logged as DEBUG with the logging module. +#CF_STDERR_IGNORE_LIST = [] +s_skip = re.compile('s_[0-8]_[0-9]+') + + +########################################## +# Pipeline Run Step (make -j8 recursive) + +##Info +s_finished = re.compile('finished') + +##Errors +s_make_error = re.compile('^make[\S\s]+Error') +s_no_gnuplot = re.compile('gnuplot: command not found') +s_no_convert = re.compile('^Can\'t exec "convert"') +s_no_ghostscript = re.compile('gs: command not found') + +##Ignore - Example of out above each ignore regex. +#NOTE: Commenting out an ignore will cause it to be +# logged as DEBUG with the logging module. 
+# +PL_STDERR_IGNORE_LIST = [] +# Info: PF 11802 +PL_STDERR_IGNORE_LIST.append( re.compile('^Info: PF') ) +# About to analyse intensity file s_4_0101_sig2.txt +PL_STDERR_IGNORE_LIST.append( re.compile('^About to analyse intensity file') ) +# Will send output to standard output +PL_STDERR_IGNORE_LIST.append( re.compile('^Will send output to standard output') ) +# Found 31877 clusters +PL_STDERR_IGNORE_LIST.append( re.compile('^Found [0-9]+ clusters') ) +# Will use quality criterion ((CHASTITY>=0.6) +PL_STDERR_IGNORE_LIST.append( re.compile('^Will use quality criterion') ) +# Quality criterion translated to (($F[5]>=0.6)) +PL_STDERR_IGNORE_LIST.append( re.compile('^Quality criterion translated to') ) +# opened /woldlab/trog/data1/king/070924_USI-EAS44_0022_FC12150/Data/C1-36_Firecrest1.9.1_14-11-2007_king.4/Bustard1.9.1_14-11-2007_king/s_4_0101_qhg.txt +# AND +# opened s_4_0103_qhg.txt +PL_STDERR_IGNORE_LIST.append( re.compile('^opened[\S\s]+qhg.txt') ) +# 81129 sequences out of 157651 passed filter criteria +PL_STDERR_IGNORE_LIST.append( re.compile('^[0-9]+ sequences out of [0-9]+ passed filter criteria') ) + + +def pl_stderr_ignore(line): + """ + Searches lines for lines to ignore (i.e. not to log) + + returns True if line should be ignored + returns False if line should NOT be ignored + """ + for s in PL_STDERR_IGNORE_LIST: + if s.search(line): + return True + return False + + +def config_stdout_handler(line, conf_info): + """ + Processes each line of output from GOAT + and stores useful information using the logging module + + Loads useful information into conf_info as well, for future + use outside the function. + + returns True if found condition that signifies success. 
+ """ + + # Skip irrelevant line (without logging) + if s_skip.search(line): + pass + + # Detect invalid command-line arguments + elif s_invalid_cmdline.search(line): + logging.error("Invalid commandline options!") + + # Detect starting of configuration + elif s_start.search(line): + logging.info('START: Configuring pipeline') + + # Detect it made it past invalid arguments + elif s_gerald.search(line): + logging.info('Running make now') + + # Detect that make files have been generated (based on output) + elif s_generating.search(line): + logging.info('Make files generted') + return True + + # Capture run directory + elif s_seq_folder.search(line): + mo = s_seq_folder_sub.search(line) + #Output changed when using --tiles= + # at least in pipeline v0.3.0b2 + if mo: + firecrest_bustard_gerald_makefile = line[mo.end():] + firecrest_bustard_gerald, junk = \ + os.path.split(firecrest_bustard_gerald_makefile) + firecrest_bustard, junk = os.path.split(firecrest_bustard_gerald) + firecrest, junk = os.path.split(firecrest_bustard) + + conf_info.bustard_path = firecrest_bustard + conf_info.run_path = firecrest + + #Standard output handling + else: + print 'Sequence line:', line + mo = s_seq_folder.search(line) + conf_info.bustard_path = line[mo.end():] + conf_info.run_path, temp = os.path.split(conf_info.bustard_path) + + # Log all other output for debugging purposes + else: + logging.warning('CONF:?: %s' % (line)) + + return False + + + +def config_stderr_handler(line, conf_info): + """ + Processes each line of output from GOAT + and stores useful information using the logging module + + Loads useful information into conf_info as well, for future + use outside the function. + + returns RUN_ABORT upon detecting failure; + True on success message; + False if neutral message + (i.e. 
# NOTE(review): this region is the remainder of the pipeline-driving module
# (moved from bin/config_pipeline2.py into gaworkflow.pipeline.configure_run).
# The `def` line of config_stderr_handler sits above this chunk; its signature
# below is reconstructed from the call site in configure() -- confirm upstream.
def config_stderr_handler(line, conf_info):
    """
    Process one line of stderr from the goat configure step.

    Returns RUN_ABORT on a fatal configuration error, True when the
    success message is seen, False for neutral output (i.e. doesn't
    signify failure or success).
    """
    # Detect invalid species directory error
    if s_species_dir_err.search(line):
        logging.error(line)
        return RUN_ABORT
    # Detect goat_pipeline.py traceback
    elif s_goat_traceb.search(line):
        logging.error("Goat config script died, traceback in debug output")
        return RUN_ABORT
    # Detect indication of successful configuration (from stderr; odd, but ok)
    elif s_stderr_taskcomplete.search(line):
        logging.info('Configure step successful (from: stderr)')
        return True
    # Log all other output as debug output
    else:
        logging.debug('CONF:STDERR:?: %s' % (line))

    # Neutral (not failure; nor success)
    return False


#def pipeline_stdout_handler(line, conf_info):
#    """
#    Processes each line of output from running the pipeline
#    and stores useful information using the logging module
#
#    Loads useful information into conf_info as well, for future
#    use outside the function.
#
#    returns True if found condition that signifies success.
#    """
#    return True


def pipeline_stderr_handler(line, conf_info):
    """
    Process one line of stderr from the pipeline run and log anything
    useful.

    ##FIXME: Future feature (doesn't actually do this yet)
    #Loads useful information into conf_info as well, for future
    #use outside the function.

    Returns RUN_FAILED upon detecting failure, False for a neutral
    message (i.e. doesn't signify failure or success).  There is no
    clear success state on stderr.
    """
    if pl_stderr_ignore(line):
        # known-harmless chatter; don't even log it as debug
        pass
    elif s_make_error.search(line):
        logging.error("make error detected; run failed")
        return RUN_FAILED
    elif s_no_gnuplot.search(line):
        logging.error("gnuplot not found")
        return RUN_FAILED
    elif s_no_convert.search(line):
        logging.error("imagemagick's convert command not found")
        return RUN_FAILED
    elif s_no_ghostscript.search(line):
        logging.error("ghostscript not found")
        return RUN_FAILED
    else:
        logging.debug('PIPE:STDERR:?: %s' % (line))

    return False


def retrieve_config(conf_info, flowcell, cfg_filepath, genome_dir):
    """
    Fetch the eland config file for *flowcell* from the ga_frontend
    server and substitute local genome paths into it.

    Requires a config file in:
      /etc/ga_frontend/ga_frontend.conf
    or
      ~/.ga_frontend.conf

    with:
      [config_file_server]
      base_host_url: http://host:port

    On success, conf_info.config_filepath is set to cfg_filepath.
    Returns True if successful, False on failure.
    """
    options = getCombinedOptions()

    if options.url is None:
        logging.error("~/.ga_frontend.conf or /etc/ga_frontend/ga_frontend.conf"
                      " missing base_host_url option")
        return False

    try:
        saveConfigFile(flowcell, options.url, cfg_filepath)
        conf_info.config_filepath = cfg_filepath
    except (FlowCellNotFound, WebError404, IOError) as e:
        # all known retrieval failures are handled the same way
        logging.error(e)
        return False
    except Exception as e:
        # catch-all kept from the original: retrieval failure is non-fatal
        logging.error(e)
        return False

    # The fetched file is a %-template; fill in the local genome paths
    # (keys look like 'species|build' -- see constructMapperDict).
    f = open(cfg_filepath, 'r')
    data = f.read()
    f.close()

    genome_dict = getAvailableGenomes(genome_dir)
    mapper_dict = constructMapperDict(genome_dict)

    f = open(cfg_filepath, 'w')
    f.write(data % (mapper_dict))
    f.close()

    return True


def configure(conf_info):
    """
    Attempt to configure the GA pipeline using goat_pipeline.py.

    Uses the logging module to store information about status.
    Returns True if configuration was successful, otherwise False.
    """
    # CONTINUE HERE
    #FIXME: this only does a run on 5 tiles on lane 4
    pipe = subprocess.Popen(['goat_pipeline.py',
                             '--GERALD=%s' % (conf_info.config_filepath),
                             '--tiles=s_4_0100,s_4_0101,s_4_0102,s_4_0103,s_4_0104',
                             '--make',
                             '.'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)

    ##################
    # Process stdout
    # NOTE(review): stdout is drained fully before stderr; if goat ever
    # writes enough to stderr to fill the pipe buffer this can deadlock.
    complete = False
    stdout_line = pipe.stdout.readline()
    while stdout_line != '':
        if config_stdout_handler(stdout_line, conf_info):
            complete = True
        stdout_line = pipe.stdout.readline()

    error_code = pipe.wait()
    if error_code:
        logging.error('Received error_code: %s' % (error_code))
    else:
        logging.info('We are go for launch!')

    # Process stderr
    abort = 'NO!'
    stderr_success = False
    stderr_line = pipe.stderr.readline()
    while stderr_line != '':
        stderr_status = config_stderr_handler(stderr_line, conf_info)
        if stderr_status == RUN_ABORT:
            abort = RUN_ABORT
        elif stderr_status is True:
            stderr_success = True
        stderr_line = pipe.stderr.readline()

    # Success requirements:
    #  1) The stdout completed without error
    #  2) The program exited with status 0
    #  3) No errors found in stderr
    print('#Expect: True, False, True, True')
    print('%s %s %s %s' % (complete, bool(error_code),
                           abort != RUN_ABORT, stderr_success is True))
    status = complete is True and \
             bool(error_code) is False and \
             abort != RUN_ABORT and \
             stderr_success is True

    # If everything was successful, but for some reason
    # we didn't retrieve the path info, log it.
    if status is True:
        if conf_info.bustard_path is None or conf_info.run_path is None:
            logging.error("Failed to retrieve run_path")
            return False

    return status


def run_pipeline(conf_info):
    """
    Run the pipeline (make -j8 recursive) in conf_info.run_path and
    monitor its status via the finished.txt marker files.

    Returns True when make exits 0, stderr shows no failure, and the
    Firecrest/Bustard/GERALD finished markers all appeared.
    """
    # Fail if the run_path doesn't actually exist
    if not os.path.exists(conf_info.run_path):
        logging.error('Run path does not exist: %s' % (conf_info.run_path))
        return False

    # Change cwd to run_path
    os.chdir(conf_info.run_path)
    stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
    stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')

    # Monitor file creation (RunEvent flips flags when */finished.txt appear)
    wm = WatchManager()
    mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
    event = RunEvent()
    notifier = ThreadedNotifier(wm, event)
    notifier.start()
    wm.add_watch(conf_info.run_path, mask, rec=True)

    # Log pipeline starting
    logging.info('STARTING PIPELINE @ %s' % (time.ctime()))

    # Start the pipeline (and hide!)
    fout = open(stdout_filepath, 'w')
    ferr = open(stderr_filepath, 'w')
    try:
        pipe = subprocess.Popen(['make',
                                 '-j8',
                                 'recursive'],
                                stdout=fout,
                                stderr=ferr)
        # Wait for run to finish
        retcode = pipe.wait()
    finally:
        # Clean up even if make/Popen blows up
        notifier.stop()
        fout.close()
        ferr.close()

    # Process stderr log for failure markers
    run_failed_stderr = False
    ferr = open(stderr_filepath, 'r')
    try:
        for line in ferr:
            if pipeline_stderr_handler(line, conf_info) == RUN_FAILED:
                run_failed_stderr = True
    finally:
        ferr.close()

    # Finished file check!
    print('RUN SUCCESS CHECK:')
    for key, value in event.run_status_dict.items():
        print(' %s: %s' % (key, value))

    dstatus = event.run_status_dict

    # Success or failure check
    status = (retcode == 0) and \
             run_failed_stderr is False and \
             dstatus['firecrest'] is True and \
             dstatus['bustard'] is True and \
             dstatus['gerald'] is True

    return status


########################################
# gaworkflow/pipeline/genome_mapper.py
########################################
import glob
import sys
import os

import logging

class DuplicateGenome(Exception): pass


def _has_metainfo(genome_dir):
    """Return True when *genome_dir* contains a '_metainfo_' marker file."""
    return os.path.isfile(os.path.join(genome_dir, '_metainfo_'))
def getAvailableGenomes(genome_base_dir):
    """
    Scan *genome_base_dir* for genome directories (subdirectories that
    contain a '_metainfo_' file whose first line is 'species|build').

    raises IOError (on genome_base_dir not found)
    raises DuplicateGenome on duplicate genomes found.

    returns a double dictionary (i.e. d[species][build] = path)
    """
    # Need valid directory
    if not os.path.exists(genome_base_dir):
        raise IOError("Directory does not exist: %s" % (genome_base_dir))

    # Find all subdirectories
    filepath_list = glob.glob(os.path.join(genome_base_dir, '*'))
    potential_genome_dirs = \
        [filepath for filepath in filepath_list if os.path.isdir(filepath)]

    # Keep only directories carrying the metadata marker
    genome_dir_list = \
        [dirpath for dirpath in potential_genome_dirs if _has_metainfo(dirpath)]

    # Genome double dictionary
    d = {}

    for genome_dir in genome_dir_list:
        metafile = os.path.join(genome_dir, '_metainfo_')
        f = open(metafile, 'r')
        line = f.readline().strip()
        f.close()

        # Get species, build... log and skip on failure
        # (BUGFIX: the original warning referenced an undefined name
        # 'metafile', which would have raised NameError; it is now bound
        # above.  Also narrowed the bare except to ValueError from split.)
        try:
            species, build = line.split('|')
        except ValueError:
            logging.warning('Skipping: Invalid metafile (%s) line: %s'
                            % (metafile, line))
            continue

        build_dict = d.setdefault(species, {})
        if build in build_dict:
            raise DuplicateGenome("Duplicate genome for %s|%s" % (species, build))

        build_dict[build] = genome_dir

    return d


def constructMapperDict(genome_dict):
    """
    Flatten the d[species][build] = path double dictionary into the
    single-key mapping used by the eland config generator output,
    i.e. 'Homo sapiens|hg18' -> <local genome path>.
    """
    mapper_dict = {}
    for species, build_dict in genome_dict.items():
        for build, path in build_dict.items():
            mapper_dict['%s|%s' % (species, build)] = path

    return mapper_dict


if __name__ == '__main__':

    if len(sys.argv) != 2:
        # ("useage" typo fixed; argument placeholder reconstructed --
        # the original was eaten by the HTML export, confirm name)
        print('usage: %s <genome_base_dir>' % (sys.argv[0]))
        sys.exit(1)

    d = getAvailableGenomes(sys.argv[1])
    d2 = constructMapperDict(d)

    for k, v in d2.items():
        print('%s: %s' % (k, v))


########################################
# gaworkflow/pipeline/retrieve_config.py
########################################
from optparse import OptionParser, IndentedHelpFormatter
# BUGFIX/compat: ConfigParser is Python-2-only; fall back to the
# Python 3 module name so the module stays importable everywhere.
try:
    from ConfigParser import SafeConfigParser
except ImportError:
    from configparser import SafeConfigParser

import os
import sys
import urllib

CONFIG_SYSTEM = '/etc/ga_frontend/ga_frontend.conf'
CONFIG_USER = os.path.expanduser('~/.ga_frontend.conf')


class FlowCellNotFound(Exception): pass
class WebError404(Exception): pass


class PreformattedDescriptionFormatter(IndentedHelpFormatter):
    """Optparse help formatter that leaves the epilog preformatted."""

    def format_epilog(self, epilog):
        """
        It was removing my preformated epilog, so this should override
        that behavior! Muhahaha!
        """
        if epilog:
            return "\n" + epilog + "\n"
        else:
            return ""


def constructOptionParser():
    """
    returns a pre-setup optparser
    """
    parser = OptionParser(formatter=PreformattedDescriptionFormatter())

    parser.set_description('Retrieves eland config file from ga_frontend web frontend.')

    # BUGFIX: the example section was '[server_info]' but the code reads
    # '[config_file_server]' (see constructConfigParser/getCombinedOptions).
    parser.epilog = """
Config File:
  * %s (System wide)
  * %s (User specific; overrides system)
  * command line overrides all config file options

  Example Config File:

    [config_file_server]
    base_host_url=http://somewhere.domain:port
""" % (CONFIG_SYSTEM, CONFIG_USER)

    parser.add_option("-u", "--url",
                      action="store", type="string", dest="url")

    parser.add_option("-o", "--output",
                      action="store", type="string", dest="output_filepath")

    parser.add_option("-f", "--flowcell",
                      action="store", type="string", dest="flowcell")

    return parser
def constructConfigParser():
    """
    returns a pre-setup config parser

    Reads CONFIG_SYSTEM then CONFIG_USER (user overrides system) and
    guarantees the 'config_file_server' section exists.
    """
    parser = SafeConfigParser()
    parser.read([CONFIG_SYSTEM, CONFIG_USER])
    if not parser.has_section('config_file_server'):
        parser.add_section('config_file_server')

    return parser


def getCombinedOptions():
    """
    Returns optparse options after they have been updated with the
    ConfigParser config files and merged with parsed commandline options.
    """
    cl_parser = constructOptionParser()
    conf_parser = constructConfigParser()

    options, args = cl_parser.parse_args()

    # Command line wins; fall back to the config file for the url
    if options.url is None:
        if conf_parser.has_option('config_file_server', 'base_host_url'):
            options.url = conf_parser.get('config_file_server', 'base_host_url')

    print('USING OPTIONS:')
    print(' URL: %s' % (options.url,))
    print(' OUT: %s' % (options.output_filepath,))
    print(' FC: %s' % (options.flowcell,))
    print('')

    return options


def saveConfigFile(flowcell, base_host_url, output_filepath):
    """
    Retrieve the flowcell eland config file, given the base_host_url
    (i.e. http://sub.domain.edu:port), and write it to output_filepath.

    raises FlowCellNotFound when the server reports the flowcell is
    missing or returns a 404.
    """
    url = base_host_url + '/eland_config/%s/' % (flowcell)

    # NOTE(review): urllib.urlopen is the Python 2 API this project uses.
    web = urllib.urlopen(url)
    data = web.read()
    web.close()

    if data.find('Hmm, config file for') >= 0:
        msg = "Flowcell (%s) not found in DB; full url(%s)" % (flowcell, url)
        raise FlowCellNotFound(msg)

    if data.find('404 - Not Found') >= 0:
        msg = "404 - Not Found: Flowcell (%s); base_host_url (%s);\n full url(%s)\n " \
              "Did you get right port #?" % (flowcell, base_host_url, url)
        raise FlowCellNotFound(msg)

    # BUGFIX: open the output file only after a successful fetch; the
    # original opened (and truncated) it first, so a failed fetch both
    # leaked the handle and destroyed any existing config file.
    f = open(output_filepath, 'w')
    f.write(data)
    f.close()
    print('Wrote config file to %s' % (output_filepath))


########################################
# scripts/configure_pipeline
# (separate executable file in the repository)
########################################
import sys
from gaworkflow.pipeline.configure_run import *

def main(args=None):
    """Retrieve config, configure, and run the pipeline for one flowcell."""
    ci = ConfigInfo()

    #FIXME: hard-coded run parameters
    flowcell = 'FC12150'
    cfg_filepath = 'config32auto.txt'
    genome_dir = '/home/king/trog_drive/'

    status_retrieve_cfg = retrieve_config(ci, flowcell, cfg_filepath, genome_dir)
    if status_retrieve_cfg:
        print("Retrieve config file successful")
    else:
        print("Failed to retrieve config file")

    # BUGFIX: initialize status so the check below cannot raise
    # NameError when config retrieval fails.
    status = False
    if status_retrieve_cfg:
        status = configure(ci)
        if status:
            print("Configure success")
        else:
            print("Configure failed")

        print('Run Dir: %s' % (ci.run_path,))
        print('Bustard Dir: %s' % (ci.bustard_path,))

    if status:
        print('Running pipeline now!')
        run_status = run_pipeline(ci)
        if run_status is True:
            print('Pipeline ran successfully.')
            return 0
        else:
            print('Pipeline run failed.')
            return 1

    return 2

if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))


########################################
# scripts/retrieve_config
# (separate executable file in the repository)
########################################
import sys
from gaworkflow.pipeline.retrieve_config import *

def main(args=None):
    """Fetch an eland config file; validate required options first."""
    # Display help if no args are presented
    if len(sys.argv) == 1:
        sys.argv.append('-h')

    options = getCombinedOptions()
    msg_list = ['ERROR MESSAGES:']
    if options.output_filepath is None:
        msg_list.append("  Output filepath argument required. -o or --output=<filepath>")

    if options.flowcell is None:
        msg_list.append("  Flow cell argument required. -f or --flowcell=<fcid>")

    if options.url is None:
        # BUGFIX: the original message pointed at /etc/elandifier/
        # elandifer.conf, but this code reads the ga_frontend config
        # paths (CONFIG_SYSTEM / CONFIG_USER).
        msg_list.append("  URL argument required (-u or --url=<url>), or entry\n"
                        "  in /etc/ga_frontend/ga_frontend.conf or ~/.ga_frontend.conf")

    if len(msg_list) > 1:
        print('\n'.join(msg_list))
        return 1

    saveConfigFile(options.flowcell, options.url, options.output_filepath)
    return 0

if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))


########################################
# setup.py (hunk) -- only the lines touched by this commit are shown;
# the import of setup() is outside this view.
########################################
setup(
    author="Diane Trout",
    author_email="diane@caltech.edu",
    packages=["uashelper"],
    scripts=['scripts/spoolwatcher',
             'scripts/copier',
             # BUGFIX: was 'scripts/retreive_config' (typo); the script
             # added by this commit is named 'scripts/retrieve_config',
             # so installation would have failed on the missing file.
             'scripts/retrieve_config',
             'scripts/configure_pipeline'],
)