From bdb3dad08d7fa045db9de765f5c117e71f2ae0a1 Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Sat, 30 May 2009 01:07:08 +0000 Subject: [PATCH] Watch more than one directory tree for modification. I also added code to record which directory watch, and what entry in that directory was being created (AKA whatever element of the runfolder is being touched, I record the root of the runfolder directory). To support this I changed the config option in the benderjab file from watchdir to watchdirs to make it a bit clearer that things have changed. I still need to work out the communication protocol to copier so it can figure out what to start copying. Also I'm recording the watchdirectory, but copier needs access to a different url. So there's some question about where the right place to map watchdir to copy url source might be. --- htsworkflow/automation/spoolwatcher.py | 138 +++++++++++++++++-------- 1 file changed, 93 insertions(+), 45 deletions(-) diff --git a/htsworkflow/automation/spoolwatcher.py b/htsworkflow/automation/spoolwatcher.py index 361a779..d100879 100644 --- a/htsworkflow/automation/spoolwatcher.py +++ b/htsworkflow/automation/spoolwatcher.py @@ -2,6 +2,7 @@ import logging import os import re +import shlex import sys import time #import glob @@ -15,6 +16,28 @@ from pyinotify import EventsCodes from benderjab import rpc +def get_top_dir(root, path): + """ + Return the directory in path that is a subdirectory of root. + e.g. + + >>> print get_top_dir('/a/b/c', '/a/b/c/d/e/f') + d + >>> print get_top_dir('/a/b/c/', '/a/b/c/d/e/f') + d + >>> print get_top_dir('/a/b/c', '/g/e/f') + None + >>> print get_top_dir('/a/b/c', '/a/b/c') + + """ + if path.startswith(root): + subpath = path[len(root):] + if subpath.startswith('/'): + subpath = subpath[1:] + return subpath.split(os.path.sep)[0] + else: + return None + class WatcherEvents(object): # two events need to be tracked # one to send startCopy @@ -33,8 +56,7 @@ class Handler(pyinotify.ProcessEvent): ipar flag indicates we should wait for ipar to finish, instead of just the run finishing """ - print 'ipar flag: ' + str(ipar) - self.last_event_time = None + self.last_event = {} self.watchmanager = watchmanager self.bot = bot self.ipar_mode = ipar @@ -44,15 +66,20 @@ class Handler(pyinotify.ProcessEvent): self.last_file = "run.completed".lower() def process_IN_CREATE(self, event): - self.last_event_time = time.time() - msg = "Create: %s" % os.path.join(event.path, event.name) + for wdd in self.bot.wdds: + for watch_path in self.bot.watchdirs: + if event.path.startswith(watch_path): + target = get_top_dir(watch_path, event.path) + self.last_event.setdefault(watch_path, {})[target] = time.time() + + msg = "Create: %s %s" % (event.path, event.name) - if event.name.lower() == self.last_file: - try: - self.bot.sequencingFinished(event.path) - except IOError, e: - logging.error("Couldn't send sequencingFinished") - logging.debug(msg) + if event.name.lower() == self.last_file: + try: + self.bot.sequencingFinished(event.path) + except IOError, e: + logging.error("Couldn't send sequencingFinished") + logging.debug(msg) def process_IN_DELETE(self, event): logging.debug("Remove: %s" % os.path.join(event.path, event.name)) @@ -60,7 +87,7 @@ class Handler(pyinotify.ProcessEvent): def process_IN_UNMOUNT(self, event): pathname = os.path.join(event.path, event.name) logging.debug("IN_UNMOUNT: %s" % (pathname,)) - self.bot.unmount_watch() + self.bot.unmount_watch(event.path) class SpoolWatcher(rpc.XmlRpcBot): """ @@ -77,7 +104,7 @@ class SpoolWatcher(rpc.XmlRpcBot): # these params need to be in the config file # I wonder where I should put the documentation #:Parameters: - # `watchdir` - which directory tree to monitor for modifications + # `watchdirs` - list of directories to monitor for modifications # `profile` - specify which .htsworkflow profile to use # `write_timeout` - how many seconds to wait for writes to finish to # the spool @@ -88,7 +115,7 @@ class SpoolWatcher(rpc.XmlRpcBot): # self.configfile = "~/.htsworkflow" super(SpoolWatcher, self).__init__(section, configfile) - self.cfg['watchdir'] = None + self.cfg['watchdirs'] = None self.cfg['write_timeout'] = 10 self.cfg['notify_users'] = None self.cfg['notify_runner'] = None @@ -96,9 +123,12 @@ class SpoolWatcher(rpc.XmlRpcBot): self.notify_timeout = 0.001 self.wm = pyinotify.WatchManager() - self.wdd = None - self.mount_point = None - self.mounted = True + self.wdds = [] + # keep track if the specified mount point is currently mounted + self.mounted_points = {} + # keep track of which mount points tie to which watch directories + # so maybe we can remount them. + self.mounts_to_watches = {} self.notify_users = None self.notify_runner = None @@ -108,10 +138,10 @@ class SpoolWatcher(rpc.XmlRpcBot): def read_config(self, section=None, configfile=None): super(SpoolWatcher, self).read_config(section, configfile) - self.watch_dir = self._check_required_option('watchdir') + self.watchdirs = shlex.split(self._check_required_option('watchdirs')) self.write_timeout = int(self.cfg['write_timeout']) self.wait_for_ipar = int(self.cfg['wait_for_ipar']) - print 'wait for ipar: ' + str(self.cfg['wait_for_ipar']) + logging.debug('wait for ipar: ' + str(self.cfg['wait_for_ipar'])) self.notify_users = self._parse_user_list(self.cfg['notify_users']) try: @@ -126,30 +156,40 @@ class SpoolWatcher(rpc.XmlRpcBot): self.handler = Handler(self.wm, self, self.wait_for_ipar) self.notifier = pyinotify.Notifier(self.wm, self.handler) - def add_watch(self, watchdir=None): + def add_watch(self, watchdirs=None): """ start watching watchdir or self.watch_dir we're currently limited to watching one directory tree. """ # the one tree limit is mostly because self.wdd is a single item # but managing it as a list might be a bit more annoying - if watchdir is None: - watchdir = self.watch_dir - logging.info("Watching:"+str(watchdir)) - - self.mount_point = mount.find_mount_point_for(watchdir) + if watchdirs is None: + watchdirs = self.watchdirs mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT # rec traverses the tree and adds all the directories that are there # at the start. # auto_add will add in new directories as they are created - self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True) + for w in watchdirs: + mount_location = mount.find_mount_point_for(w) + self.mounted_points[mount_location] = True + mounts = self.mounts_to_watches.get(mount_location, []) + if w not in mounts: + mounts.append(w) + self.mounts_to_watches[mount_location] = mounts + + logging.info(u"Watching:"+unicode(w)) + self.wdds.append(self.wm.add_watch(w, mask, rec=True, auto_add=True)) - def unmount_watch(self): - if self.wdd is not None: - self.wm.rm_watch(self.wdd.values()) - self.wdd = None - self.mounted = False + def unmount_watch(self, event_path): + # remove backwards so we don't get weirdness from + # the list getting shorter + for i in range(len(self.wdds),0, -1): + wdd = self.wdds[i] + logging.info(u'unmounting: '+unicode(wdd.items())) + self.wm.rm_watch(wdd.values()) + del self.wdds[i] + self.mounted = False def process_notify(self, *args): # process the queue of events as explained above @@ -160,23 +200,25 @@ class SpoolWatcher(rpc.XmlRpcBot): self.notifier.read_events() # should we do something? # has something happened? - last_event_time = self.handler.last_event_time - if last_event_time is not None: - time_delta = time.time() - last_event_time - if time_delta > self.write_timeout: - self.startCopy() - self.handler.last_event_time = None + for watch_dir, last_events in self.handler.last_event.items(): + logging.debug('last_events: %s %s' % (watch_dir, last_events)) + for last_event_dir, last_event_time in last_events.items(): + time_delta = time.time() - last_event_time + if time_delta > self.write_timeout: + self.startCopy(watch_dir, last_event_dir) + self.handler.last_event[watch_dir] = {} # handle unmounted filesystems - if not self.mounted: - if mount.is_mounted(self.mount_point): + for mount_point, was_mounted in self.mounted_points.items(): + if not was_mounted and mount.is_mounted(mount_point): # we've been remounted. Huzzah! # restart the watch - self.add_watch() - self.mounted = True - logging.info( - "%s was remounted, restarting watch" % \ - (self.mount_point) - ) + for watch in self.mounts_to_watches[mount_point]: + self.add_watch(watch) + logging.info( + "%s was remounted, restarting watch" % \ + (mount_point) + ) + self.mounted_points[mount_point] = True def _parser(self, msg, who): """ @@ -214,11 +256,15 @@ class SpoolWatcher(rpc.XmlRpcBot): self.notifier.stop() super(SpoolWatcher, self).stop() - def startCopy(self): + def startCopy(self, watchdir=None, event_path=None): logging.debug("writes seem to have stopped") + logging.debug("watchdir = %s, event_path = %s" % (watchdir, event_path)) if self.notify_runner is not None: for r in self.notify_runner: self.rpc_send(r, tuple(), 'startCopy') + if self.notify_users is not None: + for u in self.notify_users: + self.send(u, 'startCopy %s %s' % (watchdir, event_path)) def sequencingFinished(self, run_dir): # need to strip off self.watch_dir from rundir I suspect. @@ -242,3 +288,5 @@ def main(args=None): if __name__ == "__main__": sys.exit(main(sys.argv[1:])) +# TODO: +# send messages to copier specifying which mount to copy -- 2.30.2