Watch more than one directory tree for modification.
authorDiane Trout <diane@caltech.edu>
Sat, 30 May 2009 01:07:08 +0000 (01:07 +0000)
committerDiane Trout <diane@caltech.edu>
Sat, 30 May 2009 01:07:08 +0000 (01:07 +0000)
I also added code to record which directory watch, and what entry in that
directory was being created (AKA whatever element of the runfolder is being
touched, I record the root of the runfolder directory).

To support this I changed the config option in the benderjab file from
watchdir to watchdirs to make it a bit clearer that things have changed.

I still need to work out the communication protocol to copier so it
can figure out what to start copying. Also I'm recording the
watchdirectory, but copier needs access to a different url. So there's
some question about where the right place to map watchdir to copy url source
might be.

htsworkflow/automation/spoolwatcher.py

index 361a7795d0926f01c2e9fb91ec0a2d123a630130..d1008798f206c9d4ee25cefa62cdc16782151074 100644 (file)
@@ -2,6 +2,7 @@
 import logging
 import os
 import re
 import logging
 import os
 import re
+import shlex
 import sys
 import time
 #import glob
 import sys
 import time
 #import glob
@@ -15,6 +16,28 @@ from pyinotify import EventsCodes
 from benderjab import rpc
 
 
 from benderjab import rpc
 
 
+def get_top_dir(root, path):
+    """
+    Return the directory in path that is a subdirectory of root.
+    e.g.
+
+    >>> print get_top_dir('/a/b/c', '/a/b/c/d/e/f')
+    d
+    >>> print get_top_dir('/a/b/c/', '/a/b/c/d/e/f')
+    d
+    >>> print get_top_dir('/a/b/c', '/g/e/f')
+    None
+    >>> print get_top_dir('/a/b/c', '/a/b/c')
+    <BLANKLINE>
+    """
+    if path.startswith(root):
+        subpath = path[len(root):]
+        if subpath.startswith('/'):
+            subpath = subpath[1:]
+        return subpath.split(os.path.sep)[0]
+    else:
+        return None
+
 class WatcherEvents(object):
     # two events need to be tracked
     # one to send startCopy
 class WatcherEvents(object):
     # two events need to be tracked
     # one to send startCopy
@@ -33,8 +56,7 @@ class Handler(pyinotify.ProcessEvent):
         ipar flag indicates we should wait for ipar to finish, instead of 
              just the run finishing
         """
         ipar flag indicates we should wait for ipar to finish, instead of 
              just the run finishing
         """
-        print 'ipar flag: ' + str(ipar)
-        self.last_event_time = None
+        self.last_event = {}
         self.watchmanager = watchmanager
         self.bot = bot
         self.ipar_mode = ipar
         self.watchmanager = watchmanager
         self.bot = bot
         self.ipar_mode = ipar
@@ -44,15 +66,20 @@ class Handler(pyinotify.ProcessEvent):
             self.last_file = "run.completed".lower()
 
     def process_IN_CREATE(self, event):
             self.last_file = "run.completed".lower()
 
     def process_IN_CREATE(self, event):
-        self.last_event_time = time.time()
-        msg = "Create: %s" %  os.path.join(event.path, event.name)
+        for wdd in self.bot.wdds:
+            for watch_path in self.bot.watchdirs:
+                if event.path.startswith(watch_path):
+                    target = get_top_dir(watch_path, event.path)
+                    self.last_event.setdefault(watch_path, {})[target] = time.time()
+
+                    msg = "Create: %s %s" % (event.path, event.name)
 
 
-        if event.name.lower() == self.last_file:
-            try:
-                self.bot.sequencingFinished(event.path)
-            except IOError, e:
-                logging.error("Couldn't send sequencingFinished")
-        logging.debug(msg)
+                    if event.name.lower() == self.last_file:
+                        try:
+                            self.bot.sequencingFinished(event.path)
+                        except IOError, e:
+                            logging.error("Couldn't send sequencingFinished")
+                    logging.debug(msg)
 
     def process_IN_DELETE(self, event):
         logging.debug("Remove: %s" %  os.path.join(event.path, event.name))
 
     def process_IN_DELETE(self, event):
         logging.debug("Remove: %s" %  os.path.join(event.path, event.name))
@@ -60,7 +87,7 @@ class Handler(pyinotify.ProcessEvent):
     def process_IN_UNMOUNT(self, event):
         pathname = os.path.join(event.path, event.name)
         logging.debug("IN_UNMOUNT: %s" % (pathname,))
     def process_IN_UNMOUNT(self, event):
         pathname = os.path.join(event.path, event.name)
         logging.debug("IN_UNMOUNT: %s" % (pathname,))
-        self.bot.unmount_watch()
+        self.bot.unmount_watch(event.path)
 
 class SpoolWatcher(rpc.XmlRpcBot):
     """
 
 class SpoolWatcher(rpc.XmlRpcBot):
     """
@@ -77,7 +104,7 @@ class SpoolWatcher(rpc.XmlRpcBot):
     # these params need to be in the config file
     # I wonder where I should put the documentation
     #:Parameters:
     # these params need to be in the config file
     # I wonder where I should put the documentation
     #:Parameters:
-    #    `watchdir` - which directory tree to monitor for modifications
+    #    `watchdirs` - list of directories to monitor for modifications
     #    `profile` - specify which .htsworkflow profile to use
     #    `write_timeout` - how many seconds to wait for writes to finish to
     #                      the spool
     #    `profile` - specify which .htsworkflow profile to use
     #    `write_timeout` - how many seconds to wait for writes to finish to
     #                      the spool
@@ -88,7 +115,7 @@ class SpoolWatcher(rpc.XmlRpcBot):
         #    self.configfile = "~/.htsworkflow"
         super(SpoolWatcher, self).__init__(section, configfile)
         
         #    self.configfile = "~/.htsworkflow"
         super(SpoolWatcher, self).__init__(section, configfile)
         
-        self.cfg['watchdir'] = None
+        self.cfg['watchdirs'] = None
         self.cfg['write_timeout'] = 10
         self.cfg['notify_users'] = None
         self.cfg['notify_runner'] = None
         self.cfg['write_timeout'] = 10
         self.cfg['notify_users'] = None
         self.cfg['notify_runner'] = None
@@ -96,9 +123,12 @@ class SpoolWatcher(rpc.XmlRpcBot):
         
         self.notify_timeout = 0.001
         self.wm = pyinotify.WatchManager()
         
         self.notify_timeout = 0.001
         self.wm = pyinotify.WatchManager()
-        self.wdd = None
-        self.mount_point = None
-        self.mounted = True
+        self.wdds = []
+        # keep track if the specified mount point is currently mounted
+        self.mounted_points = {}
+        # keep track of which mount points tie to which watch directories
+        # so maybe we can remount them.
+        self.mounts_to_watches = {}
         
         self.notify_users = None
         self.notify_runner = None
         
         self.notify_users = None
         self.notify_runner = None
@@ -108,10 +138,10 @@ class SpoolWatcher(rpc.XmlRpcBot):
     def read_config(self, section=None, configfile=None):
         super(SpoolWatcher, self).read_config(section, configfile)
         
     def read_config(self, section=None, configfile=None):
         super(SpoolWatcher, self).read_config(section, configfile)
         
-        self.watch_dir = self._check_required_option('watchdir')
+        self.watchdirs = shlex.split(self._check_required_option('watchdirs'))
         self.write_timeout = int(self.cfg['write_timeout'])
         self.wait_for_ipar = int(self.cfg['wait_for_ipar'])
         self.write_timeout = int(self.cfg['write_timeout'])
         self.wait_for_ipar = int(self.cfg['wait_for_ipar'])
-       print 'wait for ipar: ' + str(self.cfg['wait_for_ipar'])
+        logging.debug('wait for ipar: ' + str(self.cfg['wait_for_ipar']))
         
         self.notify_users = self._parse_user_list(self.cfg['notify_users'])
         try:
         
         self.notify_users = self._parse_user_list(self.cfg['notify_users'])
         try:
@@ -126,30 +156,40 @@ class SpoolWatcher(rpc.XmlRpcBot):
         self.handler = Handler(self.wm, self, self.wait_for_ipar)
         self.notifier = pyinotify.Notifier(self.wm, self.handler)
 
         self.handler = Handler(self.wm, self, self.wait_for_ipar)
         self.notifier = pyinotify.Notifier(self.wm, self.handler)
 
-    def add_watch(self, watchdir=None):
+    def add_watch(self, watchdirs=None):
         """
         start watching watchdir or self.watch_dir
         we're currently limited to watching one directory tree.
         """
         # the one tree limit is mostly because self.wdd is a single item
         # but managing it as a list might be a bit more annoying
         """
         start watching watchdir or self.watch_dir
         we're currently limited to watching one directory tree.
         """
         # the one tree limit is mostly because self.wdd is a single item
         # but managing it as a list might be a bit more annoying
-        if watchdir is None:
-            watchdir = self.watch_dir
-        logging.info("Watching:"+str(watchdir))
-
-        self.mount_point = mount.find_mount_point_for(watchdir)
+        if watchdirs is None:
+            watchdirs = self.watchdirs
 
         mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
         # rec traverses the tree and adds all the directories that are there
         # at the start.
         # auto_add will add in new directories as they are created
 
         mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
         # rec traverses the tree and adds all the directories that are there
         # at the start.
         # auto_add will add in new directories as they are created
-        self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
+        for w in watchdirs:
+            mount_location = mount.find_mount_point_for(w)
+            self.mounted_points[mount_location] = True
+            mounts = self.mounts_to_watches.get(mount_location, [])
+            if w not in mounts:
+                mounts.append(w)
+                self.mounts_to_watches[mount_location] = mounts
+
+            logging.info(u"Watching:"+unicode(w))
+            self.wdds.append(self.wm.add_watch(w, mask, rec=True, auto_add=True))
 
 
-    def unmount_watch(self):
-        if self.wdd is not None:
-            self.wm.rm_watch(self.wdd.values())
-            self.wdd = None
-            self.mounted = False
+    def unmount_watch(self, event_path):
+        # remove backwards so we don't get weirdness from 
+        # the list getting shorter
+        for i in range(len(self.wdds),0, -1):
+            wdd = self.wdds[i]
+            logging.info(u'unmounting: '+unicode(wdd.items()))
+            self.wm.rm_watch(wdd.values())
+            del self.wdds[i]
+        self.mounted = False
             
     def process_notify(self, *args):
         # process the queue of events as explained above
             
     def process_notify(self, *args):
         # process the queue of events as explained above
@@ -160,23 +200,25 @@ class SpoolWatcher(rpc.XmlRpcBot):
             self.notifier.read_events()
             # should we do something?
         # has something happened?
             self.notifier.read_events()
             # should we do something?
         # has something happened?
-        last_event_time = self.handler.last_event_time
-        if last_event_time is not None:
-            time_delta = time.time() - last_event_time
-            if time_delta > self.write_timeout:
-                self.startCopy()
-                self.handler.last_event_time = None
+        for watch_dir, last_events in self.handler.last_event.items():
+            logging.debug('last_events: %s %s' % (watch_dir, last_events))
+            for last_event_dir, last_event_time in last_events.items():
+                time_delta = time.time() - last_event_time
+                if time_delta > self.write_timeout:
+                    self.startCopy(watch_dir, last_event_dir)
+                    self.handler.last_event[watch_dir] = {}
         # handle unmounted filesystems
         # handle unmounted filesystems
-        if not self.mounted:
-            if mount.is_mounted(self.mount_point):
+        for mount_point, was_mounted in self.mounted_points.items():
+            if not was_mounted and mount.is_mounted(mount_point):
                 # we've been remounted. Huzzah!
                 # restart the watch
                 # we've been remounted. Huzzah!
                 # restart the watch
-                self.add_watch()
-                self.mounted = True
-                logging.info(
-                    "%s was remounted, restarting watch" % \
-                        (self.mount_point)
-                )
+                for watch in self.mounts_to_watches[mount_point]:
+                    self.add_watch(watch)
+                    logging.info(
+                        "%s was remounted, restarting watch" % \
+                            (mount_point)
+                    )
+                self.mounted_points[mount_point] = True
 
     def _parser(self, msg, who):
         """
 
     def _parser(self, msg, who):
         """
@@ -214,11 +256,15 @@ class SpoolWatcher(rpc.XmlRpcBot):
         self.notifier.stop()
         super(SpoolWatcher, self).stop()
     
         self.notifier.stop()
         super(SpoolWatcher, self).stop()
     
-    def startCopy(self):
+    def startCopy(self, watchdir=None, event_path=None):
         logging.debug("writes seem to have stopped")
         logging.debug("writes seem to have stopped")
+        logging.debug("watchdir = %s, event_path = %s" % (watchdir, event_path))
         if self.notify_runner is not None:
             for r in self.notify_runner:
                 self.rpc_send(r, tuple(), 'startCopy')
         if self.notify_runner is not None:
             for r in self.notify_runner:
                 self.rpc_send(r, tuple(), 'startCopy')
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, 'startCopy %s %s' % (watchdir, event_path))
         
     def sequencingFinished(self, run_dir):
         # need to strip off self.watch_dir from rundir I suspect.
         
     def sequencingFinished(self, run_dir):
         # need to strip off self.watch_dir from rundir I suspect.
@@ -242,3 +288,5 @@ def main(args=None):
 if __name__ == "__main__":
     sys.exit(main(sys.argv[1:]))
 
 if __name__ == "__main__":
     sys.exit(main(sys.argv[1:]))
 
+# TODO:
+# send messages to copier specifying which mount to copy