[project @ gaworkflow.runner progress]
authorBrandon King <kingb@caltech.edu>
Fri, 14 Dec 2007 20:56:30 +0000 (20:56 +0000)
committerBrandon King <kingb@caltech.edu>
Fri, 14 Dec 2007 20:56:30 +0000 (20:56 +0000)
 * Work towards getting runner to launch jobs in a seperate thread.
 * Stores status in a dictionary.

gaworkflow/runner.py

index 40eae640c277e9eac66bf28c2b4accececd3858c..40aa9c86d16e80640b1062b5322d747eaaf584e0 100644 (file)
@@ -4,9 +4,33 @@ import os
 import re
 import sys
 import time
+import threading
 
 from benderjab import rpc
 
+from gaworkflow.pipeline.configure_run import *
+from gaworkflow.pipeline.monitors import startCmdLineStatusMonitor
+
+s_fc = re.compile('FC[0-9]+')
+
+
+def _get_flowcell_from_rundir(run_dir):
+    """
+    Returns flowcell string based on run_dir.
+    Returns None and logs error if flowcell can't be found.
+    """
+    junk, dirname = os.path.split(run_dir)
+    mo = s_fc.search(dirname)
+    if not mo:
+        logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
+        return None
+
+    return dirname[mo.start():]
+    
+
+def _runner():
+
+
 class Runner(rpc.XmlRpcBot):
     """
     Manage running pipeline jobs.
@@ -17,12 +41,18 @@ class Runner(rpc.XmlRpcBot):
         super(Runner, self).__init__(section, configfile)
         
         self.cfg['notify_users'] = None
+        self.cfg['genome_dir'] = None
+
+        self.conf_info_dict = {}
         
         self.register_function(self.sequencingFinished)
         self.eventTasks.append(self.update)
     
     def read_config(self, section=None, configfile=None):
         super(Runner, self).read_config(section, configfile)
+
+        self.genome_dir = self._check_required_option('genome_dir')
+        
     
     def _parser(self, msg, who):
         """
@@ -61,6 +91,22 @@ class Runner(rpc.XmlRpcBot):
         Sequenceing (and copying) is finished, time to start pipeline
         """
         logging.debug("received sequencing finished message")
+
+        # Setup config info object
+        ci = ConfigInfo()
+        ci.run_path = run_dir
+
+        # get flowcell from run_dir name
+        flowcell = _get_flowcell_from_rundir(run_dir)
+
+        # Store ci object in dictionary
+        self.conf_info_dict[flowcell] = ci
+
+
+        # Launch the job in it's own thread and turn.
+        self.launchJob(run_dir, flowcell, ci)
+        
+
         
     def pipelineFinished(self, run_dir):
         # need to strip off self.watch_dir from rundir I suspect.
@@ -76,6 +122,54 @@ class Runner(rpc.XmlRpcBot):
         #if self.notify_runner is not None:
         #    for r in self.notify_runner:
         #        self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
+
+
+    def _runner(self, run_dir, flowcell, conf_info):
+        # retrieve config step
+        cfg_filepath = os.path.abspath('config32auto.txt')
+        status_retrieve_cfg = retrieve_config(ci,
+                                          flowcell,
+                                          cfg_filepath,
+                                          self.genome_dir)
+        if status_retrieve_cfg:
+            logging.info("Runner: Retrieve config: success")
+        else:
+            logging.error("Runner: Retrieve config: failed")
+
+        
+        # configure step
+        if status_retrieve_cfg:
+            status = configure(ci)
+            if status:
+                logging.info("Runner: Configure: success")
+            else:
+                logging.error("Runner: Configure: failed")
+
+            #if successful, continue
+            if status:
+                # Setup status cmdline status monitor
+                #startCmdLineStatusMonitor(ci)
+                
+                # running step
+                print 'Running pipeline now!'
+                run_status = run_pipeline(ci)
+                if run_status is True:
+                    logging.info('Runner: Pipeline: success')
+                    self.piplineFinished(run_dir)
+                else:
+                    logging.info('Runner: Pipeline: failed')
+
+
+    def launchJob(self, run_dir, flowcell, conf_info):
+        """
+        Starts up a thread for running the pipeline
+        """
+        t = threading.Thread(target=self._runner,
+                        args=[run_dir, flowcell, conf_info])
+        t.setDaemon(True)
+        t.start()
+        
+
         
 def main(args=None):
     bot = Runner()
@@ -83,4 +177,4 @@ def main(args=None):
     
 if __name__ == "__main__":
     sys.exit(main(sys.argv[1:]))
-    
\ No newline at end of file
+