from benderjab import rpc
from gaworkflow.pipeline.configure_run import *
-from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor
+from gaworkflow.pipeline.monitors import _percentCompleted
-s_fc = re.compile('FC[0-9]+')
+#s_fc = re.compile('FC[0-9]+')
+s_fc = re.compile('_[0-9a-zA-Z]*$')
def _get_flowcell_from_rundir(run_dir):
logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
return None
- return dirname[mo.start():]
+ return dirname[mo.start()+1:]
self.cfg['notify_users'] = None
self.cfg['genome_dir'] = None
+ self.cfg['base_analysis_dir'] = None
+
+ self.cfg['notify_users'] = None
+ self.cfg['notify_postanalysis'] = None
self.conf_info_dict = {}
super(Runner, self).read_config(section, configfile)
self.genome_dir = self._check_required_option('genome_dir')
+ self.base_analysis_dir = self._check_required_option('base_analysis_dir')
+
+ self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+ #FIXME: process notify_postpipeline cfg
def _parser(self, msg, who):
help = u"I can send [start] a run, or report [status]"
if re.match(u"help", msg):
reply = help
- elif re.match("status", msg):
- reply = u"not implemented"
+ elif re.match("status", msg):
+ words = msg.split()
+ if len(words) == 2:
+ reply = self.getStatusReport(words[1])
+ else:
+ reply = u"Status available for: %s" \
+ % (', '.join([k for k in self.conf_info_dict.keys()]))
elif re.match(u"start", msg):
words = msg.split()
if len(words) == 2:
logging.debug("reply: " + str(reply))
return reply
-
- def start(self, daemonize):
- """
- Start application
- """
- super(Runner, self).start(daemonize)
-
- def stop(self):
+ def getStatusReport(self, fc_num):
"""
- shutdown application
+ Returns text status report for flow cell number
"""
- super(Runner, self).stop()
+ if fc_num not in self.conf_info_dict:
+ return "No record of a %s run." % (fc_num)
+
+ status = self.conf_info_dict[fc_num].status
+
+ if status is None:
+ return "No status information for %s yet." \
+ " Probably still in configure step. Try again later." % (fc_num)
+ fc,ft = status.statusFirecrest()
+ bc,bt = status.statusBustard()
+ gc,gt = status.statusGerald()
+
+ tc,tt = status.statusTotal()
+
+ fp = _percentCompleted(fc, ft)
+ bp = _percentCompleted(bc, bt)
+ gp = _percentCompleted(gc, gt)
+ tp = _percentCompleted(tc, tt)
+
+ output = []
+
+ output.append(u'Firecrest: %s%% (%s/%s)' % (fp, fc, ft))
+ output.append(u' Bustard: %s%% (%s/%s)' % (bp, bc, bt))
+ output.append(u' Gerald: %s%% (%s/%s)' % (gp, gc, gt))
+ output.append(u'-----------------------')
+ output.append(u' Total: %s%% (%s/%s)' % (tp, tc, tt))
+
+ return '\n'.join(output)
+
def sequencingFinished(self, run_dir):
"""
# Setup config info object
ci = ConfigInfo()
- ci.run_path = run_dir
+ ci.base_analysis_dir = self.base_analysis_dir
+ ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir)
# get flowcell from run_dir name
flowcell = _get_flowcell_from_rundir(run_dir)
# Launch the job in it's own thread and turn.
self.launchJob(run_dir, flowcell, ci)
+ return "started"
def pipelineFinished(self, run_dir):
# pattern += os.path.sep
#stripped_run_dir = re.sub(pattern, "", run_dir)
#logging.debug("stripped to " + stripped_run_dir)
- #if self.notify_users is not None:
- # for u in self.notify_users:
- # self.send(u, 'Sequencing run %s finished' % #(stripped_run_dir))
+
+ # Notify each user that the run has finished.
+ if self.notify_users is not None:
+ for u in self.notify_users:
+ self.send(u, 'Pipeline run %s finished' % (run_dir))
+
#if self.notify_runner is not None:
# for r in self.notify_runner:
# self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
+ def reportMsg(self, msg):
+
+ if self.notify_users is not None:
+ for u in self.notify_users:
+ self.send(u, msg)
+
def _runner(self, run_dir, flowcell, conf_info):
+
# retrieve config step
- cfg_filepath = os.path.abspath('config32auto.txt')
+ cfg_filepath = os.path.join(conf_info.analysis_dir,
+ 'config32auto.txt')
status_retrieve_cfg = retrieve_config(conf_info,
flowcell,
cfg_filepath,
self.genome_dir)
if status_retrieve_cfg:
logging.info("Runner: Retrieve config: success")
+ self.reportMsg("Retrieve config (%s): success" % (run_dir))
else:
logging.error("Runner: Retrieve config: failed")
+ self.reportMsg("Retrieve config (%s): FAILED" % (run_dir))
# configure step
if status_retrieve_cfg:
- status = configure(ci)
+ status = configure(conf_info)
if status:
logging.info("Runner: Configure: success")
+ self.reportMsg("Configure (%s): success" % (run_dir))
else:
logging.error("Runner: Configure: failed")
+ self.reportMsg("Configure (%s): FAILED" % (run_dir))
#if successful, continue
if status:
# running step
print 'Running pipeline now!'
- run_status = run_pipeline(ci)
+ run_status = run_pipeline(conf_info)
if run_status is True:
logging.info('Runner: Pipeline: success')
self.piplineFinished(run_dir)
else:
logging.info('Runner: Pipeline: failed')
+ self.reportMsg("Pipeline run (%s): FAILED" % (run_dir))
def launchJob(self, run_dir, flowcell, conf_info):
def main(args=None):
bot = Runner()
- bot.cfg['loglevel'] = 'DEBUG'
return bot.main(args)
if __name__ == "__main__":