9 from benderjab import rpc
11 from gaworkflow.pipeline.configure_run import *
12 from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor
14 #s_fc = re.compile('FC[0-9]+')
15 s_fc = re.compile('_[0-9a-zA-Z]*$')
18 def _get_flowcell_from_rundir(run_dir):
20 Returns flowcell string based on run_dir.
21 Returns None and logs error if flowcell can't be found.
23 junk, dirname = os.path.split(run_dir)
24 mo = s_fc.search(dirname)
26 logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
29 return dirname[mo.start()+1:]
class Runner(rpc.XmlRpcBot):
    """
    Manage running pipeline jobs.

    The bot registers sequencingFinished() with its rpc.XmlRpcBot base class
    and answers a few chat commands (see _parser).  Every run handed to
    sequencingFinished() is processed in its own thread by _runner(), which
    retrieves the run configuration, configures the run and then executes
    the pipeline.
    """

    def __init__(self, section=None, configfile=None):
        """
        Initialise the bot and register its remotely callable function.

        section and configfile are passed unchanged to rpc.XmlRpcBot.
        """
        #if configfile is None:
        #    self.configfile = "~/.gaworkflow"
        super(Runner, self).__init__(section, configfile)

        # Options this bot knows about; no default value.
        self.cfg['notify_users'] = None
        self.cfg['genome_dir'] = None

        # flowcell id -> config info object, one entry per run handed to us.
        self.conf_info_dict = {}

        self.register_function(self.sequencingFinished)
        #self.eventTasks.append(self.update)

    def read_config(self, section=None, configfile=None):
        """
        Read the configuration and pick up the required 'genome_dir' option.
        """
        super(Runner, self).read_config(section, configfile)

        self.genome_dir = self._check_required_option('genome_dir')

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages

        Understands "help", "status" and "start <runfolder>"; anything else
        gets an "I didn't understand" answer.  Returns the reply text.
        """
        # Named help_text so the builtin help() is not shadowed.
        help_text = u"I can send [start] a run, or report [status]"
        if re.match(u"help", msg):
            reply = help_text
        elif re.match("status", msg):
            reply = u"not implemented"
        elif re.match(u"start", msg):
            # NOTE(review): the split / length check was not visible in the
            # reviewed listing; reconstructed from the use of words[1] and
            # the "need runfolder name" fallback -- confirm.
            words = msg.split()
            if len(words) == 2:
                self.sequencingFinished(words[1])
                reply = u"starting run for %s" % (words[1])
            else:
                reply = u"need runfolder name"
        else:
            reply = u"I didn't understand '%s'" % (msg)

        logging.debug("reply: " + str(reply))
        # NOTE(review): returning the reply is assumed (the line after the
        # debug call was not visible) -- confirm against rpc.XmlRpcBot.
        return reply

    def start(self, daemonize):
        """
        Start the bot; daemonize is handed on to the base class.
        """
        super(Runner, self).start(daemonize)

    # NOTE(review): the "def" line of this method was not visible in the
    # reviewed listing; name and signature follow from the super() call.
    def stop(self):
        """
        Shut the bot down.
        """
        super(Runner, self).stop()

    def sequencingFinished(self, run_dir):
        """
        Sequencing (and copying) is finished, time to start pipeline

        Creates the config info object for run_dir, remembers it under the
        run's flowcell id and launches the pipeline job in its own thread.
        Nothing is launched when no flowcell id can be derived from run_dir.
        """
        logging.debug("received sequencing finished message")

        # get flowcell from run_dir name
        flowcell = _get_flowcell_from_rundir(run_dir)
        if flowcell is None:
            # The helper already logged why; without a flowcell id the
            # configuration cannot be looked up, so do not start a job.
            logging.error("Runner: not starting pipeline for %s", run_dir)
            return

        # Setup config info object
        # NOTE(review): the line creating ci was not visible in the reviewed
        # listing; ConfigInfo (from configure_run) is assumed -- confirm.
        ci = ConfigInfo()
        ci.run_path = run_dir

        # Store ci object in dictionary
        self.conf_info_dict[flowcell] = ci

        # Launch the job in its own thread and return.
        self.launchJob(run_dir, flowcell, ci)

    def pipelineFinished(self, run_dir):
        """
        Record that the pipeline for run_dir has finished.
        """
        # need to strip off self.watch_dir from rundir I suspect.
        logging.info("pipeline finished in %s", run_dir)
        #pattern = self.watch_dir
        #if pattern[-1] != os.path.sep:
        #    pattern += os.path.sep
        #stripped_run_dir = re.sub(pattern, "", run_dir)
        #logging.debug("stripped to " + stripped_run_dir)
        #if self.notify_users is not None:
        #    for u in self.notify_users:
        #        self.send(u, 'Sequencing run %s finished' % #(stripped_run_dir))
        #if self.notify_runner is not None:
        #    for r in self.notify_runner:
        #        self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')

    def _runner(self, run_dir, flowcell, conf_info):
        """
        Thread body: retrieve config, configure, then run the pipeline.

        Each step only runs when the previous one succeeded; every outcome
        is logged.  pipelineFinished() is called after a successful run.
        """
        # retrieve config step
        cfg_filepath = os.path.abspath('config32auto.txt')
        # NOTE(review): only the first argument was visible in the reviewed
        # listing; the remaining three are reconstructed -- confirm against
        # retrieve_config's signature.
        status_retrieve_cfg = retrieve_config(conf_info,
                                              flowcell,
                                              cfg_filepath,
                                              self.genome_dir)
        if not status_retrieve_cfg:
            logging.error("Runner: Retrieve config: failed")
            return
        logging.info("Runner: Retrieve config: success")

        # configure step
        status = configure(conf_info)
        if not status:
            logging.error("Runner: Configure: failed")
            return
        logging.info("Runner: Configure: success")

        # Setup status cmdline status monitor
        #startCmdLineStatusMonitor(ci)

        # running step
        # Call form prints the same text under Python 2 and Python 3.
        print('Running pipeline now!')
        run_status = run_pipeline(conf_info)
        if run_status is True:
            logging.info('Runner: Pipeline: success')
            # Was misspelled "piplineFinished", which raised AttributeError
            # after every successful run.
            self.pipelineFinished(run_dir)
        else:
            # Failures are logged at error level like the other steps.
            logging.error('Runner: Pipeline: failed')

    def launchJob(self, run_dir, flowcell, conf_info):
        """
        Starts up a thread for running the pipeline
        """
        t = threading.Thread(target=self._runner,
                             args=[run_dir, flowcell, conf_info])
        # NOTE(review): the lines after the Thread construction were not
        # visible in the reviewed listing; marking the thread as a daemon is
        # an assumption, starting it follows from the docstring -- confirm.
        t.setDaemon(True)
        t.start()
def main(args=None):
    """
    Create the 'demobot' Runner with debug logging and run it.

    args is handed unchanged to the bot's own main(); its return value is
    returned to the caller.
    """
    # NOTE(review): the "def" line was not visible in the reviewed listing;
    # the name and parameter follow from the call in the __main__ guard.
    bot = Runner('demobot')
    bot.cfg['loglevel'] = 'DEBUG'
    return bot.main(args)
if __name__ == "__main__":
    # Run as a script: pass the command-line arguments (without the program
    # name) to main() and use its return value as the exit status.
    sys.exit(main(sys.argv[1:]))