[project @ moved check_option into benderjab]
[htsworkflow.git] / gaworkflow / copier.py
1 import ConfigParser
2 import copy
3 import logging
4 import logging.handlers
5 import os
6 import re
7 import subprocess
8 import sys
9 import time
10 import traceback
11
12 from benderjab import rpc
13
def runfolder_validate(fname):
    """
    Tell whether fname has the shape of a runfolder name.

    A runfolder name is six digits followed by an underscore and then any
    number of letters, digits, dashes or underscores.

    Returns True when fname matches that pattern, False otherwise.
    """
    looks_like_runfolder = re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname) is not None
    return looks_like_runfolder
22     
class rsync(object):
    """
    Small wrapper around the /usr/bin/rsync command line tool.

    It lists the entries available under a source location and starts one
    background rsync process for every directory whose name starts with six
    digits.  Started processes are tracked in self.processes, keyed by the
    directory name.
    """

    def __init__(self, source, dest, pwfile):
        """
        source -- base location to copy from (handed to rsync as is)
        dest   -- location to copy into (handed to rsync as is)
        pwfile -- rsync password file; a leading ~ is expanded
        """
        self.pwfile = os.path.expanduser(pwfile)
        self.cmd = ['/usr/bin/rsync', ]
        self.cmd.append('--password-file=%s' % (self.pwfile))
        self.source_base = source
        self.dest_base = dest
        # directory name -> subprocess.Popen object of the rsync copying it
        self.processes = {}
        self.exit_code = None

    @staticmethod
    def _parse_listing(lines):
        """
        Pick the directories worth copying out of rsync listing lines.

        Each listing entry is expected to have five whitespace separated
        fields: permissions, size, date, time and name.  The name is the
        remainder of the line, so names containing spaces survive intact.
        Lines with fewer fields (banners, blank lines) are ignored.
        """
        dirs_to_copy = []
        for line in lines:
            fields = line.rstrip("\r\n").split(None, 4)
            if len(fields) != 5:
                # not a listing entry
                continue
            permissions, filename = fields[0], fields[4]
            if permissions[0] != 'd':
                continue
            # hey its a directory, the first step to being something we
            # want to copy
            if re.match("[0-9]{6}", filename):
                # it starts with something that looks like a 6 digit date
                # aka good enough for me
                dirs_to_copy.append(filename)
        return dirs_to_copy

    def list(self):
        """
        Get a directory listing

        Runs rsync against the source base and returns the list of
        directory names that start with six digits.
        """
        args = copy.copy(self.cmd)
        args.append(self.source_base)

        logging.debug("Rsync cmd:" + " ".join(args))
        # universal_newlines gives text output on both python 2 and 3
        short_process = subprocess.Popen(args,
                                         stdout=subprocess.PIPE,
                                         universal_newlines=True)
        # communicate() drains the pipe, closes it and reaps the child so
        # no zombie process or open file descriptor is left behind
        listing, _ = short_process.communicate()
        if short_process.returncode != 0:
            logging.warning("rsync listing exited with code %d" %
                            (short_process.returncode,))
        return self._parse_listing(listing.splitlines())

    def create_copy_process(self, dirname):
        """
        Start (without waiting for) an rsync of dirname and return the
        subprocess.Popen object.
        """
        args = copy.copy(self.cmd)
        # we want to copy everything
        args.append('-rlt')
        # from here
        args.append(os.path.join(self.source_base, dirname))
        # to here
        args.append(self.dest_base)
        logging.debug("Rsync cmd:" + " ".join(args))
        return subprocess.Popen(args)

    def copy(self):
        """
        copy any interesting looking directories over
        return list of items that we started copying.
        """
        # clean up any lingering non-running processes
        self.poll()

        # what's available to copy?
        dirs_to_copy = self.list()

        # lets start copying
        started = []
        for d in dirs_to_copy:
            if self.processes.get(d, None) is None:
                # we don't have a process, so make one
                logging.info("rsyncing %s" % (d))
                self.processes[d] = self.create_copy_process(d)
                started.append(d)
        return started

    def poll(self):
        """
        check currently running processes to see if they're done

        return path roots that have finished.

        Only processes that exited with code 0 are dropped from the table
        and returned.  A process that exited with a non-zero code stays
        registered, so its error is logged again on every call and the
        directory keeps showing up in keys().
        """
        finished = []
        # iterate over a snapshot: entries are deleted inside the loop
        for dir_key, proc_value in list(self.processes.items()):
            retcode = proc_value.poll()
            if retcode is None:
                # process hasn't finished yet
                continue
            if retcode == 0:
                logging.info("finished rsyncing %s, exitcode %d" % (dir_key, retcode))
                del self.processes[dir_key]
                finished.append(dir_key)
            else:
                logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
        return finished

    def __len__(self):
        """
        Return how many active rsync processes we currently have

        Call poll first to close finished processes.
        """
        return len(self.processes)

    def keys(self):
        """
        Return list of current run folder names
        """
        return list(self.processes)
116
class CopierBot(rpc.XmlRpcBot):
    """
    Bot that starts rsync copies of run folders and passes on
    "run finished" notifications once the matching copy is done.

    It registers startCopy and runFinished as remotely callable functions
    and hooks update() into the bot's periodic event tasks.
    NOTE(review): cfg, register_function, eventTasks, send, cl,
    _check_required_option and _parse_user_list are presumably provided by
    rpc.XmlRpcBot -- confirm against benderjab.
    """

    def __init__(self, section=None, configfile=None):
        """
        section    -- passed through to the base class
        configfile -- passed through to the base class
        """
        super(CopierBot, self).__init__(section, configfile)

        # options for rsync command
        self.cfg['rsync_password_file'] = None
        self.cfg['rsync_source'] = None
        self.cfg['rsync_destination'] = None

        # options for reporting we're done
        self.cfg['notify_users'] = None
        self.cfg['notify_runner'] = None

        # run folders reported as finished while their copy was still running
        self.pending = []
        # created by read_config once the rsync options are known
        self.rsync = None
        self.notify_users = None
        self.notify_runner = None

        self.register_function(self.startCopy)
        self.register_function(self.runFinished)
        self.eventTasks.append(self.update)

    def read_config(self, section=None, configfile=None):
        """
        read the config file

        Builds the rsync helper from the three required rsync options and
        parses the two notification lists.
        """
        super(CopierBot, self).read_config(section, configfile)

        password = self._check_required_option('rsync_password_file')
        source = self._check_required_option('rsync_source')
        destination = self._check_required_option('rsync_destination')
        self.rsync = rsync(source, destination, password)

        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
        self.notify_runner = self._parse_user_list(self.cfg['notify_runner'])

    def startCopy(self, *args):
        """
        start our copy

        Returns the list of directory names whose copy was just started.
        Extra positional arguments are accepted and ignored.
        """
        logging.info("starting copy scan")
        started = self.rsync.copy()
        logging.info("copying:" + " ".join(started) + ".")
        return started

    def runFinished(self, runDir, *args):
        """
        The run was finished, if we're done copying, pass the message on

        Returns "PENDING" when runDir is still being copied (the
        notification is sent later by update()), "DONE" when the
        notification was forwarded right away.

        Raises RuntimeError when runDir does not look like a run folder
        name.
        """
        # close any open processes
        self.rsync.poll()

        if not runfolder_validate(runDir):
            errmsg = "received bad runfolder name (%s)" % (runDir)
            logging.warning(errmsg)
            # maybe I should use a different error message
            raise RuntimeError(errmsg)

        # see if we're still copying
        if runDir in self.rsync.keys():
            # still copying; queue it once so it is only reported once
            if runDir not in self.pending:
                self.pending.append(runDir)
            logging.info("%s finished, but still copying" % (runDir))
            return "PENDING"

        # we're done
        self.reportRunFinished(runDir)
        logging.info("%s finished" % (runDir))
        return "DONE"

    def reportRunFinished(self, runDir):
        """
        Send the runFinished message to the interested parties

        Every entry of notify_users gets a chat message, every entry of
        notify_runner gets a 'runFinished' rpc call with runDir as its
        only argument.
        """
        if self.notify_users is not None:
            for u in self.notify_users:
                self.send(u, 'run %s finished' % (runDir))
        if self.notify_runner is not None:
            for r in self.notify_runner:
                # address each configured runner in turn
                rpc.send(self.cl, r, (runDir,), 'runFinished')
        logging.info("forwarding runFinshed message for %s" % (runDir))

    def update(self, *args):
        """
        Update our current status.
        Report if we've finished copying files.
        """
        self.rsync.poll()
        still_copying = self.rsync.keys()
        # walk a snapshot: removing from the list being iterated would
        # silently skip the entry that follows each removed one
        for p in list(self.pending):
            if p not in still_copying:
                self.reportRunFinished(p)
                self.pending.remove(p)

    def _parser(self, msg, who):
        """
        Parse xmpp chat messages

        msg -- text of the incoming message
        who -- sender of the message (not used here)

        Returns the text to reply with.
        """
        help_text = u"I can [copy], or report current [status]"
        if re.match(u"help", msg):
            reply = help_text
        elif re.match("copy", msg):
            started = self.startCopy()
            reply = u"started copying " + ", ".join(started)
        elif re.match(u"status", msg):
            lines = [u"Currently %d rsync processes are running." % (len(self.rsync))]
            for d in self.rsync.keys():
                lines.append(u"  " + d)
            reply = os.linesep.join(lines)
        else:
            # % binds tighter than +, so format the template on its own
            # before gluing the help text on
            complaint = u"I didn't understand '%s'" % (msg,)
            reply = complaint + os.linesep + help_text
        return reply
231
def main(args=None):
    """
    Build a CopierBot with its default settings and run its main()
    with the given argument list.
    """
    CopierBot().main(args)
235     
if __name__ == "__main__":
    # Script entry point: hand the command line (minus the program name)
    # to main() and exit with whatever it returns.
    sys.exit(main(sys.argv[1:]))
238