From: Diane Trout Date: Mon, 10 Dec 2007 21:12:20 +0000 (+0000) Subject: [project @ make spoolwatcher a benderjab bot] X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=98baa9318ec11bf25354f39034faa1bb2027a2d8 [project @ make spoolwatcher a benderjab bot] this was a pretty significant update to spool watcher, changing the main event loop from being driven by inotify to BenderJab, and changing the start copying and run finished messages from being chat messages to xml-rpc messages. --- diff --git a/gaworkflow/spoolwatcher.py b/gaworkflow/spoolwatcher.py index bba9fdb..33a69b1 100644 --- a/gaworkflow/spoolwatcher.py +++ b/gaworkflow/spoolwatcher.py @@ -1,5 +1,5 @@ #!/usr/bin/env python - +import logging import os import re import sys @@ -9,90 +9,167 @@ import time import pyinotify from pyinotify import EventsCodes -from benderjab.xsend import send +from benderjab import rpc + class Handler(pyinotify.ProcessEvent): - def __init__(self, watchmanager, runner_jid): - self.last_event_time = None - self.watchmanager = watchmanager - self.runner_jid = runner_jid + def __init__(self, watchmanager, bot): + self.last_event_time = None + self.watchmanager = watchmanager + self.bot = bot def process_IN_CREATE(self, event): - self.last_event_time = time.time() - msg = "Create: %s" % os.path.join(event.path, event.name) - if event.name.lower() == "run.completed": - print "Run is done!" - try: - send(self.runner_jid, "a run finished, launch it, and swap the drive") - except IOError, e: - print "ERROR: couldn't send message" - print str(e) - print msg + self.last_event_time = time.time() + msg = "Create: %s" % os.path.join(event.path, event.name) + if event.name.lower() == "run.completed": + try: + self.bot.runFinished(event.path) + except IOError, e: + logging.error("Couldn't send runFinished") + logging.debug(msg) def process_IN_DELETE(self, event): - print "Remove: %s" % os.path.join(event.path, event.name) + logging.debug("Remove: %s" % os.path.join(event.path, event.name)) def process_IN_UNMOUNT(self, event): - print "Unmounted ", str(event) - -class TreeWriteDoneNotifier: - """ - Watch a directory and send a message when another process is done writing. - - This monitors a directory tree using inotify (linux specific) and - after some files having been written will send a message after - seconds of no file writing. - - (Basically when the solexa machine finishes dumping a round of data - this'll hopefully send out a message saying hey look theres data available - """ - - def __init__(self, watchdir, - copy_jid, runner_jid, - write_timeout=10, notify_timeout=5): - self.watchdir = watchdir - self.copy_jid = copy_jid - self.runner_jid = runner_jid - self.write_timeout = write_timeout - self.notify_timeout = int(notify_timeout * 1000) - - self.wm = pyinotify.WatchManager() - self.handler = Handler(self.wm, self.runner_jid) - self.notifier = pyinotify.Notifier(self.wm, self.handler) - - self.add_watch(self.watchdir) - - def add_watch(self, watchdir): - print "Watching:", watchdir - mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT - # rec traverses the tree and adds all the directories that are there - # at the start. - # auto_add will add in new directories as they are created - self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True) - - def event_loop(self): - while True: # loop forever - try: + self.bot.unmount_watch() + +class SpoolWatcher(rpc.XmlRpcBot): + """ + Watch a directory and send a message when another process is done writing. + + This monitors a directory tree using inotify (linux specific) and + after some files having been written will send a message after + seconds of no file writing. + + (Basically when the solexa machine finishes dumping a round of data + this'll hopefully send out a message saying hey look theres data available + + """ + # these params need to be in the config file + # I wonder where I should put the documentation + #:Parameters: + # `watchdir` - which directory tree to monitor for modifications + # `profile` - specify which .gaworkflow profile to use + # `write_timeout` - how many seconds to wait for writes to finish to + # the spool + # `notify_timeout` - how often to timeout from notify + + def __init__(self, section=None, configfile=None): + #if configfile is None: + # self.configfile = "~/.gaworkflow" + super(SpoolWatcher, self).__init__(section, configfile) + + self.cfg['watchdir'] = None + self.cfg['write_timeout'] = 10 + self.cfg['notify_users'] = None + self.cfg['notify_runner'] = None + + self.notify_timeout = 0.001 + self.wm = pyinotify.WatchManager() + self.handler = Handler(self.wm, self) + self.notifier = pyinotify.Notifier(self.wm, self.handler) + self.wdd = None + + self.notify_users = None + self.notify_runner = None + + self.eventTasks.append(self.process_notify) + + def read_config(self, section=None, configfile=None): + super(SpoolWatcher, self).read_config(section, configfile) + + self.watch_dir = self._check_required_option('watchdir') + self.write_timeout = int(self.cfg['write_timeout']) + + self.notify_users = self._parse_user_list(self.cfg['notify_users']) + try: + self.notify_runner = \ + self._parse_user_list(self.cfg['notify_runner'], + require_resource=True) + except bot.JIDMissingResource: + msg = 'need a full jabber ID + resource for xml-rpc destinations' + logging.FATAL(msg) + raise bot.JIDMissingResource(msg) + + def add_watch(self, watchdir=None): + """ + start watching watchdir or self.watch_dir + we're currently limited to watching one directory tree. + """ + # the one tree limit is mostly because self.wdd is a single item + # but managing it as a list might be a bit more annoying + if watchdir is None: + watchdir = self.watch_dir + logging.info("Watching:"+str(watchdir)) + mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT + # rec traverses the tree and adds all the directories that are there + # at the start. + # auto_add will add in new directories as they are created + self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True) + + def unmount_watch(self): + if self.wdd is not None: + logging.debug("disabling watch") + logging.debug(str(self.wdd)) + self.wm.rm_watch(self.wdd) + self.wdd = None + + def process_notify(self, *args): # process the queue of events as explained above self.notifier.process_events() #check events waits timeout if self.notifier.check_events(self.notify_timeout): - # read notified events and enqeue them - self.notifier.read_events() - # should we do something? - last_event_time = self.handler.last_event_time - if last_event_time is not None: - time_delta = time.time() - last_event_time - if time_delta > self.write_timeout: - self.copying_paused() - self.handler.last_event_time = None - - except KeyboardInterrupt: + # read notified events and enqeue them + self.notifier.read_events() + # should we do something? + last_event_time = self.handler.last_event_time + if last_event_time is not None: + time_delta = time.time() - last_event_time + if time_delta > self.write_timeout: + self.startCopy() + self.handler.last_event_time = None + + def start(self, daemonize): + """ + Start application + """ + self.add_watch() + super(SpoolWatcher, self).start(daemonize) + + def stop(self): + """ + shutdown application + """ # destroy the inotify's instance on this interrupt (stop monitoring) self.notifier.stop() - break - - def copying_paused(self): - print "more than 10 seconds have elapsed" - send(self.copy_jid, "start copy") - + super(SpoolWatcher, self).stop() + + def startCopy(self): + logging.debug("writes seem to have stopped") + if self.notify_runner is not None: + for r in self.notify_runner: + self.rpc_send(r, tuple(), 'startCopy') + + def runFinished(self, run_dir): + # need to strip off self.watch_dir from rundir I suspect. + logging.info("run.completed in " + str(run_dir)) + pattern = self.watch_dir + if pattern[-1] != os.path.sep: + pattern += os.path.sep + stripped_run_dir = re.sub(pattern, "", run_dir) + logging.debug("stripped to " + stripped_run_dir) + if self.notify_users is not None: + for u in self.notify_users: + self.send(u, 'Sequencing run %s finished' % (stripped_run_dir)) + if self.notify_runner is not None: + for r in self.notify_runner: + self.rpc_send(r, (stripped_run_dir,), 'runFinished') + +def main(args=None): + bot = SpoolWatcher() + return bot.main(args) + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) + \ No newline at end of file diff --git a/scripts/spoolwatcher b/scripts/spoolwatcher index 96c452f..25a277f 100644 --- a/scripts/spoolwatcher +++ b/scripts/spoolwatcher @@ -1,10 +1,6 @@ #!/usr/bin/env python - -from gaworkflow.spoolwatcher import TreeWriteDoneNotifier +import sys +from gaworkflow.spoolwatcher import main if __name__ == "__main__": - copy_jid = "cellthumper@chaos.caltech.edu" - runner_jid = "diane@ghic.org" - - watcher = TreeWriteDoneNotifier("/gec/jumpgate/ext0", copy_jid, runner_jid) - watcher.event_loop() + sys.exit(main(sys.argv[1:]))