Gerald's time-stamp format was inconsistent.
[htsworkflow.git] / htsworkflow / pipelines / gerald.py
index eb3352a51665901d126220f17dc1b50fb3a3da83..75d7d2e514485503b0cca1a612a969ddc6e156e0 100644 (file)
@@ -1,6 +1,6 @@
+"""Provide access to information stored in the GERALD directory.
 """
-Provide access to information stored in the GERALD directory.
-"""
+import collections
 from datetime import datetime, date
 import logging
 import os
@@ -10,6 +10,7 @@ import time
 
 from htsworkflow.pipelines.summary import Summary, SummaryGA, SummaryHiSeq
 from htsworkflow.pipelines.eland import eland, ELAND
+from htsworkflow.pipelines.samplekey import SampleKey
 
 from htsworkflow.pipelines.runfolder import \
    ElementTree, \
@@ -103,7 +104,7 @@ class Gerald(Alignment):
 
         timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
         if timestamp is not None:
-            epochstamp = time.mktime(time.strptime(timestamp, '%c'))
+            epochstamp = time.mktime(time.strptime(timestamp))
             return datetime.fromtimestamp(epochstamp)
         return super(Gerald, self)._get_date()
     date = property(_get_date)
@@ -176,7 +177,7 @@ class CASAVA(Alignment):
         if self.tree is None:
             return
         if len(self.tree.xpath('TIME_STAMP')) == 0:
-            time_stamp = self.date.strftime('%c')
+            time_stamp = self.date.strftime("%a %b %d %H:%M:%S %Y")
             time_element = ElementTree.Element('TIME_STAMP')
             time_element.text = time_stamp
             self.tree.append(time_element)
@@ -186,7 +187,10 @@ class CASAVA(Alignment):
             return None
         time_element = self.tree.xpath('TIME_STAMP')
         if len(time_element) == 1:
-            return datetime.strptime(time_element[0].text, '%c')
+           timetuple = time.strptime(
+               time_element[0].text.strip(),
+               "%a %b %d %H:%M:%S %Y")
+           return datetime(*timetuple[:6])
         return super(CASAVA, self)._get_date()
     date = property(_get_date)
 
@@ -349,15 +353,15 @@ class LaneParametersHiSeq(LaneParameters):
         return self.__get_attribute('USE_BASES1')
     use_bases = property(_get_use_bases)
 
-class LaneSpecificRunParameters(object):
+class LaneSpecificRunParameters(collections.MutableMapping):
     """
     Provide access to LaneSpecificRunParameters
     """
     def __init__(self, gerald):
         self._gerald = gerald
-        self._lane = None
+        self._lanes = None
 
-    def _initalize_lanes(self):
+    def _initialize_lanes(self):
         """
         build dictionary of LaneParameters
         """
@@ -377,44 +381,75 @@ class LaneSpecificRunParameters(object):
         # those consistently.
         for element in analysis:
             sample, lane_id = element.tag.split('_')
-            self._lanes[int(lane_id)] = LaneParametersGA(
+            key = SampleKey(lane=int(lane_id), sample=sample)
+            self._lanes[key] = LaneParametersGA(
                                           self._gerald, lane_id)
 
     def _extract_hiseq_analysis_type(self, analysis):
         """Extract from HiSeq style multiplexed analysis types"""
         for element in analysis:
             name = element.attrib['name']
-            self._lanes[name] = LaneParametersHiSeq(self._gerald,
-                                                    name,
-                                                    element)
+            key = SampleKey(sample=name)
+            self._lanes[key] = LaneParametersHiSeq(self._gerald,
+                                                   name,
+                                                   element)
 
     def __iter__(self):
+        if self._lanes is None:
+            self._initialize_lanes()
         return self._lanes.iterkeys()
+
     def __getitem__(self, key):
-        if self._lane is None:
-            self._initalize_lanes()
-        return self._lanes[key]
-    def get(self, key, default):
-        if self._lane is None:
-            self._initalize_lanes()
-        return self._lanes.get(key, None)
-    def keys(self):
-        if self._lane is None:
-            self._initalize_lanes()
-        return self._lanes.keys()
-    def values(self):
-        if self._lane is None:
-            self._initalize_lanes()
-        return self._lanes.values()
-    def items(self):
-        if self._lane is None:
-            self._initalize_lanes()
-        return self._lanes.items()
+        if self._lanes is None:
+            self._initialize_lanes()
+        value = self._lanes.get(key, None)
+        if value is not None:
+            return value
+        real_key = self._find_key(key)
+        if real_key is not None:
+            return self._lanes[real_key]
+        raise KeyError("%s not found in %s" % (
+            repr(key),
+            ",".join((repr(k) for k in self._lanes.keys()))))
+
+    def __setitem__(self, key, value):
+        if len(self._lanes) > 100:
+            LOGGER.warn("many projects loaded, consider improving dictionary")
+        real_key = self._find_key(key)
+        if real_key is not None:
+            key = real_key
+        self._lanes[key] = value
+
+    def __delitem__(self, key):
+        if key in self._lanes:
+            del self._lanes[key]
+        else:
+            real_key = self._find_key(key)
+            if real_key is not None:
+                del self._lanes[real_key]
+
     def __len__(self):
-        if self._lane is None:
-            self._initalize_lanes()
+        if self._lanes is None:
+            self._initialize_lanes()
         return len(self._lanes)
 
+    def _find_key(self, lookup_key):
+        if not isinstance(lookup_key, SampleKey):
+            lookup_key = SampleKey(lane=lookup_key)
+
+        results = []
+        for k in self._lanes:
+            if k.matches(lookup_key):
+                results.append(k)
+        if len(results) > 1:
+            errmsg = "Key %s matched multiple keys: %s"
+            raise ValueError(errmsg % (str(lookup_key),
+                                       ",".join((str(x) for x in results))))
+
+        elif len(results) == 1:
+            return results[0]
+        else:
+            return None
 
 def gerald(pathname):
     LOGGER.info("Parsing gerald config.xml")