Watch more than one directory tree for modification.
authorDiane Trout <diane@caltech.edu>
Sat, 30 May 2009 01:07:08 +0000 (01:07 +0000)
committerDiane Trout <diane@caltech.edu>
Sat, 30 May 2009 01:07:08 +0000 (01:07 +0000)
I also added code to record which directory watch, and what entry in that
directory was being created (AKA whatever element of the runfolder is being
touched, I record the root of the runfolder directory).

To support this I changed the config option in the benderjab file from
watchdir to watchdirs to make it a bit clearer that things have changed.

I still need to work out the communication protocol to copier so it
can figure out what to start copying. Also I'm recording the
watchdirectory, but copier needs access to a different url. So there's
some question about where the right place to map watchdir to copy url source
might be.

htsworkflow/automation/spoolwatcher.py

index 361a7795d0926f01c2e9fb91ec0a2d123a630130..d1008798f206c9d4ee25cefa62cdc16782151074 100644 (file)
@@ -2,6 +2,7 @@
 import logging
 import os
 import re
+import shlex
 import sys
 import time
 #import glob
@@ -15,6 +16,28 @@ from pyinotify import EventsCodes
 from benderjab import rpc
 
 
+def get_top_dir(root, path):
+    """
+    Return the directory in path that is a subdirectory of root.
+    e.g.
+
+    >>> print get_top_dir('/a/b/c', '/a/b/c/d/e/f')
+    d
+    >>> print get_top_dir('/a/b/c/', '/a/b/c/d/e/f')
+    d
+    >>> print get_top_dir('/a/b/c', '/g/e/f')
+    None
+    >>> print get_top_dir('/a/b/c', '/a/b/c')
+    <BLANKLINE>
+    """
+    if path.startswith(root):
+        subpath = path[len(root):]
+        if subpath.startswith('/'):
+            subpath = subpath[1:]
+        return subpath.split(os.path.sep)[0]
+    else:
+        return None
+
 class WatcherEvents(object):
     # two events need to be tracked
     # one to send startCopy
@@ -33,8 +56,7 @@ class Handler(pyinotify.ProcessEvent):
         ipar flag indicates we should wait for ipar to finish, instead of 
              just the run finishing
         """
-        print 'ipar flag: ' + str(ipar)
-        self.last_event_time = None
+        self.last_event = {}
         self.watchmanager = watchmanager
         self.bot = bot
         self.ipar_mode = ipar
@@ -44,15 +66,20 @@ class Handler(pyinotify.ProcessEvent):
             self.last_file = "run.completed".lower()
 
     def process_IN_CREATE(self, event):
-        self.last_event_time = time.time()
-        msg = "Create: %s" %  os.path.join(event.path, event.name)
+        for wdd in self.bot.wdds:
+            for watch_path in self.bot.watchdirs:
+                if event.path.startswith(watch_path):
+                    target = get_top_dir(watch_path, event.path)
+                    self.last_event.setdefault(watch_path, {})[target] = time.time()
+
+                    msg = "Create: %s %s" % (event.path, event.name)
 
-        if event.name.lower() == self.last_file:
-            try:
-                self.bot.sequencingFinished(event.path)
-            except IOError, e:
-                logging.error("Couldn't send sequencingFinished")
-        logging.debug(msg)
+                    if event.name.lower() == self.last_file:
+                        try:
+                            self.bot.sequencingFinished(event.path)
+                        except IOError, e:
+                            logging.error("Couldn't send sequencingFinished")
+                    logging.debug(msg)
 
     def process_IN_DELETE(self, event):
         logging.debug("Remove: %s" %  os.path.join(event.path, event.name))
@@ -60,7 +87,7 @@ class Handler(pyinotify.ProcessEvent):
     def process_IN_UNMOUNT(self, event):
         pathname = os.path.join(event.path, event.name)
         logging.debug("IN_UNMOUNT: %s" % (pathname,))
-        self.bot.unmount_watch()
+        self.bot.unmount_watch(event.path)
 
 class SpoolWatcher(rpc.XmlRpcBot):
     """
@@ -77,7 +104,7 @@ class SpoolWatcher(rpc.XmlRpcBot):
     # these params need to be in the config file
     # I wonder where I should put the documentation
     #:Parameters:
-    #    `watchdir` - which directory tree to monitor for modifications
+    #    `watchdirs` - list of directories to monitor for modifications
     #    `profile` - specify which .htsworkflow profile to use
     #    `write_timeout` - how many seconds to wait for writes to finish to
     #                      the spool
@@ -88,7 +115,7 @@ class SpoolWatcher(rpc.XmlRpcBot):
         #    self.configfile = "~/.htsworkflow"
         super(SpoolWatcher, self).__init__(section, configfile)
         
-        self.cfg['watchdir'] = None
+        self.cfg['watchdirs'] = None
         self.cfg['write_timeout'] = 10
         self.cfg['notify_users'] = None
         self.cfg['notify_runner'] = None
@@ -96,9 +123,12 @@ class SpoolWatcher(rpc.XmlRpcBot):
         
         self.notify_timeout = 0.001
         self.wm = pyinotify.WatchManager()
-        self.wdd = None
-        self.mount_point = None
-        self.mounted = True
+        self.wdds = []
+        # keep track if the specified mount point is currently mounted
+        self.mounted_points = {}
+        # keep track of which mount points tie to which watch directories
+        # so maybe we can remount them.
+        self.mounts_to_watches = {}
         
         self.notify_users = None
         self.notify_runner = None
@@ -108,10 +138,10 @@ class SpoolWatcher(rpc.XmlRpcBot):
     def read_config(self, section=None, configfile=None):
         super(SpoolWatcher, self).read_config(section, configfile)
         
-        self.watch_dir = self._check_required_option('watchdir')
+        self.watchdirs = shlex.split(self._check_required_option('watchdirs'))
         self.write_timeout = int(self.cfg['write_timeout'])
         self.wait_for_ipar = int(self.cfg['wait_for_ipar'])
-       print 'wait for ipar: ' + str(self.cfg['wait_for_ipar'])
+        logging.debug('wait for ipar: ' + str(self.cfg['wait_for_ipar']))
         
         self.notify_users = self._parse_user_list(self.cfg['notify_users'])
         try:
@@ -126,30 +156,40 @@ class SpoolWatcher(rpc.XmlRpcBot):
         self.handler = Handler(self.wm, self, self.wait_for_ipar)
         self.notifier = pyinotify.Notifier(self.wm, self.handler)
 
-    def add_watch(self, watchdir=None):
+    def add_watch(self, watchdirs=None):
         """
         start watching watchdir or self.watch_dir
         we're currently limited to watching one directory tree.
         """
         # the one tree limit is mostly because self.wdd is a single item
         # but managing it as a list might be a bit more annoying
-        if watchdir is None:
-            watchdir = self.watch_dir
-        logging.info("Watching:"+str(watchdir))
-
-        self.mount_point = mount.find_mount_point_for(watchdir)
+        if watchdirs is None:
+            watchdirs = self.watchdirs
 
         mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
         # rec traverses the tree and adds all the directories that are there
         # at the start.
         # auto_add will add in new directories as they are created
-        self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
+        for w in watchdirs:
+            mount_location = mount.find_mount_point_for(w)
+            self.mounted_points[mount_location] = True
+            mounts = self.mounts_to_watches.get(mount_location, [])
+            if w not in mounts:
+                mounts.append(w)
+                self.mounts_to_watches[mount_location] = mounts
+
+            logging.info(u"Watching:"+unicode(w))
+            self.wdds.append(self.wm.add_watch(w, mask, rec=True, auto_add=True))
 
-    def unmount_watch(self):
-        if self.wdd is not None:
-            self.wm.rm_watch(self.wdd.values())
-            self.wdd = None
-            self.mounted = False
+    def unmount_watch(self, event_path):
+        # remove backwards so we don't get weirdness from 
+        # the list getting shorter
+        for i in range(len(self.wdds),0, -1):
+            wdd = self.wdds[i]
+            logging.info(u'unmounting: '+unicode(wdd.items()))
+            self.wm.rm_watch(wdd.values())
+            del self.wdds[i]
+        self.mounted = False
             
     def process_notify(self, *args):
         # process the queue of events as explained above
@@ -160,23 +200,25 @@ class SpoolWatcher(rpc.XmlRpcBot):
             self.notifier.read_events()
             # should we do something?
         # has something happened?
-        last_event_time = self.handler.last_event_time
-        if last_event_time is not None:
-            time_delta = time.time() - last_event_time
-            if time_delta > self.write_timeout:
-                self.startCopy()
-                self.handler.last_event_time = None
+        for watch_dir, last_events in self.handler.last_event.items():
+            logging.debug('last_events: %s %s' % (watch_dir, last_events))
+            for last_event_dir, last_event_time in last_events.items():
+                time_delta = time.time() - last_event_time
+                if time_delta > self.write_timeout:
+                    self.startCopy(watch_dir, last_event_dir)
+                    self.handler.last_event[watch_dir] = {}
         # handle unmounted filesystems
-        if not self.mounted:
-            if mount.is_mounted(self.mount_point):
+        for mount_point, was_mounted in self.mounted_points.items():
+            if not was_mounted and mount.is_mounted(mount_point):
                 # we've been remounted. Huzzah!
                 # restart the watch
-                self.add_watch()
-                self.mounted = True
-                logging.info(
-                    "%s was remounted, restarting watch" % \
-                        (self.mount_point)
-                )
+                for watch in self.mounts_to_watches[mount_point]:
+                    self.add_watch(watch)
+                    logging.info(
+                        "%s was remounted, restarting watch" % \
+                            (mount_point)
+                    )
+                self.mounted_points[mount_point] = True
 
     def _parser(self, msg, who):
         """
@@ -214,11 +256,15 @@ class SpoolWatcher(rpc.XmlRpcBot):
         self.notifier.stop()
         super(SpoolWatcher, self).stop()
     
-    def startCopy(self):
+    def startCopy(self, watchdir=None, event_path=None):
         logging.debug("writes seem to have stopped")
+        logging.debug("watchdir = %s, event_path = %s" % (watchdir, event_path))
         if self.notify_runner is not None:
             for r in self.notify_runner:
                 self.rpc_send(r, tuple(), 'startCopy')
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, 'startCopy %s %s' % (watchdir, event_path))
         
     def sequencingFinished(self, run_dir):
         # need to strip off self.watch_dir from rundir I suspect.
@@ -242,3 +288,5 @@ def main(args=None):
 if __name__ == "__main__":
     sys.exit(main(sys.argv[1:]))
 
+# TODO:
+# send messages to copier specifying which mount to copy