7 from htsworkflow.util.queuecommands import QueueCommands
# Timing-based tests for QueueCommands: each test builds a list of
# /bin/sleep shell commands, passes it to QueueCommands, and checks the
# measured wall-clock time against a narrow window.
# NOTE(review): the embedded line numbers skip (9 -> 11, 20 -> 24, 24 -> 27,
# 41 -> 45), so parts of this class are not visible here: the remaining
# entries of each cmds list, the assignment of `start`, and whatever call
# actually runs the queue. Confirm against the complete file before editing.
9 class testQueueCommands(unittest.TestCase):
# Configure logging at DEBUG level; each record is formatted as timestamp,
# logger name (left-justified, minimum width 8), then the message.
# NOTE(review): whether this call sits directly in the class body or inside
# a method (original line 10 is not visible) cannot be told from here.
11 logging.basicConfig(level=logging.DEBUG,
12 format='%(asctime)s %(name)-8s %(message)s')
# Test: QueueCommands is constructed with only the command list (no second
# argument), and the elapsed time must fall strictly between 2.9 and 3.1 s.
16 def test_unlimited_run(self):
18 Run everything at once
# First visible command is '/bin/sleep 0'; the rest of the list is not shown.
20 cmds = ['/bin/sleep 0',
24 q = QueueCommands(cmds)
# `end` holds elapsed seconds (a duration, not a timestamp); `start` is
# assigned on a line that is not visible here.
27 end = time.time()-start
28 # we should only take the length of the longest sleep
29 # pity I had to add a 1 second sleep
# NOTE(review): failUnless is a deprecated alias of assertTrue and was removed
# in Python 3.12; the +/-0.1 s window may be flaky on a loaded machine.
# NOTE(review): "exected" in the failure message below is a typo for "expected".
30 self.failUnless( end > 2.9 and end < 3.1,
31 "took %s seconds, exected ~3" % (end,))
# Test: QueueCommands is constructed with a second positional argument of 2,
# and the elapsed time must fall strictly between 5.9 and 6.1 s.
33 def test_limited_run(self):
35 Run a limited number of jobs
# First visible command is '/bin/sleep 1'; the rest of the list is not shown.
37 cmds = ['/bin/sleep 1',
# The 2 is presumably the maximum number of simultaneous jobs (per the test
# description above) -- verify against the QueueCommands signature.
41 q = QueueCommands(cmds, 2)
# Elapsed seconds since `start` (assigned on a line that is not visible here).
45 end = time.time()-start
46 # pity I had to add a 1 second sleep
# NOTE(review): failUnless is a deprecated alias of assertTrue (removed in
# Python 3.12); the +/-0.1 s window may be flaky on a loaded machine.
47 self.failUnless( end > 5.9 and end < 6.1,
48 "took %s seconds, expected ~6" % (end,))
# Build and return a TestSuite from every method of testQueueCommands whose
# name starts with 'test'.
# NOTE(review): the enclosing def header (original line 50) is not visible
# here; presumably `def suite():`, given the defaultTest='suite' reference in
# the __main__ guard -- confirm.
# NOTE(review): unittest.makeSuite is deprecated since Python 3.11 and removed
# in 3.13; TestLoader.loadTestsFromTestCase is the documented replacement.
51 return unittest.makeSuite(testQueueCommands, 'test')
# Script entry point: when the module is executed directly, hand control to
# unittest's command-line runner. defaultTest='suite' names the module-level
# `suite` object as the test to run when no test names are given on the
# command line.
53 if __name__ == "__main__":
54 unittest.main(defaultTest='suite')