Send the specific directory that needs to be copied in the startCopy message.
[htsworkflow.git] / htsworkflow / automation / copier.py
1 import ConfigParser
2 import copy
3 import logging
4 import logging.handlers
5 import os
6 import re
7 import shlex
8 import subprocess
9 import sys
10 import time
11 import traceback
12 import urlparse
13
14 from benderjab import rpc
15
16 def runfolder_validate(fname):
17     """
18     Return True if fname looks like a runfolder name
19     """
20     if re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname):
21         return True
22     else:
23         return False
24     
25 class rsync(object):
26   def __init__(self, sources, dest, pwfile):
27     self.cmd = ['/usr/bin/rsync', ]
28     self.pwfile = os.path.expanduser(pwfile)
29     self.cmd.append('--password-file=%s' % (self.pwfile))
30     self.source_base_list = [ self._normalize_rsync_source(x) for x in sources]
31     self.dest_base = dest
32     self.processes = {}
33     self.exit_code = None
34
35   def list(self):
36     """
37     Get a directory listing for all our sources
38     """
39     logging.debug("searching for entries in: %s" % (self.source_base_list,))
40     entries = []
41     for source in self.source_base_list:
42         logging.debug("Scanning %s" % (source,))
43         args = copy.copy(self.cmd)
44         args.append(source)
45
46         logging.debug("Rsync cmd:" + " ".join(args))
47         short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
48         exit_code = short_process.wait()
49         stdout = short_process.stdout
50         # We made sure source ends in a / earlier
51         cur_list = [ source+subdir for subdir in self.list_filter(stdout)]
52         entries.extend(cur_list)
53     logging.debug(u"Found the following: %s" % (unicode(entries)))
54     return entries
55
56   def list_filter(self, lines):
57     """
58     parse rsync directory listing
59     """
60     dirs_to_copy = []
61     direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ]
62     logging.debug(u'direntries: %s' % (unicode(direntries),))
63     for permissions, size, filedate, filetime, filename in direntries:
64       if permissions[0] == 'd':
65         # hey its a directory, the first step to being something we want to 
66         # copy
67         if re.match("[0-9]{6}", filename):
68           # it starts with something that looks like a 6 digit date
69           # aka good enough for me
70           dirs_to_copy.append(filename)
71     return dirs_to_copy
72
73   def create_copy_process(self, urlname):
74     args = copy.copy(self.cmd)
75     # args.append('--dry-run') # Makes testing easier
76     # we want to copy everything
77     args.append('-rlt') 
78     # from here
79     args.append(urlname)
80     # to here
81     args.append(self.dest_base)
82     logging.debug("Rsync cmd:" + " ".join(args))
83     return subprocess.Popen(args)
84  
85   def copy(self, url_list=None):
86     """
87     copy any interesting looking directories over
88     return list of items that we started copying.
89     """
90     # clean up any lingering non-running processes
91     self.poll()
92
93     if url_list is None or len(url_list) == 0: 
94             # what's available to copy?
95         dirs_to_copy = self.list()
96     else:
97         dirs_to_copy = url_list
98    
99     logging.info("dirs to copy %s" % (dirs_to_copy,))
100
101     # lets start copying
102     started = []
103     for d in dirs_to_copy:
104       process = self.processes.get(d, None)
105       
106       if process is None:
107         # we don't have a process, so make one
108         logging.info("rsyncing %s" % (d))
109         self.processes[d] = self.create_copy_process(d)
110         started.append(d)           
111     return started
112
113   def _normalize_rsync_source(self, source):
114       """
115       Make sure that we have a reasonable looking source
116       a source must be a directory/collection.
117       """
118       # we must be a directory
119       if source[-1] != '/':
120         source += '/'
121       # I suppose we could check to see if we start with rsync:// or something
122       return source
123
124   def poll(self):
125       """
126       check currently running processes to see if they're done
127       
128       return path roots that have finished.
129       """
130       for dir_key, proc_value in self.processes.items():
131           retcode = proc_value.poll()
132           if retcode is None:
133               # process hasn't finished yet
134               pass
135           elif retcode == 0:
136               logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
137               del self.processes[dir_key]
138           else:
139               logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
140               
141   def __len__(self):
142       """
143       Return how many active rsync processes we currently have
144       
145       Call poll first to close finished processes.
146       """
147       return len(self.processes)
148   
149   def keys(self):
150       """
151       Return list of current run folder names
152       """
153       return self.processes.keys()
154
155 class CopierBot(rpc.XmlRpcBot):
156     def __init__(self, section=None, configfile=None):
157         #if configfile is None:
158         #    configfile = '~/.htsworkflow'
159             
160         super(CopierBot, self).__init__(section, configfile)
161         
162         # options for rsync command
163         self.cfg['rsync_password_file'] = None
164         self.cfg['rsync_sources'] = None
165         self.cfg['rsync_destination'] = None 
166         
167         # options for reporting we're done 
168         self.cfg['notify_users'] = None
169         self.cfg['notify_runner'] = None
170                             
171         self.pending = []
172         self.rsync = None
173         self.notify_users = None
174         self.notify_runner = None
175         
176         self.register_function(self.startCopy)
177         self.register_function(self.sequencingFinished)
178         self.eventTasks.append(self.update)
179        
180     def _init_rsync(self):
181         """
182         Initalize rsync class
183
184         This is only accessible for test purposes.
185         """
186         # we can't call any logging function until after start finishes.
187         # this got moved to a seperate function from run to help with test code
188         if self.rsync is None:
189             self.rsync = rsync(self.sources, self.destination, self.password)
190
191     def read_config(self, section=None, configfile=None):
192         """
193         read the config file
194         """
195         super(CopierBot, self).read_config(section, configfile)
196         
197         self.sources = shlex.split(self._check_required_option('rsync_sources'))
198         self.password = self._check_required_option('rsync_password_file')
199         self.destination = self._check_required_option('rsync_destination')
200         
201         self.notify_users = self._parse_user_list(self.cfg['notify_users'])
202         try:
203           self.notify_runner = \
204              self._parse_user_list(self.cfg['notify_runner'],
205                                    require_resource=True)
206         except bot.JIDMissingResource:
207             msg = 'need a full jabber ID + resource for xml-rpc destinations'
208             print >>sys.stderr, msg
209             raise bot.JIDMissingResource(msg)
210
211     def run(self):
212         """
213         Start application
214         """
215         self._init_rsync()
216         super(CopierBot, self).run()
217
218     def startCopy(self, *args):
219         """
220         start our copy
221         """
222         # Note, args comes in over the network, so don't trust it.
223         copy_urls = []
224         for a in args:
225             clean_url = self.validate_url(a)
226             if clean_url is not None:
227                 copy_urls.append(clean_url)
228
229         logging.info("Validated urls = %s" % (copy_urls,))
230         started = self.rsync.copy(copy_urls)
231         logging.info("copying:" + " ".join(started)+".")
232         return started
233         
234     def sequencingFinished(self, runDir, *args):
235         """
236         The run was finished, if we're done copying, pass the message on        
237         """
238         # close any open processes
239         self.rsync.poll()
240         
241         # see if we're still copying
242         if runfolder_validate(runDir):
243             logging.info("recevied sequencing finshed for %s" % (runDir))
244             self.pending.append(runDir)
245             self.startCopy()
246             return "PENDING"
247         else:
248             errmsg = "received bad runfolder name (%s)" % (runDir)
249             logging.warning(errmsg)
250             # maybe I should use a different error message
251             raise RuntimeError(errmsg)
252     
253     def reportSequencingFinished(self, runDir):
254         """
255         Send the sequencingFinished message to the interested parties
256         """
257         if self.notify_users is not None:
258             for u in self.notify_users:
259                 self.send(u, 'Sequencing run %s finished' % (runDir))
260         if self.notify_runner is not None:
261             for r in self.notify_runner:
262                 self.rpc_send(r, (runDir,), 'sequencingFinished')
263         logging.info("forwarding sequencingFinshed message for %s" % (runDir))
264         
265     def update(self, *args):
266         """
267         Update our current status.
268         Report if we've finished copying files.
269         """
270         self.rsync.poll()
271         for p in self.pending:
272             if p not in self.rsync.keys():
273                 self.reportSequencingFinished(p)
274                 self.pending.remove(p)
275         
276     def _parser(self, msg, who):
277         """
278         Parse xmpp chat messages
279         """
280         help = u"I can [copy], or report current [status]"
281         if re.match(u"help", msg):
282             reply = help
283         elif re.match("copy", msg):            
284             started = self.startCopy()
285             reply = u"started copying " + ", ".join(started)
286         elif re.match(u"status", msg):
287             msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
288             for d in self.rsync.keys():
289               msg.append(u"  " + d)
290             reply = os.linesep.join(msg)
291         else:
292             reply = u"I didn't understand '%s'" % (unicode(msg))
293         return reply
294
295     def validate_url(self, url):
296         split_url = urlparse.urlsplit(url)
297         for source in self.sources:
298             split_source = urlparse.urlsplit(source)
299             if (split_url.scheme == split_source.scheme) and \
300                (split_url.netloc == split_source.netloc) and \
301                (split_url.path.startswith(split_source.path)):
302                return url
303         return None
304
305 def main(args=None):
306     bot = CopierBot()
307     bot.main(args)
308     
309 if __name__ == "__main__":
310   sys.exit(main(sys.argv[1:]))
311