Initial port to python3
[htsworkflow.git] / htsworkflow / automation / copier.py
1 import configparser
2 import copy
3 import logging
4 import logging.handlers
5 import os
6 import re
7 import shlex
8 import subprocess
9 import sys
10 import time
11 import traceback
12 import urllib.parse
13
14 from benderjab import rpc
15
16 from htsworkflow.automation.solexa import is_runfolder
17
18 LOGGER = logging.getLogger(__name__)
19
20 class rsync(object):
21   def __init__(self, sources, dest, pwfile):
22     self.cmd = ['/usr/bin/rsync', ]
23     self.pwfile = os.path.expanduser(pwfile)
24     self.cmd.append('--password-file=%s' % (self.pwfile))
25     self.source_base_list = [ self._normalize_rsync_source(x) for x in sources]
26     self.dest_base = dest
27     self.processes = {}
28     self.exit_code = None
29
30   def list(self):
31     """
32     Get a directory listing for all our sources
33     """
34     LOGGER.debug("searching for entries in: %s" % (self.source_base_list,))
35     entries = []
36     for source in self.source_base_list:
37         LOGGER.debug("Scanning %s" % (source,))
38         args = copy.copy(self.cmd)
39         args.append(source)
40
41         LOGGER.debug("Rsync cmd:" + " ".join(args))
42         short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
43         exit_code = short_process.wait()
44         stdout = short_process.stdout
45         # We made sure source ends in a / earlier
46         cur_list = [ source+subdir for subdir in self.list_filter(stdout)]
47         entries.extend(cur_list)
48     LOGGER.debug("Found the following: %s" % (str(entries)))
49     return entries
50
51   def list_filter(self, lines):
52     """
53     parse rsync directory listing
54     """
55     dirs_to_copy = []
56     direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ]
57     LOGGER.debug('direntries: %s' % (str(direntries),))
58     for permissions, size, filedate, filetime, filename in direntries:
59       if permissions[0] == 'd':
60         # hey its a directory, the first step to being something we want to
61         # copy
62         if re.match("[0-9]{6}", filename):
63           # it starts with something that looks like a 6 digit date
64           # aka good enough for me
65           dirs_to_copy.append(filename)
66     return dirs_to_copy
67
68   def create_copy_process(self, urlname):
69     args = copy.copy(self.cmd)
70     # args.append('--dry-run') # Makes testing easier
71     # we want to copy everything
72     args.append('-rlt')
73     # from here
74     args.append(urlname)
75     # to here
76     args.append(self.dest_base)
77     LOGGER.debug("Rsync cmd:" + " ".join(args))
78     return subprocess.Popen(args)
79
80   def copy(self, url_list=None):
81     """
82     copy any interesting looking directories over
83     return list of items that we started copying.
84     """
85     # clean up any lingering non-running processes
86     self.poll()
87
88     if url_list is None or len(url_list) == 0:
89             # what's available to copy?
90         dirs_to_copy = self.list()
91     else:
92         dirs_to_copy = url_list
93
94     LOGGER.info("dirs to copy %s" % (dirs_to_copy,))
95
96     # lets start copying
97     started = []
98     for d in dirs_to_copy:
99       process = self.processes.get(d, None)
100
101       if process is None:
102         # we don't have a process, so make one
103         LOGGER.info("rsyncing %s" % (d))
104         self.processes[d] = self.create_copy_process(d)
105         started.append(d)
106     return started
107
108   def _normalize_rsync_source(self, source):
109       """
110       Make sure that we have a reasonable looking source
111       a source must be a directory/collection.
112       """
113       # we must be a directory
114       if source[-1] != '/':
115         source += '/'
116       # I suppose we could check to see if we start with rsync:// or something
117       return source
118
119   def poll(self):
120       """
121       check currently running processes to see if they're done
122
123       return path roots that have finished.
124       """
125       for dir_key, proc_value in list(self.processes.items()):
126           retcode = proc_value.poll()
127           if retcode is None:
128               # process hasn't finished yet
129               pass
130           elif retcode == 0:
131               LOGGER.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
132               del self.processes[dir_key]
133           else:
134               LOGGER.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
135
136   def __len__(self):
137       """
138       Return how many active rsync processes we currently have
139
140       Call poll first to close finished processes.
141       """
142       return len(self.processes)
143
144   def keys(self):
145       """
146       Return list of current run folder names
147       """
148       return list(self.processes.keys())
149
150 class CopierBot(rpc.XmlRpcBot):
151     def __init__(self, section=None, configfile=None):
152         #if configfile is None:
153         #    configfile = '~/.htsworkflow'
154
155         super(CopierBot, self).__init__(section, configfile)
156
157         # options for rsync command
158         self.cfg['rsync_password_file'] = None
159         self.cfg['rsync_sources'] = None
160         self.cfg['rsync_destination'] = None
161
162         # options for reporting we're done
163         self.cfg['notify_users'] = None
164         self.cfg['notify_runner'] = None
165
166         self.pending = []
167         self.rsync = None
168         self.notify_users = None
169         self.notify_runner = None
170
171         self.register_function(self.startCopy)
172         self.register_function(self.sequencingFinished)
173         self.eventTasks.append(self.update)
174
175     def _init_rsync(self):
176         """
177         Initalize rsync class
178
179         This is only accessible for test purposes.
180         """
181         # we can't call any LOGGER function until after start finishes.
182         # this got moved to a seperate function from run to help with test code
183         if self.rsync is None:
184             self.rsync = rsync(self.sources, self.destination, self.password)
185
186     def read_config(self, section=None, configfile=None):
187         """
188         read the config file
189         """
190         super(CopierBot, self).read_config(section, configfile)
191
192         self.sources = shlex.split(self._check_required_option('rsync_sources'))
193         self.password = self._check_required_option('rsync_password_file')
194         self.destination = self._check_required_option('rsync_destination')
195
196         self.notify_users = self._parse_user_list(self.cfg['notify_users'])
197         try:
198           self.notify_runner = \
199              self._parse_user_list(self.cfg['notify_runner'],
200                                    require_resource=True)
201         except bot.JIDMissingResource:
202             msg = 'need a full jabber ID + resource for xml-rpc destinations'
203             print(msg, file=sys.stderr)
204             raise bot.JIDMissingResource(msg)
205
206     def run(self):
207         """
208         Start application
209         """
210         self._init_rsync()
211         super(CopierBot, self).run()
212
213     def startCopy(self, *args):
214         """
215         start our copy
216         """
217         # Note, args comes in over the network, so don't trust it.
218         LOGGER.debug("Arguments to startCopy %s" % (str(args),))
219         copy_urls = []
220         for a in args:
221             clean_url = self.validate_url(a)
222             if clean_url is not None:
223                 copy_urls.append(clean_url)
224
225         LOGGER.info("Validated urls = %s" % (copy_urls,))
226         started = self.rsync.copy(copy_urls)
227         LOGGER.info("copying:" + " ".join(started)+".")
228         return started
229
230     def sequencingFinished(self, runDir, *args):
231         """
232         The run was finished, if we're done copying, pass the message on
233         """
234         # close any open processes
235         self.rsync.poll()
236
237         # see if we're still copying
238         if is_runfolder(runDir):
239             LOGGER.info("recevied sequencing finshed for %s" % (runDir))
240             self.pending.append(runDir)
241             self.startCopy()
242             return "PENDING"
243         else:
244             errmsg = "received bad runfolder name (%s)" % (runDir)
245             LOGGER.warning(errmsg)
246             # maybe I should use a different error message
247             raise RuntimeError(errmsg)
248
249     def reportSequencingFinished(self, runDir):
250         """
251         Send the sequencingFinished message to the interested parties
252         """
253         if self.notify_users is not None:
254             for u in self.notify_users:
255                 self.send(u, 'Sequencing run %s finished' % (runDir))
256         if self.notify_runner is not None:
257             for r in self.notify_runner:
258                 self.rpc_send(r, (runDir,), 'sequencingFinished')
259         LOGGER.info("forwarding sequencingFinshed message for %s" % (runDir))
260
261     def update(self, *args):
262         """
263         Update our current status.
264         Report if we've finished copying files.
265         """
266         self.rsync.poll()
267         for p in self.pending:
268             if p not in list(self.rsync.keys()):
269                 self.reportSequencingFinished(p)
270                 self.pending.remove(p)
271
272     def _parser(self, msg, who):
273         """
274         Parse xmpp chat messages
275         """
276         help = "I can [copy], or report current [status]"
277         if re.match("help", msg):
278             reply = help
279         elif re.match("copy", msg):
280             started = self.startCopy()
281             reply = "started copying " + ", ".join(started)
282         elif re.match("status", msg):
283             msg = ["Currently %d rsync processes are running." % (len(self.rsync))]
284             for d in list(self.rsync.keys()):
285               msg.append("  " + d)
286             reply = os.linesep.join(msg)
287         else:
288             reply = "I didn't understand '%s'" % (str(msg))
289         return reply
290
291     def validate_url(self, url):
292         user_url = urllib.parse.urlsplit(url)
293         user_scheme = user_url[0]
294         user_netloc = user_url[1]
295         user_path = user_url[2]
296
297         for source in self.sources:
298             source_url = urllib.parse.urlsplit(source)
299             source_scheme = source_url[0]
300             source_netloc = source_url[1]
301             source_path = source_url[2]
302             if (user_scheme == source_scheme) and \
303                (user_netloc == source_netloc) and \
304                (user_path.startswith(source_path)):
305                return url
306         return None
307
308 def main(args=None):
309     bot = CopierBot()
310     bot.main(args)
311
312 if __name__ == "__main__":
313   sys.exit(main(sys.argv[1:]))
314