"""More direct synthetic test cases for the eland output file processing."""
4 from StringIO import StringIO
5 from unittest import TestCase
7 from htsworkflow.pipelines.eland import ELAND, ElandLane, ElandMatches, \
8 SampleKey, MatchCodes, MappedReads
class MatchCodeTests(TestCase):
    """Check construction, mapping access and accumulation of MatchCodes."""

    def test_initializer(self):
        """A dict with an unknown key and a plain integer must both be rejected."""
        self.assertRaises(ValueError, MatchCodes, {'foo': 'bar'})
        self.assertRaises(ValueError, MatchCodes, 3)

    def test_dictlike(self):
        """A default MatchCodes is expected to expose nine zeroed counters."""
        # NOTE(review): this construction is not present in the listing; it is
        # inferred from the use of ``mc`` below -- confirm the arguments.
        mc = MatchCodes()
        match_codes = {
            'NM': 0, 'QC': 0, 'RM': 0,
            'U0': 0, 'U1': 0, 'U2': 0,
            'R0': 0, 'R1': 0, 'R2': 0,
        }
        # Compare sorted lists rather than the raw return values: the result
        # then depends neither on key order nor on whether keys()/items()/
        # values() hand back lists or dict views (views returned by values()
        # never compare equal to each other on Python 3).
        self.assertEqual(sorted(mc.keys()), sorted(match_codes.keys()))
        self.assertEqual(sorted(mc.items()), sorted(match_codes.items()))
        self.assertEqual(sorted(mc.values()), sorted(match_codes.values()))
        self.assertRaises(KeyError, mc.__getitem__, 'foo')

    def test_addition(self):
        """Counters can be bumped individually and merged from another MatchCodes."""
        # NOTE(review): ``mc1`` construction reconstructed from usage -- confirm.
        mc1 = MatchCodes()
        mc2 = MatchCodes({'NM': 5, 'QC': 10, 'U0': 100})

        # NOTE(review): the statement that moves NM from 0 to 5 is missing from
        # the listing; an in-place increment matches the assertions -- confirm.
        mc1['NM'] += 5
        self.assertEqual(mc1['NM'], 5)
        self.assertEqual(mc1['QC'], 0)
        self.assertEqual(mc1['U0'], 0)

        # NOTE(review): likewise missing; the values asserted next are exactly
        # mc1 + mc2, so an in-place add is assumed -- confirm.
        mc1 += mc2
        self.assertEqual(mc1['NM'], 10)
        self.assertEqual(mc1['QC'], 10)
        self.assertEqual(mc1['U0'], 100)
class TestMappedReads(TestCase):
    """Check construction, mapping access and addition of MappedReads."""

    def test_initializer(self):
        """Empty and dict-seeded construction work; an integer seed is rejected."""
        # NOTE(review): ``mr1`` construction reconstructed from usage -- confirm.
        mr1 = MappedReads()
        self.assertEqual(len(mr1), 0)
        mr2 = MappedReads({'hg19': 100, 'newcontamUK.fa': 12})
        self.assertEqual(len(mr2), 2)
        self.assertEqual(mr2['hg19'], 100)

        self.assertRaises(ValueError, MappedReads, 3)

    def test_dictionaryness(self):
        """Item assignment, lookup, listing and deletion behave like a dict."""
        # NOTE(review): the construction, the assignment and the deletion are
        # missing from the listing; they are inferred from the assertions
        # (one key 'chr9' -> 7, then an empty container) -- confirm.
        mr1 = MappedReads()
        mr1['chr9'] = 7
        self.assertEqual(list(mr1.keys()), ['chr9'])
        self.assertEqual(mr1['chr9'], 7)
        # list() mirrors the keys() check above so the comparison also holds
        # when items() returns a view instead of a list.
        self.assertEqual(list(mr1.items()), [('chr9', 7)])
        del mr1['chr9']
        self.assertEqual(len(mr1), 0)

    def test_addition(self):
        """Adding two MappedReads yields the union of keys and leaves operands intact."""
        mr1 = MappedReads({'hg19': 100, 'Lambda1': 5})
        mr2 = MappedReads({'hg19': 100, 'newcontamUK.fa': 10})
        # NOTE(review): ``mr3`` construction is missing from the listing; the
        # test name and the three-key result point to ``mr1 + mr2`` -- confirm.
        mr3 = mr1 + mr2

        self.assertEqual(len(mr1), 2)
        self.assertEqual(len(mr2), 2)
        self.assertEqual(len(mr3), 3)

        # The left operand must not have picked up keys from the right one.
        self.assertEqual(mr1['Lambda1'], 5)
        self.assertRaises(KeyError, mr1.__getitem__, 'newcontamUK.fa')
        self.assertEqual(mr1.get('newcontamUK.fa', None), None)

        # NOTE(review): assignment reconstructed from the assertion that
        # follows -- confirm.
        mr3['Lambda3'] = 2
        self.assertEqual(mr3['Lambda3'], 2)
class ElandTests(TestCase):
    """Test specific Eland modules
    """

    @staticmethod
    def _counts(**nonzero):
        """Return a table of the nine match codes, zero except for ``nonzero``."""
        table = dict.fromkeys(
            ('NM', 'QC', 'RM', 'U0', 'U1', 'U2', 'R0', 'R1', 'R2'), 0)
        table.update(nonzero)
        return table

    def compare_match_array(self, current, expected):
        """Assert ``current[key] == expected[key]`` for every key of ``expected``.

        Keys that exist only in ``current`` are not examined.
        """
        for key in expected:
            self.assertEqual(
                current[key], expected[key],
                "Key %s: %s != %s" % (key, current[key], expected[key]))

    def test_eland_score_mapped_mismatches(self):
        """Score a series of match descriptors and track the running tallies."""
        # NOTE(review): construction of ``eland`` is missing from the listing;
        # a no-argument ElandLane is assumed -- confirm the arguments.
        eland = ElandLane()
        match_codes = self._counts()
        expected = self._counts()

        # (descriptor, expected return value, counters whose total changes).
        # The same ``match_codes`` dict is passed to every call, so the totals
        # accumulate from one step to the next.
        steps = [
            ("QC", ElandLane.SCORE_QC, {'QC': 1}),
            # "NM" is asserted to return SCORE_QC as well; kept as listed.
            ("NM", ElandLane.SCORE_QC, {'NM': 1}),
            ("1:0:0", ElandLane.SCORE_READ, {'U0': 1}),
            ("2:4:16", ElandLane.SCORE_READ, {'R0': 2, 'R1': 4, 'R2': 16}),
            ("1:1:1", ElandLane.SCORE_READ, {'U0': 2, 'U1': 1, 'U2': 1}),
            ("1:0:0", ElandLane.SCORE_READ, {'U0': 3}),
            ("0:0:1", ElandLane.SCORE_READ, {'U2': 2}),
            # An unrecognized descriptor must leave every counter untouched.
            ("chr3.fa", ElandLane.SCORE_UNRECOGNIZED, {}),
        ]
        for mapping, score, changes in steps:
            r = eland._score_mapped_mismatches(mapping, match_codes)
            self.assertEqual(r, score)
            expected.update(changes)
            self.compare_match_array(match_codes, expected)

    def test_count_mapped_export(self):
        """Map export match descriptors to their match code and count the read."""
        # NOTE(review): construction of ``eland`` is missing from the listing;
        # a no-argument ElandLane is assumed -- confirm the arguments.
        eland = ElandLane()

        cases = [
            ("38", 'U0'),
            ("36A4", 'U1'),
            ("30A2T2", 'U2'),
            ("26AG2T2", 'U2'),
            ("26^AG$4", 'U2'),
            ("26^2$4", 'U0'),
        ]
        for descriptor, code in cases:
            # NOTE(review): the container setup is missing from the listing.
            # Every case asserts a count of exactly 1, so a fresh empty dict
            # per case is assumed -- confirm the container type.
            mapped_reads = {}
            r = eland._count_mapped_export(mapped_reads, "chr3.fa", descriptor)
            self.assertEqual(mapped_reads['chr3.fa'], 1)
            self.assertEqual(r, code)

    def test_update_eland_export(self):
        """Test scoring the pipeline export file"""
        # NOTE(review): construction of ``eland`` is missing from the listing;
        # a no-argument ElandLane is assumed -- confirm the arguments.
        eland = ElandLane()

        # Raw strings: the quality columns contain ``\X`` and ``\W``, which are
        # not valid escape sequences. The string values are unchanged.
        # NOTE(review): field separators are reproduced exactly as listed
        # (single spaces); presumably the real records are tab-delimited --
        # verify against the original fixtures.
        qc_read = StringIO(r"ILLUMINA-33A494 1 1 1 3291 1036 0 1 GANNTCCTCACCCGACANNNNNNNANNNCGGGNNACTC \XBB]^^^^[`````BBBBBBBBBBBBBBBBBBBBBBB QC")
        one_read_exact = StringIO(r"ILLUMINA-33A494 1 1 1 2678 1045 0 1 AAGGTGAAGAAGGAGATGNNGATGATGACGACGATAGA ]]WW[[W]W]]R\WWZ[RBBS^\XVa____]W[]]___ chrX.fa 148341829 F 38 45")
        one_read_mismatch = StringIO(r"ILLUMINA-33A494 1 1 1 2678 1045 0 1 AAGGTGAAGAAGGAGATGNNGATGATGACGACGATAGA ]]WW[[W]W]]R\WWZ[RBBS^\XVa____]W[]]___ chrX.fa 148341829 F 18AA15G1T 45")
        multi_read = StringIO(r"ILLUMINA-33A494 1 1 1 4405 1046 0 1 GTGGTTTCGCTGGATAGTNNGTAGGGACAGTGGGAATC ``````````__a__V^XBB^SW^^a_____a______ 9:2:1")

        # A QC-failed read: counted under QC, mapped nowhere.
        match_codes, match_reads, reads = eland._update_eland_export(qc_read)
        self.compare_match_array(match_codes, self._counts(QC=1))
        self.assertEqual(len(match_reads), 0)
        self.assertEqual(reads, 1)

        # A single exact hit on chrX.fa.
        match_codes, match_reads, reads = eland._update_eland_export(one_read_exact)
        self.compare_match_array(match_codes, self._counts(U0=1))
        self.assertEqual(match_reads['chrX.fa'], 1)
        self.assertEqual(reads, 1)

        # A single hit on chrX.fa whose descriptor carries mismatches.
        match_codes, match_reads, reads = eland._update_eland_export(one_read_mismatch)
        self.compare_match_array(match_codes, self._counts(U2=1))
        self.assertEqual(match_reads['chrX.fa'], 1)
        self.assertEqual(reads, 1)

        # A multi-hit read described as "9:2:1"; expected counters are kept
        # exactly as listed.
        match_codes, match_reads, reads = eland._update_eland_export(multi_read)
        self.compare_match_array(match_codes, self._counts(U2=1, R0=9, R1=2))
        self.assertEqual(len(match_reads), 0)
        self.assertEqual(reads, 1)

    def test_ordering(self):
        """Results inserted out of lane order are expected back in lane order."""
        # NOTE(review): construction of ``e`` is missing from the listing; a
        # no-argument ELAND is assumed -- confirm.
        e = ELAND()
        sl3 = SampleKey(lane=3, read=1, sample='33333')
        sl1 = SampleKey(lane=1, read=1, sample='11111')
        sl5 = SampleKey(lane=5, read=1, sample='55555')
        # Deliberately inserted in descending lane order.
        e.results[sl5] = 'Lane5'
        e.results[sl3] = 'Lane3'
        e.results[sl1] = 'Lane1'

        # NOTE(review): the statement producing ``e_list`` is missing from the
        # listing; ``e.values()`` is assumed -- confirm. list() keeps the
        # indexing below valid if an iterator or view is returned.
        e_list = list(e.values())
        self.assertEqual(e_list[0], 'Lane1')
        self.assertEqual(e_list[1], 'Lane3')
        self.assertEqual(e_list[2], 'Lane5')
class TestElandMatches(TestCase):
    """Check how ElandMatches files added pathnames under SampleKey entries."""

    def test_eland_replacing(self):
        """An eland result added for a key replaces the sequence file held there."""
        key = SampleKey(1, 1, 's')
        # NOTE(review): construction of ``em`` is missing from the listing; an
        # ElandMatches wrapping a fresh ELAND is assumed -- confirm.
        e = ELAND()
        em = ElandMatches(e)

        em.add('s_1_sequence.txt')
        self.assertEqual(len(em), 1)
        self.assertEqual(len(em[key]), 1)
        # next(iter(...)) replaces the Python-2-only ``.next()`` method and
        # behaves the same on Python 2.6+ and Python 3.
        filename = next(iter(em[key])).filename
        self.assertEqual(filename, 's_1_sequence.txt')
        self.assertEqual(list(em.keys()), [key])

        # Same key, better result type: still one entry, new filename.
        em.add('s_1_eland_result.txt')
        self.assertEqual(len(em), 1)
        self.assertEqual(len(em[key]), 1)
        filename = next(iter(em[key])).filename
        self.assertEqual(filename, 's_1_eland_result.txt')
        self.assertEqual(list(em.keys()), [key])

    def test_parts(self):
        """Numbered export parts of one sample are collected under a single key."""
        key11111 = SampleKey(1, 1, '11111')
        key11112 = SampleKey(1, 1, '11112')
        # NOTE(review): construction of ``em`` is missing from the listing; an
        # ElandMatches wrapping a fresh ELAND is assumed -- confirm.
        e = ELAND()
        em = ElandMatches(e)

        for pathname in (
                '11111_CCAATT_L001_R1_001_export.txt.gz',
                '11111_CCAATT_L001_R1_002_export.txt.gz',
                '11111_CCAATT_L001_R1_003_export.txt.gz',
                '11112_AAGGTT_L001_R1_001_export.txt.gz',
                '11112_AAGGTT_L001_R1_002_export.txt.gz'):
            em.add(pathname)

        self.assertEqual(len(em), 2)
        self.assertEqual(len(em[key11111]), 3)
        self.assertEqual(len(em[key11112]), 2)
def suite():
    """Return a TestSuite containing every TestCase class of this module.

    NOTE(review): the ``def`` line, the TestSuite construction and the
    ``return`` are missing from the listing. They are reconstructed from the
    ``suite.addTests(...)`` calls and from ``main(defaultTest="suite")`` in
    the ``__main__`` guard -- confirm against the original.
    """
    from unittest import TestSuite, defaultTestLoader

    # Local deliberately not called ``suite`` so it does not shadow the function.
    tests = TestSuite()
    for case in (MatchCodeTests, TestMappedReads, ElandTests, TestElandMatches):
        tests.addTests(defaultTestLoader.loadTestsFromTestCase(case))
    return tests
if __name__ == "__main__":
    # When executed as a script, let unittest look up and run the module
    # attribute named "suite".
    import unittest
    unittest.main(defaultTest="suite")