Merge branch 'master' of mus.cacr.caltech.edu:htsworkflow
[htsworkflow.git] / htsworkflow / pipelines / test / test_eland.py
1 #!/usr/bin/env python
2 """More direct synthetic test cases for the eland output file processing
3 """
4 from StringIO import StringIO
5 import unittest
6
7 from htsworkflow.pipelines.eland import ElandLane, MatchCodes, MappedReads
8
class MatchCodeTests(unittest.TestCase):
    """Tests for the MatchCodes counter container."""

    def test_initializer(self):
        """Invalid initializers raise ValueError; None is accepted."""
        self.assertRaises(ValueError, MatchCodes, {'foo':'bar'})
        self.assertRaises(ValueError, MatchCodes, 3)
        # Constructing from None must simply not raise.
        MatchCodes(None)

    def test_dictlike(self):
        """A fresh MatchCodes exposes the nine match codes, all zero."""
        mc = MatchCodes()
        match_codes = {'NM':0, 'QC':0, 'RM':0,
                       'U0':0, 'U1':0, 'U2':0,
                       'R0':0, 'R1':0, 'R2':0,
                      }
        # Compare sorted copies so the checks depend neither on iteration
        # order nor on whether keys()/items()/values() return a list or a
        # view object.
        self.assertEqual(sorted(mc.keys()), sorted(match_codes.keys()))
        self.assertEqual(sorted(mc.items()), sorted(match_codes.items()))
        self.assertEqual(sorted(mc.values()), sorted(match_codes.values()))
        self.assertRaises(KeyError, mc.__getitem__, 'foo')

    def test_addition(self):
        """Item-wise += and whole-object += both accumulate counts."""
        mc1 = MatchCodes()
        mc2 = MatchCodes({'NM':5, 'QC':10, 'U0': 100})

        mc1['NM'] += 5
        self.assertEqual(mc1['NM'], 5)
        self.assertEqual(mc1['QC'], 0)
        self.assertEqual(mc1['U0'], 0)
        # In-place addition folds mc2's counts into mc1.
        mc1 += mc2
        self.assertEqual(mc1['NM'], 10)
        self.assertEqual(mc1['QC'], 10)
        self.assertEqual(mc1['U0'], 100)
38
39
class TestMappedReads(unittest.TestCase):
    """Tests for the MappedReads mapping of reference name to read count."""

    def test_initializer(self):
        """Empty and dict-initialized construction work; an int is rejected."""
        mr1 = MappedReads()
        self.assertEqual(len(mr1), 0)
        mr2 = MappedReads({'hg19': 100, 'newcontamUK.fa': 12})
        self.assertEqual(len(mr2), 2)
        self.assertEqual(mr2['hg19'], 100)

        self.assertRaises(ValueError, MappedReads, 3)

    def test_dictionaryness(self):
        """Set, get, list and delete behave like a dictionary."""
        mr1 = MappedReads()
        mr1['chr9'] = 7
        self.assertEqual(list(mr1.keys()), ['chr9'])
        self.assertEqual(mr1['chr9'], 7)
        # Wrap in list() like the keys() check above so the comparison also
        # holds when items() returns a view rather than a list.
        self.assertEqual(list(mr1.items()), [('chr9', 7)])
        del mr1['chr9']
        self.assertEqual(len(mr1), 0)

    def test_addition(self):
        """Adding two MappedReads gives a new object; operands are untouched."""
        mr1 = MappedReads({'hg19': 100, 'Lambda1': 5})
        mr2 = MappedReads({'hg19': 100, 'newcontamUK.fa': 10})
        mr3 = mr1 + mr2

        # The operands keep their two keys each; the sum has the union.
        self.assertEqual(len(mr1), 2)
        self.assertEqual(len(mr2), 2)
        self.assertEqual(len(mr3), 3)

        self.assertEqual(mr1['Lambda1'], 5)
        self.assertRaises(KeyError, mr1.__getitem__, 'newcontamUK.fa')
        self.assertEqual(mr1.get('newcontamUK.fa', None), None)

        mr3['Lambda3'] = 2
        self.assertEqual(mr3['Lambda3'], 2)
74
class ElandTests(unittest.TestCase):
    """Test specific Eland modules
    """
    def compare_match_array(self, current, expected):
        """Assert that current[key] equals expected[key] for every expected key.

        Keys present only in ``current`` are not checked.
        """
        for key, wanted in expected.items():
            found = current[key]
            self.assertEqual(found, wanted,
                 "Key %s: %s != %s" % (key, found, wanted))

    def test_eland_score_mapped_mismatches(self):
        """Feed a sequence of codes through _score_mapped_mismatches.

        The same match_codes dict is reused for every call, so each expected
        dict below is the cumulative state after that call.
        """
        eland = ElandLane()
        match_codes = {'NM':0, 'QC':0, 'RM':0,
                       'U0':0, 'U1':0, 'U2':0,
                       'R0':0, 'R1':0, 'R2':0,
                      }
        # (code, expected return value, expected cumulative counts)
        steps = [
            ("QC", ElandLane.SCORE_QC,
             {'NM':0, 'QC':1, 'RM':0,
              'U0':0, 'U1':0, 'U2':0,
              'R0':0, 'R1':0, 'R2':0,
              }),
            ("NM", ElandLane.SCORE_QC,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':0, 'U1':0, 'U2':0,
              'R0':0, 'R1':0, 'R2':0,
              }),
            ("1:0:0", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':1, 'U1':0, 'U2':0,
              'R0':0, 'R1':0, 'R2':0,
              }),
            ("2:4:16", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':1, 'U1':0, 'U2':0,
              'R0':2, 'R1':4, 'R2':16,
              }),
            ("1:1:1", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':2, 'U1':1, 'U2':1,
              'R0':2, 'R1':4, 'R2':16,
              }),
            ("1:0:0", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':3, 'U1':1, 'U2':1,
              'R0':2, 'R1':4, 'R2':16,
              }),
            ("0:0:1", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':3, 'U1':1, 'U2':2,
              'R0':2, 'R1':4, 'R2':16,
              }),
            # An unrecognized code must leave the counts unchanged.
            ("chr3.fa", ElandLane.SCORE_UNRECOGNIZED,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':3, 'U1':1, 'U2':2,
              'R0':2, 'R1':4, 'R2':16,
              }),
        ]
        for code, expected_score, expected_counts in steps:
            r = eland._score_mapped_mismatches(code, match_codes)
            self.assertEqual(r, expected_score)
            self.compare_match_array(match_codes, expected_counts)

    def test_count_mapped_export(self):
        """Check the code _count_mapped_export returns per match descriptor.

        Every case starts from an empty mapped_reads dict and expects the
        'chr3.fa' entry to be 1 afterwards.
        """
        eland = ElandLane()
        # (match descriptor, expected returned code)
        cases = [
            ("38", 'U0'),
            ("36A4", 'U1'),
            ("30A2T2", 'U2'),
            ("26AG2T2", 'U2'),
            # deletion
            ("26^AG$4", 'U2'),
            # insertion
            ("26^2$4", 'U0'),
        ]
        for descriptor, expected_code in cases:
            mapped_reads = {}
            r = eland._count_mapped_export(mapped_reads, "chr3.fa", descriptor)
            self.assertEqual(mapped_reads['chr3.fa'], 1)
            self.assertEqual(r, expected_code)

    def test_update_eland_export(self):
        """Test scoring the pipeline export file"""
        eland = ElandLane()
        # Raw strings: the quality fields contain backslashes (\X, \W) that
        # are not valid escape sequences; the r prefix keeps the values
        # byte-identical while avoiding invalid-escape warnings.
        # NOTE(review): fields here are separated by runs of spaces; if the
        # export parser splits on tabs these literals may originally have
        # been tab-delimited -- confirm against the parser.
        qc_read = StringIO(r"ILLUMINA-33A494 1       1       1       3291    1036    0       1       GANNTCCTCACCCGACANNNNNNNANNNCGGGNNACTC  \XBB]^^^^[`````BBBBBBBBBBBBBBBBBBBBBBB  QC")
        one_read_exact = StringIO(r"ILLUMINA-33A494 1       1       1       2678    1045    0       1       AAGGTGAAGAAGGAGATGNNGATGATGACGACGATAGA  ]]WW[[W]W]]R\WWZ[RBBS^\XVa____]W[]]___  chrX.fa         148341829       F       38       45")
        one_read_mismatch = StringIO(r"ILLUMINA-33A494 1       1       1       2678    1045    0       1       AAGGTGAAGAAGGAGATGNNGATGATGACGACGATAGA  ]]WW[[W]W]]R\WWZ[RBBS^\XVa____]W[]]___  chrX.fa         148341829       F       18AA15G1T       45")
        multi_read = StringIO(r"ILLUMINA-33A494 1       1       1       4405    1046    0       1       GTGGTTTCGCTGGATAGTNNGTAGGGACAGTGGGAATC  ``````````__a__V^XBB^SW^^a_____a______  9:2:1")

        # A QC-failed read: counted under QC, mapped to nothing.
        match_codes, match_reads, reads = eland._update_eland_export(qc_read)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':1, 'RM':0,
                                  'U0':0, 'U1':0, 'U2':0,
                                  'R0':0, 'R1':0, 'R2':0,
                                  })
        self.assertEqual(len(match_reads), 0)
        self.assertEqual(reads, 1)

        # A single read with match descriptor "38".
        match_codes, match_reads, reads = eland._update_eland_export(one_read_exact)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':0, 'RM':0,
                                  'U0':1, 'U1':0, 'U2':0,
                                  'R0':0, 'R1':0, 'R2':0,
                                  })
        self.assertEqual(match_reads['chrX.fa'], 1)
        self.assertEqual(reads, 1)

        # A single read with match descriptor "18AA15G1T".
        match_codes, match_reads, reads = eland._update_eland_export(one_read_mismatch)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':0, 'RM':0,
                                  'U0':0, 'U1':0, 'U2':1,
                                  'R0':0, 'R1':0, 'R2':0,
                                  })
        self.assertEqual(match_reads['chrX.fa'], 1)
        self.assertEqual(reads, 1)

        # A multi-match read reported as the count triple "9:2:1".
        match_codes, match_reads, reads = eland._update_eland_export(multi_read)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':0, 'RM':0,
                                  'U0':0, 'U1':0, 'U2':1,
                                  'R0':9, 'R1':2, 'R2':0,
                                  })
        self.assertEqual(len(match_reads), 0)
        self.assertEqual(reads, 1)
230
231
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()