Initial port to python3
[htsworkflow.git] / htsworkflow / util / queuecommands.py
1 """
2 Run up to N simultanous jobs from provided of commands
3 """
4
5 import logging
6 import os
7 from subprocess import PIPE
8 import subprocess
9 import select
10 import sys
11 import time
12
13 LOGGER = logging.getLogger(__name__)
14
15 class QueueCommands(object):
16     """
17     Queue up N commands from cmd_list, launching more jobs as the first
18     finish.
19     """
20
21     def __init__(self, cmd_list, N=0, cwd=None, env=None):
22         """
23         cmd_list is a list of elements suitable for subprocess
24         N is the  number of simultanious processes to run.
25         0 is all of them.
26
27         WARNING: this will not work on windows
28         (It depends on being able to pass local file descriptors to the
29         select call with isn't supported by the Win32 API)
30         """
31         self.to_run = cmd_list[:]
32         self.running = {}
33         self.N = N
34         self.cwd = cwd
35         self.env = env
36
37     def under_process_limit(self):
38         """
39         are we still under the total number of allowable jobs?
40         """
41         if self.N == 0:
42             return True
43
44         if len(self.running) < self.N:
45             return True
46
47         return False
48
49     def start_jobs(self):
50         """
51         Launch jobs until we have the maximum allowable running
52         (or have run out of jobs)
53         """
54         while (len(self.to_run) > 0) and self.under_process_limit():
55             LOGGER.info('%d left to run', len(self.to_run))
56             cmd = self.to_run.pop(0)
57             p = subprocess.Popen(cmd,
58                                  stdout=PIPE,
59                                  shell=True,
60                                  cwd=self.cwd,
61                                  env=self.env)
62             self.running[p.stdout] = p
63             LOGGER.info("Created process %d from %s" % (p.pid, str(cmd)))
64
65     def run(self):
66         """
67         run up to N jobs until we run out of jobs
68         """
69         LOGGER.debug('using %s as cwd' % (self.cwd,))
70
71         # to_run slowly gets consumed by start_jobs
72         while len(self.to_run) > 0 or len(self.running) > 0:
73             # fill any empty spots in our job queue
74             self.start_jobs()
75
76             # build a list of file descriptors
77             # fds=file desciptors
78             fds = [ x.stdout for x in list(self.running.values())]
79
80             # wait for something to finish
81             # wl= write list, xl=exception list (not used so get bad names)
82             read_list, wl, xl = select.select(fds, [], fds, 1 )
83
84             # for everything that might have finished...
85             for pending_fd in read_list:
86                 pending = self.running[pending_fd]
87                 # if it really did finish, remove it from running jobs
88                 if pending.poll() is not None:
89                     LOGGER.info("Process %d finished [%d]",
90                                 pending.pid, pending.returncode)
91                     del self.running[pending_fd]
92                 else:
93                     # It's still running, but there's some output
94                     buffer = pending_fd.readline()
95                     buffer = buffer.strip()
96                     msg = "%d:(%d) %s" %(pending.pid, len(buffer), buffer)
97                     LOGGER.debug(msg)