Runner seems to work with running the pipeline when launched from within
[htsworkflow.git] / gaworkflow / runner.py
1 #!/usr/bin/env python
2 import logging
3 import os
4 import re
5 import sys
6 import time
7 import threading
8
9 from benderjab import rpc
10
11 from gaworkflow.pipeline.configure_run import *
12 from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor
13
14 #s_fc = re.compile('FC[0-9]+')
15 s_fc = re.compile('_[0-9a-zA-Z]*$')
16
17
18 def _get_flowcell_from_rundir(run_dir):
19     """
20     Returns flowcell string based on run_dir.
21     Returns None and logs error if flowcell can't be found.
22     """
23     junk, dirname = os.path.split(run_dir)
24     mo = s_fc.search(dirname)
25     if not mo:
26         logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
27         return None
28
29     return dirname[mo.start()+1:]
30     
31
32
class Runner(rpc.XmlRpcBot):
    """
    Manage running pipeline jobs.

    A run is started either through the registered ``sequencingFinished``
    function or through the chat command ``start <runfolder>``.  Each run
    executes the retrieve-config, configure and run-pipeline steps in its
    own background thread.
    """
    def __init__(self, section=None, configfile=None):
        """
        Initialise the bot and register ``sequencingFinished``.

        section and configfile are passed unchanged to the base class.
        """
        #if configfile is None:
        #    self.configfile = "~/.gaworkflow"
        super(Runner, self).__init__(section, configfile)

        # Options used by this bot; they start out unset.
        self.cfg['notify_users'] = None
        self.cfg['genome_dir'] = None
        self.cfg['base_analysis_dir'] = None

        # ConfigInfo objects of the launched runs, keyed by flowcell.
        self.conf_info_dict = {}

        self.register_function(self.sequencingFinished)
        #self.eventTasks.append(self.update)

    def read_config(self, section=None, configfile=None):
        """
        Read the configuration and pick up the options this bot needs.

        Sets self.genome_dir and self.base_analysis_dir from the values
        returned by _check_required_option (presumably that helper
        rejects a missing option -- confirm against the base class).
        """
        super(Runner, self).read_config(section, configfile)

        self.genome_dir = self._check_required_option('genome_dir')
        self.base_analysis_dir = self._check_required_option('base_analysis_dir')

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages

        Recognised messages start with "help", "status" or "start";
        "start" must be followed by exactly one word, the runfolder name.
        who is not used.  Returns the reply text.
        """
        help_text = u"I can send [start] a run, or report [status]"
        if re.match(u"help", msg):
            reply = help_text
        elif re.match("status", msg):
            reply = u"not implemented"
        elif re.match(u"start", msg):
            words = msg.split()
            if len(words) == 2:
                self.sequencingFinished(words[1])
                reply = u"starting run for %s" % (words[1])
            else:
                reply = u"need runfolder name"
        else:
            reply = u"I didn't understand '%s'" % (msg)

        # Let logging do the formatting: str() on a unicode reply that
        # echoes non-ASCII input raises UnicodeEncodeError on Python 2.
        logging.debug("reply: %s", reply)
        return reply

    def start(self, daemonize):
        """
        Start application
        """
        super(Runner, self).start(daemonize)

    def stop(self):
        """
        shutdown application
        """
        super(Runner, self).stop()

    def sequencingFinished(self, run_dir):
        """
        Sequencing (and copying) is finished, time to start pipeline

        run_dir is the runfolder name; it is joined onto
        base_analysis_dir to form the analysis directory.  If no
        flowcell can be derived from run_dir the error is logged and no
        job is launched.
        """
        logging.debug("received sequencing finished message")

        # get flowcell from run_dir name
        flowcell = _get_flowcell_from_rundir(run_dir)
        if flowcell is None:
            # Without a flowcell the config cannot be retrieved; do not
            # register a job under a None key or start a doomed thread.
            logging.error("Runner: no flowcell found in %s, not starting",
                          run_dir)
            return

        # Setup config info object
        ci = ConfigInfo()
        ci.base_analysis_dir = self.base_analysis_dir
        ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir)

        # Store ci object in dictionary
        self.conf_info_dict[flowcell] = ci

        # Launch the job in its own thread and return.
        self.launchJob(run_dir, flowcell, ci)

    def pipelineFinished(self, run_dir):
        """
        Called when the pipeline for run_dir completed successfully.

        Currently only logs the event; the notification code below is
        still disabled.
        """
        # need to strip off self.watch_dir from rundir I suspect.
        logging.info("pipeline finished in %s", run_dir)
        #pattern = self.watch_dir
        #if pattern[-1] != os.path.sep:
        #    pattern += os.path.sep
        #stripped_run_dir = re.sub(pattern, "", run_dir)
        #logging.debug("stripped to " + stripped_run_dir)
        #if self.notify_users is not None:
        #    for u in self.notify_users:
        #        self.send(u, 'Sequencing run %s finished' % #(stripped_run_dir))
        #if self.notify_runner is not None:
        #    for r in self.notify_runner:
        #        self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')

    def _runner(self, run_dir, flowcell, conf_info):
        """
        Run the three pipeline steps for one run, stopping at the first
        step that fails.

        Steps: retrieve_config, configure, run_pipeline.  The outcome of
        each step is logged; pipelineFinished(run_dir) is called only
        when run_pipeline returns True.
        """
        # retrieve config step
        cfg_filepath = os.path.join(conf_info.analysis_dir,
                                    'config32auto.txt')
        status_retrieve_cfg = retrieve_config(conf_info,
                                              flowcell,
                                              cfg_filepath,
                                              self.genome_dir)
        if not status_retrieve_cfg:
            logging.error("Runner: Retrieve config: failed")
            return
        logging.info("Runner: Retrieve config: success")

        # configure step
        status = configure(conf_info)
        if not status:
            logging.error("Runner: Configure: failed")
            return
        logging.info("Runner: Configure: success")

        # Setup status cmdline status monitor
        #startCmdLineStatusMonitor(ci)

        # running step
        print('Running pipeline now!')
        run_status = run_pipeline(conf_info)
        if run_status is True:
            logging.info('Runner: Pipeline: success')
            self.pipelineFinished(run_dir)
        else:
            logging.error('Runner: Pipeline: failed')

    def launchJob(self, run_dir, flowcell, conf_info):
        """
        Starts up a thread for running the pipeline

        The thread runs _runner(run_dir, flowcell, conf_info) and is
        marked as a daemon thread before it is started.
        """
        t = threading.Thread(target=self._runner,
                             args=[run_dir, flowcell, conf_info])
        t.setDaemon(True)
        t.start()
180         
181
182         
def main(args=None):
    """
    Create a Runner for the 'demobot' section, set its 'loglevel'
    option to 'DEBUG', and hand args to the bot's own main().

    Returns whatever the bot's main() returns.
    """
    runner_bot = Runner('demobot')
    runner_bot.cfg['loglevel'] = 'DEBUG'
    exit_status = runner_bot.main(args)
    return exit_status
187     
if __name__ == "__main__":
    # Script entry point: pass the command-line arguments (without the
    # program name) to main() and exit with its return value.
    cli_args = sys.argv[1:]
    sys.exit(main(cli_args))
190