From: Brandon King Date: Fri, 14 Dec 2007 20:56:30 +0000 (+0000) Subject: [project @ gaworkflow.runner progress] X-Git-Tag: 0.1.0~32 X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=aef99a54982b9045bf29df2d115449d29388f008 [project @ gaworkflow.runner progress] * Work towards getting runner to launch jobs in a seperate thread. * Stores status in a dictionary. --- diff --git a/gaworkflow/runner.py b/gaworkflow/runner.py index 40eae64..40aa9c8 100644 --- a/gaworkflow/runner.py +++ b/gaworkflow/runner.py @@ -4,9 +4,33 @@ import os import re import sys import time +import threading from benderjab import rpc +from gaworkflow.pipeline.configure_run import * +from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor + +s_fc = re.compile('FC[0-9]+') + + +def _get_flowcell_from_rundir(run_dir): + """ + Returns flowcell string based on run_dir. + Returns None and logs error if flowcell can't be found. + """ + junk, dirname = os.path.split(run_dir) + mo = s_fc.search(dirname) + if not mo: + logging.error('RunDir 2 FlowCell error: %s' % (run_dir)) + return None + + return dirname[mo.start():] + + +def _runner(): + + class Runner(rpc.XmlRpcBot): """ Manage running pipeline jobs. @@ -17,12 +41,18 @@ class Runner(rpc.XmlRpcBot): super(Runner, self).__init__(section, configfile) self.cfg['notify_users'] = None + self.cfg['genome_dir'] = None + + self.conf_info_dict = {} self.register_function(self.sequencingFinished) self.eventTasks.append(self.update) def read_config(self, section=None, configfile=None): super(Runner, self).read_config(section, configfile) + + self.genome_dir = self._check_required_option('genome_dir') + def _parser(self, msg, who): """ @@ -61,6 +91,22 @@ class Runner(rpc.XmlRpcBot): Sequenceing (and copying) is finished, time to start pipeline """ logging.debug("received sequencing finished message") + + # Setup config info object + ci = ConfigInfo() + ci.run_path = run_dir + + # get flowcell from run_dir name + flowcell = _get_flowcell_from_rundir(run_dir) + + # Store ci object in dictionary + self.conf_info_dict[flowcell] = ci + + + # Launch the job in it's own thread and turn. + self.launchJob(run_dir, flowcell, ci) + + def pipelineFinished(self, run_dir): # need to strip off self.watch_dir from rundir I suspect. @@ -76,6 +122,54 @@ class Runner(rpc.XmlRpcBot): #if self.notify_runner is not None: # for r in self.notify_runner: # self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished') + + + def _runner(self, run_dir, flowcell, conf_info): + # retrieve config step + cfg_filepath = os.path.abspath('config32auto.txt') + status_retrieve_cfg = retrieve_config(ci, + flowcell, + cfg_filepath, + self.genome_dir) + if status_retrieve_cfg: + logging.info("Runner: Retrieve config: success") + else: + logging.error("Runner: Retrieve config: failed") + + + # configure step + if status_retrieve_cfg: + status = configure(ci) + if status: + logging.info("Runner: Configure: success") + else: + logging.error("Runner: Configure: failed") + + #if successful, continue + if status: + # Setup status cmdline status monitor + #startCmdLineStatusMonitor(ci) + + # running step + print 'Running pipeline now!' + run_status = run_pipeline(ci) + if run_status is True: + logging.info('Runner: Pipeline: success') + self.piplineFinished(run_dir) + else: + logging.info('Runner: Pipeline: failed') + + + def launchJob(self, run_dir, flowcell, conf_info): + """ + Starts up a thread for running the pipeline + """ + t = threading.Thread(target=self._runner, + args=[run_dir, flowcell, conf_info]) + t.setDaemon(True) + t.start() + + def main(args=None): bot = Runner() @@ -83,4 +177,4 @@ def main(args=None): if __name__ == "__main__": sys.exit(main(sys.argv[1:])) - \ No newline at end of file +