+import ConfigParser
import copy
import logging
import logging.handlers
import time
import traceback
-from benderjab.bot import BenderFactory
+from benderjab import rpc
+def runfolder_validate(fname):
+ """
+ Return True if fname looks like a runfolder name
+ """
+ if re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname):
+ return True
+ else:
+ return False
+
class rsync(object):
-
- def __init__(self, pwfile):
- self.pwfile = pwfile
+ def __init__(self, source, dest, pwfile):
+ self.pwfile = os.path.expanduser(pwfile)
self.cmd = ['/usr/bin/rsync', ]
- self.cmd.append('--password-file=%s' % (pwfile))
- self.source_base = 'rsync://sequencer@jumpgate.caltech.edu:8730/sequencer/'
- self.dest_base = '/home/diane/gec/'
+ self.cmd.append('--password-file=%s' % (self.pwfile))
+ self.source_base = source
+ self.dest_base = dest
self.processes = {}
self.exit_code = None
return subprocess.Popen(args)
def copy(self):
- """copy any interesting looking directories over
"""
+ copy any interesting looking directories over
+ return list of items that we started copying.
+ """
+ # clean up any lingering non-running processes
+ self.poll()
+
+ # what's available to copy?
dirs_to_copy = self.list()
+
+ # lets start copying
+ started = []
for d in dirs_to_copy:
process = self.processes.get(d, None)
+
if process is None:
# we don't have a process, so make one
logging.info("rsyncing %s" % (d))
self.processes[d] = self.create_copy_process(d)
- else:
- retcode = process.poll()
- if retcode is not None:
- # we finished
- logging.info("finished rsyncing %s, exitcode %d" % (d, retcode))
- del self.processes[d]
-
-class copier_bot_parser(object):
- def __init__(self, ):
- self.rsync = rsync('/home/diane/.sequencer')
+ started.append(d)
+ return started
+
+ def poll(self):
+ """
+ check currently running processes to see if they're done
+
+ return path roots that have finished.
+ """
+ for dir_key, proc_value in self.processes.items():
+ retcode = proc_value.poll()
+ if retcode is None:
+ # process hasn't finished yet
+ pass
+ elif retcode == 0:
+ logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
+ del self.processes[dir_key]
+ else:
+ logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
+
+ def __len__(self):
+ """
+ Return how many active rsync processes we currently have
+
+ Call poll first to close finished processes.
+ """
+ return len(self.processes)
- def __call__(self, msg, who):
- try:
- if re.match("start copy", msg):
- logging.info("starting copy for %s" % (who.getStripped()))
- self.rsync.copy()
- except Exception, e:
- errmsg = "Exception: " + str(e)
- logging.error(errmsg)
- logging.error(traceback.format_exc())
- return errmsg
+ def keys(self):
+ """
+ Return list of current run folder names
+ """
+ return self.processes.keys()
-def main(args=None):
- if len(args) != 1:
- print "need .benderjab config name"
- configname = args[0]
+class CopierBot(rpc.XmlRpcBot):
+ def __init__(self, section=None, configfile=None):
+ #if configfile is None:
+ # configfile = '~/.gaworkflow'
+
+ super(CopierBot, self).__init__(section, configfile)
+
+ # options for rsync command
+ self.cfg['rsync_password_file'] = None
+ self.cfg['rsync_source'] = None
+ self.cfg['rsync_destination'] = None
+
+ # options for reporting we're done
+ self.cfg['notify_users'] = None
+ self.cfg['notify_runner'] = None
+
+ self.pending = []
+ self.rsync = None
+ self.notify_users = None
+ self.notify_runner = None
+
+ self.register_function(self.startCopy)
+ self.register_function(self.runFinished)
+ self.eventTasks.append(self.update)
+
+ def read_config(self, section=None, configfile=None):
+ """
+ read the config file
+ """
+ super(CopierBot, self).read_config(section, configfile)
+
+ def check_option(name):
+ if self.cfg[name] is None:
+ errmsg="Please specify %s in the configfile" % (name)
+ logging.fatal(errmsg)
+ raise RuntimeError(errmsg)
+ else:
+ return self.cfg[name]
+
+ password = check_option('rsync_password_file')
+ source = check_option('rsync_source')
+ destination = check_option('rsync_destination')
+ self.rsync = rsync(source, destination, password)
+
+ self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+ self.notify_runner = self._parse_user_list(self.cfg['notify_runner'])
- logging.basicConfig(level=logging.INFO,
- format='%(asctime)s %(levelname)s %(message)s')
- log = logging.getLogger()
- log.addHandler(logging.handlers.RotatingFileHandler(
- '/tmp/copier_bot.log', maxBytes=1000000, backupCount = 3)
- )
- bot = BenderFactory(configname)
- bot.parser = copier_bot_parser()
- bot.logon()
- bot.eventLoop()
- logging.shutdown()
- return 0
+ def startCopy(self, *args):
+ """
+ start our copy
+ """
+ logging.info("starting copy scan")
+ started = self.rsync.copy()
+ logging.info("copying:" + " ".join(started)+".")
+ return started
+
+ def runFinished(self, runDir, *args):
+ """
+ The run was finished, if we're done copying, pass the message on
+ """
+ # close any open processes
+ self.rsync.poll()
+
+ # see if we're still copying
+ if runfolder_validate(runDir):
+ if runDir in self.rsync.keys():
+ # still copying
+ self.pending.append(runDir)
+ logging.info("%s finished, but still copying" % (runDir))
+ return "PENDING"
+ else:
+ # we're done
+ self.reportRunFinished(runDir)
+ logging.info("%s finished" % (runDir))
+ return "DONE"
+ else:
+ errmsg = "received bad runfolder name (%s)" % (runDir)
+ logging.warning(errmsg)
+ # maybe I should use a different error message
+ raise RuntimeError(errmsg)
+
+ def reportRunFinished(self, runDir):
+ """
+ Send the runFinished message to the interested parties
+ """
+ if self.notify_users is not None:
+ for u in self.notify_users:
+ self.send(u, 'run %s finished' % (runDir))
+ if self.notify_runner is not None:
+ for r in self.notify_runner:
+ rpc.send(self.cl, self.runner, (runDir,), 'runFinished')
+ logging.info("forwarding runFinshed message for %s" % (runDir))
+
+ def update(self, *args):
+ """
+ Update our current status.
+ Report if we've finished copying files.
+ """
+ self.rsync.poll()
+ for p in self.pending:
+ if p not in self.rsync.keys():
+ self.reportRunFinished(p)
+ self.pending.remove(p)
+
+ def _parser(self, msg, who):
+ """
+ Parse xmpp chat messages
+ """
+ help = u"I can [copy], or report current [status]"
+ if re.match(u"help", msg):
+ reply = help
+ elif re.match("copy", msg):
+ started = self.startCopy()
+ reply = u"started copying " + ", ".join(started)
+ elif re.match(u"status", msg):
+ msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
+ for d in self.rsync.keys():
+ msg.append(u" " + d)
+ reply = os.linesep.join(msg)
+ else:
+ reply = u"I didn't understand '%s'"+os.linesep+help % (unicode(msg))
+ return reply
+def main(args=None):
+ bot = CopierBot()
+ bot.main(args)
+
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
--- /dev/null
+import unittest
+
+from StringIO import StringIO
+from gaworkflow import copier
+
+class testCopier(unittest.TestCase):
+ def test_runfolder_validate(self):
+ self.failUnlessEqual(copier.runfolder_validate(""), False)
+ self.failUnlessEqual(copier.runfolder_validate("1345_23"), False)
+ self.failUnlessEqual(copier.runfolder_validate("123456_asdf-$23'"), False)
+ self.failUnlessEqual(copier.runfolder_validate("123456_USI-EAS44"), True)
+ self.failUnlessEqual(copier.runfolder_validate("123456_USI-EAS44 "), False)
+
+ def test_empty_config(self):
+ cfg = StringIO("""[fake]
+something: unrelated
+""")
+ bot = copier.CopierBot('fake', configfile=cfg)
+ self.failUnlessRaises(RuntimeError, bot.read_config)
+
+ def test_full_config(self):
+ cfg = StringIO("""[copier]
+jid: copier@example.fake
+password: badpassword
+authorized_users: user1@example.fake user2@example.fake
+rsync_password_file: ~/.sequencer
+rsync_source: /tmp/sequencer_source
+rsync_destination: /tmp/sequencer_destination
+notify_users: user3@example.fake
+# who to run to
+#runner:
+""")
+ c = copier.CopierBot("copier", configfile=cfg)
+ c.read_config()
+ self.failUnlessEqual(c.jid, 'copier@example.fake')
+ self.failUnlessEqual(c.cfg['password'], 'badpassword')
+ self.failUnlessEqual(len(c.authorized_users), 2)
+ self.failUnlessEqual(c.authorized_users[0], 'user1@example.fake')
+ self.failUnlessEqual(c.authorized_users[1], 'user2@example.fake')
+ self.failUnlessEqual(c.rsync.source_base, '/tmp/sequencer_source')
+ self.failUnlessEqual(c.rsync.dest_base, '/tmp/sequencer_destination')
+ self.failUnlessEqual(len(c.notify_users), 1)
+ self.failUnlessEqual(c.notify_users[0], 'user3@example.fake')
+
+def suite():
+ return unittest.makeSuite(testCopier,'test')
+
+if __name__ == "__main__":
+ unittest.main(defaultTest="suite")