--- /dev/null
+"""
+Run up to N simultaneous jobs from a provided list of commands
+"""
+
+import logging
+import os
+import select
+import subprocess
+import sys
+from subprocess import PIPE
+
+class QueueCommands(object):
+    """
+    Queue up N commands from cmd_list, launching more jobs as the first
+    finish.
+
+    Each command is started with its stdout connected to a pipe.  The
+    pipe is used to notice when the child is done (it reaches EOF when
+    the child closes it, normally at exit); anything the child writes
+    to stdout is read and discarded.
+    """
+
+    def __init__(self, cmd_list, N=0):
+        """
+        cmd_list is an iterable of elements suitable for subprocess.Popen
+        N is the number of simultaneous processes to run.
+        0 is all of them.
+
+        Raises ValueError if N is negative (a negative limit could never
+        be satisfied and run() would wait forever).
+
+        WARNING: this will not work on windows
+        (It depends on being able to pass pipe file descriptors to the
+        select call, which isn't supported by the Win32 API)
+        """
+        if N < 0:
+            raise ValueError("N must be >= 0, got %r" % (N,))
+
+        # private copy: consuming the queue must not mutate the caller's data
+        self.to_run = list(cmd_list)
+        # maps each running child's stdout pipe -> its Popen object
+        self.running = {}
+        self.N = N
+
+    def under_process_limit(self):
+        """
+        are we still under the total number of allowable jobs?
+        (N == 0 means there is no limit)
+        """
+        return self.N == 0 or len(self.running) < self.N
+
+    def start_jobs(self):
+        """
+        Launch jobs until we have the maximum allowable running
+        (or have run out of jobs)
+        """
+        queue_log = logging.getLogger('queue')
+
+        while self.to_run and self.under_process_limit():
+            cmd = self.to_run.pop(0)
+            p = subprocess.Popen(cmd, stdout=PIPE)
+            # key on the pipe so the result of select() maps back to the job
+            self.running[p.stdout] = p
+            queue_log.info("Created process %d from %s", p.pid, cmd)
+
+    def run(self):
+        """
+        run up to N jobs until we run out of jobs
+
+        Blocks until every command has been started and has finished.
+        """
+        queue_log = logging.getLogger('queue')
+
+        # to_run slowly gets consumed by start_jobs
+        while self.to_run or self.running:
+            # fill any empty spots in our job queue
+            self.start_jobs()
+
+            # fds = the stdout pipes of every running job
+            fds = list(self.running)
+
+            # wait for a job to produce output or close its pipe
+            # (the write and exception lists are not used)
+            read_list, _wl, _xl = select.select(fds, [], fds)
+
+            for pending_fd in read_list:
+                pending = self.running[pending_fd]
+
+                # Drain whatever the child wrote.  Without this, a chatty
+                # child keeps its pipe readable (select returns at once,
+                # forever) and a child that fills the pipe buffer blocks
+                # on write and never exits.
+                if os.read(pending_fd.fileno(), 65536):
+                    continue
+
+                # An empty read is EOF: the child closed stdout, so it
+                # has exited or is about to.  Release the pipe and reap
+                # the process before freeing its slot.
+                del self.running[pending_fd]
+                pending_fd.close()
+                pending.wait()
+                queue_log.info("Process %d finished", pending.pid)
+
--- /dev/null
+import os
+import logging
+import time
+import unittest
+
+
+from gaworkflow.util.queuecommands import QueueCommands
+
+class testQueueCommands(unittest.TestCase):
+    """
+    Timing based checks of QueueCommands using /bin/sleep jobs.
+    """
+    def setUp(self):
+        # show the 'queue' logger output while the tests run
+        logging.basicConfig(level=logging.DEBUG,
+                            format='%(asctime)s %(name)-8s %(message)s')
+
+    def test_unlimited_run(self):
+        """
+        Run everything at once
+        """
+        cmds = [['/bin/sleep', '0'],
+                ['/bin/sleep', '1'],
+                ['/bin/sleep', '2'],]
+
+        q = QueueCommands(cmds)
+        start = time.time()
+        q.run()
+        elapsed = time.time() - start
+        # we should only take the length of the longest sleep
+        self.assertTrue(1.9 < elapsed < 2.1,
+                        "took %s seconds, expected ~2" % (elapsed,))
+
+    def test_limited_run(self):
+        """
+        Run a limited number of jobs
+        """
+        cmds = [['/bin/sleep', '1'],
+                ['/bin/sleep', '2'],
+                ['/bin/sleep', '3'],]
+
+        q = QueueCommands(cmds, 2)
+
+        start = time.time()
+        q.run()
+        elapsed = time.time() - start
+        # with two slots the 3 second job starts when the 1 second job
+        # ends, so the whole run lasts about 1 + 3 seconds
+        self.assertTrue(3.9 < elapsed < 4.1,
+                        "took %s seconds, expected ~4" % (elapsed,))
+
+def suite():
+    """
+    Build a suite holding every test* method of testQueueCommands.
+    """
+    # unittest.makeSuite was deprecated and is gone in Python 3.13; the
+    # loader collects the same methods (its default prefix is 'test')
+    return unittest.TestLoader().loadTestsFromTestCase(testQueueCommands)
+
+# When executed as a script, run the tests returned by suite().
+if __name__ == "__main__":
+ unittest.main(defaultTest='suite')
+
+
+
+