convert several modules not covered by unit tests to use the print function
[htsworkflow.git] htsworkflow/automation/runner.py
#!/usr/bin/env python
from __future__ import print_function
from glob import glob
import logging
import os
import re
import sys
import time
import threading

from benderjab import rpc

from htsworkflow.pipelines.configure_run import *

LOGGER = logging.getLogger(__name__)

#s_fc = re.compile('FC[0-9]+')
s_fc = re.compile('_[0-9a-zA-Z]*$')


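# s_fc matches the trailing '_<token>' of a run-folder name; everything after
# the final underscore is treated as the flowcell ID (see
# _get_flowcell_from_rundir below).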
def _get_flowcell_from_rundir(run_dir):
    """
    Returns flowcell string based on run_dir.
    Returns None and logs error if flowcell can't be found.
    """
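    # Illustrative example only; the run-folder name below is an assumption,
    # not one taken from this repository:
    #   _get_flowcell_from_rundir('/data/110815_SN787_0101_AFC1234') -> 'AFC1234'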
    junk, dirname = os.path.split(run_dir)
    mo = s_fc.search(dirname)
    if not mo:
        LOGGER.error('RunDir 2 FlowCell error: %s' % (run_dir))
        return None

    return dirname[mo.start()+1:]



class Runner(rpc.XmlRpcBot):
    """
    Manage running pipeline jobs.
    """
    def __init__(self, section=None, configfile=None):
        #if configfile is None:
        #    self.configfile = "~/.htsworkflow"
        super(Runner, self).__init__(section, configfile)

        self.cfg['notify_users'] = None
        self.cfg['genome_dir'] = None
        self.cfg['base_analysis_dir'] = None
        self.cfg['notify_postanalysis'] = None

        self.conf_info_dict = {}

        self.register_function(self.sequencingFinished)
        #self.eventTasks.append(self.update)


    def read_config(self, section=None, configfile=None):
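        # A minimal sketch of the expected config section (assumed INI-style
        # file read by benderjab's XmlRpcBot; the section name and values here
        # are illustrative, only the option names come from this module):
        #
        #   [runner]
        #   genome_dir = /data/genomes
        #   base_analysis_dir = /data/analysis
        #   notify_users = admin@example.org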
        super(Runner, self).read_config(section, configfile)

        self.genome_dir = self._check_required_option('genome_dir')
        self.base_analysis_dir = self._check_required_option('base_analysis_dir')

        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
        #FIXME: process notify_postpipeline cfg


    def _parser(self, msg, who):
        """
        Parse xmpp chat messages
        """
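        # Commands handled below (arguments shown are illustrative):
        #   "help"               -> usage summary
        #   "status <flowcell>"  -> status report for that flowcell
        #   "start <runfolder>"  -> calls sequencingFinished for that run folder
        #   "path"               -> reports the bot's PATH environment variable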
        help = u"I can send [start] a run, or report [status]"
        if re.match(u"help", msg):
            reply = help
        elif re.match("status", msg):
            words = msg.split()
            if len(words) == 2:
                reply = self.getStatusReport(words[1])
            else:
                reply = u"Status available for: %s" \
                        % (', '.join(self.conf_info_dict.keys()))
        elif re.match(u"start", msg):
            words = msg.split()
            if len(words) == 2:
                self.sequencingFinished(words[1])
                reply = u"starting run for %s" % (words[1])
            else:
                reply = u"need runfolder name"
        elif re.match(u"path", msg):
            reply = u"My path is: " + unicode(os.environ['PATH'])
        else:
            reply = u"I didn't understand '%s'" % (msg)

        LOGGER.debug("reply: " + str(reply))
        return reply


    def getStatusReport(self, fc_num):
        """
        Returns text status report for flow cell number
        """
        if fc_num not in self.conf_info_dict:
            return "No record of a %s run." % (fc_num)

        status = self.conf_info_dict[fc_num].status

        if status is None:
            return "No status information for %s yet." \
                   " Probably still in configure step. Try again later." % (fc_num)

        output = status.statusReport()

        return '\n'.join(output)


    def sequencingFinished(self, run_dir):
        """
        Sequencing (and copying) is finished, time to start pipeline
        """
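        # Exposed via register_function in __init__, so it can be triggered by
        # an XML-RPC peer as well as by the "start <runfolder>" chat command.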
        LOGGER.debug("received sequencing finished message")

        # Setup config info object
        ci = ConfigInfo()
        ci.base_analysis_dir = self.base_analysis_dir
        ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir)

        # get flowcell from run_dir name
        flowcell = _get_flowcell_from_rundir(run_dir)

        # Store ci object in dictionary
        self.conf_info_dict[flowcell] = ci


        # Launch the job in its own thread and return.
        self.launchJob(run_dir, flowcell, ci)
        return "started"


    def pipelineFinished(self, run_dir):
        # need to strip off self.watch_dir from rundir I suspect.
        LOGGER.info("pipeline finished in " + str(run_dir))
        #pattern = self.watch_dir
        #if pattern[-1] != os.path.sep:
        #    pattern += os.path.sep
        #stripped_run_dir = re.sub(pattern, "", run_dir)
        #LOGGER.debug("stripped to " + stripped_run_dir)

        # Notify each user that the run has finished.
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, 'Pipeline run %s finished' % (run_dir))

        #if self.notify_runner is not None:
        #    for r in self.notify_runner:
        #        self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')

    def reportMsg(self, msg):

        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, msg)


    def _runner(self, run_dir, flowcell, conf_info):

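        # Three sequential steps, each reported to notify_users over chat:
        # retrieve_config -> configure -> run_pipeline. A failure in an
        # earlier step skips the ones after it.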
        # retrieve config step
        cfg_filepath = os.path.join(conf_info.analysis_dir,
                                    'config-auto.txt')
        status_retrieve_cfg = retrieve_config(conf_info,
                                              flowcell,
                                              cfg_filepath,
                                              self.genome_dir)
        if status_retrieve_cfg:
            LOGGER.info("Runner: Retrieve config: success")
            self.reportMsg("Retrieve config (%s): success" % (run_dir))
        else:
            LOGGER.error("Runner: Retrieve config: failed")
            self.reportMsg("Retrieve config (%s): FAILED" % (run_dir))


        # configure step
        if status_retrieve_cfg:
            status = configure(conf_info)
            if status:
                LOGGER.info("Runner: Configure: success")
                self.reportMsg("Configure (%s): success" % (run_dir))
                self.reportMsg(
                    os.linesep.join(glob(os.path.join(run_dir, 'Data', 'C*')))
                )
            else:
                LOGGER.error("Runner: Configure: failed")
                self.reportMsg("Configure (%s): FAILED" % (run_dir))

            #if successful, continue
            if status:
                # Setup cmdline status monitor
                #startCmdLineStatusMonitor(ci)

                # running step
                print('Running pipeline now!')
                run_status = run_pipeline(conf_info)
                if run_status is True:
                    LOGGER.info('Runner: Pipeline: success')
                    self.reportMsg("Pipeline run (%s): Finished" % (run_dir,))
                else:
                    LOGGER.info('Runner: Pipeline: failed')
                    self.reportMsg("Pipeline run (%s): FAILED" % (run_dir))


    def launchJob(self, run_dir, flowcell, conf_info):
        """
        Starts up a thread for running the pipeline
        """
        t = threading.Thread(target=self._runner,
                             args=[run_dir, flowcell, conf_info])
        t.setDaemon(True)
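        # Daemon thread: a pipeline that is still running will not keep the
        # bot process alive if the main thread exits.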
        t.start()



def main(args=None):
    bot = Runner()
    return bot.main(args)

if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
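# When run as a script, command-line arguments are passed straight through to
# bot.main(), which is inherited from the benderjab base class; argument
# handling is defined there rather than in this module.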