84e9af2d310d36c37cbdf632b4f0846adabf7e68
[htsworkflow.git] / gaworkflow / automation / runner.py
1 #!/usr/bin/env python
2 from glob import glob
3 import logging
4 import os
5 import re
6 import sys
7 import time
8 import threading
9
10 from benderjab import rpc
11
12 from gaworkflow.pipeline.configure_run import *
13 from gaworkflow.pipeline.monitors import _percentCompleted
14
15 #s_fc = re.compile('FC[0-9]+')
16 s_fc = re.compile('_[0-9a-zA-Z]*$')
17
18
19 def _get_flowcell_from_rundir(run_dir):
20     """
21     Returns flowcell string based on run_dir.
22     Returns None and logs error if flowcell can't be found.
23     """
24     junk, dirname = os.path.split(run_dir)
25     mo = s_fc.search(dirname)
26     if not mo:
27         logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
28         return None
29
30     return dirname[mo.start()+1:]
31     
32
33
class Runner(rpc.XmlRpcBot):
    """
    Manage running pipeline jobs.

    Runs are started either through the registered sequencingFinished
    call or through the "start" chat command (see _parser).  Each run is
    executed in its own daemon thread and its progress is sent to the
    users listed in notify_users.
    """
    def __init__(self, section=None, configfile=None):
        #if configfile is None:
        #    self.configfile = "~/.gaworkflow"
        super(Runner, self).__init__(section, configfile)

        # Default every option this bot reads to None.
        self.cfg['notify_users'] = None
        self.cfg['genome_dir'] = None
        self.cfg['base_analysis_dir'] = None
        self.cfg['notify_postanalysis'] = None

        # flowcell -> ConfigInfo for each run started by this bot
        self.conf_info_dict = {}

        self.register_function(self.sequencingFinished)
        #self.eventTasks.append(self.update)


    def read_config(self, section=None, configfile=None):
        """
        Read the base class configuration, then the Runner options.

        Sets self.genome_dir and self.base_analysis_dir (both fetched with
        _check_required_option) and self.notify_users (parsed from the
        'notify_users' option).
        """
        super(Runner, self).read_config(section, configfile)

        self.genome_dir = self._check_required_option('genome_dir')
        self.base_analysis_dir = self._check_required_option('base_analysis_dir')

        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
        #FIXME: process notify_postanalysis cfg


    def _parser(self, msg, who):
        """
        Parse xmpp chat messages

        Recognised commands (matched at the start of msg):
          help                -- usage text
          status [flowcell]   -- status report, or the known flowcells
          start <runfolder>   -- start the pipeline for a run folder

        Returns the reply text.  who is not used.
        """
        help_text = u"I can send [start] a run, or report [status]"
        if re.match(u"help", msg):
            reply = help_text
        elif re.match("status", msg):
            words = msg.split()
            if len(words) == 2:
                reply = self.getStatusReport(words[1])
            else:
                reply = u"Status available for: %s" \
                        % (', '.join(sorted(self.conf_info_dict)))
        elif re.match(u"start", msg):
            words = msg.split()
            if len(words) == 2:
                result = self.sequencingFinished(words[1])
                if result == "started":
                    reply = u"starting run for %s" % (words[1])
                else:
                    # Relay the failure reason rather than claiming the
                    # run was started.
                    reply = result
            else:
                reply = u"need runfolder name"
        else:
            reply = u"I didn't understand '%s'" %(msg)

        # Let logging format the reply: str() on a unicode reply holding
        # non-ASCII text raises UnicodeEncodeError under Python 2.
        logging.debug("reply: %s", reply)
        return reply


    def getStatusReport(self, fc_num):
        """
        Returns text status report for flow cell number

        Returns a short explanatory message instead when fc_num is unknown
        or when its run has no status object yet.
        """
        if fc_num not in self.conf_info_dict:
            return "No record of a %s run." % (fc_num)

        status = self.conf_info_dict[fc_num].status

        if status is None:
            return "No status information for %s yet." \
                   " Probably still in configure step. Try again later." % (fc_num)

        # Each status call yields a pair that is shown as "(first/second)";
        # presumably (completed, total) -- confirm against the monitors
        # module.
        fc, ft = status.statusFirecrest()
        bc, bt = status.statusBustard()
        gc, gt = status.statusGerald()

        tc, tt = status.statusTotal()

        fp = _percentCompleted(fc, ft)
        bp = _percentCompleted(bc, bt)
        gp = _percentCompleted(gc, gt)
        tp = _percentCompleted(tc, tt)

        output = [
            u'Firecrest: %s%% (%s/%s)' % (fp, fc, ft),
            u'  Bustard: %s%% (%s/%s)' % (bp, bc, bt),
            u'   Gerald: %s%% (%s/%s)' % (gp, gc, gt),
            u'-----------------------',
            u'    Total: %s%% (%s/%s)' % (tp, tc, tt),
        ]

        return '\n'.join(output)


    def sequencingFinished(self, run_dir):
        """
        Sequencing (and copying) is finished, time to start pipeline

        run_dir is joined onto base_analysis_dir to form the analysis
        directory, and its name supplies the flowcell.

        Returns "started" once the job thread has been launched.  When no
        flowcell can be extracted from run_dir nothing is started and an
        explanatory message is returned instead.
        """
        logging.debug("received sequencing finished message")

        # Setup config info object
        ci = ConfigInfo()
        ci.base_analysis_dir = self.base_analysis_dir
        ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir)

        # get flowcell from run_dir name
        flowcell = _get_flowcell_from_rundir(run_dir)
        if not flowcell:
            # Without a flowcell there is no usable key for the run; a None
            # key would also break the "status" listing in _parser.
            failure = "Unable to determine flowcell from %s; run not started" \
                      % (run_dir,)
            logging.error(failure)
            return failure

        # Store ci object in dictionary
        self.conf_info_dict[flowcell] = ci

        # Launch the job in its own thread and return.
        self.launchJob(run_dir, flowcell, ci)
        return "started"


    def pipelineFinished(self, run_dir):
        """
        Log that the pipeline finished for run_dir and notify users.
        """
        # need to strip off self.watch_dir from rundir I suspect.
        logging.info("pipeline finished in %s", run_dir)
        #pattern = self.watch_dir
        #if pattern[-1] != os.path.sep:
        #    pattern += os.path.sep
        #stripped_run_dir = re.sub(pattern, "", run_dir)
        #logging.debug("stripped to " + stripped_run_dir)

        # Notify each user that the run has finished.
        self.reportMsg('Pipeline run %s finished' % (run_dir))

        #if self.notify_runner is not None:
        #    for r in self.notify_runner:
        #        self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')

    def reportMsg(self, msg):
        """
        Send msg to every user in notify_users; does nothing when
        notify_users is None.
        """
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, msg)


    def _runner(self, run_dir, flowcell, conf_info):
        """
        Run the retrieve-config, configure and pipeline steps in order.

        The outcome of each step is logged and sent with reportMsg; the
        first step that fails ends the run.
        """
        # retrieve config step
        cfg_filepath = os.path.join(conf_info.analysis_dir,
                                    'config32auto.txt')
        status_retrieve_cfg = retrieve_config(conf_info,
                                              flowcell,
                                              cfg_filepath,
                                              self.genome_dir)
        if not status_retrieve_cfg:
            logging.error("Runner: Retrieve config: failed")
            self.reportMsg("Retrieve config (%s): FAILED" % (run_dir))
            return

        logging.info("Runner: Retrieve config: success")
        self.reportMsg("Retrieve config (%s): success" % (run_dir))

        # configure step
        status = configure(conf_info)
        if not status:
            logging.error("Runner: Configure: failed")
            self.reportMsg("Configure (%s): FAILED" % (run_dir))
            return

        logging.info("Runner: Configure: success")
        self.reportMsg("Configure (%s): success" % (run_dir))
        # NOTE(review): this globs run_dir as given (so relative to the
        # current working directory), not conf_info.analysis_dir --
        # confirm which directory is intended.
        self.reportMsg(
            os.linesep.join(glob(os.path.join(run_dir,'Data','C*')))
        )

        # Setup status cmdline status monitor
        #startCmdLineStatusMonitor(ci)

        # running step
        # Parenthesised single-argument form prints the same text under
        # both Python 2 and Python 3.
        print('Running pipeline now!')
        run_status = run_pipeline(conf_info)
        if run_status is True:
            logging.info('Runner: Pipeline: success')
            self.reportMsg("Pipeline run (%s): Finished" % (run_dir,))
        else:
            # Logged as an error, like the other failed steps.
            logging.error('Runner: Pipeline: failed')
            self.reportMsg("Pipeline run (%s): FAILED" % (run_dir))


    def launchJob(self, run_dir, flowcell, conf_info):
        """
        Starts up a thread for running the pipeline
        """
        t = threading.Thread(target=self._runner,
                        args=[run_dir, flowcell, conf_info])
        # Daemon thread: it does not keep the process alive at exit.
        t.setDaemon(True)
        t.start()
231         
232
233         
def main(args=None):
    """
    Create a Runner bot and pass args to its main(); returns its result.
    """
    return Runner().main(args)
237     
if __name__ == "__main__":
    # Run the bot with the command-line arguments (program name dropped)
    # and use its return value as the process exit status.
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)
240