9 from benderjab import rpc
11 from gaworkflow.pipeline.configure_run import *
12 from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor
14 s_fc = re.compile('FC[0-9]+')
17 def _get_flowcell_from_rundir(run_dir):
19 Returns flowcell string based on run_dir.
20 Returns None and logs error if flowcell can't be found.
22 junk, dirname = os.path.split(run_dir)
23 mo = s_fc.search(dirname)
25 logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
28 return dirname[mo.start():]
34 class Runner(rpc.XmlRpcBot):
36 Manage running pipeline jobs.
38 def __init__(self, section=None, configfile=None):
39 #if configfile is None:
40 # self.configfile = "~/.gaworkflow"
41 super(Runner, self).__init__(section, configfile)
43 self.cfg['notify_users'] = None
44 self.cfg['genome_dir'] = None
46 self.conf_info_dict = {}
48 self.register_function(self.sequencingFinished)
49 self.eventTasks.append(self.update)
51 def read_config(self, section=None, configfile=None):
52 super(Runner, self).read_config(section, configfile)
54 self.genome_dir = self._check_required_option('genome_dir')
57 def _parser(self, msg, who):
59 Parse xmpp chat messages
61 help = u"I can send [start] a run, or report [status]"
62 if re.match(u"help", msg):
64 elif re.match("status", msg):
65 reply = u"not implemented"
66 elif re.match(u"start", msg):
69 self.sequencingFinished(words[1])
70 reply = u"starting run for %s" % (words[1])
72 reply = u"need runfolder name"
74 reply = u"I didn't understand '%s'" %(msg)
77 def start(self, daemonize):
81 super(Runner, self).start(daemonize)
87 super(Runner, self).stop()
89 def sequencingFinished(self, run_dir):
91 Sequenceing (and copying) is finished, time to start pipeline
93 logging.debug("received sequencing finished message")
95 # Setup config info object
99 # get flowcell from run_dir name
100 flowcell = _get_flowcell_from_rundir(run_dir)
102 # Store ci object in dictionary
103 self.conf_info_dict[flowcell] = ci
106 # Launch the job in it's own thread and turn.
107 self.launchJob(run_dir, flowcell, ci)
111 def pipelineFinished(self, run_dir):
112 # need to strip off self.watch_dir from rundir I suspect.
113 logging.info("pipeline finished in" + str(run_dir))
114 #pattern = self.watch_dir
115 #if pattern[-1] != os.path.sep:
116 # pattern += os.path.sep
117 #stripped_run_dir = re.sub(pattern, "", run_dir)
118 #logging.debug("stripped to " + stripped_run_dir)
119 #if self.notify_users is not None:
120 # for u in self.notify_users:
121 # self.send(u, 'Sequencing run %s finished' % #(stripped_run_dir))
122 #if self.notify_runner is not None:
123 # for r in self.notify_runner:
124 # self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
127 def _runner(self, run_dir, flowcell, conf_info):
128 # retrieve config step
129 cfg_filepath = os.path.abspath('config32auto.txt')
130 status_retrieve_cfg = retrieve_config(ci,
134 if status_retrieve_cfg:
135 logging.info("Runner: Retrieve config: success")
137 logging.error("Runner: Retrieve config: failed")
141 if status_retrieve_cfg:
142 status = configure(ci)
144 logging.info("Runner: Configure: success")
146 logging.error("Runner: Configure: failed")
148 #if successful, continue
150 # Setup status cmdline status monitor
151 #startCmdLineStatusMonitor(ci)
154 print 'Running pipeline now!'
155 run_status = run_pipeline(ci)
156 if run_status is True:
157 logging.info('Runner: Pipeline: success')
158 self.piplineFinished(run_dir)
160 logging.info('Runner: Pipeline: failed')
163 def launchJob(self, run_dir, flowcell, conf_info):
165 Starts up a thread for running the pipeline
167 t = threading.Thread(target=self._runner,
168 args=[run_dir, flowcell, conf_info])
176 return bot.main(args)
178 if __name__ == "__main__":
179 sys.exit(main(sys.argv[1:]))