Scan more than one rsync repository for directory trees to copy.
authorDiane Trout <diane@caltech.edu>
Wed, 10 Jun 2009 00:51:49 +0000 (00:51 +0000)
committerDiane Trout <diane@caltech.edu>
Wed, 10 Jun 2009 00:51:49 +0000 (00:51 +0000)
Also don't configure the rsync class until the bot.run function
to avoid problems with logging while daemonized.

Also add some code to make sure that we have a trailing /

htsworkflow/automation/copier.py

index 572cfb3abacbff9abe8ad296245fafc695db66b9..785691eae4e7afaded5e4b0e377ccb68156ed422 100644 (file)
@@ -4,6 +4,7 @@ import logging
 import logging.handlers
 import os
 import re
+import shlex
 import subprocess
 import sys
 import time
@@ -21,23 +22,35 @@ def runfolder_validate(fname):
         return False
     
 class rsync(object):
-  def __init__(self, source, dest, pwfile):
-    self.pwfile = os.path.expanduser(pwfile)
+  def __init__(self, sources, dest, pwfile):
     self.cmd = ['/usr/bin/rsync', ]
+    self.pwfile = os.path.expanduser(pwfile)
     self.cmd.append('--password-file=%s' % (self.pwfile))
-    self.source_base = source
+    self.source_base_list = [ self._normalize_rsync_source(x) for x in sources]
     self.dest_base = dest
     self.processes = {}
     self.exit_code = None
 
   def list(self):
-    """Get a directory listing"""
-    args = copy.copy(self.cmd)
-    args.append(self.source_base)
+    """
+    Get a directory listing for all our sources
+    """
+    logging.debug("searching for entries in: %s" % (self.source_base_list,))
+    entries = []
+    for source in self.source_base_list:
+        logging.debug("Scanning %s" % (source,))
+        args = copy.copy(self.cmd)
+        args.append(source)
 
-    logging.debug("Rsync cmd:" + " ".join(args))
-    short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
-    return self.list_filter(short_process.stdout)
+        logging.debug("Rsync cmd:" + " ".join(args))
+        short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
+        exit_code = short_process.wait()
+        stdout = short_process.stdout
+        # We made sure source ends in a / earlier
+        cur_list = [ source+subdir for subdir in self.list_filter(stdout)]
+        entries.extend(cur_list)
+    logging.debug(u"Found the following: %s" % (unicode(entries)))
+    return entries
 
   def list_filter(self, lines):
     """
@@ -45,6 +58,7 @@ class rsync(object):
     """
     dirs_to_copy = []
     direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ]
+    logging.debug(u'direntries: %s' % (unicode(direntries),))
     for permissions, size, filedate, filetime, filename in direntries:
       if permissions[0] == 'd':
         # hey its a directory, the first step to being something we want to 
@@ -55,12 +69,13 @@ class rsync(object):
           dirs_to_copy.append(filename)
     return dirs_to_copy
 
-  def create_copy_process(self, dirname):
+  def create_copy_process(self, urlname):
     args = copy.copy(self.cmd)
+    # args.append('--dry-run') # Makes testing easier
     # we want to copy everything
     args.append('-rlt') 
     # from here
-    args.append(os.path.join(self.source_base, dirname))
+    args.append(urlname)
     # to here
     args.append(self.dest_base)
     logging.debug("Rsync cmd:" + " ".join(args))
@@ -88,7 +103,18 @@ class rsync(object):
         self.processes[d] = self.create_copy_process(d)
         started.append(d)           
     return started
-      
+
+  def _normalize_rsync_source(self, source):
+      """
+      Make sure that we have a reasonable looking source
+      a source must be a directory/collection.
+      """
+      # we must be a directory
+      if source[-1] != '/':
+        source += '/'
+      # I suppose we could check to see if we start with rsync:// or something
+      return source
+
   def poll(self):
       """
       check currently running processes to see if they're done
@@ -151,10 +177,9 @@ class CopierBot(rpc.XmlRpcBot):
         """
         super(CopierBot, self).read_config(section, configfile)
         
-        password = self._check_required_option('rsync_password_file')
-        source = self._check_required_option('rsync_source')
-        destination = self._check_required_option('rsync_destination')
-        self.rsync = rsync(source, destination, password)
+        self.sources = shlex.split(self._check_required_option('rsync_sources'))
+        self.password = self._check_required_option('rsync_password_file')
+        self.destination = self._check_required_option('rsync_destination')
         
         self.notify_users = self._parse_user_list(self.cfg['notify_users'])
         try:
@@ -163,14 +188,23 @@ class CopierBot(rpc.XmlRpcBot):
                                    require_resource=True)
         except bot.JIDMissingResource:
             msg = 'need a full jabber ID + resource for xml-rpc destinations'
-            logging.FATAL(msg)
+            print >>sys.stderr, msg
             raise bot.JIDMissingResource(msg)
 
+    def run(self):
+        """
+        Start application
+        """
+        # we can't call any logging function until after start finishes.
+        if self.rsync is None:
+            self.rsync = rsync(self.sources, self.destination, self.password)
+        super(CopierBot, self).run()
+
     def startCopy(self, *args):
         """
         start our copy
         """
-        logging.info("starting copy scan")
+        logging.info("starting copy scan, %s" % (args,))
         started = self.rsync.copy()
         logging.info("copying:" + " ".join(started)+".")
         return started