So don't initialize it until after the daemonize code has been
called; this means after BenderJab.start has been called.
(Therefore I changed SpoolWatcher.start to SpoolWatcher.run
and moved the watch-manager configuration into the
beginning of run.)
import shlex
import sys
import time
import shlex
import sys
import time
from htsworkflow.util import mount
from htsworkflow.util import mount
from benderjab import rpc
from benderjab import rpc
def get_top_dir(root, path):
"""
Return the directory in path that is a subdirectory of root.
def get_top_dir(root, path):
"""
Return the directory in path that is a subdirectory of root.
try:
self.bot.sequencingFinished(event.path)
except IOError, e:
try:
self.bot.sequencingFinished(event.path)
except IOError, e:
logging.error("Couldn't send sequencingFinished")
logging.debug(msg)
def process_IN_DELETE(self, event):
logging.debug("Remove: %s" % os.path.join(event.path, event.name))
logging.error("Couldn't send sequencingFinished")
logging.debug(msg)
def process_IN_DELETE(self, event):
logging.debug("Remove: %s" % os.path.join(event.path, event.name))
def process_IN_UNMOUNT(self, event):
pathname = os.path.join(event.path, event.name)
def process_IN_UNMOUNT(self, event):
pathname = os.path.join(event.path, event.name)
self.cfg['notify_users'] = None
self.cfg['notify_runner'] = None
self.cfg['wait_for_ipar'] = 0
self.cfg['notify_users'] = None
self.cfg['notify_runner'] = None
self.cfg['wait_for_ipar'] = 0
+
+ self.watchdirs = []
+ self.watchdir_url_map = {}
self.notify_timeout = 0.001
self.notify_timeout = 0.001
- self.wm = pyinotify.WatchManager()
+
+ self.wm = None
+ self.notify_users = None
+ self.notify_runner = None
# keep track if the specified mount point is currently mounted
self.mounted_points = {}
# keep track of which mount points tie to which watch directories
# so maybe we can remount them.
self.mounts_to_watches = {}
# keep track if the specified mount point is currently mounted
self.mounted_points = {}
# keep track of which mount points tie to which watch directories
# so maybe we can remount them.
self.mounts_to_watches = {}
- self.notify_users = None
- self.notify_runner = None
-
self.eventTasks.append(self.process_notify)
def read_config(self, section=None, configfile=None):
super(SpoolWatcher, self).read_config(section, configfile)
self.watchdirs = shlex.split(self._check_required_option('watchdirs'))
self.eventTasks.append(self.process_notify)
def read_config(self, section=None, configfile=None):
super(SpoolWatcher, self).read_config(section, configfile)
self.watchdirs = shlex.split(self._check_required_option('watchdirs'))
+ # see if there's an alternate url that should be used for the watchdir
+ for watchdir in self.watchdirs:
+ self.watchdir_url_map[watchdir] = self.cfg.get(watchdir, watchdir)
+
self.write_timeout = int(self.cfg['write_timeout'])
self.wait_for_ipar = int(self.cfg['wait_for_ipar'])
self.write_timeout = int(self.cfg['write_timeout'])
self.wait_for_ipar = int(self.cfg['wait_for_ipar'])
- logging.debug('wait for ipar: ' + str(self.cfg['wait_for_ipar']))
self.notify_users = self._parse_user_list(self.cfg['notify_users'])
try:
self.notify_users = self._parse_user_list(self.cfg['notify_users'])
try:
require_resource=True)
except bot.JIDMissingResource:
msg = 'need a full jabber ID + resource for xml-rpc destinations'
require_resource=True)
except bot.JIDMissingResource:
msg = 'need a full jabber ID + resource for xml-rpc destinations'
raise bot.JIDMissingResource(msg)
raise bot.JIDMissingResource(msg)
- self.handler = Handler(self.wm, self, self.wait_for_ipar)
- self.notifier = pyinotify.Notifier(self.wm, self.handler)
+ self.handler = None
+ self.notifier = None
def add_watch(self, watchdirs=None):
"""
start watching watchdir or self.watchdir
we're currently limited to watching one directory tree.
"""
def add_watch(self, watchdirs=None):
"""
start watching watchdir or self.watchdir
we're currently limited to watching one directory tree.
"""
+ # create the watch managers if we need them
+ if self.wm is None:
+ self.wm = pyinotify.WatchManager()
+ self.handler = Handler(self.wm, self, self.wait_for_ipar)
+ self.notifier = pyinotify.Notifier(self.wm, self.handler)
+
# the one tree limit is mostly because self.wdd is a single item
# but managing it as a list might be a bit more annoying
if watchdirs is None:
# the one tree limit is mostly because self.wdd is a single item
# but managing it as a list might be a bit more annoying
if watchdirs is None:
self.mounted = False
def process_notify(self, *args):
self.mounted = False
def process_notify(self, *args):
+ if self.notifier is None:
+ # nothing to do yet
+ return
# process the queue of events as explained above
self.notifier.process_events()
#check events waits timeout
# process the queue of events as explained above
self.notifier.process_events()
#check events waits timeout
# should we do something?
# has something happened?
for watchdir, last_events in self.handler.last_event.items():
# should we do something?
# has something happened?
for watchdir, last_events in self.handler.last_event.items():
- logging.debug('last_events: %s %s' % (watchdir, last_events))
+ #logging.debug('last_events: %s %s' % (watchdir, last_events))
for last_event_dir, last_event_time in last_events.items():
time_delta = time.time() - last_event_time
if time_delta > self.write_timeout:
for last_event_dir, last_event_time in last_events.items():
time_delta = time.time() - last_event_time
if time_delta > self.write_timeout:
reply = u"I didn't understand '%s'" %(msg)
return reply
reply = u"I didn't understand '%s'" %(msg)
return reply
- def start(self, daemonize):
"""
Start application
"""
"""
Start application
"""
+ # we have to configure pyinotify after BenderJab.start is called
+ # as weird things happen to pyinotify if the stdio is closed
+ # after it's initialized.
- super(SpoolWatcher, self).start(daemonize)
+ super(SpoolWatcher, self).run()
def stop(self):
"""
shutdown application
"""
# destroy the inotify's instance on this interrupt (stop monitoring)
def stop(self):
"""
shutdown application
"""
# destroy the inotify's instance on this interrupt (stop monitoring)
+ if self.notifier is not None:
+ self.notifier.stop()
super(SpoolWatcher, self).stop()
def startCopy(self, watchdir=None, event_path=None):
logging.debug("writes seem to have stopped")
super(SpoolWatcher, self).stop()
def startCopy(self, watchdir=None, event_path=None):
logging.debug("writes seem to have stopped")
- logging.debug("watchdir = %s, event_path = %s" % (watchdir, event_path))
if self.notify_runner is not None:
for r in self.notify_runner:
self.rpc_send(r, tuple(), 'startCopy')
if self.notify_runner is not None:
for r in self.notify_runner:
self.rpc_send(r, tuple(), 'startCopy')
return bot.main(args)
if __name__ == "__main__":
return bot.main(args)
if __name__ == "__main__":
- sys.exit(main(sys.argv[1:]))
+ ret = main(sys.argv[1:])
+ #sys.exit(ret)
# TODO:
# send messages to copier specifying which mount to copy
# TODO:
# send messages to copier specifying which mount to copy