from htsworkflow.pipelines.summary import Summary, SummaryGA, SummaryHiSeq
from htsworkflow.pipelines.eland import eland, ELAND
+from htsworkflow.pipelines.samplekey import SampleKey
from htsworkflow.pipelines.runfolder import \
ElementTree, \
timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
if timestamp is not None:
- epochstamp = time.mktime(time.strptime(timestamp, '%c'))
+ epochstamp = time.mktime(time.strptime(timestamp))
return datetime.fromtimestamp(epochstamp)
return super(Gerald, self)._get_date()
date = property(_get_date)
if self.tree is None:
return
if len(self.tree.xpath('TIME_STAMP')) == 0:
- time_stamp = self.date.strftime('%c')
+ time_stamp = self.date.strftime("%a %b %d %H:%M:%S %Y")
time_element = ElementTree.Element('TIME_STAMP')
time_element.text = time_stamp
self.tree.append(time_element)
return None
time_element = self.tree.xpath('TIME_STAMP')
if len(time_element) == 1:
- return datetime.strptime(time_element[0].text, '%c')
+ timetuple = time.strptime(
+ time_element[0].text.strip(),
+ "%a %b %d %H:%M:%S %Y")
+ return datetime(*timetuple[:6])
return super(CASAVA, self)._get_date()
date = property(_get_date)
# those consistently.
for element in analysis:
sample, lane_id = element.tag.split('_')
- self._lanes[int(lane_id)] = LaneParametersGA(
+ key = SampleKey(lane=int(lane_id), sample=sample)
+ self._lanes[key] = LaneParametersGA(
self._gerald, lane_id)
def _extract_hiseq_analysis_type(self, analysis):
"""Extract from HiSeq style multiplexed analysis types"""
for element in analysis:
name = element.attrib['name']
- self._lanes[name] = LaneParametersHiSeq(self._gerald,
- name,
- element)
+ key = SampleKey(sample=name)
+ self._lanes[key] = LaneParametersHiSeq(self._gerald,
+ name,
+ element)
def __iter__(self):
if self._lanes is None:
def __getitem__(self, key):
if self._lanes is None:
self._initialize_lanes()
- return self._lanes[key]
+ value = self._lanes.get(key, None)
+ if value is not None:
+ return value
+ real_key = self._find_key(key)
+ if real_key is not None:
+ return self._lanes[real_key]
+ raise KeyError("%s not found in %s" % (
+ repr(key),
+ ",".join((repr(k) for k in self._lanes.keys()))))
def __setitem__(self, key, value):
+ if len(self._lanes) > 100:
+ LOGGER.warn("many projects loaded, consider improving dictionary")
+ real_key = self._find_key(key)
+ if real_key is not None:
+ key = real_key
self._lanes[key] = value
def __delitem__(self, key):
- del self._lanes[key]
+ if key in self._lanes:
+ del self._lanes[key]
+ else:
+ real_key = self._find_key(key)
+ if real_key is not None:
+ del self._lanes[real_key]
def __len__(self):
if self._lanes is None:
self._initialize_lanes()
return len(self._lanes)
+ def _find_key(self, lookup_key):
+ if not isinstance(lookup_key, SampleKey):
+ lookup_key = SampleKey(lane=lookup_key)
+
+ results = []
+ for k in self._lanes:
+ if k.matches(lookup_key):
+ results.append(k)
+ if len(results) > 1:
+ errmsg = "Key %s matched multiple keys: %s"
+ raise ValueError(errmsg % (str(lookup_key),
+ ",".join((str(x) for x in results))))
+
+ elif len(results) == 1:
+ return results[0]
+ else:
+ return None
def gerald(pathname):
LOGGER.info("Parsing gerald config.xml")