Fix typo in a function name.
[htsworkflow.git] / htsworkflow / pipelines / test / test_eland.py
1 #!/usr/bin/env python
2 """More direct synthetic test cases for the eland output file processing
3 """
4 from StringIO import StringIO
5 import unittest
6
7 from htsworkflow.pipelines.eland import ElandLane
8
class ElandTests(unittest.TestCase):
    """Synthetic test cases for ElandLane's eland scoring/counting helpers."""

    def compare_match_array(self, current, expected):
        """Assert that current agrees with expected for every key in expected.

        Keys that appear only in current are ignored.  A key that is in
        expected but not in current surfaces as the lookup error raised by
        current[key] rather than as an assertion failure.
        """
        for key in expected:
            self.assertEqual(current[key], expected[key],
                 "Key %s: %s != %s" % (key, current[key], expected[key]))

    def test_eland_score_mapped_mismatches(self):
        """Score a sequence of match descriptors against one shared counter dict.

        match_codes is deliberately reused across all steps, so every expected
        dict below is the running total after that step, not a per-call delta.
        """
        eland = ElandLane()
        match_codes = {'NM':0, 'QC':0, 'RM':0,
                       'U0':0, 'U1':0, 'U2':0,
                       'R0':0, 'R1':0, 'R2':0,
                      }
        # (descriptor, expected return value, expected cumulative counters)
        steps = [
            ("QC", ElandLane.SCORE_QC,
             {'NM':0, 'QC':1, 'RM':0,
              'U0':0, 'U1':0, 'U2':0,
              'R0':0, 'R1':0, 'R2':0,
              }),
            # NM is expected to report SCORE_QC as well.
            ("NM", ElandLane.SCORE_QC,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':0, 'U1':0, 'U2':0,
              'R0':0, 'R1':0, 'R2':0,
              }),
            ("1:0:0", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':1, 'U1':0, 'U2':0,
              'R0':0, 'R1':0, 'R2':0,
              }),
            ("2:4:16", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':1, 'U1':0, 'U2':0,
              'R0':2, 'R1':4, 'R2':16,
              }),
            ("1:1:1", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':2, 'U1':1, 'U2':1,
              'R0':2, 'R1':4, 'R2':16,
              }),
            ("1:0:0", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':3, 'U1':1, 'U2':1,
              'R0':2, 'R1':4, 'R2':16,
              }),
            ("0:0:1", ElandLane.SCORE_READ,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':3, 'U1':1, 'U2':2,
              'R0':2, 'R1':4, 'R2':16,
              }),
            # An unrecognized descriptor must leave every counter untouched.
            ("chr3.fa", ElandLane.SCORE_UNRECOGNIZED,
             {'NM':1, 'QC':1, 'RM':0,
              'U0':3, 'U1':1, 'U2':2,
              'R0':2, 'R1':4, 'R2':16,
              }),
        ]
        for read, expected_score, expected_codes in steps:
            r = eland._score_mapped_mismatches(read, match_codes)
            self.assertEqual(r, expected_score,
                             "descriptor %r scored %r" % (read, r))
            self.compare_match_array(match_codes, expected_codes)

    def test_count_mapped_export(self):
        """Check the code returned for several export match descriptors.

        Every case starts from an empty mapped_reads dict and expects the
        chr3.fa entry to end up at 1.
        """
        eland = ElandLane()
        # (match descriptor, expected return code)
        cases = [
            ("38", 'U0'),
            ("36A4", 'U1'),
            ("30A2T2", 'U2'),
            ("26AG2T2", 'U2'),
            # deletion
            ("26^AG$4", 'U2'),
            # insertion
            ("26^2$4", 'U0'),
        ]
        for descriptor, expected_code in cases:
            mapped_reads = {}
            r = eland._count_mapped_export(mapped_reads, "chr3.fa", descriptor)
            self.assertEqual(mapped_reads['chr3.fa'], 1,
                             "descriptor %r: chr3.fa count" % (descriptor,))
            self.assertEqual(r, expected_code,
                             "descriptor %r returned %r" % (descriptor, r))

    def test_update_eland_export(self):
        """Test scoring the pipeline export file"""
        eland = ElandLane()
        # Raw strings: the quality columns contain backslashes (\X, \W) that
        # are not valid escape sequences; r"" keeps the exact same characters
        # without relying on the lenient handling of unknown escapes.
        # NOTE(review): the column separators below are runs of spaces as found
        # in this copy of the file; export files are presumably tab-delimited,
        # so confirm these fixtures match what _update_eland_export expects.
        qc_read = StringIO(r"ILLUMINA-33A494 1       1       1       3291    1036    0       1       GANNTCCTCACCCGACANNNNNNNANNNCGGGNNACTC  \XBB]^^^^[`````BBBBBBBBBBBBBBBBBBBBBBB  QC")
        one_read_exact = StringIO(r"ILLUMINA-33A494 1       1       1       2678    1045    0       1       AAGGTGAAGAAGGAGATGNNGATGATGACGACGATAGA  ]]WW[[W]W]]R\WWZ[RBBS^\XVa____]W[]]___  chrX.fa         148341829       F       38       45")
        one_read_mismatch = StringIO(r"ILLUMINA-33A494 1       1       1       2678    1045    0       1       AAGGTGAAGAAGGAGATGNNGATGATGACGACGATAGA  ]]WW[[W]W]]R\WWZ[RBBS^\XVa____]W[]]___  chrX.fa         148341829       F       18AA15G1T       45")
        multi_read = StringIO(r"ILLUMINA-33A494 1       1       1       4405    1046    0       1       GTGGTTTCGCTGGATAGTNNGTAGGGACAGTGGGAATC  ``````````__a__V^XBB^SW^^a_____a______  9:2:1")

        # A QC-failed read: counted as a read, nothing recorded per chromosome.
        match_codes, match_reads, reads = eland._update_eland_export(qc_read)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':1, 'RM':0,
                                  'U0':0, 'U1':0, 'U2':0,
                                  'R0':0, 'R1':0, 'R2':0,
                                  })
        self.assertEqual(len(match_reads), 0)
        self.assertEqual(reads, 1)

        # A single read on chrX.fa with descriptor "38".
        match_codes, match_reads, reads = eland._update_eland_export(one_read_exact)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':0, 'RM':0,
                                  'U0':1, 'U1':0, 'U2':0,
                                  'R0':0, 'R1':0, 'R2':0,
                                  })
        self.assertEqual(match_reads['chrX.fa'], 1)
        self.assertEqual(reads, 1)

        # Same read with descriptor "18AA15G1T"; expected to land in U2.
        match_codes, match_reads, reads = eland._update_eland_export(one_read_mismatch)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':0, 'RM':0,
                                  'U0':0, 'U1':0, 'U2':1,
                                  'R0':0, 'R1':0, 'R2':0,
                                  })
        self.assertEqual(match_reads['chrX.fa'], 1)
        self.assertEqual(reads, 1)

        # A "9:2:1" multi-match line: no per-chromosome entry is expected.
        match_codes, match_reads, reads = eland._update_eland_export(multi_read)
        self.compare_match_array(match_codes,
                                 {'NM':0, 'QC':0, 'RM':0,
                                  'U0':0, 'U1':0, 'U2':1,
                                  'R0':9, 'R1':2, 'R2':0,
                                  })
        self.assertEqual(len(match_reads), 0)
        self.assertEqual(reads, 1)
164
165
def suite():
    """Return a test suite containing every test* method of ElandTests.

    Built with the shared default loader instead of unittest.makeSuite,
    which is deprecated and no longer available in recent Python releases;
    the default loader uses the same 'test' method-name prefix.
    """
    return unittest.defaultTestLoader.loadTestsFromTestCase(ElandTests)
168
# When executed directly as a script, hand control to unittest and have it
# run whatever the module-level suite() callable returns.
if __name__ == '__main__':
    unittest.main(defaultTest='suite')