Add QueueCommands, a class that allows controlling how many
[htsworkflow.git] / gaworkflow / util / queuecommands.py
1 """
2 Run up to N simultanous jobs from provided of commands 
3 """
4
5 import logging
6 from subprocess import PIPE
7 import subprocess
8 import select
9 import sys
10
11 class QueueCommands(object):
12     """
13     Queue up N commands from cmd_list, launching more jobs as the first
14     finish.
15     """
16
17     def __init__(self, cmd_list, N=0):
18         """
19         cmd_list is a list of elements suitable for subprocess
20         N is the  number of simultanious processes to run. 
21         0 is all of them.
22         
23         WARNING: this will not work on windows
24         (It depends on being able to pass local file descriptors to the 
25         select call with isn't supported by the Win32 API)
26         """
27         self.to_run = cmd_list[:]
28         self.running = {}
29         self.N = N
30
31     def under_process_limit(self):
32         """
33         are we still under the total number of allowable jobs?
34         """
35         if self.N == 0:
36             return True
37
38         if len(self.running) < self.N:
39             return True
40
41         return False
42
43     def start_jobs(self):
44         """
45         Launch jobs until we have the maximum allowable running
46         (or have run out of jobs)
47         """
48         queue_log = logging.getLogger('queue')
49
50         while (len(self.to_run) > 0) and self.under_process_limit():
51             cmd = self.to_run.pop(0)
52             p = subprocess.Popen(cmd, stdout=PIPE)
53             self.running[p.stdout] = p
54             queue_log.info("Created process %d from %s" % (p.pid, str(cmd)))
55
56     def run(self):
57         """
58         run up to N jobs until we run out of jobs
59         """
60         queue_log = logging.getLogger('queue')
61
62         # to_run slowly gets consumed by start_jobs
63         while len(self.to_run) > 0 or len(self.running) > 0:
64             # fill any empty spots in our job queue
65             self.start_jobs()
66
67             # build a list of file descriptors
68             # fds=file desciptors
69             fds = [ x.stdout for x in self.running.values()]
70
71             # wait for something to finish
72             # wl= write list, xl=exception list (not used so get bad names)
73             read_list, wl, xl = select.select(fds, [], fds)
74         
75             # for everything that might have finished...
76             for pending_fd in read_list:
77                 pending = self.running[pending_fd]
78                 # if it really did finish, remove it from running jobs
79                 if pending.poll() is not None:
80                     queue_log.info("Process %d finished" % (pending.pid,))
81                     del self.running[pending_fd]
82