--- /dev/null
+#!/usr/bin/env python
+from glob import glob
+import logging
+import os
+import re
+import sys
+import time
+import threading
+
+from benderjab import rpc
+
+#from htswdataprod.configure_run import *
+#from htswdataprod.monitors import _percentCompleted
+
+#s_fc = re.compile('FC[0-9]+')
+s_fc = re.compile('_[0-9a-zA-Z]*$')
+
+
+class Runner(rpc.XmlRpcBot):
+ """
+ Manage running pipeline jobs.
+ """
+ def __init__(self, section=None, configfile=None):
+ #if configfile is None:
+ # self.configfile = "~/.gaworkflow"
+ super(Runner, self).__init__(section, configfile)
+
+ self.cfg['notify_users'] = None
+ self.cfg['genome_dir'] = None
+ self.cfg['base_analysis_dir'] = None
+
+ #self.cfg['notify_users'] = None
+ self.cfg['notify_postanalysis'] = None
+
+ self.conf_info_dict = {}
+
+ self.register_function(self.sequencingFinished)
+ #self.eventTasks.append(self.update)
+
+
+ def read_config(self, section=None, configfile=None):
+ super(Runner, self).read_config(section, configfile)
+
+ self.genome_dir = self._check_required_option('genome_dir')
+ self.base_analysis_dir = self._check_required_option('base_analysis_dir')
+
+ self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+ #FIXME: process notify_postpipeline cfg
+
+
+ def _parser(self, msg, who):
+ """
+ Parse xmpp chat messages
+ """
+ ################
+ # Help response
+ help = u"I can send [start] a run, or report [status]"
+ if re.match(u"help", msg):
+ reply = help
+
+ ################
+ # Status response
+ elif re.match("status", msg):
+ words = msg.split()
+ #if len(words) == 2:
+ # reply = self.getStatusReport(words[1])
+ #else:
+ # reply = u"Status available for: %s" \
+ # % (', '.join([k for k in self.conf_info_dict.keys()]))
+ return 'Status not implemented yet.'
+
+ ###############
+ # Start run
+ elif re.match(u"start", msg):
+ words = msg.split()
+ if len(words) == 2:
+ self.sequencingFinished(words[1])
+ reply = u"starting run for %s" % (words[1])
+ else:
+ reply = u"need runfolder name"
+ else:
+ reply = u"I didn't understand '%s'" %(msg)
+
+ logging.debug("reply: " + str(reply))
+ return reply
+
+
+ def getStatusReport(self, fc_num):
+ """
+ Returns text status report for flow cell number
+ """
+
+ return 'StatusReport: Not implemented'
+
+
+ def sequencingFinished(self, run_dir):
+ """
+ Sequenceing (and copying) is finished, time to start pipeline
+
+ xmlrpc call from another bot
+ """
+ logging.debug("received sequencing finished message")
+
+ self.launchJob(run_dir, flowcell, ci)
+ return "started"
+
+
+ def pipelineFinished(self, run_dir):
+ # need to strip off self.watch_dir from rundir I suspect.
+ logging.info("pipeline finished in" + str(run_dir))
+
+ # Notify each user that the run has finished.
+ if self.notify_users is not None:
+ for u in self.notify_users:
+ self.send(u, 'Pipeline run %s finished' % (run_dir))
+
+ #if self.notify_runner is not None:
+ # for r in self.notify_runner:
+ # self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
+
+ def reportMsg(self, msg):
+
+ if self.notify_users is not None:
+ for u in self.notify_users:
+ self.send(u, msg)
+
+
+ def _runner(self, run_dir, flowcell, conf_info):
+
+ # retrieve config step
+ # example error message
+ #self.reportMsg("Retrieve config (%s): FAILED" % (run_dir))
+
+
+ # configure step
+ if status_retrieve_cfg:
+ status = configure(conf_info)
+ if status:
+ logging.info("Runner: Configure: success")
+ self.reportMsg("Configure (%s): success" % (run_dir))
+ self.reportMsg(
+ os.linesep.join(glob(os.path.join(run_dir,'Data','C*')))
+ )
+ else:
+ logging.error("Runner: Configure: failed")
+ self.reportMsg("Configure (%s): FAILED" % (run_dir))
+
+ #if successful, continue
+ if status:
+ # Setup status cmdline status monitor
+ #startCmdLineStatusMonitor(ci)
+
+ # running step
+ print 'Running pipeline now!'
+ run_status = run_pipeline(conf_info)
+ if run_status is True:
+ logging.info('Runner: Pipeline: success')
+ self.reportMsg("Pipeline run (%s): Finished" % (run_dir,))
+ else:
+ logging.info('Runner: Pipeline: failed')
+ self.reportMsg("Pipeline run (%s): FAILED" % (run_dir))
+
+
+ def launchJob(self, run_dir, flowcell, conf_info):
+ """
+ Starts up a thread for running the pipeline
+ """
+ t = threading.Thread(target=self._runner,
+ args=[run_dir, flowcell, conf_info])
+ t.setDaemon(True)
+ t.start()
+
+
+
+def main(args=None):
+ bot = Runner()
+ return bot.main(args)
+
+if __name__ == "__main__":
+ sys.exit(main(sys.argv[1:]))
+