split rsync directory listing correctly
[htsworkflow.git] / gaworkflow / copier.py
index 8f46836a53a93cb4e4a24bc83411ccf76957cc35..f59eafb9a1eb851f7e3237413b3ba1b3dcf38dca 100644 (file)
@@ -32,13 +32,19 @@ class rsync(object):
 
   def list(self):
     """Get a directory listing"""
-    dirs_to_copy = []
     args = copy.copy(self.cmd)
     args.append(self.source_base)
 
     logging.debug("Rsync cmd:" + " ".join(args))
     short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
-    direntries = [ x.split() for x in short_process.stdout ]
+    return self.list_parser(short_process.stdout)
+
+  def list_filter(self, lines):
+    """
+    parse rsync directory listing
+    """
+    dirs_to_copy = []
+    direntries = [ x[0:42].split() + [x[43:]] for x in lines ]
     for permissions, size, filedate, filetime, filename in direntries:
       if permissions[0] == 'd':
         # hey its a directory, the first step to being something we want to 
@@ -136,7 +142,7 @@ class CopierBot(rpc.XmlRpcBot):
         self.notify_runner = None
         
         self.register_function(self.startCopy)
-        self.register_function(self.runFinished)
+        self.register_function(self.sequencingFinished)
         self.eventTasks.append(self.update)
         
     def read_config(self, section=None, configfile=None):
@@ -145,21 +151,20 @@ class CopierBot(rpc.XmlRpcBot):
         """
         super(CopierBot, self).read_config(section, configfile)
         
-        def check_option(name):
-            if self.cfg[name] is None:
-                errmsg="Please specify %s in the configfile" % (name)
-                logging.fatal(errmsg)
-                raise RuntimeError(errmsg)
-            else:
-                return self.cfg[name]
-            
-        password = check_option('rsync_password_file')
-        source = check_option('rsync_source')
-        destination = check_option('rsync_destination')
+        password = self._check_required_option('rsync_password_file')
+        source = self._check_required_option('rsync_source')
+        destination = self._check_required_option('rsync_destination')
         self.rsync = rsync(source, destination, password)
         
         self.notify_users = self._parse_user_list(self.cfg['notify_users'])
-        self.notify_runner = self._parse_user_list(self.cfg['notify_runner'])
+        try:
+            self.notify_runner = \
+                self._parse_user_list(self.cfg['notify_runner'],
+                                      require_resource=True)
+        except bot.JIDMissingResource:
+            msg = 'need a full jabber ID + resource for xml-rpc destinations'
+            logging.fatal(msg)
+            raise bot.JIDMissingResource(msg)
 
     def startCopy(self, *args):
         """
@@ -170,7 +175,7 @@ class CopierBot(rpc.XmlRpcBot):
         logging.info("copying:" + " ".join(started)+".")
         return started
         
-    def runFinished(self, runDir, *args):
+    def sequencingFinished(self, runDir, *args):
         """
         The run was finished, if we're done copying, pass the message on        
         """
@@ -182,12 +187,12 @@ class CopierBot(rpc.XmlRpcBot):
             if runDir in self.rsync.keys():
                 # still copying
                 self.pending.append(runDir)
-                logging.info("%s finished, but still copying" % (runDir))
+                logging.info("finished sequencing, but still copying" % (runDir))
                 return "PENDING"
             else:
                 # we're done
-                self.reportRunFinished(runDir)
-                logging.info("%s finished" % (runDir))
+                self.reportSequencingFinished(runDir)
+                logging.info("finished sequencing %s" % (runDir))
                 return "DONE"
         else:
             errmsg = "received bad runfolder name (%s)" % (runDir)
@@ -195,17 +200,17 @@ class CopierBot(rpc.XmlRpcBot):
             # maybe I should use a different error message
             raise RuntimeError(errmsg)
     
-    def reportRunFinished(self, runDir):
+    def reportSequencingFinished(self, runDir):
         """
-        Send the runFinished message to the interested parties
+        Send the sequencingFinished message to the interested parties
         """
         if self.notify_users is not None:
             for u in self.notify_users:
-                self.send(u, 'run %s finished' % (runDir))
+                self.send(u, 'Sequencing run %s finished' % (runDir))
         if self.notify_runner is not None:
             for r in self.notify_runner:
-                rpc.send(self.cl, self.runner, (runDir,), 'runFinished')
-        logging.info("forwarding runFinshed message for %s" % (runDir))
+                self.rpc_send(r, (runDir,), 'sequencingFinished')
+        logging.info("forwarding sequencingFinished message for %s" % (runDir))
         
     def update(self, *args):
         """
@@ -215,7 +220,7 @@ class CopierBot(rpc.XmlRpcBot):
         self.rsync.poll()
         for p in self.pending:
             if p not in self.rsync.keys():
-                self.reportRunFinished(p)
+                self.reportSequencingFinished(p)
                 self.pending.remove(p)
         
     def _parser(self, msg, who):
@@ -234,7 +239,7 @@ class CopierBot(rpc.XmlRpcBot):
               msg.append(u"  " + d)
             reply = os.linesep.join(msg)
         else:
-            reply = u"I didn't understand '%s'"+os.linesep+help % (unicode(msg))
+            reply = u"I didn't understand '%s'" % (unicode(msg))
         return reply
 
 def main(args=None):