From 996b4e7c575f1bb8e6b19e34925cacb13fbf8b77 Mon Sep 17 00:00:00 2001
From: Diane Trout
Date: Sat, 8 Dec 2007 00:40:11 +0000
Subject: [PATCH] [project @ update CopierBot to new logging daemonizable
 XML-RPC BenderJab Bot]

this version reads all of the parameters out of the .benderjab config file,
will report what it's currently copying, and when it gets a "runFinished"
message, will wait until it's finished copying before forwarding that on.
If it dies before finishing copying, but after getting the runFinished
message, that message might get lost.
---
 gaworkflow/copier.py | 228 ++++++++++++++++++++++++++++++++++---------
 test/test_copier.py  |  49 ++++++++++
 2 files changed, 233 insertions(+), 44 deletions(-)
 create mode 100644 test/test_copier.py

diff --git a/gaworkflow/copier.py b/gaworkflow/copier.py
index 2655320..8f46836 100644
--- a/gaworkflow/copier.py
+++ b/gaworkflow/copier.py
@@ -1,3 +1,4 @@
+import ConfigParser
 import copy
 import logging
 import logging.handlers
@@ -8,16 +9,24 @@ import sys
 import time
 import traceback
 
-from benderjab.bot import BenderFactory
+from benderjab import rpc
 
+def runfolder_validate(fname):
+    """
+    Return True if fname looks like a runfolder name (no trailing newline)
+    """
+    if re.match(r"^[0-9]{6}_[-A-Za-z0-9_]*\Z", fname):
+        return True
+    else:
+        return False
+
 class rsync(object):
-
-    def __init__(self, pwfile):
-        self.pwfile = pwfile
+    def __init__(self, source, dest, pwfile):
+        self.pwfile = os.path.expanduser(pwfile)
         self.cmd = ['/usr/bin/rsync', ]
-        self.cmd.append('--password-file=%s' % (pwfile))
-        self.source_base = 'rsync://sequencer@jumpgate.caltech.edu:8730/sequencer/'
-        self.dest_base = '/home/diane/gec/'
+        self.cmd.append('--password-file=%s' % (self.pwfile))
+        self.source_base = source
+        self.dest_base = dest
         self.processes = {}
         self.exit_code = None
 
@@ -52,55 +61,186 @@ class rsync(object):
         return subprocess.Popen(args)
 
     def copy(self):
-        """copy any interesting looking directories over
         """
+        copy any interesting looking directories over
+        return list of items that we started copying.
+        """
+        # clean up any lingering non-running processes
+        self.poll()
+
+        # what's available to copy?
         dirs_to_copy = self.list()
+
+        # lets start copying
+        started = []
         for d in dirs_to_copy:
             process = self.processes.get(d, None)
+
             if process is None:
                 # we don't have a process, so make one
                 logging.info("rsyncing %s" % (d))
                 self.processes[d] = self.create_copy_process(d)
-            else:
-                retcode = process.poll()
-                if retcode is not None:
-                    # we finished
-                    logging.info("finished rsyncing %s, exitcode %d" % (d, retcode))
-                    del self.processes[d]
-
-class copier_bot_parser(object):
-    def __init__(self, ):
-        self.rsync = rsync('/home/diane/.sequencer')
+                started.append(d)
+        return started
+
+    def poll(self):
+        """
+        check currently running processes to see if they're done
+
+        successfully finished copies are dropped from self.processes.
+        """
+        for dir_key, proc_value in self.processes.items():
+            retcode = proc_value.poll()
+            if retcode is None:
+                # process hasn't finished yet
+                pass
+            elif retcode == 0:
+                logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
+                del self.processes[dir_key]
+            else:
+                logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
+
+    def __len__(self):
+        """
+        Return how many active rsync processes we currently have
+
+        Call poll first to close finished processes.
+        """
+        return len(self.processes)
 
-    def __call__(self, msg, who):
-        try:
-            if re.match("start copy", msg):
-                logging.info("starting copy for %s" % (who.getStripped()))
-                self.rsync.copy()
-        except Exception, e:
-            errmsg = "Exception: " + str(e)
-            logging.error(errmsg)
-            logging.error(traceback.format_exc())
-            return errmsg
+    def keys(self):
+        """
+        Return list of current run folder names
+        """
+        return self.processes.keys()
 
 
-def main(args=None):
-    if len(args) != 1:
-        print "need .benderjab config name"
-    configname = args[0]
+class CopierBot(rpc.XmlRpcBot):
+    def __init__(self, section=None, configfile=None):
+        #if configfile is None:
+        #    configfile = '~/.gaworkflow'
+
+        super(CopierBot, self).__init__(section, configfile)
+
+        # options for rsync command
+        self.cfg['rsync_password_file'] = None
+        self.cfg['rsync_source'] = None
+        self.cfg['rsync_destination'] = None
+
+        # options for reporting we're done
+        self.cfg['notify_users'] = None
+        self.cfg['notify_runner'] = None
+
+        self.pending = []
+        self.rsync = None
+        self.notify_users = None
+        self.notify_runner = None
+
+        self.register_function(self.startCopy)
+        self.register_function(self.runFinished)
+        self.eventTasks.append(self.update)
+
+    def read_config(self, section=None, configfile=None):
+        """
+        read the config file
+        """
+        super(CopierBot, self).read_config(section, configfile)
+
+        def check_option(name):
+            if self.cfg[name] is None:
+                errmsg="Please specify %s in the configfile" % (name)
+                logging.fatal(errmsg)
+                raise RuntimeError(errmsg)
+            else:
+                return self.cfg[name]
+
+        password = check_option('rsync_password_file')
+        source = check_option('rsync_source')
+        destination = check_option('rsync_destination')
+        self.rsync = rsync(source, destination, password)
+
+        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+        self.notify_runner = self._parse_user_list(self.cfg['notify_runner'])
 
-    logging.basicConfig(level=logging.INFO,
-                        format='%(asctime)s %(levelname)s %(message)s')
-    log = logging.getLogger()
-    log.addHandler(logging.handlers.RotatingFileHandler(
-                     '/tmp/copier_bot.log', maxBytes=1000000, backupCount = 3)
-                  )
-    bot = BenderFactory(configname)
-    bot.parser = copier_bot_parser()
-    bot.logon()
-    bot.eventLoop()
-    logging.shutdown()
-    return 0
+    def startCopy(self, *args):
+        """
+        start our copy
+        """
+        logging.info("starting copy scan")
+        started = self.rsync.copy()
+        logging.info("copying:" + " ".join(started)+".")
+        return started
+
+    def runFinished(self, runDir, *args):
+        """
+        The run was finished, if we're done copying, pass the message on
+        """
+        # close any open processes
+        self.rsync.poll()
+
+        # see if we're still copying
+        if runfolder_validate(runDir):
+            if runDir in self.rsync.keys():
+                # still copying
+                self.pending.append(runDir)
+                logging.info("%s finished, but still copying" % (runDir))
+                return "PENDING"
+            else:
+                # we're done
+                self.reportRunFinished(runDir)
+                logging.info("%s finished" % (runDir))
+                return "DONE"
+        else:
+            errmsg = "received bad runfolder name (%s)" % (runDir)
+            logging.warning(errmsg)
+            # maybe I should use a different error message
+            raise RuntimeError(errmsg)
+
+    def reportRunFinished(self, runDir):
+        """
+        Send the runFinished message to the interested parties
+        """
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, 'run %s finished' % (runDir))
+        if self.notify_runner is not None:
+            for r in self.notify_runner:
+                rpc.send(self.cl, r, (runDir,), 'runFinished')  # send to each configured runner
+        logging.info("forwarding runFinshed message for %s" % (runDir))
+
+    def update(self, *args):
+        """
+        Update our current status.
+        Report if we've finished copying files.
+        """
+        self.rsync.poll()
+        for p in self.pending[:]:  # iterate a copy: entries are removed below
+            if p not in self.rsync.keys():
+                self.reportRunFinished(p)
+                self.pending.remove(p)
+
+    def _parser(self, msg, who):
+        """
+        Parse xmpp chat messages
+        """
+        help = u"I can [copy], or report current [status]"
+        if re.match(u"help", msg):
+            reply = help
+        elif re.match("copy", msg):
+            started = self.startCopy()
+            reply = u"started copying " + ", ".join(started)
+        elif re.match(u"status", msg):
+            msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
+            for d in self.rsync.keys():
+                msg.append(u" " + d)
+            reply = os.linesep.join(msg)
+        else:
+            reply = (u"I didn't understand '%s'" % (unicode(msg))) + os.linesep + help  # format before joining
+        return reply
 
+def main(args=None):
+    bot = CopierBot()
+    bot.main(args)
+
 if __name__ == "__main__":
     sys.exit(main(sys.argv[1:]))
diff --git a/test/test_copier.py b/test/test_copier.py
new file mode 100644
index 0000000..2cb92f0
--- /dev/null
+++ b/test/test_copier.py
@@ -0,0 +1,49 @@
+import unittest
+
+from StringIO import StringIO
+from gaworkflow import copier
+
+class testCopier(unittest.TestCase):
+    def test_runfolder_validate(self):
+        self.failUnlessEqual(copier.runfolder_validate(""), False)
+        self.failUnlessEqual(copier.runfolder_validate("1345_23"), False)
+        self.failUnlessEqual(copier.runfolder_validate("123456_asdf-$23'"), False)
+        self.failUnlessEqual(copier.runfolder_validate("123456_USI-EAS44"), True)
+        self.failUnlessEqual(copier.runfolder_validate("123456_USI-EAS44 "), False)
+
+    def test_empty_config(self):
+        cfg = StringIO("""[fake]
+something: unrelated
+""")
+        bot = copier.CopierBot('fake', configfile=cfg)
+        self.failUnlessRaises(RuntimeError, bot.read_config)
+
+    def test_full_config(self):
+        cfg = StringIO("""[copier]
+jid: copier@example.fake
+password: badpassword
+authorized_users: user1@example.fake user2@example.fake
+rsync_password_file: ~/.sequencer
+rsync_source: /tmp/sequencer_source
+rsync_destination: /tmp/sequencer_destination
+notify_users: user3@example.fake
+# who to run to
+#runner:
+""")
+        c = copier.CopierBot("copier", configfile=cfg)
+        c.read_config()
+        self.failUnlessEqual(c.jid, 'copier@example.fake')
+        self.failUnlessEqual(c.cfg['password'], 'badpassword')
+        self.failUnlessEqual(len(c.authorized_users), 2)
+        self.failUnlessEqual(c.authorized_users[0], 'user1@example.fake')
+        self.failUnlessEqual(c.authorized_users[1], 'user2@example.fake')
+        self.failUnlessEqual(c.rsync.source_base, '/tmp/sequencer_source')
+        self.failUnlessEqual(c.rsync.dest_base, '/tmp/sequencer_destination')
+        self.failUnlessEqual(len(c.notify_users), 1)
+        self.failUnlessEqual(c.notify_users[0], 'user3@example.fake')
+
+def suite():
+    return unittest.makeSuite(testCopier,'test')
+
+if __name__ == "__main__":
+    unittest.main(defaultTest="suite")
-- 
2.30.2