Runner now notifies users about success or failure of the following steps:
authorBrandon King <kingb@caltech.edu>
Fri, 4 Jan 2008 22:41:50 +0000 (22:41 +0000)
committerBrandon King <kingb@caltech.edu>
Fri, 4 Jan 2008 22:41:50 +0000 (22:41 +0000)
 * Retrieve Config
 * Configure Pipeline
 * Run Pipeline

gaworkflow/runner.py

index d5bde94eff90d0bb9f41ab1984b544c610ff4a52..c1d4ba70b7f6a172513b3644d716eb6386373cbe 100644 (file)
@@ -43,6 +43,9 @@ class Runner(rpc.XmlRpcBot):
         self.cfg['genome_dir'] = None
         self.cfg['base_analysis_dir'] = None
 
+        self.cfg['notify_users'] = None
+        self.cfg['notify_postanalysis'] = None
+
         self.conf_info_dict = {}
         
         self.register_function(self.sequencingFinished)
@@ -54,6 +57,9 @@ class Runner(rpc.XmlRpcBot):
 
         self.genome_dir = self._check_required_option('genome_dir')
         self.base_analysis_dir = self._check_required_option('base_analysis_dir')
+
+        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+        #FIXME: process notify_postpipeline cfg
         
     
     def _parser(self, msg, who):
@@ -123,13 +129,22 @@ class Runner(rpc.XmlRpcBot):
         #    pattern += os.path.sep
         #stripped_run_dir = re.sub(pattern, "", run_dir)
         #logging.debug("stripped to " + stripped_run_dir)
-        #if self.notify_users is not None:
-        #    for u in self.notify_users:
-        #        self.send(u, 'Sequencing run %s finished' % #(stripped_run_dir))
+
+        # Notify each user that the run has finished.
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, 'Pipeline run %s finished' % (run_dir))
+                
         #if self.notify_runner is not None:
         #    for r in self.notify_runner:
         #        self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
 
+    def reportMsg(self, msg):
+
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, msg)
+
 
     def _runner(self, run_dir, flowcell, conf_info):
 
@@ -142,8 +157,10 @@ class Runner(rpc.XmlRpcBot):
                                           self.genome_dir)
         if status_retrieve_cfg:
             logging.info("Runner: Retrieve config: success")
+            self.reportMsg("Retrieve config (%s): success" % (run_dir))
         else:
             logging.error("Runner: Retrieve config: failed")
+            self.reportMsg("Retrieve config (%s): FAILED" % (run_dir))
 
         
         # configure step
@@ -151,8 +168,10 @@ class Runner(rpc.XmlRpcBot):
             status = configure(conf_info)
             if status:
                 logging.info("Runner: Configure: success")
+                self.reportMsg("Configure (%s): success" % (run_dir))
             else:
                 logging.error("Runner: Configure: failed")
+                self.reportMsg("Configure (%s): FAILED" % (run_dir))
 
             #if successful, continue
             if status:
@@ -167,6 +186,7 @@ class Runner(rpc.XmlRpcBot):
                     self.piplineFinished(run_dir)
                 else:
                     logging.info('Runner: Pipeline: failed')
+                    self.reportMsg("Pipeline run (%s): FAILED" % (run_dir))
 
 
     def launchJob(self, run_dir, flowcell, conf_info):