Runner now reports status when user sends status request!
[htsworkflow.git] / gaworkflow / runner.py
1 #!/usr/bin/env python
2 import logging
3 import os
4 import re
5 import sys
6 import time
7 import threading
8
9 from benderjab import rpc
10
11 from gaworkflow.pipeline.configure_run import *
12 from gaworkflow.pipeline.monitors import _percentCompleted
13
# Matches the trailing '_'-prefixed, purely alphanumeric component of a run
# folder name (e.g. '..._FC20KJ8AAXX'); the part after the '_' is taken to
# be the flowcell id.
s_fc = re.compile('_[0-9a-zA-Z]*$')


def _get_flowcell_from_rundir(run_dir):
    """
    Return the flowcell string embedded in the name of run_dir.

    The flowcell is the alphanumeric text after the last '_' in the
    directory's base name, e.g. '/data/080416_HWI-EAS229_0023_FC20KJ8AAXX/'
    gives 'FC20KJ8AAXX'. A trailing path separator is ignored.

    Returns None and logs an error if no flowcell can be found.
    """
    # normpath drops a trailing separator; without it basename() would
    # return '' for 'some/run_dir/' and the lookup would always fail.
    dirname = os.path.basename(os.path.normpath(run_dir))
    mo = s_fc.search(dirname)
    # The pattern also matches a bare trailing '_'; treat the resulting
    # empty remainder as "not found" instead of returning ''.
    flowcell = dirname[mo.start() + 1:] if mo else ''
    if not flowcell:
        logging.error('RunDir 2 FlowCell error: %s', run_dir)
        return None
    return flowcell
30     
31
32
class Runner(rpc.XmlRpcBot):
    """
    Manage running pipeline jobs.

    Accepts XML-RPC ``sequencingFinished`` calls and XMPP chat commands
    (help / status / start), runs the retrieve-config, configure and
    pipeline steps for a run folder in a background thread, and reports
    progress to the users listed in the ``notify_users`` config option.
    """
    def __init__(self, section=None, configfile=None):
        super(Runner, self).__init__(section, configfile)

        # Default (unset) values for the config options this bot reads.
        self.cfg['notify_users'] = None
        self.cfg['genome_dir'] = None
        self.cfg['base_analysis_dir'] = None
        self.cfg['notify_postanalysis'] = None

        # flowcell id -> ConfigInfo of the job started for it; consulted by
        # the chat 'status' command.
        self.conf_info_dict = {}

        self.register_function(self.sequencingFinished)

    def read_config(self, section=None, configfile=None):
        """
        Read the config file and extract the options this bot needs.

        genome_dir and base_analysis_dir are fetched via
        _check_required_option; notify_users is parsed into a user list.
        """
        super(Runner, self).read_config(section, configfile)

        self.genome_dir = self._check_required_option('genome_dir')
        self.base_analysis_dir = self._check_required_option('base_analysis_dir')

        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
        # FIXME: process the notify_postanalysis cfg option

    def _parser(self, msg, who):
        """
        Parse an XMPP chat message and return the reply to send back.

        Commands are recognized by prefix:
          help              -- describe the available commands
          status            -- list flowcells that have status information
          status <flowcell> -- progress report for one flowcell
          start <runfolder> -- start the pipeline for a run folder
        """
        help_text = u"I can send [start] a run, or report [status]"
        words = msg.split()
        if msg.startswith(u"help"):
            reply = help_text
        elif msg.startswith(u"status"):
            if len(words) == 2:
                reply = self.getStatusReport(words[1])
            else:
                # Format each key instead of joining raw keys: a run folder
                # whose flowcell could not be parsed is stored under None,
                # which join() would reject with a TypeError.
                known = ['%s' % (k,) for k in self.conf_info_dict]
                reply = u"Status available for: %s" % (', '.join(known))
        elif msg.startswith(u"start"):
            if len(words) == 2:
                self.sequencingFinished(words[1])
                reply = u"starting run for %s" % (words[1])
            else:
                reply = u"need runfolder name"
        else:
            reply = u"I didn't understand '%s'" % (msg)

        # Lazy %-formatting: the former str(reply) raised UnicodeEncodeError
        # under Python 2 when the reply echoed non-ASCII chat text.
        logging.debug(u"reply: %s", reply)
        return reply

    def start(self, daemonize):
        """
        Start application
        """
        super(Runner, self).start(daemonize)

    def stop(self):
        """
        shutdown application
        """
        super(Runner, self).stop()

    def getStatusReport(self, fc_num):
        """
        Return a text progress report for flowcell fc_num.

        One line per step (Firecrest, Bustard, Gerald) plus a total, each
        showing a percentage and the (count/total) pair returned by the
        job's status object.
        """
        if fc_num not in self.conf_info_dict:
            return "No record of a %s run." % (fc_num)

        status = self.conf_info_dict[fc_num].status

        # status stays None until the job has produced one (presumably set
        # once configure finishes -- see the message below).
        if status is None:
            return "No status information for %s yet." \
                   " Probably still in configure step. Try again later." % (fc_num)

        fc, ft = status.statusFirecrest()
        bc, bt = status.statusBustard()
        gc, gt = status.statusGerald()

        tc, tt = status.statusTotal()

        fp = _percentCompleted(fc, ft)
        bp = _percentCompleted(bc, bt)
        gp = _percentCompleted(gc, gt)
        tp = _percentCompleted(tc, tt)

        # Labels are padded so the colons line up.
        output = [
            u'Firecrest: %s%% (%s/%s)' % (fp, fc, ft),
            u'  Bustard: %s%% (%s/%s)' % (bp, bc, bt),
            u'   Gerald: %s%% (%s/%s)' % (gp, gc, gt),
            u'-----------------------',
            u'    Total: %s%% (%s/%s)' % (tp, tc, tt),
        ]

        return '\n'.join(output)

    def sequencingFinished(self, run_dir):
        """
        Sequencing (and copying) is finished; start the pipeline for run_dir.

        Registered as an XML-RPC function. run_dir is joined onto
        base_analysis_dir to form the analysis directory, and the flowcell
        parsed from its name keys the job for later status requests. The
        pipeline itself runs in a background thread started by launchJob.
        """
        logging.debug("received sequencing finished message")

        # Setup config info object
        ci = ConfigInfo()
        ci.base_analysis_dir = self.base_analysis_dir
        ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir)

        # May be None if the name does not contain a flowcell
        # (_get_flowcell_from_rundir logs that case).
        flowcell = _get_flowcell_from_rundir(run_dir)

        # Remember the job so the chat 'status' command can find it.
        self.conf_info_dict[flowcell] = ci

        self.launchJob(run_dir, flowcell, ci)

    def pipelineFinished(self, run_dir):
        """
        Tell every user in notify_users that the pipeline for run_dir finished.
        """
        logging.info("pipeline finished in %s", run_dir)
        # TODO: strip the watch directory prefix from run_dir and forward the
        # run to downstream runners via rpc_send('sequencingFinished'), as an
        # earlier draft did with self.watch_dir / self.notify_runner.
        self.reportMsg('Pipeline run %s finished' % (run_dir))

    def reportMsg(self, msg):
        """
        Send msg to every user in notify_users (no-op when it is None).
        """
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, msg)

    def _runner(self, run_dir, flowcell, conf_info):
        """
        Worker-thread entry point started by launchJob.

        Runs the pipeline steps for run_dir; any unexpected exception is
        logged and reported to the notify users instead of only killing
        the thread with a traceback on stderr.
        """
        try:
            self._run_steps(run_dir, flowcell, conf_info)
        except Exception:
            # Top-level boundary of the worker thread.
            logging.exception("Runner: unexpected error for %s", run_dir)
            self.reportMsg("Pipeline run (%s): FAILED (unexpected error)" % (run_dir))

    def _run_steps(self, run_dir, flowcell, conf_info):
        """
        Retrieve config, configure, then run the pipeline, stopping at the
        first step that fails. Each outcome is logged and reported.
        """
        # retrieve config step
        cfg_filepath = os.path.join(conf_info.analysis_dir, 'config32auto.txt')
        if not retrieve_config(conf_info, flowcell, cfg_filepath, self.genome_dir):
            logging.error("Runner: Retrieve config: failed")
            self.reportMsg("Retrieve config (%s): FAILED" % (run_dir))
            return
        logging.info("Runner: Retrieve config: success")
        self.reportMsg("Retrieve config (%s): success" % (run_dir))

        # configure step
        if not configure(conf_info):
            logging.error("Runner: Configure: failed")
            self.reportMsg("Configure (%s): FAILED" % (run_dir))
            return
        logging.info("Runner: Configure: success")
        self.reportMsg("Configure (%s): success" % (run_dir))

        # running step; logged rather than printed because stdout may be
        # detached when the bot is daemonized.
        logging.info('Running pipeline now!')
        run_status = run_pipeline(conf_info)
        # Only an explicit True counts as success.
        if run_status is True:
            logging.info('Runner: Pipeline: success')
            self.pipelineFinished(run_dir)
        else:
            logging.error('Runner: Pipeline: failed')
            self.reportMsg("Pipeline run (%s): FAILED" % (run_dir))

    def launchJob(self, run_dir, flowcell, conf_info):
        """
        Start a daemon thread that runs the pipeline for run_dir.
        """
        t = threading.Thread(target=self._runner,
                             args=[run_dir, flowcell, conf_info])
        # Daemon thread: a running job does not keep the process alive at exit.
        t.setDaemon(True)
        t.start()
240         
241
242         
def main(args=None):
    """
    Build a Runner for the 'demobot' config section and hand control to it.

    args is passed straight through to the bot's own main(), whose result
    is returned unchanged.
    """
    runner_bot = Runner('demobot')
    # This entry point always runs with verbose logging.
    runner_bot.cfg['loglevel'] = 'DEBUG'
    result = runner_bot.main(args)
    return result


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
250