From: Diane Trout Date: Tue, 15 Jan 2008 01:07:48 +0000 (+0000) Subject: move the autotmation scripts into gaworkflow.automation X-Git-Tag: 0.1.0~11 X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=28e86f4b1c8fdfbaf6005f7a85f25e5c2bd7d172 move the autotmation scripts into gaworkflow.automation the scripts that were providing the tools to automate running the solexa pipeline were unfairly "priviledged" compared to the components that wrapped talking to the pipeline commands and providing the website, in that the other components were in sub-packages while the automation was just in the gaworkflow package. So I moved them into the somewhat clearer "gaworkflow.automation". The intent is that gaworkflow.automation contains modules that make things happen without human intervention. --- diff --git a/gaworkflow/automation/__init__.py b/gaworkflow/automation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gaworkflow/automation/copier.py b/gaworkflow/automation/copier.py new file mode 100644 index 0000000..2e0d3ae --- /dev/null +++ b/gaworkflow/automation/copier.py @@ -0,0 +1,245 @@ +import ConfigParser +import copy +import logging +import logging.handlers +import os +import re +import subprocess +import sys +import time +import traceback + +from benderjab import rpc + +def runfolder_validate(fname): + """ + Return True if fname looks like a runfolder name + """ + if re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname): + return True + else: + return False + +class rsync(object): + def __init__(self, source, dest, pwfile): + self.pwfile = os.path.expanduser(pwfile) + self.cmd = ['/usr/bin/rsync', ] + self.cmd.append('--password-file=%s' % (self.pwfile)) + self.source_base = source + self.dest_base = dest + self.processes = {} + self.exit_code = None + + def list(self): + """Get a directory listing""" + args = copy.copy(self.cmd) + args.append(self.source_base) + + logging.debug("Rsync cmd:" + " ".join(args)) + short_process = subprocess.Popen(args, stdout=subprocess.PIPE) + return self.list_filter(short_process.stdout) + + def list_filter(self, lines): + """ + parse rsync directory listing + """ + dirs_to_copy = [] + direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ] + for permissions, size, filedate, filetime, filename in direntries: + if permissions[0] == 'd': + # hey its a directory, the first step to being something we want to + # copy + if re.match("[0-9]{6}", filename): + # it starts with something that looks like a 6 digit date + # aka good enough for me + dirs_to_copy.append(filename) + return dirs_to_copy + + def create_copy_process(self, dirname): + args = copy.copy(self.cmd) + # we want to copy everything + args.append('-rlt') + # from here + args.append(os.path.join(self.source_base, dirname)) + # to here + args.append(self.dest_base) + logging.debug("Rsync cmd:" + " ".join(args)) + return subprocess.Popen(args) + + def copy(self): + """ + copy any interesting looking directories over + return list of items that we started copying. + """ + # clean up any lingering non-running processes + self.poll() + + # what's available to copy? + dirs_to_copy = self.list() + + # lets start copying + started = [] + for d in dirs_to_copy: + process = self.processes.get(d, None) + + if process is None: + # we don't have a process, so make one + logging.info("rsyncing %s" % (d)) + self.processes[d] = self.create_copy_process(d) + started.append(d) + return started + + def poll(self): + """ + check currently running processes to see if they're done + + return path roots that have finished. + """ + for dir_key, proc_value in self.processes.items(): + retcode = proc_value.poll() + if retcode is None: + # process hasn't finished yet + pass + elif retcode == 0: + logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode)) + del self.processes[dir_key] + else: + logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode)) + + def __len__(self): + """ + Return how many active rsync processes we currently have + + Call poll first to close finished processes. + """ + return len(self.processes) + + def keys(self): + """ + Return list of current run folder names + """ + return self.processes.keys() + +class CopierBot(rpc.XmlRpcBot): + def __init__(self, section=None, configfile=None): + #if configfile is None: + # configfile = '~/.gaworkflow' + + super(CopierBot, self).__init__(section, configfile) + + # options for rsync command + self.cfg['rsync_password_file'] = None + self.cfg['rsync_source'] = None + self.cfg['rsync_destination'] = None + + # options for reporting we're done + self.cfg['notify_users'] = None + self.cfg['notify_runner'] = None + + self.pending = [] + self.rsync = None + self.notify_users = None + self.notify_runner = None + + self.register_function(self.startCopy) + self.register_function(self.sequencingFinished) + self.eventTasks.append(self.update) + + def read_config(self, section=None, configfile=None): + """ + read the config file + """ + super(CopierBot, self).read_config(section, configfile) + + password = self._check_required_option('rsync_password_file') + source = self._check_required_option('rsync_source') + destination = self._check_required_option('rsync_destination') + self.rsync = rsync(source, destination, password) + + self.notify_users = self._parse_user_list(self.cfg['notify_users']) + try: + self.notify_runner = \ + self._parse_user_list(self.cfg['notify_runner'], + require_resource=True) + except bot.JIDMissingResource: + msg = 'need a full jabber ID + resource for xml-rpc destinations' + logging.FATAL(msg) + raise bot.JIDMissingResource(msg) + + def startCopy(self, *args): + """ + start our copy + """ + logging.info("starting copy scan") + started = self.rsync.copy() + logging.info("copying:" + " ".join(started)+".") + return started + + def sequencingFinished(self, runDir, *args): + """ + The run was finished, if we're done copying, pass the message on + """ + # close any open processes + self.rsync.poll() + + # see if we're still copying + if runfolder_validate(runDir): + logging.info("recevied sequencing finshed for %s" % (runDir)) + self.pending.append(runDir) + self.startCopy() + return "PENDING" + else: + errmsg = "received bad runfolder name (%s)" % (runDir) + logging.warning(errmsg) + # maybe I should use a different error message + raise RuntimeError(errmsg) + + def reportSequencingFinished(self, runDir): + """ + Send the sequencingFinished message to the interested parties + """ + if self.notify_users is not None: + for u in self.notify_users: + self.send(u, 'Sequencing run %s finished' % (runDir)) + if self.notify_runner is not None: + for r in self.notify_runner: + self.rpc_send(r, (runDir,), 'sequencingFinished') + logging.info("forwarding sequencingFinshed message for %s" % (runDir)) + + def update(self, *args): + """ + Update our current status. + Report if we've finished copying files. + """ + self.rsync.poll() + for p in self.pending: + if p not in self.rsync.keys(): + self.reportSequencingFinished(p) + self.pending.remove(p) + + def _parser(self, msg, who): + """ + Parse xmpp chat messages + """ + help = u"I can [copy], or report current [status]" + if re.match(u"help", msg): + reply = help + elif re.match("copy", msg): + started = self.startCopy() + reply = u"started copying " + ", ".join(started) + elif re.match(u"status", msg): + msg = [u"Currently %d rsync processes are running." % (len(self.rsync))] + for d in self.rsync.keys(): + msg.append(u" " + d) + reply = os.linesep.join(msg) + else: + reply = u"I didn't understand '%s'" % (unicode(msg)) + return reply + +def main(args=None): + bot = CopierBot() + bot.main(args) + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) + diff --git a/gaworkflow/automation/runner.py b/gaworkflow/automation/runner.py new file mode 100644 index 0000000..f81b682 --- /dev/null +++ b/gaworkflow/automation/runner.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python +import logging +import os +import re +import sys +import time +import threading + +from benderjab import rpc + +from gaworkflow.pipeline.configure_run import * +from gaworkflow.pipeline.monitors import _percentCompleted + +#s_fc = re.compile('FC[0-9]+') +s_fc = re.compile('_[0-9a-zA-Z]*$') + + +def _get_flowcell_from_rundir(run_dir): + """ + Returns flowcell string based on run_dir. + Returns None and logs error if flowcell can't be found. + """ + junk, dirname = os.path.split(run_dir) + mo = s_fc.search(dirname) + if not mo: + logging.error('RunDir 2 FlowCell error: %s' % (run_dir)) + return None + + return dirname[mo.start()+1:] + + + +class Runner(rpc.XmlRpcBot): + """ + Manage running pipeline jobs. + """ + def __init__(self, section=None, configfile=None): + #if configfile is None: + # self.configfile = "~/.gaworkflow" + super(Runner, self).__init__(section, configfile) + + self.cfg['notify_users'] = None + self.cfg['genome_dir'] = None + self.cfg['base_analysis_dir'] = None + + self.cfg['notify_users'] = None + self.cfg['notify_postanalysis'] = None + + self.conf_info_dict = {} + + self.register_function(self.sequencingFinished) + #self.eventTasks.append(self.update) + + + def read_config(self, section=None, configfile=None): + super(Runner, self).read_config(section, configfile) + + self.genome_dir = self._check_required_option('genome_dir') + self.base_analysis_dir = self._check_required_option('base_analysis_dir') + + self.notify_users = self._parse_user_list(self.cfg['notify_users']) + #FIXME: process notify_postpipeline cfg + + + def _parser(self, msg, who): + """ + Parse xmpp chat messages + """ + help = u"I can send [start] a run, or report [status]" + if re.match(u"help", msg): + reply = help + elif re.match("status", msg): + words = msg.split() + if len(words) == 2: + reply = self.getStatusReport(words[1]) + else: + reply = u"Status available for: %s" \ + % (', '.join([k for k in self.conf_info_dict.keys()])) + elif re.match(u"start", msg): + words = msg.split() + if len(words) == 2: + self.sequencingFinished(words[1]) + reply = u"starting run for %s" % (words[1]) + else: + reply = u"need runfolder name" + else: + reply = u"I didn't understand '%s'" %(msg) + + logging.debug("reply: " + str(reply)) + return reply + + + def getStatusReport(self, fc_num): + """ + Returns text status report for flow cell number + """ + if fc_num not in self.conf_info_dict: + return "No record of a %s run." % (fc_num) + + status = self.conf_info_dict[fc_num].status + + if status is None: + return "No status information for %s yet." \ + " Probably still in configure step. Try again later." % (fc_num) + + fc,ft = status.statusFirecrest() + bc,bt = status.statusBustard() + gc,gt = status.statusGerald() + + tc,tt = status.statusTotal() + + fp = _percentCompleted(fc, ft) + bp = _percentCompleted(bc, bt) + gp = _percentCompleted(gc, gt) + tp = _percentCompleted(tc, tt) + + output = [] + + output.append(u'Firecrest: %s%% (%s/%s)' % (fp, fc, ft)) + output.append(u' Bustard: %s%% (%s/%s)' % (bp, bc, bt)) + output.append(u' Gerald: %s%% (%s/%s)' % (gp, gc, gt)) + output.append(u'-----------------------') + output.append(u' Total: %s%% (%s/%s)' % (tp, tc, tt)) + + return '\n'.join(output) + + + def sequencingFinished(self, run_dir): + """ + Sequenceing (and copying) is finished, time to start pipeline + """ + logging.debug("received sequencing finished message") + + # Setup config info object + ci = ConfigInfo() + ci.base_analysis_dir = self.base_analysis_dir + ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir) + + # get flowcell from run_dir name + flowcell = _get_flowcell_from_rundir(run_dir) + + # Store ci object in dictionary + self.conf_info_dict[flowcell] = ci + + + # Launch the job in it's own thread and turn. + self.launchJob(run_dir, flowcell, ci) + return "started" + + + def pipelineFinished(self, run_dir): + # need to strip off self.watch_dir from rundir I suspect. + logging.info("pipeline finished in" + str(run_dir)) + #pattern = self.watch_dir + #if pattern[-1] != os.path.sep: + # pattern += os.path.sep + #stripped_run_dir = re.sub(pattern, "", run_dir) + #logging.debug("stripped to " + stripped_run_dir) + + # Notify each user that the run has finished. + if self.notify_users is not None: + for u in self.notify_users: + self.send(u, 'Pipeline run %s finished' % (run_dir)) + + #if self.notify_runner is not None: + # for r in self.notify_runner: + # self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished') + + def reportMsg(self, msg): + + if self.notify_users is not None: + for u in self.notify_users: + self.send(u, msg) + + + def _runner(self, run_dir, flowcell, conf_info): + + # retrieve config step + cfg_filepath = os.path.join(conf_info.analysis_dir, + 'config32auto.txt') + status_retrieve_cfg = retrieve_config(conf_info, + flowcell, + cfg_filepath, + self.genome_dir) + if status_retrieve_cfg: + logging.info("Runner: Retrieve config: success") + self.reportMsg("Retrieve config (%s): success" % (run_dir)) + else: + logging.error("Runner: Retrieve config: failed") + self.reportMsg("Retrieve config (%s): FAILED" % (run_dir)) + + + # configure step + if status_retrieve_cfg: + status = configure(conf_info) + if status: + logging.info("Runner: Configure: success") + self.reportMsg("Configure (%s): success" % (run_dir)) + else: + logging.error("Runner: Configure: failed") + self.reportMsg("Configure (%s): FAILED" % (run_dir)) + + #if successful, continue + if status: + # Setup status cmdline status monitor + #startCmdLineStatusMonitor(ci) + + # running step + print 'Running pipeline now!' + run_status = run_pipeline(conf_info) + if run_status is True: + logging.info('Runner: Pipeline: success') + self.piplineFinished(run_dir) + else: + logging.info('Runner: Pipeline: failed') + self.reportMsg("Pipeline run (%s): FAILED" % (run_dir)) + + + def launchJob(self, run_dir, flowcell, conf_info): + """ + Starts up a thread for running the pipeline + """ + t = threading.Thread(target=self._runner, + args=[run_dir, flowcell, conf_info]) + t.setDaemon(True) + t.start() + + + +def main(args=None): + bot = Runner() + return bot.main(args) + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) + diff --git a/gaworkflow/automation/spoolwatcher.py b/gaworkflow/automation/spoolwatcher.py new file mode 100644 index 0000000..ba566cb --- /dev/null +++ b/gaworkflow/automation/spoolwatcher.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python +import logging +import os +import re +import sys +import time + +# this uses pyinotify +import pyinotify +from pyinotify import EventsCodes + +from benderjab import rpc + + +class Handler(pyinotify.ProcessEvent): + def __init__(self, watchmanager, bot): + self.last_event_time = None + self.watchmanager = watchmanager + self.bot = bot + + def process_IN_CREATE(self, event): + self.last_event_time = time.time() + msg = "Create: %s" % os.path.join(event.path, event.name) + if event.name.lower() == "run.completed": + try: + self.bot.sequencingFinished(event.path) + except IOError, e: + logging.error("Couldn't send sequencingFinished") + logging.debug(msg) + + def process_IN_DELETE(self, event): + logging.debug("Remove: %s" % os.path.join(event.path, event.name)) + + def process_IN_UNMOUNT(self, event): + self.bot.unmount_watch() + +class SpoolWatcher(rpc.XmlRpcBot): + """ + Watch a directory and send a message when another process is done writing. + + This monitors a directory tree using inotify (linux specific) and + after some files having been written will send a message after + seconds of no file writing. + + (Basically when the solexa machine finishes dumping a round of data + this'll hopefully send out a message saying hey look theres data available + + """ + # these params need to be in the config file + # I wonder where I should put the documentation + #:Parameters: + # `watchdir` - which directory tree to monitor for modifications + # `profile` - specify which .gaworkflow profile to use + # `write_timeout` - how many seconds to wait for writes to finish to + # the spool + # `notify_timeout` - how often to timeout from notify + + def __init__(self, section=None, configfile=None): + #if configfile is None: + # self.configfile = "~/.gaworkflow" + super(SpoolWatcher, self).__init__(section, configfile) + + self.cfg['watchdir'] = None + self.cfg['write_timeout'] = 10 + self.cfg['notify_users'] = None + self.cfg['notify_runner'] = None + + self.notify_timeout = 0.001 + self.wm = pyinotify.WatchManager() + self.handler = Handler(self.wm, self) + self.notifier = pyinotify.Notifier(self.wm, self.handler) + self.wdd = None + + self.notify_users = None + self.notify_runner = None + + self.eventTasks.append(self.process_notify) + + def read_config(self, section=None, configfile=None): + super(SpoolWatcher, self).read_config(section, configfile) + + self.watch_dir = self._check_required_option('watchdir') + self.write_timeout = int(self.cfg['write_timeout']) + + self.notify_users = self._parse_user_list(self.cfg['notify_users']) + try: + self.notify_runner = \ + self._parse_user_list(self.cfg['notify_runner'], + require_resource=True) + except bot.JIDMissingResource: + msg = 'need a full jabber ID + resource for xml-rpc destinations' + logging.FATAL(msg) + raise bot.JIDMissingResource(msg) + + def add_watch(self, watchdir=None): + """ + start watching watchdir or self.watch_dir + we're currently limited to watching one directory tree. + """ + # the one tree limit is mostly because self.wdd is a single item + # but managing it as a list might be a bit more annoying + if watchdir is None: + watchdir = self.watch_dir + logging.info("Watching:"+str(watchdir)) + mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT + # rec traverses the tree and adds all the directories that are there + # at the start. + # auto_add will add in new directories as they are created + self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True) + + def unmount_watch(self): + if self.wdd is not None: + logging.debug("disabling watch") + logging.debug(str(self.wdd)) + self.wm.rm_watch(self.wdd) + self.wdd = None + + def process_notify(self, *args): + # process the queue of events as explained above + self.notifier.process_events() + #check events waits timeout + if self.notifier.check_events(self.notify_timeout): + # read notified events and enqeue them + self.notifier.read_events() + # should we do something? + last_event_time = self.handler.last_event_time + if last_event_time is not None: + time_delta = time.time() - last_event_time + if time_delta > self.write_timeout: + self.startCopy() + self.handler.last_event_time = None + + def _parser(self, msg, who): + """ + Parse xmpp chat messages + """ + help = u"I can send [copy] message, or squencer [finished]" + if re.match(u"help", msg): + reply = help + elif re.match("copy", msg): + self.startCopy() + reply = u"sent copy message" + elif re.match(u"finished", msg): + words = msg.split() + if len(words) == 2: + self.sequencingFinished(words[1]) + reply = u"sending sequencing finished for %s" % (words[1]) + else: + reply = u"need runfolder name" + else: + reply = u"I didn't understand '%s'" %(msg) + return reply + + def start(self, daemonize): + """ + Start application + """ + self.add_watch() + super(SpoolWatcher, self).start(daemonize) + + def stop(self): + """ + shutdown application + """ + # destroy the inotify's instance on this interrupt (stop monitoring) + self.notifier.stop() + super(SpoolWatcher, self).stop() + + def startCopy(self): + logging.debug("writes seem to have stopped") + if self.notify_runner is not None: + for r in self.notify_runner: + self.rpc_send(r, tuple(), 'startCopy') + + def sequencingFinished(self, run_dir): + # need to strip off self.watch_dir from rundir I suspect. + logging.info("run.completed in " + str(run_dir)) + pattern = self.watch_dir + if pattern[-1] != os.path.sep: + pattern += os.path.sep + stripped_run_dir = re.sub(pattern, "", run_dir) + logging.debug("stripped to " + stripped_run_dir) + if self.notify_users is not None: + for u in self.notify_users: + self.send(u, 'Sequencing run %s finished' % (stripped_run_dir)) + if self.notify_runner is not None: + for r in self.notify_runner: + self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished') + +def main(args=None): + bot = SpoolWatcher() + return bot.main(args) + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) + \ No newline at end of file diff --git a/gaworkflow/copier.py b/gaworkflow/copier.py deleted file mode 100644 index 2e0d3ae..0000000 --- a/gaworkflow/copier.py +++ /dev/null @@ -1,245 +0,0 @@ -import ConfigParser -import copy -import logging -import logging.handlers -import os -import re -import subprocess -import sys -import time -import traceback - -from benderjab import rpc - -def runfolder_validate(fname): - """ - Return True if fname looks like a runfolder name - """ - if re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname): - return True - else: - return False - -class rsync(object): - def __init__(self, source, dest, pwfile): - self.pwfile = os.path.expanduser(pwfile) - self.cmd = ['/usr/bin/rsync', ] - self.cmd.append('--password-file=%s' % (self.pwfile)) - self.source_base = source - self.dest_base = dest - self.processes = {} - self.exit_code = None - - def list(self): - """Get a directory listing""" - args = copy.copy(self.cmd) - args.append(self.source_base) - - logging.debug("Rsync cmd:" + " ".join(args)) - short_process = subprocess.Popen(args, stdout=subprocess.PIPE) - return self.list_filter(short_process.stdout) - - def list_filter(self, lines): - """ - parse rsync directory listing - """ - dirs_to_copy = [] - direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ] - for permissions, size, filedate, filetime, filename in direntries: - if permissions[0] == 'd': - # hey its a directory, the first step to being something we want to - # copy - if re.match("[0-9]{6}", filename): - # it starts with something that looks like a 6 digit date - # aka good enough for me - dirs_to_copy.append(filename) - return dirs_to_copy - - def create_copy_process(self, dirname): - args = copy.copy(self.cmd) - # we want to copy everything - args.append('-rlt') - # from here - args.append(os.path.join(self.source_base, dirname)) - # to here - args.append(self.dest_base) - logging.debug("Rsync cmd:" + " ".join(args)) - return subprocess.Popen(args) - - def copy(self): - """ - copy any interesting looking directories over - return list of items that we started copying. - """ - # clean up any lingering non-running processes - self.poll() - - # what's available to copy? - dirs_to_copy = self.list() - - # lets start copying - started = [] - for d in dirs_to_copy: - process = self.processes.get(d, None) - - if process is None: - # we don't have a process, so make one - logging.info("rsyncing %s" % (d)) - self.processes[d] = self.create_copy_process(d) - started.append(d) - return started - - def poll(self): - """ - check currently running processes to see if they're done - - return path roots that have finished. - """ - for dir_key, proc_value in self.processes.items(): - retcode = proc_value.poll() - if retcode is None: - # process hasn't finished yet - pass - elif retcode == 0: - logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode)) - del self.processes[dir_key] - else: - logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode)) - - def __len__(self): - """ - Return how many active rsync processes we currently have - - Call poll first to close finished processes. - """ - return len(self.processes) - - def keys(self): - """ - Return list of current run folder names - """ - return self.processes.keys() - -class CopierBot(rpc.XmlRpcBot): - def __init__(self, section=None, configfile=None): - #if configfile is None: - # configfile = '~/.gaworkflow' - - super(CopierBot, self).__init__(section, configfile) - - # options for rsync command - self.cfg['rsync_password_file'] = None - self.cfg['rsync_source'] = None - self.cfg['rsync_destination'] = None - - # options for reporting we're done - self.cfg['notify_users'] = None - self.cfg['notify_runner'] = None - - self.pending = [] - self.rsync = None - self.notify_users = None - self.notify_runner = None - - self.register_function(self.startCopy) - self.register_function(self.sequencingFinished) - self.eventTasks.append(self.update) - - def read_config(self, section=None, configfile=None): - """ - read the config file - """ - super(CopierBot, self).read_config(section, configfile) - - password = self._check_required_option('rsync_password_file') - source = self._check_required_option('rsync_source') - destination = self._check_required_option('rsync_destination') - self.rsync = rsync(source, destination, password) - - self.notify_users = self._parse_user_list(self.cfg['notify_users']) - try: - self.notify_runner = \ - self._parse_user_list(self.cfg['notify_runner'], - require_resource=True) - except bot.JIDMissingResource: - msg = 'need a full jabber ID + resource for xml-rpc destinations' - logging.FATAL(msg) - raise bot.JIDMissingResource(msg) - - def startCopy(self, *args): - """ - start our copy - """ - logging.info("starting copy scan") - started = self.rsync.copy() - logging.info("copying:" + " ".join(started)+".") - return started - - def sequencingFinished(self, runDir, *args): - """ - The run was finished, if we're done copying, pass the message on - """ - # close any open processes - self.rsync.poll() - - # see if we're still copying - if runfolder_validate(runDir): - logging.info("recevied sequencing finshed for %s" % (runDir)) - self.pending.append(runDir) - self.startCopy() - return "PENDING" - else: - errmsg = "received bad runfolder name (%s)" % (runDir) - logging.warning(errmsg) - # maybe I should use a different error message - raise RuntimeError(errmsg) - - def reportSequencingFinished(self, runDir): - """ - Send the sequencingFinished message to the interested parties - """ - if self.notify_users is not None: - for u in self.notify_users: - self.send(u, 'Sequencing run %s finished' % (runDir)) - if self.notify_runner is not None: - for r in self.notify_runner: - self.rpc_send(r, (runDir,), 'sequencingFinished') - logging.info("forwarding sequencingFinshed message for %s" % (runDir)) - - def update(self, *args): - """ - Update our current status. - Report if we've finished copying files. - """ - self.rsync.poll() - for p in self.pending: - if p not in self.rsync.keys(): - self.reportSequencingFinished(p) - self.pending.remove(p) - - def _parser(self, msg, who): - """ - Parse xmpp chat messages - """ - help = u"I can [copy], or report current [status]" - if re.match(u"help", msg): - reply = help - elif re.match("copy", msg): - started = self.startCopy() - reply = u"started copying " + ", ".join(started) - elif re.match(u"status", msg): - msg = [u"Currently %d rsync processes are running." % (len(self.rsync))] - for d in self.rsync.keys(): - msg.append(u" " + d) - reply = os.linesep.join(msg) - else: - reply = u"I didn't understand '%s'" % (unicode(msg)) - return reply - -def main(args=None): - bot = CopierBot() - bot.main(args) - -if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) - diff --git a/gaworkflow/runner.py b/gaworkflow/runner.py deleted file mode 100644 index f81b682..0000000 --- a/gaworkflow/runner.py +++ /dev/null @@ -1,236 +0,0 @@ -#!/usr/bin/env python -import logging -import os -import re -import sys -import time -import threading - -from benderjab import rpc - -from gaworkflow.pipeline.configure_run import * -from gaworkflow.pipeline.monitors import _percentCompleted - -#s_fc = re.compile('FC[0-9]+') -s_fc = re.compile('_[0-9a-zA-Z]*$') - - -def _get_flowcell_from_rundir(run_dir): - """ - Returns flowcell string based on run_dir. - Returns None and logs error if flowcell can't be found. - """ - junk, dirname = os.path.split(run_dir) - mo = s_fc.search(dirname) - if not mo: - logging.error('RunDir 2 FlowCell error: %s' % (run_dir)) - return None - - return dirname[mo.start()+1:] - - - -class Runner(rpc.XmlRpcBot): - """ - Manage running pipeline jobs. - """ - def __init__(self, section=None, configfile=None): - #if configfile is None: - # self.configfile = "~/.gaworkflow" - super(Runner, self).__init__(section, configfile) - - self.cfg['notify_users'] = None - self.cfg['genome_dir'] = None - self.cfg['base_analysis_dir'] = None - - self.cfg['notify_users'] = None - self.cfg['notify_postanalysis'] = None - - self.conf_info_dict = {} - - self.register_function(self.sequencingFinished) - #self.eventTasks.append(self.update) - - - def read_config(self, section=None, configfile=None): - super(Runner, self).read_config(section, configfile) - - self.genome_dir = self._check_required_option('genome_dir') - self.base_analysis_dir = self._check_required_option('base_analysis_dir') - - self.notify_users = self._parse_user_list(self.cfg['notify_users']) - #FIXME: process notify_postpipeline cfg - - - def _parser(self, msg, who): - """ - Parse xmpp chat messages - """ - help = u"I can send [start] a run, or report [status]" - if re.match(u"help", msg): - reply = help - elif re.match("status", msg): - words = msg.split() - if len(words) == 2: - reply = self.getStatusReport(words[1]) - else: - reply = u"Status available for: %s" \ - % (', '.join([k for k in self.conf_info_dict.keys()])) - elif re.match(u"start", msg): - words = msg.split() - if len(words) == 2: - self.sequencingFinished(words[1]) - reply = u"starting run for %s" % (words[1]) - else: - reply = u"need runfolder name" - else: - reply = u"I didn't understand '%s'" %(msg) - - logging.debug("reply: " + str(reply)) - return reply - - - def getStatusReport(self, fc_num): - """ - Returns text status report for flow cell number - """ - if fc_num not in self.conf_info_dict: - return "No record of a %s run." % (fc_num) - - status = self.conf_info_dict[fc_num].status - - if status is None: - return "No status information for %s yet." \ - " Probably still in configure step. Try again later." % (fc_num) - - fc,ft = status.statusFirecrest() - bc,bt = status.statusBustard() - gc,gt = status.statusGerald() - - tc,tt = status.statusTotal() - - fp = _percentCompleted(fc, ft) - bp = _percentCompleted(bc, bt) - gp = _percentCompleted(gc, gt) - tp = _percentCompleted(tc, tt) - - output = [] - - output.append(u'Firecrest: %s%% (%s/%s)' % (fp, fc, ft)) - output.append(u' Bustard: %s%% (%s/%s)' % (bp, bc, bt)) - output.append(u' Gerald: %s%% (%s/%s)' % (gp, gc, gt)) - output.append(u'-----------------------') - output.append(u' Total: %s%% (%s/%s)' % (tp, tc, tt)) - - return '\n'.join(output) - - - def sequencingFinished(self, run_dir): - """ - Sequenceing (and copying) is finished, time to start pipeline - """ - logging.debug("received sequencing finished message") - - # Setup config info object - ci = ConfigInfo() - ci.base_analysis_dir = self.base_analysis_dir - ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir) - - # get flowcell from run_dir name - flowcell = _get_flowcell_from_rundir(run_dir) - - # Store ci object in dictionary - self.conf_info_dict[flowcell] = ci - - - # Launch the job in it's own thread and turn. - self.launchJob(run_dir, flowcell, ci) - return "started" - - - def pipelineFinished(self, run_dir): - # need to strip off self.watch_dir from rundir I suspect. - logging.info("pipeline finished in" + str(run_dir)) - #pattern = self.watch_dir - #if pattern[-1] != os.path.sep: - # pattern += os.path.sep - #stripped_run_dir = re.sub(pattern, "", run_dir) - #logging.debug("stripped to " + stripped_run_dir) - - # Notify each user that the run has finished. - if self.notify_users is not None: - for u in self.notify_users: - self.send(u, 'Pipeline run %s finished' % (run_dir)) - - #if self.notify_runner is not None: - # for r in self.notify_runner: - # self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished') - - def reportMsg(self, msg): - - if self.notify_users is not None: - for u in self.notify_users: - self.send(u, msg) - - - def _runner(self, run_dir, flowcell, conf_info): - - # retrieve config step - cfg_filepath = os.path.join(conf_info.analysis_dir, - 'config32auto.txt') - status_retrieve_cfg = retrieve_config(conf_info, - flowcell, - cfg_filepath, - self.genome_dir) - if status_retrieve_cfg: - logging.info("Runner: Retrieve config: success") - self.reportMsg("Retrieve config (%s): success" % (run_dir)) - else: - logging.error("Runner: Retrieve config: failed") - self.reportMsg("Retrieve config (%s): FAILED" % (run_dir)) - - - # configure step - if status_retrieve_cfg: - status = configure(conf_info) - if status: - logging.info("Runner: Configure: success") - self.reportMsg("Configure (%s): success" % (run_dir)) - else: - logging.error("Runner: Configure: failed") - self.reportMsg("Configure (%s): FAILED" % (run_dir)) - - #if successful, continue - if status: - # Setup status cmdline status monitor - #startCmdLineStatusMonitor(ci) - - # running step - print 'Running pipeline now!' - run_status = run_pipeline(conf_info) - if run_status is True: - logging.info('Runner: Pipeline: success') - self.piplineFinished(run_dir) - else: - logging.info('Runner: Pipeline: failed') - self.reportMsg("Pipeline run (%s): FAILED" % (run_dir)) - - - def launchJob(self, run_dir, flowcell, conf_info): - """ - Starts up a thread for running the pipeline - """ - t = threading.Thread(target=self._runner, - args=[run_dir, flowcell, conf_info]) - t.setDaemon(True) - t.start() - - - -def main(args=None): - bot = Runner() - return bot.main(args) - -if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) - diff --git a/gaworkflow/spoolwatcher.py b/gaworkflow/spoolwatcher.py deleted file mode 100644 index ba566cb..0000000 --- a/gaworkflow/spoolwatcher.py +++ /dev/null @@ -1,196 +0,0 @@ -#!/usr/bin/env python -import logging -import os -import re -import sys -import time - -# this uses pyinotify -import pyinotify -from pyinotify import EventsCodes - -from benderjab import rpc - - -class Handler(pyinotify.ProcessEvent): - def __init__(self, watchmanager, bot): - self.last_event_time = None - self.watchmanager = watchmanager - self.bot = bot - - def process_IN_CREATE(self, event): - self.last_event_time = time.time() - msg = "Create: %s" % os.path.join(event.path, event.name) - if event.name.lower() == "run.completed": - try: - self.bot.sequencingFinished(event.path) - except IOError, e: - logging.error("Couldn't send sequencingFinished") - logging.debug(msg) - - def process_IN_DELETE(self, event): - logging.debug("Remove: %s" % os.path.join(event.path, event.name)) - - def process_IN_UNMOUNT(self, event): - self.bot.unmount_watch() - -class SpoolWatcher(rpc.XmlRpcBot): - """ - Watch a directory and send a message when another process is done writing. - - This monitors a directory tree using inotify (linux specific) and - after some files having been written will send a message after - seconds of no file writing. - - (Basically when the solexa machine finishes dumping a round of data - this'll hopefully send out a message saying hey look theres data available - - """ - # these params need to be in the config file - # I wonder where I should put the documentation - #:Parameters: - # `watchdir` - which directory tree to monitor for modifications - # `profile` - specify which .gaworkflow profile to use - # `write_timeout` - how many seconds to wait for writes to finish to - # the spool - # `notify_timeout` - how often to timeout from notify - - def __init__(self, section=None, configfile=None): - #if configfile is None: - # self.configfile = "~/.gaworkflow" - super(SpoolWatcher, self).__init__(section, configfile) - - self.cfg['watchdir'] = None - self.cfg['write_timeout'] = 10 - self.cfg['notify_users'] = None - self.cfg['notify_runner'] = None - - self.notify_timeout = 0.001 - self.wm = pyinotify.WatchManager() - self.handler = Handler(self.wm, self) - self.notifier = pyinotify.Notifier(self.wm, self.handler) - self.wdd = None - - self.notify_users = None - self.notify_runner = None - - self.eventTasks.append(self.process_notify) - - def read_config(self, section=None, configfile=None): - super(SpoolWatcher, self).read_config(section, configfile) - - self.watch_dir = self._check_required_option('watchdir') - self.write_timeout = int(self.cfg['write_timeout']) - - self.notify_users = self._parse_user_list(self.cfg['notify_users']) - try: - self.notify_runner = \ - self._parse_user_list(self.cfg['notify_runner'], - require_resource=True) - except bot.JIDMissingResource: - msg = 'need a full jabber ID + resource for xml-rpc destinations' - logging.FATAL(msg) - raise bot.JIDMissingResource(msg) - - def add_watch(self, watchdir=None): - """ - start watching watchdir or self.watch_dir - we're currently limited to watching one directory tree. - """ - # the one tree limit is mostly because self.wdd is a single item - # but managing it as a list might be a bit more annoying - if watchdir is None: - watchdir = self.watch_dir - logging.info("Watching:"+str(watchdir)) - mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT - # rec traverses the tree and adds all the directories that are there - # at the start. - # auto_add will add in new directories as they are created - self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True) - - def unmount_watch(self): - if self.wdd is not None: - logging.debug("disabling watch") - logging.debug(str(self.wdd)) - self.wm.rm_watch(self.wdd) - self.wdd = None - - def process_notify(self, *args): - # process the queue of events as explained above - self.notifier.process_events() - #check events waits timeout - if self.notifier.check_events(self.notify_timeout): - # read notified events and enqeue them - self.notifier.read_events() - # should we do something? - last_event_time = self.handler.last_event_time - if last_event_time is not None: - time_delta = time.time() - last_event_time - if time_delta > self.write_timeout: - self.startCopy() - self.handler.last_event_time = None - - def _parser(self, msg, who): - """ - Parse xmpp chat messages - """ - help = u"I can send [copy] message, or squencer [finished]" - if re.match(u"help", msg): - reply = help - elif re.match("copy", msg): - self.startCopy() - reply = u"sent copy message" - elif re.match(u"finished", msg): - words = msg.split() - if len(words) == 2: - self.sequencingFinished(words[1]) - reply = u"sending sequencing finished for %s" % (words[1]) - else: - reply = u"need runfolder name" - else: - reply = u"I didn't understand '%s'" %(msg) - return reply - - def start(self, daemonize): - """ - Start application - """ - self.add_watch() - super(SpoolWatcher, self).start(daemonize) - - def stop(self): - """ - shutdown application - """ - # destroy the inotify's instance on this interrupt (stop monitoring) - self.notifier.stop() - super(SpoolWatcher, self).stop() - - def startCopy(self): - logging.debug("writes seem to have stopped") - if self.notify_runner is not None: - for r in self.notify_runner: - self.rpc_send(r, tuple(), 'startCopy') - - def sequencingFinished(self, run_dir): - # need to strip off self.watch_dir from rundir I suspect. - logging.info("run.completed in " + str(run_dir)) - pattern = self.watch_dir - if pattern[-1] != os.path.sep: - pattern += os.path.sep - stripped_run_dir = re.sub(pattern, "", run_dir) - logging.debug("stripped to " + stripped_run_dir) - if self.notify_users is not None: - for u in self.notify_users: - self.send(u, 'Sequencing run %s finished' % (stripped_run_dir)) - if self.notify_runner is not None: - for r in self.notify_runner: - self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished') - -def main(args=None): - bot = SpoolWatcher() - return bot.main(args) - -if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) - \ No newline at end of file diff --git a/scripts/copier b/scripts/copier index 898f685..7025d57 100644 --- a/scripts/copier +++ b/scripts/copier @@ -1,6 +1,6 @@ #!/usr/bin/env python import sys -from gaworkflow.copier import main +from gaworkflow.automation.copier import main if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/scripts/runner b/scripts/runner index 655659a..6b6da11 100644 --- a/scripts/runner +++ b/scripts/runner @@ -1,6 +1,6 @@ #!/usr/bin/env python import sys -from gaworkflow.runner import main +from gaworkflow.automation.runner import main if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/scripts/spoolwatcher b/scripts/spoolwatcher index 25a277f..269292e 100644 --- a/scripts/spoolwatcher +++ b/scripts/spoolwatcher @@ -1,6 +1,6 @@ #!/usr/bin/env python import sys -from gaworkflow.spoolwatcher import main +from gaworkflow.automation.spoolwatcher import main if __name__ == "__main__": sys.exit(main(sys.argv[1:]))