+"""Provide access to information stored in the GERALD directory.
"""
-Provide access to information stored in the GERALD directory.
-"""
+import collections
from datetime import datetime, date
-from glob import glob
import logging
import os
+import re
import stat
import time
-import types
-from htsworkflow.pipelines.runfolder import \
+from htsworkflow.pipelines.summary import Summary, SummaryGA, SummaryHiSeq
+from htsworkflow.pipelines.eland import eland, ELAND
+from htsworkflow.pipelines.samplekey import SampleKey
+
+from htsworkflow.pipelines import \
ElementTree, \
EUROPEAN_STRPTIME, \
LANES_PER_FLOWCELL, \
VERSION_RE
from htsworkflow.util.ethelp import indent, flatten
-from htsworkflow.util.opener import autoopen
-class Gerald(object):
+LOGGER = logging.getLogger(__name__)
+
+class Alignment(object):
"""
Capture meaning out of the GERALD directory
"""
XML_VERSION = 1
- GERALD='Gerald'
RUN_PARAMETERS='RunParameters'
SUMMARY='Summary'
- class LaneParameters(object):
- """
- Make it easy to access elements of LaneSpecificRunParameters from python
- """
- def __init__(self, gerald, key):
- self._gerald = gerald
- self._key = key
-
- def __get_attribute(self, xml_tag):
- subtree = self._gerald.tree.find('LaneSpecificRunParameters')
- container = subtree.find(xml_tag)
- if container is None:
- return None
- if len(container.getchildren()) > LANES_PER_FLOWCELL:
- raise RuntimeError('GERALD config.xml file changed')
- lanes = [x.tag.split('_')[1] for x in container.getchildren()]
- index = lanes.index(self._key)
- element = container[index]
- return element.text
- def _get_analysis(self):
- return self.__get_attribute('ANALYSIS')
- analysis = property(_get_analysis)
-
- def _get_eland_genome(self):
- genome = self.__get_attribute('ELAND_GENOME')
- # default to the chipwide parameters if there isn't an
- # entry in the lane specific paramaters
- if genome is None:
- subtree = self._gerald.tree.find('ChipWideRunParameters')
- container = subtree.find('ELAND_GENOME')
- genome = container.text
- return genome
- eland_genome = property(_get_eland_genome)
-
- def _get_read_length(self):
- return self.__get_attribute('READ_LENGTH')
- read_length = property(_get_read_length)
-
- def _get_use_bases(self):
- return self.__get_attribute('USE_BASES')
- use_bases = property(_get_use_bases)
-
- class LaneSpecificRunParameters(object):
- """
- Provide access to LaneSpecificRunParameters
- """
- def __init__(self, gerald):
- self._gerald = gerald
- self._keys = None
- def __getitem__(self, key):
- return Gerald.LaneParameters(self._gerald, key)
- def keys(self):
- if self._keys is None:
- tree = self._gerald.tree
- analysis = tree.find('LaneSpecificRunParameters/ANALYSIS')
- # according to the pipeline specs I think their fields
- # are sampleName_laneID, with sampleName defaulting to s
- # since laneIDs are constant lets just try using
- # those consistently.
- self._keys = [ x.tag.split('_')[1] for x in analysis]
- return self._keys
- def values(self):
- return [ self[x] for x in self.keys() ]
- def items(self):
- return zip(self.keys(), self.values())
- def __len__(self):
- return len(self.keys())
-
- def __init__(self, xml=None):
- self.pathname = None
- self.tree = None
+ def __init__(self, xml=None, pathname=None, tree=None):
+ self.pathname = pathname
+ self.tree = tree
# parse lane parameters out of the config.xml file
- self.lanes = Gerald.LaneSpecificRunParameters(self)
+ self.lanes = LaneSpecificRunParameters(self)
self.summary = None
self.eland_results = None
self.set_elements(xml)
def _get_date(self):
- if self.tree is None:
- return datetime.today()
- timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
- epochstamp = time.mktime(time.strptime(timestamp, '%c'))
- return datetime.fromtimestamp(epochstamp)
- date = property(_get_date)
+ if self.pathname is not None:
+ epochstamp = os.stat(self.pathname)[stat.ST_MTIME]
+ return datetime.fromtimestamp(epochstamp)
+ return datetime.today()
def _get_time(self):
return time.mktime(self.date.timetuple())
time = property(_get_time, doc='return run time as seconds since epoch')
- def _get_version(self):
- if self.tree is None:
- return None
- return self.tree.findtext('ChipWideRunParameters/SOFTWARE_VERSION')
- version = property(_get_version)
+ def _get_chip_attribute(self, value):
+ return self.tree.findtext('ChipWideRunParameters/%s' % (value,))
def dump(self):
"""
Debugging function, report current object
"""
- print 'Gerald version:', self.version
- print 'Gerald run date:', self.date
- print 'Gerald config.xml:', self.tree
+ print 'Software:'. self.__class__.__name__
+ print 'Alignment version:', self.version
+ print 'Run date:', self.date
+ print 'config.xml:', self.tree
self.summary.dump()
- def get_elements(self):
+ def get_elements(self, root_tag):
if self.tree is None or self.summary is None:
return None
- gerald = ElementTree.Element(Gerald.GERALD,
+ gerald = ElementTree.Element(root_tag,
{'version': unicode(Gerald.XML_VERSION)})
gerald.append(self.tree)
gerald.append(self.summary.get_elements())
gerald.append(self.eland_results.get_elements())
return gerald
- def set_elements(self, tree):
- if tree.tag != Gerald.GERALD:
- raise ValueError('exptected GERALD')
+ def set_elements(self, tree, root_tag):
+ if tree.tag != root_tag:
+ raise ValueError('expected %s' % (self.__class__.GERALD,))
xml_version = int(tree.attrib.get('version', 0))
if xml_version > Gerald.XML_VERSION:
- logging.warn('XML tree is a higher version than this class')
+ LOGGER.warn('XML tree is a higher version than this class')
+ self.eland_results = ELAND()
for element in list(tree):
tag = element.tag.lower()
if tag == Gerald.RUN_PARAMETERS.lower():
elif tag == ELAND.ELAND.lower():
self.eland_results = ELAND(xml=element)
else:
- logging.warn("Unrecognized tag %s" % (element.tag,))
+ LOGGER.warn("Unrecognized tag %s" % (element.tag,))
+class Gerald(Alignment):
+ GERALD='Gerald'
-def gerald(pathname):
- g = Gerald()
- g.pathname = pathname
- path, name = os.path.split(pathname)
- logging.info("Parsing gerald config.xml")
- config_pathname = os.path.join(pathname, 'config.xml')
- g.tree = ElementTree.parse(config_pathname).getroot()
+ def _get_date(self):
+ if self.tree is None:
+ return datetime.today()
- # parse Summary.htm file
- logging.info("Parsing Summary.htm")
- summary_pathname = os.path.join(pathname, 'Summary.htm')
- g.summary = Summary(summary_pathname)
- # parse eland files
- g.eland_results = eland(g.pathname, g)
- return g
+ timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
+ if timestamp is not None:
+ epochstamp = time.mktime(time.strptime(timestamp))
+ return datetime.fromtimestamp(epochstamp)
+ return super(Gerald, self)._get_date()
+ date = property(_get_date)
-def tonumber(v):
- """
- Convert a value to int if its an int otherwise a float.
- """
- try:
- v = int(v)
- except ValueError, e:
- v = float(v)
- return v
+ def get_elements(self):
+ return super(Gerald, self).get_elements(Gerald.GERALD)
-def parse_mean_range(value):
- """
- Parse values like 123 +/- 4.5
- """
- if value.strip() == 'unknown':
- return 0, 0
+ def set_elements(self, tree):
+ return super(Gerald, self).set_elements(tree, Gerald.GERALD)
- average, pm, deviation = value.split()
- if pm != '+/-':
- raise RuntimeError("Summary.htm file format changed")
- return tonumber(average), tonumber(deviation)
+ def _get_experiment_root(self):
+ if self.tree is None:
+ return None
+ return self.tree.findtext('ChipWideRunParameters/EXPT_DIR_ROOT')
-def make_mean_range_element(parent, name, mean, deviation):
- """
- Make an ElementTree subelement <Name mean='mean', deviation='deviation'/>
- """
- element = ElementTree.SubElement(parent, name,
- { 'mean': unicode(mean),
- 'deviation': unicode(deviation)})
- return element
+ def _get_runfolder_name(self):
+ if self.tree is None:
+ return None
-def parse_mean_range_element(element):
- """
- Grab mean/deviation out of element
- """
- return (tonumber(element.attrib['mean']),
- tonumber(element.attrib['deviation']))
+ expt_root = os.path.normpath(self._get_experiment_root())
+ chip_expt_dir = self.tree.findtext('ChipWideRunParameters/EXPT_DIR')
-def parse_summary_element(element):
- """
- Determine if we have a simple element or a mean/deviation element
- """
- if len(element.attrib) > 0:
- return parse_mean_range_element(element)
- else:
- return element.text
+ if expt_root is not None and chip_expt_dir is not None:
+ experiment_dir = chip_expt_dir.replace(expt_root+os.path.sep, '')
+ experiment_dir = experiment_dir.split(os.path.sep)[0]
-class Summary(object):
- """
- Extract some useful information from the Summary.htm file
- """
- XML_VERSION = 2
- SUMMARY = 'Summary'
+ if experiment_dir is None or len(experiment_dir) == 0:
+ return None
+ return experiment_dir
- class LaneResultSummary(object):
- """
- Parse the LaneResultSummary table out of Summary.htm
- Mostly for the cluster number
- """
- LANE_RESULT_SUMMARY = 'LaneResultSummary'
- TAGS = {
- 'LaneYield': 'lane_yield',
- 'Cluster': 'cluster', # Raw
- 'ClusterPF': 'cluster_pass_filter',
- 'AverageFirstCycleIntensity': 'average_first_cycle_intensity',
- 'PercentIntensityAfter20Cycles': 'percent_intensity_after_20_cycles',
- 'PercentPassFilterClusters': 'percent_pass_filter_clusters',
- 'PercentPassFilterAlign': 'percent_pass_filter_align',
- 'AverageAlignmentScore': 'average_alignment_score',
- 'PercentErrorRate': 'percent_error_rate'
- }
-
- def __init__(self, html=None, xml=None):
- self.lane = None
- self.lane_yield = None
- self.cluster = None
- self.cluster_pass_filter = None
- self.average_first_cycle_intensity = None
- self.percent_intensity_after_20_cycles = None
- self.percent_pass_filter_clusters = None
- self.percent_pass_filter_align = None
- self.average_alignment_score = None
- self.percent_error_rate = None
-
- if html is not None:
- self.set_elements_from_html(html)
- if xml is not None:
- self.set_elements(xml)
-
- def set_elements_from_html(self, data):
- if not len(data) in (8,10):
- raise RuntimeError("Summary.htm file format changed")
-
- # same in pre-0.3.0 Summary file and 0.3 summary file
- self.lane = data[0]
-
- if len(data) == 8:
- parsed_data = [ parse_mean_range(x) for x in data[1:] ]
- # this is the < 0.3 Pipeline version
- self.cluster = parsed_data[0]
- self.average_first_cycle_intensity = parsed_data[1]
- self.percent_intensity_after_20_cycles = parsed_data[2]
- self.percent_pass_filter_clusters = parsed_data[3]
- self.percent_pass_filter_align = parsed_data[4]
- self.average_alignment_score = parsed_data[5]
- self.percent_error_rate = parsed_data[6]
- elif len(data) == 10:
- parsed_data = [ parse_mean_range(x) for x in data[2:] ]
- # this is the >= 0.3 summary file
- self.lane_yield = data[1]
- self.cluster = parsed_data[0]
- self.cluster_pass_filter = parsed_data[1]
- self.average_first_cycle_intensity = parsed_data[2]
- self.percent_intensity_after_20_cycles = parsed_data[3]
- self.percent_pass_filter_clusters = parsed_data[4]
- self.percent_pass_filter_align = parsed_data[5]
- self.average_alignment_score = parsed_data[6]
- self.percent_error_rate = parsed_data[7]
-
- def get_elements(self):
- lane_result = ElementTree.Element(
- Summary.LaneResultSummary.LANE_RESULT_SUMMARY,
- {'lane': self.lane})
- for tag, variable_name in Summary.LaneResultSummary.TAGS.items():
- value = getattr(self, variable_name)
- if value is None:
- continue
- # it looks like a sequence
- elif type(value) in (types.TupleType, types.ListType):
- element = make_mean_range_element(
- lane_result,
- tag,
- *value
- )
- else:
- element = ElementTree.SubElement(lane_result, tag)
- element.text = value
- return lane_result
-
- def set_elements(self, tree):
- if tree.tag != Summary.LaneResultSummary.LANE_RESULT_SUMMARY:
- raise ValueError('Expected %s' % (
- Summary.LaneResultSummary.LANE_RESULT_SUMMARY))
- self.lane = tree.attrib['lane']
- tags = Summary.LaneResultSummary.TAGS
- for element in list(tree):
- try:
- variable_name = tags[element.tag]
- setattr(self, variable_name,
- parse_summary_element(element))
- except KeyError, e:
- logging.warn('Unrecognized tag %s' % (element.tag,))
-
- def __init__(self, filename=None, xml=None):
- self.lane_results = {}
-
- if filename is not None:
- self._extract_lane_results(filename)
- if xml is not None:
- self.set_elements(xml)
+ runfolder_name = property(_get_runfolder_name)
- def __getitem__(self, key):
- return self.lane_results[key]
+ def _get_software_version(self):
+ if self.tree is None:
+ return None
+ ga_version = self.tree.findtext(
+ 'ChipWideRunParameters/SOFTWARE_VERSION')
+ if ga_version is not None:
+ gerald = re.match("@.*GERALD.pl,v (?P<version>\d+(\.\d+)+)",
+ ga_version)
+ if gerald:
+ return ('GERALD', gerald.group('version'))
+ casava = re.match('CASAVA-(?P<version>\d+[.\d]*)',
+ ga_version)
+ if casava:
+ return ('CASAVA', casava.group('version'))
+
+ def _get_software(self):
+ """Return name of analysis software package"""
+ software_version = self._get_software_version()
+ return software_version[0] if software_version is not None else None
+ software = property(_get_software)
- def __len__(self):
- return len(self.lane_results)
+ def _get_version(self):
+ """Return version number of software package"""
+ software_version = self._get_software_version()
+ return software_version[1] if software_version is not None else None
+ version = property(_get_version)
- def keys(self):
- return self.lane_results.keys()
+class CASAVA(Alignment):
+ GERALD='Casava'
- def values(self):
- return self.lane_results.values()
+ def __init__(self, xml=None, pathname=None, tree=None):
+ super(CASAVA, self).__init__(xml=xml, pathname=pathname, tree=tree)
- def items(self):
- return self.lane_results.items()
+ self._add_timestamp()
- def _flattened_row(self, row):
- """
- flatten the children of a <tr>...</tr>
- """
- return [flatten(x) for x in row.getchildren() ]
+ def _add_timestamp(self):
+ """Manually add a time stamp to CASAVA runs"""
+ if self.tree is None:
+ return
+ if len(self.tree.xpath('TIME_STAMP')) == 0:
+ time_stamp = self.date.strftime('%a %b %d %H:%M:%S %Y')
+ time_element = ElementTree.Element('TIME_STAMP')
+ time_element.text = time_stamp
+ self.tree.append(time_element)
- def _parse_table(self, table):
- """
- assumes the first line is the header of a table,
- and that the remaining rows are data
- """
- rows = table.getchildren()
- data = []
- for r in rows:
- data.append(self._flattened_row(r))
- return data
+ def _get_date(self):
+ if self.tree is None:
+ return None
+ time_element = self.tree.xpath('TIME_STAMP')
+ if len(time_element) == 1:
+ timetuple = time.strptime(
+ time_element[0].text.strip(),
+ "%a %b %d %H:%M:%S %Y")
+ return datetime(*timetuple[:6])
+ return super(CASAVA, self)._get_date()
+ date = property(_get_date)
- def _extract_named_tables(self, pathname):
- """
- extract all the 'named' tables from a Summary.htm file
- and return as a dictionary
+ def get_elements(self):
+ tree = super(CASAVA, self).get_elements(CASAVA.GERALD)
+ return tree
- Named tables are <h2>...</h2><table>...</table> pairs
- The contents of the h2 tag is considered to the name
- of the table.
- """
- tree = ElementTree.parse(pathname).getroot()
- body = tree.find('body')
- tables = {}
- for i in range(len(body)):
- if body[i].tag == 'h2' and body[i+1].tag == 'table':
- # we have an interesting table
- name = flatten(body[i])
- table = body[i+1]
- data = self._parse_table(table)
- tables[name] = data
- return tables
-
- def _extract_lane_results(self, pathname):
- """
- extract the Lane Results Summary table
- """
+ def set_elements(self, tree):
+ return super(CASAVA, self).set_elements(tree, CASAVA.GERALD)
- tables = self._extract_named_tables(pathname)
+ def _get_runfolder_name(self):
+ if self.tree is None:
+ return None
- # parse lane result summary
- lane_summary = tables['Lane Results Summary']
- # this is version 1 of the summary file
- if len(lane_summary[-1]) == 8:
- # strip header
- headers = lane_summary[0]
- # grab the lane by lane data
- lane_summary = lane_summary[1:]
+ # hiseqs renamed the experiment dir location
+ defaults_expt_dir = self.tree.findtext('Defaults/EXPT_DIR')
+ _, experiment_dir = os.path.split(defaults_expt_dir)
- # this is version 2 of the summary file
- if len(lane_summary[-1]) == 10:
- # lane_summary[0] is a different less specific header row
- headers = lane_summary[1]
- lane_summary = lane_summary[2:10]
- # after the last lane, there's a set of chip wide averages
+ if experiment_dir is None or len(experiment_dir) == 0:
+ return None
+ return experiment_dir
- for r in lane_summary:
- lrs = Summary.LaneResultSummary(html=r)
- self.lane_results[lrs.lane] = lrs
+ runfolder_name = property(_get_runfolder_name)
- def get_elements(self):
- summary = ElementTree.Element(Summary.SUMMARY,
- {'version': unicode(Summary.XML_VERSION)})
- for lane in self.lane_results.values():
- summary.append(lane.get_elements())
- return summary
+ def _get_software_version(self):
+ if self.tree is None:
+ return None
+ if self.tree is None:
+ return None
+ hiseq_software_node = self.tree.find('Software')
+ software_version = hiseq_software_node.attrib.get('Version',None)
+ if software_version is None:
+ return None
+ return software_version.split('-')
- def set_elements(self, tree):
- if tree.tag != Summary.SUMMARY:
- return ValueError("Expected %s" % (Summary.SUMMARY,))
- xml_version = int(tree.attrib.get('version', 0))
- if xml_version > Summary.XML_VERSION:
- logging.warn('Summary XML tree is a higher version than this class')
- for element in list(tree):
- lrs = Summary.LaneResultSummary()
- lrs.set_elements(element)
- self.lane_results[lrs.lane] = lrs
+ def _get_software(self):
+ software_version = self._get_software_version()
+ if software_version is None:
+ return None
+ return software_version[0]
+ software = property(_get_software)
+
+ def _get_version(self):
+ software_version = self._get_software_version()
+ if software_version is None:
+ return None
+ return software_version[1]
+ version = property(_get_version)
- def dump(self):
- """
- Debugging function, report current object
- """
- pass
-
-
-def build_genome_fasta_map(genome_dir):
- # build fasta to fasta file map
- logging.info("Building genome map")
- genome = genome_dir.split(os.path.sep)[-1]
- fasta_map = {}
- for vld_file in glob(os.path.join(genome_dir, '*.vld')):
- is_link = False
- if os.path.islink(vld_file):
- is_link = True
- vld_file = os.path.realpath(vld_file)
- path, vld_name = os.path.split(vld_file)
- name, ext = os.path.splitext(vld_name)
- if is_link:
- fasta_map[name] = name
- else:
- fasta_map[name] = os.path.join(genome, name)
- return fasta_map
-class ElandLane(object):
+class LaneParameters(object):
"""
- Process an eland result file
+ Make it easy to access elements of LaneSpecificRunParameters from python
"""
- XML_VERSION = 1
- LANE = 'ElandLane'
- SAMPLE_NAME = 'SampleName'
- LANE_ID = 'LaneID'
- GENOME_MAP = 'GenomeMap'
- GENOME_ITEM = 'GenomeItem'
- MAPPED_READS = 'MappedReads'
- MAPPED_ITEM = 'MappedItem'
- MATCH_CODES = 'MatchCodes'
- MATCH_ITEM = 'Code'
- READS = 'Reads'
-
- def __init__(self, pathname=None, genome_map=None, xml=None):
- self.pathname = pathname
- self._sample_name = None
- self._lane_id = None
- self._reads = None
- self._mapped_reads = None
- self._match_codes = None
- if genome_map is None:
- genome_map = {}
- self.genome_map = genome_map
+ def __init__(self, gerald, lane_id):
+ self._gerald = gerald
+ self._lane_id = lane_id
- if xml is not None:
- self.set_elements(xml)
+ def _get_analysis(self):
+ raise NotImplemented("abstract class")
+ analysis = property(_get_analysis)
- def _update(self):
- """
- Actually read the file and actually count the reads
- """
- # can't do anything if we don't have a file to process
- if self.pathname is None:
- return
+ def _get_eland_genome(self):
+ raise NotImplemented("abstract class")
+ eland_genome = property(_get_eland_genome)
- if os.stat(self.pathname)[stat.ST_SIZE] == 0:
- raise RuntimeError("Eland isn't done, try again later.")
-
- logging.info("summarizing results for %s" % (self.pathname))
- reads = 0
- mapped_reads = {}
-
- match_codes = {'NM':0, 'QC':0, 'RM':0,
- 'U0':0, 'U1':0, 'U2':0,
- 'R0':0, 'R1':0, 'R2':0,
- }
- for line in autoopen(self.pathname,'r'):
- reads += 1
- fields = line.split()
- # code = fields[2]
- # match_codes[code] = match_codes.setdefault(code, 0) + 1
- # the QC/NM etc codes are in the 3rd field and always present
- match_codes[fields[2]] += 1
- # ignore lines that don't have a fasta filename
- if len(fields) < 7:
- continue
- fasta = self.genome_map.get(fields[6], fields[6])
- mapped_reads[fasta] = mapped_reads.setdefault(fasta, 0) + 1
- self._match_codes = match_codes
- self._mapped_reads = mapped_reads
- self._reads = reads
-
- def _update_name(self):
- # extract the sample name
- if self.pathname is None:
- return
+ def _get_read_length(self):
+ raise NotImplemented("abstract class")
+ read_length = property(_get_read_length)
- path, name = os.path.split(self.pathname)
- split_name = name.split('_')
- self._sample_name = split_name[0]
- self._lane_id = split_name[1]
-
- def _get_sample_name(self):
- if self._sample_name is None:
- self._update_name()
- return self._sample_name
- sample_name = property(_get_sample_name)
-
- def _get_lane_id(self):
- if self._lane_id is None:
- self._update_name()
- return self._lane_id
- lane_id = property(_get_lane_id)
-
- def _get_reads(self):
- if self._reads is None:
- self._update()
- return self._reads
- reads = property(_get_reads)
-
- def _get_mapped_reads(self):
- if self._mapped_reads is None:
- self._update()
- return self._mapped_reads
- mapped_reads = property(_get_mapped_reads)
-
- def _get_match_codes(self):
- if self._match_codes is None:
- self._update()
- return self._match_codes
- match_codes = property(_get_match_codes)
+ def _get_use_bases(self):
+ raise NotImplemented("abstract class")
+ use_bases = property(_get_use_bases)
- def get_elements(self):
- lane = ElementTree.Element(ElandLane.LANE,
- {'version':
- unicode(ElandLane.XML_VERSION)})
- sample_tag = ElementTree.SubElement(lane, ElandLane.SAMPLE_NAME)
- sample_tag.text = self.sample_name
- lane_tag = ElementTree.SubElement(lane, ElandLane.LANE_ID)
- lane_tag.text = self.lane_id
- genome_map = ElementTree.SubElement(lane, ElandLane.GENOME_MAP)
- for k, v in self.genome_map.items():
- item = ElementTree.SubElement(
- genome_map, ElandLane.GENOME_ITEM,
- {'name':k, 'value':unicode(v)})
- mapped_reads = ElementTree.SubElement(lane, ElandLane.MAPPED_READS)
- for k, v in self.mapped_reads.items():
- item = ElementTree.SubElement(
- mapped_reads, ElandLane.MAPPED_ITEM,
- {'name':k, 'value':unicode(v)})
- match_codes = ElementTree.SubElement(lane, ElandLane.MATCH_CODES)
- for k, v in self.match_codes.items():
- item = ElementTree.SubElement(
- match_codes, ElandLane.MATCH_ITEM,
- {'name':k, 'value':unicode(v)})
- reads = ElementTree.SubElement(lane, ElandLane.READS)
- reads.text = unicode(self.reads)
-
- return lane
- def set_elements(self, tree):
- if tree.tag != ElandLane.LANE:
- raise ValueError('Exptecting %s' % (ElandLane.LANE,))
-
- # reset dictionaries
- self._mapped_reads = {}
- self._match_codes = {}
-
- for element in tree:
- tag = element.tag.lower()
- if tag == ElandLane.SAMPLE_NAME.lower():
- self._sample_name = element.text
- elif tag == ElandLane.LANE_ID.lower():
- self._lane_id = element.text
- elif tag == ElandLane.GENOME_MAP.lower():
- for child in element:
- name = child.attrib['name']
- value = child.attrib['value']
- self.genome_map[name] = value
- elif tag == ElandLane.MAPPED_READS.lower():
- for child in element:
- name = child.attrib['name']
- value = child.attrib['value']
- self._mapped_reads[name] = int(value)
- elif tag == ElandLane.MATCH_CODES.lower():
- for child in element:
- name = child.attrib['name']
- value = int(child.attrib['value'])
- self._match_codes[name] = value
- elif tag == ElandLane.READS.lower():
- self._reads = int(element.text)
- else:
- logging.warn("ElandLane unrecognized tag %s" % (element.tag,))
-
-def extract_eland_sequence(instream, outstream, start, end):
+class LaneParametersGA(LaneParameters):
"""
- Extract a chunk of sequence out of an eland file
+ Make it easy to access elements of LaneSpecificRunParameters from python
"""
- for line in instream:
- record = line.split()
- if len(record) > 1:
- result = [record[0], record[1][start:end]]
- else:
- result = [record[0][start:end]]
- outstream.write("\t".join(result))
- outstream.write(os.linesep)
+ def __init__(self, gerald, lane_id):
+ super(LaneParametersGA, self).__init__(gerald, lane_id)
-class ELAND(object):
+ def __get_attribute(self, xml_tag):
+ subtree = self._gerald.tree.find('LaneSpecificRunParameters')
+ container = subtree.find(xml_tag)
+ if container is None:
+ return None
+ if len(container.getchildren()) > LANES_PER_FLOWCELL:
+ raise RuntimeError('GERALD config.xml file changed')
+ lanes = [x.tag.split('_')[1] for x in container.getchildren()]
+ try:
+ index = lanes.index(self._lane_id)
+ except ValueError, e:
+ return None
+ element = container[index]
+ return element.text
+ def _get_analysis(self):
+ return self.__get_attribute('ANALYSIS')
+ analysis = property(_get_analysis)
+
+ def _get_eland_genome(self):
+ genome = self.__get_attribute('ELAND_GENOME')
+ # default to the chipwide parameters if there isn't an
+ # entry in the lane specific paramaters
+ if genome is None:
+ genome = self._gerald._get_chip_attribute('ELAND_GENOME')
+ # ignore flag value
+ if genome == 'Need_to_specify_ELAND_genome_directory':
+ genome = None
+ return genome
+ eland_genome = property(_get_eland_genome)
+
+ def _get_read_length(self):
+ read_length = self.__get_attribute('READ_LENGTH')
+ if read_length is None:
+ read_length = self._gerald._get_chip_attribute('READ_LENGTH')
+ return read_length
+ read_length = property(_get_read_length)
+
+ def _get_use_bases(self):
+ return self.__get_attribute('USE_BASES')
+ use_bases = property(_get_use_bases)
+
+
+class LaneParametersHiSeq(LaneParameters):
"""
- Summarize information from eland files
+ Make it easy to access elements of LaneSpecificRunParameters from python
"""
- XML_VERSION = 1
+ def __init__(self, gerald, lane_id, element):
+ super(LaneParametersHiSeq, self).__init__(gerald, lane_id)
+ self.element = element
- ELAND = 'ElandCollection'
- LANE = 'Lane'
- LANE_ID = 'id'
+ def __get_attribute(self, xml_tag):
+ container = self.element.find(xml_tag)
+ if container is None:
+ return None
+ return container.text
+
+ def _get_analysis(self):
+ return self.__get_attribute('ANALYSIS')
+ analysis = property(_get_analysis)
+
+ def _get_eland_genome(self):
+ genome = self.__get_attribute('ELAND_GENOME')
+ # default to the chipwide parameters if there isn't an
+ # entry in the lane specific paramaters
+ if genome is None:
+ genome = self._gerald._get_chip_attribute('ELAND_GENOME')
+ # ignore flag value
+ if genome == 'Need_to_specify_ELAND_genome_directory':
+ genome = None
+ return genome
+ eland_genome = property(_get_eland_genome)
+
+ def _get_read_length(self):
+ return self.__get_attribute('READ_LENGTH1')
+ read_length = property(_get_read_length)
+
+ def _get_use_bases(self):
+ return self.__get_attribute('USE_BASES1')
+ use_bases = property(_get_use_bases)
+
+class LaneSpecificRunParameters(collections.MutableMapping):
+ """
+ Provide access to LaneSpecificRunParameters
+ """
+ def __init__(self, gerald):
+ self._gerald = gerald
+ self._lanes = None
- def __init__(self, xml=None):
- # we need information from the gerald config.xml
- self.results = {}
+ def _initialize_lanes(self):
+ """
+ build dictionary of LaneParameters
+ """
+ self._lanes = {}
+ tree = self._gerald.tree
+ analysis = tree.find('LaneSpecificRunParameters/ANALYSIS')
+ if analysis is not None:
+ self._extract_ga_analysis_type(analysis)
+ analysis = tree.find('Projects')
+ if analysis is not None:
+ self._extract_hiseq_analysis_type(analysis)
+
+ def _extract_ga_analysis_type(self, analysis):
+ # according to the pipeline specs I think their fields
+ # are sampleName_laneID, with sampleName defaulting to s
+ # since laneIDs are constant lets just try using
+ # those consistently.
+ for element in analysis:
+ sample, lane_id = element.tag.split('_')
+ key = SampleKey(lane=int(lane_id), sample=sample)
+ self._lanes[key] = LaneParametersGA(
+ self._gerald, lane_id)
+
+ def _extract_hiseq_analysis_type(self, analysis):
+ """Extract from HiSeq style multiplexed analysis types"""
+ for element in analysis:
+ name = element.attrib['name']
+ key = SampleKey(sample=name)
+ self._lanes[key] = LaneParametersHiSeq(self._gerald,
+ name,
+ element)
+
+ def __iter__(self):
+ if self._lanes is None:
+ self._initialize_lanes()
+ return self._lanes.iterkeys()
- if xml is not None:
- self.set_elements(xml)
+ def __getitem__(self, key):
+ if self._lanes is None:
+ self._initialize_lanes()
+ value = self._lanes.get(key, None)
+ if value is not None:
+ return value
+ real_key = self._find_key(key)
+ if real_key is not None:
+ return self._lanes[real_key]
+ raise KeyError("%s not found in %s" % (
+ repr(key),
+ ",".join((repr(k) for k in self._lanes.keys()))))
+
+ def __setitem__(self, key, value):
+ if len(self._lanes) > 100:
+ LOGGER.warn("many projects loaded, consider improving dictionary")
+ real_key = self._find_key(key)
+ if real_key is not None:
+ key = real_key
+ self._lanes[key] = value
+
+ def __delitem__(self, key):
+ if key in self._lanes:
+ del self._lanes[key]
+ else:
+ real_key = self._find_key(key)
+ if real_key is not None:
+ del self._lanes[real_key]
def __len__(self):
- return len(self.results)
-
- def keys(self):
- return self.results.keys()
-
- def values(self):
- return self.results.values()
-
- def items(self):
- return self.results.items()
+ if self._lanes is None:
+ self._initialize_lanes()
+ return len(self._lanes)
+
+ def _find_key(self, lookup_key):
+ if not isinstance(lookup_key, SampleKey):
+ lookup_key = SampleKey(lane=lookup_key)
+
+ results = []
+ for k in self._lanes:
+ if k.matches(lookup_key):
+ results.append(k)
+ if len(results) > 1:
+ errmsg = "Key %s matched multiple keys: %s"
+ raise ValueError(errmsg % (str(lookup_key),
+ ",".join((str(x) for x in results))))
+
+ elif len(results) == 1:
+ return results[0]
+ else:
+ return None
- def __getitem__(self, key):
- return self.results[key]
+def gerald(pathname):
+ LOGGER.info("Parsing gerald config.xml")
+ pathname = os.path.expanduser(pathname)
+ config_pathname = os.path.join(pathname, 'config.xml')
+ config_tree = ElementTree.parse(config_pathname).getroot()
- def get_elements(self):
- root = ElementTree.Element(ELAND.ELAND,
- {'version': unicode(ELAND.XML_VERSION)})
- for lane_id, lane in self.results.items():
- eland_lane = lane.get_elements()
- eland_lane.attrib[ELAND.LANE_ID] = unicode(lane_id)
- root.append(eland_lane)
- return root
+ # parse Summary.htm file
+ summary_xml = os.path.join(pathname, 'Summary.xml')
+ summary_htm = os.path.join(pathname, 'Summary.htm')
+ report_summary = os.path.join(pathname, '..', 'Data',
+ 'reports', 'Summary', )
+ if os.path.exists(summary_xml):
+ g = Gerald(pathname = pathname, tree=config_tree)
+ LOGGER.info("Parsing Summary.xml")
+ g.summary = SummaryGA(summary_xml)
+ g.eland_results = eland(g.pathname, g)
+ elif os.path.exists(summary_htm):
+ g = Gerald(pathname=pathname, tree=config_tree)
+ LOGGER.info("Parsing Summary.htm")
+ g.summary = SummaryGA(summary_htm)
+ g.eland_results = eland(g.pathname, g)
+ elif os.path.isdir(report_summary):
+ g = CASAVA(pathname=pathname, tree=config_tree)
+ LOGGER.info("Parsing %s" % (report_summary,))
+ g.summary = SummaryHiSeq(report_summary)
+ g.eland_results = eland(g.pathname, g)
- def set_elements(self, tree):
- if tree.tag.lower() != ELAND.ELAND.lower():
- raise ValueError('Expecting %s', ELAND.ELAND)
- for element in list(tree):
- lane_id = element.attrib[ELAND.LANE_ID]
- lane = ElandLane(xml=element)
- self.results[lane_id] = lane
-
-def eland(basedir, gerald=None, genome_maps=None):
- e = ELAND()
-
- file_list = glob(os.path.join(basedir, "*_eland_result.txt"))
- if len(file_list) == 0:
- # lets handle compressed eland files too
- file_list = glob(os.path.join(basedir, "*_eland_result.txt.bz2"))
-
- for pathname in file_list:
- # yes the lane_id is also being computed in ElandLane._update
- # I didn't want to clutter up my constructor
- # but I needed to persist the sample_name/lane_id for
- # runfolder summary_report
- path, name = os.path.split(pathname)
- logging.info("Adding eland file %s" %(name,))
- split_name = name.split('_')
- lane_id = split_name[1]
-
- if genome_maps is not None:
- genome_map = genome_maps[lane_id]
- elif gerald is not None:
- genome_dir = gerald.lanes[lane_id].eland_genome
- genome_map = build_genome_fasta_map(genome_dir)
- else:
- genome_map = {}
+ # parse eland files
+ return g
- eland_result = ElandLane(pathname, genome_map)
- e.results[lane_id] = eland_result
- return e
+if __name__ == "__main__":
+ # quick test code
+ import sys
+ g = gerald(sys.argv[1])
+ #ElementTree.dump(g.get_elements())