From 8795797def26e8ac23c957dcf018447ecc88a6ff Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Thu, 2 Aug 2012 14:52:10 -0700 Subject: [PATCH] Test more of the sequences class. (And fix the bugs discovered with better test coverage) --- htsworkflow/pipelines/sequences.py | 9 +- htsworkflow/pipelines/test/test_sequences.py | 357 +++++++++++++------ 2 files changed, 258 insertions(+), 108 deletions(-) diff --git a/htsworkflow/pipelines/sequences.py b/htsworkflow/pipelines/sequences.py index 772af7b..0e5612a 100644 --- a/htsworkflow/pipelines/sequences.py +++ b/htsworkflow/pipelines/sequences.py @@ -76,7 +76,7 @@ class SequenceFile(object): def key(self): return (self.flowcell, self.lane, self.read, self.project, self.split) - def unicode(self): + def __unicode__(self): return unicode(self.path) def __eq__(self, other): @@ -98,6 +98,9 @@ class SequenceFile(object): return True + def __ne__(self, other): + return not self == other + def __repr__(self): return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path) @@ -255,8 +258,8 @@ def parse_fastq_pf_flag(records): elif fastq_type.startswith('all'): pf = None else: - raise ValueError("Unrecognized fastq name %s at %s" % \ - (records[-1], os.path.join(path,filename))) + raise ValueError("Unrecognized fastq name: %s" % ( + "_".join(records),)) return pf diff --git a/htsworkflow/pipelines/test/test_sequences.py b/htsworkflow/pipelines/test/test_sequences.py index e00f5ec..9c85f39 100644 --- a/htsworkflow/pipelines/test/test_sequences.py +++ b/htsworkflow/pipelines/test/test_sequences.py @@ -1,5 +1,7 @@ #!/usr/bin/env python import os +import shutil +import tempfile import unittest from htsworkflow.pipelines import sequences @@ -23,7 +25,7 @@ class SequenceFileTests(unittest.TestCase): for t in tests: path = sequences.get_flowcell_cycle(t[0]) - self.failUnlessEqual(path, t[1]) + self.assertEqual(path, t[1]) def test_flowcell_cycle(self): """ @@ -32,13 +34,13 @@ class SequenceFileTests(unittest.TestCase): path = '/root/42BW9AAXX/C1-152' flowcell, start, stop, project = sequences.get_flowcell_cycle(path) - self.failUnlessEqual(flowcell, '42BW9AAXX') - self.failUnlessEqual(start, 1) - self.failUnlessEqual(stop, 152) - self.failUnlessEqual(project, None) + self.assertEqual(flowcell, '42BW9AAXX') + self.assertEqual(start, 1) + self.assertEqual(stop, 152) + self.assertEqual(project, None) path = '/root/42BW9AAXX/other' - self.failUnlessRaises(ValueError, sequences.get_flowcell_cycle, path) + self.assertRaises(ValueError, sequences.get_flowcell_cycle, path) def test_flowcell_project_cycle(self): """ @@ -47,110 +49,203 @@ class SequenceFileTests(unittest.TestCase): path = '/root/42BW9AAXX/C1-152/Project_12345_Index1' flowcell, start, stop, project = sequences.get_flowcell_cycle(path) - self.failUnlessEqual(flowcell, '42BW9AAXX') - self.failUnlessEqual(start, 1) - self.failUnlessEqual(stop, 152) - self.failUnlessEqual(project, 'Project_12345_Index1') + self.assertEqual(flowcell, '42BW9AAXX') + self.assertEqual(start, 1) + self.assertEqual(stop, 152) + self.assertEqual(project, 'Project_12345_Index1') path = '/root/42BW9AAXX/other' - self.failUnlessRaises(ValueError, sequences.get_flowcell_cycle, path) + self.assertRaises(ValueError, sequences.get_flowcell_cycle, path) def test_srf(self): path = '/root/42BW9AAXX/C1-38' name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_4.srf' + other = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_5.srf' pathname = os.path.join(path,name) - f = sequences.parse_srf(path, name) + f0 = sequences.parse_srf(path, name) + f1 = sequences.parse_srf(path, name) + fother = sequences.parse_srf(path, other) + + self.assertEqual(f0.filetype, 'srf') + self.assertEqual(f0.path, pathname) + self.assertEqual(unicode(f0), unicode(pathname)) + self.assertEqual(repr(f0), "" % (pathname,)) + self.assertEqual(f0.flowcell, '42BW9AAXX') + self.assertEqual(f0.lane, 4) + self.assertEqual(f0.read, None) + self.assertEqual(f0.pf, None) + self.assertEqual(f0.cycle, 38) + self.assertEqual(f0.make_target_name('/tmp'), + os.path.join('/tmp', name)) + + self.assertEqual(f0, f1) + self.assertNotEqual(f0, fother) - self.failUnlessEqual(f.filetype, 'srf') - self.failUnlessEqual(f.path, pathname) - self.failUnlessEqual(f.flowcell, '42BW9AAXX') - self.failUnlessEqual(f.lane, 4) - self.failUnlessEqual(f.read, None) - self.failUnlessEqual(f.pf, None) - self.failUnlessEqual(f.cycle, 38) def test_qseq(self): path = '/root/42BW9AAXX/C1-36' name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r1.tar.bz2' + other = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l5_r1.tar.bz2' pathname = os.path.join(path,name) - f = sequences.parse_qseq(path, name) - - self.failUnlessEqual(f.filetype, 'qseq') - self.failUnlessEqual(f.path, pathname) - self.failUnlessEqual(f.flowcell, '42BW9AAXX') - self.failUnlessEqual(f.lane, 4) - self.failUnlessEqual(f.read, 1) - self.failUnlessEqual(f.pf, None) - self.failUnlessEqual(f.cycle, 36) - + f0 = sequences.parse_qseq(path, name) + f1 = sequences.parse_qseq(path, name) + fother = sequences.parse_qseq(path, other) + + self.assertEqual(f0.filetype, 'qseq') + self.assertEqual(f0.path, pathname) + self.assertEqual(unicode(f0), unicode(pathname)) + self.assertEqual(repr(f0), "" %(pathname,)) + self.assertEqual(f0.flowcell, '42BW9AAXX') + self.assertEqual(f0.lane, 4) + self.assertEqual(f0.read, 1) + self.assertEqual(f0.pf, None) + self.assertEqual(f0.cycle, 36) + self.assertEqual(f0.make_target_name('/tmp'), + os.path.join('/tmp', name)) + + self.assertEqual(f0, f1) + self.assertNotEqual(f0, fother) path = '/root/ilmn200901/C1-202' name = 'woldlab_090125_HWI-EAS_0000_ilmn200901_l1_r1.tar.bz2' + other = 'woldlab_090125_HWI-EAS_0000_ilmn200901_l1_r2.tar.bz2' pathname = os.path.join(path, name) - f = sequences.parse_qseq(path, name) - - self.failUnlessEqual(f.filetype, 'qseq') - self.failUnlessEqual(f.path, pathname) - self.failUnlessEqual(f.lane, 1) - self.failUnlessEqual(f.read, 1) - self.failUnlessEqual(f.pf, None) - self.failUnlessEqual(f.cycle, 202) + f0 = sequences.parse_qseq(path, name) + f1 = sequences.parse_qseq(path, name) + fother = sequences.parse_qseq(path, other) + + self.assertEqual(f0.filetype, 'qseq') + self.assertEqual(f0.path, pathname) + self.assertEqual(unicode(f0), unicode(pathname)) + self.assertEqual(repr(f0), "" %(pathname,)) + self.assertEqual(f0.lane, 1) + self.assertEqual(f0.read, 1) + self.assertEqual(f0.pf, None) + self.assertEqual(f0.cycle, 202) + self.assertEqual(f0.make_target_name('/tmp'), + os.path.join('/tmp', name)) + + self.assertEqual(f0, f1) + self.assertNotEqual(f0, fother) def test_fastq(self): path = '/root/42BW9AAXX/C1-38' name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r1_pass.fastq.bz2' + other = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l5_r1_pass.fastq.bz2' pathname = os.path.join(path,name) - f = sequences.parse_fastq(path, name) - - self.failUnlessEqual(f.filetype, 'fastq') - self.failUnlessEqual(f.path, pathname) - self.failUnlessEqual(f.flowcell, '42BW9AAXX') - self.failUnlessEqual(f.lane, 4) - self.failUnlessEqual(f.read, 1) - self.failUnlessEqual(f.pf, True) - self.failUnlessEqual(f.cycle, 38) + f0 = sequences.parse_fastq(path, name) + f1 = sequences.parse_fastq(path, name) + fother = sequences.parse_fastq(path, other) + + self.assertEqual(f0.filetype, 'fastq') + self.assertEqual(f0.path, pathname) + self.assertEqual(unicode(f0), unicode(pathname)) + self.assertEqual(repr(f0), "" % (pathname,)) + self.assertEqual(f0.flowcell, '42BW9AAXX') + self.assertEqual(f0.lane, 4) + self.assertEqual(f0.read, 1) + self.assertEqual(f0.pf, True) + self.assertEqual(f0.cycle, 38) + self.assertEqual(f0.make_target_name('/tmp'), + os.path.join('/tmp', name)) + + self.assertEqual(f0, f1) + self.assertNotEqual(f0, fother) name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r2_nopass.fastq.bz2' + other = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2_nopass.fastq.bz2' pathname = os.path.join(path,name) - f = sequences.parse_fastq(path, name) - - self.failUnlessEqual(f.filetype, 'fastq') - self.failUnlessEqual(f.path, pathname) - self.failUnlessEqual(f.flowcell, '42BW9AAXX') - self.failUnlessEqual(f.lane, 4) - self.failUnlessEqual(f.read, 2) - self.failUnlessEqual(f.pf, False) - self.failUnlessEqual(f.cycle, 38) + f0 = sequences.parse_fastq(path, name) + f1 = sequences.parse_fastq(path, name) + fother = sequences.parse_fastq(path, other) + + self.assertEqual(f0.filetype, 'fastq') + self.assertEqual(f0.path, pathname) + self.assertEqual(unicode(f0), unicode(pathname)) + self.assertEqual(repr(f0), "" %(pathname,)) + self.assertEqual(f0.flowcell, '42BW9AAXX') + self.assertEqual(f0.lane, 4) + self.assertEqual(f0.read, 2) + self.assertEqual(f0.pf, False) + self.assertEqual(f0.cycle, 38) + self.assertEqual(f0.make_target_name('/tmp'), + os.path.join('/tmp', name)) + + self.assertEqual(f0, f1) + self.assertNotEqual(f0, fother) def test_project_fastq(self): path = '/root/42BW9AAXX/C1-38/Project_12345' name = '11111_NoIndex_L001_R1_001.fastq.gz' + other = '22222_NoIndex_L001_R1_001.fastq.gz' pathname = os.path.join(path,name) - f = sequences.parse_fastq(path, name) - - self.failUnlessEqual(f.filetype, 'split_fastq') - self.failUnlessEqual(f.path, pathname) - self.failUnlessEqual(f.flowcell, '42BW9AAXX') - self.failUnlessEqual(f.lane, 1) - self.failUnlessEqual(f.read, 1) - self.failUnlessEqual(f.pf, True) - self.failUnlessEqual(f.project, '11111') - self.failUnlessEqual(f.index, 'NoIndex') - self.failUnlessEqual(f.cycle, 38) + f0 = sequences.parse_fastq(path, name) + f1 = sequences.parse_fastq(path, name) + fother = sequences.parse_fastq(path, other) + + self.assertEqual(f0.filetype, 'split_fastq') + self.assertEqual(f0.path, pathname) + self.assertEqual(unicode(f0), unicode(pathname)) + self.assertEqual(repr(f0), "" %(pathname,)) + self.assertEqual(f0.flowcell, '42BW9AAXX') + self.assertEqual(f0.lane, 1) + self.assertEqual(f0.read, 1) + self.assertEqual(f0.pf, True) + self.assertEqual(f0.project, '11111') + self.assertEqual(f0.index, 'NoIndex') + self.assertEqual(f0.cycle, 38) + self.assertEqual(f0.make_target_name('/tmp'), + os.path.join('/tmp', name)) + + self.assertEqual(f0, f1) + self.assertNotEqual(f0, fother) name = '11112_AAATTT_L001_R2_003.fastq.gz' + other = '11112_AAATTT_L002_R2_003.fastq.gz' pathname = os.path.join(path,name) - f = sequences.parse_fastq(path, name) - - self.failUnlessEqual(f.filetype, 'split_fastq') - self.failUnlessEqual(f.path, pathname) - self.failUnlessEqual(f.flowcell, '42BW9AAXX') - self.failUnlessEqual(f.lane, 1) - self.failUnlessEqual(f.read, 2) - self.failUnlessEqual(f.pf, True) - self.failUnlessEqual(f.project, '11112') - self.failUnlessEqual(f.index, 'AAATTT') - self.failUnlessEqual(f.cycle, 38) + f0 = sequences.parse_fastq(path, name) + f1 = sequences.parse_fastq(path, name) + fother = sequences.parse_fastq(path, other) + + self.assertEqual(f0.filetype, 'split_fastq') + self.assertEqual(f0.path, pathname) + self.assertEqual(unicode(f0), unicode(pathname)) + self.assertEqual(repr(f0), "" % (pathname,)) + self.assertEqual(f0.flowcell, '42BW9AAXX') + self.assertEqual(f0.lane, 1) + self.assertEqual(f0.read, 2) + self.assertEqual(f0.pf, True) + self.assertEqual(f0.project, '11112') + self.assertEqual(f0.index, 'AAATTT') + self.assertEqual(f0.cycle, 38) + self.assertEqual(f0.make_target_name('/tmp'), + os.path.join('/tmp', name)) + + self.assertEqual(f0, f1) + self.assertNotEqual(f0, fother) + + def test_parse_fastq_pf_flag(self): + other = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2_nopass.fastq.bz2' + data = ['woldlab', '090622', 'HWI-EAS229', '0120', '42BW9AAXX', + 'l1', 'r2', 'nopass'] + self.assertEqual(sequences.parse_fastq_pf_flag(data), False) + + data = ['woldlab', '090622', 'HWI-EAS229', '0120', '42BW9AAXX', + 'l1', 'r2', 'pass'] + self.assertEqual(sequences.parse_fastq_pf_flag(data), True) + + data = ['woldlab', '090622', 'HWI-EAS229', '0120', '42BW9AAXX', + 'l1', 'r2', 'all'] + self.assertEqual(sequences.parse_fastq_pf_flag(data), None) + + data = ['woldlab', '090622', 'HWI-EAS229', '0120', '42BW9AAXX', + 'l1', 'r2'] + self.assertEqual(sequences.parse_fastq_pf_flag(data), None) + + data = ['woldlab', '090622', 'HWI-EAS229', '0120', '42BW9AAXX', + 'l1', 'r2', 'all', 'newthing'] + self.assertRaises(ValueError, sequences.parse_fastq_pf_flag, data) + def test_project_fastq_hashing(self): """Can we tell the difference between sequence files? @@ -164,9 +259,9 @@ class SequenceFileTests(unittest.TestCase): for a_name, b_name in names: a = sequences.parse_fastq(path, a_name) b = sequences.parse_fastq(path, b_name) - self.failIfEqual(a, b) - self.failIfEqual(a.key(), b.key()) - self.failIfEqual(hash(a), hash(b)) + self.assertNotEqual(a, b) + self.assertNotEqual(a.key(), b.key()) + self.assertNotEqual(hash(a), hash(b)) def test_eland(self): path = '/root/42BW9AAXX/C1-38' @@ -174,35 +269,30 @@ class SequenceFileTests(unittest.TestCase): pathname = os.path.join(path,name) f = sequences.parse_eland(path, name) - self.failUnlessEqual(f.filetype, 'eland') - self.failUnlessEqual(f.path, pathname) - self.failUnlessEqual(f.flowcell, '42BW9AAXX') - self.failUnlessEqual(f.lane, 4) - self.failUnlessEqual(f.read, None) - self.failUnlessEqual(f.pf, None) - self.failUnlessEqual(f.cycle, 38) + self.assertEqual(f.filetype, 'eland') + self.assertEqual(f.path, pathname) + self.assertEqual(f.flowcell, '42BW9AAXX') + self.assertEqual(f.lane, 4) + self.assertEqual(f.read, None) + self.assertEqual(f.pf, None) + self.assertEqual(f.cycle, 38) + self.assertEqual(f.make_target_name('/tmp'), + '/tmp/42BW9AAXX_38_s_4_eland_extended.txt.bz2') path = '/root/42BW9AAXX/C1-152' name = 's_4_1_eland_extended.txt.bz2' pathname = os.path.join(path,name) f = sequences.parse_eland(path, name) - self.failUnlessEqual(f.filetype, 'eland') - self.failUnlessEqual(f.path, pathname) - self.failUnlessEqual(f.flowcell, '42BW9AAXX') - self.failUnlessEqual(f.lane, 4) - self.failUnlessEqual(f.read, 1) - self.failUnlessEqual(f.pf, None) - self.failUnlessEqual(f.cycle, 152) - - def test_sequence_file_equality(self): - path = '/root/42BW9AAXX/C1-38' - name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r1.tar.bz2' - - f1_qseq = sequences.parse_qseq(path, name) - f2_qseq = sequences.parse_qseq(path, name) - - self.failUnlessEqual(f1_qseq, f2_qseq) + self.assertEqual(f.filetype, 'eland') + self.assertEqual(f.path, pathname) + self.assertEqual(f.flowcell, '42BW9AAXX') + self.assertEqual(f.lane, 4) + self.assertEqual(f.read, 1) + self.assertEqual(f.pf, None) + self.assertEqual(f.cycle, 152) + self.assertEqual(f.make_target_name('/tmp'), + '/tmp/42BW9AAXX_152_s_4_1_eland_extended.txt.bz2') def test_sql(self): """ @@ -228,8 +318,65 @@ class SequenceFileTests(unittest.TestCase): count = c.execute("select count(*) from sequences") row = count.fetchone() - self.failUnlessEqual(row[0], 4) - + self.assertEqual(row[0], 4) + + def test_scan_for_sequences(self): + # simulate tree + seen = set() + should_see = set(['fastq', 'srf', 'eland', 'qseq']) + with SimulateTree() as tree: + seqs = sequences.scan_for_sequences([tree.root, '/a/b/c/98345']) + for s in seqs: + self.assertEquals(s.flowcell, '42BW9AAXX') + self.assertEquals(s.cycle, 33) + seen.add(s.filetype) + + self.assertEquals(len(seqs), 8) + + self.assertRaises(ValueError, sequences.scan_for_sequences, '/tmp') + self.assertEqual(len(should_see.difference(seen)), 0) + +class SimulateTree(object): + def __init__(self): + self.root = tempfile.mkdtemp(prefix='sequences_') + + fc = self.mkflowcell(self.root, '42BW9AAXX', 'C1-33') + files = [ + 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r1.tar.bz2', + 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2.tar.bz2', + 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r1.tar.bz2.md5', + 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2.tar.bz2.md5', + 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_2.srf', + 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l3_r1_pass.fastq.bz2', + 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l3_r2_pass.fastq.bz2', + 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l3_r1_nopass.fastq.bz2', + 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l3_r2_nopass.fastq.bz2', + 's_1_eland_extended.txt.bz2', + 's_1_eland_extended.txt.bz2.md5', + ] + for f in files: + self.mkfile(fc, f) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + shutil.rmtree(self.root) + + def mkflowcell(self, *components): + head = self.root + for c in components: + head = os.path.join(head, c) + if not os.path.exists(head): + os.mkdir(head) + return head + + def mkfile(self, flowcell, filename): + pathname = os.path.join(flowcell, filename) + stream = open(pathname,'w') + stream.write(pathname) + stream.write(os.linesep) + stream.close() def suite(): return unittest.makeSuite(SequenceFileTests,'test') -- 2.30.2