Detect if our watch is on a mount point.
[htsworkflow.git] / gaworkflow / automation / spoolwatcher.py
index abd970982263975cd23b0144024be88711f81f06..56ad42f3c81e55fc89aa4491c08b5457f0026301 100644 (file)
@@ -6,7 +6,7 @@ import sys
 import time
 #import glob
 
-#from gaworkflow.pipeline.recipe_parser import get_cycles
+from gaworkflow.util import mount
 
 # this uses pyinotify
 import pyinotify
@@ -47,6 +47,8 @@ class Handler(pyinotify.ProcessEvent):
         logging.debug("Remove: %s" %  os.path.join(event.path, event.name))
 
     def process_IN_UNMOUNT(self, event):
+        pathname = os.path.join(event.path, event.name)
+        logging.debug("IN_UNMOUNT: %s" % (pathname,))
         self.bot.unmount_watch()
 
 class SpoolWatcher(rpc.XmlRpcBot):
@@ -85,6 +87,8 @@ class SpoolWatcher(rpc.XmlRpcBot):
         self.handler = Handler(self.wm, self)
         self.notifier = pyinotify.Notifier(self.wm, self.handler)
         self.wdd = None
+        self.mount_point = None
+        self.mounted = True
         
         self.notify_users = None
         self.notify_runner = None
@@ -106,7 +110,7 @@ class SpoolWatcher(rpc.XmlRpcBot):
             msg = 'need a full jabber ID + resource for xml-rpc destinations'
             logging.FATAL(msg)
             raise bot.JIDMissingResource(msg)
-            
+
     def add_watch(self, watchdir=None):
         """
         start watching watchdir or self.watch_dir
@@ -117,6 +121,9 @@ class SpoolWatcher(rpc.XmlRpcBot):
         if watchdir is None:
             watchdir = self.watch_dir
         logging.info("Watching:"+str(watchdir))
+
+        self.mount_point = mount.find_mount_point_for(watchdir)
+
         mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
         # rec traverses the tree and adds all the directories that are there
         # at the start.
@@ -125,10 +132,9 @@ class SpoolWatcher(rpc.XmlRpcBot):
 
     def unmount_watch(self):
         if self.wdd is not None:
-            logging.debug("disabling watch")
-            logging.debug(str(self.wdd))
             self.wm.rm_watch(self.wdd.values())
             self.wdd = None
+            self.mounted = False
             
     def process_notify(self, *args):
         # process the queue of events as explained above
@@ -138,13 +144,25 @@ class SpoolWatcher(rpc.XmlRpcBot):
             # read notified events and enqeue them
             self.notifier.read_events()
             # should we do something?
+        # has something happened?
         last_event_time = self.handler.last_event_time
         if last_event_time is not None:
             time_delta = time.time() - last_event_time
             if time_delta > self.write_timeout:
                 self.startCopy()
                 self.handler.last_event_time = None
-    
+        # handle unmounted filesystems
+        if not self.mounted:
+            if mount.is_mounted(self.mount_point):
+                # we've been remounted. Huzzah!
+                # restart the watch
+                self.add_watch()
+                self.mounted = True
+                logging.info(
+                    "%s was remounted, restarting watch" % \
+                        (self.mount_point)
+                )
+
     def _parser(self, msg, who):
         """
         Parse xmpp chat messages