From: Diane Trout Date: Fri, 5 Jun 2009 00:32:00 +0000 (+0000) Subject: Pyinotify behaves oddly when the stdio file descriptors are closed. X-Git-Tag: 0.2.0.5~2 X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=31008ca04cf70aa4d40a0ed3dbd37e131931367e Pyinotify behaves oddly when the stdio file descriptors are closed. so don't initialize it until after the daemonize code has been called. this means after BenderJab.start has been called. (So I changed from SpoolWatcher.start to SpoolWatcher.run and moved the watch manager configuration into the start of run) --- diff --git a/htsworkflow/automation/spoolwatcher.py b/htsworkflow/automation/spoolwatcher.py index d57ab59..6683a65 100644 --- a/htsworkflow/automation/spoolwatcher.py +++ b/htsworkflow/automation/spoolwatcher.py @@ -5,7 +5,6 @@ import re import shlex import sys import time -#import glob from htsworkflow.util import mount @@ -15,7 +14,6 @@ from pyinotify import EventsCodes from benderjab import rpc - def get_top_dir(root, path): """ Return the directory in path that is a subdirectory of root. @@ -78,11 +76,13 @@ class Handler(pyinotify.ProcessEvent): try: self.bot.sequencingFinished(event.path) except IOError, e: + pass logging.error("Couldn't send sequencingFinished") logging.debug(msg) def process_IN_DELETE(self, event): logging.debug("Remove: %s" % os.path.join(event.path, event.name)) + pass def process_IN_UNMOUNT(self, event): pathname = os.path.join(event.path, event.name) @@ -120,28 +120,34 @@ class SpoolWatcher(rpc.XmlRpcBot): self.cfg['notify_users'] = None self.cfg['notify_runner'] = None self.cfg['wait_for_ipar'] = 0 - + + self.watchdirs = [] + self.watchdir_url_map = {} self.notify_timeout = 0.001 - self.wm = pyinotify.WatchManager() + + self.wm = None + self.notify_users = None + self.notify_runner = None self.wdds = [] + # keep track if the specified mount point is currently mounted self.mounted_points = {} # keep track of which mount points tie to which watch directories # so maybe we can remount them. self.mounts_to_watches = {} - self.notify_users = None - self.notify_runner = None - self.eventTasks.append(self.process_notify) def read_config(self, section=None, configfile=None): super(SpoolWatcher, self).read_config(section, configfile) self.watchdirs = shlex.split(self._check_required_option('watchdirs')) + # see if there's an alternate url that should be used for the watchdir + for watchdir in self.watchdirs: + self.watchdir_url_map[watchdir] = self.cfg.get(watchdir, watchdir) + self.write_timeout = int(self.cfg['write_timeout']) self.wait_for_ipar = int(self.cfg['wait_for_ipar']) - logging.debug('wait for ipar: ' + str(self.cfg['wait_for_ipar'])) self.notify_users = self._parse_user_list(self.cfg['notify_users']) try: @@ -150,17 +156,22 @@ class SpoolWatcher(rpc.XmlRpcBot): require_resource=True) except bot.JIDMissingResource: msg = 'need a full jabber ID + resource for xml-rpc destinations' - logging.FATAL(msg) raise bot.JIDMissingResource(msg) - self.handler = Handler(self.wm, self, self.wait_for_ipar) - self.notifier = pyinotify.Notifier(self.wm, self.handler) + self.handler = None + self.notifier = None def add_watch(self, watchdirs=None): """ start watching watchdir or self.watchdir we're currently limited to watching one directory tree. """ + # create the watch managers if we need them + if self.wm is None: + self.wm = pyinotify.WatchManager() + self.handler = Handler(self.wm, self, self.wait_for_ipar) + self.notifier = pyinotify.Notifier(self.wm, self.handler) + # the one tree limit is mostly because self.wdd is a single item # but managing it as a list might be a bit more annoying if watchdirs is None: @@ -192,6 +203,9 @@ class SpoolWatcher(rpc.XmlRpcBot): self.mounted = False def process_notify(self, *args): + if self.notifier is None: + # nothing to do yet + return # process the queue of events as explained above self.notifier.process_events() #check events waits timeout @@ -201,7 +215,7 @@ class SpoolWatcher(rpc.XmlRpcBot): # should we do something? # has something happened? for watchdir, last_events in self.handler.last_event.items(): - logging.debug('last_events: %s %s' % (watchdir, last_events)) + #logging.debug('last_events: %s %s' % (watchdir, last_events)) for last_event_dir, last_event_time in last_events.items(): time_delta = time.time() - last_event_time if time_delta > self.write_timeout: @@ -241,24 +255,27 @@ class SpoolWatcher(rpc.XmlRpcBot): reply = u"I didn't understand '%s'" %(msg) return reply - def start(self, daemonize): + def run(self): """ Start application """ + # we have to configure pyinotify after BenderJab.start is called + # as weird things happen to pyinotify if the stdio is closed + # after it's initialized. self.add_watch() - super(SpoolWatcher, self).start(daemonize) + super(SpoolWatcher, self).run() def stop(self): """ shutdown application """ # destroy the inotify's instance on this interrupt (stop monitoring) - self.notifier.stop() + if self.notifier is not None: + self.notifier.stop() super(SpoolWatcher, self).stop() def startCopy(self, watchdir=None, event_path=None): logging.debug("writes seem to have stopped") - logging.debug("watchdir = %s, event_path = %s" % (watchdir, event_path)) if self.notify_runner is not None: for r in self.notify_runner: self.rpc_send(r, tuple(), 'startCopy') @@ -286,7 +303,8 @@ def main(args=None): return bot.main(args) if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) + ret = main(sys.argv[1:]) + #sys.exit(ret) # TODO: # send messages to copier specifying which mount to copy