import time
#import glob
-#from gaworkflow.pipeline.recipe_parser import get_cycles
+from gaworkflow.util import mount
# this uses pyinotify
import pyinotify
logging.debug("Remove: %s" % os.path.join(event.path, event.name))
def process_IN_UNMOUNT(self, event):
+ pathname = os.path.join(event.path, event.name)
+ logging.debug("IN_UNMOUNT: %s" % (pathname,))
self.bot.unmount_watch()
class SpoolWatcher(rpc.XmlRpcBot):
self.handler = Handler(self.wm, self)
self.notifier = pyinotify.Notifier(self.wm, self.handler)
self.wdd = None
+ self.mount_point = None
+ self.mounted = True
self.notify_users = None
self.notify_runner = None
msg = 'need a full jabber ID + resource for xml-rpc destinations'
logging.FATAL(msg)
raise bot.JIDMissingResource(msg)
-
+
def add_watch(self, watchdir=None):
"""
start watching watchdir or self.watch_dir
if watchdir is None:
watchdir = self.watch_dir
logging.info("Watching:"+str(watchdir))
+
+ self.mount_point = mount.find_mount_point_for(watchdir)
+
mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
# rec traverses the tree and adds all the directories that are there
# at the start.
def unmount_watch(self):
if self.wdd is not None:
- logging.debug("disabling watch")
- logging.debug(str(self.wdd))
self.wm.rm_watch(self.wdd.values())
self.wdd = None
+ self.mounted = False
def process_notify(self, *args):
# process the queue of events as explained above
# read notified events and enqeue them
self.notifier.read_events()
# should we do something?
+ # has something happened?
last_event_time = self.handler.last_event_time
if last_event_time is not None:
time_delta = time.time() - last_event_time
if time_delta > self.write_timeout:
self.startCopy()
self.handler.last_event_time = None
-
+ # handle unmounted filesystems
+ if not self.mounted:
+ if mount.is_mounted(self.mount_point):
+ # we've been remounted. Huzzah!
+ # restart the watch
+ self.add_watch()
+ self.mounted = True
+ logging.info(
+ "%s was remounted, restarting watch" % \
+ (self.mount_point)
+ )
+
def _parser(self, msg, who):
"""
Parse xmpp chat messages
--- /dev/null
+"""
+Utilities for working with unix-style mounts.
+"""
+import os
+import subprocess
+
+def list_mount_points():
+ """
+ Return list of current mount points
+
+ Note: unix-like OS specific
+ """
+ mount_points = []
+ likely_locations = ['/sbin/mount', '/bin/mount']
+ for mount in likely_locations:
+ if os.path.exists(mount):
+ p = subprocess.Popen(mount, stdout=subprocess.PIPE)
+ p.wait()
+ for l in p.stdout.readlines():
+ rec = l.split()
+ device = rec[0]
+ mount_point = rec[2]
+ assert rec[1] == 'on'
+ # looking at the output of mount on linux, osx, and
+ # sunos, the first 3 elements are always the same
+ # devicename on path
+ # everything after that displays the attributes
+ # of the mount points in wildly differing formats
+ mount_points.append(mount_point)
+ return mount_points
+ else:
+ raise RuntimeError("Couldn't find a mount executable")
+
+def is_mounted(point_to_check):
+ """
+ Return true if argument exactly matches a current mount point.
+ """
+ for mount_point in list_mount_points():
+ if point_to_check == mount_point:
+ return True
+ else:
+ return False
+
+def find_mount_point_for(pathname):
+ """
+ Find the deepest mount point pathname is located on
+ """
+ realpath = os.path.realpath(pathname)
+ mount_points = list_mount_points()
+
+ prefixes = set()
+ for current_mount in mount_points:
+ cp = os.path.commonprefix([current_mount, realpath])
+ prefixes.add((len(cp), cp))
+
+ prefixes = list(prefixes)
+ prefixes.sort()
+ if len(prefixes) == 0:
+ return None
+ else:
+ print prefixes
+ # return longest common prefix
+ return prefixes[-1][1]
+
+