4 import logging.handlers
14 from benderjab import rpc
16 from htsworkflow.automation.solexa import is_runfolder
19 def __init__(self, sources, dest, pwfile):
20 self.cmd = ['/usr/bin/rsync', ]
21 self.pwfile = os.path.expanduser(pwfile)
22 self.cmd.append('--password-file=%s' % (self.pwfile))
23 self.source_base_list = [ self._normalize_rsync_source(x) for x in sources]
30 Get a directory listing for all our sources
32 logging.debug("searching for entries in: %s" % (self.source_base_list,))
34 for source in self.source_base_list:
35 logging.debug("Scanning %s" % (source,))
36 args = copy.copy(self.cmd)
39 logging.debug("Rsync cmd:" + " ".join(args))
40 short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
41 exit_code = short_process.wait()
42 stdout = short_process.stdout
43 # We made sure source ends in a / earlier
44 cur_list = [ source+subdir for subdir in self.list_filter(stdout)]
45 entries.extend(cur_list)
46 logging.debug(u"Found the following: %s" % (unicode(entries)))
def list_filter(self, lines):
    """
    Parse an rsync directory listing and return the run folder names.

    lines is an iterable of listing lines, each made of permissions,
    size, date, time and name separated by whitespace.  The result is
    the list of names of the directory entries whose name starts with
    six digits.  Lines that do not carry all five fields (blank lines,
    server banners, ...) are ignored instead of raising.
    """
    direntries = []
    for line in lines:
        # Split on whitespace rather than at fixed columns: the width of
        # the size column is not the same in every rsync release.  The
        # split limit keeps names that contain spaces in one piece.
        fields = line.rstrip('\r\n').split(None, 4)
        if len(fields) == 5:
            direntries.append(fields)
    logging.debug('direntries: %s', direntries)

    dirs_to_copy = []
    for permissions, size, filedate, filetime, filename in direntries:
        # only a directory can be something we want to copy
        if permissions[0] != 'd':
            continue
        # a name starting with something that looks like a 6 digit date
        # is good enough
        if re.match(r"[0-9]{6}", filename):
            dirs_to_copy.append(filename)
    return dirs_to_copy
66 def create_copy_process(self, urlname):
67 args = copy.copy(self.cmd)
68 # args.append('--dry-run') # Makes testing easier
69 # we want to copy everything
74 args.append(self.dest_base)
75 logging.debug("Rsync cmd:" + " ".join(args))
76 return subprocess.Popen(args)
78 def copy(self, url_list=None):
80 copy any interesting looking directories over
81 return list of items that we started copying.
83 # clean up any lingering non-running processes
86 if url_list is None or len(url_list) == 0:
87 # what's available to copy?
88 dirs_to_copy = self.list()
90 dirs_to_copy = url_list
92 logging.info("dirs to copy %s" % (dirs_to_copy,))
96 for d in dirs_to_copy:
97 process = self.processes.get(d, None)
100 # we don't have a process, so make one
101 logging.info("rsyncing %s" % (d))
102 self.processes[d] = self.create_copy_process(d)
106 def _normalize_rsync_source(self, source):
108 Make sure that we have a reasonable looking source
109 a source must be a directory/collection.
111 # we must be a directory
112 if source[-1] != '/':
114 # I suppose we could check to see if we start with rsync:// or something
119 check currently running processes to see if they're done
121 return path roots that have finished.
123 for dir_key, proc_value in self.processes.items():
124 retcode = proc_value.poll()
126 # process hasn't finished yet
129 logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
130 del self.processes[dir_key]
132 logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
136 Return how many active rsync processes we currently have
138 Call poll first to close finished processes.
140 return len(self.processes)
144 Return list of current run folder names
146 return self.processes.keys()
148 class CopierBot(rpc.XmlRpcBot):
149 def __init__(self, section=None, configfile=None):
150 #if configfile is None:
151 # configfile = '~/.htsworkflow'
153 super(CopierBot, self).__init__(section, configfile)
155 # options for rsync command
156 self.cfg['rsync_password_file'] = None
157 self.cfg['rsync_sources'] = None
158 self.cfg['rsync_destination'] = None
160 # options for reporting we're done
161 self.cfg['notify_users'] = None
162 self.cfg['notify_runner'] = None
166 self.notify_users = None
167 self.notify_runner = None
169 self.register_function(self.startCopy)
170 self.register_function(self.sequencingFinished)
171 self.eventTasks.append(self.update)
173 def _init_rsync(self):
175 Initalize rsync class
177 This is only accessible for test purposes.
179 # we can't call any logging function until after start finishes.
180 # this got moved to a seperate function from run to help with test code
181 if self.rsync is None:
182 self.rsync = rsync(self.sources, self.destination, self.password)
184 def read_config(self, section=None, configfile=None):
188 super(CopierBot, self).read_config(section, configfile)
190 self.sources = shlex.split(self._check_required_option('rsync_sources'))
191 self.password = self._check_required_option('rsync_password_file')
192 self.destination = self._check_required_option('rsync_destination')
194 self.notify_users = self._parse_user_list(self.cfg['notify_users'])
196 self.notify_runner = \
197 self._parse_user_list(self.cfg['notify_runner'],
198 require_resource=True)
199 except bot.JIDMissingResource:
200 msg = 'need a full jabber ID + resource for xml-rpc destinations'
201 print >>sys.stderr, msg
202 raise bot.JIDMissingResource(msg)
209 super(CopierBot, self).run()
211 def startCopy(self, *args):
215 # Note, args comes in over the network, so don't trust it.
216 logging.debug("Arguments to startCopy %s" % (unicode(args),))
219 clean_url = self.validate_url(a)
220 if clean_url is not None:
221 copy_urls.append(clean_url)
223 logging.info("Validated urls = %s" % (copy_urls,))
224 started = self.rsync.copy(copy_urls)
225 logging.info("copying:" + " ".join(started)+".")
228 def sequencingFinished(self, runDir, *args):
230 The run was finished, if we're done copying, pass the message on
232 # close any open processes
235 # see if we're still copying
236 if is_runfolder(runDir):
237 logging.info("recevied sequencing finshed for %s" % (runDir))
238 self.pending.append(runDir)
242 errmsg = "received bad runfolder name (%s)" % (runDir)
243 logging.warning(errmsg)
244 # maybe I should use a different error message
245 raise RuntimeError(errmsg)
def reportSequencingFinished(self, runDir):
    """
    Tell the interested parties that sequencing of runDir has finished.

    Each entry of self.notify_users receives a chat message through
    self.send, and each entry of self.notify_runner receives a
    'sequencingFinished' call through self.rpc_send with runDir as its
    only argument.  A list that is None is skipped.
    """
    users = self.notify_users
    if users is not None:
        for user in users:
            self.send(user, 'Sequencing run %s finished' % (runDir))

    runners = self.notify_runner
    if runners is not None:
        for runner in runners:
            self.rpc_send(runner, (runDir,), 'sequencingFinished')

    logging.info("forwarding sequencingFinshed message for %s" % (runDir))
259 def update(self, *args):
261 Update our current status.
262 Report if we've finished copying files.
265 for p in self.pending:
266 if p not in self.rsync.keys():
267 self.reportSequencingFinished(p)
268 self.pending.remove(p)
270 def _parser(self, msg, who):
272 Parse xmpp chat messages
274 help = u"I can [copy], or report current [status]"
275 if re.match(u"help", msg):
277 elif re.match("copy", msg):
278 started = self.startCopy()
279 reply = u"started copying " + ", ".join(started)
280 elif re.match(u"status", msg):
281 msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
282 for d in self.rsync.keys():
284 reply = os.linesep.join(msg)
286 reply = u"I didn't understand '%s'" % (unicode(msg))
289 def validate_url(self, url):
290 user_url = urlparse.urlsplit(url)
291 user_scheme = user_url[0]
292 user_netloc = user_url[1]
293 user_path = user_url[2]
295 for source in self.sources:
296 source_url = urlparse.urlsplit(source)
297 source_scheme = source_url[0]
298 source_netloc = source_url[1]
299 source_path = source_url[2]
300 if (user_scheme == source_scheme) and \
301 (user_netloc == source_netloc) and \
302 (user_path.startswith(source_path)):
310 if __name__ == "__main__":
311 sys.exit(main(sys.argv[1:]))