4 import logging.handlers
14 from benderjab import rpc
def runfolder_validate(fname):
    """
    Return True if fname looks like a runfolder name

    A name is accepted when it consists of exactly six digits, an
    underscore, and then any number (possibly zero) of letters, digits,
    dashes or underscores.  Anything else returns False.
    """
    # \Z anchors at the real end of the string.  "$" would also match just
    # before a trailing newline and let "080501_run\n" through.
    return re.match(r"[0-9]{6}_[-A-Za-z0-9_]*\Z", fname) is not None
class rsync(object):
    """
    Small wrapper around the rsync command line client.

    It lists the directories offered by a set of rsync sources and starts
    (and keeps track of) one background rsync process per directory that is
    being copied.
    """

    def __init__(self, sources, dest, pwfile):
        """
        sources -- iterable of rsync source locations (directories)
        dest    -- destination, passed as the last argument of each copy
        pwfile  -- rsync password file; a leading '~' is expanded
        """
        self.pwfile = os.path.expanduser(pwfile)
        # base command shared by every rsync invocation
        self.cmd = ['/usr/bin/rsync',
                    '--password-file=%s' % (self.pwfile,)]
        self.source_base_list = [self._normalize_rsync_source(x)
                                 for x in sources]
        self.dest_base = dest
        # maps the url being copied -> Popen object of the rsync copying it
        self.processes = {}

    def list(self):
        """
        Get a directory listing for all our sources

        Returns a list of "source + directory name" strings, one for each
        directory accepted by list_filter.
        """
        logging.debug("searching for entries in: %s", self.source_base_list)
        entries = []
        for source in self.source_base_list:
            logging.debug("Scanning %s", source)
            args = copy.copy(self.cmd)
            args.append(source)

            logging.debug("Rsync cmd:" + " ".join(args))
            short_process = subprocess.Popen(args, stdout=subprocess.PIPE,
                                             universal_newlines=True)
            # communicate() drains the pipe while waiting.  Calling wait()
            # before reading can deadlock once the listing fills the pipe.
            listing, _ = short_process.communicate()
            if short_process.returncode != 0:
                # best effort: report it, then use whatever was listed
                logging.warning("rsync listing of %s exited with code %s",
                                source, short_process.returncode)
            # We made sure source ends in a / earlier
            entries.extend(source + subdir for subdir
                           in self.list_filter(listing.splitlines()))
        logging.debug(u"Found the following: %s", entries)
        return entries

    def list_filter(self, lines):
        """
        parse rsync directory listing

        lines -- iterable of listing lines shaped like
                 "<permissions> <size> <date> <time> <name>"

        Returns the names of the directories whose name starts with six
        digits.  Lines that do not carry all five fields (blank lines,
        server banners, ...) are ignored.
        """
        # Split on whitespace instead of fixed columns so the parse does not
        # depend on column widths; the name is the rest of the line, so
        # spaces inside it survive.
        direntries = [x.rstrip('\r\n').split(None, 4) for x in lines]
        logging.debug(u'direntries: %s', direntries)

        dirs_to_copy = []
        for fields in direntries:
            if len(fields) != 5:
                continue
            permissions, filename = fields[0], fields[4]
            # hey its a directory, the first step to being something we
            # want to copy
            if permissions[0] != 'd':
                continue
            # it starts with something that looks like a 6 digit date
            # aka good enough for me
            if re.match("[0-9]{6}", filename):
                dirs_to_copy.append(filename)
        return dirs_to_copy

    def create_copy_process(self, urlname):
        """
        Start an rsync process copying urlname into our destination.

        Returns the subprocess.Popen object; the process is not waited for.
        """
        args = copy.copy(self.cmd)
        # args.append('--dry-run') # Makes testing easier
        # we want to copy everything
        # NOTE(review): the option line itself was not visible in the
        # reviewed listing; '-rlt' is assumed -- confirm against the
        # deployed file.
        args.append('-rlt')
        # from here
        args.append(urlname)
        # to here
        args.append(self.dest_base)
        logging.debug("Rsync cmd:" + " ".join(args))
        return subprocess.Popen(args)

    def copy(self, url_list=None):
        """
        copy any interesting looking directories over
        return list of items that we started copying.

        url_list -- urls to copy; when None or empty, everything returned
                    by list() is considered.  Urls that already have a
                    registered process are not started again.
        """
        # clean up any lingering non-running processes
        self.poll()

        if url_list:
            dirs_to_copy = url_list
        else:
            # what's available to copy?
            dirs_to_copy = self.list()

        logging.info("dirs to copy %s" % (dirs_to_copy,))

        started = []
        for d in dirs_to_copy:
            if self.processes.get(d, None) is not None:
                continue
            # we don't have a process, so make one
            logging.info("rsyncing %s" % (d))
            self.processes[d] = self.create_copy_process(d)
            started.append(d)
        return started

    def _normalize_rsync_source(self, source):
        """
        Make sure that we have a reasonable looking source
        a source must be a directory/collection.

        Returns source with a trailing '/' appended when it lacks one.
        """
        # we must be a directory
        if source[-1] != '/':
            source += '/'
        # I suppose we could check to see if we start with rsync:// or something
        return source

    def poll(self):
        """
        check currently running processes to see if they're done

        return path roots that have finished.

        Only processes that exited with code 0 are removed and returned.
        Running processes and failed ones stay registered.
        """
        finished = []
        # iterate over a snapshot because finished entries are deleted
        for dir_key, proc_value in tuple(self.processes.items()):
            retcode = proc_value.poll()
            if retcode is None:
                # process hasn't finished yet
                continue
            if retcode == 0:
                logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
                del self.processes[dir_key]
                finished.append(dir_key)
            else:
                logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
        return finished

    def __len__(self):
        """
        Return how many active rsync processes we currently have

        Call poll first to close finished processes.
        """
        return len(self.processes)

    def keys(self):
        """
        Return list of current run folder names

        These are the keys of the process table, i.e. the urls handed to
        copy() or produced by list().
        """
        return [key for key in self.processes]
class CopierBot(rpc.XmlRpcBot):
    """
    Bot that copies run folders with rsync and, once a run reported as
    finished is no longer being copied, forwards the "sequencing finished"
    message to the configured users and runner.
    """

    def __init__(self, section=None, configfile=None):
        #if configfile is None:
        #    configfile = '~/.htsworkflow'

        super(CopierBot, self).__init__(section, configfile)

        # options for rsync command
        self.cfg['rsync_password_file'] = None
        self.cfg['rsync_sources'] = None
        self.cfg['rsync_destination'] = None

        # options for reporting we're done
        self.cfg['notify_users'] = None
        self.cfg['notify_runner'] = None

        # run folders reported as finished whose notification is still held
        self.pending = []
        # created by _init_rsync
        self.rsync = None
        self.notify_users = None
        self.notify_runner = None

        self.register_function(self.startCopy)
        self.register_function(self.sequencingFinished)
        self.eventTasks.append(self.update)

    def _init_rsync(self):
        """
        Initalize rsync class

        This is only accessible for test purposes.
        """
        # we can't call any logging function until after start finishes.
        # this got moved to a seperate function from run to help with test code
        if self.rsync is None:
            self.rsync = rsync(self.sources, self.destination, self.password)

    def read_config(self, section=None, configfile=None):
        """
        Read the configuration and pick up the rsync and notification
        settings.

        Raises bot.JIDMissingResource when a notify_runner entry has no
        resource part.
        """
        super(CopierBot, self).read_config(section, configfile)

        self.sources = shlex.split(self._check_required_option('rsync_sources'))
        self.password = self._check_required_option('rsync_password_file')
        self.destination = self._check_required_option('rsync_destination')

        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
        # NOTE(review): `bot` is not among the imports visible in the
        # reviewed listing -- confirm it is imported at module level.
        try:
            self.notify_runner = \
                self._parse_user_list(self.cfg['notify_runner'],
                                      require_resource=True)
        except bot.JIDMissingResource:
            msg = 'need a full jabber ID + resource for xml-rpc destinations'
            sys.stderr.write(msg + "\n")
            raise bot.JIDMissingResource(msg)

    def run(self):
        """
        Create the rsync helper, then hand control to the base class.
        """
        self._init_rsync()
        super(CopierBot, self).run()

    def startCopy(self, *args):
        """
        Start copying.

        args -- urls to copy.  Each one is checked with validate_url and
                dropped when it does not belong to a configured source;
                with no (valid) urls the rsync helper copies whatever it
                finds on its sources.

        Returns the list of urls for which a copy was started.
        """
        # Note, args comes in over the network, so don't trust it.
        copy_urls = []
        for a in args:
            clean_url = self.validate_url(a)
            if clean_url is not None:
                copy_urls.append(clean_url)

        logging.info("Validated urls = %s" % (copy_urls,))
        started = self.rsync.copy(copy_urls)
        logging.info("copying:" + " ".join(started)+".")
        return started

    def sequencingFinished(self, runDir, *args):
        """
        The run was finished, if we're done copying, pass the message on

        Raises RuntimeError when runDir is not a valid runfolder name.
        """
        # close any open processes
        self.rsync.poll()

        # see if we're still copying
        if runfolder_validate(runDir):
            logging.info("recevied sequencing finshed for %s" % (runDir))
            self.pending.append(runDir)
            # NOTE(review): the two statements below were not visible in
            # the reviewed listing; kicking off a copy and answering
            # "PENDING" is assumed -- confirm against the deployed file.
            self.startCopy()
            return "PENDING"
        else:
            errmsg = "received bad runfolder name (%s)" % (runDir)
            logging.warning(errmsg)
            # maybe I should use a different error message
            raise RuntimeError(errmsg)

    def reportSequencingFinished(self, runDir):
        """
        Send the sequencingFinished message to the interested parties
        """
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, 'Sequencing run %s finished' % (runDir))
        if self.notify_runner is not None:
            for r in self.notify_runner:
                self.rpc_send(r, (runDir,), 'sequencingFinished')
        logging.info("forwarding sequencingFinshed message for %s" % (runDir))

    def _is_copying(self, runDir):
        """
        Return True while an rsync process for runDir is still registered.
        """
        for key in self.rsync.keys():
            # processes are keyed by the url being copied; the run folder
            # name is its last path component
            if key == runDir or key.rstrip('/').split('/')[-1] == runDir:
                return True
        return False

    def update(self, *args):
        """
        Update our current status.
        Report if we've finished copying files.
        """
        self.rsync.poll()
        # Build a new list instead of removing while iterating, which
        # would skip the entry following each removed one.
        still_pending = []
        for p in self.pending:
            if self._is_copying(p):
                still_pending.append(p)
            else:
                self.reportSequencingFinished(p)
        self.pending = still_pending

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages

        Returns the text to answer with.
        """
        help_text = u"I can [copy], or report current [status]"
        if re.match(u"help", msg):
            reply = help_text
        elif re.match("copy", msg):
            started = self.startCopy()
            reply = u"started copying " + ", ".join(started)
        elif re.match(u"status", msg):
            status = [u"Currently %d rsync processes are running." % (len(self.rsync))]
            for d in self.rsync.keys():
                # NOTE(review): exact per-entry format was not visible in
                # the reviewed listing -- confirm.
                status.append(u"  " + d)
            reply = os.linesep.join(status)
        else:
            reply = u"I didn't understand '%s'" % (msg,)
        return reply

    def validate_url(self, url):
        """
        Check url against the configured rsync sources.

        Returns url unchanged when its scheme and network location equal
        those of one of self.sources and its path starts with that
        source's path; returns None otherwise.
        """
        split_url = urlparse.urlsplit(url)
        # url arrives over the network: refuse ".." segments so a path that
        # merely starts with a source path cannot climb back out of it
        if '..' in split_url.path.split('/'):
            return None
        for source in self.sources:
            split_source = urlparse.urlsplit(source)
            if (split_url.scheme == split_source.scheme) and \
               (split_url.netloc == split_source.netloc) and \
               (split_url.path.startswith(split_source.path)):
                return url
        return None
# Script entry point: run main() on the command line arguments (program
# name stripped) and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))