Might actually need the runner_s implementation!
authorBrandon King <kingb@caltech.edu>
Wed, 6 Aug 2008 18:19:06 +0000 (18:19 +0000)
committerBrandon King <kingb@caltech.edu>
Wed, 6 Aug 2008 18:19:06 +0000 (18:19 +0000)
htswdataprod/htswdataprod/automation/runner_s.py [new file with mode: 0644]

diff --git a/htswdataprod/htswdataprod/automation/runner_s.py b/htswdataprod/htswdataprod/automation/runner_s.py
new file mode 100644 (file)
index 0000000..b408d8e
--- /dev/null
@@ -0,0 +1,181 @@
+#!/usr/bin/env python
+from glob import glob
+import logging
+import os
+import re
+import sys
+import time
+import threading
+
+from benderjab import rpc
+
+#from htswdataprod.configure_run import *
+#from htswdataprod.monitors import _percentCompleted
+
+#s_fc = re.compile('FC[0-9]+')
+s_fc = re.compile('_[0-9a-zA-Z]*$')    
+
+
+class Runner(rpc.XmlRpcBot):
+    """
+    Manage running pipeline jobs.
+    """    
+    def __init__(self, section=None, configfile=None):
+        #if configfile is None:
+        #    self.configfile = "~/.gaworkflow"
+        super(Runner, self).__init__(section, configfile)
+        
+        self.cfg['notify_users'] = None
+        self.cfg['genome_dir'] = None
+        self.cfg['base_analysis_dir'] = None
+
+        #self.cfg['notify_users'] = None
+        self.cfg['notify_postanalysis'] = None
+
+        self.conf_info_dict = {}
+        
+        self.register_function(self.sequencingFinished)
+        #self.eventTasks.append(self.update)
+
+    
+    def read_config(self, section=None, configfile=None):
+        super(Runner, self).read_config(section, configfile)
+
+        self.genome_dir = self._check_required_option('genome_dir')
+        self.base_analysis_dir = self._check_required_option('base_analysis_dir')
+
+        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+        #FIXME: process notify_postpipeline cfg
+        
+    
+    def _parser(self, msg, who):
+        """
+        Parse xmpp chat messages
+        """
+        ################
+        # Help response
+        help = u"I can send [start] a run, or report [status]"
+        if re.match(u"help", msg):
+            reply = help
+            
+        ################
+        # Status response
+        elif re.match("status", msg):
+            words = msg.split()
+            #if len(words) == 2:
+            #    reply = self.getStatusReport(words[1])
+            #else:
+            #    reply = u"Status available for: %s" \
+            #            % (', '.join([k for k in self.conf_info_dict.keys()]))
+            return 'Status not implemented yet.'
+        
+        ###############
+        # Start run
+        elif re.match(u"start", msg):
+            words = msg.split()
+            if len(words) == 2:
+                self.sequencingFinished(words[1])
+                reply = u"starting run for %s" % (words[1])
+            else:
+                reply = u"need runfolder name"
+        else:
+            reply = u"I didn't understand '%s'" %(msg)
+
+        logging.debug("reply: " + str(reply))
+        return reply
+
+
+    def getStatusReport(self, fc_num):
+        """
+        Returns text status report for flow cell number 
+        """
+        
+        return 'StatusReport: Not implemented'
+    
+            
+    def sequencingFinished(self, run_dir):
+        """
+        Sequenceing (and copying) is finished, time to start pipeline
+        
+        xmlrpc call from another bot
+        """
+        logging.debug("received sequencing finished message")
+
+        self.launchJob(run_dir, flowcell, ci)
+        return "started"
+        
+        
+    def pipelineFinished(self, run_dir):
+        # need to strip off self.watch_dir from rundir I suspect.
+        logging.info("pipeline finished in" + str(run_dir))
+
+        # Notify each user that the run has finished.
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, 'Pipeline run %s finished' % (run_dir))
+                
+        #if self.notify_runner is not None:
+        #    for r in self.notify_runner:
+        #        self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
+
+    def reportMsg(self, msg):
+
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, msg)
+
+
+    def _runner(self, run_dir, flowcell, conf_info):
+
+        # retrieve config step
+        # example error message
+        #self.reportMsg("Retrieve config (%s): FAILED" % (run_dir))
+
+        
+        # configure step
+        if status_retrieve_cfg:
+            status = configure(conf_info)
+            if status:
+                logging.info("Runner: Configure: success")
+                self.reportMsg("Configure (%s): success" % (run_dir))
+                self.reportMsg(
+                    os.linesep.join(glob(os.path.join(run_dir,'Data','C*')))
+                )
+            else:
+                logging.error("Runner: Configure: failed")
+                self.reportMsg("Configure (%s): FAILED" % (run_dir))
+
+            #if successful, continue
+            if status:
+                # Setup status cmdline status monitor
+                #startCmdLineStatusMonitor(ci)
+                
+                # running step
+                print 'Running pipeline now!'
+                run_status = run_pipeline(conf_info)
+                if run_status is True:
+                    logging.info('Runner: Pipeline: success')
+                    self.reportMsg("Pipeline run (%s): Finished" % (run_dir,))
+                else:
+                    logging.info('Runner: Pipeline: failed')
+                    self.reportMsg("Pipeline run (%s): FAILED" % (run_dir))
+
+
+    def launchJob(self, run_dir, flowcell, conf_info):
+        """
+        Starts up a thread for running the pipeline
+        """
+        t = threading.Thread(target=self._runner,
+                        args=[run_dir, flowcell, conf_info])
+        t.setDaemon(True)
+        t.start()
+        
+
+        
+def main(args=None):
+    bot = Runner()
+    return bot.main(args)
+    
+if __name__ == "__main__":
+    sys.exit(main(sys.argv[1:]))
+