move the autotmation scripts into gaworkflow.automation
authorDiane Trout <diane@caltech.edu>
Tue, 15 Jan 2008 01:07:48 +0000 (01:07 +0000)
committerDiane Trout <diane@caltech.edu>
Tue, 15 Jan 2008 01:07:48 +0000 (01:07 +0000)
the scripts that were providing the tools to automate running
the solexa pipeline were unfairly "priviledged" compared to the
components that wrapped talking to the pipeline commands and
providing the website, in that the other components were in
sub-packages while the automation was just in the gaworkflow
package.

So I moved them into the somewhat clearer "gaworkflow.automation".

The intent is that gaworkflow.automation contains modules
that make things happen without human intervention.

gaworkflow/automation/__init__.py [new file with mode: 0644]
gaworkflow/automation/copier.py [new file with mode: 0644]
gaworkflow/automation/runner.py [new file with mode: 0644]
gaworkflow/automation/spoolwatcher.py [new file with mode: 0644]
gaworkflow/copier.py [deleted file]
gaworkflow/runner.py [deleted file]
gaworkflow/spoolwatcher.py [deleted file]
scripts/copier
scripts/runner
scripts/spoolwatcher

diff --git a/gaworkflow/automation/__init__.py b/gaworkflow/automation/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/gaworkflow/automation/copier.py b/gaworkflow/automation/copier.py
new file mode 100644 (file)
index 0000000..2e0d3ae
--- /dev/null
@@ -0,0 +1,245 @@
+import ConfigParser
+import copy
+import logging
+import logging.handlers
+import os
+import re
+import subprocess
+import sys
+import time
+import traceback
+
+from benderjab import rpc
+
+def runfolder_validate(fname):
+    """
+    Return True if fname looks like a runfolder name
+    """
+    if re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname):
+        return True
+    else:
+        return False
+    
+class rsync(object):
+  def __init__(self, source, dest, pwfile):
+    self.pwfile = os.path.expanduser(pwfile)
+    self.cmd = ['/usr/bin/rsync', ]
+    self.cmd.append('--password-file=%s' % (self.pwfile))
+    self.source_base = source
+    self.dest_base = dest
+    self.processes = {}
+    self.exit_code = None
+
+  def list(self):
+    """Get a directory listing"""
+    args = copy.copy(self.cmd)
+    args.append(self.source_base)
+
+    logging.debug("Rsync cmd:" + " ".join(args))
+    short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
+    return self.list_filter(short_process.stdout)
+
+  def list_filter(self, lines):
+    """
+    parse rsync directory listing
+    """
+    dirs_to_copy = []
+    direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ]
+    for permissions, size, filedate, filetime, filename in direntries:
+      if permissions[0] == 'd':
+        # hey its a directory, the first step to being something we want to 
+        # copy
+        if re.match("[0-9]{6}", filename):
+          # it starts with something that looks like a 6 digit date
+          # aka good enough for me
+          dirs_to_copy.append(filename)
+    return dirs_to_copy
+
+  def create_copy_process(self, dirname):
+    args = copy.copy(self.cmd)
+    # we want to copy everything
+    args.append('-rlt') 
+    # from here
+    args.append(os.path.join(self.source_base, dirname))
+    # to here
+    args.append(self.dest_base)
+    logging.debug("Rsync cmd:" + " ".join(args))
+    return subprocess.Popen(args)
+  def copy(self):
+    """
+    copy any interesting looking directories over
+    return list of items that we started copying.
+    """
+    # clean up any lingering non-running processes
+    self.poll()
+    
+    # what's available to copy?
+    dirs_to_copy = self.list()
+    
+    # lets start copying
+    started = []
+    for d in dirs_to_copy:
+      process = self.processes.get(d, None)
+      
+      if process is None:
+        # we don't have a process, so make one
+        logging.info("rsyncing %s" % (d))
+        self.processes[d] = self.create_copy_process(d)
+        started.append(d)           
+    return started
+      
+  def poll(self):
+      """
+      check currently running processes to see if they're done
+      
+      return path roots that have finished.
+      """
+      for dir_key, proc_value in self.processes.items():
+          retcode = proc_value.poll()
+          if retcode is None:
+              # process hasn't finished yet
+              pass
+          elif retcode == 0:
+              logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
+              del self.processes[dir_key]
+          else:
+              logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
+              
+  def __len__(self):
+      """
+      Return how many active rsync processes we currently have
+      
+      Call poll first to close finished processes.
+      """
+      return len(self.processes)
+  
+  def keys(self):
+      """
+      Return list of current run folder names
+      """
+      return self.processes.keys()
+
+class CopierBot(rpc.XmlRpcBot):
+    def __init__(self, section=None, configfile=None):
+        #if configfile is None:
+        #    configfile = '~/.gaworkflow'
+            
+        super(CopierBot, self).__init__(section, configfile)
+        
+        # options for rsync command
+        self.cfg['rsync_password_file'] = None
+        self.cfg['rsync_source'] = None
+        self.cfg['rsync_destination'] = None 
+        
+        # options for reporting we're done 
+        self.cfg['notify_users'] = None
+        self.cfg['notify_runner'] = None
+                            
+        self.pending = []
+        self.rsync = None
+        self.notify_users = None
+        self.notify_runner = None
+        
+        self.register_function(self.startCopy)
+        self.register_function(self.sequencingFinished)
+        self.eventTasks.append(self.update)
+        
+    def read_config(self, section=None, configfile=None):
+        """
+        read the config file
+        """
+        super(CopierBot, self).read_config(section, configfile)
+        
+        password = self._check_required_option('rsync_password_file')
+        source = self._check_required_option('rsync_source')
+        destination = self._check_required_option('rsync_destination')
+        self.rsync = rsync(source, destination, password)
+        
+        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+        try:
+          self.notify_runner = \
+             self._parse_user_list(self.cfg['notify_runner'],
+                                   require_resource=True)
+        except bot.JIDMissingResource:
+            msg = 'need a full jabber ID + resource for xml-rpc destinations'
+            logging.FATAL(msg)
+            raise bot.JIDMissingResource(msg)
+
+    def startCopy(self, *args):
+        """
+        start our copy
+        """
+        logging.info("starting copy scan")
+        started = self.rsync.copy()
+        logging.info("copying:" + " ".join(started)+".")
+        return started
+        
+    def sequencingFinished(self, runDir, *args):
+        """
+        The run was finished, if we're done copying, pass the message on        
+        """
+        # close any open processes
+        self.rsync.poll()
+        
+        # see if we're still copying
+        if runfolder_validate(runDir):
+            logging.info("recevied sequencing finshed for %s" % (runDir))
+            self.pending.append(runDir)
+            self.startCopy()
+            return "PENDING"
+        else:
+            errmsg = "received bad runfolder name (%s)" % (runDir)
+            logging.warning(errmsg)
+            # maybe I should use a different error message
+            raise RuntimeError(errmsg)
+    
+    def reportSequencingFinished(self, runDir):
+        """
+        Send the sequencingFinished message to the interested parties
+        """
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, 'Sequencing run %s finished' % (runDir))
+        if self.notify_runner is not None:
+            for r in self.notify_runner:
+                self.rpc_send(r, (runDir,), 'sequencingFinished')
+        logging.info("forwarding sequencingFinshed message for %s" % (runDir))
+        
+    def update(self, *args):
+        """
+        Update our current status.
+        Report if we've finished copying files.
+        """
+        self.rsync.poll()
+        for p in self.pending:
+            if p not in self.rsync.keys():
+                self.reportSequencingFinished(p)
+                self.pending.remove(p)
+        
+    def _parser(self, msg, who):
+        """
+        Parse xmpp chat messages
+        """
+        help = u"I can [copy], or report current [status]"
+        if re.match(u"help", msg):
+            reply = help
+        elif re.match("copy", msg):            
+            started = self.startCopy()
+            reply = u"started copying " + ", ".join(started)
+        elif re.match(u"status", msg):
+            msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
+            for d in self.rsync.keys():
+              msg.append(u"  " + d)
+            reply = os.linesep.join(msg)
+        else:
+            reply = u"I didn't understand '%s'" % (unicode(msg))
+        return reply
+
+def main(args=None):
+    bot = CopierBot()
+    bot.main(args)
+    
+if __name__ == "__main__":
+  sys.exit(main(sys.argv[1:]))
+
diff --git a/gaworkflow/automation/runner.py b/gaworkflow/automation/runner.py
new file mode 100644 (file)
index 0000000..f81b682
--- /dev/null
@@ -0,0 +1,236 @@
+#!/usr/bin/env python
+import logging
+import os
+import re
+import sys
+import time
+import threading
+
+from benderjab import rpc
+
+from gaworkflow.pipeline.configure_run import *
+from gaworkflow.pipeline.monitors import _percentCompleted
+
+#s_fc = re.compile('FC[0-9]+')
+s_fc = re.compile('_[0-9a-zA-Z]*$')
+
+
+def _get_flowcell_from_rundir(run_dir):
+    """
+    Returns flowcell string based on run_dir.
+    Returns None and logs error if flowcell can't be found.
+    """
+    junk, dirname = os.path.split(run_dir)
+    mo = s_fc.search(dirname)
+    if not mo:
+        logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
+        return None
+
+    return dirname[mo.start()+1:]
+    
+
+
+class Runner(rpc.XmlRpcBot):
+    """
+    Manage running pipeline jobs.
+    """    
+    def __init__(self, section=None, configfile=None):
+        #if configfile is None:
+        #    self.configfile = "~/.gaworkflow"
+        super(Runner, self).__init__(section, configfile)
+        
+        self.cfg['notify_users'] = None
+        self.cfg['genome_dir'] = None
+        self.cfg['base_analysis_dir'] = None
+
+        self.cfg['notify_users'] = None
+        self.cfg['notify_postanalysis'] = None
+
+        self.conf_info_dict = {}
+        
+        self.register_function(self.sequencingFinished)
+        #self.eventTasks.append(self.update)
+
+    
+    def read_config(self, section=None, configfile=None):
+        super(Runner, self).read_config(section, configfile)
+
+        self.genome_dir = self._check_required_option('genome_dir')
+        self.base_analysis_dir = self._check_required_option('base_analysis_dir')
+
+        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+        #FIXME: process notify_postpipeline cfg
+        
+    
+    def _parser(self, msg, who):
+        """
+        Parse xmpp chat messages
+        """
+        help = u"I can send [start] a run, or report [status]"
+        if re.match(u"help", msg):
+            reply = help
+        elif re.match("status", msg):
+            words = msg.split()
+            if len(words) == 2:
+                reply = self.getStatusReport(words[1])
+            else:
+                reply = u"Status available for: %s" \
+                        % (', '.join([k for k in self.conf_info_dict.keys()]))
+        elif re.match(u"start", msg):
+            words = msg.split()
+            if len(words) == 2:
+                self.sequencingFinished(words[1])
+                reply = u"starting run for %s" % (words[1])
+            else:
+                reply = u"need runfolder name"
+        else:
+            reply = u"I didn't understand '%s'" %(msg)
+
+        logging.debug("reply: " + str(reply))
+        return reply
+
+
+    def getStatusReport(self, fc_num):
+        """
+        Returns text status report for flow cell number 
+        """
+        if fc_num not in self.conf_info_dict:
+            return "No record of a %s run." % (fc_num)
+
+        status = self.conf_info_dict[fc_num].status
+
+        if status is None:
+            return "No status information for %s yet." \
+                   " Probably still in configure step. Try again later." % (fc_num)
+
+        fc,ft = status.statusFirecrest()
+        bc,bt = status.statusBustard()
+        gc,gt = status.statusGerald()
+
+        tc,tt = status.statusTotal()
+
+        fp = _percentCompleted(fc, ft)
+        bp = _percentCompleted(bc, bt)
+        gp = _percentCompleted(gc, gt)
+        tp = _percentCompleted(tc, tt)
+
+        output = []
+
+        output.append(u'Firecrest: %s%% (%s/%s)' % (fp, fc, ft))
+        output.append(u'  Bustard: %s%% (%s/%s)' % (bp, bc, bt))
+        output.append(u'   Gerald: %s%% (%s/%s)' % (gp, gc, gt))
+        output.append(u'-----------------------')
+        output.append(u'    Total: %s%% (%s/%s)' % (tp, tc, tt))
+
+        return '\n'.join(output)
+    
+            
+    def sequencingFinished(self, run_dir):
+        """
+        Sequenceing (and copying) is finished, time to start pipeline
+        """
+        logging.debug("received sequencing finished message")
+
+        # Setup config info object
+        ci = ConfigInfo()
+        ci.base_analysis_dir = self.base_analysis_dir
+        ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir)        
+
+        # get flowcell from run_dir name
+        flowcell = _get_flowcell_from_rundir(run_dir)
+
+        # Store ci object in dictionary
+        self.conf_info_dict[flowcell] = ci
+
+
+        # Launch the job in it's own thread and turn.
+        self.launchJob(run_dir, flowcell, ci)
+        return "started"
+        
+        
+    def pipelineFinished(self, run_dir):
+        # need to strip off self.watch_dir from rundir I suspect.
+        logging.info("pipeline finished in" + str(run_dir))
+        #pattern = self.watch_dir
+        #if pattern[-1] != os.path.sep:
+        #    pattern += os.path.sep
+        #stripped_run_dir = re.sub(pattern, "", run_dir)
+        #logging.debug("stripped to " + stripped_run_dir)
+
+        # Notify each user that the run has finished.
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, 'Pipeline run %s finished' % (run_dir))
+                
+        #if self.notify_runner is not None:
+        #    for r in self.notify_runner:
+        #        self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
+
+    def reportMsg(self, msg):
+
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, msg)
+
+
+    def _runner(self, run_dir, flowcell, conf_info):
+
+        # retrieve config step
+        cfg_filepath = os.path.join(conf_info.analysis_dir,
+                                    'config32auto.txt')
+        status_retrieve_cfg = retrieve_config(conf_info,
+                                          flowcell,
+                                          cfg_filepath,
+                                          self.genome_dir)
+        if status_retrieve_cfg:
+            logging.info("Runner: Retrieve config: success")
+            self.reportMsg("Retrieve config (%s): success" % (run_dir))
+        else:
+            logging.error("Runner: Retrieve config: failed")
+            self.reportMsg("Retrieve config (%s): FAILED" % (run_dir))
+
+        
+        # configure step
+        if status_retrieve_cfg:
+            status = configure(conf_info)
+            if status:
+                logging.info("Runner: Configure: success")
+                self.reportMsg("Configure (%s): success" % (run_dir))
+            else:
+                logging.error("Runner: Configure: failed")
+                self.reportMsg("Configure (%s): FAILED" % (run_dir))
+
+            #if successful, continue
+            if status:
+                # Setup status cmdline status monitor
+                #startCmdLineStatusMonitor(ci)
+                
+                # running step
+                print 'Running pipeline now!'
+                run_status = run_pipeline(conf_info)
+                if run_status is True:
+                    logging.info('Runner: Pipeline: success')
+                    self.piplineFinished(run_dir)
+                else:
+                    logging.info('Runner: Pipeline: failed')
+                    self.reportMsg("Pipeline run (%s): FAILED" % (run_dir))
+
+
+    def launchJob(self, run_dir, flowcell, conf_info):
+        """
+        Starts up a thread for running the pipeline
+        """
+        t = threading.Thread(target=self._runner,
+                        args=[run_dir, flowcell, conf_info])
+        t.setDaemon(True)
+        t.start()
+        
+
+        
+def main(args=None):
+    bot = Runner()
+    return bot.main(args)
+    
+if __name__ == "__main__":
+    sys.exit(main(sys.argv[1:]))
+    
diff --git a/gaworkflow/automation/spoolwatcher.py b/gaworkflow/automation/spoolwatcher.py
new file mode 100644 (file)
index 0000000..ba566cb
--- /dev/null
@@ -0,0 +1,196 @@
+#!/usr/bin/env python
+import logging
+import os
+import re
+import sys
+import time
+
+# this uses pyinotify
+import pyinotify
+from pyinotify import EventsCodes
+
+from benderjab import rpc
+
+
+class Handler(pyinotify.ProcessEvent):
+    def __init__(self, watchmanager, bot):
+        self.last_event_time = None
+        self.watchmanager = watchmanager
+        self.bot = bot
+
+    def process_IN_CREATE(self, event):
+        self.last_event_time = time.time()
+        msg = "Create: %s" %  os.path.join(event.path, event.name)
+        if event.name.lower() == "run.completed":
+            try:
+                self.bot.sequencingFinished(event.path)
+            except IOError, e:
+                logging.error("Couldn't send sequencingFinished")
+        logging.debug(msg)
+
+    def process_IN_DELETE(self, event):
+        logging.debug("Remove: %s" %  os.path.join(event.path, event.name))
+
+    def process_IN_UNMOUNT(self, event):
+        self.bot.unmount_watch()
+
+class SpoolWatcher(rpc.XmlRpcBot):
+    """
+    Watch a directory and send a message when another process is done writing.
+    
+    This monitors a directory tree using inotify (linux specific) and
+    after some files having been written will send a message after <timeout>
+    seconds of no file writing.
+    
+    (Basically when the solexa machine finishes dumping a round of data
+    this'll hopefully send out a message saying hey look theres data available
+    
+    """
+    # these params need to be in the config file
+    # I wonder where I should put the documentation
+    #:Parameters:
+    #    `watchdir` - which directory tree to monitor for modifications
+    #    `profile` - specify which .gaworkflow profile to use
+    #    `write_timeout` - how many seconds to wait for writes to finish to
+    #                      the spool
+    #    `notify_timeout` - how often to timeout from notify
+    
+    def __init__(self, section=None, configfile=None):
+        #if configfile is None:
+        #    self.configfile = "~/.gaworkflow"
+        super(SpoolWatcher, self).__init__(section, configfile)
+        
+        self.cfg['watchdir'] = None
+        self.cfg['write_timeout'] = 10
+        self.cfg['notify_users'] = None
+        self.cfg['notify_runner'] = None
+        
+        self.notify_timeout = 0.001
+        self.wm = pyinotify.WatchManager()
+        self.handler = Handler(self.wm, self)
+        self.notifier = pyinotify.Notifier(self.wm, self.handler)
+        self.wdd = None
+        
+        self.notify_users = None
+        self.notify_runner = None
+        
+        self.eventTasks.append(self.process_notify)
+
+    def read_config(self, section=None, configfile=None):
+        super(SpoolWatcher, self).read_config(section, configfile)
+        
+        self.watch_dir = self._check_required_option('watchdir')
+        self.write_timeout = int(self.cfg['write_timeout'])
+        
+        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
+        try:
+          self.notify_runner = \
+             self._parse_user_list(self.cfg['notify_runner'],
+                                   require_resource=True)
+        except bot.JIDMissingResource:
+            msg = 'need a full jabber ID + resource for xml-rpc destinations'
+            logging.FATAL(msg)
+            raise bot.JIDMissingResource(msg)
+            
+    def add_watch(self, watchdir=None):
+        """
+        start watching watchdir or self.watch_dir
+        we're currently limited to watching one directory tree.
+        """
+        # the one tree limit is mostly because self.wdd is a single item
+        # but managing it as a list might be a bit more annoying
+        if watchdir is None:
+            watchdir = self.watch_dir
+        logging.info("Watching:"+str(watchdir))
+        mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
+        # rec traverses the tree and adds all the directories that are there
+        # at the start.
+        # auto_add will add in new directories as they are created
+        self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
+
+    def unmount_watch(self):
+        if self.wdd is not None:
+            logging.debug("disabling watch")
+            logging.debug(str(self.wdd))
+            self.wm.rm_watch(self.wdd)
+            self.wdd = None
+            
+    def process_notify(self, *args):
+        # process the queue of events as explained above
+        self.notifier.process_events()
+        #check events waits timeout
+        if self.notifier.check_events(self.notify_timeout):
+            # read notified events and enqeue them
+            self.notifier.read_events()
+            # should we do something?
+        last_event_time = self.handler.last_event_time
+        if last_event_time is not None:
+            time_delta = time.time() - last_event_time
+            if time_delta > self.write_timeout:
+                self.startCopy()
+                self.handler.last_event_time = None
+    
+    def _parser(self, msg, who):
+        """
+        Parse xmpp chat messages
+        """
+        help = u"I can send [copy] message, or squencer [finished]"
+        if re.match(u"help", msg):
+            reply = help
+        elif re.match("copy", msg):            
+            self.startCopy()
+            reply = u"sent copy message"
+        elif re.match(u"finished", msg):
+            words = msg.split()
+            if len(words) == 2:
+                self.sequencingFinished(words[1])
+                reply = u"sending sequencing finished for %s" % (words[1])
+            else:
+                reply = u"need runfolder name"
+        else:
+            reply = u"I didn't understand '%s'" %(msg)            
+        return reply
+        
+    def start(self, daemonize):
+        """
+        Start application
+        """
+        self.add_watch()
+        super(SpoolWatcher, self).start(daemonize)
+        
+    def stop(self):
+        """
+        shutdown application
+        """
+        # destroy the inotify's instance on this interrupt (stop monitoring)
+        self.notifier.stop()
+        super(SpoolWatcher, self).stop()
+    
+    def startCopy(self):
+        logging.debug("writes seem to have stopped")
+        if self.notify_runner is not None:
+            for r in self.notify_runner:
+                self.rpc_send(r, tuple(), 'startCopy')
+        
+    def sequencingFinished(self, run_dir):
+        # need to strip off self.watch_dir from rundir I suspect.
+        logging.info("run.completed in " + str(run_dir))
+        pattern = self.watch_dir
+        if pattern[-1] != os.path.sep:
+            pattern += os.path.sep
+        stripped_run_dir = re.sub(pattern, "", run_dir)
+        logging.debug("stripped to " + stripped_run_dir)
+        if self.notify_users is not None:
+            for u in self.notify_users:
+                self.send(u, 'Sequencing run %s finished' % (stripped_run_dir))
+        if self.notify_runner is not None:
+            for r in self.notify_runner:
+                self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
+        
+def main(args=None):
+    bot = SpoolWatcher()
+    return bot.main(args)
+    
+if __name__ == "__main__":
+    sys.exit(main(sys.argv[1:]))
+    
\ No newline at end of file
diff --git a/gaworkflow/copier.py b/gaworkflow/copier.py
deleted file mode 100644 (file)
index 2e0d3ae..0000000
+++ /dev/null
@@ -1,245 +0,0 @@
-import ConfigParser
-import copy
-import logging
-import logging.handlers
-import os
-import re
-import subprocess
-import sys
-import time
-import traceback
-
-from benderjab import rpc
-
-def runfolder_validate(fname):
-    """
-    Return True if fname looks like a runfolder name
-    """
-    if re.match("^[0-9]{6}_[-A-Za-z0-9_]*$", fname):
-        return True
-    else:
-        return False
-    
-class rsync(object):
-  def __init__(self, source, dest, pwfile):
-    self.pwfile = os.path.expanduser(pwfile)
-    self.cmd = ['/usr/bin/rsync', ]
-    self.cmd.append('--password-file=%s' % (self.pwfile))
-    self.source_base = source
-    self.dest_base = dest
-    self.processes = {}
-    self.exit_code = None
-
-  def list(self):
-    """Get a directory listing"""
-    args = copy.copy(self.cmd)
-    args.append(self.source_base)
-
-    logging.debug("Rsync cmd:" + " ".join(args))
-    short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
-    return self.list_filter(short_process.stdout)
-
-  def list_filter(self, lines):
-    """
-    parse rsync directory listing
-    """
-    dirs_to_copy = []
-    direntries = [ x[0:42].split() + [x[43:-1]] for x in lines ]
-    for permissions, size, filedate, filetime, filename in direntries:
-      if permissions[0] == 'd':
-        # hey its a directory, the first step to being something we want to 
-        # copy
-        if re.match("[0-9]{6}", filename):
-          # it starts with something that looks like a 6 digit date
-          # aka good enough for me
-          dirs_to_copy.append(filename)
-    return dirs_to_copy
-
-  def create_copy_process(self, dirname):
-    args = copy.copy(self.cmd)
-    # we want to copy everything
-    args.append('-rlt') 
-    # from here
-    args.append(os.path.join(self.source_base, dirname))
-    # to here
-    args.append(self.dest_base)
-    logging.debug("Rsync cmd:" + " ".join(args))
-    return subprocess.Popen(args)
-  def copy(self):
-    """
-    copy any interesting looking directories over
-    return list of items that we started copying.
-    """
-    # clean up any lingering non-running processes
-    self.poll()
-    
-    # what's available to copy?
-    dirs_to_copy = self.list()
-    
-    # lets start copying
-    started = []
-    for d in dirs_to_copy:
-      process = self.processes.get(d, None)
-      
-      if process is None:
-        # we don't have a process, so make one
-        logging.info("rsyncing %s" % (d))
-        self.processes[d] = self.create_copy_process(d)
-        started.append(d)           
-    return started
-      
-  def poll(self):
-      """
-      check currently running processes to see if they're done
-      
-      return path roots that have finished.
-      """
-      for dir_key, proc_value in self.processes.items():
-          retcode = proc_value.poll()
-          if retcode is None:
-              # process hasn't finished yet
-              pass
-          elif retcode == 0:
-              logging.info("finished rsyncing %s, exitcode %d" %( dir_key, retcode))
-              del self.processes[dir_key]
-          else:
-              logging.error("rsync failed for %s, exit code %d" % (dir_key, retcode))
-              
-  def __len__(self):
-      """
-      Return how many active rsync processes we currently have
-      
-      Call poll first to close finished processes.
-      """
-      return len(self.processes)
-  
-  def keys(self):
-      """
-      Return list of current run folder names
-      """
-      return self.processes.keys()
-
-class CopierBot(rpc.XmlRpcBot):
-    def __init__(self, section=None, configfile=None):
-        #if configfile is None:
-        #    configfile = '~/.gaworkflow'
-            
-        super(CopierBot, self).__init__(section, configfile)
-        
-        # options for rsync command
-        self.cfg['rsync_password_file'] = None
-        self.cfg['rsync_source'] = None
-        self.cfg['rsync_destination'] = None 
-        
-        # options for reporting we're done 
-        self.cfg['notify_users'] = None
-        self.cfg['notify_runner'] = None
-                            
-        self.pending = []
-        self.rsync = None
-        self.notify_users = None
-        self.notify_runner = None
-        
-        self.register_function(self.startCopy)
-        self.register_function(self.sequencingFinished)
-        self.eventTasks.append(self.update)
-        
-    def read_config(self, section=None, configfile=None):
-        """
-        read the config file
-        """
-        super(CopierBot, self).read_config(section, configfile)
-        
-        password = self._check_required_option('rsync_password_file')
-        source = self._check_required_option('rsync_source')
-        destination = self._check_required_option('rsync_destination')
-        self.rsync = rsync(source, destination, password)
-        
-        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
-        try:
-          self.notify_runner = \
-             self._parse_user_list(self.cfg['notify_runner'],
-                                   require_resource=True)
-        except bot.JIDMissingResource:
-            msg = 'need a full jabber ID + resource for xml-rpc destinations'
-            logging.FATAL(msg)
-            raise bot.JIDMissingResource(msg)
-
-    def startCopy(self, *args):
-        """
-        start our copy
-        """
-        logging.info("starting copy scan")
-        started = self.rsync.copy()
-        logging.info("copying:" + " ".join(started)+".")
-        return started
-        
-    def sequencingFinished(self, runDir, *args):
-        """
-        The run was finished, if we're done copying, pass the message on        
-        """
-        # close any open processes
-        self.rsync.poll()
-        
-        # see if we're still copying
-        if runfolder_validate(runDir):
-            logging.info("recevied sequencing finshed for %s" % (runDir))
-            self.pending.append(runDir)
-            self.startCopy()
-            return "PENDING"
-        else:
-            errmsg = "received bad runfolder name (%s)" % (runDir)
-            logging.warning(errmsg)
-            # maybe I should use a different error message
-            raise RuntimeError(errmsg)
-    
-    def reportSequencingFinished(self, runDir):
-        """
-        Send the sequencingFinished message to the interested parties
-        """
-        if self.notify_users is not None:
-            for u in self.notify_users:
-                self.send(u, 'Sequencing run %s finished' % (runDir))
-        if self.notify_runner is not None:
-            for r in self.notify_runner:
-                self.rpc_send(r, (runDir,), 'sequencingFinished')
-        logging.info("forwarding sequencingFinshed message for %s" % (runDir))
-        
-    def update(self, *args):
-        """
-        Update our current status.
-        Report if we've finished copying files.
-        """
-        self.rsync.poll()
-        for p in self.pending:
-            if p not in self.rsync.keys():
-                self.reportSequencingFinished(p)
-                self.pending.remove(p)
-        
-    def _parser(self, msg, who):
-        """
-        Parse xmpp chat messages
-        """
-        help = u"I can [copy], or report current [status]"
-        if re.match(u"help", msg):
-            reply = help
-        elif re.match("copy", msg):            
-            started = self.startCopy()
-            reply = u"started copying " + ", ".join(started)
-        elif re.match(u"status", msg):
-            msg = [u"Currently %d rsync processes are running." % (len(self.rsync))]
-            for d in self.rsync.keys():
-              msg.append(u"  " + d)
-            reply = os.linesep.join(msg)
-        else:
-            reply = u"I didn't understand '%s'" % (unicode(msg))
-        return reply
-
-def main(args=None):
-    bot = CopierBot()
-    bot.main(args)
-    
-if __name__ == "__main__":
-  sys.exit(main(sys.argv[1:]))
-
diff --git a/gaworkflow/runner.py b/gaworkflow/runner.py
deleted file mode 100644 (file)
index f81b682..0000000
+++ /dev/null
@@ -1,236 +0,0 @@
-#!/usr/bin/env python
-import logging
-import os
-import re
-import sys
-import time
-import threading
-
-from benderjab import rpc
-
-from gaworkflow.pipeline.configure_run import *
-from gaworkflow.pipeline.monitors import _percentCompleted
-
-#s_fc = re.compile('FC[0-9]+')
-s_fc = re.compile('_[0-9a-zA-Z]*$')
-
-
-def _get_flowcell_from_rundir(run_dir):
-    """
-    Returns flowcell string based on run_dir.
-    Returns None and logs error if flowcell can't be found.
-    """
-    junk, dirname = os.path.split(run_dir)
-    mo = s_fc.search(dirname)
-    if not mo:
-        logging.error('RunDir 2 FlowCell error: %s' % (run_dir))
-        return None
-
-    return dirname[mo.start()+1:]
-    
-
-
-class Runner(rpc.XmlRpcBot):
-    """
-    Manage running pipeline jobs.
-    """    
-    def __init__(self, section=None, configfile=None):
-        #if configfile is None:
-        #    self.configfile = "~/.gaworkflow"
-        super(Runner, self).__init__(section, configfile)
-        
-        self.cfg['notify_users'] = None
-        self.cfg['genome_dir'] = None
-        self.cfg['base_analysis_dir'] = None
-
-        self.cfg['notify_users'] = None
-        self.cfg['notify_postanalysis'] = None
-
-        self.conf_info_dict = {}
-        
-        self.register_function(self.sequencingFinished)
-        #self.eventTasks.append(self.update)
-
-    
-    def read_config(self, section=None, configfile=None):
-        super(Runner, self).read_config(section, configfile)
-
-        self.genome_dir = self._check_required_option('genome_dir')
-        self.base_analysis_dir = self._check_required_option('base_analysis_dir')
-
-        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
-        #FIXME: process notify_postpipeline cfg
-        
-    
-    def _parser(self, msg, who):
-        """
-        Parse xmpp chat messages
-        """
-        help = u"I can send [start] a run, or report [status]"
-        if re.match(u"help", msg):
-            reply = help
-        elif re.match("status", msg):
-            words = msg.split()
-            if len(words) == 2:
-                reply = self.getStatusReport(words[1])
-            else:
-                reply = u"Status available for: %s" \
-                        % (', '.join([k for k in self.conf_info_dict.keys()]))
-        elif re.match(u"start", msg):
-            words = msg.split()
-            if len(words) == 2:
-                self.sequencingFinished(words[1])
-                reply = u"starting run for %s" % (words[1])
-            else:
-                reply = u"need runfolder name"
-        else:
-            reply = u"I didn't understand '%s'" %(msg)
-
-        logging.debug("reply: " + str(reply))
-        return reply
-
-
-    def getStatusReport(self, fc_num):
-        """
-        Returns text status report for flow cell number 
-        """
-        if fc_num not in self.conf_info_dict:
-            return "No record of a %s run." % (fc_num)
-
-        status = self.conf_info_dict[fc_num].status
-
-        if status is None:
-            return "No status information for %s yet." \
-                   " Probably still in configure step. Try again later." % (fc_num)
-
-        fc,ft = status.statusFirecrest()
-        bc,bt = status.statusBustard()
-        gc,gt = status.statusGerald()
-
-        tc,tt = status.statusTotal()
-
-        fp = _percentCompleted(fc, ft)
-        bp = _percentCompleted(bc, bt)
-        gp = _percentCompleted(gc, gt)
-        tp = _percentCompleted(tc, tt)
-
-        output = []
-
-        output.append(u'Firecrest: %s%% (%s/%s)' % (fp, fc, ft))
-        output.append(u'  Bustard: %s%% (%s/%s)' % (bp, bc, bt))
-        output.append(u'   Gerald: %s%% (%s/%s)' % (gp, gc, gt))
-        output.append(u'-----------------------')
-        output.append(u'    Total: %s%% (%s/%s)' % (tp, tc, tt))
-
-        return '\n'.join(output)
-    
-            
-    def sequencingFinished(self, run_dir):
-        """
-        Sequenceing (and copying) is finished, time to start pipeline
-        """
-        logging.debug("received sequencing finished message")
-
-        # Setup config info object
-        ci = ConfigInfo()
-        ci.base_analysis_dir = self.base_analysis_dir
-        ci.analysis_dir = os.path.join(self.base_analysis_dir, run_dir)        
-
-        # get flowcell from run_dir name
-        flowcell = _get_flowcell_from_rundir(run_dir)
-
-        # Store ci object in dictionary
-        self.conf_info_dict[flowcell] = ci
-
-
-        # Launch the job in it's own thread and turn.
-        self.launchJob(run_dir, flowcell, ci)
-        return "started"
-        
-        
-    def pipelineFinished(self, run_dir):
-        # need to strip off self.watch_dir from rundir I suspect.
-        logging.info("pipeline finished in" + str(run_dir))
-        #pattern = self.watch_dir
-        #if pattern[-1] != os.path.sep:
-        #    pattern += os.path.sep
-        #stripped_run_dir = re.sub(pattern, "", run_dir)
-        #logging.debug("stripped to " + stripped_run_dir)
-
-        # Notify each user that the run has finished.
-        if self.notify_users is not None:
-            for u in self.notify_users:
-                self.send(u, 'Pipeline run %s finished' % (run_dir))
-                
-        #if self.notify_runner is not None:
-        #    for r in self.notify_runner:
-        #        self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
-
-    def reportMsg(self, msg):
-
-        if self.notify_users is not None:
-            for u in self.notify_users:
-                self.send(u, msg)
-
-
-    def _runner(self, run_dir, flowcell, conf_info):
-
-        # retrieve config step
-        cfg_filepath = os.path.join(conf_info.analysis_dir,
-                                    'config32auto.txt')
-        status_retrieve_cfg = retrieve_config(conf_info,
-                                          flowcell,
-                                          cfg_filepath,
-                                          self.genome_dir)
-        if status_retrieve_cfg:
-            logging.info("Runner: Retrieve config: success")
-            self.reportMsg("Retrieve config (%s): success" % (run_dir))
-        else:
-            logging.error("Runner: Retrieve config: failed")
-            self.reportMsg("Retrieve config (%s): FAILED" % (run_dir))
-
-        
-        # configure step
-        if status_retrieve_cfg:
-            status = configure(conf_info)
-            if status:
-                logging.info("Runner: Configure: success")
-                self.reportMsg("Configure (%s): success" % (run_dir))
-            else:
-                logging.error("Runner: Configure: failed")
-                self.reportMsg("Configure (%s): FAILED" % (run_dir))
-
-            #if successful, continue
-            if status:
-                # Setup status cmdline status monitor
-                #startCmdLineStatusMonitor(ci)
-                
-                # running step
-                print 'Running pipeline now!'
-                run_status = run_pipeline(conf_info)
-                if run_status is True:
-                    logging.info('Runner: Pipeline: success')
-                    self.piplineFinished(run_dir)
-                else:
-                    logging.info('Runner: Pipeline: failed')
-                    self.reportMsg("Pipeline run (%s): FAILED" % (run_dir))
-
-
-    def launchJob(self, run_dir, flowcell, conf_info):
-        """
-        Starts up a thread for running the pipeline
-        """
-        t = threading.Thread(target=self._runner,
-                        args=[run_dir, flowcell, conf_info])
-        t.setDaemon(True)
-        t.start()
-        
-
-        
-def main(args=None):
-    bot = Runner()
-    return bot.main(args)
-    
-if __name__ == "__main__":
-    sys.exit(main(sys.argv[1:]))
-    
diff --git a/gaworkflow/spoolwatcher.py b/gaworkflow/spoolwatcher.py
deleted file mode 100644 (file)
index ba566cb..0000000
+++ /dev/null
@@ -1,196 +0,0 @@
-#!/usr/bin/env python
-import logging
-import os
-import re
-import sys
-import time
-
-# this uses pyinotify
-import pyinotify
-from pyinotify import EventsCodes
-
-from benderjab import rpc
-
-
-class Handler(pyinotify.ProcessEvent):
-    def __init__(self, watchmanager, bot):
-        self.last_event_time = None
-        self.watchmanager = watchmanager
-        self.bot = bot
-
-    def process_IN_CREATE(self, event):
-        self.last_event_time = time.time()
-        msg = "Create: %s" %  os.path.join(event.path, event.name)
-        if event.name.lower() == "run.completed":
-            try:
-                self.bot.sequencingFinished(event.path)
-            except IOError, e:
-                logging.error("Couldn't send sequencingFinished")
-        logging.debug(msg)
-
-    def process_IN_DELETE(self, event):
-        logging.debug("Remove: %s" %  os.path.join(event.path, event.name))
-
-    def process_IN_UNMOUNT(self, event):
-        self.bot.unmount_watch()
-
-class SpoolWatcher(rpc.XmlRpcBot):
-    """
-    Watch a directory and send a message when another process is done writing.
-    
-    This monitors a directory tree using inotify (linux specific) and
-    after some files having been written will send a message after <timeout>
-    seconds of no file writing.
-    
-    (Basically when the solexa machine finishes dumping a round of data
-    this'll hopefully send out a message saying hey look theres data available
-    
-    """
-    # these params need to be in the config file
-    # I wonder where I should put the documentation
-    #:Parameters:
-    #    `watchdir` - which directory tree to monitor for modifications
-    #    `profile` - specify which .gaworkflow profile to use
-    #    `write_timeout` - how many seconds to wait for writes to finish to
-    #                      the spool
-    #    `notify_timeout` - how often to timeout from notify
-    
-    def __init__(self, section=None, configfile=None):
-        #if configfile is None:
-        #    self.configfile = "~/.gaworkflow"
-        super(SpoolWatcher, self).__init__(section, configfile)
-        
-        self.cfg['watchdir'] = None
-        self.cfg['write_timeout'] = 10
-        self.cfg['notify_users'] = None
-        self.cfg['notify_runner'] = None
-        
-        self.notify_timeout = 0.001
-        self.wm = pyinotify.WatchManager()
-        self.handler = Handler(self.wm, self)
-        self.notifier = pyinotify.Notifier(self.wm, self.handler)
-        self.wdd = None
-        
-        self.notify_users = None
-        self.notify_runner = None
-        
-        self.eventTasks.append(self.process_notify)
-
-    def read_config(self, section=None, configfile=None):
-        super(SpoolWatcher, self).read_config(section, configfile)
-        
-        self.watch_dir = self._check_required_option('watchdir')
-        self.write_timeout = int(self.cfg['write_timeout'])
-        
-        self.notify_users = self._parse_user_list(self.cfg['notify_users'])
-        try:
-          self.notify_runner = \
-             self._parse_user_list(self.cfg['notify_runner'],
-                                   require_resource=True)
-        except bot.JIDMissingResource:
-            msg = 'need a full jabber ID + resource for xml-rpc destinations'
-            logging.FATAL(msg)
-            raise bot.JIDMissingResource(msg)
-            
-    def add_watch(self, watchdir=None):
-        """
-        start watching watchdir or self.watch_dir
-        we're currently limited to watching one directory tree.
-        """
-        # the one tree limit is mostly because self.wdd is a single item
-        # but managing it as a list might be a bit more annoying
-        if watchdir is None:
-            watchdir = self.watch_dir
-        logging.info("Watching:"+str(watchdir))
-        mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
-        # rec traverses the tree and adds all the directories that are there
-        # at the start.
-        # auto_add will add in new directories as they are created
-        self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
-
-    def unmount_watch(self):
-        if self.wdd is not None:
-            logging.debug("disabling watch")
-            logging.debug(str(self.wdd))
-            self.wm.rm_watch(self.wdd)
-            self.wdd = None
-            
-    def process_notify(self, *args):
-        # process the queue of events as explained above
-        self.notifier.process_events()
-        #check events waits timeout
-        if self.notifier.check_events(self.notify_timeout):
-            # read notified events and enqeue them
-            self.notifier.read_events()
-            # should we do something?
-        last_event_time = self.handler.last_event_time
-        if last_event_time is not None:
-            time_delta = time.time() - last_event_time
-            if time_delta > self.write_timeout:
-                self.startCopy()
-                self.handler.last_event_time = None
-    
-    def _parser(self, msg, who):
-        """
-        Parse xmpp chat messages
-        """
-        help = u"I can send [copy] message, or squencer [finished]"
-        if re.match(u"help", msg):
-            reply = help
-        elif re.match("copy", msg):            
-            self.startCopy()
-            reply = u"sent copy message"
-        elif re.match(u"finished", msg):
-            words = msg.split()
-            if len(words) == 2:
-                self.sequencingFinished(words[1])
-                reply = u"sending sequencing finished for %s" % (words[1])
-            else:
-                reply = u"need runfolder name"
-        else:
-            reply = u"I didn't understand '%s'" %(msg)            
-        return reply
-        
-    def start(self, daemonize):
-        """
-        Start application
-        """
-        self.add_watch()
-        super(SpoolWatcher, self).start(daemonize)
-        
-    def stop(self):
-        """
-        shutdown application
-        """
-        # destroy the inotify's instance on this interrupt (stop monitoring)
-        self.notifier.stop()
-        super(SpoolWatcher, self).stop()
-    
-    def startCopy(self):
-        logging.debug("writes seem to have stopped")
-        if self.notify_runner is not None:
-            for r in self.notify_runner:
-                self.rpc_send(r, tuple(), 'startCopy')
-        
-    def sequencingFinished(self, run_dir):
-        # need to strip off self.watch_dir from rundir I suspect.
-        logging.info("run.completed in " + str(run_dir))
-        pattern = self.watch_dir
-        if pattern[-1] != os.path.sep:
-            pattern += os.path.sep
-        stripped_run_dir = re.sub(pattern, "", run_dir)
-        logging.debug("stripped to " + stripped_run_dir)
-        if self.notify_users is not None:
-            for u in self.notify_users:
-                self.send(u, 'Sequencing run %s finished' % (stripped_run_dir))
-        if self.notify_runner is not None:
-            for r in self.notify_runner:
-                self.rpc_send(r, (stripped_run_dir,), 'sequencingFinished')
-        
-def main(args=None):
-    bot = SpoolWatcher()
-    return bot.main(args)
-    
-if __name__ == "__main__":
-    sys.exit(main(sys.argv[1:]))
-    
\ No newline at end of file
index 898f68521045a3caa107b6c403ddbc536d23d62e..7025d57a72ceb85ba1b2b0a040abe34716056d91 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 import sys
-from gaworkflow.copier import main
+from gaworkflow.automation.copier import main
 
 if __name__ == "__main__":
   sys.exit(main(sys.argv[1:]))
index 655659a533c16579fe49860a905e0f1c7bec7ffb..6b6da11e1a85b5e55b89cd965471912ff694e05f 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 import sys
-from gaworkflow.runner import main
+from gaworkflow.automation.runner import main
 
 if __name__ == "__main__":
   sys.exit(main(sys.argv[1:]))
index 25a277f08ff3ad842ce80578439b5f63e955ddb2..269292e606b36c2706ac33cdd867d15cb36f8ce9 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 import sys
-from gaworkflow.spoolwatcher import main
+from gaworkflow.automation.spoolwatcher import main
 
 if __name__ == "__main__":
     sys.exit(main(sys.argv[1:]))