4 import logging.handlers
12 from benderjab import rpc
def runfolder_validate(fname):
    """
    Return True if fname looks like a runfolder name

    A runfolder name is six digits, an underscore, and then any number of
    letters, digits, '-' or '_'.  Anything else (including path separators
    or a trailing newline) returns False.
    """
    # \Z instead of $: '$' would also accept a name followed by a newline.
    if re.match(r"^[0-9]{6}_[-A-Za-z0-9_]*\Z", fname):
        return True
    return False
class rsync(object):
    """
    Drive /usr/bin/rsync child processes.

    A short-lived process lists the source; one copy process per directory
    name is tracked in self.processes (directory name -> Popen object).
    """
    # NOTE(review): the class header is missing from the reviewed text; the
    # name comes from the rsync(source, destination, password) call in
    # CopierBot.read_config, the `object` base is a reconstruction.

    def __init__(self, source, dest, pwfile):
        """
        source -- rsync source root, passed to rsync as given
        dest   -- destination handed to every copy process
        pwfile -- rsync password file; a leading '~' is expanded
        """
        self.pwfile = os.path.expanduser(pwfile)
        self.cmd = ['/usr/bin/rsync', ]
        self.cmd.append('--password-file=%s' % (self.pwfile))
        self.source_base = source
        self.dest_base = dest
        # directory name -> subprocess.Popen of the rsync copying it
        self.processes = {}

    def list(self):
        """Get a directory listing"""
        args = copy.copy(self.cmd)
        args.append(self.source_base)

        logging.debug("Rsync cmd:" + " ".join(args))
        listing = subprocess.Popen(args, stdout=subprocess.PIPE)
        try:
            # list_filter reads the pipe to EOF before returning
            return self.list_filter(listing.stdout)
        finally:
            # release the pipe and reap the child so it is not left behind
            listing.stdout.close()
            listing.wait()

    def list_filter(self, lines):
        """
        parse rsync directory listing

        lines is an iterable of listing rows.  The first 42 characters of a
        row hold permissions, size, date and time; the name starts at
        column 43.  Returns the names of directories that start with six
        digits.  Rows that do not have the four leading columns are ignored.
        """
        dirs_to_copy = []
        for row in lines:
            columns = row[0:42].split()
            if len(columns) != 4:
                # blank line, banner, or anything else that is not a row
                continue
            permissions = columns[0]
            # strip only the line terminator so a final row without a
            # newline keeps its last character
            filename = row[43:].rstrip('\r\n')
            if permissions[0] != 'd':
                continue
            # hey its a directory, the first step to being something we
            # want to copy
            if re.match("[0-9]{6}", filename):
                # it starts with something that looks like a 6 digit date
                # aka good enough for me
                dirs_to_copy.append(filename)
        return dirs_to_copy

    def create_copy_process(self, dirname):
        """
        Start an rsync of source_base/dirname into dest_base and return the
        Popen object without waiting for it.
        """
        args = copy.copy(self.cmd)
        # we want to copy everything
        # NOTE(review): the option line itself is missing from the reviewed
        # text; '-rlt' is a reconstruction -- confirm against the deployed
        # script.
        args.append('-rlt')
        # from here
        args.append(os.path.join(self.source_base, dirname))
        # to here
        args.append(self.dest_base)
        logging.debug("Rsync cmd:" + " ".join(args))
        return subprocess.Popen(args)

    def copy(self):
        """
        copy any interesting looking directories over
        return list of items that we started copying.
        """
        # clean up any lingering non-running processes
        self.poll()

        # what's available to copy?
        dirs_to_copy = self.list()

        started = []
        for d in dirs_to_copy:
            process = self.processes.get(d, None)
            if process is None:
                # we don't have a process, so make one
                logging.info("rsyncing %s" % (d))
                self.processes[d] = self.create_copy_process(d)
                started.append(d)
        return started

    def poll(self):
        """
        check currently running processes to see if they're done

        Processes that exited with 0 are dropped from self.processes.
        A process that exited non-zero is logged and stays registered.
        """
        # iterate over a snapshot: entries are deleted inside the loop
        for dir_key, proc_value in list(self.processes.items()):
            retcode = proc_value.poll()
            if retcode is None:
                # process hasn't finished yet
                continue
            if retcode == 0:
                logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
                del self.processes[dir_key]
            else:
                # NOTE(review): a failed copy is never removed, so copy()
                # will not restart it -- confirm this is intended.
                logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))

    def __len__(self):
        """
        Return how many active rsync processes we currently have

        Call poll first to close finished processes.
        """
        return len(self.processes)

    def keys(self):
        """
        Return list of current run folder names
        """
        return list(self.processes)
class CopierBot(rpc.XmlRpcBot):
    """
    XML-RPC/chat bot that rsyncs run folders and forwards the
    sequencingFinished message once the copy of a run is no longer active.
    """

    def __init__(self, section=None, configfile=None):
        """
        Register config defaults, the XML-RPC functions and the periodic
        update task.  section and configfile are passed to the base class.
        """
        #if configfile is None:
        #    configfile = '~/.htsworkflow'

        super(CopierBot, self).__init__(section, configfile)

        # options for rsync command
        self.cfg['rsync_password_file'] = None
        self.cfg['rsync_source'] = None
        self.cfg['rsync_destination'] = None

        # options for reporting we're done
        self.cfg['notify_users'] = None
        self.cfg['notify_runner'] = None

        # run folders reported finished whose copy we are still waiting on
        self.pending = []
        # NOTE(review): reconstructed placeholder (the line is missing from
        # the reviewed text); read_config assigns the real rsync object
        self.rsync = None
        self.notify_users = None
        self.notify_runner = None

        self.register_function(self.startCopy)
        self.register_function(self.sequencingFinished)
        self.eventTasks.append(self.update)

    def read_config(self, section=None, configfile=None):
        """
        Read the configuration, build the rsync helper from the three
        required rsync_* options and parse the notification lists.

        Re-raises JIDMissingResource with a clearer message when a
        notify_runner entry has no resource.
        """
        super(CopierBot, self).read_config(section, configfile)

        password = self._check_required_option('rsync_password_file')
        source = self._check_required_option('rsync_source')
        destination = self._check_required_option('rsync_destination')
        self.rsync = rsync(source, destination, password)

        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
        try:
            self.notify_runner = \
                self._parse_user_list(self.cfg['notify_runner'],
                                      require_resource=True)
        # NOTE(review): no import that binds `bot` is visible in the reviewed
        # text (only `from benderjab import rpc`); confirm it is imported,
        # otherwise this clause raises NameError instead of handling the error.
        except bot.JIDMissingResource:
            msg = 'need a full jabber ID + resource for xml-rpc destinations'
            # NOTE(review): the statement between msg and raise is missing
            # from the reviewed text; logging the message is a reconstruction.
            logging.critical(msg)
            raise bot.JIDMissingResource(msg)

    def startCopy(self, *args):
        """
        Scan the source and start copying anything new.
        Returns the list of directory names whose copy was started.
        """
        logging.info("starting copy scan")
        started = self.rsync.copy()
        logging.info("copying:" + " ".join(started)+".")
        return started

    def sequencingFinished(self, runDir, *args):
        """
        The run was finished, if we're done copying, pass the message on

        Raises RuntimeError when runDir is not a valid runfolder name.
        """
        # close any open processes
        self.rsync.poll()

        # see if we're still copying
        if runfolder_validate(runDir):
            logging.info("recevied sequencing finshed for %s" % (runDir))
            self.pending.append(runDir)
            # NOTE(review): the two statements that end this branch are
            # missing from the reviewed text; the rescan and the "PENDING"
            # return value are reconstructions -- confirm what callers expect.
            self.startCopy()
            return "PENDING"
        else:
            errmsg = "received bad runfolder name (%s)" % (runDir)
            logging.warning(errmsg)
            # maybe I should use a different error message
            raise RuntimeError(errmsg)

    def reportSequencingFinished(self, runDir):
        """
        Send the sequencingFinished message to the interested parties
        """
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, 'Sequencing run %s finished' % (runDir))
        if self.notify_runner is not None:
            for r in self.notify_runner:
                self.rpc_send(r, (runDir,), 'sequencingFinished')
        logging.info("forwarding sequencingFinshed message for %s" % (runDir))

    def update(self, *args):
        """
        Update our current status.
        Report if we've finished copying files.
        """
        self.rsync.poll()
        # walk a copy: removing from the list being iterated would skip the
        # entry that follows each removed one
        for p in list(self.pending):
            if p not in self.rsync.keys():
                self.reportSequencingFinished(p)
                self.pending.remove(p)

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages

        Understands help, copy and status; returns the reply text.
        """
        help_text = u"I can [copy], or report current [status]"
        if re.match(u"help", msg):
            reply = help_text
        elif re.match("copy", msg):
            started = self.startCopy()
            reply = u"started copying " + ", ".join(started)
        elif re.match(u"status", msg):
            status = [u"Currently %d rsync processes are running." % (len(self.rsync))]
            for d in self.rsync.keys():
                # NOTE(review): the per-directory line is missing from the
                # reviewed text; this indent format is a reconstruction.
                status.append(u"  " + d)
            reply = os.linesep.join(status)
        else:
            reply = u"I didn't understand '%s'" % (unicode(msg))
        return reply
if __name__ == "__main__":
    # run with the command line minus the program name; main's result
    # becomes the process exit status
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)