[project @ Monitor status implementation + config_pipeline cmdling args]
authorBrandon King <kingb@caltech.edu>
Wed, 21 Nov 2007 20:53:23 +0000 (20:53 +0000)
committerBrandon King <kingb@caltech.edu>
Wed, 21 Nov 2007 20:53:23 +0000 (20:53 +0000)
  * configure_pipeline now takes an optional command line
    argument of an eland config file to use. (Overrides
    automatic download).
  * Added monitors.py which contains methods providing a way
    of triggering some sort of threaded monitor of pipeline
    progress.
    *  startCmdLineStatusMonitor(conf_info) prints status
       to stdout
  * Updated configure_pipeline script to use the
    startCmdLineStatusMonitor function.
  * ConfigInfo object now holds a status variable
    (GARunStatus object)
    * requires calling conf_info.createStatusObject() after
      _cfg_filepath has been set (currently handled by
      run_pipeline function)

gaworkflow/pipeline/configure_run.py
gaworkflow/pipeline/monitors.py [new file with mode: 0644]
scripts/configure_pipeline

index f71bfc543ace446d5729dc22776f5c3b3441b02f..19e242615b0de6b1b4f71fda2fad5389675011a9 100644 (file)
@@ -8,6 +8,7 @@ import os
 from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile
 from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404
 from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
+from gaworkflow.pipeline.run_status import GARunStatus
 
 from pyinotify import WatchManager, ThreadedNotifier
 from pyinotify import EventsCodes, ProcessEvent
@@ -24,6 +25,23 @@ class ConfigInfo:
     self.run_path = None
     self.bustard_path = None
     self.config_filepath = None
+    self.status = None
+
+
+  def createStatusObject(self):
+    """
+    Creates a status object which can be queried for
+    status of running the pipeline
+
+    returns True if object created
+    returns False if object cannot be created
+    """
+    if self.config_filepath is None:
+      return False
+
+    self.status = GARunStatus(self.config_filepath)
+    return True
+
 
 
 ####################################
@@ -33,15 +51,22 @@ s_firecrest_finished = re.compile('Firecrest[0-9\._\-A-Za-z]+/finished.txt')
 s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
 s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
 
+s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
+s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
+s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
+
 class RunEvent(ProcessEvent):
 
-  def __init__(self):
+  def __init__(self, conf_info):
 
     self.run_status_dict = {'firecrest': False,
                             'bustard': False,
                             'gerald': False}
 
+    self._ci = conf_info
+
     ProcessEvent.__init__(self)
+    
 
   def process_IN_CREATE(self, event):
     fullpath = os.path.join(event.path, event.name)
@@ -50,15 +75,35 @@ class RunEvent(ProcessEvent):
 
       if s_firecrest_finished.search(fullpath):
         self.run_status_dict['firecrest'] = True
+        self._ci.status.updateFirecrest(event.name)
       elif s_bustard_finished.search(fullpath):
         self.run_status_dict['bustard'] = True
+        self._ci.status.updateBustard(event.name)
       elif s_gerald_finished.search(fullpath):
         self.run_status_dict['gerald'] = True
+        self._ci.status.updateGerald(event.name)
+
+    #WARNING: The following order is important!!
+    # Firecrest regex will catch all gerald, bustard, and firecrest
+    # Bustard regex will catch all gerald and bustard
+    # Gerald regex will catch all gerald
+    # So, order needs to be Gerald, Bustard, Firecrest, or this
+    #  won't work properly.
+    elif s_gerald_all.search(fullpath):
+      self._ci.status.updateGerald(event.name)
+    elif s_bustard_all.search(fullpath):
+      self._ci.status.updateBustard(event.name)
+    elif s_firecrest_all.search(fullpath):
+      self._ci.status.updateFirecrest(event.name)
       
-    print "Create: %s" % (os.path.join(event.path, event.name))
+    #print "Create: %s" % (os.path.join(event.path, event.name))
 
   def process_IN_DELETE(self, event):
-    print "Remove %s" % (os.path.join(event.path, event.name))
+    #print "Remove %s" % (os.path.join(event.path, event.name))
+    pass
+
+
+
 
 #FLAGS
 # Config Step Error
@@ -452,10 +497,13 @@ def run_pipeline(conf_info):
   stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
   stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
 
+  # Create status object
+  conf_info.createStatusObject()
+
   # Monitor file creation
   wm = WatchManager()
   mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
-  event = RunEvent()
+  event = RunEvent(conf_info)
   notifier = ThreadedNotifier(wm, event)
   notifier.start()
   wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
diff --git a/gaworkflow/pipeline/monitors.py b/gaworkflow/pipeline/monitors.py
new file mode 100644 (file)
index 0000000..78380fd
--- /dev/null
@@ -0,0 +1,64 @@
+import time
+import threading
+
+######################
+# Utility functions
+def _percentCompleted(completed, total):
+  """
+  Returns precent completed as float
+  """
+  return (completed / float(total)) * 100
+
+
+##################################################
+# Functions to be called by Thread(target=<func>)
+def _cmdLineStatusMonitorFunc(conf_info):
+  """
+  Given a ConfigInfo object, provides status to stdout.
+
+  You should probably use startCmdLineStatusMonitor()
+  instead of ths function.
+
+  Use with:
+    t = threading.Thread(target=_cmdLineStatusMonitorFunc,
+                         args=[conf_info])
+    t.setDaemon(True)
+    t.start()
+  """
+  SLEEP_AMOUNT = 30
+
+  while 1:
+    if conf_info.status is None:
+      print "No status object yet."
+      time.sleep(SLEEP_AMOUNT)
+      continue
+    
+    fc, ft = conf_info.status.statusFirecrest()
+    bc, bt = conf_info.status.statusBustard()
+    gc, gt = conf_info.status.statusGerald()
+    tc, tt = conf_info.status.statusTotal()
+    
+    fp = _percentCompleted(fc, ft)
+    bp = _percentCompleted(bc, bt)
+    gp = _percentCompleted(gc, gt)
+    tp = _percentCompleted(tc, tt)
+    
+    print 'Firecrest: %s%% (%s/%s)' % (fp, fc, ft)
+    print '  Bustard: %s%% (%s/%s)' % (bp, bc, bt)
+    print '   Gerald: %s%% (%s/%s)' % (gp, gc, gt)
+    print '-----------------------'
+    print '    Total: %s%% (%s/%s)' % (tp, tc, tt)
+    print ''
+
+    time.sleep(SLEEP_AMOUNT)
+
+
+#############################################
+# Start monitor thread convenience functions
+def startCmdLineStatusMonitor(conf_info):
+  """
+  Starts a command line status monitor given a conf_info object.
+  """
+  t = threading.Thread(target=_cmdLineStatusMonitorFunc, args=[conf_info])
+  t.setDaemon(True)
+  t.start()
index b491a38eeee3e46768820ce761cdd259d17ac1ee..d0d8b99af6cb36e37bedbb8eba746444e7220064 100644 (file)
@@ -1,21 +1,35 @@
 #!/usr/bin/env python
+import os
 import sys
 from gaworkflow.pipeline.configure_run import *
+from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor
+
 
 def main(args=None):
   ci = ConfigInfo()
 
+  #FIXME: make a better command line tool
+  skip_retrieve_config = False
+  if len(args) == 1:
+    cfg_filepath = os.path.abspath(args[0])
+    skip_retrieve_config = True
+  else:
+    cfg_filepath = os.path.abspath('config32auto.txt')
+
   flowcell = 'FC12150'
-  cfg_filepath = 'config32auto.txt'
   genome_dir = '/home/king/trog_drive/'
 
-  status_retrieve_cfg = retrieve_config(ci, flowcell, cfg_filepath, genome_dir)
-  if status_retrieve_cfg:
-    print "Retrieve config file successful"
+  if not skip_retrieve_config:
+    status_retrieve_cfg = retrieve_config(ci, flowcell, cfg_filepath, genome_dir)
+    if status_retrieve_cfg:
+      print "Retrieve config file successful"
+    else:
+      print "Failed to retrieve config file"
   else:
-    print "Failed to retrieve config file"
-  #ci.config_filepath = 'config32bk.txt'
-
+    print "Config file %s provided from command-line" % (cfg_filepath)
+    ci.config_filepath = cfg_filepath
+    status_retrieve_cfg = True
+  
   if status_retrieve_cfg:
     status = configure(ci)
     if status:
@@ -27,6 +41,9 @@ def main(args=None):
     print 'Bustard Dir:', ci.bustard_path
     
     if status:
+      # Setup status cmdline status monitor
+      startCmdLineStatusMonitor(ci)
+      
       print 'Running pipeline now!'
       run_status = run_pipeline(ci)
       if run_status is True: