from htsworkflow.pipelines.summary import Summary, SummaryGA, SummaryHiSeq
from htsworkflow.pipelines.eland import eland, ELAND
+from htsworkflow.pipelines.samplekey import SampleKey
-from htsworkflow.pipelines.runfolder import \
+from htsworkflow.pipelines import \
ElementTree, \
EUROPEAN_STRPTIME, \
LANES_PER_FLOWCELL, \
"""
Debugging function, report current object
"""
- print 'Software:'. self.__class__.__name__
- print 'Alignment version:', self.version
- print 'Run date:', self.date
- print 'config.xml:', self.tree
+ print('Software:'. self.__class__.__name__)
+ print('Alignment version:', self.version)
+ print('Run date:', self.date)
+ print('config.xml:', self.tree)
self.summary.dump()
def get_elements(self, root_tag):
return None
gerald = ElementTree.Element(root_tag,
- {'version': unicode(Gerald.XML_VERSION)})
+ {'version': str(Gerald.XML_VERSION)})
gerald.append(self.tree)
gerald.append(self.summary.get_elements())
if self.eland_results:
timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
if timestamp is not None:
- epochstamp = time.mktime(time.strptime(timestamp, '%c'))
+ epochstamp = time.mktime(time.strptime(timestamp))
return datetime.fromtimestamp(epochstamp)
return super(Gerald, self)._get_date()
date = property(_get_date)
if self.tree is None:
return
if len(self.tree.xpath('TIME_STAMP')) == 0:
- time_stamp = self.date.strftime('%c')
+ time_stamp = self.date.strftime('%a %b %d %H:%M:%S %Y')
time_element = ElementTree.Element('TIME_STAMP')
time_element.text = time_stamp
self.tree.append(time_element)
return None
time_element = self.tree.xpath('TIME_STAMP')
if len(time_element) == 1:
- return datetime.strptime(time_element[0].text, '%c')
+ timetuple = time.strptime(
+ time_element[0].text.strip(),
+ "%a %b %d %H:%M:%S %Y")
+ return datetime(*timetuple[:6])
return super(CASAVA, self)._get_date()
date = property(_get_date)
lanes = [x.tag.split('_')[1] for x in container.getchildren()]
try:
index = lanes.index(self._lane_id)
- except ValueError, e:
+ except ValueError as e:
return None
element = container[index]
return element.text
# those consistently.
for element in analysis:
sample, lane_id = element.tag.split('_')
- self._lanes[int(lane_id)] = LaneParametersGA(
+ key = SampleKey(lane=int(lane_id), sample=sample)
+ self._lanes[key] = LaneParametersGA(
self._gerald, lane_id)
def _extract_hiseq_analysis_type(self, analysis):
"""Extract from HiSeq style multiplexed analysis types"""
for element in analysis:
name = element.attrib['name']
- self._lanes[name] = LaneParametersHiSeq(self._gerald,
- name,
- element)
+ key = SampleKey(sample=name)
+ self._lanes[key] = LaneParametersHiSeq(self._gerald,
+ name,
+ element)
def __iter__(self):
if self._lanes is None:
self._initialize_lanes()
- return self._lanes.iterkeys()
+ return iter(self._lanes.keys())
def __getitem__(self, key):
if self._lanes is None:
self._initialize_lanes()
- return self._lanes[key]
+ value = self._lanes.get(key, None)
+ if value is not None:
+ return value
+ real_key = self._find_key(key)
+ if real_key is not None:
+ return self._lanes[real_key]
+ raise KeyError("%s not found in %s" % (
+ repr(key),
+ ",".join((repr(k) for k in list(self._lanes.keys())))))
def __setitem__(self, key, value):
+ if len(self._lanes) > 100:
+ LOGGER.warn("many projects loaded, consider improving dictionary")
+ real_key = self._find_key(key)
+ if real_key is not None:
+ key = real_key
self._lanes[key] = value
def __delitem__(self, key):
- del self._lanes[key]
+ if key in self._lanes:
+ del self._lanes[key]
+ else:
+ real_key = self._find_key(key)
+ if real_key is not None:
+ del self._lanes[real_key]
def __len__(self):
if self._lanes is None:
self._initialize_lanes()
return len(self._lanes)
+ def _find_key(self, lookup_key):
+ if not isinstance(lookup_key, SampleKey):
+ lookup_key = SampleKey(lane=lookup_key)
+
+ results = []
+ for k in self._lanes:
+ if k.matches(lookup_key):
+ results.append(k)
+ if len(results) > 1:
+ errmsg = "Key %s matched multiple keys: %s"
+ raise ValueError(errmsg % (str(lookup_key),
+ ",".join((str(x) for x in results))))
+
+ elif len(results) == 1:
+ return results[0]
+ else:
+ return None
def gerald(pathname):
LOGGER.info("Parsing gerald config.xml")