From: Diane Trout Date: Sat, 24 Jan 2009 01:28:29 +0000 (+0000) Subject: merge copying/running automation code from trunk into live site X-Git-Tag: 0.1.14~8 X-Git-Url: http://woldlab.caltech.edu/gitweb/?a=commitdiff_plain;h=0fce83604b0a4013ed6ac031a2729f1233fabd4f;p=htsworkflow.git merge copying/running automation code from trunk into live site --- diff --git a/gaworkflow/automation/copier.py b/gaworkflow/automation/copier.py index 2e0d3ae..572cfb3 100644 --- a/gaworkflow/automation/copier.py +++ b/gaworkflow/automation/copier.py @@ -123,7 +123,7 @@ class rsync(object): class CopierBot(rpc.XmlRpcBot): def __init__(self, section=None, configfile=None): #if configfile is None: - # configfile = '~/.gaworkflow' + # configfile = '~/.htsworkflow' super(CopierBot, self).__init__(section, configfile) diff --git a/gaworkflow/automation/runner.py b/gaworkflow/automation/runner.py index f81b682..45d4ffc 100644 --- a/gaworkflow/automation/runner.py +++ b/gaworkflow/automation/runner.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +from glob import glob import logging import os import re @@ -8,8 +9,7 @@ import threading from benderjab import rpc -from gaworkflow.pipeline.configure_run import * -from gaworkflow.pipeline.monitors import _percentCompleted +from htsworkflow.pipelines.configure_run import * #s_fc = re.compile('FC[0-9]+') s_fc = re.compile('_[0-9a-zA-Z]*$') @@ -36,7 +36,7 @@ class Runner(rpc.XmlRpcBot): """ def __init__(self, section=None, configfile=None): #if configfile is None: - # self.configfile = "~/.gaworkflow" + # self.configfile = "~/.htsworkflow" super(Runner, self).__init__(section, configfile) self.cfg['notify_users'] = None @@ -83,6 +83,8 @@ class Runner(rpc.XmlRpcBot): reply = u"starting run for %s" % (words[1]) else: reply = u"need runfolder name" + elif re.match(u"path", msg): + reply = u"My path is: " + unicode(os.environ['PATH']) else: reply = u"I didn't understand '%s'" %(msg) @@ -103,24 +105,7 @@ class Runner(rpc.XmlRpcBot): return "No status information for %s yet." \ " Probably still in configure step. Try again later." % (fc_num) - fc,ft = status.statusFirecrest() - bc,bt = status.statusBustard() - gc,gt = status.statusGerald() - - tc,tt = status.statusTotal() - - fp = _percentCompleted(fc, ft) - bp = _percentCompleted(bc, bt) - gp = _percentCompleted(gc, gt) - tp = _percentCompleted(tc, tt) - - output = [] - - output.append(u'Firecrest: %s%% (%s/%s)' % (fp, fc, ft)) - output.append(u' Bustard: %s%% (%s/%s)' % (bp, bc, bt)) - output.append(u' Gerald: %s%% (%s/%s)' % (gp, gc, gt)) - output.append(u'-----------------------') - output.append(u' Total: %s%% (%s/%s)' % (tp, tc, tt)) + output = status.statusReport() return '\n'.join(output) @@ -177,7 +162,7 @@ class Runner(rpc.XmlRpcBot): # retrieve config step cfg_filepath = os.path.join(conf_info.analysis_dir, - 'config32auto.txt') + 'config-auto.txt') status_retrieve_cfg = retrieve_config(conf_info, flowcell, cfg_filepath, @@ -196,6 +181,9 @@ class Runner(rpc.XmlRpcBot): if status: logging.info("Runner: Configure: success") self.reportMsg("Configure (%s): success" % (run_dir)) + self.reportMsg( + os.linesep.join(glob(os.path.join(run_dir,'Data','C*'))) + ) else: logging.error("Runner: Configure: failed") self.reportMsg("Configure (%s): FAILED" % (run_dir)) @@ -210,7 +198,7 @@ class Runner(rpc.XmlRpcBot): run_status = run_pipeline(conf_info) if run_status is True: logging.info('Runner: Pipeline: success') - self.piplineFinished(run_dir) + self.reportMsg("Pipeline run (%s): Finished" % (run_dir,)) else: logging.info('Runner: Pipeline: failed') self.reportMsg("Pipeline run (%s): FAILED" % (run_dir)) diff --git a/gaworkflow/automation/spoolwatcher.py b/gaworkflow/automation/spoolwatcher.py index abd9709..2a57535 100644 --- a/gaworkflow/automation/spoolwatcher.py +++ b/gaworkflow/automation/spoolwatcher.py @@ -6,7 +6,7 @@ import sys import time #import glob -#from gaworkflow.pipeline.recipe_parser import get_cycles +from htsworkflow.util import mount # this uses pyinotify import pyinotify @@ -47,6 +47,8 @@ class Handler(pyinotify.ProcessEvent): logging.debug("Remove: %s" % os.path.join(event.path, event.name)) def process_IN_UNMOUNT(self, event): + pathname = os.path.join(event.path, event.name) + logging.debug("IN_UNMOUNT: %s" % (pathname,)) self.bot.unmount_watch() class SpoolWatcher(rpc.XmlRpcBot): @@ -65,14 +67,14 @@ class SpoolWatcher(rpc.XmlRpcBot): # I wonder where I should put the documentation #:Parameters: # `watchdir` - which directory tree to monitor for modifications - # `profile` - specify which .gaworkflow profile to use + # `profile` - specify which .htsworkflow profile to use # `write_timeout` - how many seconds to wait for writes to finish to # the spool # `notify_timeout` - how often to timeout from notify def __init__(self, section=None, configfile=None): #if configfile is None: - # self.configfile = "~/.gaworkflow" + # self.configfile = "~/.htsworkflow" super(SpoolWatcher, self).__init__(section, configfile) self.cfg['watchdir'] = None @@ -85,6 +87,8 @@ class SpoolWatcher(rpc.XmlRpcBot): self.handler = Handler(self.wm, self) self.notifier = pyinotify.Notifier(self.wm, self.handler) self.wdd = None + self.mount_point = None + self.mounted = True self.notify_users = None self.notify_runner = None @@ -106,7 +110,7 @@ class SpoolWatcher(rpc.XmlRpcBot): msg = 'need a full jabber ID + resource for xml-rpc destinations' logging.FATAL(msg) raise bot.JIDMissingResource(msg) - + def add_watch(self, watchdir=None): """ start watching watchdir or self.watch_dir @@ -117,6 +121,9 @@ class SpoolWatcher(rpc.XmlRpcBot): if watchdir is None: watchdir = self.watch_dir logging.info("Watching:"+str(watchdir)) + + self.mount_point = mount.find_mount_point_for(watchdir) + mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT # rec traverses the tree and adds all the directories that are there # at the start. @@ -125,10 +132,9 @@ class SpoolWatcher(rpc.XmlRpcBot): def unmount_watch(self): if self.wdd is not None: - logging.debug("disabling watch") - logging.debug(str(self.wdd)) self.wm.rm_watch(self.wdd.values()) self.wdd = None + self.mounted = False def process_notify(self, *args): # process the queue of events as explained above @@ -138,13 +144,25 @@ class SpoolWatcher(rpc.XmlRpcBot): # read notified events and enqeue them self.notifier.read_events() # should we do something? + # has something happened? last_event_time = self.handler.last_event_time if last_event_time is not None: time_delta = time.time() - last_event_time if time_delta > self.write_timeout: self.startCopy() self.handler.last_event_time = None - + # handle unmounted filesystems + if not self.mounted: + if mount.is_mounted(self.mount_point): + # we've been remounted. Huzzah! + # restart the watch + self.add_watch() + self.mounted = True + logging.info( + "%s was remounted, restarting watch" % \ + (self.mount_point) + ) + def _parser(self, msg, who): """ Parse xmpp chat messages diff --git a/gaworkflow/automation/test/test_runner.py b/gaworkflow/automation/test/test_runner.py new file mode 100644 index 0000000..6c3b9df --- /dev/null +++ b/gaworkflow/automation/test/test_runner.py @@ -0,0 +1,46 @@ +import unittest + + +import os +from htsworkflow.automation.copier import runfolder_validate + +def extract_runfolder_path(watchdir, event): + runfolder_path = watchdir + path = event.path + if not path.startswith(watchdir): + return None + + fragments = path[len(watchdir):].split(os.path.sep) + for f in fragments: + runfolder_path = os.path.join(runfolder_path, f) + if runfolder_validate(f): + return runfolder_path + return None + +class Event(object): + def __init__(self, path=None, name=None): + self.path = path + self.name = name + +class testRunner(unittest.TestCase): + + def test_extract_runfolder(self): + watchdir = os.path.join('root', 'server', 'mount') + runfolder = os.path.join(watchdir, '080909_HWI-EAS229_0052_1234ABCD') + ipar = os.path.join(runfolder, 'Data', 'IPAR_1.01') + other = os.path.join(watchdir, 'other') + + event = Event( path=runfolder ) + self.failUnlessEqual(extract_runfolder_path(watchdir, event), runfolder) + + event = Event( path=ipar ) + self.failUnlessEqual(extract_runfolder_path(watchdir, event), runfolder) + + event = Event( path=other) + self.failUnlessEqual(extract_runfolder_path(watchdir, event), None ) + +def suite(): + return unittest.makeSuite(testRunner,'test') + +if __name__ == "__main__": + unittest.main(defaultTest="suite")