From: Diane Trout Date: Tue, 21 Jul 2009 22:23:21 +0000 (+0000) Subject: Allow specifying the we're finished sequencing file in the config file. X-Git-Tag: 0.2.6~6 X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=52aa3ba8c730fc1fa5639529b5308d8b626395ec Allow specifying the "we're finished sequencing" file in the config file. (Currently the default is netcopy_complete.txt, but that might not be a good choice.) While I was in there I changed it to catch the current event and set a completed flag in the event, instead of directly sending the sequencingFinished message. I also put some effort into only sending startCopy messages for things that look like runfolders, both by not sending a copy message for the root of the watch directory and by checking whether the top-level directory that contains the event looks like a runfolder name (i.e. it matches "[0-9]{6}_"). --- diff --git a/htsworkflow/automation/spoolwatcher.py b/htsworkflow/automation/spoolwatcher.py index 783d596..607b2dd 100644 --- a/htsworkflow/automation/spoolwatcher.py +++ b/htsworkflow/automation/spoolwatcher.py @@ -14,6 +14,20 @@ from pyinotify import EventsCodes from benderjab import rpc +def is_runfolder(name): + """ + Is it a runfolder? + + >>> print is_runfolder('090630_HWUSI-EAS999_0006_30LNFAAXX') + True + >>> print is_runfolder('hello') + False + """ + if re.match("[0-9]{6}_.*", name): + return True + else: + return False + def get_top_dir(root, path): """ Return the directory in path that is a subdirectory of root. @@ -36,47 +50,69 @@ def get_top_dir(root, path): else: return None -class WatcherEvents(object): - # two events need to be tracked - # one to send startCopy - # one to send OMG its broken - # OMG its broken needs to stop when we've seen enough - # cycles - # this should be per runfolder. 
- # read the xml files - def __init__(self): - pass +class WatcherEvent(object): + """ + Track information about a file event + + Currently its time, and if it was an indication we've completed the run. + """ + def __init__(self, event_root=None): + self.time = time.time() + self.event_root = event_root + self.complete = False + def __unicode__(self): + if self.complete: + complete = "(completed)" + else: + complete = "" + return u"" % (time.ctime(self.time), self.event_root, complete) class Handler(pyinotify.ProcessEvent): - def __init__(self, watchmanager, bot, ipar=False): + def __init__(self, watchmanager, bot, completion_file=None): """ - ipar flag indicates we should wait for ipar to finish, instead of - just the run finishing + Completion file contains current "completion" filename """ self.last_event = {} self.watchmanager = watchmanager self.bot = bot - self.ipar_mode = ipar - if self.ipar_mode: - self.last_file = 'IPAR_Netcopy_Complete.txt'.lower() - else: - self.last_file = "run.completed".lower() + if completion_file is not None: + completion_file = completion_file.lower() + self.completion_file = completion_file def process_IN_CREATE(self, event): for wdd in self.bot.wdds: for watch_path in self.bot.watchdirs: - if event.path.startswith(watch_path): + run_already_complete = False + # I only care about things created inside the watch directory, so + # the event path needs to be longer than the watch path in addition to + # starting with the watch_path + if len(event.path) > len(watch_path) and event.path.startswith(watch_path): + # compute name of the top level directory that had an event + # in the current watch path target = get_top_dir(watch_path, event.path) - self.last_event.setdefault(watch_path, {})[target] = time.time() - msg = "Create: %s %s %s" % (event.path, event.name, target) + if not is_runfolder(target): + logging.debug("Skipping %s, not a runfolder" % (target,)) + continue + + # grab the previous events for this watch path + 
watch_path_events = self.last_event.setdefault(watch_path, {}) + + # if we've already seen an event in this directory (AKA runfolder) + # keep track if its already hit the "completed" flag + if watch_path_events.has_key(target): + run_already_complete = watch_path_events[target].complete + + watch_path_events[target] = WatcherEvent(target) + #self.last_event.setdefault(watch_path, {})[target] = WatcherEvent(target) + + msg = "Create: %s %s %s %s" % (watch_path, target, event.path, event.name) + + if self.completion_file == event.name.lower() or run_already_complete: + self.last_event[watch_path][target].complete = True + msg += "(completed)" - if event.name.lower() == self.last_file: - try: - self.bot.sequencingFinished(event.path) - except IOError, e: - logging.error("Couldn't send sequencingFinished") logging.debug(msg) def process_IN_DELETE(self, event): @@ -108,6 +144,8 @@ class SpoolWatcher(rpc.XmlRpcBot): # `write_timeout` - how many seconds to wait for writes to finish to # the spool # `notify_timeout` - how often to timeout from notify + # `completion_file` - what file indicates we've finished sequencing + # defaults to: netcopy_complete.txt def __init__(self, section=None, configfile=None): #if configfile is None: @@ -118,7 +156,7 @@ class SpoolWatcher(rpc.XmlRpcBot): self.cfg['write_timeout'] = 10 self.cfg['notify_users'] = None self.cfg['notify_runner'] = None - self.cfg['wait_for_ipar'] = 0 + self.cfg['completion_file'] = 'netcopy_complete.txt' self.watchdirs = [] self.watchdir_url_map = {} @@ -148,7 +186,7 @@ class SpoolWatcher(rpc.XmlRpcBot): self.watchdir_url_map[watchdir] = self.cfg.get(watchdir, watchdir) self.write_timeout = int(self.cfg['write_timeout']) - self.wait_for_ipar = int(self.cfg['wait_for_ipar']) + self.completion_file = self.cfg['completion_file'] self.notify_users = self._parse_user_list(self.cfg['notify_users']) try: @@ -170,7 +208,7 @@ class SpoolWatcher(rpc.XmlRpcBot): # create the watch managers if we need them if self.wm is 
None: self.wm = pyinotify.WatchManager() - self.handler = Handler(self.wm, self, self.wait_for_ipar) + self.handler = Handler(self.wm, self, self.completion_file) self.notifier = pyinotify.Notifier(self.wm, self.handler) # the one tree limit is mostly because self.wdd is a single item @@ -224,11 +262,15 @@ class SpoolWatcher(rpc.XmlRpcBot): # should we do something? # has something happened? for watchdir, last_events in self.handler.last_event.items(): - for last_event_dir, last_event_time in last_events.items(): - time_delta = time.time() - last_event_time + for last_event_dir, last_event_detail in last_events.items(): + time_delta = time.time() - last_event_detail.time if time_delta > self.write_timeout: + print "timeout", unicode(last_event_detail) copy_url = self.make_copy_url(watchdir, last_event_dir) self.startCopy(copy_url) + if last_event_detail.complete: + self.sequencingFinished(last_event_detail.event_root) + self.handler.last_event[watchdir] = {} # handle unmounted filesystems for mount_point, was_mounted in self.mounted_points.items(): @@ -290,25 +332,29 @@ class SpoolWatcher(rpc.XmlRpcBot): self.rpc_send(r, tuple([copy_url]), 'startCopy') if self.notify_users is not None: for u in self.notify_users: - self.send(u, 'startCopy %s.' % (copy_urls,)) + self.send(u, 'startCopy %s.' % (copy_url,)) def sequencingFinished(self, run_dir): # need to strip off self.watchdirs from rundir I suspect. 
logging.info("run.completed in " + str(run_dir)) for watch in self.watchdirs: if not run_dir.startswith(watch): + print "%s didn't start with %s" % (run_dir, watch) continue if watch[-1] != os.path.sep: watch += os.path.sep stripped_run_dir = re.sub(watch, "", run_dir) - logging.debug("stripped to " + stripped_run_dir) - if self.notify_users is not None: - for u in self.notify_users: - self.send(u, 'Sequencing run %s finished' % \ - (stripped_run_dir)) - if self.notify_runner is not None: - for r in self.notify_runner: - self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished') + else: + stripped_run_dir = run_dir + + logging.debug("stripped to " + stripped_run_dir) + if self.notify_users is not None: + for u in self.notify_users: + self.send(u, 'Sequencing run %s finished' % \ + (stripped_run_dir)) + if self.notify_runner is not None: + for r in self.notify_runner: + self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished') def main(args=None): bot = SpoolWatcher()