X-Git-Url: http://woldlab.caltech.edu/gitweb/?a=blobdiff_plain;f=htsworkflow%2Fpipelines%2Ftest%2Ftest_runfolder_ipar130.py;h=366f588b8b4472d5248a23ca777195ae656625d2;hb=d078bd2653d1d702d54a0ff12f91ef2bd85d8d0e;hp=46a26f01892983a8f3fc2adc4177323f75433335;hpb=fa0043bebbe796e9ae1b7551a8abebf3068c6e7d;p=htsworkflow.git diff --git a/htsworkflow/pipelines/test/test_runfolder_ipar130.py b/htsworkflow/pipelines/test/test_runfolder_ipar130.py index 46a26f0..366f588 100644 --- a/htsworkflow/pipelines/test/test_runfolder_ipar130.py +++ b/htsworkflow/pipelines/test/test_runfolder_ipar130.py @@ -4,14 +4,14 @@ from datetime import datetime, date import os import tempfile import shutil -import unittest +from unittest import TestCase from htsworkflow.pipelines import eland from htsworkflow.pipelines import ipar from htsworkflow.pipelines import bustard from htsworkflow.pipelines import gerald from htsworkflow.pipelines import runfolder -from htsworkflow.pipelines.runfolder import ElementTree +from htsworkflow.pipelines import ElementTree from htsworkflow.pipelines.test.simulate_runfolder import * @@ -56,7 +56,7 @@ def make_runfolder(obj=None): obj.gerald_dir = gerald_dir -class RunfolderTests(unittest.TestCase): +class RunfolderTests(TestCase): """ Test components of the runfolder processing code which includes firecrest, bustard, and gerald @@ -73,6 +73,7 @@ class RunfolderTests(unittest.TestCase): Construct a firecrest object """ i = ipar.ipar(self.image_analysis_dir) + self.failUnlessEqual(i.software, 'IPAR') self.failUnlessEqual(i.version, '2.01.192.0') self.failUnlessEqual(i.start, 1) self.failUnlessEqual(i.stop, 37) @@ -82,6 +83,7 @@ class RunfolderTests(unittest.TestCase): xml_str = ElementTree.tostring(xml) i2 = ipar.IPAR(xml=xml) + self.failUnlessEqual(i.software, i2.software) self.failUnlessEqual(i.version, i2.version) self.failUnlessEqual(i.start, i2.start) self.failUnlessEqual(i.stop, i2.stop) @@ -114,6 +116,7 @@ class RunfolderTests(unittest.TestCase): self.failUnlessAlmostEqual(crosstalk.base['G'][3], -0.02) b = bustard.bustard(self.bustard_dir) + self.failUnlessEqual(b.software, 'Bustard') self.failUnlessEqual(b.version, '1.3.2') self.failUnlessEqual(b.date, date(2008,3,15)) self.failUnlessEqual(b.user, 'diane') @@ -124,6 +127,7 @@ class RunfolderTests(unittest.TestCase): xml = b.get_elements() b2 = bustard.Bustard(xml=xml) + self.failUnlessEqual(b.software, b2.software) self.failUnlessEqual(b.version, b2.version) self.failUnlessEqual(b.date, b2.date ) self.failUnlessEqual(b.user, b2.user) @@ -141,6 +145,7 @@ class RunfolderTests(unittest.TestCase): # need to update gerald and make tests for it g = gerald.gerald(self.gerald_dir) + self.failUnlessEqual(g.software, 'GERALD') self.failUnlessEqual(g.version, '1.171') self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59)) self.failUnlessEqual(len(g.lanes), len(g.lanes.keys())) @@ -194,6 +199,7 @@ class RunfolderTests(unittest.TestCase): g2 = gerald.Gerald(xml=xml) # do it all again after extracting from the xml file + self.failUnlessEqual(g.software, g2.software) self.failUnlessEqual(g.version, g2.version) self.failUnlessEqual(g.date, g2.date) self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys())) @@ -218,9 +224,9 @@ class RunfolderTests(unittest.TestCase): g_eland = g.eland_results g2_eland = g2.eland_results - for lane in g_eland.results[0].keys(): - g_results = g_eland.results[0][lane] - g2_results = g2_eland.results[0][lane] + for key in g_eland: + g_results = g_eland[key] + g2_results = g2_eland[key] self.failUnlessEqual(g_results.reads, g2_results.reads) if isinstance(g_results, eland.ElandLane): @@ -249,44 +255,43 @@ class RunfolderTests(unittest.TestCase): eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps) # I added sequence lanes to the last 2 lanes of this test case - for i in range(1,7): - lane = eland_container.results[0][i] - self.failUnlessEqual(lane.reads, 6) - self.failUnlessEqual(lane.sample_name, "s") - self.failUnlessEqual(lane.lane_id, i) - self.failUnlessEqual(len(lane.mapped_reads), 17) - self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4) - self.failUnlessEqual(lane.match_codes['U0'], 3) - self.failUnlessEqual(lane.match_codes['R0'], 2) - self.failUnlessEqual(lane.match_codes['U1'], 1) - self.failUnlessEqual(lane.match_codes['R1'], 9) - self.failUnlessEqual(lane.match_codes['U2'], 0) - self.failUnlessEqual(lane.match_codes['R2'], 12) - self.failUnlessEqual(lane.match_codes['NM'], 1) - self.failUnlessEqual(lane.match_codes['QC'], 0) - - # test scarf - lane = eland_container.results[0][7] - self.failUnlessEqual(lane.reads, 5) - self.failUnlessEqual(lane.sample_name, 's') - self.failUnlessEqual(lane.lane_id, 7) - self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE) - - # test fastq - lane = eland_container.results[0][8] - self.failUnlessEqual(lane.reads, 3) - self.failUnlessEqual(lane.sample_name, 's') - self.failUnlessEqual(lane.lane_id, 8) - self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE) + for key in eland_container: + lane = eland_container[key] + if key.lane in [1,2,3,4,5,6]: + self.failUnlessEqual(lane.reads, 6) + self.failUnlessEqual(lane.sample_name, "s") + self.failUnlessEqual(lane.lane_id, key.lane) + self.failUnlessEqual(len(lane.mapped_reads), 17) + self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4) + self.failUnlessEqual(lane.match_codes['U0'], 3) + self.failUnlessEqual(lane.match_codes['R0'], 2) + self.failUnlessEqual(lane.match_codes['U1'], 1) + self.failUnlessEqual(lane.match_codes['R1'], 9) + self.failUnlessEqual(lane.match_codes['U2'], 0) + self.failUnlessEqual(lane.match_codes['R2'], 12) + self.failUnlessEqual(lane.match_codes['NM'], 1) + self.failUnlessEqual(lane.match_codes['QC'], 0) + elif key.lane == 7: + self.failUnlessEqual(lane.reads, 5) + self.failUnlessEqual(lane.sample_name, 's') + self.failUnlessEqual(lane.lane_id, 7) + self.failUnlessEqual(lane.sequence_type, + eland.SequenceLane.SCARF_TYPE) + elif key.lane == 8: + self.failUnlessEqual(lane.reads, 3) + self.failUnlessEqual(lane.sample_name, 's') + self.failUnlessEqual(lane.lane_id, 8) + self.failUnlessEqual(lane.sequence_type, + eland.SequenceLane.FASTQ_TYPE) xml = eland_container.get_elements() # just make sure that element tree can serialize the tree xml_str = ElementTree.tostring(xml) e2 = gerald.ELAND(xml=xml) - for i in range(1,9): - l1 = eland_container.results[0][i] - l2 = e2.results[0][i] + for key in eland_container: + l1 = eland_container[key] + l2 = e2[key] self.failUnlessEqual(l1.reads, l2.reads) self.failUnlessEqual(l1.sample_name, l2.sample_name) self.failUnlessEqual(l1.lane_id, l2.lane_id) @@ -311,29 +316,33 @@ class RunfolderTests(unittest.TestCase): # do we get the flowcell id from the filename? self.failUnlessEqual(len(runs), 1) name = 'run_3021JAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),) - self.failUnlessEqual(runs[0].name, name) + self.failUnlessEqual(runs[0].serialization_filename, name) # do we get the flowcell id from the FlowcellId.xml file make_flowcell_id(self.runfolder_dir, '207BTAAXY') runs = runfolder.get_runs(self.runfolder_dir) self.failUnlessEqual(len(runs), 1) name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),) - self.failUnlessEqual(runs[0].name, name) + self.failUnlessEqual(runs[0].serialization_filename, name) r1 = runs[0] xml = r1.get_elements() xml_str = ElementTree.tostring(xml) r2 = runfolder.PipelineRun(xml=xml) - self.failUnlessEqual(r1.name, r2.name) + self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename) self.failIfEqual(r2.image_analysis, None) self.failIfEqual(r2.bustard, None) self.failIfEqual(r2.gerald, None) def suite(): - return unittest.makeSuite(RunfolderTests,'test') + from unittest import TestSuite, defaultTestLoader + suite = TestSuite() + suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests)) + return suite -if __name__ == "__main__": - unittest.main(defaultTest="suite") +if __name__ == "__main__": + from unittest import main + main(defaultTest="suite")