4 import logging.handlers
12 from benderjab import rpc
14 def runfolder_validate(fname):
16 Return True if fname looks like a runfolder name
18 if re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname):
24 def __init__(self, source, dest, pwfile):
25 self.pwfile = os.path.expanduser(pwfile)
26 self.cmd = ['/usr/bin/rsync', ]
27 self.cmd.append('--password-file=%s' % (self.pwfile))
28 self.source_base = source
34 """Get a directory listing"""
35 args = copy.copy(self.cmd)
36 args.append(self.source_base)
38 logging.debug("Rsync cmd:" + " ".join(args))
39 short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
40 return self.list_parser(short_process.stdout)
42 def list_filter(self, lines):
44 parse rsync directory listing
47 direntries = [ x[0:42].split() + [x[43:]] for x in lines ]
48 for permissions, size, filedate, filetime, filename in direntries:
49 if permissions[0] == 'd':
50 # hey its a directory, the first step to being something we want to
52 if re.match("[0-9]{6}", filename):
53 # it starts with something that looks like a 6 digit date
54 # aka good enough for me
55 dirs_to_copy.append(filename)
58 def create_copy_process(self, dirname):
59 args = copy.copy(self.cmd)
60 # we want to copy everything
63 args.append(os.path.join(self.source_base, dirname))
65 args.append(self.dest_base)
66 logging.debug("Rsync cmd:" + " ".join(args))
67 return subprocess.Popen(args)
71 copy any interesting looking directories over
72 return list of items that we started copying.
74 # clean up any lingering non-running processes
77 # what's available to copy?
78 dirs_to_copy = self.list()
82 for d in dirs_to_copy:
83 process = self.processes.get(d, None)
86 # we don't have a process, so make one
87 logging.info("rsyncing %s" % (d))
88 self.processes[d] = self.create_copy_process(d)
94 check currently running processes to see if they're done
96 return path roots that have finished.
98 for dir_key, proc_value in self.processes.items():
99 retcode = proc_value.poll()
101 # process hasn't finished yet
104 logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
105 del self.processes[dir_key]
107 logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
111 Return how many active rsync processes we currently have
113 Call poll first to close finished processes.
115 return len(self.processes)
119 Return list of current run folder names
121 return self.processes.keys()
123 class CopierBot(rpc.XmlRpcBot):
124 def __init__(self, section=None, configfile=None):
125 #if configfile is None:
126 # configfile = '~/.gaworkflow'
128 super(CopierBot, self).__init__(section, configfile)
130 # options for rsync command
131 self.cfg['rsync_password_file'] = None
132 self.cfg['rsync_source'] = None
133 self.cfg['rsync_destination'] = None
135 # options for reporting we're done
136 self.cfg['notify_users'] = None
137 self.cfg['notify_runner'] = None
141 self.notify_users = None
142 self.notify_runner = None
144 self.register_function(self.startCopy)
145 self.register_function(self.sequencingFinished)
146 self.eventTasks.append(self.update)
148 def read_config(self, section=None, configfile=None):
152 super(CopierBot, self).read_config(section, configfile)
154 password = self._check_required_option('rsync_password_file')
155 source = self._check_required_option('rsync_source')
156 destination = self._check_required_option('rsync_destination')
157 self.rsync = rsync(source, destination, password)
159 self.notify_users = self._parse_user_list(self.cfg['notify_users'])
161 self.notify_runner = \
162 self._parse_user_list(self.cfg['notify_runner'],
163 require_resource=True)
164 except bot.JIDMissingResource:
165 msg = 'need a full jabber ID + resource for xml-rpc destinations'
167 raise bot.JIDMissingResource(msg)
169 def startCopy(self, *args):
173 logging.info("starting copy scan")
174 started = self.rsync.copy()
175 logging.info("copying:" + " ".join(started)+".")
178 def sequencingFinished(self, runDir, *args):
180 The run was finished, if we're done copying, pass the message on
182 # close any open processes
185 # see if we're still copying
186 if runfolder_validate(runDir):
187 if runDir in self.rsync.keys():
189 self.pending.append(runDir)
190 logging.info("finished sequencing, but still copying" % (runDir))
194 self.reportSequencingFinished(runDir)
195 logging.info("finished sequencing %s" % (runDir))
198 errmsg = "received bad runfolder name (%s)" % (runDir)
199 logging.warning(errmsg)
200 # maybe I should use a different error message
201 raise RuntimeError(errmsg)
203 def reportSequencingFinished(self, runDir):
205 Send the sequencingFinished message to the interested parties
207 if self.notify_users is not None:
208 for u in self.notify_users:
209 self.send(u, 'Sequencing run %s finished' % (runDir))
210 if self.notify_runner is not None:
211 for r in self.notify_runner:
212 self.rpc_send(r, (runDir,), 'sequencingFinished')
213 logging.info("forwarding sequencingFinshed message for %s" % (runDir))
215 def update(self, *args):
217 Update our current status.
218 Report if we've finished copying files.
221 for p in self.pending:
222 if p not in self.rsync.keys():
223 self.reportSequencingFinished(p)
224 self.pending.remove(p)
226 def _parser(self, msg, who):
228 Parse xmpp chat messages
230 help = u"I can [copy], or report current [status]"
231 if re.match(u"help", msg):
233 elif re.match("copy", msg):
234 started = self.startCopy()
235 reply = u"started copying " + ", ".join(started)
236 elif re.match(u"status", msg):
237 msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
238 for d in self.rsync.keys():
240 reply = os.linesep.join(msg)
242 reply = u"I didn't understand '%s'" % (unicode(msg))
249 if __name__ == "__main__":
250 sys.exit(main(sys.argv[1:]))