4 import logging.handlers
12 from benderjab import rpc
14 def runfolder_validate(fname):
16 Return True if fname looks like a runfolder name
18 if re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname):
24 def __init__(self, source, dest, pwfile):
25 self.pwfile = os.path.expanduser(pwfile)
26 self.cmd = ['/usr/bin/rsync', ]
27 self.cmd.append('--password-file=%s' % (self.pwfile))
28 self.source_base = source
34 """Get a directory listing"""
36 args = copy.copy(self.cmd)
37 args.append(self.source_base)
39 logging.debug("Rsync cmd:" + " ".join(args))
40 short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
41 direntries = [ x.split() for x in short_process.stdout ]
42 for permissions, size, filedate, filetime, filename in direntries:
43 if permissions[0] == 'd':
44 # hey its a directory, the first step to being something we want to
46 if re.match("[0-9]{6}", filename):
47 # it starts with something that looks like a 6 digit date
48 # aka good enough for me
49 dirs_to_copy.append(filename)
52 def create_copy_process(self, dirname):
53 args = copy.copy(self.cmd)
54 # we want to copy everything
57 args.append(os.path.join(self.source_base, dirname))
59 args.append(self.dest_base)
60 logging.debug("Rsync cmd:" + " ".join(args))
61 return subprocess.Popen(args)
65 copy any interesting looking directories over
66 return list of items that we started copying.
68 # clean up any lingering non-running processes
71 # what's available to copy?
72 dirs_to_copy = self.list()
76 for d in dirs_to_copy:
77 process = self.processes.get(d, None)
80 # we don't have a process, so make one
81 logging.info("rsyncing %s" % (d))
82 self.processes[d] = self.create_copy_process(d)
88 check currently running processes to see if they're done
90 return path roots that have finished.
92 for dir_key, proc_value in self.processes.items():
93 retcode = proc_value.poll()
95 # process hasn't finished yet
98 logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
99 del self.processes[dir_key]
101 logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
105 Return how many active rsync processes we currently have
107 Call poll first to close finished processes.
109 return len(self.processes)
113 Return list of current run folder names
115 return self.processes.keys()
117 class CopierBot(rpc.XmlRpcBot):
118 def __init__(self, section=None, configfile=None):
119 #if configfile is None:
120 # configfile = '~/.gaworkflow'
122 super(CopierBot, self).__init__(section, configfile)
124 # options for rsync command
125 self.cfg['rsync_password_file'] = None
126 self.cfg['rsync_source'] = None
127 self.cfg['rsync_destination'] = None
129 # options for reporting we're done
130 self.cfg['notify_users'] = None
131 self.cfg['notify_runner'] = None
135 self.notify_users = None
136 self.notify_runner = None
138 self.register_function(self.startCopy)
139 self.register_function(self.runFinished)
140 self.eventTasks.append(self.update)
142 def read_config(self, section=None, configfile=None):
146 super(CopierBot, self).read_config(section, configfile)
148 password = self._check_required_option('rsync_password_file')
149 source = self._check_required_option('rsync_source')
150 destination = self._check_required_option('rsync_destination')
151 self.rsync = rsync(source, destination, password)
153 self.notify_users = self._parse_user_list(self.cfg['notify_users'])
154 self.notify_runner = self._parse_user_list(self.cfg['notify_runner'])
156 def startCopy(self, *args):
160 logging.info("starting copy scan")
161 started = self.rsync.copy()
162 logging.info("copying:" + " ".join(started)+".")
165 def runFinished(self, runDir, *args):
167 The run was finished, if we're done copying, pass the message on
169 # close any open processes
172 # see if we're still copying
173 if runfolder_validate(runDir):
174 if runDir in self.rsync.keys():
176 self.pending.append(runDir)
177 logging.info("%s finished, but still copying" % (runDir))
181 self.reportRunFinished(runDir)
182 logging.info("%s finished" % (runDir))
185 errmsg = "received bad runfolder name (%s)" % (runDir)
186 logging.warning(errmsg)
187 # maybe I should use a different error message
188 raise RuntimeError(errmsg)
190 def reportRunFinished(self, runDir):
192 Send the runFinished message to the interested parties
194 if self.notify_users is not None:
195 for u in self.notify_users:
196 self.send(u, 'run %s finished' % (runDir))
197 if self.notify_runner is not None:
198 for r in self.notify_runner:
199 rpc.send(self.cl, self.runner, (runDir,), 'runFinished')
200 logging.info("forwarding runFinshed message for %s" % (runDir))
202 def update(self, *args):
204 Update our current status.
205 Report if we've finished copying files.
208 for p in self.pending:
209 if p not in self.rsync.keys():
210 self.reportRunFinished(p)
211 self.pending.remove(p)
213 def _parser(self, msg, who):
215 Parse xmpp chat messages
217 help = u"I can [copy], or report current [status]"
218 if re.match(u"help", msg):
220 elif re.match("copy", msg):
221 started = self.startCopy()
222 reply = u"started copying " + ", ".join(started)
223 elif re.match(u"status", msg):
224 msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
225 for d in self.rsync.keys():
227 reply = os.linesep.join(msg)
229 reply = u"I didn't understand '%s'"+os.linesep+help % (unicode(msg))
236 if __name__ == "__main__":
237 sys.exit(main(sys.argv[1:]))