2 from __future__ import print_function
11 from benderjab import rpc
13 from htsworkflow.pipelines.configure_run import *
15 LOGGER = logging.getLogger(__name__)
17 #s_fc = re.compile('FC[0-9]+')
18 s_fc = re.compile('_[0-9a-zA-Z]*$')
21 def _get_flowcell_from_rundir(run_dir):
23 Returns flowcell string based on run_dir.
24 Returns None and logs error if flowcell can't be found.
26 junk, dirname = os.path.split(run_dir)
27 mo = s_fc.search(dirname)
29 LOGGER.error('RunDir 2 FlowCell error: %s' % (run_dir))
32 return dirname[mo.start()+1:]
class Runner(rpc.XmlRpcBot):
    """
    Manage running pipeline jobs.

    sequencingFinished is registered as an RPC function and can also be
    triggered from chat via _parser; each request runs the retrieve-config,
    configure and run-pipeline steps in a background thread and reports the
    outcome of each step to the users listed in notify_users.
    """
    def __init__(self, section=None, configfile=None):
        """
        Create the bot, seed default config options and register RPC calls.

        section/configfile are passed straight to rpc.XmlRpcBot.
        """
        #if configfile is None:
        #    self.configfile = "~/.htsworkflow"
        super(Runner, self).__init__(section, configfile)

        # Defaults for the options consumed by read_config().
        self.cfg['notify_users'] = None
        self.cfg['genome_dir'] = None
        self.cfg['base_analysis_dir'] = None
        self.cfg['notify_postanalysis'] = None

        # flowcell id -> ConfigInfo for every run started by this bot.
        self.conf_info_dict = {}

        self.register_function(self.sequencingFinished)
        #self.eventTasks.append(self.update)

    def read_config(self, section=None, configfile=None):
        """
        Read the bot configuration and extract the runner's options.

        genome_dir and base_analysis_dir are required options; notify_users
        is converted with _parse_user_list (and may be None).
        """
        super(Runner, self).read_config(section, configfile)

        self.genome_dir = self._check_required_option('genome_dir')
        self.base_analysis_dir = self._check_required_option('base_analysis_dir')

        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
        #FIXME: process notify_postanalysis cfg option (currently ignored)

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages

        Commands are matched against the start of msg:
          help               -- describe the available commands
          status [flowcell]  -- status of one run, or list the known runs
          start <runfolder>  -- start the pipeline for runfolder
          path               -- report this process' PATH
        Returns the reply text.
        """
        help_text = u"I can send [start] a run, or report [status]"
        words = msg.split()
        if re.match(u"help", msg):
            reply = help_text
        elif re.match("status", msg):
            if len(words) == 2:
                reply = self.getStatusReport(words[1])
            else:
                # str() so a None key (unparseable flowcell) can't break join.
                reply = u"Status available for: %s" \
                        % (', '.join([str(k) for k in self.conf_info_dict]))
        elif re.match(u"start", msg):
            if len(words) == 2:
                self.sequencingFinished(words[1])
                reply = u"starting run for %s" % (words[1])
            else:
                reply = u"need runfolder name"
        elif re.match(u"path", msg):
            # unicode() does not exist on Python 3; u"" + str works on both.
            reply = u"My path is: " + os.environ.get('PATH', u'')
        else:
            reply = u"I didn't understand '%s'" %(msg)

        # Lazy %-formatting: str(reply) can fail on non-ASCII under Python 2.
        LOGGER.debug(u"reply: %s", reply)
        return reply

    def getStatusReport(self, fc_num):
        """
        Returns text status report for flow cell number

        fc_num is looked up in conf_info_dict (keyed by the flowcell id
        parsed from the run folder name).
        """
        if fc_num not in self.conf_info_dict:
            return "No record of a %s run." % (fc_num)

        status = self.conf_info_dict[fc_num].status
        if status is None:
            return "No status information for %s yet." \
                   " Probably still in configure step. Try again later." % (fc_num)

        return '\n'.join(status.statusReport())

    def sequencingFinished(self, run_dir):
        """
        Sequencing (and copying) is finished, time to start pipeline

        Builds a ConfigInfo whose analysis_dir is base_analysis_dir/run_dir,
        records it under the run's flowcell id and launches the pipeline
        thread.
        """
        LOGGER.debug("received sequencing finished message")

        # Setup config info object
        ci = ConfigInfo()
        ci.base_analysis_dir = self.base_analysis_dir
        ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir)

        # get flowcell from run_dir name (None if it can't be parsed)
        flowcell = _get_flowcell_from_rundir(run_dir)

        # Store ci object in dictionary
        self.conf_info_dict[flowcell] = ci

        # Launch the job in its own thread and return.
        self.launchJob(run_dir, flowcell, ci)
        return "started"

    def pipelineFinished(self, run_dir):
        """
        Log that the pipeline for run_dir finished and notify users.
        """
        LOGGER.info("pipeline finished in %s", run_dir)
        # TODO: strip self.watch_dir from run_dir and forward the stripped
        # name to a notify_runner via rpc_send(..., 'sequencingFinished').
        self.reportMsg('Pipeline run %s finished' % (run_dir))

    def reportMsg(self, msg):
        """
        Send msg to every user in notify_users (no-op when it is None).
        """
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, msg)

    def _runner(self, run_dir, flowcell, conf_info):
        """
        Run the retrieve-config, configure and run-pipeline steps in order.

        Executed in the thread started by launchJob. Every step's outcome is
        logged and reported; a failed step stops the remaining ones.
        """
        # retrieve config step
        cfg_filepath = os.path.join(conf_info.analysis_dir,
                                    'config-auto.txt')
        status_retrieve_cfg = retrieve_config(conf_info,
                                              flowcell,
                                              cfg_filepath,
                                              self.genome_dir)
        if not status_retrieve_cfg:
            LOGGER.error("Runner: Retrieve config: failed")
            self.reportMsg("Retrieve config (%s): FAILED" % (run_dir))
            return
        LOGGER.info("Runner: Retrieve config: success")
        self.reportMsg("Retrieve config (%s): success" % (run_dir))

        # configure step
        status = configure(conf_info)
        if not status:
            LOGGER.error("Runner: Configure: failed")
            self.reportMsg("Configure (%s): FAILED" % (run_dir))
            return
        LOGGER.info("Runner: Configure: success")
        self.reportMsg("Configure (%s): success" % (run_dir))
        # Report what matched Data/C* (presumably the analysis folders that
        # configure created -- confirm).
        self.reportMsg(
            os.linesep.join(glob(os.path.join(run_dir, 'Data', 'C*'))))

        # Setup status cmdline status monitor
        #startCmdLineStatusMonitor(ci)

        # running step
        print('Running pipeline now!')
        run_status = run_pipeline(conf_info)
        # Identity check kept deliberately: only a literal True is success.
        if run_status is True:
            LOGGER.info('Runner: Pipeline: success')
            self.reportMsg("Pipeline run (%s): Finished" % (run_dir,))
        else:
            LOGGER.error('Runner: Pipeline: failed')
            self.reportMsg("Pipeline run (%s): FAILED" % (run_dir))

    def launchJob(self, run_dir, flowcell, conf_info):
        """
        Starts up a thread for running the pipeline

        The thread is a daemon, so an in-progress run does not keep the bot
        process alive at shutdown.
        """
        t = threading.Thread(target=self._runner,
                             args=[run_dir, flowcell, conf_info])
        # The daemon attribute supersedes the deprecated setDaemon().
        t.daemon = True
        t.start()
def main(args=None):
    """Create a Runner bot and hand *args* to its main loop, returning its result."""
    return Runner().main(args)


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))