Split rsync initialization out of run so we can test
[htsworkflow.git] / htsworkflow / automation / copier.py
1 import ConfigParser
2 import copy
3 import logging
4 import logging.handlers
5 import os
6 import re
7 import shlex
8 import subprocess
9 import sys
10 import time
11 import traceback
12
13 from benderjab import rpc
14
15 def runfolder_validate(fname):
16     """
17     Return True if fname looks like a runfolder name
18     """
19     if re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname):
20         return True
21     else:
22         return False
23     
24 class rsync(object):
25   def __init__(self, sources, dest, pwfile):
26     self.cmd = ['/usr/bin/rsync', ]
27     self.pwfile = os.path.expanduser(pwfile)
28     self.cmd.append('--password-file=%s' % (self.pwfile))
29     self.source_base_list = [ self._normalize_rsync_source(x) for x in sources]
30     self.dest_base = dest
31     self.processes = {}
32     self.exit_code = None
33
34   def list(self):
35     """
36     Get a directory listing for all our sources
37     """
38     logging.debug("searching for entries in: %s" % (self.source_base_list,))
39     entries = []
40     for source in self.source_base_list:
41         logging.debug("Scanning %s" % (source,))
42         args = copy.copy(self.cmd)
43         args.append(source)
44
45         logging.debug("Rsync cmd:" + " ".join(args))
46         short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
47         exit_code = short_process.wait()
48         stdout = short_process.stdout
49         # We made sure source ends in a / earlier
50         cur_list = [ source+subdir for subdir in self.list_filter(stdout)]
51         entries.extend(cur_list)
52     logging.debug(u"Found the following: %s" % (unicode(entries)))
53     return entries
54
55   def list_filter(self, lines):
56     """
57     parse rsync directory listing
58     """
59     dirs_to_copy = []
60     direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ]
61     logging.debug(u'direntries: %s' % (unicode(direntries),))
62     for permissions, size, filedate, filetime, filename in direntries:
63       if permissions[0] == 'd':
64         # hey its a directory, the first step to being something we want to 
65         # copy
66         if re.match("[0-9]{6}", filename):
67           # it starts with something that looks like a 6 digit date
68           # aka good enough for me
69           dirs_to_copy.append(filename)
70     return dirs_to_copy
71
72   def create_copy_process(self, urlname):
73     args = copy.copy(self.cmd)
74     # args.append('--dry-run') # Makes testing easier
75     # we want to copy everything
76     args.append('-rlt') 
77     # from here
78     args.append(urlname)
79     # to here
80     args.append(self.dest_base)
81     logging.debug("Rsync cmd:" + " ".join(args))
82     return subprocess.Popen(args)
83  
84   def copy(self):
85     """
86     copy any interesting looking directories over
87     return list of items that we started copying.
88     """
89     # clean up any lingering non-running processes
90     self.poll()
91     
92     # what's available to copy?
93     dirs_to_copy = self.list()
94     
95     # lets start copying
96     started = []
97     for d in dirs_to_copy:
98       process = self.processes.get(d, None)
99       
100       if process is None:
101         # we don't have a process, so make one
102         logging.info("rsyncing %s" % (d))
103         self.processes[d] = self.create_copy_process(d)
104         started.append(d)           
105     return started
106
107   def _normalize_rsync_source(self, source):
108       """
109       Make sure that we have a reasonable looking source
110       a source must be a directory/collection.
111       """
112       # we must be a directory
113       if source[-1] != '/':
114         source += '/'
115       # I suppose we could check to see if we start with rsync:// or something
116       return source
117
118   def poll(self):
119       """
120       check currently running processes to see if they're done
121       
122       return path roots that have finished.
123       """
124       for dir_key, proc_value in self.processes.items():
125           retcode = proc_value.poll()
126           if retcode is None:
127               # process hasn't finished yet
128               pass
129           elif retcode == 0:
130               logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
131               del self.processes[dir_key]
132           else:
133               logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
134               
135   def __len__(self):
136       """
137       Return how many active rsync processes we currently have
138       
139       Call poll first to close finished processes.
140       """
141       return len(self.processes)
142   
143   def keys(self):
144       """
145       Return list of current run folder names
146       """
147       return self.processes.keys()
148
149 class CopierBot(rpc.XmlRpcBot):
150     def __init__(self, section=None, configfile=None):
151         #if configfile is None:
152         #    configfile = '~/.htsworkflow'
153             
154         super(CopierBot, self).__init__(section, configfile)
155         
156         # options for rsync command
157         self.cfg['rsync_password_file'] = None
158         self.cfg['rsync_source'] = None
159         self.cfg['rsync_destination'] = None 
160         
161         # options for reporting we're done 
162         self.cfg['notify_users'] = None
163         self.cfg['notify_runner'] = None
164                             
165         self.pending = []
166         self.rsync = None
167         self.notify_users = None
168         self.notify_runner = None
169         
170         self.register_function(self.startCopy)
171         self.register_function(self.sequencingFinished)
172         self.eventTasks.append(self.update)
173        
174     def _init_rsync(self):
175         """
176         Initalize rsync class
177
178         This is only accessible for test purposes.
179         """
180         # we can't call any logging function until after start finishes.
181         # this got moved to a seperate function from run to help with test code
182         if self.rsync is None:
183             self.rsync = rsync(self.sources, self.destination, self.password)
184
185     def read_config(self, section=None, configfile=None):
186         """
187         read the config file
188         """
189         super(CopierBot, self).read_config(section, configfile)
190         
191         self.sources = shlex.split(self._check_required_option('rsync_sources'))
192         self.password = self._check_required_option('rsync_password_file')
193         self.destination = self._check_required_option('rsync_destination')
194         
195         self.notify_users = self._parse_user_list(self.cfg['notify_users'])
196         try:
197           self.notify_runner = \
198              self._parse_user_list(self.cfg['notify_runner'],
199                                    require_resource=True)
200         except bot.JIDMissingResource:
201             msg = 'need a full jabber ID + resource for xml-rpc destinations'
202             print >>sys.stderr, msg
203             raise bot.JIDMissingResource(msg)
204
205     def run(self):
206         """
207         Start application
208         """
209         self._init_rsync()
210         super(CopierBot, self).run()
211
212     def startCopy(self, *args):
213         """
214         start our copy
215         """
216         logging.info("starting copy scan, %s" % (args,))
217         started = self.rsync.copy()
218         logging.info("copying:" + " ".join(started)+".")
219         return started
220         
221     def sequencingFinished(self, runDir, *args):
222         """
223         The run was finished, if we're done copying, pass the message on        
224         """
225         # close any open processes
226         self.rsync.poll()
227         
228         # see if we're still copying
229         if runfolder_validate(runDir):
230             logging.info("recevied sequencing finshed for %s" % (runDir))
231             self.pending.append(runDir)
232             self.startCopy()
233             return "PENDING"
234         else:
235             errmsg = "received bad runfolder name (%s)" % (runDir)
236             logging.warning(errmsg)
237             # maybe I should use a different error message
238             raise RuntimeError(errmsg)
239     
240     def reportSequencingFinished(self, runDir):
241         """
242         Send the sequencingFinished message to the interested parties
243         """
244         if self.notify_users is not None:
245             for u in self.notify_users:
246                 self.send(u, 'Sequencing run %s finished' % (runDir))
247         if self.notify_runner is not None:
248             for r in self.notify_runner:
249                 self.rpc_send(r, (runDir,), 'sequencingFinished')
250         logging.info("forwarding sequencingFinshed message for %s" % (runDir))
251         
252     def update(self, *args):
253         """
254         Update our current status.
255         Report if we've finished copying files.
256         """
257         self.rsync.poll()
258         for p in self.pending:
259             if p not in self.rsync.keys():
260                 self.reportSequencingFinished(p)
261                 self.pending.remove(p)
262         
263     def _parser(self, msg, who):
264         """
265         Parse xmpp chat messages
266         """
267         help = u"I can [copy], or report current [status]"
268         if re.match(u"help", msg):
269             reply = help
270         elif re.match("copy", msg):            
271             started = self.startCopy()
272             reply = u"started copying " + ", ".join(started)
273         elif re.match(u"status", msg):
274             msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
275             for d in self.rsync.keys():
276               msg.append(u"  " + d)
277             reply = os.linesep.join(msg)
278         else:
279             reply = u"I didn't understand '%s'" % (unicode(msg))
280         return reply
281
282 def main(args=None):
283     bot = CopierBot()
284     bot.main(args)
285     
286 if __name__ == "__main__":
287   sys.exit(main(sys.argv[1:]))
288