10 from benderjab import rpc
12 from gaworkflow.pipeline.configure_run import *
13 from gaworkflow.pipeline.monitors import _percentCompleted
15 #s_fc = re.compile('FC[0-9]+')
16 s_fc = re.compile('_[0-9a-zA-Z]*$')
19 def _get_flowcell_from_rundir(run_dir):
21 Returns flowcell string based on run_dir.
22 Returns None and logs error if flowcell can't be found.
24 junk, dirname = os.path.split(run_dir)
25 mo = s_fc.search(dirname)
27 logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
30 return dirname[mo.start()+1:]
class Runner(rpc.XmlRpcBot):
    """
    Manage running pipeline jobs.

    The bot accepts chat commands (see _parser) and exposes
    sequencingFinished as a registered remote function.  Every run it
    starts is executed in its own thread (see launchJob / _runner), and the
    per-run configuration object is remembered in self.conf_info_dict,
    keyed by flowcell id, so that status can be reported later.
    """
    def __init__(self, section=None, configfile=None):
        #if configfile is None:
        #    self.configfile = "~/.gaworkflow"
        super(Runner, self).__init__(section, configfile)

        # Give every option this bot knows about a None default.
        for option in ('notify_users',
                       'genome_dir',
                       'base_analysis_dir',
                       'notify_postanalysis'):
            self.cfg[option] = None

        # flowcell id -> configuration object of the run started for it
        self.conf_info_dict = {}

        self.register_function(self.sequencingFinished)
        #self.eventTasks.append(self.update)

    def read_config(self, section=None, configfile=None):
        """
        Load the base configuration, then pull out the options used here.

        genome_dir and base_analysis_dir are fetched through
        _check_required_option; notify_users is run through
        _parse_user_list.
        """
        super(Runner, self).read_config(section, configfile)

        self.genome_dir = self._check_required_option('genome_dir')
        self.base_analysis_dir = self._check_required_option('base_analysis_dir')

        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
        #FIXME: process the post-analysis notification option
        #       (__init__ registers it as 'notify_postanalysis')

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages

        Recognised commands (matched at the start of msg):
          help            - describe the available commands
          status [fc]     - report on flowcell fc, or list known flowcells
          start runfolder - start the pipeline for runfolder

        Returns the text of the reply.  The who argument is not used.
        """
        help_text = u"I can send [start] a run, or report [status]"
        if re.match(u"help", msg):
            reply = help_text
        elif re.match("status", msg):
            words = msg.split()
            if len(words) == 2:
                reply = self.getStatusReport(words[1])
            else:
                # str() because a run whose flowcell could not be parsed is
                # stored under the key None.
                reply = u"Status available for: %s" \
                        % (', '.join([str(k) for k in self.conf_info_dict]))
        elif re.match(u"start", msg):
            words = msg.split()
            if len(words) == 2:
                self.sequencingFinished(words[1])
                reply = u"starting run for %s" % (words[1])
            else:
                reply = u"need runfolder name"
        else:
            reply = u"I didn't understand '%s'" % (msg)

        # Let logging do the formatting instead of calling str() on the
        # reply, which can fail for non-ASCII unicode text on Python 2.
        logging.debug("reply: %s", reply)
        return reply

    def getStatusReport(self, fc_num):
        """
        Returns text status report for flow cell number

        If no run is recorded for fc_num, or the run has no status object
        yet, a one-line explanation is returned instead of the report.
        """
        if fc_num not in self.conf_info_dict:
            return "No record of a %s run." % (fc_num)

        status = self.conf_info_dict[fc_num].status

        if status is None:
            return "No status information for %s yet." \
                   " Probably still in configure step. Try again later." % (fc_num)

        # Each status call yields a pair that is shown below as "(x/y)";
        # presumably (completed, total) -- confirm against the status class.
        fc, ft = status.statusFirecrest()
        bc, bt = status.statusBustard()
        gc, gt = status.statusGerald()

        tc, tt = status.statusTotal()

        fp = _percentCompleted(fc, ft)
        bp = _percentCompleted(bc, bt)
        gp = _percentCompleted(gc, gt)
        tp = _percentCompleted(tc, tt)

        output = [
            u'Firecrest: %s%% (%s/%s)' % (fp, fc, ft),
            u'  Bustard: %s%% (%s/%s)' % (bp, bc, bt),
            u'   Gerald: %s%% (%s/%s)' % (gp, gc, gt),
            u'-----------------------',
            u'    Total: %s%% (%s/%s)' % (tp, tc, tt),
        ]

        return '\n'.join(output)

    def sequencingFinished(self, run_dir):
        """
        Sequencing (and copying) is finished, time to start pipeline

        run_dir is the run folder name; it is joined onto
        self.base_analysis_dir to form the analysis directory.
        """
        logging.debug("received sequencing finished message")

        # Setup config info object
        # NOTE(review): the constructor name is reconstructed (expected to
        # come from gaworkflow.pipeline.configure_run) -- confirm.
        ci = ConfigInfo()
        ci.base_analysis_dir = self.base_analysis_dir
        ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir)

        # get flowcell from run_dir name
        flowcell = _get_flowcell_from_rundir(run_dir)

        # Store ci object in dictionary
        self.conf_info_dict[flowcell] = ci

        # Launch the job in its own thread and return.
        self.launchJob(run_dir, flowcell, ci)
        # NOTE(review): a string is returned so that a remote caller gets a
        # value that can be marshalled; confirm the expected return value.
        return "started"

    def pipelineFinished(self, run_dir):
        """Tell every user in self.notify_users that run_dir has finished."""
        # need to strip off self.watch_dir from rundir I suspect.
        logging.info("pipeline finished in %s", run_dir)
        # TODO: strip the watch directory prefix from run_dir, and forward
        #       the stripped name to any configured runners via
        #       rpc_send(..., 'sequencingFinished').

        # Notify each user that the run has finished.
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, 'Pipeline run %s finished' % (run_dir,))

    def reportMsg(self, msg):
        """Send msg to every user in self.notify_users (no-op when None)."""
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, msg)

    def _runner(self, run_dir, flowcell, conf_info):
        """
        Run the whole pipeline for one run folder; used as a thread target.

        Steps: retrieve the config, configure, run the pipeline.  The
        outcome of each step is logged and sent through reportMsg, and a
        failed step stops the sequence.
        """
        # retrieve config step
        # NOTE(review): the config file name and the arguments after
        # conf_info are reconstructed -- confirm against retrieve_config().
        cfg_filepath = os.path.join(conf_info.analysis_dir,
                                    'config32auto.txt')
        status_retrieve_cfg = retrieve_config(conf_info,
                                              flowcell,
                                              cfg_filepath,
                                              self.genome_dir)
        if not status_retrieve_cfg:
            logging.error("Runner: Retrieve config: failed")
            self.reportMsg("Retrieve config (%s): FAILED" % (run_dir,))
            return

        logging.info("Runner: Retrieve config: success")
        self.reportMsg("Retrieve config (%s): success" % (run_dir,))

        # configure step
        status = configure(conf_info)
        if not status:
            logging.error("Runner: Configure: failed")
            self.reportMsg("Configure (%s): FAILED" % (run_dir,))
            return

        logging.info("Runner: Configure: success")
        self.reportMsg("Configure (%s): success" % (run_dir,))
        # Also report the entries matching <run_dir>/Data/C*, one per line.
        self.reportMsg(
            os.linesep.join(glob(os.path.join(run_dir, 'Data', 'C*')))
        )

        # Setup status cmdline status monitor
        #startCmdLineStatusMonitor(ci)

        # running step
        print('Running pipeline now!')
        run_status = run_pipeline(conf_info)
        # Only the literal True counts as success.
        if run_status is True:
            logging.info('Runner: Pipeline: success')
            self.reportMsg("Pipeline run (%s): Finished" % (run_dir,))
        else:
            logging.error('Runner: Pipeline: failed')
            self.reportMsg("Pipeline run (%s): FAILED" % (run_dir,))

    def launchJob(self, run_dir, flowcell, conf_info):
        """
        Starts up a thread for running the pipeline
        """
        t = threading.Thread(target=self._runner,
                             args=[run_dir, flowcell, conf_info])
        # NOTE(review): the thread is left non-daemon (the Python default);
        # confirm whether the bot should be able to exit mid-run.
        t.start()
def main(args=None):
    """
    Create a Runner bot and hand the command-line arguments to its main().

    Returns whatever the bot's main() returns, which the entry-point guard
    below passes to sys.exit().
    """
    # NOTE(review): construction with default arguments is reconstructed;
    # confirm that no section/configfile should be passed here.
    bot = Runner()
    return bot.main(args)


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))