4 import logging.handlers
13 from benderjab import rpc
def runfolder_validate(fname):
    """
    Return True if fname looks like a runfolder name

    A name is accepted when it consists of exactly six digits, an
    underscore, and then zero or more letters, digits, underscores or
    dashes.  Anything else yields False.  The result is always a real
    bool, never a match object.
    """
    # \Z anchors at the true end of the string.  A plain "$" would also
    # match just before a trailing newline and let "name\n" through.
    return re.match(r"^[0-9]{6}_[-A-Za-z0-9_]*\Z", fname) is not None
25 def __init__(self, sources, dest, pwfile):
26 self.cmd = ['/usr/bin/rsync', ]
27 self.pwfile = os.path.expanduser(pwfile)
28 self.cmd.append('--password-file=%s' % (self.pwfile))
29 self.source_base_list = [ self._normalize_rsync_source(x) for x in sources]
36 Get a directory listing for all our sources
38 logging.debug("searching for entries in: %s" % (self.source_base_list,))
40 for source in self.source_base_list:
41 logging.debug("Scanning %s" % (source,))
42 args = copy.copy(self.cmd)
45 logging.debug("Rsync cmd:" + " ".join(args))
46 short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
47 exit_code = short_process.wait()
48 stdout = short_process.stdout
49 # We made sure source ends in a / earlier
50 cur_list = [ source+subdir for subdir in self.list_filter(stdout)]
51 entries.extend(cur_list)
52 logging.debug(u"Found the following: %s" % (unicode(entries)))
def list_filter(self, lines):
    """
    parse rsync directory listing

    lines is an iterable of text lines, each expected to look like
    "permissions size date time name".  Returns a list with the names of
    the entries that are directories (permissions start with 'd') and
    whose name starts with six digits.  Lines that do not have all five
    fields (blank lines, stray text) are ignored instead of raising.
    """
    direntries = []
    for line in lines:
        # Split on whitespace instead of slicing fixed columns so the
        # parse does not depend on exact column widths or on the line
        # ending in a newline.  maxsplit=4 keeps spaces inside the name.
        fields = line.rstrip('\r\n').split(None, 4)
        if len(fields) == 5:
            direntries.append(fields)
    logging.debug('direntries: %s', direntries)

    dirs_to_copy = []
    for permissions, _size, _filedate, _filetime, filename in direntries:
        # Must be a directory, and must start with something that looks
        # like a 6 digit date -- that is good enough to copy.
        if permissions.startswith('d') and re.match(r"[0-9]{6}", filename):
            dirs_to_copy.append(filename)
    return dirs_to_copy
72 def create_copy_process(self, urlname):
73 args = copy.copy(self.cmd)
74 # args.append('--dry-run') # Makes testing easier
75 # we want to copy everything
80 args.append(self.dest_base)
81 logging.debug("Rsync cmd:" + " ".join(args))
82 return subprocess.Popen(args)
86 copy any interesting looking directories over
87 return list of items that we started copying.
89 # clean up any lingering non-running processes
92 # what's available to copy?
93 dirs_to_copy = self.list()
97 for d in dirs_to_copy:
98 process = self.processes.get(d, None)
101 # we don't have a process, so make one
102 logging.info("rsyncing %s" % (d))
103 self.processes[d] = self.create_copy_process(d)
107 def _normalize_rsync_source(self, source):
109 Make sure that we have a reasonable looking source
110 a source must be a directory/collection.
112 # we must be a directory
113 if source[-1] != '/':
115 # I suppose we could check to see if we start with rsync:// or something
120 check currently running processes to see if they're done
122 return path roots that have finished.
124 for dir_key, proc_value in self.processes.items():
125 retcode = proc_value.poll()
127 # process hasn't finished yet
130 logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
131 del self.processes[dir_key]
133 logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
137 Return how many active rsync processes we currently have
139 Call poll first to close finished processes.
141 return len(self.processes)
145 Return list of current run folder names
147 return self.processes.keys()
149 class CopierBot(rpc.XmlRpcBot):
150 def __init__(self, section=None, configfile=None):
151 #if configfile is None:
152 # configfile = '~/.htsworkflow'
154 super(CopierBot, self).__init__(section, configfile)
156 # options for rsync command
157 self.cfg['rsync_password_file'] = None
158 self.cfg['rsync_source'] = None
159 self.cfg['rsync_destination'] = None
161 # options for reporting we're done
162 self.cfg['notify_users'] = None
163 self.cfg['notify_runner'] = None
167 self.notify_users = None
168 self.notify_runner = None
170 self.register_function(self.startCopy)
171 self.register_function(self.sequencingFinished)
172 self.eventTasks.append(self.update)
def _init_rsync(self):
    """
    Initialize rsync class

    Builds the rsync helper from self.sources, self.destination and
    self.password, unless self.rsync has already been set.

    This is only accessible for test purposes.
    """
    # We can't call any logging function until after start finishes.
    # This lives in its own function, separate from run, to help with
    # test code.
    if self.rsync is not None:
        return
    self.rsync = rsync(self.sources, self.destination, self.password)
185 def read_config(self, section=None, configfile=None):
189 super(CopierBot, self).read_config(section, configfile)
191 self.sources = shlex.split(self._check_required_option('rsync_sources'))
192 self.password = self._check_required_option('rsync_password_file')
193 self.destination = self._check_required_option('rsync_destination')
195 self.notify_users = self._parse_user_list(self.cfg['notify_users'])
197 self.notify_runner = \
198 self._parse_user_list(self.cfg['notify_runner'],
199 require_resource=True)
200 except bot.JIDMissingResource:
201 msg = 'need a full jabber ID + resource for xml-rpc destinations'
202 print >>sys.stderr, msg
203 raise bot.JIDMissingResource(msg)
210 super(CopierBot, self).run()
212 def startCopy(self, *args):
216 logging.info("starting copy scan, %s" % (args,))
217 started = self.rsync.copy()
218 logging.info("copying:" + " ".join(started)+".")
221 def sequencingFinished(self, runDir, *args):
223 The run was finished, if we're done copying, pass the message on
225 # close any open processes
228 # see if we're still copying
229 if runfolder_validate(runDir):
230 logging.info("recevied sequencing finshed for %s" % (runDir))
231 self.pending.append(runDir)
235 errmsg = "received bad runfolder name (%s)" % (runDir)
236 logging.warning(errmsg)
237 # maybe I should use a different error message
238 raise RuntimeError(errmsg)
240 def reportSequencingFinished(self, runDir):
242 Send the sequencingFinished message to the interested parties
244 if self.notify_users is not None:
245 for u in self.notify_users:
246 self.send(u, 'Sequencing run %s finished' % (runDir))
247 if self.notify_runner is not None:
248 for r in self.notify_runner:
249 self.rpc_send(r, (runDir,), 'sequencingFinished')
250 logging.info("forwarding sequencingFinshed message for %s" % (runDir))
def update(self, *args):
    """
    Update our current status.
    Report if we've finished copying files.

    Every entry of self.pending that is not among self.rsync.keys() is
    passed to reportSequencingFinished and then removed from
    self.pending.  Positional arguments are accepted and ignored.
    """
    active = self.rsync.keys()
    # Walk a snapshot of the pending list: removing from self.pending
    # while iterating over it directly makes the loop skip the entry that
    # follows each removal.
    for p in list(self.pending):
        if p not in active:
            self.reportSequencingFinished(p)
            self.pending.remove(p)
263 def _parser(self, msg, who):
265 Parse xmpp chat messages
267 help = u"I can [copy], or report current [status]"
268 if re.match(u"help", msg):
270 elif re.match("copy", msg):
271 started = self.startCopy()
272 reply = u"started copying " + ", ".join(started)
273 elif re.match(u"status", msg):
274 msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
275 for d in self.rsync.keys():
277 reply = os.linesep.join(msg)
279 reply = u"I didn't understand '%s'" % (unicode(msg))
286 if __name__ == "__main__":
287 sys.exit(main(sys.argv[1:]))