from benderjab import rpc
from htsworkflow.automation.solexa import is_runfolder
-
+
+LOGGER = logging.getLogger(__name__)
+
class rsync(object):
def __init__(self, sources, dest, pwfile):
self.cmd = ['/usr/bin/rsync', ]
"""
Get a directory listing for all our sources
"""
- logging.debug("searching for entries in: %s" % (self.source_base_list,))
+ LOGGER.debug("searching for entries in: %s" % (self.source_base_list,))
entries = []
for source in self.source_base_list:
- logging.debug("Scanning %s" % (source,))
+ LOGGER.debug("Scanning %s" % (source,))
args = copy.copy(self.cmd)
args.append(source)
- logging.debug("Rsync cmd:" + " ".join(args))
+ LOGGER.debug("Rsync cmd:" + " ".join(args))
short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
exit_code = short_process.wait()
stdout = short_process.stdout
# We made sure source ends in a / earlier
cur_list = [ source+subdir for subdir in self.list_filter(stdout)]
entries.extend(cur_list)
- logging.debug(u"Found the following: %s" % (unicode(entries)))
+ LOGGER.debug(u"Found the following: %s" % (unicode(entries)))
return entries
def list_filter(self, lines):
"""
dirs_to_copy = []
direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ]
- logging.debug(u'direntries: %s' % (unicode(direntries),))
+ LOGGER.debug(u'direntries: %s' % (unicode(direntries),))
for permissions, size, filedate, filetime, filename in direntries:
if permissions[0] == 'd':
- # hey its a directory, the first step to being something we want to
+ # hey its a directory, the first step to being something we want to
# copy
if re.match("[0-9]{6}", filename):
# it starts with something that looks like a 6 digit date
args = copy.copy(self.cmd)
# args.append('--dry-run') # Makes testing easier
# we want to copy everything
- args.append('-rlt')
+ args.append('-rlt')
# from here
args.append(urlname)
# to here
args.append(self.dest_base)
- logging.debug("Rsync cmd:" + " ".join(args))
+ LOGGER.debug("Rsync cmd:" + " ".join(args))
return subprocess.Popen(args)
-
+
def copy(self, url_list=None):
"""
copy any interesting looking directories over
# clean up any lingering non-running processes
self.poll()
- if url_list is None or len(url_list) == 0:
+ if url_list is None or len(url_list) == 0:
# what's available to copy?
dirs_to_copy = self.list()
else:
dirs_to_copy = url_list
-
- logging.info("dirs to copy %s" % (dirs_to_copy,))
+
+ LOGGER.info("dirs to copy %s" % (dirs_to_copy,))
# lets start copying
started = []
for d in dirs_to_copy:
process = self.processes.get(d, None)
-
+
if process is None:
# we don't have a process, so make one
- logging.info("rsyncing %s" % (d))
+ LOGGER.info("rsyncing %s" % (d))
self.processes[d] = self.create_copy_process(d)
- started.append(d)
+ started.append(d)
return started
def _normalize_rsync_source(self, source):
def poll(self):
"""
check currently running processes to see if they're done
-
+
return path roots that have finished.
"""
for dir_key, proc_value in self.processes.items():
# process hasn't finished yet
pass
elif retcode == 0:
- logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
+ LOGGER.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
del self.processes[dir_key]
else:
- logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
-
+ LOGGER.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
+
def __len__(self):
"""
Return how many active rsync processes we currently have
-
+
Call poll first to close finished processes.
"""
return len(self.processes)
-
+
def keys(self):
"""
Return list of current run folder names
def __init__(self, section=None, configfile=None):
    """
    Declare the bot's config options, reset its state and register
    its remotely callable functions.

    :param section: handed unchanged to the parent constructor
    :param configfile: handed unchanged to the parent constructor
    """
    # A '~/.htsworkflow' default for configfile was left commented out
    # here, so no default path is applied at this level.
    super(CopierBot, self).__init__(section, configfile)

    # Every option starts out unset (None). The first three are the
    # options for the rsync command, the last two are the options for
    # reporting we're done.
    option_names = (
        'rsync_password_file',
        'rsync_sources',
        'rsync_destination',
        'notify_users',
        'notify_runner',
    )
    for option_name in option_names:
        self.cfg[option_name] = None

    # runtime state
    self.pending = []
    self.rsync = None
    self.notify_users = None
    self.notify_runner = None

    # callable entry points, plus the periodic status update task
    for handler in (self.startCopy, self.sequencingFinished):
        self.register_function(handler)
    self.eventTasks.append(self.update)
-
+
def _init_rsync(self):
    """
    Initialize the rsync helper if it has not been created yet.

    This is only accessible for test purposes.
    """
    # We can't call any logging function until after start finishes.
    # This was moved out of run into a separate function to help with
    # test code.
    # Only build the helper once; later calls keep the existing instance.
    if self.rsync is None:
        self.rsync = rsync(self.sources, self.destination, self.password)
read the config file
"""
super(CopierBot, self).read_config(section, configfile)
-
+
self.sources = shlex.split(self._check_required_option('rsync_sources'))
self.password = self._check_required_option('rsync_password_file')
self.destination = self._check_required_option('rsync_destination')
-
+
self.notify_users = self._parse_user_list(self.cfg['notify_users'])
try:
self.notify_runner = \
start our copy
"""
# Note, args comes in over the network, so don't trust it.
- logging.debug("Arguments to startCopy %s" % (unicode(args),))
+ LOGGER.debug("Arguments to startCopy %s" % (unicode(args),))
copy_urls = []
for a in args:
clean_url = self.validate_url(a)
if clean_url is not None:
copy_urls.append(clean_url)
- logging.info("Validated urls = %s" % (copy_urls,))
+ LOGGER.info("Validated urls = %s" % (copy_urls,))
started = self.rsync.copy(copy_urls)
- logging.info("copying:" + " ".join(started)+".")
+ LOGGER.info("copying:" + " ".join(started)+".")
return started
-
+
def sequencingFinished(self, runDir, *args):
    """
    The run was finished; if we're done copying, pass the message on.

    Running rsync processes are polled first. When runDir passes
    is_runfolder() it is appended to self.pending and startCopy() is
    called with no arguments.

    :param runDir: name of the run folder that finished. Presumably
                   supplied by a remote caller (this method is exposed
                   through register_function), so it is validated
                   before being used.
    :param args: extra positional arguments; accepted and ignored.
    :returns: the string "PENDING" once the run folder has been queued.
    :raises RuntimeError: if runDir is not accepted by is_runfolder().
    """
    # close any open processes
    self.rsync.poll()

    if not is_runfolder(runDir):
        # Format with a real one-element tuple: a bare "(runDir)" is not a
        # tuple, so a tuple-valued runDir would be unpacked by % and either
        # raise TypeError or be mis-formatted instead of being reported.
        errmsg = "received bad runfolder name (%s)" % (runDir,)
        LOGGER.warning(errmsg)
        # maybe I should use a different error message
        raise RuntimeError(errmsg)

    # see if we're still copying
    # (logger arguments are passed lazily; the emitted text is unchanged)
    LOGGER.info("recevied sequencing finshed for %s", runDir)
    self.pending.append(runDir)
    self.startCopy()
    return "PENDING"
-
+
def reportSequencingFinished(self, runDir):
"""
Send the sequencingFinished message to the interested parties
if self.notify_runner is not None:
for r in self.notify_runner:
self.rpc_send(r, (runDir,), 'sequencingFinished')
- logging.info("forwarding sequencingFinshed message for %s" % (runDir))
-
+ LOGGER.info("forwarding sequencingFinshed message for %s" % (runDir))
+
def update(self, *args):
"""
Update our current status.
if p not in self.rsync.keys():
self.reportSequencingFinished(p)
self.pending.remove(p)
-
+
def _parser(self, msg, who):
"""
Parse xmpp chat messages
help = u"I can [copy], or report current [status]"
if re.match(u"help", msg):
reply = help
- elif re.match("copy", msg):
+ elif re.match("copy", msg):
started = self.startCopy()
reply = u"started copying " + ", ".join(started)
elif re.match(u"status", msg):
def main(args=None):
    """
    Create a CopierBot and run it with the given arguments.

    :param args: argument list handed to CopierBot.main(); defaults to None.
    :returns: None (the result of CopierBot.main() is not passed on).
    """
    copier = CopierBot()
    copier.main(args)
-
+
if __name__ == "__main__":
    # Drop argv[0] (the program name); main()'s return value becomes the
    # process exit status.
    cli_args = sys.argv[1:]
    sys.exit(main(cli_args))