Forgot to rename the caller and callee when refactoring the dirlist parsing.
[htsworkflow.git] / gaworkflow / copier.py
1 import ConfigParser
2 import copy
3 import logging
4 import logging.handlers
5 import os
6 import re
7 import subprocess
8 import sys
9 import time
10 import traceback
11
12 from benderjab import rpc
13
def runfolder_validate(fname):
    """
    Return True if fname looks like a runfolder name, else False.

    A runfolder name is six digits (presumably a YYMMDD date), an
    underscore, and then any mix of letters, digits, '-' and '_'.
    """
    # \Z rather than $: $ also matches just before a trailing newline,
    # which would let a name such as "080101_foo\n" validate.
    return re.match(r"[0-9]{6}_[-A-Za-z0-9_]*\Z", fname) is not None
22     
class rsync(object):
    """
    Manage rsync processes that mirror run folders from a source to a
    destination.

    source: rsync source that is listed and copied from
    dest: destination passed to rsync as the copy target
    pwfile: path of an rsync password file ('~' is expanded)
    """
    def __init__(self, source, dest, pwfile):
        self.pwfile = os.path.expanduser(pwfile)
        # base command shared by the listing and the copy invocations
        self.cmd = ['/usr/bin/rsync', ]
        self.cmd.append('--password-file=%s' % (self.pwfile))
        self.source_base = source
        self.dest_base = dest
        # run folder name -> subprocess.Popen of the rsync copying it
        self.processes = {}
        self.exit_code = None

    def list(self):
        """
        Get a directory listing of the source.

        Runs rsync with only the source argument (rsync then lists the
        source instead of copying) and returns the names list_filter keeps.
        """
        args = copy.copy(self.cmd)
        args.append(self.source_base)

        logging.debug("Rsync cmd:" + " ".join(args))
        short_process = subprocess.Popen(args, stdout=subprocess.PIPE,
                                         universal_newlines=True)
        # communicate() reads all of the output and also waits for the
        # child, so no zombie process is left behind after each scan.
        output = short_process.communicate()[0]
        if short_process.returncode != 0:
            logging.error("rsync listing of %s failed, exit code %d" %
                          (self.source_base, short_process.returncode))
        return self.list_filter(output.splitlines())

    def list_filter(self, lines):
        """
        Parse an rsync directory listing.

        lines: iterable of listing lines shaped like
          "drwxr-xr-x        4096 2008/01/01 12:00:00 080101_HWI-EAS229"
        (trailing newlines are allowed).

        Returns, in listing order, the names of directories whose name
        starts with six digits. Lines without the
        permissions/size/date/time/name layout (for example blank lines)
        are skipped instead of raising.
        """
        dirs_to_copy = []
        for line in lines:
            # Strip the line ending so it does not end up in the name. Then
            # split off the four leading columns on whitespace rather than
            # at fixed offsets, so the width of the size column does not
            # matter. The remainder (which may contain spaces) is the name.
            fields = line.rstrip('\r\n').split(None, 4)
            if len(fields) != 5:
                continue
            permissions, size, filedate, filetime, filename = fields
            # A directory whose name starts with something that looks like
            # a 6 digit date is good enough to copy.
            if permissions.startswith('d') and re.match("[0-9]{6}", filename):
                dirs_to_copy.append(filename)
        return dirs_to_copy

    def create_copy_process(self, dirname):
        """
        Start an rsync copying dirname from the source into the destination.

        Returns the subprocess.Popen without waiting for it to finish.
        """
        args = copy.copy(self.cmd)
        # recursive, copy symlinks as symlinks, preserve modification times
        args.append('-rlt')
        # from here
        args.append(os.path.join(self.source_base, dirname))
        # to here
        args.append(self.dest_base)
        logging.debug("Rsync cmd:" + " ".join(args))
        return subprocess.Popen(args)

    def copy(self):
        """
        Copy any interesting looking directories over.

        Returns the list of names for which a new rsync was started. Names
        that already have a tracked process are not started again.
        """
        # clean up any lingering non-running processes
        self.poll()

        # what's available to copy?
        dirs_to_copy = self.list()

        # lets start copying
        started = []
        for d in dirs_to_copy:
            if d not in self.processes:
                logging.info("rsyncing %s" % (d))
                self.processes[d] = self.create_copy_process(d)
                started.append(d)
        return started

    def poll(self):
        """
        Check the tracked processes to see if they're done.

        Successfully finished processes are forgotten. Returns the list of
        their path roots.
        """
        finished = []
        # Iterate over a snapshot, because finished entries are deleted
        # below. (Inside a method 'list' is the builtin, not rsync.list.)
        for dir_key, proc_value in list(self.processes.items()):
            retcode = proc_value.poll()
            if retcode is None:
                # process hasn't finished yet
                continue
            if retcode == 0:
                logging.info("finished rsyncing %s, exitcode %d" % (dir_key, retcode))
                del self.processes[dir_key]
                finished.append(dir_key)
            else:
                # NOTE(review): failed copies stay in self.processes, so
                # copy() does not retry them and keys() keeps listing them.
                # This error is logged again on every poll.
                logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
        return finished

    def __len__(self):
        """
        Return how many rsync processes are currently tracked.

        Call poll first to drop processes that finished successfully.
        """
        return len(self.processes)

    def keys(self):
        """
        Return a list of the run folder names that have a tracked process.
        """
        return list(self.processes.keys())
122
class CopierBot(rpc.XmlRpcBot):
    """
    Bot that mirrors run folders with rsync and forwards sequencingFinished
    notices once a run folder is no longer being copied.
    """
    def __init__(self, section=None, configfile=None):
        super(CopierBot, self).__init__(section, configfile)

        # options for rsync command
        self.cfg['rsync_password_file'] = None
        self.cfg['rsync_source'] = None
        self.cfg['rsync_destination'] = None

        # options for reporting we're done
        self.cfg['notify_users'] = None
        self.cfg['notify_runner'] = None

        # run folders whose sequencing finished while still being copied
        self.pending = []
        self.rsync = None
        self.notify_users = None
        self.notify_runner = None

        self.register_function(self.startCopy)
        self.register_function(self.sequencingFinished)
        # presumably invoked periodically by the base class event loop;
        # TODO confirm against benderjab
        self.eventTasks.append(self.update)

    def read_config(self, section=None, configfile=None):
        """
        Read the config file and build the rsync helper and notify lists.

        Raises bot.JIDMissingResource when notify_runner entries lack a
        jabber resource.
        """
        # 'bot' is not imported at module level. Import it here so that the
        # except clause below can resolve bot.JIDMissingResource.
        from benderjab import bot

        super(CopierBot, self).read_config(section, configfile)

        password = self._check_required_option('rsync_password_file')
        source = self._check_required_option('rsync_source')
        destination = self._check_required_option('rsync_destination')
        self.rsync = rsync(source, destination, password)

        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
        try:
            self.notify_runner = self._parse_user_list(
                self.cfg['notify_runner'], require_resource=True)
        except bot.JIDMissingResource:
            msg = 'need a full jabber ID + resource for xml-rpc destinations'
            # logging.FATAL is a level constant, not a callable
            logging.critical(msg)
            raise bot.JIDMissingResource(msg)

    def startCopy(self, *args):
        """
        Start our copy.

        Starts rsyncs for any new run folders and returns the list of names
        that were started. Extra positional arguments are ignored.
        """
        logging.info("starting copy scan")
        started = self.rsync.copy()
        logging.info("copying:" + " ".join(started) + ".")
        return started

    def sequencingFinished(self, runDir, *args):
        """
        The run was finished; if we're done copying, pass the message on.

        Returns "PENDING" if runDir is still being copied (update() reports
        it later), or "DONE" after forwarding the notice. Raises
        RuntimeError if runDir does not look like a runfolder name.
        """
        # close any open processes
        self.rsync.poll()

        if not runfolder_validate(runDir):
            errmsg = "received bad runfolder name (%s)" % (runDir)
            logging.warning(errmsg)
            # maybe I should use a different error message
            raise RuntimeError(errmsg)

        if runDir in self.rsync.keys():
            # still copying; don't queue the same run twice
            if runDir not in self.pending:
                self.pending.append(runDir)
            logging.info("finished sequencing %s, but still copying" % (runDir))
            return "PENDING"

        # we're done
        self.reportSequencingFinished(runDir)
        logging.info("finished sequencing %s" % (runDir))
        return "DONE"

    def reportSequencingFinished(self, runDir):
        """
        Send the sequencingFinished message to the interested parties.

        Each notify_users entry is sent a text message via self.send. Each
        notify_runner entry receives a 'sequencingFinished' rpc call with
        (runDir,) as its arguments.
        """
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, 'Sequencing run %s finished' % (runDir))
        if self.notify_runner is not None:
            for r in self.notify_runner:
                self.rpc_send(r, (runDir,), 'sequencingFinished')
        logging.info("forwarding sequencingFinished message for %s" % (runDir))

    def update(self, *args):
        """
        Update our current status.

        Polls the rsync processes, then reports and drops each pending run
        folder that is no longer among self.rsync.keys().
        """
        self.rsync.poll()
        still_copying = self.rsync.keys()
        # Iterate over a copy. Removing from the list being iterated would
        # skip the element that follows each removal.
        for p in list(self.pending):
            if p not in still_copying:
                self.reportSequencingFinished(p)
                self.pending.remove(p)

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages and return the reply text.
        """
        help_text = u"I can [copy], or report current [status]"
        if re.match(u"help", msg):
            reply = help_text
        elif re.match("copy", msg):
            started = self.startCopy()
            reply = u"started copying " + ", ".join(started)
        elif re.match(u"status", msg):
            # drop finished processes so the count reflects reality
            self.rsync.poll()
            lines = [u"Currently %d rsync processes are running." % (len(self.rsync))]
            for d in self.rsync.keys():
                lines.append(u"  " + d)
            reply = os.linesep.join(lines)
        else:
            reply = u"I didn't understand '%s'" % (unicode(msg))
        return reply
244
def main(args=None):
    """
    Build a CopierBot and run its main loop.

    args: command-line arguments (without the program name), forwarded
    unchanged to the bot's own main method. Returns None.
    """
    copier = CopierBot()
    copier.main(args)
248     
if __name__ == "__main__":
    # hand everything after the program name to main(); a None result
    # exits with status 0
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)
251