Merge branch 'master' into debianized
[htsworkflow.git] / test / test_srf2fastq.py
1 import os
2 from StringIO import StringIO
3 import sys
4 import unittest
5
# Work out where this test module lives, then make the ``scripts`` directory
# that sits next to the test directory importable.
_module_path = os.path.dirname(__file__)
_module_name = os.path.basename(__file__)
sys.path.append(os.path.join(_module_path, '..', 'scripts'))
8
9 from htsworkflow.pipelines import srf2fastq
10
class testSrf2Fastq(unittest.TestCase):
    """Tests for the FASTQ conversion and SRF detection helpers of srf2fastq.

    The conversion tests feed in-memory FASTQ text to the converters and
    inspect what was written to the target streams.  The detection tests
    rely on two ``.srf`` files being present in the same directory as this
    module.
    """

    # A single FASTQ record holding an 8-base read.
    ONE_READ = ("@header\n"
                "AGCTTTTT\n"
                "+\n"
                "IIIIB+++\n")

    # Two FASTQ records.  The quality line of the first record starts with
    # '@', the same character that introduces a header line; the tests
    # expect it to come through as quality data, not as a header.
    TWO_READS = ("@header1\n"
                 "AGCTTTTT\n"
                 "+\n"
                 "@IIIB+++\n"
                 "@header2\n"
                 "AGCTTTTT\n"
                 "+\n"
                 "IIIIB+++\n")

    # SRF fixtures, looked up next to this test module.  The variable names
    # used by the original tests suggest the first is CNF4 and the second
    # CNF1 -- NOTE(review): confirm against the actual files.
    CNF4_SRF = 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'
    CNF1_SRF = 'woldlab_090512_HWI-EAS229_0114_428NNAAXX_5.srf'

    def _fixture_path(self, filename):
        """Return the path of ``filename`` inside this module's directory."""
        return os.path.join(_module_path, filename)

    def _assert_output(self, stream, line_count, first_record):
        """Check the size of ``stream`` and the content of its leading lines.

        ``stream`` is rewound and read completely.  It must contain exactly
        ``line_count`` lines, and its first ``len(first_record)`` lines must
        equal ``first_record`` once trailing whitespace is stripped.
        """
        stream.seek(0)
        lines = stream.readlines()
        # Check the length first so that zip() below cannot silently skip
        # expected lines when the output is too short.
        self.assertEqual(len(lines), line_count)
        for actual, expected in zip(lines, first_record):
            self.assertEqual(actual.rstrip(), expected)

    def test_split_good(self):
        """An 8-base read is written as two 4-base reads tagged /1 and /2."""
        source = StringIO(self.ONE_READ)
        target1 = StringIO()
        target2 = StringIO()

        srf2fastq.convert_single_to_two_fastq(source, target1, target2)

        self._assert_output(target1, 4, ['@header/1', 'AGCT', '+', 'IIII'])
        self._assert_output(target2, 4, ['@header/2', 'TTTT', '+', 'B+++'])

    def test_split_at_with_header(self):
        """Splitting copes with an '@' quality line and a header prefix.

        Only the first record of each target is compared line by line; the
        second record is covered by the total line count.
        """
        source = StringIO(self.TWO_READS)
        target1 = StringIO()
        target2 = StringIO()

        srf2fastq.convert_single_to_two_fastq(source, target1, target2,
                                              header="foo_")

        self._assert_output(target1, 8,
                            ['@foo_header1/1', 'AGCT', '+', '@III'])
        self._assert_output(target2, 8,
                            ['@foo_header1/2', 'TTTT', '+', 'B+++'])

    def test_single_at(self):
        """Single-read conversion passes an '@' quality line through as is."""
        source = StringIO(self.TWO_READS)
        target1 = StringIO()

        srf2fastq.convert_single_to_fastq(source, target1)

        self._assert_output(target1, 8,
                            ['@header1', 'AGCTTTTT', '+', '@IIIB+++'])

    def test_single_at_with_header(self):
        """Single-read conversion prefixes the read name with ``header``."""
        source = StringIO(self.TWO_READS)
        target1 = StringIO()

        srf2fastq.convert_single_to_fastq(source, target1, header="foo_")

        self._assert_output(target1, 8,
                            ['@foo_header1', 'AGCTTTTT', '+', '@IIIB+++'])

    def test_is_srf(self):
        """is_srf is False for this source file and True for both fixtures."""
        is_srf = srf2fastq.is_srf
        self.assertEqual(is_srf(__file__), False)
        self.assertEqual(is_srf(self._fixture_path(self.CNF4_SRF)), True)
        self.assertEqual(is_srf(self._fixture_path(self.CNF1_SRF)), True)

    def test_is_cnf1(self):
        """is_cnf1 raises ValueError for this source file and tells the
        two SRF fixtures apart."""
        is_cnf1 = srf2fastq.is_cnf1
        self.assertRaises(ValueError, is_cnf1, __file__)
        self.assertEqual(is_cnf1(self._fixture_path(self.CNF4_SRF)), False)
        self.assertEqual(is_cnf1(self._fixture_path(self.CNF1_SRF)), True)
135
def suite():
    """Return a test suite holding every ``test*`` method of testSrf2Fastq."""
    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; the default loader collects the same 'test'-prefixed methods and
    # is available on every Python version, including Python 2.
    return unittest.defaultTestLoader.loadTestsFromTestCase(testSrf2Fastq)
138
# When executed directly, run the tests collected by suite().
if __name__ == '__main__':
    unittest.main(defaultTest='suite')