Add QueueCommands, a class that allows controlling how many
authorDiane Trout <diane@caltech.edu>
Tue, 17 Jun 2008 00:25:03 +0000 (00:25 +0000)
committerDiane Trout <diane@caltech.edu>
Tue, 17 Jun 2008 00:25:03 +0000 (00:25 +0000)
processes to run simultaneously.

I still need to add a driver script to handle getting jobs from the
user.

It's mostly in so I can control launching the solexa2srf commands for
submitting stuff to the SRA.

gaworkflow/util/queuecommands.py [new file with mode: 0644]
gaworkflow/util/test/test_queuecommands.py [new file with mode: 0644]

diff --git a/gaworkflow/util/queuecommands.py b/gaworkflow/util/queuecommands.py
new file mode 100644 (file)
index 0000000..4873a51
--- /dev/null
@@ -0,0 +1,82 @@
+"""
+Run up to N simultaneous jobs from a provided list of commands
+"""
+
+import logging
+from subprocess import PIPE
+import subprocess
+import select
+import sys
+
+class QueueCommands(object):
+    """
+    Queue up N commands from cmd_list, launching more jobs as the first
+    finish.
+    """
+
+    def __init__(self, cmd_list, N=0):
+        """
+        cmd_list is a list of elements suitable for subprocess
+        N is the number of simultaneous processes to run.
+        0 is all of them.
+        
+        WARNING: this will not work on windows
+        (It depends on being able to pass local file descriptors to the 
+        select call which isn't supported by the Win32 API)
+        """
+        # copy the list so popping jobs off doesn't mutate the caller's list
+        self.to_run = cmd_list[:]
+        # maps a child's stdout file object -> its subprocess.Popen handle
+        self.running = {}
+        # maximum number of concurrent processes (0 means unlimited)
+        self.N = N
+
+    def under_process_limit(self):
+        """
+        are we still under the total number of allowable jobs?
+
+        Returns True when another job may be launched (always True if N == 0).
+        """
+        if self.N == 0:
+            return True
+
+        if len(self.running) < self.N:
+            return True
+
+        return False
+
+    def start_jobs(self):
+        """
+        Launch jobs until we have the maximum allowable running
+        (or have run out of jobs)
+        """
+        queue_log = logging.getLogger('queue')
+
+        while (len(self.to_run) > 0) and self.under_process_limit():
+            cmd = self.to_run.pop(0)
+            # stdout is a PIPE so its file descriptor can be handed to
+            # select() in run() as the "something happened" signal
+            p = subprocess.Popen(cmd, stdout=PIPE)
+            self.running[p.stdout] = p
+            queue_log.info("Created process %d from %s" % (p.pid, str(cmd)))
+
+    def run(self):
+        """
+        run up to N jobs until we run out of jobs
+        """
+        queue_log = logging.getLogger('queue')
+
+        # to_run slowly gets consumed by start_jobs
+        while len(self.to_run) > 0 or len(self.running) > 0:
+            # fill any empty spots in our job queue
+            self.start_jobs()
+
+            # build a list of file descriptors
+            # fds=file descriptors
+            fds = [ x.stdout for x in self.running.values()]
+
+            # wait for something to finish
+            # wl= write list, xl=exception list (not used so get bad names)
+            read_list, wl, xl = select.select(fds, [], fds)
+        
+            # for everything that might have finished...
+            for pending_fd in read_list:
+                pending = self.running[pending_fd]
+                # if it really did finish, remove it from running jobs
+                # NOTE(review): the child's stdout is never actually read,
+                # so a command that produces output but keeps running will
+                # make select() report readable repeatedly (busy loop) and
+                # may block on a full pipe — confirm the queued commands
+                # write little or nothing to stdout
+                if pending.poll() is not None:
+                    queue_log.info("Process %d finished" % (pending.pid,))
+                    del self.running[pending_fd]
+
diff --git a/gaworkflow/util/test/test_queuecommands.py b/gaworkflow/util/test/test_queuecommands.py
new file mode 100644 (file)
index 0000000..940ddbe
--- /dev/null
@@ -0,0 +1,56 @@
+import os
+import logging
+import time
+import unittest
+
+
+from gaworkflow.util.queuecommands import QueueCommands
+
+class testQueueCommands(unittest.TestCase):
+    def setUp(self):
+        logging.basicConfig(level=logging.DEBUG,
+                            format='%(asctime)s %(name)-8s %(message)s')
+
+       
+
+    def test_unlimited_run(self):
+        """
+        Run everything at once
+        """
+        cmds = [['/bin/sleep', '0'],
+                ['/bin/sleep', '1'],
+                ['/bin/sleep', '2'],]
+
+        q = QueueCommands(cmds)
+        start = time.time()
+        q.run()
+        end = time.time()-start
+        # we should only take the length of the longest sleep
+        self.failUnless( end > 1.9 and end < 2.1,
+                         "took %s seconds, exected ~5" % (end,))
+
+    def test_limited_run(self):
+        """
+        Run a limited number of jobs
+        """
+        cmds = [['/bin/sleep', '1'],
+                ['/bin/sleep', '2'],
+                ['/bin/sleep', '3'],]
+
+        q = QueueCommands(cmds, 2)
+
+        start = time.time()
+        q.run()
+        end = time.time()-start
+        self.failUnless( end > 3.9 and end < 4.1,
+                         "took %s seconds, expected ~6" % (end,)) 
+
+def suite():
+    return unittest.makeSuite(testQueueCommands, 'test')
+
+if __name__ == "__main__":
+    unittest.main(defaultTest='suite')
+
+
+
+