[project @ rename python module to gaworkflow from uashelper]
[htsworkflow.git] / gaworkflow / spoolwatcher.py
1 #!/usr/bin/env python
2
3 import os
4 import re
5 import sys
6 import time
7
8 # this uses pyinotify
9 import pyinotify
10 from pyinotify import EventsCodes
11
12 from benderjab.xsend import send
13
14 class Handler(pyinotify.ProcessEvent):
15     def __init__(self, watchmanager, runner_jid):
16       self.last_event_time = None
17       self.watchmanager = watchmanager
18       self.runner_jid = runner_jid
19
20     def process_IN_CREATE(self, event):
21       self.last_event_time = time.time()
22       msg = "Create: %s" %  os.path.join(event.path, event.name)
23       if event.name.lower() == "run.completed":
24         print "Run is done!"
25         try:
26           send(self.runner_jid, "a run finished, launch it, and swap the drive")
27         except IOError, e:
28           print "ERROR: couldn't send message"
29           print str(e)
30       print msg
31
32     def process_IN_DELETE(self, event):
33       print "Remove: %s" %  os.path.join(event.path, event.name)
34
35     def process_IN_UNMOUNT(self, event):
36       print "Unmounted ", str(event)
37
38 class TreeWriteDoneNotifier:
39   """
40   Watch a directory and send a message when another process is done writing.
41
42   This monitors a directory tree using inotify (linux specific) and 
43   after some files having been written will send a message after <timeout>
44   seconds of no file writing.
45
46   (Basically when the solexa machine finishes dumping a round of data
47   this'll hopefully send out a message saying hey look theres data available
48   """
49
50   def __init__(self, watchdir, 
51                      copy_jid, runner_jid, 
52                      write_timeout=10, notify_timeout=5):
53      self.watchdir = watchdir
54      self.copy_jid = copy_jid
55      self.runner_jid = runner_jid
56      self.write_timeout = write_timeout
57      self.notify_timeout = int(notify_timeout * 1000)
58
59      self.wm = pyinotify.WatchManager()
60      self.handler = Handler(self.wm, self.runner_jid)
61      self.notifier = pyinotify.Notifier(self.wm, self.handler)
62      
63      self.add_watch(self.watchdir)
64
65   def add_watch(self, watchdir):
66      print "Watching:", watchdir
67      mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
68      # rec traverses the tree and adds all the directories that are there 
69      # at the start.
70      # auto_add will add in new directories as they are created
71      self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
72
73   def event_loop(self):
74     while True:  # loop forever
75       try:
76         # process the queue of events as explained above
77         self.notifier.process_events()
78         #check events waits timeout
79         if self.notifier.check_events(self.notify_timeout):
80           # read notified events and enqeue them
81           self.notifier.read_events()
82         # should we do something?
83         last_event_time = self.handler.last_event_time
84         if last_event_time is not None:
85           time_delta = time.time() - last_event_time
86           if time_delta > self.write_timeout:
87             self.copying_paused()
88             self.handler.last_event_time = None
89
90       except KeyboardInterrupt:
91         # destroy the inotify's instance on this interrupt (stop monitoring)
92         self.notifier.stop()
93         break
94
95   def copying_paused(self):
96     print "more than 10 seconds have elapsed"
97     send(self.copy_jid, "start copy")
98