From 3b2aa93cf5c13fefd8912056ae9907476885e073 Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Sat, 20 Jun 2009 00:06:44 +0000 Subject: [PATCH] Send the specific directory that needs to be copied in the startCopy message. Also, I changed it to send "urls", which need to match in both spoolwatcher and copier. (As a security feature, the copier checks that the rsync directory it got over the network matches one in its internal list.) --- htsworkflow/automation/copier.py | 39 ++++++++++++++++++++------ htsworkflow/automation/spoolwatcher.py | 31 +++++++++++--------- test/test_copier.py | 20 +++++++++++-- 3 files changed, 65 insertions(+), 25 deletions(-) diff --git a/htsworkflow/automation/copier.py b/htsworkflow/automation/copier.py index 9ec1e2b..775e6c6 100644 --- a/htsworkflow/automation/copier.py +++ b/htsworkflow/automation/copier.py @@ -9,6 +9,7 @@ import subprocess import sys import time import traceback +import urlparse from benderjab import rpc @@ -81,17 +82,22 @@ class rsync(object): logging.debug("Rsync cmd:" + " ".join(args)) return subprocess.Popen(args) - def copy(self): + def copy(self, url_list=None): """ copy any interesting looking directories over return list of items that we started copying. """ # clean up any lingering non-running processes self.poll() - - # what's available to copy? - dirs_to_copy = self.list() - + + if url_list is None or len(url_list) == 0: + # what's available to copy? 
+ dirs_to_copy = self.list() + else: + dirs_to_copy = url_list + + logging.info("dirs to copy %s" % (dirs_to_copy,)) + # lets start copying started = [] for d in dirs_to_copy: @@ -155,7 +161,7 @@ class CopierBot(rpc.XmlRpcBot): # options for rsync command self.cfg['rsync_password_file'] = None - self.cfg['rsync_source'] = None + self.cfg['rsync_sources'] = None self.cfg['rsync_destination'] = None # options for reporting we're done @@ -213,8 +219,15 @@ class CopierBot(rpc.XmlRpcBot): """ start our copy """ - logging.info("starting copy scan, %s" % (args,)) - started = self.rsync.copy() + # Note, args comes in over the network, so don't trust it. + copy_urls = [] + for a in args: + clean_url = self.validate_url(a) + if clean_url is not None: + copy_urls.append(clean_url) + + logging.info("Validated urls = %s" % (copy_urls,)) + started = self.rsync.copy(copy_urls) logging.info("copying:" + " ".join(started)+".") return started @@ -279,6 +292,16 @@ class CopierBot(rpc.XmlRpcBot): reply = u"I didn't understand '%s'" % (unicode(msg)) return reply + def validate_url(self, url): + split_url = urlparse.urlsplit(url) + for source in self.sources: + split_source = urlparse.urlsplit(source) + if (split_url.scheme == split_source.scheme) and \ + (split_url.netloc == split_source.netloc) and \ + (split_url.path.startswith(split_source.path)): + return url + return None + def main(args=None): bot = CopierBot() bot.main(args) diff --git a/htsworkflow/automation/spoolwatcher.py b/htsworkflow/automation/spoolwatcher.py index 25c012d..783d596 100644 --- a/htsworkflow/automation/spoolwatcher.py +++ b/htsworkflow/automation/spoolwatcher.py @@ -251,7 +251,7 @@ class SpoolWatcher(rpc.XmlRpcBot): if re.match(u"help", msg): reply = help elif re.match("copy", msg): - self.startCopy() + self.startCopy(msg) reply = u"sent copy message" elif re.match(u"finished", msg): words = msg.split() @@ -287,25 +287,28 @@ class SpoolWatcher(rpc.XmlRpcBot): logging.debug("writes seem to have stopped") if 
self.notify_runner is not None: for r in self.notify_runner: - self.rpc_send(r, tuple(), 'startCopy') + self.rpc_send(r, tuple([copy_url]), 'startCopy') if self.notify_users is not None: for u in self.notify_users: - self.send(u, 'startCopy %s.' % (copy_url,)) + self.send(u, 'startCopy %s.' % (copy_urls,)) def sequencingFinished(self, run_dir): # need to strip off self.watchdirs from rundir I suspect. logging.info("run.completed in " + str(run_dir)) - pattern = self.watch_dir - if pattern[-1] != os.path.sep: - pattern += os.path.sep - stripped_run_dir = re.sub(pattern, "", run_dir) - logging.debug("stripped to " + stripped_run_dir) - if self.notify_users is not None: - for u in self.notify_users: - self.send(u, 'Sequencing run %s finished' % (stripped_run_dir)) - if self.notify_runner is not None: - for r in self.notify_runner: - self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished') + for watch in self.watchdirs: + if not run_dir.startswith(watch): + continue + if watch[-1] != os.path.sep: + watch += os.path.sep + stripped_run_dir = re.sub(watch, "", run_dir) + logging.debug("stripped to " + stripped_run_dir) + if self.notify_users is not None: + for u in self.notify_users: + self.send(u, 'Sequencing run %s finished' % \ + (stripped_run_dir)) + if self.notify_runner is not None: + for r in self.notify_runner: + self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished') def main(args=None): bot = SpoolWatcher() diff --git a/test/test_copier.py b/test/test_copier.py index f34be14..da48340 100644 --- a/test/test_copier.py +++ b/test/test_copier.py @@ -16,7 +16,7 @@ class testCopier(unittest.TestCase): something: unrelated """) bot = copier.CopierBot('fake', configfile=cfg) - self.failUnlessRaises(KeyError, bot.read_config) + self.failUnlessRaises(RuntimeError, bot.read_config) def test_full_config(self): cfg = StringIO("""[copier] @@ -24,7 +24,7 @@ jid: copier@example.fake password: badpassword authorized_users: user1@example.fake user2@example.fake 
rsync_password_file: ~/.sequencer -rsync_sources: /tmp/sequencer_source +rsync_sources: rsync://localhost/tmp/sequencer_source rsync_destination: /tmp/sequencer_destination notify_users: user3@example.fake # who to run to @@ -39,10 +39,24 @@ notify_users: user3@example.fake self.failUnlessEqual(c.authorized_users[0], 'user1@example.fake') self.failUnlessEqual(c.authorized_users[1], 'user2@example.fake') self.failUnlessEqual(c.rsync.source_base_list[0], - '/tmp/sequencer_source/') + 'rsync://localhost/tmp/sequencer_source/') self.failUnlessEqual(c.rsync.dest_base, '/tmp/sequencer_destination') self.failUnlessEqual(len(c.notify_users), 1) self.failUnlessEqual(c.notify_users[0], 'user3@example.fake') + self.failUnlessEqual(c.validate_url('rsync://other/tmp'), None) + self.failUnlessEqual(c.validate_url('http://localhost/tmp'), None) + # In the rsync process the URL gets a trailing '/' added to it + # But in the bot config its still slash-less. + # It is debatable when to add the trailing slash. + self.failUnlessEqual( + c.validate_url('rsync://localhost/tmp/sequencer_source'), + 'rsync://localhost/tmp/sequencer_source') + self.failUnlessEqual( + c.validate_url('rsync://localhost/tmp/sequencer_source/'), + 'rsync://localhost/tmp/sequencer_source/') + self.failUnlessEqual( + c.validate_url('rsync://localhost/tmp/sequencer_source/bleem'), + 'rsync://localhost/tmp/sequencer_source/bleem') def test_dirlist_filter(self): """ -- 2.30.2