4 import logging.handlers
13 from benderjab import rpc
def runfolder_validate(fname):
    """
    Return True if fname looks like a runfolder name

    A runfolder name is six digits, an underscore, and then any (possibly
    empty) run of letters, digits, '-' or '_', for example
    ``080909_HWI-EAS229_0052``.

    :param fname: candidate directory name (a string, no path components)
    :return: True when the name matches the pattern, False otherwise
    """
    # Always hand back a real bool rather than a match object / None.
    return re.match(r"^[0-9]{6}_[-A-Za-z0-9_]*$", fname) is not None
class rsync(object):
    """
    Discover and copy runfolders with the rsync command line tool.

    Every copy runs as a child process.  Running children are tracked in
    ``self.processes``, keyed by the full source URL (source base + directory
    name) of the directory being copied.
    """

    def __init__(self, sources, dest, pwfile):
        """
        Build the base rsync command line and normalize the sources.

        :param sources: iterable of rsync source directories/collections
        :param dest: destination passed as the last rsync argument when copying
        :param pwfile: rsync password file; a leading ``~`` is expanded
        """
        self.cmd = ['/usr/bin/rsync', ]
        self.pwfile = os.path.expanduser(pwfile)
        self.cmd.append('--password-file=%s' % (self.pwfile))
        self.source_base_list = [self._normalize_rsync_source(x)
                                 for x in sources]
        self.dest_base = dest
        # source URL -> subprocess.Popen of the rsync copying it
        self.processes = {}

    def list(self):
        """
        Get a directory listing for all our sources

        :return: list of ``source + directory name`` strings, one for every
                 directory accepted by list_filter()
        """
        logging.debug("searching for entries in: %s", self.source_base_list)
        entries = []
        for source in self.source_base_list:
            logging.debug("Scanning %s", source)
            args = copy.copy(self.cmd)
            # rsync given a source and no destination prints a listing
            args.append(source)
            logging.debug("Rsync cmd:%s", " ".join(args))
            short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
            # communicate() drains the pipe while waiting for the child.
            # Calling wait() first can deadlock once the listing is larger
            # than the OS pipe buffer.
            output = short_process.communicate()[0]
            exit_code = short_process.returncode
            if exit_code != 0:
                # best effort: still use whatever rsync managed to print
                logging.warning("rsync listing of %s exited with code %s",
                                source, exit_code)
            if not isinstance(output, str):
                # Python 3 pipes yield bytes
                output = output.decode('utf-8', 'replace')
            # We made sure source ends in a / earlier
            entries.extend(source + subdir
                           for subdir in self.list_filter(output.splitlines()))
        logging.debug(u"Found the following: %s", entries)
        return entries

    def list_filter(self, lines):
        """
        parse rsync directory listing

        :param lines: iterable of listing lines, with or without a trailing
                      newline, shaped like
                      ``permissions size date time filename``
        :return: names of the directories that start with six digits
        """
        dirs_to_copy = []
        for line in lines:
            # Split on whitespace instead of fixed columns so a wider size
            # column does not shift the file name; maxsplit keeps any spaces
            # inside the file name itself.
            fields = line.rstrip('\r\n').split(None, 4)
            if len(fields) != 5:
                # blank line, banner text or other noise
                continue
            permissions, size, filedate, filetime, filename = fields
            if permissions[0] != 'd':
                continue
            # hey its a directory, the first step to being something we want
            if re.match("[0-9]{6}", filename):
                # it starts with something that looks like a 6 digit date
                # aka good enough for me
                dirs_to_copy.append(filename)
        logging.debug(u'direntries kept: %s', dirs_to_copy)
        return dirs_to_copy

    def create_copy_process(self, urlname):
        """
        Start an rsync child that copies urlname into the destination.

        :param urlname: full source URL of the directory to copy
        :return: the running subprocess.Popen object
        """
        args = copy.copy(self.cmd)
        # args.append('--dry-run') # Makes testing easier
        # we want to copy everything
        # NOTE(review): the exact flag line was not visible in the reviewed
        # listing; '-rlt' (recursive, links, times) is assumed -- confirm.
        args.append('-rlt')
        # from here
        args.append(urlname)
        # to here
        args.append(self.dest_base)
        logging.debug("Rsync cmd:%s", " ".join(args))
        return subprocess.Popen(args)

    def copy(self):
        """
        copy any interesting looking directories over
        return list of items that we started copying.
        """
        # clean up any lingering non-running processes
        self.poll()

        # what's available to copy?
        dirs_to_copy = self.list()

        started = []
        for d in dirs_to_copy:
            if self.processes.get(d, None) is not None:
                # already being copied (or failed earlier and still tracked)
                continue
            # we don't have a process, so make one
            logging.info("rsyncing %s", d)
            self.processes[d] = self.create_copy_process(d)
            started.append(d)
        return started

    def _normalize_rsync_source(self, source):
        """
        Make sure that we have a reasonable looking source
        a source must be a directory/collection.

        :raises ValueError: if source is empty
        """
        if not source:
            raise ValueError("rsync source must not be empty")
        # we must be a directory
        if not source.endswith('/'):
            source += '/'
        # I suppose we could check to see if we start with rsync:// or something
        return source

    def poll(self):
        """
        check currently running processes to see if they're done

        return path roots that have finished.
        """
        finished = []
        # Iterate over a snapshot: entries are deleted inside the loop.
        for dir_key, proc_value in list(self.processes.items()):
            retcode = proc_value.poll()
            if retcode is None:
                # process hasn't finished yet
                continue
            if retcode == 0:
                logging.info("finished rsyncing %s, exitcode %d",
                             dir_key, retcode)
                del self.processes[dir_key]
                finished.append(dir_key)
            else:
                # A failed copy stays tracked, so it is neither restarted
                # nor reported as finished.
                logging.error("rsync failed for %s, exit code %d",
                              dir_key, retcode)
        return finished

    def __len__(self):
        """
        Return how many active rsync processes we currently have

        Call poll first to close finished processes.
        """
        return len(self.processes)

    def keys(self):
        """
        Return list of current run folder names
        """
        # Build a real list so callers can keep it while processes change.
        return [key for key in self.processes]
class CopierBot(rpc.XmlRpcBot):
    """
    XML-RPC/chat bot that rsyncs runfolders and forwards the
    sequencingFinished message once a run has been copied.
    """

    def __init__(self, section=None, configfile=None):
        """
        Register configuration defaults, RPC functions and the periodic task.

        :param section: passed through to the base class
        :param configfile: passed through to the base class
        """
        #if configfile is None:
        #    configfile = '~/.htsworkflow'

        super(CopierBot, self).__init__(section, configfile)

        # options for rsync command
        self.cfg['rsync_password_file'] = None
        self.cfg['rsync_source'] = None
        # read_config() requires 'rsync_sources' (plural); register that name
        # too so the default and the option that is read agree.
        # NOTE(review): assumes the base class only loads registered keys.
        self.cfg['rsync_sources'] = None
        self.cfg['rsync_destination'] = None

        # options for reporting we're done
        self.cfg['notify_users'] = None
        self.cfg['notify_runner'] = None

        # runfolders whose sequencing finished but whose copy has not
        self.pending = []
        # created lazily in run()
        self.rsync = None
        self.notify_users = None
        self.notify_runner = None

        self.register_function(self.startCopy)
        self.register_function(self.sequencingFinished)
        self.eventTasks.append(self.update)

    def read_config(self, section=None, configfile=None):
        """
        Read the config file and pull out the options this bot needs.

        :raises bot.JIDMissingResource: if a notify_runner entry lacks a
                                        resource
        """
        super(CopierBot, self).read_config(section, configfile)

        self.sources = shlex.split(self._check_required_option('rsync_sources'))
        self.password = self._check_required_option('rsync_password_file')
        self.destination = self._check_required_option('rsync_destination')

        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
        try:
            self.notify_runner = \
                self._parse_user_list(self.cfg['notify_runner'],
                                      require_resource=True)
        except bot.JIDMissingResource:
            msg = 'need a full jabber ID + resource for xml-rpc destinations'
            # same text the print statement produced, valid on Python 2 and 3
            sys.stderr.write(msg + '\n')
            raise bot.JIDMissingResource(msg)

    def run(self):
        """
        Create the rsync helper if needed, then run the base class loop.
        """
        # we can't call any logging function until after start finishes.
        if self.rsync is None:
            self.rsync = rsync(self.sources, self.destination, self.password)
        super(CopierBot, self).run()

    def startCopy(self, *args):
        """
        Scan the sources and start copying anything new.

        :return: list of items whose copy was started by this call
        """
        logging.info("starting copy scan, %s" % (args,))
        started = self.rsync.copy()
        logging.info("copying:" + " ".join(started)+".")
        return started

    def sequencingFinished(self, runDir, *args):
        """
        The run was finished, if we're done copying, pass the message on

        :param runDir: runfolder name (no path) that finished sequencing
        :raises RuntimeError: if runDir does not look like a runfolder name
        """
        # close any open processes
        self.rsync.poll()

        # see if we're still copying
        if not runfolder_validate(runDir):
            errmsg = "received bad runfolder name (%s)" % (runDir)
            logging.warning(errmsg)
            # maybe I should use a different error message
            raise RuntimeError(errmsg)

        logging.info("recevied sequencing finshed for %s" % (runDir))
        self.pending.append(runDir)
        # NOTE(review): the lines between the append and the error branch
        # were not visible in the reviewed listing; starting a copy scan and
        # returning a status string is assumed -- confirm.
        self.startCopy()
        return "PENDING"

    def reportSequencingFinished(self, runDir):
        """
        Send the sequencingFinished message to the interested parties
        """
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, 'Sequencing run %s finished' % (runDir))
        if self.notify_runner is not None:
            for r in self.notify_runner:
                self.rpc_send(r, (runDir,), 'sequencingFinished')
        # NOTE(review): logged once per run; original nesting not visible.
        logging.info("forwarding sequencingFinshed message for %s" % (runDir))

    def update(self, *args):
        """
        Update our current status.
        Report if we've finished copying files.
        """
        self.rsync.poll()

        # The rsync keys are "source URL + directory name", while pending
        # holds bare runfolder names, so compare against the last path
        # component as well as the full key.
        active = set()
        for key in self.rsync.keys():
            active.add(key)
            active.add(key.rstrip('/').rsplit('/', 1)[-1])

        # Iterate over a snapshot: removing from the list being iterated
        # skips the entry after each removal.
        for p in list(self.pending):
            if p not in active:
                self.reportSequencingFinished(p)
                self.pending.remove(p)

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages

        :param msg: text of the incoming message
        :param who: sender of the message (unused here)
        :return: reply text
        """
        usage = u"I can [copy], or report current [status]"
        if re.match(u"help", msg):
            reply = usage
        elif re.match("copy", msg):
            started = self.startCopy()
            reply = u"started copying " + ", ".join(started)
        elif re.match(u"status", msg):
            status = [u"Currently %d rsync processes are running."
                      % (len(self.rsync))]
            for d in self.rsync.keys():
                # NOTE(review): per-entry line prefix assumed -- confirm.
                status.append(u"  " + d)
            reply = os.linesep.join(status)
        else:
            reply = u"I didn't understand '%s'" % (msg,)
        return reply
if __name__ == "__main__":
    # Run main() on the command-line arguments (program name dropped) and
    # use its return value as the process exit status.
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)