+"""Provide access to information stored in the GERALD directory.
"""
-Provide access to information stored in the GERALD directory.
-"""
+import collections
from datetime import datetime, date
import logging
import os
+import re
import stat
import time
from htsworkflow.pipelines.summary import Summary, SummaryGA, SummaryHiSeq
from htsworkflow.pipelines.eland import eland, ELAND
+from htsworkflow.pipelines.samplekey import SampleKey
from htsworkflow.pipelines.runfolder import \
ElementTree, \
LOGGER = logging.getLogger(__name__)
-class Gerald(object):
+class Alignment(object):
"""
Capture meaning out of the GERALD directory
"""
XML_VERSION = 1
- GERALD='Gerald'
RUN_PARAMETERS='RunParameters'
SUMMARY='Summary'
- def __init__(self, xml=None):
- self.pathname = None
- self.tree = None
+ def __init__(self, xml=None, pathname=None, tree=None):
+ self.pathname = pathname
+ self.tree = tree
# parse lane parameters out of the config.xml file
self.lanes = LaneSpecificRunParameters(self)
self.set_elements(xml)
def _get_date(self):
- if self.tree is None:
- return datetime.today()
- timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
- if timestamp is not None:
- epochstamp = time.mktime(time.strptime(timestamp, '%c'))
- return datetime.fromtimestamp(epochstamp)
if self.pathname is not None:
epochstamp = os.stat(self.pathname)[stat.ST_MTIME]
return datetime.fromtimestamp(epochstamp)
return datetime.today()
- date = property(_get_date)
def _get_time(self):
return time.mktime(self.date.timetuple())
time = property(_get_time, doc='return run time as seconds since epoch')
+ def _get_chip_attribute(self, value):
+ return self.tree.findtext('ChipWideRunParameters/%s' % (value,))
+
+ def dump(self):
+ """
+ Debugging function, report current object
+ """
+ print 'Software:'. self.__class__.__name__
+ print 'Alignment version:', self.version
+ print 'Run date:', self.date
+ print 'config.xml:', self.tree
+ self.summary.dump()
+
+ def get_elements(self, root_tag):
+ if self.tree is None or self.summary is None:
+ return None
+
+ gerald = ElementTree.Element(root_tag,
+ {'version': unicode(Gerald.XML_VERSION)})
+ gerald.append(self.tree)
+ gerald.append(self.summary.get_elements())
+ if self.eland_results:
+ gerald.append(self.eland_results.get_elements())
+ return gerald
+
+ def set_elements(self, tree, root_tag):
+ if tree.tag != root_tag:
+ raise ValueError('expected %s' % (self.__class__.GERALD,))
+ xml_version = int(tree.attrib.get('version', 0))
+ if xml_version > Gerald.XML_VERSION:
+ LOGGER.warn('XML tree is a higher version than this class')
+ self.eland_results = ELAND()
+ for element in list(tree):
+ tag = element.tag.lower()
+ if tag == Gerald.RUN_PARAMETERS.lower():
+ self.tree = element
+ elif tag == Gerald.SUMMARY.lower():
+ self.summary = Summary(xml=element)
+ elif tag == ELAND.ELAND.lower():
+ self.eland_results = ELAND(xml=element)
+ else:
+ LOGGER.warn("Unrecognized tag %s" % (element.tag,))
+
+class Gerald(Alignment):
+ GERALD='Gerald'
+
+ def _get_date(self):
+ if self.tree is None:
+ return datetime.today()
+
+ timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
+ if timestamp is not None:
+ epochstamp = time.mktime(time.strptime(timestamp))
+ return datetime.fromtimestamp(epochstamp)
+ return super(Gerald, self)._get_date()
+ date = property(_get_date)
+
+ def get_elements(self):
+ return super(Gerald, self).get_elements(Gerald.GERALD)
+
+ def set_elements(self, tree):
+ return super(Gerald, self).set_elements(tree, Gerald.GERALD)
+
def _get_experiment_root(self):
if self.tree is None:
return None
expt_root = os.path.normpath(self._get_experiment_root())
chip_expt_dir = self.tree.findtext('ChipWideRunParameters/EXPT_DIR')
- # hiseqs renamed the experiment dir location
- defaults_expt_dir = self.tree.findtext('Defaults/EXPT_DIR')
- experiment_dir = None
- if defaults_expt_dir is not None:
- _, experiment_dir = os.path.split(defaults_expt_dir)
- elif expt_root is not None and chip_expt_dir is not None:
+ if expt_root is not None and chip_expt_dir is not None:
experiment_dir = chip_expt_dir.replace(expt_root+os.path.sep, '')
experiment_dir = experiment_dir.split(os.path.sep)[0]
runfolder_name = property(_get_runfolder_name)
- def _get_version(self):
+ def _get_software_version(self):
if self.tree is None:
return None
ga_version = self.tree.findtext(
'ChipWideRunParameters/SOFTWARE_VERSION')
if ga_version is not None:
- return ga_version
- hiseq_software_node = self.tree.find('Software')
- hiseq_version = hiseq_software_node.attrib['Version']
- return hiseq_version
+ gerald = re.match("@.*GERALD.pl,v (?P<version>\d+(\.\d+)+)",
+ ga_version)
+ if gerald:
+ return ('GERALD', gerald.group('version'))
+ casava = re.match('CASAVA-(?P<version>\d+[.\d]*)',
+ ga_version)
+ if casava:
+ return ('CASAVA', casava.group('version'))
+
+ def _get_software(self):
+ """Return name of analysis software package"""
+ software_version = self._get_software_version()
+ return software_version[0] if software_version is not None else None
+ software = property(_get_software)
+ def _get_version(self):
+ """Return version number of software package"""
+ software_version = self._get_software_version()
+ return software_version[1] if software_version is not None else None
version = property(_get_version)
- def _get_chip_attribute(self, value):
- return self.tree.findtext('ChipWideRunParameters/%s' % (value,))
+class CASAVA(Alignment):
+ GERALD='Casava'
- def dump(self):
- """
- Debugging function, report current object
- """
- print 'Gerald version:', self.version
- print 'Gerald run date:', self.date
- print 'Gerald config.xml:', self.tree
- self.summary.dump()
+ def __init__(self, xml=None, pathname=None, tree=None):
+ super(CASAVA, self).__init__(xml=xml, pathname=pathname, tree=tree)
- def get_elements(self):
- if self.tree is None or self.summary is None:
+ self._add_timestamp()
+
+ def _add_timestamp(self):
+ """Manually add a time stamp to CASAVA runs"""
+ if self.tree is None:
+ return
+ if len(self.tree.xpath('TIME_STAMP')) == 0:
+ time_stamp = self.date.strftime("%a %b %d %H:%M:%S %Y")
+ time_element = ElementTree.Element('TIME_STAMP')
+ time_element.text = time_stamp
+ self.tree.append(time_element)
+
+ def _get_date(self):
+ if self.tree is None:
return None
+ time_element = self.tree.xpath('TIME_STAMP')
+ if len(time_element) == 1:
+ timetuple = time.strptime(
+ time_element[0].text.strip(),
+ "%a %b %d %H:%M:%S %Y")
+ return datetime(*timetuple[:6])
+ return super(CASAVA, self)._get_date()
+ date = property(_get_date)
- gerald = ElementTree.Element(Gerald.GERALD,
- {'version': unicode(Gerald.XML_VERSION)})
- gerald.append(self.tree)
- gerald.append(self.summary.get_elements())
- if self.eland_results:
- gerald.append(self.eland_results.get_elements())
- return gerald
+ def get_elements(self):
+ tree = super(CASAVA, self).get_elements(CASAVA.GERALD)
+ return tree
def set_elements(self, tree):
- if tree.tag != Gerald.GERALD:
- raise ValueError('exptected GERALD')
- xml_version = int(tree.attrib.get('version', 0))
- if xml_version > Gerald.XML_VERSION:
- LOGGER.warn('XML tree is a higher version than this class')
- self.eland_results = ELAND()
- for element in list(tree):
- tag = element.tag.lower()
- if tag == Gerald.RUN_PARAMETERS.lower():
- self.tree = element
- elif tag == Gerald.SUMMARY.lower():
- self.summary = Summary(xml=element)
- elif tag == ELAND.ELAND.lower():
- self.eland_results = ELAND(xml=element)
- else:
- LOGGER.warn("Unrecognized tag %s" % (element.tag,))
+ return super(CASAVA, self).set_elements(tree, CASAVA.GERALD)
+
+ def _get_runfolder_name(self):
+ if self.tree is None:
+ return None
+
+ # hiseqs renamed the experiment dir location
+ defaults_expt_dir = self.tree.findtext('Defaults/EXPT_DIR')
+ _, experiment_dir = os.path.split(defaults_expt_dir)
+
+ if experiment_dir is None or len(experiment_dir) == 0:
+ return None
+ return experiment_dir
+
+ runfolder_name = property(_get_runfolder_name)
+
+ def _get_software_version(self):
+ if self.tree is None:
+ return None
+ if self.tree is None:
+ return None
+ hiseq_software_node = self.tree.find('Software')
+ software_version = hiseq_software_node.attrib.get('Version',None)
+ if software_version is None:
+ return None
+ return software_version.split('-')
+
+ def _get_software(self):
+ software_version = self._get_software_version()
+ if software_version is None:
+ return None
+ return software_version[0]
+ software = property(_get_software)
+
+ def _get_version(self):
+ software_version = self._get_software_version()
+ if software_version is None:
+ return None
+ return software_version[1]
+ version = property(_get_version)
class LaneParameters(object):
return self.__get_attribute('USE_BASES1')
use_bases = property(_get_use_bases)
-class LaneSpecificRunParameters(object):
+class LaneSpecificRunParameters(collections.MutableMapping):
"""
Provide access to LaneSpecificRunParameters
"""
def __init__(self, gerald):
self._gerald = gerald
- self._lane = None
+ self._lanes = None
- def _initalize_lanes(self):
+ def _initialize_lanes(self):
"""
build dictionary of LaneParameters
"""
# those consistently.
for element in analysis:
sample, lane_id = element.tag.split('_')
- self._lanes[int(lane_id)] = LaneParametersGA(
+ key = SampleKey(lane=int(lane_id), sample=sample)
+ self._lanes[key] = LaneParametersGA(
self._gerald, lane_id)
def _extract_hiseq_analysis_type(self, analysis):
"""Extract from HiSeq style multiplexed analysis types"""
for element in analysis:
name = element.attrib['name']
- self._lanes[name] = LaneParametersHiSeq(self._gerald,
- name,
- element)
+ key = SampleKey(sample=name)
+ self._lanes[key] = LaneParametersHiSeq(self._gerald,
+ name,
+ element)
def __iter__(self):
+ if self._lanes is None:
+ self._initialize_lanes()
return self._lanes.iterkeys()
+
def __getitem__(self, key):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes[key]
- def get(self, key, default):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.get(key, None)
- def keys(self):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.keys()
- def values(self):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.values()
- def items(self):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.items()
+ if self._lanes is None:
+ self._initialize_lanes()
+ value = self._lanes.get(key, None)
+ if value is not None:
+ return value
+ real_key = self._find_key(key)
+ if real_key is not None:
+ return self._lanes[real_key]
+ raise KeyError("%s not found in %s" % (
+ repr(key),
+ ",".join((repr(k) for k in self._lanes.keys()))))
+
+ def __setitem__(self, key, value):
+ if len(self._lanes) > 100:
+ LOGGER.warn("many projects loaded, consider improving dictionary")
+ real_key = self._find_key(key)
+ if real_key is not None:
+ key = real_key
+ self._lanes[key] = value
+
+ def __delitem__(self, key):
+ if key in self._lanes:
+ del self._lanes[key]
+ else:
+ real_key = self._find_key(key)
+ if real_key is not None:
+ del self._lanes[real_key]
+
def __len__(self):
- if self._lane is None:
- self._initalize_lanes()
+ if self._lanes is None:
+ self._initialize_lanes()
return len(self._lanes)
+ def _find_key(self, lookup_key):
+ if not isinstance(lookup_key, SampleKey):
+ lookup_key = SampleKey(lane=lookup_key)
+
+ results = []
+ for k in self._lanes:
+ if k.matches(lookup_key):
+ results.append(k)
+ if len(results) > 1:
+ errmsg = "Key %s matched multiple keys: %s"
+ raise ValueError(errmsg % (str(lookup_key),
+ ",".join((str(x) for x in results))))
+
+ elif len(results) == 1:
+ return results[0]
+ else:
+ return None
def gerald(pathname):
- g = Gerald()
- g.pathname = os.path.expanduser(pathname)
- path, name = os.path.split(g.pathname)
LOGGER.info("Parsing gerald config.xml")
- config_pathname = os.path.join(g.pathname, 'config.xml')
- g.tree = ElementTree.parse(config_pathname).getroot()
+ pathname = os.path.expanduser(pathname)
+ config_pathname = os.path.join(pathname, 'config.xml')
+ config_tree = ElementTree.parse(config_pathname).getroot()
# parse Summary.htm file
- summary_xml = os.path.join(g.pathname, 'Summary.xml')
- summary_htm = os.path.join(g.pathname, 'Summary.htm')
- report_summary = os.path.join(g.pathname, '..', 'Data',
+ summary_xml = os.path.join(pathname, 'Summary.xml')
+ summary_htm = os.path.join(pathname, 'Summary.htm')
+ report_summary = os.path.join(pathname, '..', 'Data',
'reports', 'Summary', )
if os.path.exists(summary_xml):
+ g = Gerald(pathname = pathname, tree=config_tree)
LOGGER.info("Parsing Summary.xml")
g.summary = SummaryGA(summary_xml)
g.eland_results = eland(g.pathname, g)
elif os.path.exists(summary_htm):
+ g = Gerald(pathname=pathname, tree=config_tree)
LOGGER.info("Parsing Summary.htm")
g.summary = SummaryGA(summary_htm)
g.eland_results = eland(g.pathname, g)
elif os.path.isdir(report_summary):
+ g = CASAVA(pathname=pathname, tree=config_tree)
LOGGER.info("Parsing %s" % (report_summary,))
g.summary = SummaryHiSeq(report_summary)
+ g.eland_results = eland(g.pathname, g)
# parse eland files
return g