10 from pyinotify import EventsCodes
12 from benderjab.xsend import send
14 class Handler(pyinotify.ProcessEvent):
def __init__(self, watchmanager, runner_jid):
    """Store the collaborators this event handler needs.

    watchmanager -- watch manager object kept on the instance
    runner_jid   -- recipient handed to send() when a run completes
    """
    self.watchmanager, self.runner_jid = watchmanager, runner_jid
    # No event has been seen yet; process_IN_CREATE stamps this with
    # time.time() each time it fires.
    self.last_event_time = None
20 def process_IN_CREATE(self, event):
21 self.last_event_time = time.time()
22 msg = "Create: %s" % os.path.join(event.path, event.name)
23 if event.name.lower() == "run.completed":
26 send(self.runner_jid, "a run finished, launch it, and swap the drive")
28 print "ERROR: couldn't send message"
32 def process_IN_DELETE(self, event):
33 print "Remove: %s" % os.path.join(event.path, event.name)
35 def process_IN_UNMOUNT(self, event):
36 print "Unmounted ", str(event)
38 class TreeWriteDoneNotifier:
40 Watch a directory and send a message when another process is done writing.
42 This monitors a directory tree using inotify (Linux specific) and,
43 once some files have been written, sends a message after <timeout>
44 seconds with no further file writing.
46 (Basically when the solexa machine finishes dumping a round of data
47 this'll hopefully send out a message saying hey look there's data available
50 def __init__(self, watchdir,
52 write_timeout=10, notify_timeout=5):
53 self.watchdir = watchdir
54 self.copy_jid = copy_jid
55 self.runner_jid = runner_jid
56 self.write_timeout = write_timeout
57 self.notify_timeout = int(notify_timeout * 1000)
59 self.wm = pyinotify.WatchManager()
60 self.handler = Handler(self.wm, self.runner_jid)
61 self.notifier = pyinotify.Notifier(self.wm, self.handler)
63 self.add_watch(self.watchdir)
65 def add_watch(self, watchdir):
66 print "Watching:", watchdir
67 mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
68 # rec traverses the tree and adds all the directories that are there
70 # auto_add will add in new directories as they are created
71 self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
74 while True: # loop forever
76 # process the queue of events as explained above
77 self.notifier.process_events()
78 # check_events() waits up to the timeout for new events
79 if self.notifier.check_events(self.notify_timeout):
80 # read notified events and enqueue them
81 self.notifier.read_events()
82 # should we do something?
83 last_event_time = self.handler.last_event_time
84 if last_event_time is not None:
85 time_delta = time.time() - last_event_time
86 if time_delta > self.write_timeout:
88 self.handler.last_event_time = None
90 except KeyboardInterrupt:
91 # destroy the inotify's instance on this interrupt (stop monitoring)
95 def copying_paused(self):
96 print "more than 10 seconds have elapsed"
97 send(self.copy_jid, "start copy")