[project @ update CopierBot to new logging daemonizable XML-RPC BenderJab Bot]
author Diane Trout <diane@caltech.edu>
Sat, 8 Dec 2007 00:40:11 +0000 (00:40 +0000)
committer Diane Trout <diane@caltech.edu>
Sat, 8 Dec 2007 00:40:11 +0000 (00:40 +0000)
This version reads all of the parameters out of the .benderjab config
file, reports what it's currently copying, and when it gets a
"runFinished" message, waits until it has finished copying before
forwarding that on.

If it dies before finishing copying, but after getting the runFinished
message, that message might get lost.

gaworkflow/copier.py
test/test_copier.py [new file with mode: 0644]

index 2655320638d2558f33b19e7cca55c04610dda376..8f46836a53a93cb4e4a24bc83411ccf76957cc35 100644 (file)
@@ -1,3 +1,4 @@
+import ConfigParser
 import copy
 import logging
 import logging.handlers
@@ -8,16 +9,24 @@ import sys
 import time
 import traceback
 
-from benderjab.bot import BenderFactory
+from benderjab import rpc
 
+def runfolder_validate(fname):
+    """
+    Return True if fname looks like a runfolder name (6 digits, '_', then [-A-Za-z0-9_]*)
+    """
+    # \Z, not $: $ also matches just before a trailing newline ("123456_x\n")
+    if re.match(r"^[0-9]{6}_[-A-Za-z0-9_]*\Z", fname):
+        return True
+    return False
+    
 class rsync(object):
-  def __init__(self, pwfile):
-    self.pwfile = pwfile
+  def __init__(self, source, dest, pwfile):
+    self.pwfile = os.path.expanduser(pwfile)
     self.cmd = ['/usr/bin/rsync', ]
-    self.cmd.append('--password-file=%s' % (pwfile))
-    self.source_base = 'rsync://sequencer@jumpgate.caltech.edu:8730/sequencer/'
-    self.dest_base = '/home/diane/gec/'
+    self.cmd.append('--password-file=%s' % (self.pwfile))
+    self.source_base = source
+    self.dest_base = dest
     self.processes = {}
     self.exit_code = None
 
@@ -52,55 +61,186 @@ class rsync(object):
     return subprocess.Popen(args)
  
   def copy(self):
-    """copy any interesting looking directories over
     """
+    copy any interesting looking directories over
+    return list of items that we started copying.
+    """
+    # clean up any lingering non-running processes
+    self.poll()
+    
+    # what's available to copy?
     dirs_to_copy = self.list()
+    
+    # lets start copying
+    started = []
     for d in dirs_to_copy:
       process = self.processes.get(d, None)
+      
       if process is None:
         # we don't have a process, so make one
         logging.info("rsyncing %s" % (d))
         self.processes[d] = self.create_copy_process(d)
-      else:
-        retcode = process.poll()
-        if retcode is not None:
-           # we finished
-           logging.info("finished rsyncing %s, exitcode %d" % (d, retcode))
-           del self.processes[d]
-
-class copier_bot_parser(object):
-  def __init__(self, ):
-    self.rsync = rsync('/home/diane/.sequencer')
+        started.append(d)           
+    return started
+      
+  def poll(self):
+      """
+      check currently running processes to see if they're done
+      
+      Copies that exited with 0 are removed from self.processes; failed ones are logged and kept.
+      """
+      for dir_key, proc_value in self.processes.items():  # Python 2: items() is a list copy, so del below is safe
+          retcode = proc_value.poll()
+          if retcode is None:
+              # process hasn't finished yet
+              pass
+          elif retcode == 0:
+              logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
+              del self.processes[dir_key]
+          else:
+              logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
+              
+  def __len__(self):
+      """
+      Return how many active rsync processes we currently have
+      
+      Call poll first to close finished processes.
+      """
+      return len(self.processes)
   
-  def __call__(self, msg, who):
-    try:
-      if re.match("start copy", msg):
-        logging.info("starting copy for %s" % (who.getStripped()))
-        self.rsync.copy()
-    except Exception, e:
-      errmsg = "Exception: " + str(e)
-      logging.error(errmsg)
-      logging.error(traceback.format_exc())
-      return errmsg
+  def keys(self):
+      """
+      Return list of current run folder names
+      """
+      return self.processes.keys()
 
-def main(args=None):
-  if len(args) != 1:
-    print "need .benderjab config name"
-  configname = args[0]
+class CopierBot(rpc.XmlRpcBot):
+    def __init__(self, section=None, configfile=None):
+        #if configfile is None:
+        #    configfile = '~/.gaworkflow'
+            
+        super(CopierBot, self).__init__(section, configfile)
+        
+        # options for rsync command
+        self.cfg['rsync_password_file'] = None
+        self.cfg['rsync_source'] = None
+        self.cfg['rsync_destination'] = None 
+        
+        # options for reporting we're done 
+        self.cfg['notify_users'] = None
+        self.cfg['notify_runner'] = None
+                            
+        self.pending = []
+        self.rsync = None
+        self.notify_users = None
+        self.notify_runner = None
+        
+        self.register_function(self.startCopy)
+        self.register_function(self.runFinished)
+        self.eventTasks.append(self.update)
+        
+    def read_config(self, section=None, configfile=None):
+        """
+        read the config file
+        """
+        super(CopierBot, self).read_config(section, configfile)
+        
+        def check_option(name):
+            if self.cfg[name] is None:
+                errmsg="Please specify %s in the configfile" % (name)
+                logging.fatal(errmsg)
+                raise RuntimeError(errmsg)
+            else:
+                return self.cfg[name]
+            
+        password = check_option('rsync_password_file')
+        source = check_option('rsync_source')
+        destination = check_option('rsync_destination')
+        self.rsync = rsync(source, destination, password)
+        
+        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+        self.notify_runner = self._parse_user_list(self.cfg['notify_runner'])
 
-  logging.basicConfig(level=logging.INFO, 
-                      format='%(asctime)s %(levelname)s %(message)s')
-  log = logging.getLogger()
-  log.addHandler(logging.handlers.RotatingFileHandler(
-                   '/tmp/copier_bot.log', maxBytes=1000000, backupCount = 3)
-                )
-  bot = BenderFactory(configname)
-  bot.parser = copier_bot_parser()
-  bot.logon()
-  bot.eventLoop()
-  logging.shutdown()
-  return 0
+    def startCopy(self, *args):
+        """
+        start our copy
+        """
+        logging.info("starting copy scan")
+        started = self.rsync.copy()
+        logging.info("copying:" + " ".join(started)+".")
+        return started
+        
+    def runFinished(self, runDir, *args):
+        """
+        The run was finished, if we're done copying, pass the message on        
+        """
+        # close any open processes
+        self.rsync.poll()
+        
+        # see if we're still copying
+        if runfolder_validate(runDir):
+            if runDir in self.rsync.keys():
+                # still copying
+                self.pending.append(runDir)
+                logging.info("%s finished, but still copying" % (runDir))
+                return "PENDING"
+            else:
+                # we're done
+                self.reportRunFinished(runDir)
+                logging.info("%s finished" % (runDir))
+                return "DONE"
+        else:
+            errmsg = "received bad runfolder name (%s)" % (runDir)
+            logging.warning(errmsg)
+            # maybe I should use a different error message
+            raise RuntimeError(errmsg)
+    
+    def reportRunFinished(self, runDir):
+        """
+        Send the runFinished message for runDir to every notify_users / notify_runner entry
+        """
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, 'run %s finished' % (runDir))  # plain chat message
+        if self.notify_runner is not None:
+            for r in self.notify_runner:
+                rpc.send(self.cl, r, (runDir,), 'runFinished')  # address each runner; self.runner is never set
+        logging.info("forwarding runFinshed message for %s" % (runDir))
+        
+    def update(self, *args):
+        """
+        Update our current status.
+        Report (and forget) every pending run whose copy is no longer in progress.
+        """
+        self.rsync.poll()
+        for p in self.pending[:]:  # iterate a copy: removing while iterating skips the next entry
+            if p not in self.rsync.keys():
+                self.reportRunFinished(p)
+                self.pending.remove(p)
+        
+    def _parser(self, msg, who):
+        """
+        Parse xmpp chat messages (help, copy, status) and return the reply text
+        """
+        help = u"I can [copy], or report current [status]"
+        if re.match(u"help", msg):
+            reply = help
+        elif re.match("copy", msg):
+            started = self.startCopy()
+            reply = u"started copying " + ", ".join(started)
+        elif re.match(u"status", msg):
+            msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
+            for d in self.rsync.keys():
+              msg.append(u"  " + d)
+            reply = os.linesep.join(msg)
+        else:
+            reply = (u"I didn't understand '%s'" % (unicode(msg))) + os.linesep + help  # format the template, not help
+        return reply
 
+def main(args=None):
+    bot = CopierBot()
+    bot.main(args)
+    
 if __name__ == "__main__":
   sys.exit(main(sys.argv[1:]))
 
diff --git a/test/test_copier.py b/test/test_copier.py
new file mode 100644 (file)
index 0000000..2cb92f0
--- /dev/null
@@ -0,0 +1,49 @@
+import unittest
+
+from StringIO import StringIO
+from gaworkflow import copier
+
+class testCopier(unittest.TestCase):
+    def test_runfolder_validate(self):
+        self.failUnlessEqual(copier.runfolder_validate(""), False)
+        self.failUnlessEqual(copier.runfolder_validate("1345_23"), False)
+        self.failUnlessEqual(copier.runfolder_validate("123456_asdf-$23'"), False)
+        self.failUnlessEqual(copier.runfolder_validate("123456_USI-EAS44"), True)
+        self.failUnlessEqual(copier.runfolder_validate("123456_USI-EAS44 "), False)
+        
+    def test_empty_config(self):
+        cfg = StringIO("""[fake]
+something: unrelated
+""")
+        bot = copier.CopierBot('fake', configfile=cfg)
+        self.failUnlessRaises(RuntimeError, bot.read_config)
+        
+    def test_full_config(self):
+        cfg = StringIO("""[copier]        
+jid: copier@example.fake
+password: badpassword
+authorized_users: user1@example.fake user2@example.fake
+rsync_password_file: ~/.sequencer
+rsync_source: /tmp/sequencer_source
+rsync_destination: /tmp/sequencer_destination
+notify_users: user3@example.fake
+# who to run to
+#runner:
+""")
+        c = copier.CopierBot("copier", configfile=cfg)
+        c.read_config()
+        self.failUnlessEqual(c.jid, 'copier@example.fake')
+        self.failUnlessEqual(c.cfg['password'], 'badpassword')
+        self.failUnlessEqual(len(c.authorized_users), 2)
+        self.failUnlessEqual(c.authorized_users[0], 'user1@example.fake')
+        self.failUnlessEqual(c.authorized_users[1], 'user2@example.fake')
+        self.failUnlessEqual(c.rsync.source_base, '/tmp/sequencer_source')
+        self.failUnlessEqual(c.rsync.dest_base, '/tmp/sequencer_destination')
+        self.failUnlessEqual(len(c.notify_users), 1)
+        self.failUnlessEqual(c.notify_users[0], 'user3@example.fake')
+
+def suite():
+    return unittest.makeSuite(testCopier,'test')
+
+if __name__ == "__main__":
+    unittest.main(defaultTest="suite")