Spoolwatch function for getting cycle number from Recipe*.xml file.
[htsworkflow.git] / gaworkflow / automation / spoolwatcher.py
1 #!/usr/bin/env python
2 import logging
3 import os
4 import re
5 import sys
6 import time
7 import glob
8
9
10
11 # this uses pyinotify
12 import pyinotify
13 from pyinotify import EventsCodes
14
15 from benderjab import rpc
16
17
18 s_cycles = re.compile('No. Cycles: (?P<cycles>[0-9]+)')
19
20 def get_cycles(run_dir="."):
21   """
22   Find the number of cycles from the Recipe*.xml file found in run_dir.
23   """
24   
25   file_path_list = glob.glob(os.path.join(run_dir, "Recipe*.xml"))
26
27   # Error handling
28   if len(file_path_list) == 0:
29     msg = "Recipe xml file not found."
30     raise IOError, msg
31
32   elif len(file_path_list) > 1:
33     msg = "%s Recipe files found, expected 1." % (len(file_path_list))
34     raise ValueError, msg
35
36   f = open(file_path_list[0], 'r')
37
38   #Find the line of the file with the cycle number in it.
39   for line in f:
40     mo = s_cycles.search(line)
41     if mo:
42       break
43
44   f.close()
45
46   # Process the line with the cycle number in it.
47   cycle_num = int(mo.group('cycles'))
48
49   return cycle_num
50
51
52
53 class WatcherEvents(object):
54     # two events need to be tracked
55     # one to send startCopy
56     # one to send OMG its broken
57     # OMG its broken needs to stop when we've seen enough
58     #  cycles
59     # this should be per runfolder. 
60     # read the xml files 
61     def __init__(self):
62         pass
63         
64
65 class Handler(pyinotify.ProcessEvent):
66     def __init__(self, watchmanager, bot):
67         self.last_event_time = None
68         self.watchmanager = watchmanager
69         self.bot = bot
70
71     def process_IN_CREATE(self, event):
72         self.last_event_time = time.time()
73         msg = "Create: %s" %  os.path.join(event.path, event.name)
74         if event.name.lower() == "run.completed":
75             try:
76                 self.bot.sequencingFinished(event.path)
77             except IOError, e:
78                 logging.error("Couldn't send sequencingFinished")
79         logging.debug(msg)
80
81     def process_IN_DELETE(self, event):
82         logging.debug("Remove: %s" %  os.path.join(event.path, event.name))
83
84     def process_IN_UNMOUNT(self, event):
85         self.bot.unmount_watch()
86
87 class SpoolWatcher(rpc.XmlRpcBot):
88     """
89     Watch a directory and send a message when another process is done writing.
90     
91     This monitors a directory tree using inotify (linux specific) and
92     after some files having been written will send a message after <timeout>
93     seconds of no file writing.
94     
95     (Basically when the solexa machine finishes dumping a round of data
96     this'll hopefully send out a message saying hey look theres data available
97     
98     """
99     # these params need to be in the config file
100     # I wonder where I should put the documentation
101     #:Parameters:
102     #    `watchdir` - which directory tree to monitor for modifications
103     #    `profile` - specify which .gaworkflow profile to use
104     #    `write_timeout` - how many seconds to wait for writes to finish to
105     #                      the spool
106     #    `notify_timeout` - how often to timeout from notify
107     
108     def __init__(self, section=None, configfile=None):
109         #if configfile is None:
110         #    self.configfile = "~/.gaworkflow"
111         super(SpoolWatcher, self).__init__(section, configfile)
112         
113         self.cfg['watchdir'] = None
114         self.cfg['write_timeout'] = 10
115         self.cfg['notify_users'] = None
116         self.cfg['notify_runner'] = None
117         
118         self.notify_timeout = 0.001
119         self.wm = pyinotify.WatchManager()
120         self.handler = Handler(self.wm, self)
121         self.notifier = pyinotify.Notifier(self.wm, self.handler)
122         self.wdd = None
123         
124         self.notify_users = None
125         self.notify_runner = None
126         
127         self.eventTasks.append(self.process_notify)
128
129     def read_config(self, section=None, configfile=None):
130         super(SpoolWatcher, self).read_config(section, configfile)
131         
132         self.watch_dir = self._check_required_option('watchdir')
133         self.write_timeout = int(self.cfg['write_timeout'])
134         
135         self.notify_users = self._parse_user_list(self.cfg['notify_users'])
136         try:
137           self.notify_runner = \
138              self._parse_user_list(self.cfg['notify_runner'],
139                                    require_resource=True)
140         except bot.JIDMissingResource:
141             msg = 'need a full jabber ID + resource for xml-rpc destinations'
142             logging.FATAL(msg)
143             raise bot.JIDMissingResource(msg)
144             
145     def add_watch(self, watchdir=None):
146         """
147         start watching watchdir or self.watch_dir
148         we're currently limited to watching one directory tree.
149         """
150         # the one tree limit is mostly because self.wdd is a single item
151         # but managing it as a list might be a bit more annoying
152         if watchdir is None:
153             watchdir = self.watch_dir
154         logging.info("Watching:"+str(watchdir))
155         mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
156         # rec traverses the tree and adds all the directories that are there
157         # at the start.
158         # auto_add will add in new directories as they are created
159         self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
160
161     def unmount_watch(self):
162         if self.wdd is not None:
163             logging.debug("disabling watch")
164             logging.debug(str(self.wdd))
165             self.wm.rm_watch(self.wdd.values())
166             self.wdd = None
167             
168     def process_notify(self, *args):
169         # process the queue of events as explained above
170         self.notifier.process_events()
171         #check events waits timeout
172         if self.notifier.check_events(self.notify_timeout):
173             # read notified events and enqeue them
174             self.notifier.read_events()
175             # should we do something?
176         last_event_time = self.handler.last_event_time
177         if last_event_time is not None:
178             time_delta = time.time() - last_event_time
179             if time_delta > self.write_timeout:
180                 self.startCopy()
181                 self.handler.last_event_time = None
182     
183     def _parser(self, msg, who):
184         """
185         Parse xmpp chat messages
186         """
187         help = u"I can send [copy] message, or squencer [finished]"
188         if re.match(u"help", msg):
189             reply = help
190         elif re.match("copy", msg):            
191             self.startCopy()
192             reply = u"sent copy message"
193         elif re.match(u"finished", msg):
194             words = msg.split()
195             if len(words) == 2:
196                 self.sequencingFinished(words[1])
197                 reply = u"sending sequencing finished for %s" % (words[1])
198             else:
199                 reply = u"need runfolder name"
200         else:
201             reply = u"I didn't understand '%s'" %(msg)            
202         return reply
203         
204     def start(self, daemonize):
205         """
206         Start application
207         """
208         self.add_watch()
209         super(SpoolWatcher, self).start(daemonize)
210         
211     def stop(self):
212         """
213         shutdown application
214         """
215         # destroy the inotify's instance on this interrupt (stop monitoring)
216         self.notifier.stop()
217         super(SpoolWatcher, self).stop()
218     
219     def startCopy(self):
220         logging.debug("writes seem to have stopped")
221         if self.notify_runner is not None:
222             for r in self.notify_runner:
223                 self.rpc_send(r, tuple(), 'startCopy')
224         
225     def sequencingFinished(self, run_dir):
226         # need to strip off self.watch_dir from rundir I suspect.
227         logging.info("run.completed in " + str(run_dir))
228         pattern = self.watch_dir
229         if pattern[-1] != os.path.sep:
230             pattern += os.path.sep
231         stripped_run_dir = re.sub(pattern, "", run_dir)
232         logging.debug("stripped to " + stripped_run_dir)
233         if self.notify_users is not None:
234             for u in self.notify_users:
235                 self.send(u, 'Sequencing run %s finished' % (stripped_run_dir))
236         if self.notify_runner is not None:
237             for r in self.notify_runner:
238                 self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
239         
240 def main(args=None):
241     bot = SpoolWatcher()
242     return bot.main(args)
243     
244 if __name__ == "__main__":
245     sys.exit(main(sys.argv[1:]))
246