+import ConfigParser
import copy
import logging
import logging.handlers
import time
import traceback
-from benderjab.bot import BenderFactory
+from benderjab import rpc
+def runfolder_validate(fname):
+ """
+ Return True if fname looks like a runfolder name
+ """
+ if re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname):
+ return True
+ else:
+ return False
+
class rsync(object):
-
- def __init__(self, pwfile):
- self.pwfile = pwfile
+ def __init__(self, source, dest, pwfile):
+ self.pwfile = os.path.expanduser(pwfile)
self.cmd = ['/usr/bin/rsync', ]
- self.cmd.append('--password-file=%s' % (pwfile))
- self.source_base = 'rsync://sequencer@jumpgate.caltech.edu:8730/sequencer/'
- self.dest_base = '/home/diane/gec/'
+ self.cmd.append('--password-file=%s' % (self.pwfile))
+ self.source_base = source
+ self.dest_base = dest
self.processes = {}
self.exit_code = None
return subprocess.Popen(args)
def copy(self):
- """copy any interesting looking directories over
"""
+ copy any interesting looking directories over
+ return list of items that we started copying.
+ """
+ # clean up any lingering non-running processes
+ self.poll()
+
+ # what's available to copy?
dirs_to_copy = self.list()
+
+ # lets start copying
+ started = []
for d in dirs_to_copy:
process = self.processes.get(d, None)
+
if process is None:
# we don't have a process, so make one
logging.info("rsyncing %s" % (d))
self.processes[d] = self.create_copy_process(d)
- else:
- retcode = process.poll()
- if retcode is not None:
- # we finished
- logging.info("finished rsyncing %s, exitcode %d" % (d, retcode))
- del self.processes[d]
-
-class copier_bot_parser(object):
- def __init__(self, ):
- self.rsync = rsync('/home/diane/.sequencer')
+ started.append(d)
+ return started
+
+ def poll(self):
+ """
+ check currently running processes to see if they're done
+
+ return path roots that have finished.
+ """
+ for dir_key, proc_value in self.processes.items():
+ retcode = proc_value.poll()
+ if retcode is None:
+ # process hasn't finished yet
+ pass
+ elif retcode == 0:
+ logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
+ del self.processes[dir_key]
+ else:
+ logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
+
+ def __len__(self):
+ """
+ Return how many active rsync processes we currently have
+
+ Call poll first to close finished processes.
+ """
+ return len(self.processes)
- def __call__(self, msg, who):
- try:
- if re.match("start copy", msg):
- logging.info("starting copy for %s" % (who.getStripped()))
- self.rsync.copy()
- except Exception, e:
- errmsg = "Exception: " + str(e)
- logging.error(errmsg)
- logging.error(traceback.format_exc())
- return errmsg
+ def keys(self):
+ """
+ Return list of current run folder names
+ """
+ return self.processes.keys()
-def main(args=None):
- if len(args) != 1:
- print "need .benderjab config name"
- configname = args[0]
+class CopierBot(rpc.XmlRpcBot):
+ def __init__(self, section=None, configfile=None):
+ #if configfile is None:
+ # configfile = '~/.gaworkflow'
+
+ super(CopierBot, self).__init__(section, configfile)
+
+ # options for rsync command
+ self.cfg['rsync_password_file'] = None
+ self.cfg['rsync_source'] = None
+ self.cfg['rsync_destination'] = None
+
+ # options for reporting we're done
+ self.cfg['notify_users'] = None
+ self.cfg['notify_runner'] = None
+
+ self.pending = []
+ self.rsync = None
+ self.notify_users = None
+ self.notify_runner = None
+
+ self.register_function(self.startCopy)
+ self.register_function(self.runFinished)
+ self.eventTasks.append(self.update)
+
+ def read_config(self, section=None, configfile=None):
+ """
+ read the config file
+ """
+ super(CopierBot, self).read_config(section, configfile)
+
+ def check_option(name):
+ if self.cfg[name] is None:
+ errmsg="Please specify %s in the configfile" % (name)
+ logging.fatal(errmsg)
+ raise RuntimeError(errmsg)
+ else:
+ return self.cfg[name]
+
+ password = check_option('rsync_password_file')
+ source = check_option('rsync_source')
+ destination = check_option('rsync_destination')
+ self.rsync = rsync(source, destination, password)
+
+ self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+ self.notify_runner = self._parse_user_list(self.cfg['notify_runner'])
- logging.basicConfig(level=logging.INFO,
- format='%(asctime)s %(levelname)s %(message)s')
- log = logging.getLogger()
- log.addHandler(logging.handlers.RotatingFileHandler(
- '/tmp/copier_bot.log', maxBytes=1000000, backupCount = 3)
- )
- bot = BenderFactory(configname)
- bot.parser = copier_bot_parser()
- bot.logon()
- bot.eventLoop()
- logging.shutdown()
- return 0
+ def startCopy(self, *args):
+ """
+ start our copy
+ """
+ logging.info("starting copy scan")
+ started = self.rsync.copy()
+ logging.info("copying:" + " ".join(started)+".")
+ return started
+
+ def runFinished(self, runDir, *args):
+ """
+ The run was finished, if we're done copying, pass the message on
+ """
+ # close any open processes
+ self.rsync.poll()
+
+ # see if we're still copying
+ if runfolder_validate(runDir):
+ if runDir in self.rsync.keys():
+ # still copying
+ self.pending.append(runDir)
+ logging.info("%s finished, but still copying" % (runDir))
+ return "PENDING"
+ else:
+ # we're done
+ self.reportRunFinished(runDir)
+ logging.info("%s finished" % (runDir))
+ return "DONE"
+ else:
+ errmsg = "received bad runfolder name (%s)" % (runDir)
+ logging.warning(errmsg)
+ # maybe I should use a different error message
+ raise RuntimeError(errmsg)
+
+ def reportRunFinished(self, runDir):
+ """
+ Send the runFinished message to the interested parties
+ """
+ if self.notify_users is not None:
+ for u in self.notify_users:
+ self.send(u, 'run %s finished' % (runDir))
+ if self.notify_runner is not None:
+ for r in self.notify_runner:
+ rpc.send(self.cl, self.runner, (runDir,), 'runFinished')
+ logging.info("forwarding runFinshed message for %s" % (runDir))
+
+ def update(self, *args):
+ """
+ Update our current status.
+ Report if we've finished copying files.
+ """
+ self.rsync.poll()
+ for p in self.pending:
+ if p not in self.rsync.keys():
+ self.reportRunFinished(p)
+ self.pending.remove(p)
+
+ def _parser(self, msg, who):
+ """
+ Parse xmpp chat messages
+ """
+ help = u"I can [copy], or report current [status]"
+ if re.match(u"help", msg):
+ reply = help
+ elif re.match("copy", msg):
+ started = self.startCopy()
+ reply = u"started copying " + ", ".join(started)
+ elif re.match(u"status", msg):
+ msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
+ for d in self.rsync.keys():
+ msg.append(u" " + d)
+ reply = os.linesep.join(msg)
+ else:
+ reply = u"I didn't understand '%s'"+os.linesep+help % (unicode(msg))
+ return reply
+def main(args=None):
+ bot = CopierBot()
+ bot.main(args)
+
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))