From: Diane Trout Date: Tue, 20 Nov 2007 01:04:19 +0000 (+0000) Subject: [project @ rename python module to gaworkflow from uashelper] X-Git-Tag: 0.1.0~57 X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=ab1f319fe1a80b5ba4d9ccb5c95812cf23455c94 [project @ rename python module to gaworkflow from uashelper] --- diff --git a/gaworkflow/__init__.py b/gaworkflow/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gaworkflow/copier.py b/gaworkflow/copier.py new file mode 100644 index 0000000..2655320 --- /dev/null +++ b/gaworkflow/copier.py @@ -0,0 +1,106 @@ +import copy +import logging +import logging.handlers +import os +import re +import subprocess +import sys +import time +import traceback + +from benderjab.bot import BenderFactory + +class rsync(object): + + def __init__(self, pwfile): + self.pwfile = pwfile + self.cmd = ['/usr/bin/rsync', ] + self.cmd.append('--password-file=%s' % (pwfile)) + self.source_base = 'rsync://sequencer@jumpgate.caltech.edu:8730/sequencer/' + self.dest_base = '/home/diane/gec/' + self.processes = {} + self.exit_code = None + + def list(self): + """Get a directory listing""" + dirs_to_copy = [] + args = copy.copy(self.cmd) + args.append(self.source_base) + + logging.debug("Rsync cmd:" + " ".join(args)) + short_process = subprocess.Popen(args, stdout=subprocess.PIPE) + direntries = [ x.split() for x in short_process.stdout ] + for permissions, size, filedate, filetime, filename in direntries: + if permissions[0] == 'd': + # hey its a directory, the first step to being something we want to + # copy + if re.match("[0-9]{6}", filename): + # it starts with something that looks like a 6 digit date + # aka good enough for me + dirs_to_copy.append(filename) + return dirs_to_copy + + def create_copy_process(self, dirname): + args = copy.copy(self.cmd) + # we want to copy everything + args.append('-rlt') + # from here + args.append(os.path.join(self.source_base, dirname)) + # to here + args.append(self.dest_base) + logging.debug("Rsync cmd:" + " ".join(args)) + return subprocess.Popen(args) + + def copy(self): + """copy any interesting looking directories over + """ + dirs_to_copy = self.list() + for d in dirs_to_copy: + process = self.processes.get(d, None) + if process is None: + # we don't have a process, so make one + logging.info("rsyncing %s" % (d)) + self.processes[d] = self.create_copy_process(d) + else: + retcode = process.poll() + if retcode is not None: + # we finished + logging.info("finished rsyncing %s, exitcode %d" % (d, retcode)) + del self.processes[d] + +class copier_bot_parser(object): + def __init__(self, ): + self.rsync = rsync('/home/diane/.sequencer') + + def __call__(self, msg, who): + try: + if re.match("start copy", msg): + logging.info("starting copy for %s" % (who.getStripped())) + self.rsync.copy() + except Exception, e: + errmsg = "Exception: " + str(e) + logging.error(errmsg) + logging.error(traceback.format_exc()) + return errmsg + +def main(args=None): + if len(args) != 1: + print "need .benderjab config name" + configname = args[0] + + logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s') + log = logging.getLogger() + log.addHandler(logging.handlers.RotatingFileHandler( + '/tmp/copier_bot.log', maxBytes=1000000, backupCount = 3) + ) + bot = BenderFactory(configname) + bot.parser = copier_bot_parser() + bot.logon() + bot.eventLoop() + logging.shutdown() + return 0 + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) + diff --git a/gaworkflow/spoolwatcher.py b/gaworkflow/spoolwatcher.py new file mode 100644 index 0000000..bba9fdb --- /dev/null +++ b/gaworkflow/spoolwatcher.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python + +import os +import re +import sys +import time + +# this uses pyinotify +import pyinotify +from pyinotify import EventsCodes + +from benderjab.xsend import send + +class Handler(pyinotify.ProcessEvent): + def __init__(self, watchmanager, runner_jid): + self.last_event_time = None + self.watchmanager = watchmanager + self.runner_jid = runner_jid + + def process_IN_CREATE(self, event): + self.last_event_time = time.time() + msg = "Create: %s" % os.path.join(event.path, event.name) + if event.name.lower() == "run.completed": + print "Run is done!" + try: + send(self.runner_jid, "a run finished, launch it, and swap the drive") + except IOError, e: + print "ERROR: couldn't send message" + print str(e) + print msg + + def process_IN_DELETE(self, event): + print "Remove: %s" % os.path.join(event.path, event.name) + + def process_IN_UNMOUNT(self, event): + print "Unmounted ", str(event) + +class TreeWriteDoneNotifier: + """ + Watch a directory and send a message when another process is done writing. + + This monitors a directory tree using inotify (linux specific) and + after some files having been written will send a message after + seconds of no file writing. + + (Basically when the solexa machine finishes dumping a round of data + this'll hopefully send out a message saying hey look theres data available + """ + + def __init__(self, watchdir, + copy_jid, runner_jid, + write_timeout=10, notify_timeout=5): + self.watchdir = watchdir + self.copy_jid = copy_jid + self.runner_jid = runner_jid + self.write_timeout = write_timeout + self.notify_timeout = int(notify_timeout * 1000) + + self.wm = pyinotify.WatchManager() + self.handler = Handler(self.wm, self.runner_jid) + self.notifier = pyinotify.Notifier(self.wm, self.handler) + + self.add_watch(self.watchdir) + + def add_watch(self, watchdir): + print "Watching:", watchdir + mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT + # rec traverses the tree and adds all the directories that are there + # at the start. + # auto_add will add in new directories as they are created + self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True) + + def event_loop(self): + while True: # loop forever + try: + # process the queue of events as explained above + self.notifier.process_events() + #check events waits timeout + if self.notifier.check_events(self.notify_timeout): + # read notified events and enqeue them + self.notifier.read_events() + # should we do something? + last_event_time = self.handler.last_event_time + if last_event_time is not None: + time_delta = time.time() - last_event_time + if time_delta > self.write_timeout: + self.copying_paused() + self.handler.last_event_time = None + + except KeyboardInterrupt: + # destroy the inotify's instance on this interrupt (stop monitoring) + self.notifier.stop() + break + + def copying_paused(self): + print "more than 10 seconds have elapsed" + send(self.copy_jid, "start copy") + diff --git a/scripts/copier b/scripts/copier index f0d780a..898f685 100644 --- a/scripts/copier +++ b/scripts/copier @@ -1,6 +1,6 @@ #!/usr/bin/env python import sys -from uashelper.copier import main +from gaworkflow.copier import main if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/scripts/spoolwatcher b/scripts/spoolwatcher index 1e5d029..96c452f 100644 --- a/scripts/spoolwatcher +++ b/scripts/spoolwatcher @@ -1,6 +1,6 @@ #!/usr/bin/env python -from uashelper.spoolwatcher import TreeWriteDoneNotifier +from gaworkflow.spoolwatcher import TreeWriteDoneNotifier if __name__ == "__main__": copy_jid = "cellthumper@chaos.caltech.edu" diff --git a/uashelper/__init__.py b/uashelper/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/uashelper/copier.py b/uashelper/copier.py deleted file mode 100644 index 2655320..0000000 --- a/uashelper/copier.py +++ /dev/null @@ -1,106 +0,0 @@ -import copy -import logging -import logging.handlers -import os -import re -import subprocess -import sys -import time -import traceback - -from benderjab.bot import BenderFactory - -class rsync(object): - - def __init__(self, pwfile): - self.pwfile = pwfile - self.cmd = ['/usr/bin/rsync', ] - self.cmd.append('--password-file=%s' % (pwfile)) - self.source_base = 'rsync://sequencer@jumpgate.caltech.edu:8730/sequencer/' - self.dest_base = '/home/diane/gec/' - self.processes = {} - self.exit_code = None - - def list(self): - """Get a directory listing""" - dirs_to_copy = [] - args = copy.copy(self.cmd) - args.append(self.source_base) - - logging.debug("Rsync cmd:" + " ".join(args)) - short_process = subprocess.Popen(args, stdout=subprocess.PIPE) - direntries = [ x.split() for x in short_process.stdout ] - for permissions, size, filedate, filetime, filename in direntries: - if permissions[0] == 'd': - # hey its a directory, the first step to being something we want to - # copy - if re.match("[0-9]{6}", filename): - # it starts with something that looks like a 6 digit date - # aka good enough for me - dirs_to_copy.append(filename) - return dirs_to_copy - - def create_copy_process(self, dirname): - args = copy.copy(self.cmd) - # we want to copy everything - args.append('-rlt') - # from here - args.append(os.path.join(self.source_base, dirname)) - # to here - args.append(self.dest_base) - logging.debug("Rsync cmd:" + " ".join(args)) - return subprocess.Popen(args) - - def copy(self): - """copy any interesting looking directories over - """ - dirs_to_copy = self.list() - for d in dirs_to_copy: - process = self.processes.get(d, None) - if process is None: - # we don't have a process, so make one - logging.info("rsyncing %s" % (d)) - self.processes[d] = self.create_copy_process(d) - else: - retcode = process.poll() - if retcode is not None: - # we finished - logging.info("finished rsyncing %s, exitcode %d" % (d, retcode)) - del self.processes[d] - -class copier_bot_parser(object): - def __init__(self, ): - self.rsync = rsync('/home/diane/.sequencer') - - def __call__(self, msg, who): - try: - if re.match("start copy", msg): - logging.info("starting copy for %s" % (who.getStripped())) - self.rsync.copy() - except Exception, e: - errmsg = "Exception: " + str(e) - logging.error(errmsg) - logging.error(traceback.format_exc()) - return errmsg - -def main(args=None): - if len(args) != 1: - print "need .benderjab config name" - configname = args[0] - - logging.basicConfig(level=logging.INFO, - format='%(asctime)s %(levelname)s %(message)s') - log = logging.getLogger() - log.addHandler(logging.handlers.RotatingFileHandler( - '/tmp/copier_bot.log', maxBytes=1000000, backupCount = 3) - ) - bot = BenderFactory(configname) - bot.parser = copier_bot_parser() - bot.logon() - bot.eventLoop() - logging.shutdown() - return 0 - -if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) - diff --git a/uashelper/spoolwatcher.py b/uashelper/spoolwatcher.py deleted file mode 100644 index bba9fdb..0000000 --- a/uashelper/spoolwatcher.py +++ /dev/null @@ -1,98 +0,0 @@ -#!/usr/bin/env python - -import os -import re -import sys -import time - -# this uses pyinotify -import pyinotify -from pyinotify import EventsCodes - -from benderjab.xsend import send - -class Handler(pyinotify.ProcessEvent): - def __init__(self, watchmanager, runner_jid): - self.last_event_time = None - self.watchmanager = watchmanager - self.runner_jid = runner_jid - - def process_IN_CREATE(self, event): - self.last_event_time = time.time() - msg = "Create: %s" % os.path.join(event.path, event.name) - if event.name.lower() == "run.completed": - print "Run is done!" - try: - send(self.runner_jid, "a run finished, launch it, and swap the drive") - except IOError, e: - print "ERROR: couldn't send message" - print str(e) - print msg - - def process_IN_DELETE(self, event): - print "Remove: %s" % os.path.join(event.path, event.name) - - def process_IN_UNMOUNT(self, event): - print "Unmounted ", str(event) - -class TreeWriteDoneNotifier: - """ - Watch a directory and send a message when another process is done writing. - - This monitors a directory tree using inotify (linux specific) and - after some files having been written will send a message after - seconds of no file writing. - - (Basically when the solexa machine finishes dumping a round of data - this'll hopefully send out a message saying hey look theres data available - """ - - def __init__(self, watchdir, - copy_jid, runner_jid, - write_timeout=10, notify_timeout=5): - self.watchdir = watchdir - self.copy_jid = copy_jid - self.runner_jid = runner_jid - self.write_timeout = write_timeout - self.notify_timeout = int(notify_timeout * 1000) - - self.wm = pyinotify.WatchManager() - self.handler = Handler(self.wm, self.runner_jid) - self.notifier = pyinotify.Notifier(self.wm, self.handler) - - self.add_watch(self.watchdir) - - def add_watch(self, watchdir): - print "Watching:", watchdir - mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT - # rec traverses the tree and adds all the directories that are there - # at the start. - # auto_add will add in new directories as they are created - self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True) - - def event_loop(self): - while True: # loop forever - try: - # process the queue of events as explained above - self.notifier.process_events() - #check events waits timeout - if self.notifier.check_events(self.notify_timeout): - # read notified events and enqeue them - self.notifier.read_events() - # should we do something? - last_event_time = self.handler.last_event_time - if last_event_time is not None: - time_delta = time.time() - last_event_time - if time_delta > self.write_timeout: - self.copying_paused() - self.handler.last_event_time = None - - except KeyboardInterrupt: - # destroy the inotify's instance on this interrupt (stop monitoring) - self.notifier.stop() - break - - def copying_paused(self): - print "more than 10 seconds have elapsed" - send(self.copy_jid, "start copy") -