From: Diane Trout Date: Tue, 17 Jun 2008 00:25:03 +0000 (+0000) Subject: Add QueueCommands, a class that allows controlling how many X-Git-Tag: stanford.caltech-merged-database-2009-jan-15~52 X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=ccf96d586b71dec1f4d98f38e810134cd448a3f1 Add QueueCommands, a class that allows controlling how many processes to run simultaniously. I still need to and a driver script to handle getting jobs from the user. It's mostly in so I can control launching the solexa2srf commands for submitting stuff to the SRA. --- diff --git a/gaworkflow/util/queuecommands.py b/gaworkflow/util/queuecommands.py new file mode 100644 index 0000000..4873a51 --- /dev/null +++ b/gaworkflow/util/queuecommands.py @@ -0,0 +1,82 @@ +""" +Run up to N simultanous jobs from provided of commands +""" + +import logging +from subprocess import PIPE +import subprocess +import select +import sys + +class QueueCommands(object): + """ + Queue up N commands from cmd_list, launching more jobs as the first + finish. + """ + + def __init__(self, cmd_list, N=0): + """ + cmd_list is a list of elements suitable for subprocess + N is the number of simultanious processes to run. + 0 is all of them. + + WARNING: this will not work on windows + (It depends on being able to pass local file descriptors to the + select call with isn't supported by the Win32 API) + """ + self.to_run = cmd_list[:] + self.running = {} + self.N = N + + def under_process_limit(self): + """ + are we still under the total number of allowable jobs? + """ + if self.N == 0: + return True + + if len(self.running) < self.N: + return True + + return False + + def start_jobs(self): + """ + Launch jobs until we have the maximum allowable running + (or have run out of jobs) + """ + queue_log = logging.getLogger('queue') + + while (len(self.to_run) > 0) and self.under_process_limit(): + cmd = self.to_run.pop(0) + p = subprocess.Popen(cmd, stdout=PIPE) + self.running[p.stdout] = p + queue_log.info("Created process %d from %s" % (p.pid, str(cmd))) + + def run(self): + """ + run up to N jobs until we run out of jobs + """ + queue_log = logging.getLogger('queue') + + # to_run slowly gets consumed by start_jobs + while len(self.to_run) > 0 or len(self.running) > 0: + # fill any empty spots in our job queue + self.start_jobs() + + # build a list of file descriptors + # fds=file desciptors + fds = [ x.stdout for x in self.running.values()] + + # wait for something to finish + # wl= write list, xl=exception list (not used so get bad names) + read_list, wl, xl = select.select(fds, [], fds) + + # for everything that might have finished... + for pending_fd in read_list: + pending = self.running[pending_fd] + # if it really did finish, remove it from running jobs + if pending.poll() is not None: + queue_log.info("Process %d finished" % (pending.pid,)) + del self.running[pending_fd] + diff --git a/gaworkflow/util/test/test_queuecommands.py b/gaworkflow/util/test/test_queuecommands.py new file mode 100644 index 0000000..940ddbe --- /dev/null +++ b/gaworkflow/util/test/test_queuecommands.py @@ -0,0 +1,56 @@ +import os +import logging +import time +import unittest + + +from gaworkflow.util.queuecommands import QueueCommands + +class testQueueCommands(unittest.TestCase): + def setUp(self): + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s %(name)-8s %(message)s') + + + + def test_unlimited_run(self): + """ + Run everything at once + """ + cmds = [['/bin/sleep', '0'], + ['/bin/sleep', '1'], + ['/bin/sleep', '2'],] + + q = QueueCommands(cmds) + start = time.time() + q.run() + end = time.time()-start + # we should only take the length of the longest sleep + self.failUnless( end > 1.9 and end < 2.1, + "took %s seconds, exected ~5" % (end,)) + + def test_limited_run(self): + """ + Run a limited number of jobs + """ + cmds = [['/bin/sleep', '1'], + ['/bin/sleep', '2'], + ['/bin/sleep', '3'],] + + q = QueueCommands(cmds, 2) + + start = time.time() + q.run() + end = time.time()-start + self.failUnless( end > 3.9 and end < 4.1, + "took %s seconds, expected ~6" % (end,)) + +def suite(): + return unittest.makeSuite(testQueueCommands, 'test') + +if __name__ == "__main__": + unittest.main(defaultTest='suite') + + + +