Add queuecommand to htswcommon,
authorDiane Trout <diane@caltech.edu>
Thu, 28 Aug 2008 22:17:56 +0000 (22:17 +0000)
committerDiane Trout <diane@caltech.edu>
Thu, 28 Aug 2008 22:17:56 +0000 (22:17 +0000)
add tests for ethelp and queuecommand, though nose will only work
if run from within the htswcommon directory

htswcommon/htswcommon/util/queuecommands.py [new file with mode: 0644]
htswcommon/htswcommon/util/test/test_ethelp.py [new file with mode: 0644]
htswcommon/htswcommon/util/test/test_queuecommands.py [new file with mode: 0644]

diff --git a/htswcommon/htswcommon/util/queuecommands.py b/htswcommon/htswcommon/util/queuecommands.py
new file mode 100644 (file)
index 0000000..0c34292
--- /dev/null
@@ -0,0 +1,84 @@
+"""
+Run up to N simultanous jobs from provided of commands 
+"""
+
+import logging
+from subprocess import PIPE
+import subprocess
+import select
+import sys
+
+class QueueCommands(object):
+    """
+    Queue up N commands from cmd_list, launching more jobs as the first
+    finish.
+    """
+
+    def __init__(self, cmd_list, N=0, cwd=None):
+        """
+        cmd_list is a list of elements suitable for subprocess
+        N is the  number of simultanious processes to run. 
+        0 is all of them.
+        
+        WARNING: this will not work on windows
+        (It depends on being able to pass local file descriptors to the 
+        select call with isn't supported by the Win32 API)
+        """
+        self.to_run = cmd_list[:]
+        self.running = {}
+        self.N = N
+        self.cwd = cwd
+
+    def under_process_limit(self):
+        """
+        are we still under the total number of allowable jobs?
+        """
+        if self.N == 0:
+            return True
+
+        if len(self.running) < self.N:
+            return True
+
+        return False
+
+    def start_jobs(self):
+        """
+        Launch jobs until we have the maximum allowable running
+        (or have run out of jobs)
+        """
+        queue_log = logging.getLogger('queue')
+        queue_log.info('using %s as cwd' % (self.cwd,))
+
+        while (len(self.to_run) > 0) and self.under_process_limit():
+            cmd = self.to_run.pop(0)
+            p = subprocess.Popen(cmd, stdout=PIPE, cwd=self.cwd, shell=True)
+            self.running[p.stdout] = p
+            queue_log.info("Created process %d from %s" % (p.pid, str(cmd)))
+
+    def run(self):
+        """
+        run up to N jobs until we run out of jobs
+        """
+        queue_log = logging.getLogger('queue')
+
+        # to_run slowly gets consumed by start_jobs
+        while len(self.to_run) > 0 or len(self.running) > 0:
+            # fill any empty spots in our job queue
+            self.start_jobs()
+
+            # build a list of file descriptors
+            # fds=file desciptors
+            fds = [ x.stdout for x in self.running.values()]
+
+            # wait for something to finish
+            # wl= write list, xl=exception list (not used so get bad names)
+            read_list, wl, xl = select.select(fds, [], fds)
+        
+            # for everything that might have finished...
+            for pending_fd in read_list:
+                pending = self.running[pending_fd]
+                # if it really did finish, remove it from running jobs
+                if pending.poll() is not None:
+                    queue_log.info("Process %d finished" % (pending.pid,))
+                    del self.running[pending_fd]
+
diff --git a/htswcommon/htswcommon/util/test/test_ethelp.py b/htswcommon/htswcommon/util/test/test_ethelp.py
new file mode 100644 (file)
index 0000000..5dbf307
--- /dev/null
@@ -0,0 +1,35 @@
+import os
+import unittest
+
+try:
+  from xml.etree import ElementTree
+except ImportError, e:
+  from elementtree import ElementTree
+
+from htswcommon.util.ethelp import indent, flatten
+
+class testETHelper(unittest.TestCase):
+    def setUp(self):
+        self.foo = '<foo><bar>asdf</bar><br/></foo>'
+        self.foo_tree = ElementTree.fromstring(self.foo)
+
+    def test_indent(self):
+        flat_foo = ElementTree.tostring(self.foo_tree)
+        self.failUnlessEqual(len(flat_foo.split('\n')), 1)
+
+        indent(self.foo_tree)
+        pretty_foo = ElementTree.tostring(self.foo_tree)
+        self.failUnlessEqual(len(pretty_foo.split('\n')), 5)
+
+    def test_flatten(self):
+        self.failUnless(flatten(self.foo_tree), 'asdf')
+
+def suite():
+    return unittest.makeSuite(testETHelper, 'test')
+
+if __name__ == "__main__":
+    unittest.main(defaultTest='suite')
+
+
+
+
diff --git a/htswcommon/htswcommon/util/test/test_queuecommands.py b/htswcommon/htswcommon/util/test/test_queuecommands.py
new file mode 100644 (file)
index 0000000..c7da419
--- /dev/null
@@ -0,0 +1,56 @@
+import os
+import logging
+import time
+import unittest
+
+
+from htswcommon.util.queuecommands import QueueCommands
+
+class testQueueCommands(unittest.TestCase):
+    def setUp(self):
+        logging.basicConfig(level=logging.DEBUG,
+                            format='%(asctime)s %(name)-8s %(message)s')
+
+       
+
+    def test_unlimited_run(self):
+        """
+        Run everything at once
+        """
+        cmds = ['/bin/sleep 0',
+                '/bin/sleep 1',
+                '/bin/sleep 2',]
+
+        q = QueueCommands(cmds)
+        start = time.time()
+        q.run()
+        end = time.time()-start
+        # we should only take the length of the longest sleep
+        self.failUnless( end > 1.9 and end < 2.1,
+                         "took %s seconds, exected ~5" % (end,))
+
+    def test_limited_run(self):
+        """
+        Run a limited number of jobs
+        """
+        cmds = ['/bin/sleep 1',
+                '/bin/sleep 2',
+                '/bin/sleep 3',]
+
+        q = QueueCommands(cmds, 2)
+
+        start = time.time()
+        q.run()
+        end = time.time()-start
+        self.failUnless( end > 3.9 and end < 4.1,
+                         "took %s seconds, expected ~6" % (end,)) 
+
+def suite():
+    return unittest.makeSuite(testQueueCommands, 'test')
+
+if __name__ == "__main__":
+    unittest.main(defaultTest='suite')
+
+
+
+