5eb625b863933b4dd5ffa43f1c934211b0126eb1
[htsworkflow.git] / gaworkflow / runner.py
1 #!/usr/bin/env python
2 import logging
3 import os
4 import re
5 import sys
6 import time
7 import threading
8
9 from benderjab import rpc
10
11 from gaworkflow.pipeline.configure_run import *
12 from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor
13
# Matches a flowcell identifier: the letters 'FC' followed by one or more
# digits.
s_fc = re.compile('FC[0-9]+')


def _get_flowcell_from_rundir(run_dir):
    """
    Returns flowcell string based on run_dir.
    Returns None and logs error if flowcell can't be found.

    The flowcell is looked for in the last path component of run_dir.
    The returned string is that component from the start of the
    'FC<digits>' match through to its end, so any text following the
    digits is kept.
    """
    # Strip trailing separators first: os.path.split('a/b/') returns an
    # empty last component, which would make a valid run directory look
    # as if it had no flowcell in its name.
    junk, dirname = os.path.split(run_dir.rstrip(os.path.sep))
    mo = s_fc.search(dirname)
    if mo is None:
        logging.error('RunDir 2 FlowCell error: %s', run_dir)
        return None

    return dirname[mo.start():]
29     
30
31
class Runner(rpc.XmlRpcBot):
    """
    Manage running pipeline jobs.

    sequencingFinished() is registered as a remotely callable function and
    can also be triggered with the 'start' chat command.  Each call hands
    the run to a background thread that retrieves the run configuration,
    configures the pipeline and then runs it.
    """
    def __init__(self, section=None, configfile=None):
        """
        Initialise the bot and register sequencingFinished().

        section and configfile are passed unchanged to the base class.
        """
        #if configfile is None:
        #    self.configfile = "~/.gaworkflow"
        super(Runner, self).__init__(section, configfile)

        # Default both options to None; genome_dir is later demanded by
        # read_config().
        self.cfg['notify_users'] = None
        self.cfg['genome_dir'] = None

        # ConfigInfo objects of launched runs, keyed by flowcell string.
        self.conf_info_dict = {}

        self.register_function(self.sequencingFinished)
        #self.eventTasks.append(self.update)

    def read_config(self, section=None, configfile=None):
        """
        Read the base class configuration, then fetch the required
        'genome_dir' option into self.genome_dir.
        """
        super(Runner, self).read_config(section, configfile)

        self.genome_dir = self._check_required_option('genome_dir')

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages

        Commands are recognised with re.match at the start of msg:
        'help', 'status' and 'start <runfolder>'.  The reply text is
        returned; who is not used by this method.
        """
        # Named 'usage' so the builtin help() is not shadowed.
        usage = u"I can send [start] a run, or report [status]"
        if re.match(u"help", msg):
            reply = usage
        elif re.match("status", msg):
            reply = u"not implemented"
        elif re.match(u"start", msg):
            words = msg.split()
            if len(words) == 2:
                self.sequencingFinished(words[1])
                reply = u"starting run for %s" % (words[1])
            else:
                reply = u"need runfolder name"
        else:
            reply = u"I didn't understand '%s'" % (msg)

        # Hand the reply to logging as an argument instead of str(reply):
        # on Python 2, str() of a unicode reply that echoes non-ASCII
        # message text raises UnicodeEncodeError.
        logging.debug("reply: %s", reply)
        return reply

    def start(self, daemonize):
        """
        Start application
        """
        super(Runner, self).start(daemonize)

    def stop(self):
        """
        shutdown application
        """
        super(Runner, self).stop()

    def sequencingFinished(self, run_dir):
        """
        Sequencing (and copying) is finished, time to start pipeline

        Builds a ConfigInfo for run_dir, stores it in self.conf_info_dict
        under the run's flowcell and launches the pipeline in a background
        thread.  Nothing is stored or launched when no flowcell can be
        found in run_dir.
        """
        logging.debug("received sequencing finished message")

        # get flowcell from run_dir name
        flowcell = _get_flowcell_from_rundir(run_dir)
        if flowcell is None:
            # The helper has already logged the directory name.  Without a
            # flowcell the config would be stored under a None key, so
            # stop here instead of launching a job.
            logging.error("Runner: not starting pipeline, no flowcell found")
            return

        # Setup config info object
        ci = ConfigInfo()
        ci.run_path = run_dir

        # Store ci object in dictionary
        self.conf_info_dict[flowcell] = ci

        # Launch the job in its own thread and return.
        self.launchJob(run_dir, flowcell, ci)

    def pipelineFinished(self, run_dir):
        """
        Log that the pipeline finished for run_dir.

        Notification of users/other bots is not implemented yet; the
        commented-out sketch below is kept for reference.
        """
        # need to strip off self.watch_dir from rundir I suspect.
        logging.info("pipeline finished in %s", run_dir)
        #pattern = self.watch_dir
        #if pattern[-1] != os.path.sep:
        #    pattern += os.path.sep
        #stripped_run_dir = re.sub(pattern, "", run_dir)
        #logging.debug("stripped to " + stripped_run_dir)
        #if self.notify_users is not None:
        #    for u in self.notify_users:
        #        self.send(u, 'Sequencing run %s finished' % #(stripped_run_dir))
        #if self.notify_runner is not None:
        #    for r in self.notify_runner:
        #        self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')

    def _runner(self, run_dir, flowcell, conf_info):
        """
        Thread body: retrieve config, configure and run the pipeline.

        Each step runs only if the previous one reported success; the
        outcome of every step is logged.  pipelineFinished(run_dir) is
        called when run_pipeline() returns True.  Reads self.genome_dir,
        which is set by read_config().
        """
        # retrieve config step
        cfg_filepath = os.path.abspath('config32auto.txt')
        status_retrieve_cfg = retrieve_config(conf_info,
                                              flowcell,
                                              cfg_filepath,
                                              self.genome_dir)
        if not status_retrieve_cfg:
            logging.error("Runner: Retrieve config: failed")
            return
        logging.info("Runner: Retrieve config: success")

        # configure step
        status = configure(conf_info)
        if not status:
            logging.error("Runner: Configure: failed")
            return
        logging.info("Runner: Configure: success")

        # Setup status cmdline status monitor
        #startCmdLineStatusMonitor(conf_info)

        # running step
        # Single parenthesised argument: prints the same text whether
        # print is a statement (Python 2) or a function.
        print('Running pipeline now!')
        run_status = run_pipeline(conf_info)
        if run_status is True:
            logging.info('Runner: Pipeline: success')
            self.pipelineFinished(run_dir)
        else:
            logging.error('Runner: Pipeline: failed')

    def launchJob(self, run_dir, flowcell, conf_info):
        """
        Starts up a thread for running the pipeline

        The thread runs self._runner(run_dir, flowcell, conf_info) and is
        marked as a daemon thread before it is started.
        """
        t = threading.Thread(target=self._runner,
                             args=[run_dir, flowcell, conf_info])
        t.setDaemon(True)
        t.start()
174         
175
176         
def main(args=None):
    """
    Build a Runner with its 'loglevel' option set to 'DEBUG' and run it.

    args is passed unchanged to the bot's own main(); whatever that
    returns is returned to the caller.
    """
    runner_bot = Runner()
    runner_bot.cfg['loglevel'] = 'DEBUG'
    exit_status = runner_bot.main(args)
    return exit_status
181     
if __name__ == "__main__":
    # Run as a script: pass the command line arguments (without the
    # program name) to main() and exit with its result.
    exit_code = main(sys.argv[1:])
    sys.exit(exit_code)
184