Pyinotify behaves oddly when the stdio file descriptors are closed.
author     Diane Trout <diane@caltech.edu>
           Fri, 5 Jun 2009 00:32:00 +0000 (00:32 +0000)
committer  Diane Trout <diane@caltech.edu>
           Fri, 5 Jun 2009 00:32:00 +0000 (00:32 +0000)
So don't initialize it until after the daemonize code has been
called, which means after BenderJab.start has been called.
(I changed SpoolWatcher.start to SpoolWatcher.run and moved
the watch manager configuration into the start of run.)
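
A minimal sketch of the deferred-initialization pattern the diff below adopts,
using a hypothetical LazyWatcher in place of SpoolWatcher; the mask and timeout
values are illustrative, and only the construction order mirrors the actual change:

    # Sketch: keep the pyinotify objects unset until the first add_watch(),
    # which now runs inside run(), i.e. after BenderJab.start has daemonized
    # the process and stdio has already been closed.
    import pyinotify
    from pyinotify import EventsCodes  # same mask constants this file already imports

    class LazyWatcher(object):         # hypothetical stand-in for SpoolWatcher
        def __init__(self):
            # nothing touches inotify here, so daemonizing after construction is safe
            self.wm = None
            self.notifier = None

        def add_watch(self, path):
            if self.wm is None:
                # first call: the daemon fork and stdio close are already done
                self.wm = pyinotify.WatchManager()
                self.notifier = pyinotify.Notifier(self.wm)
            self.wm.add_watch(path, EventsCodes.ALL_EVENTS, rec=True)

        def process_notify(self):
            if self.notifier is None:
                return                 # add_watch() hasn't run yet, nothing to poll
            self.notifier.process_events()
            if self.notifier.check_events(timeout=10):
                self.notifier.read_events()

The None guard in process_notify matters because the bot's event loop can tick
before the first add_watch() call has created the notifier.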

htsworkflow/automation/spoolwatcher.py

index d57ab590660151a9b6807579935c0641d77bd5d9..6683a65862e7a3eda37e5f7905915f19acf4f091 100644
@@ -5,7 +5,6 @@ import re
 import shlex
 import sys
 import time
-#import glob
 
 from htsworkflow.util import mount
 
@@ -15,7 +14,6 @@ from pyinotify import EventsCodes
 
 from benderjab import rpc
 
-
 def get_top_dir(root, path):
     """
     Return the directory in path that is a subdirectory of root.
@@ -78,11 +76,13 @@ class Handler(pyinotify.ProcessEvent):
                         try:
                             self.bot.sequencingFinished(event.path)
                         except IOError, e:
+                            pass
                             logging.error("Couldn't send sequencingFinished")
                     logging.debug(msg)
 
     def process_IN_DELETE(self, event):
         logging.debug("Remove: %s" %  os.path.join(event.path, event.name))
+        pass
 
     def process_IN_UNMOUNT(self, event):
         pathname = os.path.join(event.path, event.name)
@@ -120,28 +120,34 @@ class SpoolWatcher(rpc.XmlRpcBot):
         self.cfg['notify_users'] = None
         self.cfg['notify_runner'] = None
         self.cfg['wait_for_ipar'] = 0
-        
+       
+        self.watchdirs = []
+        self.watchdir_url_map = {}
         self.notify_timeout = 0.001
-        self.wm = pyinotify.WatchManager()
+
+        self.wm = None 
+        self.notify_users = None
+        self.notify_runner = None
         self.wdds = []
+
         # keep track if the specified mount point is currently mounted
         self.mounted_points = {}
         # keep track of which mount points tie to which watch directories
         # so maybe we can remount them.
         self.mounts_to_watches = {}
         
-        self.notify_users = None
-        self.notify_runner = None
-        
         self.eventTasks.append(self.process_notify)
 
     def read_config(self, section=None, configfile=None):
         super(SpoolWatcher, self).read_config(section, configfile)
         
         self.watchdirs = shlex.split(self._check_required_option('watchdirs'))
+        # see if there's an alternate url that should be used for the watchdir
+        for watchdir in self.watchdirs:
+            self.watchdir_url_map[watchdir] = self.cfg.get(watchdir, watchdir)
+
         self.write_timeout = int(self.cfg['write_timeout'])
         self.wait_for_ipar = int(self.cfg['wait_for_ipar'])
-        logging.debug('wait for ipar: ' + str(self.cfg['wait_for_ipar']))
         
         self.notify_users = self._parse_user_list(self.cfg['notify_users'])
         try:
@@ -150,17 +156,22 @@ class SpoolWatcher(rpc.XmlRpcBot):
                                    require_resource=True)
         except bot.JIDMissingResource:
             msg = 'need a full jabber ID + resource for xml-rpc destinations'
-            logging.FATAL(msg)
             raise bot.JIDMissingResource(msg)
 
-        self.handler = Handler(self.wm, self, self.wait_for_ipar)
-        self.notifier = pyinotify.Notifier(self.wm, self.handler)
+        self.handler = None
+        self.notifier = None
 
     def add_watch(self, watchdirs=None):
         """
         start watching watchdir or self.watchdir
         we're currently limited to watching one directory tree.
         """
+        # create the watch managers if we need them
+        if self.wm is None:
+            self.wm = pyinotify.WatchManager()
+            self.handler = Handler(self.wm, self, self.wait_for_ipar)
+            self.notifier = pyinotify.Notifier(self.wm, self.handler)
+
         # the one tree limit is mostly because self.wdd is a single item
         # but managing it as a list might be a bit more annoying
         if watchdirs is None:
@@ -192,6 +203,9 @@ class SpoolWatcher(rpc.XmlRpcBot):
         self.mounted = False
             
     def process_notify(self, *args):
+        if self.notifier is None:
+            # nothing to do yet
+            return
         # process the queue of events as explained above
         self.notifier.process_events()
         #check events waits timeout
@@ -201,7 +215,7 @@ class SpoolWatcher(rpc.XmlRpcBot):
             # should we do something?
         # has something happened?
         for watchdir, last_events in self.handler.last_event.items():
-            logging.debug('last_events: %s %s' % (watchdir, last_events))
+            #logging.debug('last_events: %s %s' % (watchdir, last_events))
             for last_event_dir, last_event_time in last_events.items():
                 time_delta = time.time() - last_event_time
                 if time_delta > self.write_timeout:
@@ -241,24 +255,27 @@ class SpoolWatcher(rpc.XmlRpcBot):
             reply = u"I didn't understand '%s'" %(msg)            
         return reply
         
-    def start(self, daemonize):
+    def run(self):
         """
         Start application
         """
+        # we have to configure pyinotify after BenderJab.start is called
+        # as weird things happen to pyinotify if the stdio is closed
+        # after it's initialized.
         self.add_watch()
-        super(SpoolWatcher, self).start(daemonize)
+        super(SpoolWatcher, self).run()
         
     def stop(self):
         """
         shutdown application
         """
         # destroy the inotify's instance on this interrupt (stop monitoring)
-        self.notifier.stop()
+        if self.notifier is not None:
+            self.notifier.stop()
         super(SpoolWatcher, self).stop()
     
     def startCopy(self, watchdir=None, event_path=None):
         logging.debug("writes seem to have stopped")
-        logging.debug("watchdir = %s, event_path = %s" % (watchdir, event_path))
         if self.notify_runner is not None:
             for r in self.notify_runner:
                 self.rpc_send(r, tuple(), 'startCopy')
@@ -286,7 +303,8 @@ def main(args=None):
     return bot.main(args)
     
 if __name__ == "__main__":
-    sys.exit(main(sys.argv[1:]))
+    ret = main(sys.argv[1:])
+    #sys.exit(ret)
 
 # TODO:
 # send messages to copier specifying which mount to copy