4 import logging.handlers
14 from benderjab import rpc
def runfolder_validate(fname):
    """
    Return True if fname looks like a runfolder name

    A runfolder name is six digits, an underscore, and then any number of
    letters, digits, dashes or underscores.  Nothing else is allowed, so
    path separators, whitespace and embedded newlines are all rejected.

    fname -- candidate name (a string); it may come from an untrusted
             remote caller.
    Returns True or False.
    """
    # The end anchor is \Z rather than $ on purpose: $ also matches just
    # before a trailing newline, which would let "090220_run\n" through.
    return re.match(r"^[0-9]{6}_[-A-Za-z0-9_]*\Z", fname) is not None
# Constructor of the rsync helper: builds the base rsync command line and
# records the normalized list of sources.
# NOTE(review): the enclosing class statement and the remainder of this
# constructor are not visible in this excerpt.
26 def __init__(self, sources, dest, pwfile):
# Base argument list shared by every rsync invocation; the other methods
# copy it before appending their own arguments.
27 self.cmd = ['/usr/bin/rsync', ]
# expanduser lets the password file be given as a ~-relative path.
28 self.pwfile = os.path.expanduser(pwfile)
29 self.cmd.append('--password-file=%s' % (self.pwfile))
# Each source is passed through _normalize_rsync_source before being stored.
30 self.source_base_list = [ self._normalize_rsync_source(x) for x in sources]
def list(self):
    """
    Get a directory listing for all our sources

    Runs the base rsync command once per configured source, hands the
    listing to list_filter, and prefixes every directory name it returns
    with the source it was found under.

    Returns a list of source + directory-name strings.
    """
    logging.debug("searching for entries in: %s", self.source_base_list)
    entries = []
    for source in self.source_base_list:
        logging.debug("Scanning %s", source)
        # Build a fresh argument list so the shared base command is untouched.
        args = self.cmd + [source]

        logging.debug("Rsync cmd:" + " ".join(args))
        short_process = subprocess.Popen(args,
                                         stdout=subprocess.PIPE,
                                         universal_newlines=True)
        # communicate() reads the pipe while waiting for the child.  Calling
        # wait() first can deadlock once the listing fills the pipe buffer.
        stdout, _ = short_process.communicate()
        if short_process.returncode != 0:
            # Still parse whatever was printed, but do not hide the failure.
            logging.warning("rsync listing of %s exited with code %s",
                            source, short_process.returncode)
        # Keep the line endings: list_filter expects newline-terminated lines.
        # We made sure source ends in a / earlier
        entries.extend(source + subdir
                       for subdir in self.list_filter(stdout.splitlines(True)))
    logging.debug("Found the following: %s", entries)
    return entries
def list_filter(self, lines):
    """
    parse rsync directory listing

    Each listing line is expected to hold five whitespace separated
    fields: permissions, size, date, time and name.  Lines that do not
    have that shape (blank lines, banner text) are skipped.

    lines -- iterable of text lines from an rsync listing, with or
             without their trailing newline.
    Returns the names of the directories whose name starts with six
    digits, in listing order.
    """
    dirs_to_copy = []
    for line in lines:
        # Split on runs of whitespace instead of fixed columns so a wider
        # size column does not shift the name.  Limiting the split to four
        # cuts keeps spaces inside the name itself.
        fields = line.rstrip('\r\n').split(None, 4)
        if len(fields) != 5:
            logging.debug("skipping unparseable rsync line: %r", line)
            continue
        permissions, size, filedate, filetime, filename = fields
        # hey its a directory, the first step to being something we want to
        # copy
        if not permissions.startswith('d'):
            continue
        # it starts with something that looks like a 6 digit date
        # aka good enough for me
        if re.match("[0-9]{6}", filename):
            dirs_to_copy.append(filename)
    return dirs_to_copy
# Build the rsync argument list for one transfer and start it without
# waiting for it to finish.  Returns the subprocess.Popen object.
73 def create_copy_process(self, urlname):
# Work on a copy so the shared base command in self.cmd is not modified.
74 args = copy.copy(self.cmd)
75 # args.append('--dry-run') # Makes testing easier
76 # we want to copy everything
# NOTE(review): the lines that add the copy options and the source argument
# are not visible in this excerpt.
# The destination is appended after them.
81 args.append(self.dest_base)
82 logging.debug("Rsync cmd:" + " ".join(args))
83 return subprocess.Popen(args)
# Start an rsync process for each requested directory that is not already
# tracked in self.processes.
# NOTE(review): several lines of this method (including its branch headers
# and its return statement) are not visible in this excerpt.
85 def copy(self, url_list=None):
87 copy any interesting looking directories over
88 return list of items that we started copying.
90 # clean up any lingering non-running processes
# With no explicit list (None or empty) fall back to whatever list() finds
# on the configured sources.
93 if url_list is None or len(url_list) == 0:
94 # what's available to copy?
95 dirs_to_copy = self.list()
97 dirs_to_copy = url_list
99 logging.info("dirs to copy %s" % (dirs_to_copy,))
103 for d in dirs_to_copy:
# self.processes maps a directory to the rsync process started for it.
104 process = self.processes.get(d, None)
107 # we don't have a process, so make one
108 logging.info("rsyncing %s" % (d))
109 self.processes[d] = self.create_copy_process(d)
113 def _normalize_rsync_source(self, source):
115 Make sure that we have a reasonable looking source
116 a source must be a directory/collection.
118 # we must be a directory
119 if source[-1] != '/':
121 # I suppose we could check to see if we start with rsync:// or something
# Body of the method that checks every tracked rsync process.
# NOTE(review): the method header and the branch conditions are not visible
# in this excerpt.
126 check currently running processes to see if they're done
128 return path roots that have finished.
# NOTE(review): entries are deleted from self.processes inside this loop;
# that is only safe where items() returns a list copy (Python 2) - confirm
# before porting.
130 for dir_key, proc_value in self.processes.items():
131 retcode = proc_value.poll()
133 # process hasn't finished yet
# A finished transfer is logged and dropped from the tracking dict.
136 logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
137 del self.processes[dir_key]
# A failed transfer is logged as an error.
139 logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
# NOTE(review): the method header is not visible in this excerpt.
143 Return how many active rsync processes we currently have
145 Call poll first to close finished processes.
# Counts every tracked entry; nothing is polled here, so the caller has to
# poll beforehand if finished processes should not be counted.
147 return len(self.processes)
# NOTE(review): the method header is not visible in this excerpt.
151 Return list of current run folder names
# The keys are the directory strings that copy() stored in self.processes.
153 return self.processes.keys()
# Bot built on rpc.XmlRpcBot that starts rsync copies and passes on
# "sequencing finished" notifications.
155 class CopierBot(rpc.XmlRpcBot):
# Set up configuration placeholders and register the callable methods.
# NOTE(review): some lines of this constructor are not visible in this excerpt.
156 def __init__(self, section=None, configfile=None):
157 #if configfile is None:
158 # configfile = '~/.htsworkflow'
160 super(CopierBot, self).__init__(section, configfile)
# Every option starts as None; read_config reads these keys later.
162 # options for rsync command
163 self.cfg['rsync_password_file'] = None
164 self.cfg['rsync_sources'] = None
165 self.cfg['rsync_destination'] = None
167 # options for reporting we're done
168 self.cfg['notify_users'] = None
169 self.cfg['notify_runner'] = None
173 self.notify_users = None
174 self.notify_runner = None
# presumably register_function exposes these as remote calls - confirm in
# benderjab.
176 self.register_function(self.startCopy)
177 self.register_function(self.sequencingFinished)
# presumably eventTasks entries are run periodically by the base bot -
# confirm in benderjab.
178 self.eventTasks.append(self.update)
180 def _init_rsync(self):
182 Initalize rsync class
184 This is only accessible for test purposes.
186 # we can't call any logging function until after start finishes.
187 # this got moved to a seperate function from run to help with test code
188 if self.rsync is None:
189 self.rsync = rsync(self.sources, self.destination, self.password)
# Load the configuration through the base class, then pull out the rsync
# settings and the notification targets.
# NOTE(review): some lines of this method (including the start of the
# exception handling) are not visible in this excerpt.
191 def read_config(self, section=None, configfile=None):
195 super(CopierBot, self).read_config(section, configfile)
# rsync_sources is split shell-style, so one option can name several sources.
197 self.sources = shlex.split(self._check_required_option('rsync_sources'))
198 self.password = self._check_required_option('rsync_password_file')
199 self.destination = self._check_required_option('rsync_destination')
201 self.notify_users = self._parse_user_list(self.cfg['notify_users'])
# The notify_runner list is parsed with require_resource=True.
203 self.notify_runner = \
204 self._parse_user_list(self.cfg['notify_runner'],
205 require_resource=True)
# NOTE(review): `bot` is referenced here but no import of it is visible in
# this excerpt - confirm it is imported.
206 except bot.JIDMissingResource:
207 msg = 'need a full jabber ID + resource for xml-rpc destinations'
# The message is written to stderr and the same exception type is raised
# again with that message.
208 print >>sys.stderr, msg
209 raise bot.JIDMissingResource(msg)
# Hand control to the base class's run().
# NOTE(review): the enclosing method header and any preceding statements
# are not visible in this excerpt.
216 super(CopierBot, self).run()
# Validate the URLs supplied by the caller and start rsync copies for the
# ones that pass.
# NOTE(review): the loop header, the list initialisation and the return
# statement are not visible in this excerpt.
218 def startCopy(self, *args):
222 # Note, args comes in over the network, so don't trust it.
223 logging.debug("Arguments to startCopy %s" % (unicode(args),))
# validate_url's result is kept only when it is not None.
226 clean_url = self.validate_url(a)
227 if clean_url is not None:
228 copy_urls.append(clean_url)
230 logging.info("Validated urls = %s" % (copy_urls,))
# rsync.copy falls back to listing the configured sources itself when the
# list it receives is empty.
231 started = self.rsync.copy(copy_urls)
232 logging.info("copying:" + " ".join(started)+".")
# Handle a "sequencing finished" notification for one run folder.
# Raises RuntimeError when runfolder_validate rejects the name.
# NOTE(review): some lines between the branches are not visible in this
# excerpt.
235 def sequencingFinished(self, runDir, *args):
237 The run was finished, if we're done copying, pass the message on
239 # close any open processes
242 # see if we're still copying
243 if runfolder_validate(runDir):
244 logging.info("recevied sequencing finshed for %s" % (runDir))
# Queue the run; update() reports it once rsync no longer lists it.
245 self.pending.append(runDir)
249 errmsg = "received bad runfolder name (%s)" % (runDir)
250 logging.warning(errmsg)
251 # maybe I should use a different error message
252 raise RuntimeError(errmsg)
# Tell the configured recipients that runDir has finished sequencing.
254 def reportSequencingFinished(self, runDir):
256 Send the sequencingFinished message to the interested parties
# Each entry in notify_users gets a plain text message via send().
258 if self.notify_users is not None:
259 for u in self.notify_users:
260 self.send(u, 'Sequencing run %s finished' % (runDir))
# Each entry in notify_runner gets a 'sequencingFinished' call via
# rpc_send(), with runDir as its only argument.
261 if self.notify_runner is not None:
262 for r in self.notify_runner:
263 self.rpc_send(r, (runDir,), 'sequencingFinished')
# NOTE(review): indentation is lost in this excerpt, so it is unclear
# whether this is logged once per call or once per runner - confirm.
264 logging.info("forwarding sequencingFinshed message for %s" % (runDir))
def update(self, *args):
    """
    Update our current status.
    Report if we've finished copying files.

    Every run folder waiting in self.pending that rsync is no longer
    copying is reported with reportSequencingFinished and then removed
    from the pending list.  Extra positional arguments are ignored.
    """
    # Reap finished rsync processes first; keys() does not poll on its own
    # (__len__'s documentation asks callers to poll first).
    self.rsync.poll()
    still_copying = self.rsync.keys()
    # Walk a snapshot: removing from the list being iterated makes the loop
    # skip the entry that follows each removed one.
    for run_dir in self.pending[:]:
        if run_dir not in still_copying:
            self.reportSequencingFinished(run_dir)
            self.pending.remove(run_dir)
# Turn an incoming chat message into a reply string.
# NOTE(review): several branch bodies and the return statement are not
# visible in this excerpt.
277 def _parser(self, msg, who):
279 Parse xmpp chat messages
281 help = u"I can [copy], or report current [status]"
# re.match only anchors at the start, so any message that begins with the
# keyword selects the branch.
282 if re.match(u"help", msg):
284 elif re.match("copy", msg):
# startCopy is called without URLs here.
285 started = self.startCopy()
286 reply = u"started copying " + ", ".join(started)
287 elif re.match(u"status", msg):
# msg is rebound here from the incoming text to the list of status lines.
288 msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
289 for d in self.rsync.keys():
291 reply = os.linesep.join(msg)
293 reply = u"I didn't understand '%s'" % (unicode(msg))
def validate_url(self, url):
    """
    Check that url points at, or below, one of our configured sources.

    A url is accepted when its scheme and host equal those of a source
    and its path is either the source path itself or lies underneath it
    on a directory boundary.  Paths containing a '..' segment are always
    refused.

    url -- candidate URL; it comes from a remote caller and is not
           trusted.
    Returns url unchanged when it is acceptable, otherwise None.
    """
    try:
        user_url = urlparse.urlsplit(url)
    except (AttributeError, TypeError, ValueError):
        # Not a string, or not parseable as a URL at all.
        return None
    user_scheme = user_url[0]
    user_netloc = user_url[1]
    user_path = user_url[2]

    # A plain prefix test would accept "/runs/../etc" for the source
    # "/runs", so refuse parent-directory segments outright.
    if '..' in user_path.split('/'):
        return None

    for source in self.sources:
        source_url = urlparse.urlsplit(source)
        if user_scheme != source_url[0] or user_netloc != source_url[1]:
            continue
        source_path = source_url[2]
        # Compare on a directory boundary so that the source "/runs" does
        # not also match "/runs_private".
        if source_path.endswith('/'):
            prefix = source_path
        else:
            prefix = source_path + '/'
        if user_path == source_path or user_path.startswith(prefix):
            return url
    return None
# Script entry point: run main() on the command-line arguments (program
# name removed) and use its return value as the exit status.
# NOTE(review): main is not visible in this excerpt.
317 if __name__ == "__main__":
318 sys.exit(main(sys.argv[1:]))