from gaworkflow.pipeline.retrieve_config import getCombinedOptions, saveConfigFile
from gaworkflow.pipeline.retrieve_config import FlowCellNotFound, WebError404
from gaworkflow.pipeline.genome_mapper import DuplicateGenome, getAvailableGenomes, constructMapperDict
+from gaworkflow.pipeline.run_status import GARunStatus
from pyinotify import WatchManager, ThreadedNotifier
from pyinotify import EventsCodes, ProcessEvent
self.run_path = None
self.bustard_path = None
self.config_filepath = None
+ self.status = None
+
+
+    def createStatusObject(self):
+        """
+        Build the GARunStatus object used to query pipeline progress.
+
+        The object is constructed from self.config_filepath and stored
+        on self.status.
+
+        returns True if the status object was created
+        returns False if self.config_filepath is None (nothing is created)
+        """
+        have_config = self.config_filepath is not None
+        if have_config:
+            self.status = GARunStatus(self.config_filepath)
+        return have_config
+
+
####################################
s_bustard_finished = re.compile('Bustard[0-9\._\-A-Za-z]+/finished.txt')
s_gerald_finished = re.compile('GERALD[0-9\._\-A-Za-z]+/finished.txt')
+# Directory-level patterns (note the trailing '/'): each one matches any
+# path that contains the named stage's directory chain. Because they are
+# nested prefixes of one another, s_firecrest_all also matches Bustard and
+# GERALD paths, and s_bustard_all also matches GERALD paths.
+s_gerald_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/GERALD[0-9\._\-A-Za-z]+/')
+s_bustard_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/Bustard[0-9\._\-A-Za-z]+/')
+s_firecrest_all = re.compile('Firecrest[0-9\._\-A-Za-z]+/')
+
class RunEvent(ProcessEvent):
+ # pyinotify event handler: file-creation events are matched against the
+ # s_*_finished / s_*_all regexes; matches set flags in run_status_dict
+ # and are forwarded to the status object held by conf_info.
- def __init__(self):
+ def __init__(self, conf_info):
+ # conf_info: object whose .status attribute receives the
+ # updateFirecrest/updateBustard/updateGerald calls made below.
self.run_status_dict = {'firecrest': False,
'bustard': False,
'gerald': False}
+ self._ci = conf_info
+
ProcessEvent.__init__(self)
+
def process_IN_CREATE(self, event):
+ # NOTE(review): self._ci.status is dereferenced unconditionally below.
+ # createStatusObject() returns False and leaves status as None when no
+ # config file path is set, in which case every matching event would
+ # raise AttributeError -- confirm the status object always exists here.
fullpath = os.path.join(event.path, event.name)
+ # The *_finished.txt markers flip the per-stage flag and also count
+ # as a status update for that stage.
if s_firecrest_finished.search(fullpath):
self.run_status_dict['firecrest'] = True
+ self._ci.status.updateFirecrest(event.name)
elif s_bustard_finished.search(fullpath):
self.run_status_dict['bustard'] = True
+ self._ci.status.updateBustard(event.name)
elif s_gerald_finished.search(fullpath):
self.run_status_dict['gerald'] = True
+ self._ci.status.updateGerald(event.name)
+
+ #WARNING: The following order is important!!
+ # Firecrest regex will catch all gerald, bustard, and firecrest
+ # Bustard regex will catch all gerald and bustard
+ # Gerald regex will catch all gerald
+ # So, order needs to be Gerald, Bustard, Firecrest, or this
+ # won't work properly.
+ elif s_gerald_all.search(fullpath):
+ self._ci.status.updateGerald(event.name)
+ elif s_bustard_all.search(fullpath):
+ self._ci.status.updateBustard(event.name)
+ elif s_firecrest_all.search(fullpath):
+ self._ci.status.updateFirecrest(event.name)
- print "Create: %s" % (os.path.join(event.path, event.name))
+ #print "Create: %s" % (os.path.join(event.path, event.name))
def process_IN_DELETE(self, event):
+ # Deletions are watched (IN_DELETE is in the mask) but ignored.
- print "Remove %s" % (os.path.join(event.path, event.name))
+ #print "Remove %s" % (os.path.join(event.path, event.name))
+ pass
+
+
+
#FLAGS
# Config Step Error
stdout_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stdout.txt')
stderr_filepath = os.path.join(conf_info.run_path, 'pipeline_run_stderr.txt')
+ # Create status object
+ conf_info.createStatusObject()
+
# Monitor file creation
wm = WatchManager()
mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE
- event = RunEvent()
+ event = RunEvent(conf_info)
notifier = ThreadedNotifier(wm, event)
notifier.start()
wdd = wm.add_watch(conf_info.run_path, mask, rec=True)
--- /dev/null
+import time
+import threading
+
+######################
+# Utility functions
+def _percentCompleted(completed, total):
+    """
+    Returns percent completed as float.
+
+    completed -- number of units finished so far
+    total -- number of units expected overall
+
+    When total is 0 there is nothing to measure progress against, so
+    0.0 is returned instead of raising ZeroDivisionError. An uncaught
+    exception here would end the polling loop of the command line
+    status monitor, which calls this without a try/except.
+    """
+    if total == 0:
+        return 0.0
+    return (completed / float(total)) * 100
+
+
+##################################################
+# Functions to be called by Thread(target=<func>)
+def _cmdLineStatusMonitorFunc(conf_info):
+    """
+    Poll conf_info.status forever and report progress on stdout.
+
+    Every 30 seconds the Firecrest, Bustard, Gerald and total
+    (completed, total) pairs are read from the status object and
+    printed as percentages. While conf_info.status is still None a
+    placeholder message is printed instead.
+
+    This function never returns. You should probably call
+    startCmdLineStatusMonitor() rather than this function directly.
+
+    Use with:
+      t = threading.Thread(target=_cmdLineStatusMonitorFunc,
+                           args=[conf_info])
+      t.setDaemon(True)
+      t.start()
+    """
+    poll_seconds = 30
+
+    while True:
+        if conf_info.status is not None:
+            # Read every counter first, then format, then print.
+            fire_done, fire_total = conf_info.status.statusFirecrest()
+            bust_done, bust_total = conf_info.status.statusBustard()
+            ger_done, ger_total = conf_info.status.statusGerald()
+            all_done, all_total = conf_info.status.statusTotal()
+
+            fire_pct = _percentCompleted(fire_done, fire_total)
+            bust_pct = _percentCompleted(bust_done, bust_total)
+            ger_pct = _percentCompleted(ger_done, ger_total)
+            all_pct = _percentCompleted(all_done, all_total)
+
+            report = [
+                'Firecrest: %s%% (%s/%s)' % (fire_pct, fire_done, fire_total),
+                ' Bustard: %s%% (%s/%s)' % (bust_pct, bust_done, bust_total),
+                ' Gerald: %s%% (%s/%s)' % (ger_pct, ger_done, ger_total),
+                '-----------------------',
+                ' Total: %s%% (%s/%s)' % (all_pct, all_done, all_total),
+                '',
+            ]
+            for line in report:
+                print line
+        else:
+            print "No status object yet."
+
+        time.sleep(poll_seconds)
+
+
+#############################################
+# Start monitor thread convenience functions
+def startCmdLineStatusMonitor(conf_info):
+    """
+    Launch the command line status monitor for a conf_info object.
+
+    Runs _cmdLineStatusMonitorFunc(conf_info) in a thread marked as
+    daemon and returns immediately after starting it.
+    """
+    monitor_thread = threading.Thread(target=_cmdLineStatusMonitorFunc,
+                                      args=(conf_info,))
+    monitor_thread.setDaemon(True)
+    monitor_thread.start()
#!/usr/bin/env python
+import os
import sys
from gaworkflow.pipeline.configure_run import *
+from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor
+
def main(args=None):
ci = ConfigInfo()
+ #FIXME: make a better command line tool
+ skip_retrieve_config = False
+ if len(args) == 1:
+ cfg_filepath = os.path.abspath(args[0])
+ skip_retrieve_config = True
+ else:
+ cfg_filepath = os.path.abspath('config32auto.txt')
+
flowcell = 'FC12150'
- cfg_filepath = 'config32auto.txt'
genome_dir = '/home/king/trog_drive/'
- status_retrieve_cfg = retrieve_config(ci, flowcell, cfg_filepath, genome_dir)
- if status_retrieve_cfg:
- print "Retrieve config file successful"
+ if not skip_retrieve_config:
+ status_retrieve_cfg = retrieve_config(ci, flowcell, cfg_filepath, genome_dir)
+ if status_retrieve_cfg:
+ print "Retrieve config file successful"
+ else:
+ print "Failed to retrieve config file"
else:
- print "Failed to retrieve config file"
- #ci.config_filepath = 'config32bk.txt'
-
+ print "Config file %s provided from command-line" % (cfg_filepath)
+ ci.config_filepath = cfg_filepath
+ status_retrieve_cfg = True
+
if status_retrieve_cfg:
status = configure(ci)
if status:
print 'Bustard Dir:', ci.bustard_path
if status:
+ # Setup status cmdline status monitor
+ startCmdLineStatusMonitor(ci)
+
print 'Running pipeline now!'
run_status = run_pipeline(ci)
if run_status is True: