Send the specific directory that needs to be copied in the startCopy message.
authorDiane Trout <diane@caltech.edu>
Sat, 20 Jun 2009 00:06:44 +0000 (00:06 +0000)
committerDiane Trout <diane@caltech.edu>
Sat, 20 Jun 2009 00:06:44 +0000 (00:06 +0000)
Also I changed it to send "urls" which need to match in both spoolwatcher
and copier. (as security feature it checks to make sure the rsync
directory it got over the network matches one of its internal list)

htsworkflow/automation/copier.py
htsworkflow/automation/spoolwatcher.py
test/test_copier.py

index 9ec1e2bdfc8597aaa42c0a016761b0a549af0310..775e6c61aa3718c99775e1d31f1ac880e61b5c2d 100644 (file)
@@ -9,6 +9,7 @@ import subprocess
 import sys
 import time
 import traceback
+import urlparse
 
 from benderjab import rpc
 
@@ -81,17 +82,22 @@ class rsync(object):
     logging.debug("Rsync cmd:" + " ".join(args))
     return subprocess.Popen(args)
  
-  def copy(self):
+  def copy(self, url_list=None):
     """
     copy any interesting looking directories over
     return list of items that we started copying.
     """
     # clean up any lingering non-running processes
     self.poll()
-    
-    # what's available to copy?
-    dirs_to_copy = self.list()
-    
+
+    if url_list is None or len(url_list) == 0: 
+           # what's available to copy?
+        dirs_to_copy = self.list()
+    else:
+        dirs_to_copy = url_list
+   
+    logging.info("dirs to copy %s" % (dirs_to_copy,))
+
     # lets start copying
     started = []
     for d in dirs_to_copy:
@@ -155,7 +161,7 @@ class CopierBot(rpc.XmlRpcBot):
         
         # options for rsync command
         self.cfg['rsync_password_file'] = None
-        self.cfg['rsync_source'] = None
+        self.cfg['rsync_sources'] = None
         self.cfg['rsync_destination'] = None 
         
         # options for reporting we're done 
@@ -213,8 +219,15 @@ class CopierBot(rpc.XmlRpcBot):
         """
         start our copy
         """
-        logging.info("starting copy scan, %s" % (args,))
-        started = self.rsync.copy()
+        # Note, args comes in over the network, so don't trust it.
+        copy_urls = []
+        for a in args:
+            clean_url = self.validate_url(a)
+            if clean_url is not None:
+                copy_urls.append(clean_url)
+
+        logging.info("Validated urls = %s" % (copy_urls,))
+        started = self.rsync.copy(copy_urls)
         logging.info("copying:" + " ".join(started)+".")
         return started
         
@@ -279,6 +292,16 @@ class CopierBot(rpc.XmlRpcBot):
             reply = u"I didn't understand '%s'" % (unicode(msg))
         return reply
 
+    def validate_url(self, url):
+        split_url = urlparse.urlsplit(url)
+        for source in self.sources:
+            split_source = urlparse.urlsplit(source)
+            if (split_url.scheme == split_source.scheme) and \
+               (split_url.netloc == split_source.netloc) and \
+               (split_url.path.startswith(split_source.path)):
+               return url
+        return None
+
 def main(args=None):
     bot = CopierBot()
     bot.main(args)
index 25c012dee3c9b9c2b33e53e38ccf61b884d90849..783d5964b83dcb7a39c60d839208bc11b3e05e7e 100644 (file)
@@ -251,7 +251,7 @@ class SpoolWatcher(rpc.XmlRpcBot):
         if re.match(u"help", msg):
             reply = help
         elif re.match("copy", msg):            
-            self.startCopy()
+            self.startCopy(msg)
             reply = u"sent copy message"
         elif re.match(u"finished", msg):
             words = msg.split()
@@ -287,25 +287,28 @@ class SpoolWatcher(rpc.XmlRpcBot):
         logging.debug("writes seem to have stopped")
         if self.notify_runner is not None:
             for r in self.notify_runner:
-                self.rpc_send(r, tuple(), 'startCopy')
+                self.rpc_send(r, tuple([copy_url]), 'startCopy')
         if self.notify_users is not None:
             for u in self.notify_users:
-                self.send(u, 'startCopy %s.' % (copy_url,))
+                self.send(u, 'startCopy %s.' % (copy_urls,))
         
     def sequencingFinished(self, run_dir):
         # need to strip off self.watchdirs from rundir I suspect.
         logging.info("run.completed in " + str(run_dir))
-        pattern = self.watch_dir
-        if pattern[-1] != os.path.sep:
-            pattern += os.path.sep
-        stripped_run_dir = re.sub(pattern, "", run_dir)
-        logging.debug("stripped to " + stripped_run_dir)
-        if self.notify_users is not None:
-            for u in self.notify_users:
-                self.send(u, 'Sequencing run %s finished' % (stripped_run_dir))
-        if self.notify_runner is not None:
-            for r in self.notify_runner:
-                self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
+        for watch in self.watchdirs:
+            if not run_dir.startswith(watch):
+                continue
+            if watch[-1] != os.path.sep:
+                watch += os.path.sep
+            stripped_run_dir = re.sub(watch, "", run_dir)
+            logging.debug("stripped to " + stripped_run_dir)
+            if self.notify_users is not None:
+                for u in self.notify_users:
+                    self.send(u, 'Sequencing run %s finished' % \
+                              (stripped_run_dir))
+            if self.notify_runner is not None:
+                for r in self.notify_runner:
+                    self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
 
 def main(args=None):
     bot = SpoolWatcher()
index f34be14159a83e41d51f2673231dabb7859a1551..da4834010d17c18d06226fa9070ac6a3c8f295c6 100644 (file)
@@ -16,7 +16,7 @@ class testCopier(unittest.TestCase):
 something: unrelated
 """)
         bot = copier.CopierBot('fake', configfile=cfg)
-        self.failUnlessRaises(KeyError, bot.read_config)
+        self.failUnlessRaises(RuntimeError, bot.read_config)
         
     def test_full_config(self):
         cfg = StringIO("""[copier]        
@@ -24,7 +24,7 @@ jid: copier@example.fake
 password: badpassword
 authorized_users: user1@example.fake user2@example.fake
 rsync_password_file: ~/.sequencer
-rsync_sources: /tmp/sequencer_source
+rsync_sources: rsync://localhost/tmp/sequencer_source
 rsync_destination: /tmp/sequencer_destination
 notify_users: user3@example.fake
 # who to run to
@@ -39,10 +39,24 @@ notify_users: user3@example.fake
         self.failUnlessEqual(c.authorized_users[0], 'user1@example.fake')
         self.failUnlessEqual(c.authorized_users[1], 'user2@example.fake')
         self.failUnlessEqual(c.rsync.source_base_list[0], 
-                             '/tmp/sequencer_source/')
+                             'rsync://localhost/tmp/sequencer_source/')
         self.failUnlessEqual(c.rsync.dest_base, '/tmp/sequencer_destination')
         self.failUnlessEqual(len(c.notify_users), 1)
         self.failUnlessEqual(c.notify_users[0], 'user3@example.fake')
+        self.failUnlessEqual(c.validate_url('rsync://other/tmp'), None)
+        self.failUnlessEqual(c.validate_url('http://localhost/tmp'), None)
+        # In the rsync process the URL gets a trailing '/' added to it
+        # But in the bot config its still slash-less. 
+        # It is debatable when to add the trailing slash.
+        self.failUnlessEqual(
+          c.validate_url('rsync://localhost/tmp/sequencer_source'), 
+          'rsync://localhost/tmp/sequencer_source') 
+        self.failUnlessEqual(
+          c.validate_url('rsync://localhost/tmp/sequencer_source/'), 
+          'rsync://localhost/tmp/sequencer_source/')
+        self.failUnlessEqual(
+          c.validate_url('rsync://localhost/tmp/sequencer_source/bleem'), 
+          'rsync://localhost/tmp/sequencer_source/bleem')
 
     def test_dirlist_filter(self):
        """