[project @ make a spoolwatcher script]
[htsworkflow.git] / uashelper / spoolwatcher.py
1 #!/usr/bin/env python
2
3 import os
4 import re
5 import sys
6 import time
7
8 # this uses pyinotify
9 import pyinotify
10 from pyinotify import EventsCodes
11
12 from benderjab.xsend import send
13
14 class Handler(pyinotify.ProcessEvent):
15     def __init__(self, watchmanager, runner_jid):
16       self.last_event_time = None
17       self.watchmanager = watchmanager
18       self.runner_jid = runner_jid
19
20     def process_IN_CREATE(self, event):
21       self.last_event_time = time.time()
22       msg = "Create: %s" %  os.path.join(event.path, event.name)
23       if event.name.lower() == "run.completed":
24         print "Run is done!"
25         send(self.runner_jid, "a run finished, launch it, and swap the drive")
26       print msg
27
28     def process_IN_DELETE(self, event):
29       print "Remove: %s" %  os.path.join(event.path, event.name)
30
31     def process_IN_UNMOUNT(self, event):
32       print "Unmounted ", str(event)
33
34 class TreeWriteDoneNotifier:
35   """
36   Watch a directory and send a message when another process is done writing.
37
38   This monitors a directory tree using inotify (linux specific) and 
39   after some files having been written will send a message after <timeout>
40   seconds of no file writing.
41
42   (Basically when the solexa machine finishes dumping a round of data
43   this'll hopefully send out a message saying hey look theres data available
44   """
45
46   def __init__(self, watchdir, 
47                      copy_jid, runner_jid, 
48                      write_timeout=10, notify_timeout=5):
49      self.watchdir = watchdir
50      self.copy_jid = copy_jid
51      self.runner_jid = runner_jid
52      self.write_timeout = write_timeout
53      self.notify_timeout = int(notify_timeout * 1000)
54
55      self.wm = pyinotify.WatchManager()
56      self.handler = Handler(self.wm, self.runner_jid)
57      self.notifier = pyinotify.Notifier(self.wm, self.handler)
58      
59      self.add_watch(self.watchdir)
60
61   def add_watch(self, watchdir):
62      print "Watching:", watchdir
63      mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
64      # rec traverses the tree and adds all the directories that are there 
65      # at the start.
66      # auto_add will add in new directories as they are created
67      self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
68
69   def event_loop(self):
70     while True:  # loop forever
71       try:
72         # process the queue of events as explained above
73         self.notifier.process_events()
74         #check events waits timeout
75         if self.notifier.check_events(self.notify_timeout):
76           # read notified events and enqeue them
77           self.notifier.read_events()
78         # should we do something?
79         last_event_time = self.handler.last_event_time
80         if last_event_time is not None:
81           time_delta = time.time() - last_event_time
82           if time_delta > self.write_timeout:
83             self.copying_paused()
84             self.handler.last_event_time = None
85
86       except KeyboardInterrupt:
87         # destroy the inotify's instance on this interrupt (stop monitoring)
88         self.notifier.stop()
89         break
90
91   def copying_paused(self):
92     print "more than 10 seconds have elapsed"
93     send(self.copy_jid, "start copy")
94