Change unittest2 back into unittest.
[htsworkflow.git] / htsworkflow / pipelines / test / test_eland.py
1 #!/usr/bin/env python
2 """More direct synthetic test cases for the eland output file processing
3 """
4 from StringIO import StringIO
5 from unittest import TestCase
6
7 from htsworkflow.pipelines.eland import ELAND, ElandLane, ElandMatches, \
8      SampleKey, MatchCodes, MappedReads
9
class MatchCodeTests(TestCase):
    """Exercise construction, mapping access and addition of MatchCodes."""

    def test_initializer(self):
        """Unknown keys and non-mappings raise ValueError; None is accepted."""
        for rejected in ({'foo':'bar'}, 3):
            self.assertRaises(ValueError, MatchCodes, rejected)
        # No assertion needed: the test fails if this call raises.
        MatchCodes(None)

    def test_dictlike(self):
        """A default MatchCodes compares equal, view by view, to a zeroed dict."""
        codes = MatchCodes()
        zeroed = {'NM':0, 'QC':0, 'RM':0,
                  'U0':0, 'U1':0, 'U2':0,
                  'R0':0, 'R1':0, 'R2':0,
                 }
        self.assertEqual(codes.keys(), zeroed.keys())
        self.assertEqual(codes.items(), zeroed.items())
        self.assertEqual(codes.values(), zeroed.values())
        # Codes outside the known set are not silently created.
        self.assertRaises(KeyError, codes.__getitem__, 'foo')

    def test_addition(self):
        """Item-wise ``+=`` and whole-object ``+=`` both accumulate counts."""
        running = MatchCodes()
        increment = MatchCodes({'NM':5, 'QC':10, 'U0': 100})

        # Bump a single code; the others must stay at zero.
        running['NM'] += 5
        for code, count in (('NM', 5), ('QC', 0), ('U0', 0)):
            self.assertEqual(running[code], count)

        # Adding another MatchCodes sums the counts code by code.
        running += increment
        for code, count in (('NM', 10), ('QC', 10), ('U0', 100)):
            self.assertEqual(running[code], count)
39
40
class TestMappedReads(TestCase):
    """Check that MappedReads acts like a dictionary that also supports ``+``."""

    def test_initializer(self):
        """Empty by default, seeded from a dict, ValueError for a non-mapping."""
        mr1 = MappedReads()
        self.assertEqual(len(mr1), 0)
        mr2 = MappedReads({'hg19': 100, 'newcontamUK.fa': 12})
        self.assertEqual(len(mr2), 2)
        self.assertEqual(mr2['hg19'], 100)

        self.assertRaises(ValueError, MappedReads, 3)

    def test_dictionaryness(self):
        """Set, read, list and delete a single entry."""
        mr1 = MappedReads()
        mr1['chr9'] = 7
        self.assertEqual(list(mr1.keys()), ['chr9'])
        self.assertEqual(mr1['chr9'], 7)
        # Wrapped in list() like the keys() check above, so the comparison
        # also holds when items() returns a view instead of a list.
        self.assertEqual(list(mr1.items()), [('chr9', 7)])
        del mr1['chr9']
        self.assertEqual(len(mr1), 0)

    def test_addition(self):
        """``a + b`` yields the union of keys and leaves both operands alone."""
        mr1 = MappedReads({'hg19': 100, 'Lambda1': 5})
        mr2 = MappedReads({'hg19': 100, 'newcontamUK.fa': 10})
        mr3 = mr1 + mr2

        # Operands keep their two entries; the sum has the three distinct keys.
        self.assertEqual(len(mr1), 2)
        self.assertEqual(len(mr2), 2)
        self.assertEqual(len(mr3), 3)

        # mr1 did not pick up the key that only mr2 had.
        self.assertEqual(mr1['Lambda1'], 5)
        self.assertRaises(KeyError, mr1.__getitem__, 'newcontamUK.fa')
        self.assertEqual(mr1.get('newcontamUK.fa', None), None)

        # The result is itself assignable.
        mr3['Lambda3'] = 2
        self.assertEqual(mr3['Lambda3'], 2)
75
class ElandTests(TestCase):
    """Test specific Eland modules
    """
    def compare_match_array(self, current, expected):
        """Assert ``current[key] == expected[key]`` for every key of ``expected``.

        Keys that exist only in ``current`` are not checked.
        """
        for key in expected:
            self.assertEqual(current[key], expected[key],
                 "Key %s: %s != %s" % (key,current[key],expected[key]))

    def test_eland_score_mapped_mismatches(self):
        """Score a series of match descriptors against one running count dict.

        The same ``match_codes`` dict is passed to every call, so each
        expected dict below is the cumulative state after that step.
        """
        eland = ElandLane()
        match_codes = {'NM':0, 'QC':0, 'RM':0,
                       'U0':0, 'U1':0, 'U2':0,
                       'R0':0, 'R1':0, 'R2':0,
                      }
        # (match descriptor, expected return value, expected running counts)
        steps = [
            ("QC", ElandLane.SCORE_QC,
             {'NM':0, 'QC':1, 'RM':0,
              'U0':0, 'U1':0, 'U2':0,
              'R0':0, 'R1':0, 'R2':0,
              }),
            # NM is expected to be reported with SCORE_QC as well.
            ("NM", ElandLane.SCORE_QC,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':0, 'U1':0, 'U2':0,
              'R0':0, 'R1':0, 'R2':0,
              }),
            ("1:0:0", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':1, 'U1':0, 'U2':0,
              'R0':0, 'R1':0, 'R2':0,
              }),
            ("2:4:16", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':1, 'U1':0, 'U2':0,
              'R0':2, 'R1':4, 'R2':16,
              }),
            ("1:1:1", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':2, 'U1':1, 'U2':1,
              'R0':2, 'R1':4, 'R2':16,
              }),
            ("1:0:0", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':3, 'U1':1, 'U2':1,
              'R0':2, 'R1':4, 'R2':16,
              }),
            ("0:0:1", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':3, 'U1':1, 'U2':2,
              'R0':2, 'R1':4, 'R2':16,
              }),
            # An unrecognized descriptor must leave the counts untouched.
            ("chr3.fa", ElandLane.SCORE_UNRECOGNIZED,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':3, 'U1':1, 'U2':2,
              'R0':2, 'R1':4, 'R2':16,
              }),
        ]
        for match, expected_score, expected_codes in steps:
            r = eland._score_mapped_mismatches(match, match_codes)
            self.assertEqual(r, expected_score)
            self.compare_match_array(match_codes, expected_codes)

    def test_count_mapped_export(self):
        """Each descriptor counts one read on chr3.fa and returns a match code."""
        eland = ElandLane()
        # (match descriptor, expected returned code)
        cases = [
            ("38", 'U0'),
            ("36A4", 'U1'),
            ("30A2T2", 'U2'),
            ("26AG2T2", 'U2'),
            # deletion
            ("26^AG$4", 'U2'),
            # insertion
            ("26^2$4", 'U0'),
        ]
        for descriptor, expected_code in cases:
            # Fresh dict per case so the count is always exactly one.
            mapped_reads = {}
            r = eland._count_mapped_export(mapped_reads, "chr3.fa", descriptor)
            self.assertEqual(mapped_reads['chr3.fa'], 1)
            self.assertEqual(r, expected_code)

    def test_update_eland_export(self):
        """Test scoring the pipeline export file"""
        eland = ElandLane()
        # Raw strings: the quality columns contain backslashes (\X, \W) that
        # must reach the parser literally rather than be read as escapes.
        # NOTE(review): columns are separated by runs of spaces here;
        # presumably the real export format is tab-delimited -- confirm the
        # parser splits these lines as intended.
        qc_read = StringIO(r"ILLUMINA-33A494 1       1       1       3291    1036    0       1       GANNTCCTCACCCGACANNNNNNNANNNCGGGNNACTC  \XBB]^^^^[`````BBBBBBBBBBBBBBBBBBBBBBB  QC")
        one_read_exact = StringIO(r"ILLUMINA-33A494 1       1       1       2678    1045    0       1       AAGGTGAAGAAGGAGATGNNGATGATGACGACGATAGA  ]]WW[[W]W]]R\WWZ[RBBS^\XVa____]W[]]___  chrX.fa         148341829       F       38       45")
        one_read_mismatch = StringIO(r"ILLUMINA-33A494 1       1       1       2678    1045    0       1       AAGGTGAAGAAGGAGATGNNGATGATGACGACGATAGA  ]]WW[[W]W]]R\WWZ[RBBS^\XVa____]W[]]___  chrX.fa         148341829       F       18AA15G1T       45")
        multi_read = StringIO("ILLUMINA-33A494 1       1       1       4405    1046    0       1       GTGGTTTCGCTGGATAGTNNGTAGGGACAGTGGGAATC  ``````````__a__V^XBB^SW^^a_____a______  9:2:1")

        # A QC-failed read: counted under QC, nothing mapped.
        match_codes, match_reads, reads = eland._update_eland_export(qc_read)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':1, 'RM':0,
                                  'U0':0, 'U1':0, 'U2':0,
                                  'R0':0, 'R1':0, 'R2':0,
                                  })
        self.assertEqual(len(match_reads), 0)
        self.assertEqual(reads, 1)

        # A single read with match descriptor "38": expected under U0.
        match_codes, match_reads, reads = eland._update_eland_export(one_read_exact)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':0, 'RM':0,
                                  'U0':1, 'U1':0, 'U2':0,
                                  'R0':0, 'R1':0, 'R2':0,
                                  })
        self.assertEqual(match_reads['chrX.fa'], 1)
        self.assertEqual(reads, 1)

        # Same read with descriptor "18AA15G1T": expected under U2.
        match_codes, match_reads, reads = eland._update_eland_export(one_read_mismatch)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':0, 'RM':0,
                                  'U0':0, 'U1':0, 'U2':1,
                                  'R0':0, 'R1':0, 'R2':0,
                                  })
        self.assertEqual(match_reads['chrX.fa'], 1)
        self.assertEqual(reads, 1)

        # A "9:2:1" multi-match read: counts land in R0/R1/U2, no chromosome.
        match_codes, match_reads, reads = eland._update_eland_export(multi_read)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':0, 'RM':0,
                                  'U0':0, 'U1':0, 'U2':1,
                                  'R0':9, 'R1':2, 'R2':0,
                                  })
        self.assertEqual(len(match_reads), 0)
        self.assertEqual(reads, 1)

    def test_ordering(self):
        """values() comes back in lane order regardless of insertion order."""
        e = ELAND()
        sl3 = SampleKey(lane=3, read=1, sample='33333')
        sl1 = SampleKey(lane=1, read=1, sample='11111')
        sl5 = SampleKey(lane=5, read=1, sample='55555')
        # Insert deliberately out of order.
        e.results[sl5] = 'Lane5'
        e.results[sl3] = 'Lane3'
        e.results[sl1] = 'Lane1'

        # list() so indexing works even if values() returns a view.
        e_list = list(e.values())
        self.assertEqual(e_list[0], 'Lane1')
        self.assertEqual(e_list[1], 'Lane3')
        self.assertEqual(e_list[2], 'Lane5')
245
class TestElandMatches(TestCase):
    """Check how ElandMatches files added filenames under SampleKey entries."""

    def test_eland_replacing(self):
        """An eland result added for the same key replaces the sequence file."""
        key = SampleKey(1, 1, 's')
        e = ELAND()
        em = ElandMatches(e)
        em.add('s_1_sequence.txt')
        self.assertEqual(len(em), 1)
        self.assertEqual(len(em[key]), 1)
        # next() builtin instead of the Python-2-only iterator .next() method.
        filename = next(iter(em[key])).filename
        self.assertEqual(filename, 's_1_sequence.txt')
        self.assertEqual(list(em.keys()), [key])
        # Still one entry under the key, but now it is the eland result.
        em.add('s_1_eland_result.txt')
        self.assertEqual(len(em), 1)
        self.assertEqual(len(em[key]), 1)
        filename = next(iter(em[key])).filename
        self.assertEqual(filename, 's_1_eland_result.txt')
        self.assertEqual(list(em.keys()), [key])

    def test_parts(self):
        """Numbered parts of one sample accumulate under that sample's key."""
        key11111 = SampleKey(1, 1, '11111')
        key11112 = SampleKey(1, 1, '11112')
        e = ELAND()
        em = ElandMatches(e)
        em.add('11111_CCAATT_L001_R1_001_export.txt.gz')
        em.add('11111_CCAATT_L001_R1_002_export.txt.gz')
        em.add('11111_CCAATT_L001_R1_003_export.txt.gz')
        em.add('11112_AAGGTT_L001_R1_001_export.txt.gz')
        em.add('11112_AAGGTT_L001_R1_002_export.txt.gz')
        # Two samples, with three and two parts respectively.
        self.assertEqual(len(em), 2)
        self.assertEqual(len(em[key11111]), 3)
        self.assertEqual(len(em[key11112]), 2)
277
def suite():
    """Return a TestSuite holding the tests of the four case classes below."""
    from unittest import TestSuite, defaultTestLoader
    collected = TestSuite()
    case_classes = (
        MatchCodeTests,
        TestMappedReads,
        ElandTests,
        TestElandMatches,
    )
    for case_class in case_classes:
        collected.addTests(defaultTestLoader.loadTestsFromTestCase(case_class))
    return collected
286
287
if __name__ == "__main__":
    # Executed as a script: hand the tests built by suite() to the unittest
    # command-line runner.
    import unittest
    unittest.main(defaultTest="suite")