9 from benderjab import rpc
11 from gaworkflow.pipeline.configure_run import *
12 from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor
14 s_fc = re.compile('FC[0-9]+')
17 def _get_flowcell_from_rundir(run_dir):
19 Returns flowcell string based on run_dir.
20 Returns None and logs error if flowcell can't be found.
22 junk, dirname = os.path.split(run_dir)
23 mo = s_fc.search(dirname)
25 logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
28 return dirname[mo.start():]
32 class Runner(rpc.XmlRpcBot):
34 Manage running pipeline jobs.
36 def __init__(self, section=None, configfile=None):
37 #if configfile is None:
38 # self.configfile = "~/.gaworkflow"
39 super(Runner, self).__init__(section, configfile)
41 self.cfg['notify_users'] = None
42 self.cfg['genome_dir'] = None
44 self.conf_info_dict = {}
46 self.register_function(self.sequencingFinished)
47 #self.eventTasks.append(self.update)
50 def read_config(self, section=None, configfile=None):
51 super(Runner, self).read_config(section, configfile)
53 self.genome_dir = self._check_required_option('genome_dir')
56 def _parser(self, msg, who):
58 Parse xmpp chat messages
60 help = u"I can send [start] a run, or report [status]"
61 if re.match(u"help", msg):
63 elif re.match("status", msg):
64 reply = u"not implemented"
65 elif re.match(u"start", msg):
68 self.sequencingFinished(words[1])
69 reply = u"starting run for %s" % (words[1])
71 reply = u"need runfolder name"
73 reply = u"I didn't understand '%s'" %(msg)
75 logging.debug("reply: " + str(reply))
79 def start(self, daemonize):
83 super(Runner, self).start(daemonize)
90 super(Runner, self).stop()
93 def sequencingFinished(self, run_dir):
95 Sequenceing (and copying) is finished, time to start pipeline
97 logging.debug("received sequencing finished message")
99 # Setup config info object
101 ci.run_path = run_dir
103 # get flowcell from run_dir name
104 flowcell = _get_flowcell_from_rundir(run_dir)
106 # Store ci object in dictionary
107 self.conf_info_dict[flowcell] = ci
110 # Launch the job in it's own thread and turn.
111 self.launchJob(run_dir, flowcell, ci)
114 def pipelineFinished(self, run_dir):
115 # need to strip off self.watch_dir from rundir I suspect.
116 logging.info("pipeline finished in" + str(run_dir))
117 #pattern = self.watch_dir
118 #if pattern[-1] != os.path.sep:
119 # pattern += os.path.sep
120 #stripped_run_dir = re.sub(pattern, "", run_dir)
121 #logging.debug("stripped to " + stripped_run_dir)
122 #if self.notify_users is not None:
123 # for u in self.notify_users:
124 # self.send(u, 'Sequencing run %s finished' % #(stripped_run_dir))
125 #if self.notify_runner is not None:
126 # for r in self.notify_runner:
127 # self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
130 def _runner(self, run_dir, flowcell, conf_info):
131 # retrieve config step
132 cfg_filepath = os.path.abspath('config32auto.txt')
133 status_retrieve_cfg = retrieve_config(conf_info,
137 if status_retrieve_cfg:
138 logging.info("Runner: Retrieve config: success")
140 logging.error("Runner: Retrieve config: failed")
144 if status_retrieve_cfg:
145 status = configure(ci)
147 logging.info("Runner: Configure: success")
149 logging.error("Runner: Configure: failed")
151 #if successful, continue
153 # Setup status cmdline status monitor
154 #startCmdLineStatusMonitor(ci)
157 print 'Running pipeline now!'
158 run_status = run_pipeline(ci)
159 if run_status is True:
160 logging.info('Runner: Pipeline: success')
161 self.piplineFinished(run_dir)
163 logging.info('Runner: Pipeline: failed')
166 def launchJob(self, run_dir, flowcell, conf_info):
168 Starts up a thread for running the pipeline
170 t = threading.Thread(target=self._runner,
171 args=[run_dir, flowcell, conf_info])
179 bot.cfg['loglevel'] = 'DEBUG'
180 return bot.main(args)
182 if __name__ == "__main__":
183 sys.exit(main(sys.argv[1:]))