From b76f06384f17ba63e1f7bc4057fb945e6455808d Mon Sep 17 00:00:00 2001 From: Brandon King Date: Wed, 21 Nov 2007 20:53:23 +0000 Subject: [PATCH] [project @ Monitor status implementation + config_pipeline cmdling args] * configure_pipeline now takes an optional command line argument of an eland config file to use. (Overrides automatic download). * Added monitors.py which contains methods providing a way of triggering some sort of threaded monitor of pipeline progress. * startCmdLineStatusMonitor(conf_info) prints status to stdout * Updated configure_pipeline script to use the startCmdLineStatusMonitor function. * ConfigInfo object now holds a status variable (GARunStatus object) * requires calling conf_info.createStatusObject() after _cfg_filepath has been set (currently handled by run_pipeline function) --- gaworkflow/pipeline/configure_run.py | 56 ++++++++++++++++++++++-- gaworkflow/pipeline/monitors.py | 64 ++++++++++++++++++++++++++++ scripts/configure_pipeline | 31 +++++++++++--- 3 files changed, 140 insertions(+), 11 deletions(-) create mode 100644 gaworkflow/pipeline/monitors.py diff --git a/gaworkflow/pipeline/configure_run.py b/gaworkflow/pipeline/configure_run.py index f71bfc5..19e2426 100644 --- a/gaworkflow/pipeline/configure_run.py +++ b/gaworkflow/pipeline/configure_run.py @@ -8,6 +8,7 @@ import os from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404 from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict +from gaworkflow.pipeline.run_status import GARunStatus from pyinotify import WatchManager, ThreadedNotifier from pyinotify import EventsCodes, ProcessEvent @@ -24,6 +25,23 @@ class ConfigInfo: self.run_path = None self.bustard_path = None self.config_filepath = None + self.status = None + + + def createStatusObject(self): + """ + Creates a status object which can be queried for + status of running the pipeline + + returns True if object created + returns False if object cannot be created + """ + if self.config_filepath is None: + return False + + self.status = GARunStatus(self.config_filepath) + return True + #################################### @@ -33,15 +51,22 @@ s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt') s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt') s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt') +s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/') +s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/') +s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/') + class RunEvent(ProcessEvent): - def __init__(self): + def __init__(self, conf_info): self.run_status_dict = {'firecrest': False, 'bustard': False, 'gerald': False} + self._ci = conf_info + ProcessEvent.__init__(self) + def process_IN_CREATE(self, event): fullpath = os.path.join(event.path, event.name) @@ -50,15 +75,35 @@ class RunEvent(ProcessEvent): if s_firecrest_finished.search(fullpath): self.run_status_dict['firecrest'] = True + self._ci.status.updateFirecrest(event.name) elif s_bustard_finished.search(fullpath): self.run_status_dict['bustard'] = True + self._ci.status.updateBustard(event.name) elif s_gerald_finished.search(fullpath): self.run_status_dict['gerald'] = True + self._ci.status.updateGerald(event.name) + + #WARNING: The following order is important!! + # Firecrest regex will catch all gerald, bustard, and firecrest + # Bustard regex will catch all gerald and bustard + # Gerald regex will catch all gerald + # So, order needs to be Gerald, Bustard, Firecrest, or this + # won't work properly. + elif s_gerald_all.search(fullpath): + self._ci.status.updateGerald(event.name) + elif s_bustard_all.search(fullpath): + self._ci.status.updateBustard(event.name) + elif s_firecrest_all.search(fullpath): + self._ci.status.updateFirecrest(event.name) - print "Create: %s" % (os.path.join(event.path, event.name)) + #print "Create: %s" % (os.path.join(event.path, event.name)) def process_IN_DELETE(self, event): - print "Remove %s" % (os.path.join(event.path, event.name)) + #print "Remove %s" % (os.path.join(event.path, event.name)) + pass + + + #FLAGS # Config Step Error @@ -452,10 +497,13 @@ def run_pipeline(conf_info): stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt') stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt') + # Create status object + conf_info.createStatusObject() + # Monitor file creation wm = WatchManager() mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE - event = RunEvent() + event = RunEvent(conf_info) notifier = ThreadedNotifier(wm, event) notifier.start() wdd = wm.add_watch(conf_info.run_path, mask, rec=True) diff --git a/gaworkflow/pipeline/monitors.py b/gaworkflow/pipeline/monitors.py new file mode 100644 index 0000000..78380fd --- /dev/null +++ b/gaworkflow/pipeline/monitors.py @@ -0,0 +1,64 @@ +import time +import threading + +###################### +# Utility functions +def _percentCompleted(completed, total): + """ + Returns precent completed as float + """ + return (completed / float(total)) * 100 + + +################################################## +# Functions to be called by Thread(target=) +def _cmdLineStatusMonitorFunc(conf_info): + """ + Given a ConfigInfo object, provides status to stdout. + + You should probably use startCmdLineStatusMonitor() + instead of ths function. + + Use with: + t = threading.Thread(target=_cmdLineStatusMonitorFunc, + args=[conf_info]) + t.setDaemon(True) + t.start() + """ + SLEEP_AMOUNT = 30 + + while 1: + if conf_info.status is None: + print "No status object yet." + time.sleep(SLEEP_AMOUNT) + continue + + fc, ft = conf_info.status.statusFirecrest() + bc, bt = conf_info.status.statusBustard() + gc, gt = conf_info.status.statusGerald() + tc, tt = conf_info.status.statusTotal() + + fp = _percentCompleted(fc, ft) + bp = _percentCompleted(bc, bt) + gp = _percentCompleted(gc, gt) + tp = _percentCompleted(tc, tt) + + print 'Firecrest: %s%% (%s/%s)' % (fp, fc, ft) + print ' Bustard: %s%% (%s/%s)' % (bp, bc, bt) + print ' Gerald: %s%% (%s/%s)' % (gp, gc, gt) + print '-----------------------' + print ' Total: %s%% (%s/%s)' % (tp, tc, tt) + print '' + + time.sleep(SLEEP_AMOUNT) + + +############################################# +# Start monitor thread convenience functions +def startCmdLineStatusMonitor(conf_info): + """ + Starts a command line status monitor given a conf_info object. + """ + t = threading.Thread(target=_cmdLineStatusMonitorFunc, args=[conf_info]) + t.setDaemon(True) + t.start() diff --git a/scripts/configure_pipeline b/scripts/configure_pipeline index b491a38..d0d8b99 100644 --- a/scripts/configure_pipeline +++ b/scripts/configure_pipeline @@ -1,21 +1,35 @@ #!/usr/bin/env python +import os import sys from gaworkflow.pipeline.configure_run import * +from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor + def main(args=None): ci = ConfigInfo() + #FIXME: make a better command line tool + skip_retrieve_config = False + if len(args) == 1: + cfg_filepath = os.path.abspath(args[0]) + skip_retrieve_config = True + else: + cfg_filepath = os.path.abspath('config32auto.txt') + flowcell = 'FC12150' - cfg_filepath = 'config32auto.txt' genome_dir = '/home/king/trog_drive/' - status_retrieve_cfg = retrieve_config(ci, flowcell, cfg_filepath, genome_dir) - if status_retrieve_cfg: - print "Retrieve config file successful" + if not skip_retrieve_config: + status_retrieve_cfg = retrieve_config(ci, flowcell, cfg_filepath, genome_dir) + if status_retrieve_cfg: + print "Retrieve config file successful" + else: + print "Failed to retrieve config file" else: - print "Failed to retrieve config file" - #ci.config_filepath = 'config32bk.txt' - + print "Config file %s provided from command-line" % (cfg_filepath) + ci.config_filepath = cfg_filepath + status_retrieve_cfg = True + if status_retrieve_cfg: status = configure(ci) if status: @@ -27,6 +41,9 @@ def main(args=None): print 'Bustard Dir:', ci.bustard_path if status: + # Setup status cmdline status monitor + startCmdLineStatusMonitor(ci) + print 'Running pipeline now!' run_status = run_pipeline(ci) if run_status is True: -- 2.30.2