import re
import sys
import time
+import threading
from benderjab import rpc
+from gaworkflow.pipeline.configure_run import *
+from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor
+
+s_fc = re.compile('FC[0-9]+')
+
+
+def _get_flowcell_from_rundir(run_dir):
+ """
+ Returns flowcell string based on run_dir.
+ Returns None and logs error if flowcell can't be found.
+ """
+ junk, dirname = os.path.split(run_dir)
+ mo = s_fc.search(dirname)
+ if not mo:
+ logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
+ return None
+
+ return dirname[mo.start():]
+
+
+def _runner():
+
+
class Runner(rpc.XmlRpcBot):
"""
Manage running pipeline jobs.
super(Runner, self).__init__(section, configfile)
self.cfg['notify_users'] = None
+ self.cfg['genome_dir'] = None
+
+ self.conf_info_dict = {}
self.register_function(self.sequencingFinished)
self.eventTasks.append(self.update)
def read_config(self, section=None, configfile=None):
super(Runner, self).read_config(section, configfile)
+
+ self.genome_dir = self._check_required_option('genome_dir')
+
def _parser(self, msg, who):
"""
Sequenceing (and copying) is finished, time to start pipeline
"""
logging.debug("received sequencing finished message")
+
+ # Setup config info object
+ ci = ConfigInfo()
+ ci.run_path = run_dir
+
+ # get flowcell from run_dir name
+ flowcell = _get_flowcell_from_rundir(run_dir)
+
+ # Store ci object in dictionary
+ self.conf_info_dict[flowcell] = ci
+
+
+ # Launch the job in it's own thread and turn.
+ self.launchJob(run_dir, flowcell, ci)
+
+
def pipelineFinished(self, run_dir):
# need to strip off self.watch_dir from rundir I suspect.
#if self.notify_runner is not None:
# for r in self.notify_runner:
# self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
+
+
+ def _runner(self, run_dir, flowcell, conf_info):
+ # retrieve config step
+ cfg_filepath = os.path.abspath('config32auto.txt')
+ status_retrieve_cfg = retrieve_config(ci,
+ flowcell,
+ cfg_filepath,
+ self.genome_dir)
+ if status_retrieve_cfg:
+ logging.info("Runner: Retrieve config: success")
+ else:
+ logging.error("Runner: Retrieve config: failed")
+
+
+ # configure step
+ if status_retrieve_cfg:
+ status = configure(ci)
+ if status:
+ logging.info("Runner: Configure: success")
+ else:
+ logging.error("Runner: Configure: failed")
+
+ #if successful, continue
+ if status:
+ # Setup status cmdline status monitor
+ #startCmdLineStatusMonitor(ci)
+
+ # running step
+ print 'Running pipeline now!'
+ run_status = run_pipeline(ci)
+ if run_status is True:
+ logging.info('Runner: Pipeline: success')
+ self.piplineFinished(run_dir)
+ else:
+ logging.info('Runner: Pipeline: failed')
+
+
+ def launchJob(self, run_dir, flowcell, conf_info):
+ """
+ Starts up a thread for running the pipeline
+ """
+ t = threading.Thread(target=self._runner,
+ args=[run_dir, flowcell, conf_info])
+ t.setDaemon(True)
+ t.start()
+
+
def main(args=None):
bot = Runner()
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
-
\ No newline at end of file
+