+"""Provide access to information stored in the GERALD directory.
"""
-Provide access to information stored in the GERALD directory.
-"""
+import collections
from datetime import datetime, date
import logging
import os
from htsworkflow.pipelines.summary import Summary, SummaryGA, SummaryHiSeq
from htsworkflow.pipelines.eland import eland, ELAND
+from htsworkflow.pipelines.samplekey import SampleKey
from htsworkflow.pipelines.runfolder import \
ElementTree, \
if xml is not None:
self.set_elements(xml)
+ def _get_date(self):
# Default date lookup: use the modification time of self.pathname when a
# pathname is set, otherwise fall back to the current local time.
# NOTE(review): relies on the `stat` module being imported at file level;
# that import is not visible in this excerpt -- confirm.
+ if self.pathname is not None:
+ epochstamp = os.stat(self.pathname)[stat.ST_MTIME]
+ return datetime.fromtimestamp(epochstamp)
# No pathname recorded, so there is nothing to stat.
+ return datetime.today()
+
def _get_time(self):
# Convert self.date to a time tuple and then to seconds since the epoch
# (time.mktime interprets the tuple as local time).
return time.mktime(self.date.timetuple())
# NOTE(review): this class attribute is also called `time`; inside the method
# above the bare name `time` presumably still resolves to the time module
# imported at file level -- confirm the import exists.
time = property(_get_time, doc='return run time as seconds since epoch')
print 'config.xml:', self.tree
self.summary.dump()
- def get_elements(self):
+ def get_elements(self, root_tag):
if self.tree is None or self.summary is None:
return None
- gerald = ElementTree.Element(Gerald.GERALD,
+ gerald = ElementTree.Element(root_tag,
{'version': unicode(Gerald.XML_VERSION)})
gerald.append(self.tree)
gerald.append(self.summary.get_elements())
gerald.append(self.eland_results.get_elements())
return gerald
- def set_elements(self, tree):
- if tree.tag != self.__class__.GERALD:
- raise ValueError('expected GERALD')
+ def set_elements(self, tree, root_tag):
+ if tree.tag != root_tag:
+ raise ValueError('expected %s' % (self.__class__.GERALD,))
xml_version = int(tree.attrib.get('version', 0))
if xml_version > Gerald.XML_VERSION:
LOGGER.warn('XML tree is a higher version than this class')
def _get_date(self):
if self.tree is None:
return datetime.today()
+
timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
if timestamp is not None:
- epochstamp = time.mktime(time.strptime(timestamp, '%c'))
- return datetime.fromtimestamp(epochstamp)
- if self.pathname is not None:
- epochstamp = os.stat(self.pathname)[stat.ST_MTIME]
+ epochstamp = time.mktime(time.strptime(timestamp))
return datetime.fromtimestamp(epochstamp)
- return datetime.today()
+ return super(Gerald, self)._get_date()
date = property(_get_date)
+ def get_elements(self):
# Delegate to the parent get_elements(root_tag), supplying Gerald.GERALD
# as the tag of the root element.
+ return super(Gerald, self).get_elements(Gerald.GERALD)
+
+ def set_elements(self, tree):
# Delegate to the parent set_elements(tree, root_tag), supplying
# Gerald.GERALD as the root tag the tree is expected to have.
+ return super(Gerald, self).set_elements(tree, Gerald.GERALD)
+
def _get_experiment_root(self):
if self.tree is None:
return None
runfolder_name = property(_get_runfolder_name)
- def _get_version(self):
+ def _get_software_version(self):
if self.tree is None:
return None
ga_version = self.tree.findtext(
'ChipWideRunParameters/SOFTWARE_VERSION')
if ga_version is not None:
- match = re.match("@.*GERALD.pl,v (?P<version>\d+(\.\d+)+)",
+ gerald = re.match("@.*GERALD.pl,v (?P<version>\d+(\.\d+)+)",
ga_version)
- if match:
- return match.group('version')
- return ga_version
+ if gerald:
+ return ('GERALD', gerald.group('version'))
+ casava = re.match('CASAVA-(?P<version>\d+[.\d]*)',
+ ga_version)
+ if casava:
+ return ('CASAVA', casava.group('version'))
+
+ def _get_software(self):
+ """Return name of analysis software package"""
# _get_software_version() gives a (name, version) pair, or None when the
# tree is missing or the version string is not recognised; take the name.
+ software_version = self._get_software_version()
+ return software_version[0] if software_version is not None else None
# Exposed as the read-only attribute `software`.
+ software = property(_get_software)
+
+ def _get_version(self):
+ """Return version number of software package"""
# _get_software_version() gives a (name, version) pair, or None when the
# tree is missing or the version string is not recognised; take the version.
+ software_version = self._get_software_version()
+ return software_version[1] if software_version is not None else None
# Exposed as the read-only attribute `version`.
version = property(_get_version)
class CASAVA(Alignment):
GERALD='Casava'
+ def __init__(self, xml=None, pathname=None, tree=None):
# Pass all three arguments through unchanged to the parent constructor.
+ super(CASAVA, self).__init__(xml=xml, pathname=pathname, tree=tree)
+
# Make sure the tree carries a TIME_STAMP element; _add_timestamp does
# nothing when self.tree is None or a TIME_STAMP is already present.
+ self._add_timestamp()
+
+ def _add_timestamp(self):
+ """Manually add a time stamp to CASAVA runs"""
+ if self.tree is None:
+ return
+ if len(self.tree.xpath('TIME_STAMP')) == 0:
+ time_stamp = self.date.strftime('%c')
+ time_element = ElementTree.Element('TIME_STAMP')
+ time_element.text = time_stamp
+ self.tree.append(time_element)
+
+ def _get_date(self):
+ if self.tree is None:
+ return None
+ time_element = self.tree.xpath('TIME_STAMP')
+ if len(time_element) == 1:
+ timetuple = time.strptime(
+ time_element[0].text.strip(),
+ "%a %d %b %Y %I:%M:%S %p")
+ return datetime(*timetuple[:6])
+ return super(CASAVA, self)._get_date()
+ date = property(_get_date)
+
+ def get_elements(self):
# Delegate to the parent get_elements(root_tag), supplying CASAVA.GERALD
# as the tag of the root element, and hand back its result unchanged.
+ tree = super(CASAVA, self).get_elements(CASAVA.GERALD)
+ return tree
+
+ def set_elements(self, tree):
# Delegate to the parent set_elements(tree, root_tag), supplying
# CASAVA.GERALD as the root tag the tree is expected to have.
+ return super(CASAVA, self).set_elements(tree, CASAVA.GERALD)
+
def _get_runfolder_name(self):
if self.tree is None:
return None
runfolder_name = property(_get_runfolder_name)
- def _get_version(self):
+ def _get_software_version(self):
+ if self.tree is None:
+ return None
if self.tree is None:
return None
hiseq_software_node = self.tree.find('Software')
- hiseq_version = hiseq_software_node.attrib['Version']
- return hiseq_version
+ software_version = hiseq_software_node.attrib.get('Version',None)
+ if software_version is None:
+ return None
+ return software_version.split('-')
+
+ def _get_software(self):
# Return the software name: the first '-'-separated component produced by
# _get_software_version(), or None when no version information exists.
+ software_version = self._get_software_version()
+ if software_version is None:
+ return None
+ return software_version[0]
# Exposed as the read-only attribute `software`.
+ software = property(_get_software)
+
+ def _get_version(self):
+ software_version = self._get_software_version()
+ if software_version is None:
+ return None
+ return software_version[1]
version = property(_get_version)
return self.__get_attribute('USE_BASES1')
use_bases = property(_get_use_bases)
-class LaneSpecificRunParameters(object):
+class LaneSpecificRunParameters(collections.MutableMapping):
"""
Provide access to LaneSpecificRunParameters
"""
def __init__(self, gerald):
self._gerald = gerald
- self._lane = None
+ self._lanes = None
- def _initalize_lanes(self):
+ def _initialize_lanes(self):
"""
build dictionary of LaneParameters
"""
# those consistently.
for element in analysis:
sample, lane_id = element.tag.split('_')
- self._lanes[int(lane_id)] = LaneParametersGA(
+ key = SampleKey(lane=int(lane_id), sample=sample)
+ self._lanes[key] = LaneParametersGA(
self._gerald, lane_id)
def _extract_hiseq_analysis_type(self, analysis):
"""Extract from HiSeq style multiplexed analysis types"""
for element in analysis:
name = element.attrib['name']
- self._lanes[name] = LaneParametersHiSeq(self._gerald,
- name,
- element)
+ key = SampleKey(sample=name)
+ self._lanes[key] = LaneParametersHiSeq(self._gerald,
+ name,
+ element)
def __iter__(self):
# Iterate over the lane keys, building the lane table on first use
# (self._lanes stays None until _initialize_lanes() has run).
+ if self._lanes is None:
+ self._initialize_lanes()
# NOTE(review): iterkeys() is a Python 2 dict method; presumably _lanes
# is a plain dict -- confirm before porting to Python 3.
return self._lanes.iterkeys()
+
def __getitem__(self, key):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes[key]
- def get(self, key, default):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.get(key, None)
- def keys(self):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.keys()
- def values(self):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.values()
- def items(self):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.items()
+ if self._lanes is None:
+ self._initialize_lanes()
+ value = self._lanes.get(key, None)
+ if value is not None:
+ return value
+ real_key = self._find_key(key)
+ if real_key is not None:
+ return self._lanes[real_key]
+ raise KeyError("%s not found in %s" % (
+ repr(key),
+ ",".join((repr(k) for k in self._lanes.keys()))))
+
+ def __setitem__(self, key, value):
+ if len(self._lanes) > 100:
+ LOGGER.warn("many projects loaded, consider improving dictionary")
+ real_key = self._find_key(key)
+ if real_key is not None:
+ key = real_key
+ self._lanes[key] = value
+
+ def __delitem__(self, key):
+ if key in self._lanes:
+ del self._lanes[key]
+ else:
+ real_key = self._find_key(key)
+ if real_key is not None:
+ del self._lanes[real_key]
+
def __len__(self):
- if self._lane is None:
- self._initalize_lanes()
+ if self._lanes is None:
+ self._initialize_lanes()
return len(self._lanes)
+ def _find_key(self, lookup_key):
# Return the one stored key that matches lookup_key, None when nothing
# matches, and raise ValueError when several stored keys match.
# Iterates self._lanes directly without initialising it, so the lane
# table must already be loaded by the caller.
# Anything that is not a SampleKey is treated as a lane value and wrapped.
+ if not isinstance(lookup_key, SampleKey):
+ lookup_key = SampleKey(lane=lookup_key)
+
# Collect every stored key k for which k.matches(lookup_key) is true.
+ results = []
+ for k in self._lanes:
+ if k.matches(lookup_key):
+ results.append(k)
# NOTE(review): indentation is lost in this excerpt; the count checks below
# presumably run once, after the loop has finished -- confirm.
+ if len(results) > 1:
+ errmsg = "Key %s matched multiple keys: %s"
+ raise ValueError(errmsg % (str(lookup_key),
+ ",".join((str(x) for x in results))))
+
+ elif len(results) == 1:
+ return results[0]
+ else:
+ return None
def gerald(pathname):
LOGGER.info("Parsing gerald config.xml")
g = CASAVA(pathname=pathname, tree=config_tree)
LOGGER.info("Parsing %s" % (report_summary,))
g.summary = SummaryHiSeq(report_summary)
+ g.eland_results = eland(g.pathname, g)
# parse eland files
return g