4 import logging.handlers
12 from benderjab import rpc
def runfolder_validate(fname):
    """
    Tell whether fname has the shape of a runfolder name.

    The name must start with six digits and an underscore, followed by any
    run (possibly empty) of letters, digits, dashes and underscores.
    Returns True when it matches, False otherwise.
    """
    return re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname) is not None
24 def __init__(self, source, dest, pwfile):
25 self.pwfile = os.path.expanduser(pwfile)
26 self.cmd = ['/usr/bin/rsync', ]
27 self.cmd.append('--password-file=%s' % (self.pwfile))
28 self.source_base = source
34 """Get a directory listing"""
36 args = copy.copy(self.cmd)
37 args.append(self.source_base)
39 logging.debug("Rsync cmd:" + " ".join(args))
40 short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
41 direntries = [ x.split() for x in short_process.stdout ]
42 for permissions, size, filedate, filetime, filename in direntries:
43 if permissions[0] == 'd':
44 # hey its a directory, the first step to being something we want to
46 if re.match("[0-9]{6}", filename):
47 # it starts with something that looks like a 6 digit date
48 # aka good enough for me
49 dirs_to_copy.append(filename)
52 def create_copy_process(self, dirname):
53 args = copy.copy(self.cmd)
54 # we want to copy everything
57 args.append(os.path.join(self.source_base, dirname))
59 args.append(self.dest_base)
60 logging.debug("Rsync cmd:" + " ".join(args))
61 return subprocess.Popen(args)
65 copy any interesting looking directories over
66 return list of items that we started copying.
68 # clean up any lingering non-running processes
71 # what's available to copy?
72 dirs_to_copy = self.list()
76 for d in dirs_to_copy:
77 process = self.processes.get(d, None)
80 # we don't have a process, so make one
81 logging.info("rsyncing %s" % (d))
82 self.processes[d] = self.create_copy_process(d)
88 check currently running processes to see if they're done
90 return path roots that have finished.
92 for dir_key, proc_value in self.processes.items():
93 retcode = proc_value.poll()
95 # process hasn't finished yet
98 logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
99 del self.processes[dir_key]
101 logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
105 Return how many active rsync processes we currently have
107 Call poll first to close finished processes.
109 return len(self.processes)
113 Return list of current run folder names
115 return self.processes.keys()
117 class CopierBot(rpc.XmlRpcBot):
118 def __init__(self, section=None, configfile=None):
119 #if configfile is None:
120 # configfile = '~/.gaworkflow'
122 super(CopierBot, self).__init__(section, configfile)
124 # options for rsync command
125 self.cfg['rsync_password_file'] = None
126 self.cfg['rsync_source'] = None
127 self.cfg['rsync_destination'] = None
129 # options for reporting we're done
130 self.cfg['notify_users'] = None
131 self.cfg['notify_runner'] = None
135 self.notify_users = None
136 self.notify_runner = None
138 self.register_function(self.startCopy)
139 self.register_function(self.runFinished)
140 self.eventTasks.append(self.update)
def read_config(self, section=None, configfile=None):
    """
    Read the config file and build the rsync helper from it.

    Once the base class has loaded the configuration, the three rsync
    options (password file, source, destination) are required; a missing
    one is logged as fatal and raises RuntimeError.  The notify_users and
    notify_runner options are passed through _parse_user_list.
    """
    super(CopierBot, self).read_config(section, configfile)

    def require(option):
        # hand back a mandatory option, or complain loudly when it is unset
        if self.cfg[option] is not None:
            return self.cfg[option]
        errmsg = "Please specify %s in the configfile" % (option)
        logging.fatal(errmsg)
        raise RuntimeError(errmsg)

    # checked in the same order as before so the first missing option
    # reported does not change
    pwfile = require('rsync_password_file')
    src = require('rsync_source')
    dest = require('rsync_destination')
    self.rsync = rsync(src, dest, pwfile)

    self.notify_users = self._parse_user_list(self.cfg['notify_users'])
    self.notify_runner = self._parse_user_list(self.cfg['notify_runner'])
def startCopy(self, *args):
    """
    Scan the rsync source and launch copies for whatever is new.

    Extra positional arguments are accepted and ignored.
    Returns the list of names whose copy was just started.
    """
    logging.info("starting copy scan")
    launched = self.rsync.copy()
    summary = "copying:" + " ".join(launched) + "."
    logging.info(summary)
    return launched
173 def runFinished(self, runDir, *args):
175 The run was finished, if we're done copying, pass the message on
177 # close any open processes
180 # see if we're still copying
181 if runfolder_validate(runDir):
182 if runDir in self.rsync.keys():
184 self.pending.append(runDir)
185 logging.info("%s finished, but still copying" % (runDir))
189 self.reportRunFinished(runDir)
190 logging.info("%s finished" % (runDir))
193 errmsg = "received bad runfolder name (%s)" % (runDir)
194 logging.warning(errmsg)
195 # maybe I should use a different error message
196 raise RuntimeError(errmsg)
def reportRunFinished(self, runDir):
    """
    Send the runFinished message to the interested parties.

    Every entry of notify_users gets a 'run ... finished' message through
    self.send, and every entry of notify_runner gets a 'runFinished' call
    through rpc.send with runDir as its only parameter.  Either list may be
    None, in which case that group is skipped.
    """
    if self.notify_users is not None:
        for user in self.notify_users:
            self.send(user, 'run %s finished' % (runDir))
    if self.notify_runner is not None:
        for runner in self.notify_runner:
            # Address the call to the runner being iterated.  The loop used
            # to target self.runner on every pass, ignoring the configured
            # list (and self.runner is not assigned anywhere in view).
            rpc.send(self.cl, runner, (runDir,), 'runFinished')
    logging.info("forwarding runFinshed message for %s" % (runDir))
def update(self, *args):
    """
    Update our current status.
    Report if we've finished copying files.

    Every pending run folder that rsync is no longer copying is reported
    through reportRunFinished and dropped from self.pending.  Extra
    positional arguments are accepted and ignored.
    """
    # reap finished rsync processes first so keys() reflects what is
    # really still being copied
    self.rsync.poll()
    still_copying = set(self.rsync.keys())

    # Walk a snapshot: removing from self.pending while iterating over it
    # skips the entry that follows each removed one.
    for run_dir in list(self.pending):
        if run_dir not in still_copying:
            self.reportRunFinished(run_dir)
            self.pending.remove(run_dir)
def _parser(self, msg, who):
    """
    Parse xmpp chat messages.

    msg is matched by prefix against the known commands ("help", "copy",
    "status"); who is accepted but not consulted here.
    Returns the text of the reply.
    """
    usage = u"I can [copy], or report current [status]"
    if re.match(u"help", msg):
        reply = usage
    elif re.match("copy", msg):
        started = self.startCopy()
        reply = u"started copying " + ", ".join(started)
    elif re.match(u"status", msg):
        lines = [u"Currently %d rsync processes are running." % (len(self.rsync))]
        for d in self.rsync.keys():
            # NOTE(review): the layout of the per-folder line is assumed
            # (indented folder name) -- confirm against the deployed bot.
            lines.append(u"  " + d)
        reply = os.linesep.join(lines)
    else:
        # Fill in the template before concatenating: "%" binds tighter than
        # "+", so the old expression applied the format to the usage text
        # (which has no placeholder) and raised TypeError.  Formatting into
        # a unicode template already coerces msg to unicode.
        reply = (u"I didn't understand '%s'" % (msg,)) + os.linesep + usage
    return reply
if __name__ == "__main__":
    # Script entry point: pass the command-line arguments (program name
    # stripped) to main() and exit with whatever it returns.
    sys.exit(main(sys.argv[1:]))