I also added code to record which directory watch, and what entry in that
directory was being created (AKA whatever element of the runfolder is being
touched, I record the root of the runfolder directory).
To support this I changed the config option in the benderjab file from
watchdir to watchdirs to make it a bit clearer that things have changed.
I still need to work out the communication protocol to copier so it
can figure out what to start copying. Also I'm recording the
watchdirectory, but copier needs access to a different url. So there's
some question about where the right place to map watchdir to copy url source
might be.
import logging
import os
import re
import logging
import os
import re
import sys
import time
#import glob
import sys
import time
#import glob
from benderjab import rpc
from benderjab import rpc
+def get_top_dir(root, path):
+ """
+ Return the directory in path that is a subdirectory of root.
+ e.g.
+
+ >>> print get_top_dir('/a/b/c', '/a/b/c/d/e/f')
+ d
+ >>> print get_top_dir('/a/b/c/', '/a/b/c/d/e/f')
+ d
+ >>> print get_top_dir('/a/b/c', '/g/e/f')
+ None
+ >>> print get_top_dir('/a/b/c', '/a/b/c')
+ <BLANKLINE>
+ """
+ if path.startswith(root):
+ subpath = path[len(root):]
+ if subpath.startswith('/'):
+ subpath = subpath[1:]
+ return subpath.split(os.path.sep)[0]
+ else:
+ return None
+
class WatcherEvents(object):
# two events need to be tracked
# one to send startCopy
class WatcherEvents(object):
# two events need to be tracked
# one to send startCopy
ipar flag indicates we should wait for ipar to finish, instead of
just the run finishing
"""
ipar flag indicates we should wait for ipar to finish, instead of
just the run finishing
"""
- print 'ipar flag: ' + str(ipar)
- self.last_event_time = None
self.watchmanager = watchmanager
self.bot = bot
self.ipar_mode = ipar
self.watchmanager = watchmanager
self.bot = bot
self.ipar_mode = ipar
self.last_file = "run.completed".lower()
def process_IN_CREATE(self, event):
self.last_file = "run.completed".lower()
def process_IN_CREATE(self, event):
- self.last_event_time = time.time()
- msg = "Create: %s" % os.path.join(event.path, event.name)
+ for wdd in self.bot.wdds:
+ for watch_path in self.bot.watchdirs:
+ if event.path.startswith(watch_path):
+ target = get_top_dir(watch_path, event.path)
+ self.last_event.setdefault(watch_path, {})[target] = time.time()
+
+ msg = "Create: %s %s" % (event.path, event.name)
- if event.name.lower() == self.last_file:
- try:
- self.bot.sequencingFinished(event.path)
- except IOError, e:
- logging.error("Couldn't send sequencingFinished")
- logging.debug(msg)
+ if event.name.lower() == self.last_file:
+ try:
+ self.bot.sequencingFinished(event.path)
+ except IOError, e:
+ logging.error("Couldn't send sequencingFinished")
+ logging.debug(msg)
def process_IN_DELETE(self, event):
logging.debug("Remove: %s" % os.path.join(event.path, event.name))
def process_IN_DELETE(self, event):
logging.debug("Remove: %s" % os.path.join(event.path, event.name))
def process_IN_UNMOUNT(self, event):
pathname = os.path.join(event.path, event.name)
logging.debug("IN_UNMOUNT: %s" % (pathname,))
def process_IN_UNMOUNT(self, event):
pathname = os.path.join(event.path, event.name)
logging.debug("IN_UNMOUNT: %s" % (pathname,))
- self.bot.unmount_watch()
+ self.bot.unmount_watch(event.path)
class SpoolWatcher(rpc.XmlRpcBot):
"""
class SpoolWatcher(rpc.XmlRpcBot):
"""
# these params need to be in the config file
# I wonder where I should put the documentation
#:Parameters:
# these params need to be in the config file
# I wonder where I should put the documentation
#:Parameters:
- # `watchdir` - which directory tree to monitor for modifications
+ # `watchdirs` - list of directories to monitor for modifications
# `profile` - specify which .htsworkflow profile to use
# `write_timeout` - how many seconds to wait for writes to finish to
# the spool
# `profile` - specify which .htsworkflow profile to use
# `write_timeout` - how many seconds to wait for writes to finish to
# the spool
# self.configfile = "~/.htsworkflow"
super(SpoolWatcher, self).__init__(section, configfile)
# self.configfile = "~/.htsworkflow"
super(SpoolWatcher, self).__init__(section, configfile)
- self.cfg['watchdir'] = None
+ self.cfg['watchdirs'] = None
self.cfg['write_timeout'] = 10
self.cfg['notify_users'] = None
self.cfg['notify_runner'] = None
self.cfg['write_timeout'] = 10
self.cfg['notify_users'] = None
self.cfg['notify_runner'] = None
self.notify_timeout = 0.001
self.wm = pyinotify.WatchManager()
self.notify_timeout = 0.001
self.wm = pyinotify.WatchManager()
- self.wdd = None
- self.mount_point = None
- self.mounted = True
+ self.wdds = []
+ # keep track if the specified mount point is currently mounted
+ self.mounted_points = {}
+ # keep track of which mount points tie to which watch directories
+ # so maybe we can remount them.
+ self.mounts_to_watches = {}
self.notify_users = None
self.notify_runner = None
self.notify_users = None
self.notify_runner = None
def read_config(self, section=None, configfile=None):
super(SpoolWatcher, self).read_config(section, configfile)
def read_config(self, section=None, configfile=None):
super(SpoolWatcher, self).read_config(section, configfile)
- self.watch_dir = self._check_required_option('watchdir')
+ self.watchdirs = shlex.split(self._check_required_option('watchdirs'))
self.write_timeout = int(self.cfg['write_timeout'])
self.wait_for_ipar = int(self.cfg['wait_for_ipar'])
self.write_timeout = int(self.cfg['write_timeout'])
self.wait_for_ipar = int(self.cfg['wait_for_ipar'])
- print 'wait for ipar: ' + str(self.cfg['wait_for_ipar'])
+ logging.debug('wait for ipar: ' + str(self.cfg['wait_for_ipar']))
self.notify_users = self._parse_user_list(self.cfg['notify_users'])
try:
self.notify_users = self._parse_user_list(self.cfg['notify_users'])
try:
self.handler = Handler(self.wm, self, self.wait_for_ipar)
self.notifier = pyinotify.Notifier(self.wm, self.handler)
self.handler = Handler(self.wm, self, self.wait_for_ipar)
self.notifier = pyinotify.Notifier(self.wm, self.handler)
- def add_watch(self, watchdir=None):
+ def add_watch(self, watchdirs=None):
"""
start watching watchdir or self.watch_dir
we're currently limited to watching one directory tree.
"""
# the one tree limit is mostly because self.wdd is a single item
# but managing it as a list might be a bit more annoying
"""
start watching watchdir or self.watch_dir
we're currently limited to watching one directory tree.
"""
# the one tree limit is mostly because self.wdd is a single item
# but managing it as a list might be a bit more annoying
- if watchdir is None:
- watchdir = self.watch_dir
- logging.info("Watching:"+str(watchdir))
-
- self.mount_point = mount.find_mount_point_for(watchdir)
+ if watchdirs is None:
+ watchdirs = self.watchdirs
mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
# rec traverses the tree and adds all the directories that are there
# at the start.
# auto_add will add in new directories as they are created
mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
# rec traverses the tree and adds all the directories that are there
# at the start.
# auto_add will add in new directories as they are created
- self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
+ for w in watchdirs:
+ mount_location = mount.find_mount_point_for(w)
+ self.mounted_points[mount_location] = True
+ mounts = self.mounts_to_watches.get(mount_location, [])
+ if w not in mounts:
+ mounts.append(w)
+ self.mounts_to_watches[mount_location] = mounts
+
+ logging.info(u"Watching:"+unicode(w))
+ self.wdds.append(self.wm.add_watch(w, mask, rec=True, auto_add=True))
- def unmount_watch(self):
- if self.wdd is not None:
- self.wm.rm_watch(self.wdd.values())
- self.wdd = None
- self.mounted = False
+ def unmount_watch(self, event_path):
+ # remove backwards so we don't get weirdness from
+ # the list getting shorter
+ for i in range(len(self.wdds),0, -1):
+ wdd = self.wdds[i]
+ logging.info(u'unmounting: '+unicode(wdd.items()))
+ self.wm.rm_watch(wdd.values())
+ del self.wdds[i]
+ self.mounted = False
def process_notify(self, *args):
# process the queue of events as explained above
def process_notify(self, *args):
# process the queue of events as explained above
self.notifier.read_events()
# should we do something?
# has something happened?
self.notifier.read_events()
# should we do something?
# has something happened?
- last_event_time = self.handler.last_event_time
- if last_event_time is not None:
- time_delta = time.time() - last_event_time
- if time_delta > self.write_timeout:
- self.startCopy()
- self.handler.last_event_time = None
+ for watch_dir, last_events in self.handler.last_event.items():
+ logging.debug('last_events: %s %s' % (watch_dir, last_events))
+ for last_event_dir, last_event_time in last_events.items():
+ time_delta = time.time() - last_event_time
+ if time_delta > self.write_timeout:
+ self.startCopy(watch_dir, last_event_dir)
+ self.handler.last_event[watch_dir] = {}
# handle unmounted filesystems
# handle unmounted filesystems
- if not self.mounted:
- if mount.is_mounted(self.mount_point):
+ for mount_point, was_mounted in self.mounted_points.items():
+ if not was_mounted and mount.is_mounted(mount_point):
# we've been remounted. Huzzah!
# restart the watch
# we've been remounted. Huzzah!
# restart the watch
- self.add_watch()
- self.mounted = True
- logging.info(
- "%s was remounted, restarting watch" % \
- (self.mount_point)
- )
+ for watch in self.mounts_to_watches[mount_point]:
+ self.add_watch(watch)
+ logging.info(
+ "%s was remounted, restarting watch" % \
+ (mount_point)
+ )
+ self.mounted_points[mount_point] = True
def _parser(self, msg, who):
"""
def _parser(self, msg, who):
"""
self.notifier.stop()
super(SpoolWatcher, self).stop()
self.notifier.stop()
super(SpoolWatcher, self).stop()
+ def startCopy(self, watchdir=None, event_path=None):
logging.debug("writes seem to have stopped")
logging.debug("writes seem to have stopped")
+ logging.debug("watchdir = %s, event_path = %s" % (watchdir, event_path))
if self.notify_runner is not None:
for r in self.notify_runner:
self.rpc_send(r, tuple(), 'startCopy')
if self.notify_runner is not None:
for r in self.notify_runner:
self.rpc_send(r, tuple(), 'startCopy')
+ if self.notify_users is not None:
+ for u in self.notify_users:
+ self.send(u, 'startCopy %s %s' % (watchdir, event_path))
def sequencingFinished(self, run_dir):
# need to strip off self.watch_dir from rundir I suspect.
def sequencingFinished(self, run_dir):
# need to strip off self.watch_dir from rundir I suspect.
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
+# TODO:
+# send messages to copier specifying which mount to copy