Detect if our watch is on a mount point.
authorDiane Trout <diane@caltech.edu>
Tue, 24 Jun 2008 00:36:17 +0000 (00:36 +0000)
committerDiane Trout <diane@caltech.edu>
Tue, 24 Jun 2008 00:36:17 +0000 (00:36 +0000)
If we're on something that is unmounted, keep watching until there's a
new mount. Once something has been remounted, restart the watch.

gaworkflow/automation/spoolwatcher.py
gaworkflow/util/mount.py [new file with mode: 0644]

index abd970982263975cd23b0144024be88711f81f06..56ad42f3c81e55fc89aa4491c08b5457f0026301 100644 (file)
@@ -6,7 +6,7 @@ import sys
 import time
 #import glob
 
-#from gaworkflow.pipeline.recipe_parser import get_cycles
+from gaworkflow.util import mount
 
 # this uses pyinotify
 import pyinotify
@@ -47,6 +47,8 @@ class Handler(pyinotify.ProcessEvent):
         logging.debug("Remove: %s" %  os.path.join(event.path, event.name))
 
     def process_IN_UNMOUNT(self, event):
+        pathname = os.path.join(event.path, event.name)
+        logging.debug("IN_UNMOUNT: %s" % (pathname,))
         self.bot.unmount_watch()
 
 class SpoolWatcher(rpc.XmlRpcBot):
@@ -85,6 +87,8 @@ class SpoolWatcher(rpc.XmlRpcBot):
         self.handler = Handler(self.wm, self)
         self.notifier = pyinotify.Notifier(self.wm, self.handler)
         self.wdd = None
+        self.mount_point = None
+        self.mounted = True
         
         self.notify_users = None
         self.notify_runner = None
@@ -106,7 +110,7 @@ class SpoolWatcher(rpc.XmlRpcBot):
             msg = 'need a full jabber ID + resource for xml-rpc destinations'
             logging.FATAL(msg)
             raise bot.JIDMissingResource(msg)
-            
+
     def add_watch(self, watchdir=None):
         """
         start watching watchdir or self.watch_dir
@@ -117,6 +121,9 @@ class SpoolWatcher(rpc.XmlRpcBot):
         if watchdir is None:
             watchdir = self.watch_dir
         logging.info("Watching:"+str(watchdir))
+
+        self.mount_point = mount.find_mount_point_for(watchdir)
+
         mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
         # rec traverses the tree and adds all the directories that are there
         # at the start.
@@ -125,10 +132,9 @@ class SpoolWatcher(rpc.XmlRpcBot):
 
     def unmount_watch(self):
         if self.wdd is not None:
-            logging.debug("disabling watch")
-            logging.debug(str(self.wdd))
             self.wm.rm_watch(self.wdd.values())
             self.wdd = None
+            self.mounted = False
             
     def process_notify(self, *args):
         # process the queue of events as explained above
@@ -138,13 +144,25 @@ class SpoolWatcher(rpc.XmlRpcBot):
             # read notified events and enqeue them
             self.notifier.read_events()
             # should we do something?
+        # has something happened?
         last_event_time = self.handler.last_event_time
         if last_event_time is not None:
             time_delta = time.time() - last_event_time
             if time_delta > self.write_timeout:
                 self.startCopy()
                 self.handler.last_event_time = None
-    
+        # handle unmounted filesystems
+        if not self.mounted:
+            if mount.is_mounted(self.mount_point):
+                # we've been remounted. Huzzah!
+                # restart the watch
+                self.add_watch()
+                self.mounted = True
+                logging.info(
+                    "%s was remounted, restarting watch" % \
+                        (self.mount_point)
+                )
+
     def _parser(self, msg, who):
         """
         Parse xmpp chat messages
diff --git a/gaworkflow/util/mount.py b/gaworkflow/util/mount.py
new file mode 100644 (file)
index 0000000..75dbe0a
--- /dev/null
@@ -0,0 +1,65 @@
+"""
+Utilities for working with unix-style mounts.
+"""
+import os
+import subprocess
+
+def list_mount_points():
+    """
+    Return list of current mount points
+
+    Note: unix-like OS specific
+    """
+    mount_points = []
+    likely_locations = ['/sbin/mount', '/bin/mount']
+    for mount in likely_locations:
+        if os.path.exists(mount):
+            p = subprocess.Popen(mount, stdout=subprocess.PIPE)
+            p.wait()
+            for l in p.stdout.readlines():
+                rec = l.split()
+                device = rec[0]            
+                mount_point = rec[2]
+                assert rec[1] == 'on'
+                # looking at the output of mount on linux, osx, and 
+                # sunos, the first 3 elements are always the same
+                # devicename on path
+                # everything after that displays the attributes
+                # of the mount points in wildly differing formats
+                mount_points.append(mount_point)
+            return mount_points
+    else:
+        raise RuntimeError("Couldn't find a mount executable")
+
+def is_mounted(point_to_check):
+    """
+    Return true if argument exactly matches a current mount point.
+    """
+    for mount_point in list_mount_points():
+        if point_to_check == mount_point:
+            return True
+    else:
+        return False
+
+def find_mount_point_for(pathname):
+    """
+    Find the deepest mount point pathname is located on
+    """
+    realpath = os.path.realpath(pathname)
+    mount_points = list_mount_points()
+
+    prefixes = set()
+    for current_mount in mount_points:
+        cp = os.path.commonprefix([current_mount, realpath])
+        prefixes.add((len(cp), cp))
+
+    prefixes = list(prefixes)
+    prefixes.sort()
+    if len(prefixes) == 0:
+        return None
+    else:
+        print prefixes
+        # return longest common prefix
+        return prefixes[-1][1]
+
+