Allow overriding the queued commands environment.
[htsworkflow.git] / htsworkflow / util / queuecommands.py
index 0426fe68fa67cdfe8521b988a3e4b7f24f07fd7a..23fff168e5337084804758569c884b6084da168b 100644 (file)
@@ -3,6 +3,7 @@ Run up to N simultanous jobs from provided of commands
 """
 
 import logging
+import os
 from subprocess import PIPE
 import subprocess
 import select
@@ -15,7 +16,7 @@ class QueueCommands(object):
     finish.
     """
 
-    def __init__(self, cmd_list, N=0, cwd=None):
+    def __init__(self, cmd_list, N=0, cwd=None, env=None):
         """
         cmd_list is a list of elements suitable for subprocess
         N is the  number of simultanious processes to run. 
@@ -29,6 +30,7 @@ class QueueCommands(object):
         self.running = {}
         self.N = N
         self.cwd = cwd
+        self.env = env
 
     def under_process_limit(self):
         """
@@ -48,12 +50,15 @@ class QueueCommands(object):
         (or have run out of jobs)
         """
         queue_log = logging.getLogger('queue')
-        queue_log.debug('using %s as cwd' % (self.cwd,))
 
         while (len(self.to_run) > 0) and self.under_process_limit():
             queue_log.info('%d left to run', len(self.to_run))
             cmd = self.to_run.pop(0)
-            p = subprocess.Popen(cmd, stdout=PIPE, cwd=self.cwd, shell=True)
+            p = subprocess.Popen(cmd, 
+                                 stdout=PIPE, 
+                                 shell=True, 
+                                 cwd=self.cwd, 
+                                 env=self.env)
             self.running[p.stdout] = p
             queue_log.info("Created process %d from %s" % (p.pid, str(cmd)))
 
@@ -62,6 +67,7 @@ class QueueCommands(object):
         run up to N jobs until we run out of jobs
         """
         queue_log = logging.getLogger('queue')
+        queue_log.debug('using %s as cwd' % (self.cwd,))
 
         # to_run slowly gets consumed by start_jobs
         while len(self.to_run) > 0 or len(self.running) > 0:
@@ -74,8 +80,8 @@ class QueueCommands(object):
 
             # wait for something to finish
             # wl= write list, xl=exception list (not used so get bad names)
-            read_list, wl, xl = select.select(fds, [], fds)
-        
+            read_list, wl, xl = select.select(fds, [], fds, 1 )
+
             # for everything that might have finished...
             for pending_fd in read_list:
                 pending = self.running[pending_fd]
@@ -84,4 +90,10 @@ class QueueCommands(object):
                     queue_log.info("Process %d finished [%d]",
                                    pending.pid, pending.returncode)
                     del self.running[pending_fd]
+                else:
+                    # It's still running, but there's some output
+                    buffer = pending_fd.readline()
+                    buffer = buffer.strip()
+                    msg = "%d:(%d) %s" %(pending.pid, len(buffer), buffer)
+                    logging.debug(msg)
             time.sleep(1)