[project @ gaworkflow.runner progress]
[htsworkflow.git] / gaworkflow / runner.py
1 #!/usr/bin/env python
2 import logging
3 import os
4 import re
5 import sys
6 import time
7 import threading
8
9 from benderjab import rpc
10
11 from gaworkflow.pipeline.configure_run import *
12 from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor
13
14 s_fc = re.compile('FC[0-9]+')
15
16
17 def _get_flowcell_from_rundir(run_dir):
18     """
19     Returns flowcell string based on run_dir.
20     Returns None and logs error if flowcell can't be found.
21     """
22     junk, dirname = os.path.split(run_dir)
23     mo = s_fc.search(dirname)
24     if not mo:
25         logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
26         return None
27
28     return dirname[mo.start():]
29     
30
31 def _runner():
32
33
class Runner(rpc.XmlRpcBot):
    """
    Manage running pipeline jobs.

    sequencingFinished() builds a ConfigInfo for a run folder, stores it
    in conf_info_dict keyed by flowcell, and hands the work to a daemon
    thread (launchJob) that executes the retrieve-config, configure and
    run-pipeline steps in order (_runner).
    """
    def __init__(self, section=None, configfile=None):
        """
        Set up the bot.

        section and configfile are passed unchanged to the base class
        constructor.
        """
        #if configfile is None:
        #    self.configfile = "~/.gaworkflow"
        super(Runner, self).__init__(section, configfile)

        # NOTE(review): presumably these declare the options this bot
        # understands -- confirm against benderjab.rpc.XmlRpcBot.
        self.cfg['notify_users'] = None
        self.cfg['genome_dir'] = None

        # flowcell string -> ConfigInfo, filled in by sequencingFinished
        self.conf_info_dict = {}

        self.register_function(self.sequencingFinished)
        self.eventTasks.append(self.update)

    def read_config(self, section=None, configfile=None):
        """
        Read the configuration via the base class, then fetch the
        'genome_dir' option into self.genome_dir.
        """
        super(Runner, self).read_config(section, configfile)

        self.genome_dir = self._check_required_option('genome_dir')

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages

        msg is the message text; who is not used here.
        Returns the text of the reply.  Commands are recognised by
        prefix: 'help', 'status', and 'start <runfolder>'.
        """
        # Named help_text so the builtin help() is not shadowed.
        help_text = u"I can send [start] a run, or report [status]"
        if re.match(u"help", msg):
            reply = help_text
        elif re.match("status", msg):
            reply = u"not implemented"
        elif re.match(u"start", msg):
            words = msg.split()
            if len(words) == 2:
                self.sequencingFinished(words[1])
                reply = u"starting run for %s" % (words[1])
            else:
                reply = u"need runfolder name"
        else:
            reply = u"I didn't understand '%s'" % (msg)
        return reply

    def start(self, daemonize):
        """
        Start application
        """
        super(Runner, self).start(daemonize)

    def stop(self):
        """
        shutdown application
        """
        super(Runner, self).stop()

    def sequencingFinished(self, run_dir):
        """
        Sequencing (and copying) is finished, time to start pipeline

        run_dir is the run folder; its name must contain the flowcell
        id.  If no flowcell can be derived from it, the problem is
        logged and no job is started.
        """
        logging.debug("received sequencing finished message")

        # get flowcell from run_dir name
        flowcell = _get_flowcell_from_rundir(run_dir)
        if flowcell is None:
            # Without a flowcell there is nothing to key the run on;
            # previously the job was still registered and launched
            # under the key None.
            logging.error("Runner: no flowcell found in %s; not starting",
                          run_dir)
            return

        # Setup config info object
        ci = ConfigInfo()
        ci.run_path = run_dir

        # Store ci object in dictionary
        self.conf_info_dict[flowcell] = ci

        # Launch the job in its own thread and return.
        self.launchJob(run_dir, flowcell, ci)

    def pipelineFinished(self, run_dir):
        """
        Called by _runner when the pipeline for run_dir succeeded.
        Currently this only logs; notification code is disabled below.
        """
        # need to strip off self.watch_dir from rundir I suspect.
        logging.info("pipeline finished in %s", run_dir)
        #pattern = self.watch_dir
        #if pattern[-1] != os.path.sep:
        #    pattern += os.path.sep
        #stripped_run_dir = re.sub(pattern, "", run_dir)
        #logging.debug("stripped to " + stripped_run_dir)
        #if self.notify_users is not None:
        #    for u in self.notify_users:
        #        self.send(u, 'Sequencing run %s finished' % #(stripped_run_dir))
        #if self.notify_runner is not None:
        #    for r in self.notify_runner:
        #        self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')

    def _runner(self, run_dir, flowcell, conf_info):
        """
        Run the pipeline steps for one run folder (thread target of
        launchJob).

        The steps are retrieve_config, configure and run_pipeline; each
        is attempted only if the previous one reported success, and the
        outcome of every attempted step is logged.  pipelineFinished is
        called with run_dir when run_pipeline returns True.
        """
        # retrieve config step
        cfg_filepath = os.path.abspath('config32auto.txt')
        status_retrieve_cfg = retrieve_config(conf_info,
                                              flowcell,
                                              cfg_filepath,
                                              self.genome_dir)
        if not status_retrieve_cfg:
            logging.error("Runner: Retrieve config: failed")
            return
        logging.info("Runner: Retrieve config: success")

        # configure step
        if not configure(conf_info):
            logging.error("Runner: Configure: failed")
            return
        logging.info("Runner: Configure: success")

        # Setup status cmdline status monitor
        #startCmdLineStatusMonitor(conf_info)

        # running step
        # Single-argument call form prints the same text on Python 2 and 3.
        print('Running pipeline now!')
        run_status = run_pipeline(conf_info)
        if run_status is True:
            logging.info('Runner: Pipeline: success')
            self.pipelineFinished(run_dir)
        else:
            # error level, consistent with the other failed steps
            logging.error('Runner: Pipeline: failed')

    def launchJob(self, run_dir, flowcell, conf_info):
        """
        Starts up a thread for running the pipeline

        The arguments are forwarded unchanged to _runner.  The thread is
        marked as a daemon thread before it is started.
        """
        t = threading.Thread(target=self._runner,
                             args=[run_dir, flowcell, conf_info])
        t.setDaemon(True)
        t.start()
171         
172
173         
def main(args=None):
    """
    Create a Runner and return whatever its main() returns for args.
    """
    return Runner().main(args)
177     
# Script entry point: the result of main() becomes the exit status.
if __name__ == "__main__":
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)
180