4 import logging.handlers
14 from benderjab import rpc
16 from htsworkflow.automation.solexa import is_runfolder
18 LOGGER = logging.getLogger(__name__)
21 def __init__(self, sources, dest, pwfile):
22 self.cmd = ['/usr/bin/rsync', ]
23 self.pwfile = os.path.expanduser(pwfile)
24 self.cmd.append('--password-file=%s' % (self.pwfile))
25 self.source_base_list = [ self._normalize_rsync_source(x) for x in sources]
32 Get a directory listing for all our sources
34 LOGGER.debug("searching for entries in: %s" % (self.source_base_list,))
36 for source in self.source_base_list:
37 LOGGER.debug("Scanning %s" % (source,))
38 args = copy.copy(self.cmd)
41 LOGGER.debug("Rsync cmd:" + " ".join(args))
42 short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
43 exit_code = short_process.wait()
44 stdout = short_process.stdout
45 # We made sure source ends in a / earlier
46 cur_list = [ source+subdir for subdir in self.list_filter(stdout)]
47 entries.extend(cur_list)
48 LOGGER.debug("Found the following: %s" % (str(entries)))
51 def list_filter(self, lines):
53 parse rsync directory listing
56 direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ]
57 LOGGER.debug('direntries: %s' % (str(direntries),))
58 for permissions, size, filedate, filetime, filename in direntries:
59 if permissions[0] == 'd':
60 # hey its a directory, the first step to being something we want to
62 if re.match("[0-9]{6}", filename):
63 # it starts with something that looks like a 6 digit date
64 # aka good enough for me
65 dirs_to_copy.append(filename)
68 def create_copy_process(self, urlname):
69 args = copy.copy(self.cmd)
70 # args.append('--dry-run') # Makes testing easier
71 # we want to copy everything
76 args.append(self.dest_base)
77 LOGGER.debug("Rsync cmd:" + " ".join(args))
78 return subprocess.Popen(args)
80 def copy(self, url_list=None):
82 copy any interesting looking directories over
83 return list of items that we started copying.
85 # clean up any lingering non-running processes
88 if url_list is None or len(url_list) == 0:
89 # what's available to copy?
90 dirs_to_copy = self.list()
92 dirs_to_copy = url_list
94 LOGGER.info("dirs to copy %s" % (dirs_to_copy,))
98 for d in dirs_to_copy:
99 process = self.processes.get(d, None)
102 # we don't have a process, so make one
103 LOGGER.info("rsyncing %s" % (d))
104 self.processes[d] = self.create_copy_process(d)
108 def _normalize_rsync_source(self, source):
110 Make sure that we have a reasonable looking source
111 a source must be a directory/collection.
113 # we must be a directory
114 if source[-1] != '/':
116 # I suppose we could check to see if we start with rsync:// or something
121 check currently running processes to see if they're done
123 return path roots that have finished.
125 for dir_key, proc_value in list(self.processes.items()):
126 retcode = proc_value.poll()
128 # process hasn't finished yet
131 LOGGER.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
132 del self.processes[dir_key]
134 LOGGER.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
138 Return how many active rsync processes we currently have
140 Call poll first to close finished processes.
142 return len(self.processes)
146 Return list of current run folder names
148 return list(self.processes.keys())
150 class CopierBot(rpc.XmlRpcBot):
151 def __init__(self, section=None, configfile=None):
152 #if configfile is None:
153 # configfile = '~/.htsworkflow'
155 super(CopierBot, self).__init__(section, configfile)
157 # options for rsync command
158 self.cfg['rsync_password_file'] = None
159 self.cfg['rsync_sources'] = None
160 self.cfg['rsync_destination'] = None
162 # options for reporting we're done
163 self.cfg['notify_users'] = None
164 self.cfg['notify_runner'] = None
168 self.notify_users = None
169 self.notify_runner = None
171 self.register_function(self.startCopy)
172 self.register_function(self.sequencingFinished)
173 self.eventTasks.append(self.update)
175 def _init_rsync(self):
177 Initalize rsync class
179 This is only accessible for test purposes.
181 # we can't call any LOGGER function until after start finishes.
182 # this got moved to a seperate function from run to help with test code
183 if self.rsync is None:
184 self.rsync = rsync(self.sources, self.destination, self.password)
186 def read_config(self, section=None, configfile=None):
190 super(CopierBot, self).read_config(section, configfile)
192 self.sources = shlex.split(self._check_required_option('rsync_sources'))
193 self.password = self._check_required_option('rsync_password_file')
194 self.destination = self._check_required_option('rsync_destination')
196 self.notify_users = self._parse_user_list(self.cfg['notify_users'])
198 self.notify_runner = \
199 self._parse_user_list(self.cfg['notify_runner'],
200 require_resource=True)
201 except bot.JIDMissingResource:
202 msg = 'need a full jabber ID + resource for xml-rpc destinations'
203 print(msg, file=sys.stderr)
204 raise bot.JIDMissingResource(msg)
211 super(CopierBot, self).run()
213 def startCopy(self, *args):
217 # Note, args comes in over the network, so don't trust it.
218 LOGGER.debug("Arguments to startCopy %s" % (str(args),))
221 clean_url = self.validate_url(a)
222 if clean_url is not None:
223 copy_urls.append(clean_url)
225 LOGGER.info("Validated urls = %s" % (copy_urls,))
226 started = self.rsync.copy(copy_urls)
227 LOGGER.info("copying:" + " ".join(started)+".")
230 def sequencingFinished(self, runDir, *args):
232 The run was finished, if we're done copying, pass the message on
234 # close any open processes
237 # see if we're still copying
238 if is_runfolder(runDir):
239 LOGGER.info("recevied sequencing finshed for %s" % (runDir))
240 self.pending.append(runDir)
244 errmsg = "received bad runfolder name (%s)" % (runDir)
245 LOGGER.warning(errmsg)
246 # maybe I should use a different error message
247 raise RuntimeError(errmsg)
249 def reportSequencingFinished(self, runDir):
251 Send the sequencingFinished message to the interested parties
253 if self.notify_users is not None:
254 for u in self.notify_users:
255 self.send(u, 'Sequencing run %s finished' % (runDir))
256 if self.notify_runner is not None:
257 for r in self.notify_runner:
258 self.rpc_send(r, (runDir,), 'sequencingFinished')
259 LOGGER.info("forwarding sequencingFinshed message for %s" % (runDir))
261 def update(self, *args):
263 Update our current status.
264 Report if we've finished copying files.
# Walk a snapshot of the pending run folders. Calling self.pending.remove()
# while iterating self.pending directly makes the loop skip the element that
# follows each removed one, leaving it unreported until a later call.
for p in list(self.pending):
    # Report a run folder only once rsync no longer tracks a copy process
    # for it (rsync.keys() lists the run folders with tracked processes).
    if p not in self.rsync.keys():
        self.reportSequencingFinished(p)
        self.pending.remove(p)
272 def _parser(self, msg, who):
274 Parse xmpp chat messages
276 help = "I can [copy], or report current [status]"
277 if re.match("help", msg):
279 elif re.match("copy", msg):
280 started = self.startCopy()
281 reply = "started copying " + ", ".join(started)
282 elif re.match("status", msg):
283 msg = ["Currently %d rsync processes are running." % (len(self.rsync))]
284 for d in list(self.rsync.keys()):
286 reply = os.linesep.join(msg)
288 reply = "I didn't understand '%s'" % (str(msg))
291 def validate_url(self, url):
292 user_url = urllib.parse.urlsplit(url)
293 user_scheme = user_url[0]
294 user_netloc = user_url[1]
295 user_path = user_url[2]
297 for source in self.sources:
298 source_url = urllib.parse.urlsplit(source)
299 source_scheme = source_url[0]
300 source_netloc = source_url[1]
301 source_path = source_url[2]
302 if (user_scheme == source_scheme) and \
303 (user_netloc == source_netloc) and \
304 (user_path.startswith(source_path)):
if __name__ == "__main__":
    # Script entry point: hand main() every argument after the program name
    # and use its return value as the process exit status.
    cli_args = sys.argv[1:]
    sys.exit(main(cli_args))