From ea64c5ed5b19fcf6a171ed1136ec577deb014a57 Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Tue, 24 Jun 2008 00:36:17 +0000 Subject: [PATCH] Detect if our watch is on a mount point. If we're on something that is unmounted, keep watching until there's a new mount. Once something has been remounted, restart the watch. --- gaworkflow/automation/spoolwatcher.py | 28 +++++++++--- gaworkflow/util/mount.py | 65 +++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 5 deletions(-) create mode 100644 gaworkflow/util/mount.py diff --git a/gaworkflow/automation/spoolwatcher.py b/gaworkflow/automation/spoolwatcher.py index abd9709..56ad42f 100644 --- a/gaworkflow/automation/spoolwatcher.py +++ b/gaworkflow/automation/spoolwatcher.py @@ -6,7 +6,7 @@ import sys import time #import glob -#from gaworkflow.pipeline.recipe_parser import get_cycles +from gaworkflow.util import mount # this uses pyinotify import pyinotify @@ -47,6 +47,8 @@ class Handler(pyinotify.ProcessEvent): logging.debug("Remove: %s" % os.path.join(event.path, event.name)) def process_IN_UNMOUNT(self, event): + pathname = os.path.join(event.path, event.name) + logging.debug("IN_UNMOUNT: %s" % (pathname,)) self.bot.unmount_watch() class SpoolWatcher(rpc.XmlRpcBot): @@ -85,6 +87,8 @@ class SpoolWatcher(rpc.XmlRpcBot): self.handler = Handler(self.wm, self) self.notifier = pyinotify.Notifier(self.wm, self.handler) self.wdd = None + self.mount_point = None + self.mounted = True self.notify_users = None self.notify_runner = None @@ -106,7 +110,7 @@ class SpoolWatcher(rpc.XmlRpcBot): msg = 'need a full jabber ID + resource for xml-rpc destinations' logging.FATAL(msg) raise bot.JIDMissingResource(msg) - + def add_watch(self, watchdir=None): """ start watching watchdir or self.watch_dir @@ -117,6 +121,9 @@ class SpoolWatcher(rpc.XmlRpcBot): if watchdir is None: watchdir = self.watch_dir logging.info("Watching:"+str(watchdir)) + + self.mount_point = mount.find_mount_point_for(watchdir) + mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT # rec traverses the tree and adds all the directories that are there # at the start. @@ -125,10 +132,9 @@ class SpoolWatcher(rpc.XmlRpcBot): def unmount_watch(self): if self.wdd is not None: - logging.debug("disabling watch") - logging.debug(str(self.wdd)) self.wm.rm_watch(self.wdd.values()) self.wdd = None + self.mounted = False def process_notify(self, *args): # process the queue of events as explained above @@ -138,13 +144,25 @@ class SpoolWatcher(rpc.XmlRpcBot): # read notified events and enqeue them self.notifier.read_events() # should we do something? + # has something happened? last_event_time = self.handler.last_event_time if last_event_time is not None: time_delta = time.time() - last_event_time if time_delta > self.write_timeout: self.startCopy() self.handler.last_event_time = None - + # handle unmounted filesystems + if not self.mounted: + if mount.is_mounted(self.mount_point): + # we've been remounted. Huzzah! + # restart the watch + self.add_watch() + self.mounted = True + logging.info( + "%s was remounted, restarting watch" % \ + (self.mount_point) + ) + def _parser(self, msg, who): """ Parse xmpp chat messages diff --git a/gaworkflow/util/mount.py b/gaworkflow/util/mount.py new file mode 100644 index 0000000..75dbe0a --- /dev/null +++ b/gaworkflow/util/mount.py @@ -0,0 +1,65 @@ +""" +Utilities for working with unix-style mounts. +""" +import os +import subprocess + +def list_mount_points(): + """ + Return list of current mount points + + Note: unix-like OS specific + """ + mount_points = [] + likely_locations = ['/sbin/mount', '/bin/mount'] + for mount in likely_locations: + if os.path.exists(mount): + p = subprocess.Popen(mount, stdout=subprocess.PIPE) + p.wait() + for l in p.stdout.readlines(): + rec = l.split() + device = rec[0] + mount_point = rec[2] + assert rec[1] == 'on' + # looking at the output of mount on linux, osx, and + # sunos, the first 3 elements are always the same + # devicename on path + # everything after that displays the attributes + # of the mount points in wildly differing formats + mount_points.append(mount_point) + return mount_points + else: + raise RuntimeError("Couldn't find a mount executable") + +def is_mounted(point_to_check): + """ + Return true if argument exactly matches a current mount point. + """ + for mount_point in list_mount_points(): + if point_to_check == mount_point: + return True + else: + return False + +def find_mount_point_for(pathname): + """ + Find the deepest mount point pathname is located on + """ + realpath = os.path.realpath(pathname) + mount_points = list_mount_points() + + prefixes = set() + for current_mount in mount_points: + cp = os.path.commonprefix([current_mount, realpath]) + prefixes.add((len(cp), cp)) + + prefixes = list(prefixes) + prefixes.sort() + if len(prefixes) == 0: + return None + else: + print prefixes + # return longest common prefix + return prefixes[-1][1] + + -- 2.30.2