Move some common runfolder path management code into its own module
[htsworkflow.git] / htsworkflow / automation / copier.py
1 import ConfigParser
2 import copy
3 import logging
4 import logging.handlers
5 import os
6 import re
7 import shlex
8 import subprocess
9 import sys
10 import time
11 import traceback
12 import urlparse
13
14 from benderjab import rpc
15
16 from htsworkflow.automation.solexa import is_runfolder
17     
18 class rsync(object):
19   def __init__(self, sources, dest, pwfile):
20     self.cmd = ['/usr/bin/rsync', ]
21     self.pwfile = os.path.expanduser(pwfile)
22     self.cmd.append('--password-file=%s' % (self.pwfile))
23     self.source_base_list = [ self._normalize_rsync_source(x) for x in sources]
24     self.dest_base = dest
25     self.processes = {}
26     self.exit_code = None
27
28   def list(self):
29     """
30     Get a directory listing for all our sources
31     """
32     logging.debug("searching for entries in: %s" % (self.source_base_list,))
33     entries = []
34     for source in self.source_base_list:
35         logging.debug("Scanning %s" % (source,))
36         args = copy.copy(self.cmd)
37         args.append(source)
38
39         logging.debug("Rsync cmd:" + " ".join(args))
40         short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
41         exit_code = short_process.wait()
42         stdout = short_process.stdout
43         # We made sure source ends in a / earlier
44         cur_list = [ source+subdir for subdir in self.list_filter(stdout)]
45         entries.extend(cur_list)
46     logging.debug(u"Found the following: %s" % (unicode(entries)))
47     return entries
48
49   def list_filter(self, lines):
50     """
51     parse rsync directory listing
52     """
53     dirs_to_copy = []
54     direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ]
55     logging.debug(u'direntries: %s' % (unicode(direntries),))
56     for permissions, size, filedate, filetime, filename in direntries:
57       if permissions[0] == 'd':
58         # hey its a directory, the first step to being something we want to 
59         # copy
60         if re.match("[0-9]{6}", filename):
61           # it starts with something that looks like a 6 digit date
62           # aka good enough for me
63           dirs_to_copy.append(filename)
64     return dirs_to_copy
65
66   def create_copy_process(self, urlname):
67     args = copy.copy(self.cmd)
68     # args.append('--dry-run') # Makes testing easier
69     # we want to copy everything
70     args.append('-rlt') 
71     # from here
72     args.append(urlname)
73     # to here
74     args.append(self.dest_base)
75     logging.debug("Rsync cmd:" + " ".join(args))
76     return subprocess.Popen(args)
77  
78   def copy(self, url_list=None):
79     """
80     copy any interesting looking directories over
81     return list of items that we started copying.
82     """
83     # clean up any lingering non-running processes
84     self.poll()
85
86     if url_list is None or len(url_list) == 0: 
87             # what's available to copy?
88         dirs_to_copy = self.list()
89     else:
90         dirs_to_copy = url_list
91    
92     logging.info("dirs to copy %s" % (dirs_to_copy,))
93
94     # lets start copying
95     started = []
96     for d in dirs_to_copy:
97       process = self.processes.get(d, None)
98       
99       if process is None:
100         # we don't have a process, so make one
101         logging.info("rsyncing %s" % (d))
102         self.processes[d] = self.create_copy_process(d)
103         started.append(d)           
104     return started
105
106   def _normalize_rsync_source(self, source):
107       """
108       Make sure that we have a reasonable looking source
109       a source must be a directory/collection.
110       """
111       # we must be a directory
112       if source[-1] != '/':
113         source += '/'
114       # I suppose we could check to see if we start with rsync:// or something
115       return source
116
117   def poll(self):
118       """
119       check currently running processes to see if they're done
120       
121       return path roots that have finished.
122       """
123       for dir_key, proc_value in self.processes.items():
124           retcode = proc_value.poll()
125           if retcode is None:
126               # process hasn't finished yet
127               pass
128           elif retcode == 0:
129               logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
130               del self.processes[dir_key]
131           else:
132               logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
133               
134   def __len__(self):
135       """
136       Return how many active rsync processes we currently have
137       
138       Call poll first to close finished processes.
139       """
140       return len(self.processes)
141   
142   def keys(self):
143       """
144       Return list of current run folder names
145       """
146       return self.processes.keys()
147
148 class CopierBot(rpc.XmlRpcBot):
149     def __init__(self, section=None, configfile=None):
150         #if configfile is None:
151         #    configfile = '~/.htsworkflow'
152             
153         super(CopierBot, self).__init__(section, configfile)
154         
155         # options for rsync command
156         self.cfg['rsync_password_file'] = None
157         self.cfg['rsync_sources'] = None
158         self.cfg['rsync_destination'] = None 
159         
160         # options for reporting we're done 
161         self.cfg['notify_users'] = None
162         self.cfg['notify_runner'] = None
163                             
164         self.pending = []
165         self.rsync = None
166         self.notify_users = None
167         self.notify_runner = None
168         
169         self.register_function(self.startCopy)
170         self.register_function(self.sequencingFinished)
171         self.eventTasks.append(self.update)
172        
173     def _init_rsync(self):
174         """
175         Initalize rsync class
176
177         This is only accessible for test purposes.
178         """
179         # we can't call any logging function until after start finishes.
180         # this got moved to a seperate function from run to help with test code
181         if self.rsync is None:
182             self.rsync = rsync(self.sources, self.destination, self.password)
183
184     def read_config(self, section=None, configfile=None):
185         """
186         read the config file
187         """
188         super(CopierBot, self).read_config(section, configfile)
189         
190         self.sources = shlex.split(self._check_required_option('rsync_sources'))
191         self.password = self._check_required_option('rsync_password_file')
192         self.destination = self._check_required_option('rsync_destination')
193         
194         self.notify_users = self._parse_user_list(self.cfg['notify_users'])
195         try:
196           self.notify_runner = \
197              self._parse_user_list(self.cfg['notify_runner'],
198                                    require_resource=True)
199         except bot.JIDMissingResource:
200             msg = 'need a full jabber ID + resource for xml-rpc destinations'
201             print >>sys.stderr, msg
202             raise bot.JIDMissingResource(msg)
203
204     def run(self):
205         """
206         Start application
207         """
208         self._init_rsync()
209         super(CopierBot, self).run()
210
211     def startCopy(self, *args):
212         """
213         start our copy
214         """
215         # Note, args comes in over the network, so don't trust it.
216         logging.debug("Arguments to startCopy %s" % (unicode(args),))
217         copy_urls = []
218         for a in args:
219             clean_url = self.validate_url(a)
220             if clean_url is not None:
221                 copy_urls.append(clean_url)
222
223         logging.info("Validated urls = %s" % (copy_urls,))
224         started = self.rsync.copy(copy_urls)
225         logging.info("copying:" + " ".join(started)+".")
226         return started
227         
228     def sequencingFinished(self, runDir, *args):
229         """
230         The run was finished, if we're done copying, pass the message on        
231         """
232         # close any open processes
233         self.rsync.poll()
234         
235         # see if we're still copying
236         if is_runfolder(runDir):
237             logging.info("recevied sequencing finshed for %s" % (runDir))
238             self.pending.append(runDir)
239             self.startCopy()
240             return "PENDING"
241         else:
242             errmsg = "received bad runfolder name (%s)" % (runDir)
243             logging.warning(errmsg)
244             # maybe I should use a different error message
245             raise RuntimeError(errmsg)
246     
247     def reportSequencingFinished(self, runDir):
248         """
249         Send the sequencingFinished message to the interested parties
250         """
251         if self.notify_users is not None:
252             for u in self.notify_users:
253                 self.send(u, 'Sequencing run %s finished' % (runDir))
254         if self.notify_runner is not None:
255             for r in self.notify_runner:
256                 self.rpc_send(r, (runDir,), 'sequencingFinished')
257         logging.info("forwarding sequencingFinshed message for %s" % (runDir))
258         
259     def update(self, *args):
260         """
261         Update our current status.
262         Report if we've finished copying files.
263         """
264         self.rsync.poll()
265         for p in self.pending:
266             if p not in self.rsync.keys():
267                 self.reportSequencingFinished(p)
268                 self.pending.remove(p)
269         
270     def _parser(self, msg, who):
271         """
272         Parse xmpp chat messages
273         """
274         help = u"I can [copy], or report current [status]"
275         if re.match(u"help", msg):
276             reply = help
277         elif re.match("copy", msg):            
278             started = self.startCopy()
279             reply = u"started copying " + ", ".join(started)
280         elif re.match(u"status", msg):
281             msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
282             for d in self.rsync.keys():
283               msg.append(u"  " + d)
284             reply = os.linesep.join(msg)
285         else:
286             reply = u"I didn't understand '%s'" % (unicode(msg))
287         return reply
288
289     def validate_url(self, url):
290         user_url = urlparse.urlsplit(url)
291         user_scheme = user_url[0]
292         user_netloc = user_url[1]
293         user_path = user_url[2]
294
295         for source in self.sources:
296             source_url = urlparse.urlsplit(source)
297             source_scheme = source_url[0]
298             source_netloc = source_url[1]
299             source_path = source_url[2]
300             if (user_scheme == source_scheme) and \
301                (user_netloc == source_netloc) and \
302                (user_path.startswith(source_path)):
303                return url
304         return None
305
306 def main(args=None):
307     bot = CopierBot()
308     bot.main(args)
309     
310 if __name__ == "__main__":
311   sys.exit(main(sys.argv[1:]))
312