merge copying/running automation code from trunk into live site
authorDiane Trout <diane@caltech.edu>
Sat, 24 Jan 2009 01:28:29 +0000 (01:28 +0000)
committerDiane Trout <diane@caltech.edu>
Sat, 24 Jan 2009 01:28:29 +0000 (01:28 +0000)
gaworkflow/automation/copier.py
gaworkflow/automation/runner.py
gaworkflow/automation/spoolwatcher.py
gaworkflow/automation/test/test_runner.py [new file with mode: 0644]

index 2e0d3ae32ca947c3637cd5f67c012db4a3dd0843..572cfb3abacbff9abe8ad296245fafc695db66b9 100644 (file)
@@ -123,7 +123,7 @@ class rsync(object):
 class CopierBot(rpc.XmlRpcBot):
     def __init__(self, section=None, configfile=None):
         #if configfile is None:
-        #    configfile = '~/.gaworkflow'
+        #    configfile = '~/.htsworkflow'
             
         super(CopierBot, self).__init__(section, configfile)
         
index f81b68200bf43a4f41fd749cf1377a65859a37f2..45d4ffcdee8d52a7786dc65816fcc6b8d8d78044 100644 (file)
@@ -1,4 +1,5 @@
 #!/usr/bin/env python
+from glob import glob
 import logging
 import os
 import re
@@ -8,8 +9,7 @@ import threading
 
 from benderjab import rpc
 
-from gaworkflow.pipeline.configure_run import *
-from gaworkflow.pipeline.monitors import _percentCompleted
+from htsworkflow.pipelines.configure_run import *
 
 #s_fc = re.compile('FC[0-9]+')
 s_fc = re.compile('_[0-9a-zA-Z]*$')
@@ -36,7 +36,7 @@ class Runner(rpc.XmlRpcBot):
     """    
     def __init__(self, section=None, configfile=None):
         #if configfile is None:
-        #    self.configfile = "~/.gaworkflow"
+        #    self.configfile = "~/.htsworkflow"
         super(Runner, self).__init__(section, configfile)
         
         self.cfg['notify_users'] = None
@@ -83,6 +83,8 @@ class Runner(rpc.XmlRpcBot):
                 reply = u"starting run for %s" % (words[1])
             else:
                 reply = u"need runfolder name"
+        elif re.match(u"path", msg):
+           reply = u"My path is: " + unicode(os.environ['PATH'])
         else:
             reply = u"I didn't understand '%s'" %(msg)
 
@@ -103,24 +105,7 @@ class Runner(rpc.XmlRpcBot):
             return "No status information for %s yet." \
                    " Probably still in configure step. Try again later." % (fc_num)
 
-        fc,ft = status.statusFirecrest()
-        bc,bt = status.statusBustard()
-        gc,gt = status.statusGerald()
-
-        tc,tt = status.statusTotal()
-
-        fp = _percentCompleted(fc, ft)
-        bp = _percentCompleted(bc, bt)
-        gp = _percentCompleted(gc, gt)
-        tp = _percentCompleted(tc, tt)
-
-        output = []
-
-        output.append(u'Firecrest: %s%% (%s/%s)' % (fp, fc, ft))
-        output.append(u'  Bustard: %s%% (%s/%s)' % (bp, bc, bt))
-        output.append(u'   Gerald: %s%% (%s/%s)' % (gp, gc, gt))
-        output.append(u'-----------------------')
-        output.append(u'    Total: %s%% (%s/%s)' % (tp, tc, tt))
+        output = status.statusReport()
 
         return '\n'.join(output)
     
@@ -177,7 +162,7 @@ class Runner(rpc.XmlRpcBot):
 
         # retrieve config step
         cfg_filepath = os.path.join(conf_info.analysis_dir,
-                                    'config32auto.txt')
+                                    'config-auto.txt')
         status_retrieve_cfg = retrieve_config(conf_info,
                                           flowcell,
                                           cfg_filepath,
@@ -196,6 +181,9 @@ class Runner(rpc.XmlRpcBot):
             if status:
                 logging.info("Runner: Configure: success")
                 self.reportMsg("Configure (%s): success" % (run_dir))
+                self.reportMsg(
+                    os.linesep.join(glob(os.path.join(run_dir,'Data','C*')))
+                )
             else:
                 logging.error("Runner: Configure: failed")
                 self.reportMsg("Configure (%s): FAILED" % (run_dir))
@@ -210,7 +198,7 @@ class Runner(rpc.XmlRpcBot):
                 run_status = run_pipeline(conf_info)
                 if run_status is True:
                     logging.info('Runner: Pipeline: success')
-                    self.piplineFinished(run_dir)
+                    self.reportMsg("Pipeline run (%s): Finished" % (run_dir,))
                 else:
                     logging.info('Runner: Pipeline: failed')
                     self.reportMsg("Pipeline run (%s): FAILED" % (run_dir))
index abd970982263975cd23b0144024be88711f81f06..2a575353d124587db3158289533d644ea57979ec 100644 (file)
@@ -6,7 +6,7 @@ import sys
 import time
 #import glob
 
-#from gaworkflow.pipeline.recipe_parser import get_cycles
+from htsworkflow.util import mount
 
 # this uses pyinotify
 import pyinotify
@@ -47,6 +47,8 @@ class Handler(pyinotify.ProcessEvent):
         logging.debug("Remove: %s" %  os.path.join(event.path, event.name))
 
     def process_IN_UNMOUNT(self, event):
+        pathname = os.path.join(event.path, event.name)
+        logging.debug("IN_UNMOUNT: %s" % (pathname,))
         self.bot.unmount_watch()
 
 class SpoolWatcher(rpc.XmlRpcBot):
@@ -65,14 +67,14 @@ class SpoolWatcher(rpc.XmlRpcBot):
     # I wonder where I should put the documentation
     #:Parameters:
     #    `watchdir` - which directory tree to monitor for modifications
-    #    `profile` - specify which .gaworkflow profile to use
+    #    `profile` - specify which .htsworkflow profile to use
     #    `write_timeout` - how many seconds to wait for writes to finish to
     #                      the spool
     #    `notify_timeout` - how often to timeout from notify
     
     def __init__(self, section=None, configfile=None):
         #if configfile is None:
-        #    self.configfile = "~/.gaworkflow"
+        #    self.configfile = "~/.htsworkflow"
         super(SpoolWatcher, self).__init__(section, configfile)
         
         self.cfg['watchdir'] = None
@@ -85,6 +87,8 @@ class SpoolWatcher(rpc.XmlRpcBot):
         self.handler = Handler(self.wm, self)
         self.notifier = pyinotify.Notifier(self.wm, self.handler)
         self.wdd = None
+        self.mount_point = None
+        self.mounted = True
         
         self.notify_users = None
         self.notify_runner = None
@@ -106,7 +110,7 @@ class SpoolWatcher(rpc.XmlRpcBot):
             msg = 'need a full jabber ID + resource for xml-rpc destinations'
             logging.FATAL(msg)
             raise bot.JIDMissingResource(msg)
-            
+
     def add_watch(self, watchdir=None):
         """
         start watching watchdir or self.watch_dir
@@ -117,6 +121,9 @@ class SpoolWatcher(rpc.XmlRpcBot):
         if watchdir is None:
             watchdir = self.watch_dir
         logging.info("Watching:"+str(watchdir))
+
+        self.mount_point = mount.find_mount_point_for(watchdir)
+
         mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
         # rec traverses the tree and adds all the directories that are there
         # at the start.
@@ -125,10 +132,9 @@ class SpoolWatcher(rpc.XmlRpcBot):
 
     def unmount_watch(self):
         if self.wdd is not None:
-            logging.debug("disabling watch")
-            logging.debug(str(self.wdd))
             self.wm.rm_watch(self.wdd.values())
             self.wdd = None
+            self.mounted = False
             
     def process_notify(self, *args):
         # process the queue of events as explained above
@@ -138,13 +144,25 @@ class SpoolWatcher(rpc.XmlRpcBot):
             # read notified events and enqeue them
             self.notifier.read_events()
             # should we do something?
+        # has something happened?
         last_event_time = self.handler.last_event_time
         if last_event_time is not None:
             time_delta = time.time() - last_event_time
             if time_delta > self.write_timeout:
                 self.startCopy()
                 self.handler.last_event_time = None
-    
+        # handle unmounted filesystems
+        if not self.mounted:
+            if mount.is_mounted(self.mount_point):
+                # we've been remounted. Huzzah!
+                # restart the watch
+                self.add_watch()
+                self.mounted = True
+                logging.info(
+                    "%s was remounted, restarting watch" % \
+                        (self.mount_point)
+                )
+
     def _parser(self, msg, who):
         """
         Parse xmpp chat messages
diff --git a/gaworkflow/automation/test/test_runner.py b/gaworkflow/automation/test/test_runner.py
new file mode 100644 (file)
index 0000000..6c3b9df
--- /dev/null
@@ -0,0 +1,46 @@
+import unittest
+
+
+import os
+from htsworkflow.automation.copier import runfolder_validate
+
+def extract_runfolder_path(watchdir, event):
+  runfolder_path = watchdir
+  path = event.path
+  if not path.startswith(watchdir):
+    return None
+
+  fragments = path[len(watchdir):].split(os.path.sep)
+  for f in fragments:
+    runfolder_path = os.path.join(runfolder_path, f)
+    if runfolder_validate(f):
+      return runfolder_path
+  return None
+
+class Event(object):
+  def __init__(self, path=None, name=None):
+    self.path = path
+    self.name = name
+
+class testRunner(unittest.TestCase):
+
+    def test_extract_runfolder(self):
+        watchdir = os.path.join('root', 'server', 'mount')
+        runfolder = os.path.join(watchdir, '080909_HWI-EAS229_0052_1234ABCD')
+        ipar = os.path.join(runfolder, 'Data', 'IPAR_1.01')
+        other = os.path.join(watchdir, 'other')
+
+        event = Event( path=runfolder )
+        self.failUnlessEqual(extract_runfolder_path(watchdir, event), runfolder)
+        
+        event = Event( path=ipar )
+        self.failUnlessEqual(extract_runfolder_path(watchdir, event), runfolder)
+
+        event = Event( path=other)
+        self.failUnlessEqual(extract_runfolder_path(watchdir, event), None )
+        
+def suite():
+    return unittest.makeSuite(testRunner,'test')
+
+if __name__ == "__main__":
+    unittest.main(defaultTest="suite")