[project @ make spoolwatcher a benderjab bot]
author Diane Trout <diane@caltech.edu>
Mon, 10 Dec 2007 21:12:20 +0000 (21:12 +0000)
committer Diane Trout <diane@caltech.edu>
Mon, 10 Dec 2007 21:12:20 +0000 (21:12 +0000)
This was a pretty significant update to spoolwatcher: the main event
loop is now driven by BenderJab instead of by inotify, and the
"start copying" and "run finished" notifications are now sent as
XML-RPC messages instead of chat messages.

gaworkflow/spoolwatcher.py
scripts/spoolwatcher

index bba9fdbe5a4c77b4642764279a8b45af6107cc3f..33a69b1db73908a90d255a3e83b87cdd9e0dd064 100644 (file)
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-
+import logging
 import os
 import re
 import sys
@@ -9,90 +9,167 @@ import time
 import pyinotify
 from pyinotify import EventsCodes
 
-from benderjab.xsend import send
+from benderjab import rpc
+
 
 class Handler(pyinotify.ProcessEvent):
-    def __init__(self, watchmanager, runner_jid):
-      self.last_event_time = None
-      self.watchmanager = watchmanager
-      self.runner_jid = runner_jid
+    def __init__(self, watchmanager, bot):
+        self.last_event_time = None
+        self.watchmanager = watchmanager
+        self.bot = bot
 
     def process_IN_CREATE(self, event):
-      self.last_event_time = time.time()
-      msg = "Create: %s" %  os.path.join(event.path, event.name)
-      if event.name.lower() == "run.completed":
-        print "Run is done!"
-        try:
-          send(self.runner_jid, "a run finished, launch it, and swap the drive")
-        except IOError, e:
-          print "ERROR: couldn't send message"
-          print str(e)
-      print msg
+        self.last_event_time = time.time()
+        msg = "Create: %s" %  os.path.join(event.path, event.name)
+        if event.name.lower() == "run.completed":
+            try:
+                self.bot.runFinished(event.path)
+            except IOError, e:
+                logging.error("Couldn't send runFinished")
+        logging.debug(msg)
 
     def process_IN_DELETE(self, event):
-      print "Remove: %s" %  os.path.join(event.path, event.name)
+        logging.debug("Remove: %s" %  os.path.join(event.path, event.name))
 
     def process_IN_UNMOUNT(self, event):
-      print "Unmounted ", str(event)
-
-class TreeWriteDoneNotifier:
-  """
-  Watch a directory and send a message when another process is done writing.
-
-  This monitors a directory tree using inotify (linux specific) and 
-  after some files having been written will send a message after <timeout>
-  seconds of no file writing.
-
-  (Basically when the solexa machine finishes dumping a round of data
-  this'll hopefully send out a message saying hey look theres data available
-  """
-
-  def __init__(self, watchdir, 
-                     copy_jid, runner_jid, 
-                     write_timeout=10, notify_timeout=5):
-     self.watchdir = watchdir
-     self.copy_jid = copy_jid
-     self.runner_jid = runner_jid
-     self.write_timeout = write_timeout
-     self.notify_timeout = int(notify_timeout * 1000)
-
-     self.wm = pyinotify.WatchManager()
-     self.handler = Handler(self.wm, self.runner_jid)
-     self.notifier = pyinotify.Notifier(self.wm, self.handler)
-     
-     self.add_watch(self.watchdir)
-
-  def add_watch(self, watchdir):
-     print "Watching:", watchdir
-     mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
-     # rec traverses the tree and adds all the directories that are there 
-     # at the start.
-     # auto_add will add in new directories as they are created
-     self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
-
-  def event_loop(self):
-    while True:  # loop forever
-      try:
+        self.bot.unmount_watch()
+
+class SpoolWatcher(rpc.XmlRpcBot):
+    """
+    Watch a directory and send a message when another process is done writing.
+    
+    This monitors a directory tree using inotify (linux specific) and
+    after some files having been written will send a message after <timeout>
+    seconds of no file writing.
+    
+    (Basically when the solexa machine finishes dumping a round of data
+    this'll hopefully send out a message saying hey look theres data available
+    
+    """
+    # these params need to be in the config file
+    # I wonder where I should put the documentation
+    #:Parameters:
+    #    `watchdir` - which directory tree to monitor for modifications
+    #    `profile` - specify which .gaworkflow profile to use
+    #    `write_timeout` - how many seconds to wait for writes to finish to
+    #                      the spool
+    #    `notify_timeout` - how often to timeout from notify
+    
+    def __init__(self, section=None, configfile=None):
+        #if configfile is None:
+        #    self.configfile = "~/.gaworkflow"
+        super(SpoolWatcher, self).__init__(section, configfile)
+        
+        self.cfg['watchdir'] = None
+        self.cfg['write_timeout'] = 10
+        self.cfg['notify_users'] = None
+        self.cfg['notify_runner'] = None
+        
+        self.notify_timeout = 0.001
+        self.wm = pyinotify.WatchManager()
+        self.handler = Handler(self.wm, self)
+        self.notifier = pyinotify.Notifier(self.wm, self.handler)
+        self.wdd = None
+        
+        self.notify_users = None
+        self.notify_runner = None
+        
+        self.eventTasks.append(self.process_notify)
+
+    def read_config(self, section=None, configfile=None):
+        super(SpoolWatcher, self).read_config(section, configfile)
+        
+        self.watch_dir = self._check_required_option('watchdir')
+        self.write_timeout = int(self.cfg['write_timeout'])
+        
+        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+        try:
+          self.notify_runner = \
+             self._parse_user_list(self.cfg['notify_runner'],
+                                   require_resource=True)
+        except bot.JIDMissingResource:
+            msg = 'need a full jabber ID + resource for xml-rpc destinations'
+            logging.FATAL(msg)
+            raise bot.JIDMissingResource(msg)
+            
+    def add_watch(self, watchdir=None):
+        """
+        start watching watchdir or self.watch_dir
+        we're currently limited to watching one directory tree.
+        """
+        # the one tree limit is mostly because self.wdd is a single item
+        # but managing it as a list might be a bit more annoying
+        if watchdir is None:
+            watchdir = self.watch_dir
+        logging.info("Watching:"+str(watchdir))
+        mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
+        # rec traverses the tree and adds all the directories that are there
+        # at the start.
+        # auto_add will add in new directories as they are created
+        self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
+
+    def unmount_watch(self):
+        if self.wdd is not None:
+            logging.debug("disabling watch")
+            logging.debug(str(self.wdd))
+            self.wm.rm_watch(self.wdd)
+            self.wdd = None
+            
+    def process_notify(self, *args):
         # process the queue of events as explained above
         self.notifier.process_events()
         #check events waits timeout
         if self.notifier.check_events(self.notify_timeout):
-          # read notified events and enqeue them
-          self.notifier.read_events()
-        # should we do something?
-       last_event_time = self.handler.last_event_time
-       if last_event_time is not None:
-         time_delta = time.time() - last_event_time
-         if time_delta > self.write_timeout:
-           self.copying_paused()
-           self.handler.last_event_time = None
-
-      except KeyboardInterrupt:
+            # read notified events and enqeue them
+            self.notifier.read_events()
+            # should we do something?
+        last_event_time = self.handler.last_event_time
+        if last_event_time is not None:
+            time_delta = time.time() - last_event_time
+            if time_delta > self.write_timeout:
+                self.startCopy()
+                self.handler.last_event_time = None
+    
+    def start(self, daemonize):
+        """
+        Start application
+        """
+        self.add_watch()
+        super(SpoolWatcher, self).start(daemonize)
+        
+    def stop(self):
+        """
+        shutdown application
+        """
         # destroy the inotify's instance on this interrupt (stop monitoring)
         self.notifier.stop()
-        break
-
-  def copying_paused(self):
-    print "more than 10 seconds have elapsed"
-    send(self.copy_jid, "start copy")
-
+        super(SpoolWatcher, self).stop()
+    
+    def startCopy(self):
+        logging.debug("writes seem to have stopped")
+        if self.notify_runner is not None:
+            for r in self.notify_runner:
+                self.rpc_send(r, tuple(), 'startCopy')
+        
+    def runFinished(self, run_dir):
+        # need to strip off self.watch_dir from rundir I suspect.
+        logging.info("run.completed in " + str(run_dir))
+        pattern = self.watch_dir
+        if pattern[-1] != os.path.sep:
+            pattern += os.path.sep
+        stripped_run_dir = re.sub(pattern, "", run_dir)
+        logging.debug("stripped to " + stripped_run_dir)
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, 'Sequencing run %s finished' % (stripped_run_dir))
+        if self.notify_runner is not None:
+            for r in self.notify_runner:
+                self.rpc_send(r, (stripped_run_dir,), 'runFinished')
+        
+def main(args=None):
+    bot = SpoolWatcher()
+    return bot.main(args)
+    
+if __name__ == "__main__":
+    sys.exit(main(sys.argv[1:]))
+    
\ No newline at end of file
index 96c452fe209b3313361345722f6b7ce29e5b2212..25a277f08ff3ad842ce80578439b5f63e955ddb2 100644 (file)
@@ -1,10 +1,6 @@
 #!/usr/bin/env python
-
-from gaworkflow.spoolwatcher import TreeWriteDoneNotifier
+import sys
+from gaworkflow.spoolwatcher import main
 
 if __name__ == "__main__":
-  copy_jid = "cellthumper@chaos.caltech.edu"
-  runner_jid = "diane@ghic.org"
-
-  watcher = TreeWriteDoneNotifier("/gec/jumpgate/ext0", copy_jid, runner_jid)
-  watcher.event_loop()
+    sys.exit(main(sys.argv[1:]))