b2520b206c563b0908e29c3dbe42e257656a2f43
[htsworkflow.git] / uashelper / spoolwatcher.py
1 #!/usr/bin/env python
2
3 import os
4 import re
5 import sys
6 import time
7
8 # this uses pyinotify
9 import pyinotify
10 from pyinotify import EventsCodes
11
12 from benderjab.xsend import send
13
14 copy_jid = "cellthumper@chaos.caltech.edu"
15 runner_jid = "diane@ghic.org"
16 copy_jid = runner_jid
17 class Handler(pyinotify.ProcessEvent):
18     def __init__(self, watchmanager):
19       self.last_event_time = None
20       self.watchmanager = watchmanager
21
22     def process_IN_CREATE(self, event):
23       self.last_event_time = time.time()
24       msg = "Create: %s" %  os.path.join(event.path, event.name)
25       if event.name == "Run.completed":
26         print "Run is done!"
27         send(runner_jid, "a run finished, launch it, and swap the drive")
28       print msg
29
30     def process_IN_DELETE(self, event):
31       print "Remove: %s" %  os.path.join(event.path, event.name)
32
33     def process_IN_UNMOUNT(self, event):
34       print "Unmounted ", str(event)
35
36 class TreeWriteDoneNotifier:
37   """
38   Watch a directory and send a message when another process is done writing.
39
40   This monitors a directory tree using inotify (linux specific) and 
41   after some files having been written will send a message after <timeout>
42   seconds of no file writing.
43
44   (Basically when the solexa machine finishes dumping a round of data
45   this'll hopefully send out a message saying hey look theres data available
46   """
47
48   def __init__(self, watchdir, write_timeout=10, notify_timeout=5):
49      self.watchdirs = []
50      self.write_timeout = write_timeout
51      self.notify_timeout = int(notify_timeout * 1000)
52
53      self.wm = pyinotify.WatchManager()
54      self.handler = Handler(self.wm)
55      self.notifier = pyinotify.Notifier(self.wm, self.handler)
56      
57      self.add_watch(watchdir)
58
59   def add_watch(self, watchdir):
60      self.watchdirs.append(watchdir)
61      print "Watching:", watchdir
62      mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
63      # rec traverses the tree and adds all the directories that are there 
64      # at the start.
65      # auto_add will add in new directories as they are created
66      self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
67
68   def event_loop(self):
69     while True:  # loop forever
70       try:
71         # process the queue of events as explained above
72         self.notifier.process_events()
73         #check events waits timeout
74         if self.notifier.check_events(self.notify_timeout):
75           # read notified events and enqeue them
76           self.notifier.read_events()
77         # should we do something?
78         last_event_time = self.handler.last_event_time
79         if last_event_time is not None:
80           time_delta = time.time() - last_event_time
81           if time_delta > self.write_timeout:
82             self.copying_paused()
83             self.handler.last_event_time = None
84
85       except KeyboardInterrupt:
86         # destroy the inotify's instance on this interrupt (stop monitoring)
87         self.notifier.stop()
88         break
89
90   def copying_paused(self):
91     print "more than 10 seconds have elapsed"
92     send(copy_jid, "start copy")
93
94
95 if __name__ == "__main__":
96   #watcher = TreeWriteDoneNotifier("/tmp")
97   watcher = TreeWriteDoneNotifier("/gec/jumpgate/ext0")
98   watcher.event_loop()
99