4 import logging.handlers
12 from benderjab import rpc
14 def runfolder_validate(fname):
16 Return True if fname looks like a runfolder name
18 if re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname):
24 def __init__(self, source, dest, pwfile):
25 self.pwfile = os.path.expanduser(pwfile)
26 self.cmd = ['/usr/bin/rsync', ]
27 self.cmd.append('--password-file=%s' % (self.pwfile))
28 self.source_base = source
34 """Get a directory listing"""
36 args = copy.copy(self.cmd)
37 args.append(self.source_base)
39 logging.debug("Rsync cmd:" + " ".join(args))
40 short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
41 direntries = [ x.split() for x in short_process.stdout ]
42 for permissions, size, filedate, filetime, filename in direntries:
43 if permissions[0] == 'd':
44 # hey its a directory, the first step to being something we want to
46 if re.match("[0-9]{6}", filename):
47 # it starts with something that looks like a 6 digit date
48 # aka good enough for me
49 dirs_to_copy.append(filename)
52 def create_copy_process(self, dirname):
53 args = copy.copy(self.cmd)
54 # we want to copy everything
57 args.append(os.path.join(self.source_base, dirname))
59 args.append(self.dest_base)
60 logging.debug("Rsync cmd:" + " ".join(args))
61 return subprocess.Popen(args)
65 copy any interesting looking directories over
66 return list of items that we started copying.
68 # clean up any lingering non-running processes
71 # what's available to copy?
72 dirs_to_copy = self.list()
76 for d in dirs_to_copy:
77 process = self.processes.get(d, None)
80 # we don't have a process, so make one
81 logging.info("rsyncing %s" % (d))
82 self.processes[d] = self.create_copy_process(d)
88 check currently running processes to see if they're done
90 return path roots that have finished.
92 for dir_key, proc_value in self.processes.items():
93 retcode = proc_value.poll()
95 # process hasn't finished yet
98 logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
99 del self.processes[dir_key]
101 logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
105 Return how many active rsync processes we currently have
107 Call poll first to close finished processes.
109 return len(self.processes)
113 Return list of current run folder names
115 return self.processes.keys()
117 class CopierBot(rpc.XmlRpcBot):
118 def __init__(self, section=None, configfile=None):
119 #if configfile is None:
120 # configfile = '~/.gaworkflow'
122 super(CopierBot, self).__init__(section, configfile)
124 # options for rsync command
125 self.cfg['rsync_password_file'] = None
126 self.cfg['rsync_source'] = None
127 self.cfg['rsync_destination'] = None
129 # options for reporting we're done
130 self.cfg['notify_users'] = None
131 self.cfg['notify_runner'] = None
135 self.notify_users = None
136 self.notify_runner = None
138 self.register_function(self.startCopy)
139 self.register_function(self.runFinished)
140 self.eventTasks.append(self.update)
142 def read_config(self, section=None, configfile=None):
146 super(CopierBot, self).read_config(section, configfile)
148 password = self._check_required_option('rsync_password_file')
149 source = self._check_required_option('rsync_source')
150 destination = self._check_required_option('rsync_destination')
151 self.rsync = rsync(source, destination, password)
153 self.notify_users = self._parse_user_list(self.cfg['notify_users'])
155 self.notify_runner = \
156 self._parse_user_list(self.cfg['notify_runner'],
157 require_resource=True)
158 except bot.JIDMissingResource:
159 msg = 'need a full jabber ID + resource for xml-rpc destinations'
161 raise bot.JIDMissingResource(msg)
163 def startCopy(self, *args):
167 logging.info("starting copy scan")
168 started = self.rsync.copy()
169 logging.info("copying:" + " ".join(started)+".")
172 def runFinished(self, runDir, *args):
174 The run was finished, if we're done copying, pass the message on
176 # close any open processes
179 # see if we're still copying
180 if runfolder_validate(runDir):
181 if runDir in self.rsync.keys():
183 self.pending.append(runDir)
184 logging.info("%s finished, but still copying" % (runDir))
188 self.reportRunFinished(runDir)
189 logging.info("%s finished" % (runDir))
192 errmsg = "received bad runfolder name (%s)" % (runDir)
193 logging.warning(errmsg)
194 # maybe I should use a different error message
195 raise RuntimeError(errmsg)
197 def reportRunFinished(self, runDir):
199 Send the runFinished message to the interested parties
201 if self.notify_users is not None:
202 for u in self.notify_users:
203 self.send(u, 'run %s finished' % (runDir))
204 if self.notify_runner is not None:
205 for r in self.notify_runner:
206 rpc.send(self.cl, self.runner, (runDir,), 'runFinished')
207 logging.info("forwarding runFinshed message for %s" % (runDir))
209 def update(self, *args):
211 Update our current status.
212 Report if we've finished copying files.
215 for p in self.pending:
216 if p not in self.rsync.keys():
217 self.reportRunFinished(p)
218 self.pending.remove(p)
220 def _parser(self, msg, who):
222 Parse xmpp chat messages
224 help = u"I can [copy], or report current [status]"
225 if re.match(u"help", msg):
227 elif re.match("copy", msg):
228 started = self.startCopy()
229 reply = u"started copying " + ", ".join(started)
230 elif re.match(u"status", msg):
231 msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
232 for d in self.rsync.keys():
234 reply = os.linesep.join(msg)
236 reply = u"I didn't understand '%s'"+os.linesep+help % (unicode(msg))
243 if __name__ == "__main__":
244 sys.exit(main(sys.argv[1:]))