Initial port to python3
[htsworkflow.git] / htsworkflow / automation / runner.py
1 #!/usr/bin/env python
2 from glob import glob
3 import logging
4 import os
5 import re
6 import sys
7 import time
8 import threading
9
10 from benderjab import rpc
11
12 from htsworkflow.pipelines.configure_run import *
13
14 LOGGER = logging.getLogger(__name__)
15
16 #s_fc = re.compile('FC[0-9]+')
17 s_fc = re.compile('_[0-9a-zA-Z]*$')
18
19
20 def _get_flowcell_from_rundir(run_dir):
21     """
22     Returns flowcell string based on run_dir.
23     Returns None and logs error if flowcell can't be found.
24     """
25     junk, dirname = os.path.split(run_dir)
26     mo = s_fc.search(dirname)
27     if not mo:
28         LOGGER.error('RunDir 2 FlowCell error: %s' % (run_dir))
29         return None
30
31     return dirname[mo.start()+1:]
32
33
34
class Runner(rpc.XmlRpcBot):
    """
    Manage running pipeline jobs.

    Exposes ``sequencingFinished`` as an XML-RPC function and answers chat
    commands (see ``_parser``).  Each started run gets a ConfigInfo record
    in ``conf_info_dict`` (keyed by flowcell) and is processed by
    ``_runner`` in a background daemon thread, which reports progress to
    the users in ``notify_users``.
    """

    def __init__(self, section=None, configfile=None):
        super(Runner, self).__init__(section, configfile)

        # Config options used by this bot, defaulted to None.
        # (notify_postanalysis is not processed yet; see read_config.)
        self.cfg['notify_users'] = None
        self.cfg['genome_dir'] = None
        self.cfg['base_analysis_dir'] = None
        self.cfg['notify_postanalysis'] = None

        # flowcell id -> ConfigInfo for every run started by this bot.
        self.conf_info_dict = {}

        self.register_function(self.sequencingFinished)

    def read_config(self, section=None, configfile=None):
        """
        Load configuration and extract the options this bot needs.

        ``genome_dir`` and ``base_analysis_dir`` go through
        ``_check_required_option``; ``notify_users`` is parsed with
        ``_parse_user_list``.
        """
        super(Runner, self).read_config(section, configfile)

        self.genome_dir = self._check_required_option('genome_dir')
        self.base_analysis_dir = self._check_required_option('base_analysis_dir')

        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
        # FIXME: process notify_postanalysis cfg

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages and return the reply text.

        Commands are recognised by prefix:
          help               -- short usage text
          status [flowcell]  -- report for one flowcell, or list known ones
          start <run_dir>    -- start the pipeline for run_dir
          path               -- report this process's PATH
        """
        help_text = "I can send [start] a run, or report [status]"
        if msg.startswith("help"):
            reply = help_text
        elif msg.startswith("status"):
            words = msg.split()
            if len(words) == 2:
                reply = self.getStatusReport(words[1])
            else:
                # str() because runs whose flowcell could not be parsed are
                # stored under the key None, which str.join rejects.
                reply = "Status available for: %s" \
                        % (', '.join(str(k) for k in self.conf_info_dict))
        elif msg.startswith("start"):
            words = msg.split()
            if len(words) == 2:
                self.sequencingFinished(words[1])
                reply = "starting run for %s" % (words[1])
            else:
                reply = "need runfolder name"
        elif msg.startswith("path"):
            reply = "My path is: " + str(os.environ.get('PATH', ''))
        else:
            reply = "I didn't understand '%s'" % (msg)

        LOGGER.debug("reply: %s", reply)
        return reply

    def getStatusReport(self, fc_num):
        """
        Returns text status report for flow cell number fc_num.

        Falls back to a short explanatory message when the flowcell is
        unknown or its ConfigInfo has no status yet.
        """
        if fc_num not in self.conf_info_dict:
            return "No record of a %s run." % (fc_num)

        status = self.conf_info_dict[fc_num].status

        if status is None:
            return "No status information for %s yet." \
                   " Probably still in configure step. Try again later." % (fc_num)

        output = status.statusReport()

        return '\n'.join(output)

    def sequencingFinished(self, run_dir):
        """
        Sequencing (and copying) is finished, time to start pipeline.

        :param run_dir: run folder name; the analysis directory is
            ``os.path.join(base_analysis_dir, run_dir)``.
        :returns: the string "started"; the pipeline runs in a background
            thread.

        The run's ConfigInfo is stored in ``conf_info_dict`` under the
        flowcell parsed from ``run_dir`` (None if it could not be parsed).
        """
        LOGGER.debug("received sequencing finished message")

        # Setup config info object
        ci = ConfigInfo()
        ci.base_analysis_dir = self.base_analysis_dir
        ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir)

        # get flowcell from run_dir name
        flowcell = _get_flowcell_from_rundir(run_dir)

        # Store ci object in dictionary so getStatusReport can find it.
        self.conf_info_dict[flowcell] = ci

        # Launch the job in its own thread and return immediately.
        self.launchJob(run_dir, flowcell, ci)
        return "started"

    def pipelineFinished(self, run_dir):
        """
        Log and notify users that the pipeline run for run_dir finished.
        """
        # TODO: run_dir may need self.watch_dir stripped off before use.
        LOGGER.info("pipeline finished in %s", run_dir)

        # Notify each user that the run has finished.
        self.reportMsg('Pipeline run %s finished' % (run_dir,))

    def reportMsg(self, msg):
        """
        Send msg to every user in notify_users, if any are configured.
        """
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, msg)

    def _runner(self, run_dir, flowcell, conf_info):
        """
        Thread entry point: process one run and report the outcome.

        Any unexpected exception is logged and reported to notify_users
        instead of just terminating the worker thread.
        """
        try:
            self._run_steps(run_dir, flowcell, conf_info)
        except Exception:
            LOGGER.exception("Runner: unexpected error for %s", run_dir)
            self.reportMsg("Pipeline run (%s): FAILED (unexpected error)"
                           % (run_dir,))

    def _run_steps(self, run_dir, flowcell, conf_info):
        """
        Retrieve config, configure, then run the pipeline, in order.

        Each step runs only if the previous one succeeded; every step's
        success or failure is logged and reported to notify_users.
        """
        # retrieve config step
        cfg_filepath = os.path.join(conf_info.analysis_dir,
                                    'config-auto.txt')
        status_retrieve_cfg = retrieve_config(conf_info,
                                              flowcell,
                                              cfg_filepath,
                                              self.genome_dir)
        if not status_retrieve_cfg:
            LOGGER.error("Runner: Retrieve config: failed")
            self.reportMsg("Retrieve config (%s): FAILED" % (run_dir,))
            return
        LOGGER.info("Runner: Retrieve config: success")
        self.reportMsg("Retrieve config (%s): success" % (run_dir,))

        # configure step
        status = configure(conf_info)
        if not status:
            LOGGER.error("Runner: Configure: failed")
            self.reportMsg("Configure (%s): FAILED" % (run_dir,))
            return
        LOGGER.info("Runner: Configure: success")
        self.reportMsg("Configure (%s): success" % (run_dir,))
        # Report the Data/C* entries in the analysis directory.  Use
        # analysis_dir (base_analysis_dir/run_dir) rather than run_dir,
        # which may be relative to base_analysis_dir, not the process CWD.
        data_dirs = glob(os.path.join(conf_info.analysis_dir, 'Data', 'C*'))
        if data_dirs:
            self.reportMsg(os.linesep.join(data_dirs))

        # running step
        print('Running pipeline now!')
        run_status = run_pipeline(conf_info)
        if run_status is True:
            LOGGER.info('Runner: Pipeline: success')
            self.reportMsg("Pipeline run (%s): Finished" % (run_dir,))
        else:
            LOGGER.error('Runner: Pipeline: failed')
            self.reportMsg("Pipeline run (%s): FAILED" % (run_dir,))

    def launchJob(self, run_dir, flowcell, conf_info):
        """
        Starts up a daemon thread running the pipeline for this run.
        """
        # daemon=True replaces the deprecated Thread.setDaemon(True).
        t = threading.Thread(target=self._runner,
                             args=(run_dir, flowcell, conf_info),
                             daemon=True)
        t.start()
217
218
219
def main(args=None):
    """Build a Runner bot and hand ``args`` to its main loop, returning its result."""
    return Runner().main(args)


if __name__ == "__main__":
    # Skip argv[0]; the bot's exit status becomes the process exit code.
    sys.exit(main(sys.argv[1:]))
226