From 31008ca04cf70aa4d40a0ed3dbd37e131931367e Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Fri, 5 Jun 2009 00:32:00 +0000 Subject: [PATCH] Pyinotify behaves oddly when the stdio file descriptors are closed. so don't initialize it until after the daemonize code has been called. this means after BenderJab.start has been called. (So I changed from SpoolWatcher.start to SpoolWatcher.run and moved the watch manager configuration into the start of run) --- htsworkflow/automation/spoolwatcher.py | 52 +++++++++++++++++--------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/htsworkflow/automation/spoolwatcher.py b/htsworkflow/automation/spoolwatcher.py index d57ab59..6683a65 100644 --- a/htsworkflow/automation/spoolwatcher.py +++ b/htsworkflow/automation/spoolwatcher.py @@ -5,7 +5,6 @@ import re import shlex import sys import time -#import glob from htsworkflow.util import mount @@ -15,7 +14,6 @@ from pyinotify import EventsCodes from benderjab import rpc - def get_top_dir(root, path): """ Return the directory in path that is a subdirectory of root. @@ -78,11 +76,13 @@ class Handler(pyinotify.ProcessEvent): try: self.bot.sequencingFinished(event.path) except IOError, e: + pass logging.error("Couldn't send sequencingFinished") logging.debug(msg) def process_IN_DELETE(self, event): logging.debug("Remove: %s" % os.path.join(event.path, event.name)) + pass def process_IN_UNMOUNT(self, event): pathname = os.path.join(event.path, event.name) @@ -120,28 +120,34 @@ class SpoolWatcher(rpc.XmlRpcBot): self.cfg['notify_users'] = None self.cfg['notify_runner'] = None self.cfg['wait_for_ipar'] = 0 - + + self.watchdirs = [] + self.watchdir_url_map = {} self.notify_timeout = 0.001 - self.wm = pyinotify.WatchManager() + + self.wm = None + self.notify_users = None + self.notify_runner = None self.wdds = [] + # keep track if the specified mount point is currently mounted self.mounted_points = {} # keep track of which mount points tie to which watch directories # so maybe we can remount them. self.mounts_to_watches = {} - self.notify_users = None - self.notify_runner = None - self.eventTasks.append(self.process_notify) def read_config(self, section=None, configfile=None): super(SpoolWatcher, self).read_config(section, configfile) self.watchdirs = shlex.split(self._check_required_option('watchdirs')) + # see if there's an alternate url that should be used for the watchdir + for watchdir in self.watchdirs: + self.watchdir_url_map[watchdir] = self.cfg.get(watchdir, watchdir) + self.write_timeout = int(self.cfg['write_timeout']) self.wait_for_ipar = int(self.cfg['wait_for_ipar']) - logging.debug('wait for ipar: ' + str(self.cfg['wait_for_ipar'])) self.notify_users = self._parse_user_list(self.cfg['notify_users']) try: @@ -150,17 +156,22 @@ class SpoolWatcher(rpc.XmlRpcBot): require_resource=True) except bot.JIDMissingResource: msg = 'need a full jabber ID + resource for xml-rpc destinations' - logging.FATAL(msg) raise bot.JIDMissingResource(msg) - self.handler = Handler(self.wm, self, self.wait_for_ipar) - self.notifier = pyinotify.Notifier(self.wm, self.handler) + self.handler = None + self.notifier = None def add_watch(self, watchdirs=None): """ start watching watchdir or self.watchdir we're currently limited to watching one directory tree. """ + # create the watch managers if we need them + if self.wm is None: + self.wm = pyinotify.WatchManager() + self.handler = Handler(self.wm, self, self.wait_for_ipar) + self.notifier = pyinotify.Notifier(self.wm, self.handler) + # the one tree limit is mostly because self.wdd is a single item # but managing it as a list might be a bit more annoying if watchdirs is None: @@ -192,6 +203,9 @@ class SpoolWatcher(rpc.XmlRpcBot): self.mounted = False def process_notify(self, *args): + if self.notifier is None: + # nothing to do yet + return # process the queue of events as explained above self.notifier.process_events() #check events waits timeout @@ -201,7 +215,7 @@ class SpoolWatcher(rpc.XmlRpcBot): # should we do something? # has something happened? for watchdir, last_events in self.handler.last_event.items(): - logging.debug('last_events: %s %s' % (watchdir, last_events)) + #logging.debug('last_events: %s %s' % (watchdir, last_events)) for last_event_dir, last_event_time in last_events.items(): time_delta = time.time() - last_event_time if time_delta > self.write_timeout: @@ -241,24 +255,27 @@ class SpoolWatcher(rpc.XmlRpcBot): reply = u"I didn't understand '%s'" %(msg) return reply - def start(self, daemonize): + def run(self): """ Start application """ + # we have to configure pyinotify after BenderJab.start is called + # as weird things happen to pyinotify if the stdio is closed + # after it's initialized. self.add_watch() - super(SpoolWatcher, self).start(daemonize) + super(SpoolWatcher, self).run() def stop(self): """ shutdown application """ # destroy the inotify's instance on this interrupt (stop monitoring) - self.notifier.stop() + if self.notifier is not None: + self.notifier.stop() super(SpoolWatcher, self).stop() def startCopy(self, watchdir=None, event_path=None): logging.debug("writes seem to have stopped") - logging.debug("watchdir = %s, event_path = %s" % (watchdir, event_path)) if self.notify_runner is not None: for r in self.notify_runner: self.rpc_send(r, tuple(), 'startCopy') @@ -286,7 +303,8 @@ def main(args=None): return bot.main(args) if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) + ret = main(sys.argv[1:]) + #sys.exit(ret) # TODO: # send messages to copier specifying which mount to copy -- 2.30.2