Analyze the Summary.htm file produced by GERALD
"""
import logging
+import re
import types
from pprint import pprint
-from htsworkflow.pipelines.runfolder import ElementTree
+#from htsworkflow.pipelines.runfolder import ElementTree
+from lxml import html
+from lxml import etree
from htsworkflow.util.ethelp import indent, flatten
LOGGER = logging.getLogger(__name__)
self.percent_error_rate = None
if html is not None:
- self.set_elements_from_html(html)
+ self.set_elements_from_source(html)
if xml is not None:
self.set_elements(xml)
- def set_elements_from_html(self, data):
+ def set_elements_from_source(self, data):
+ """Read from an initial summary data file. Either xml or html
+ """
if not len(data) in (8,10):
raise RuntimeError("Summary.htm file format changed, len(data)=%d" % (len(data),))
setattr(self, LRSName, parse_xml_mean_range(node))
def get_elements(self):
- lane_result = ElementTree.Element(
+ lane_result = etree.Element(
Summary.LaneResultSummary.LANE_RESULT_SUMMARY,
{'lane': unicode(self.lane), 'end': unicode(self.end)})
for tag, variable_name in Summary.LaneResultSummary.TAGS.items():
*value
)
else:
- element = ElementTree.SubElement(lane_result, tag)
+ element = etree.SubElement(lane_result, tag)
element.text = unicode(value)
return lane_result
The contents of the h2 tag is considered to the name
of the table.
"""
- # tree = ElementTree.parse(pathname).getroot()
+ if isxml_file(pathname):
+ tree = etree.parse(pathname).getroot()
+ else:
+ # html
+ tree = html.parse(pathname).getroot()
+ # tree = etree.parse(pathname).getroot()
# hack for 1.1rc1, this should be removed when possible.
- file_body = open(pathname).read()
- file_body = file_body.replace('CHASTITY<=', 'CHASTITY<=')
- tree = ElementTree.fromstring(file_body)
+ #file_body = open(pathname).read()
+ #file_body = file_body.replace('CHASTITY<=', 'CHASTITY<=')
+ #tree = etree.fromstring(file_body)
# are we reading the xml or the html version of the Summary file?
if tree.tag.lower() == 'summary':
###### START HTML Table Extraction ########
def _extract_named_tables_from_html(self, tree):
- body = tree.find('body')
tables = {}
- for i in range(len(body)):
- if body[i].tag == 'h2' and body[i+1].tag == 'table':
- # we have an interesting table
- name = flatten(body[i])
- table = body[i+1]
- data = self._parse_table(table)
+ for t in tree.findall('*//table'):
+ previous = t.getprevious()
+ if previous is None:
+ previous = t.getparent()
+
+ if previous.tag.lower() == 'div':
+ previous = previous.getprevious()
+
+ if previous.tag in ('h2', 'p'):
+ # we have a table
+ name = flatten(previous)
+ data = self._parse_table(t)
tables[name] = data
return tables
###### END HTML Table Extraction ########
def get_elements(self):
- summary = ElementTree.Element(Summary.SUMMARY,
+ summary = etree.Element(Summary.SUMMARY,
{'version': unicode(Summary.XML_VERSION)})
for end in self.lane_results:
for lane in end.values():
Debugging function, report current object
"""
tree = self.get_elements()
- print ElementTree.tostring(tree)
+ print etree.tostring(tree)
def tonumber(v):
"""
def make_mean_range_element(parent, name, mean, deviation):
"""
- Make an ElementTree subelement <Name mean='mean', deviation='deviation'/>
+ Make an etree subelement <Name mean='mean', deviation='deviation'/>
"""
- element = ElementTree.SubElement(parent, name,
+ element = etree.SubElement(parent, name,
{ 'mean': unicode(mean),
'deviation': unicode(deviation)})
return element
return (mean_value, stddev_value)
+def isxml_file(fname):
+ with open(fname,'r') as stream:
+ return isxml_stream(stream)
+
+def isxml_stream(stream):
+ """Return true or false if its sort of an xml file
+ """
+ pos = stream.tell()
+ line = stream.readline()
+ stream.seek(pos)
+ if re.match("<\?xml.*?>", line):
+ # attempt at xml
+ return True
+ else:
+ return False
+
if __name__ == "__main__":
# test code
from optparse import OptionParser
--- /dev/null
+#!/usr/bin/env python
+import os
+from StringIO import StringIO
+import unittest
+
+from htsworkflow.pipelines import summary
+from simulate_runfolder import TESTDATA_DIR
+
+class SummaryTests(unittest.TestCase):
+ """Test elements of the summary file parser
+ """
+ def test_is_xml(self):
+ xmlfile = StringIO('<?xml version="1.0"?>\n<tag>\n')
+ htmlfile = StringIO('<html>\n<head></head>\n<body><p></p></body>\n</html>')
+
+ self.failUnlessEqual(summary.isxml_stream(xmlfile), True)
+ self.failUnlessEqual(xmlfile.tell(), 0)
+
+ self.failUnlessEqual(summary.isxml_stream(htmlfile), False)
+
+ def test_xml_summary_file(self):
+ pathname = os.path.join(TESTDATA_DIR, 'Summary-casava1.7.xml')
+ s = summary.Summary(pathname)
+ self.failUnlessEqual(len(s.lane_results[0]), 8)
+ self.failUnlessEqual(s.lane_results[0][1].cluster, (1073893, 146344))
+
+ def test_html_summary_file(self):
+ pathname = os.path.join(TESTDATA_DIR, 'Summary-ipar130.htm')
+ s = summary.Summary(pathname)
+ self.failUnlessEqual(len(s.lane_results[0]), 8)
+ self.failUnlessEqual(s.lane_results[0][1].cluster, (126910, 4300))
+
+ def test_hiseq_sample_summary_file(self):
+ pathname = os.path.join(TESTDATA_DIR, 'sample_summary_1_12.htm')
+ s = summary.Summary(pathname)
+
+def suite():
+ return unittest.makeSuite(SummaryTests,'test')
+
+if __name__ == "__main__":
+ unittest.main(defaultTest="suite")