Change unittest2 back into unittest.
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta1_12.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import logging
5 import os
6 import tempfile
7 import shutil
8 from unittest import TestCase
9
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines.samplekey import SampleKey
12 from htsworkflow.pipelines import ipar
13 from htsworkflow.pipelines import bustard
14 from htsworkflow.pipelines import gerald
15 from htsworkflow.pipelines import runfolder
16 from htsworkflow.pipelines import ElementTree
17
18 from htsworkflow.pipelines.test.simulate_runfolder import *
19
20
def make_runfolder(obj=None):
    """
    Make a fake runfolder, attach all the directories to obj if defined

    A fresh temporary directory (prefix ``tmp_runfolder_``) is created and
    a runfolder named ``110815_SN787_0101_A<flowcell id>`` is laid out
    inside it by calling the helpers star-imported from
    simulate_runfolder.  The layout built here is::

        <runfolder>/Data       intensities, status, basecalls, matrix
        <runfolder>/Unaligned  fastqs, config, status
        <runfolder>/Aligned    eland export, config

    If any step of the construction raises, the temporary directory is
    removed again before the exception propagates, so a failing fixture
    does not leave ``tmp_runfolder_*`` directories behind.

    :Parameters:
      - `obj`: optional object that receives the attributes
        ``flowcell_id``, ``temp_dir``, ``runfolder_dir``, ``data_dir``,
        ``image_analysis_dir``, ``bustard_dir``, ``gerald_dir`` and
        ``reads``.  When it is None nothing is attached.

    Nothing is returned.  On success the caller is responsible for
    deleting ``obj.temp_dir`` when done with it.
    """
    # make a fake runfolder directory
    flowcell_id = 'D07K6ACXX'
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')

    try:
        runfolder_dir = os.path.join(
            temp_dir,
            '110815_SN787_0101_A{0}'.format(flowcell_id))
        os.mkdir(runfolder_dir)

        make_runinfo(runfolder_dir, flowcell_id)

        data_dir = os.path.join(runfolder_dir, 'Data')
        os.mkdir(data_dir)

        intensities_dir = make_rta_intensities_1_12(data_dir)
        make_status_rta1_12(data_dir)

        basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
        make_matrix_dir_rta_1_12(basecalls_dir)

        unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
        os.mkdir(unaligned_dir)
        make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
        make_unaligned_config_1_12(unaligned_dir)
        make_unaligned_status_1_12(unaligned_dir, flowcell_id)

        aligned_dir = os.path.join(runfolder_dir, "Aligned")
        os.mkdir(aligned_dir)
        make_aligned_eland_export(aligned_dir, flowcell_id)
        make_aligned_config_1_12(aligned_dir)
    except Exception:
        # A half-built runfolder is useless and nobody else knows where
        # it lives, so clean it up here before reporting the failure.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    if obj is not None:
        obj.flowcell_id = flowcell_id
        obj.temp_dir = temp_dir
        obj.runfolder_dir = runfolder_dir
        obj.data_dir = data_dir
        obj.image_analysis_dir = intensities_dir
        # In this layout the Unaligned directory is handed out as the
        # bustard directory and the Aligned directory as the gerald one.
        obj.bustard_dir = unaligned_dir
        obj.gerald_dir = aligned_dir
        obj.reads = 2
65
66
67 class RunfolderTests(TestCase):
68     """
69     Test components of the runfolder processing code
70     which includes firecrest, bustard, and gerald
71     """
72     def setUp(self):
73         # attaches all the directories to the object passed in
74         make_runfolder(self)
75
76     def tearDown(self):
77         shutil.rmtree(self.temp_dir)
78
79     def test_bustard(self):
80         """Construct a bustard object"""
81         b = bustard.bustard(self.bustard_dir)
82         self.failUnlessEqual(b.software, 'RTA')
83         self.failUnlessEqual(b.version, '1.12.4.2')
84         self.failUnlessEqual(b.date,    None)
85         self.failUnlessEqual(b.user,    None)
86         self.failUnlessEqual(len(b.phasing), 0)
87
88         xml = b.get_elements()
89         b2 = bustard.Bustard(xml=xml)
90         self.failUnlessEqual(b.software, b2.software)
91         self.failUnlessEqual(b.version,  b2.version)
92         self.failUnlessEqual(b.date,     b2.date )
93         self.failUnlessEqual(b.user,     b2.user)
94
95     def test_gerald(self):
96         # need to update gerald and make tests for it
97         g = gerald.gerald(self.gerald_dir)
98
99         self.failUnlessEqual(g.software, 'CASAVA')
100         self.failUnlessEqual(g.version, '1.8.1')
101         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
102         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
103
104         # list of genomes, matches what was defined up in
105         # make_gerald_config.
106         # the first None is to offset the genomes list to be 1..9
107         # instead of pythons default 0..8
108         # test lane specific parameters from gerald config file
109
110         undetermined = g.lanes[SampleKey(sample='Undetermined_indices')]
111         self.failUnlessEqual(undetermined.analysis, 'none')
112         self.failUnlessEqual(undetermined.read_length, None)
113         self.failUnlessEqual(undetermined.use_bases, None)
114
115         project = g.lanes[SampleKey(sample='11115')]
116         self.failUnlessEqual(project.analysis, 'eland_extended')
117         self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
118         self.failUnlessEqual(project.read_length, '49')
119         self.failUnlessEqual(project.use_bases, 'y'*49+'n')
120
121         # test data extracted from summary file
122         clusters = [None,
123                     (3878755,  579626.0), (3920639, 1027332.4),
124                     (5713049,  876187.3), (5852907,  538640.6),
125                     (4006751, 1265247.4), (5678021,  627070.7),
126                     (1854131,  429053.2), (4777517,  592904.0),
127                    ]
128
129         self.failUnlessEqual(len(g.summary), self.reads)
130         for i in range(1,9):
131             summary_lane = g.summary[0][i]
132             self.failUnlessEqual(summary_lane.cluster, clusters[i])
133             self.failUnlessEqual(summary_lane.lane, i)
134
135         xml = g.get_elements()
136         # just make sure that element tree can serialize the tree
137         xml_str = ElementTree.tostring(xml)
138         g2 = gerald.CASAVA(xml=xml)
139
140         # do it all again after extracting from the xml file
141         self.failUnlessEqual(g.software, g2.software)
142         self.failUnlessEqual(g.version, g2.version)
143         self.failUnlessEqual(g.date, g2.date)
144         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
145         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
146
147         # test lane specific parameters from gerald config file
148         for i in g.lanes.keys():
149             g_lane = g.lanes[i]
150             g2_lane = g2.lanes[i]
151             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
152             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
153             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
154             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
155
156         # test (some) summary elements
157         self.failUnlessEqual(len(g.summary), self.reads)
158         for i in range(1,9):
159             g_summary = g.summary[0][i]
160             g2_summary = g2.summary[0][i]
161             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
162             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
163
164             g_eland = g.eland_results
165             g2_eland = g2.eland_results
166             for key in g_eland:
167                 g_results = g_eland[key]
168                 g2_results = g2_eland[key]
169                 self.failUnlessEqual(g_results.reads,
170                                      g2_results.reads)
171                 if isinstance(g_results, eland.ElandLane):
172                   self.failUnlessEqual(len(g_results.mapped_reads),
173                                        len(g2_results.mapped_reads))
174                   for k in g_results.mapped_reads.keys():
175                       self.failUnlessEqual(g_results.mapped_reads[k],
176                                            g2_results.mapped_reads[k])
177
178                   self.failUnlessEqual(len(g_results.match_codes),
179                                        len(g2_results.match_codes))
180                   for k in g_results.match_codes.keys():
181                       self.failUnlessEqual(g_results.match_codes[k],
182                                            g2_results.match_codes[k])
183
184
185     def test_eland(self):
186         hg_map = {'Lambda.fa': 'Lambda.fa'}
187         for i in range(1,22):
188           short_name = 'chr%d.fa' % (i,)
189           long_name = 'hg18/chr%d.fa' % (i,)
190           hg_map[short_name] = long_name
191
192         samples = set(('11111', '11112', '11113', '11114', '11115',
193                        '11116', '11117', '11118', '11119', '11120'))
194         genome_maps = {}
195         for i in range(1,9):
196             genome_maps[i] = hg_map
197
198         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
199
200         for lane in eland_container.values():
201             # I added sequence lanes to the last 2 lanes of this test case
202             if lane.sample_name == '11113':
203                 self.assertEqual(lane.reads, 24)
204                 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 6)
205                 self.assertEqual(lane.match_codes['U0'], 6)
206                 self.assertEqual(lane.match_codes['R0'], 18)
207                 self.assertEqual(lane.match_codes['R1'], 24)
208                 self.assertEqual(lane.match_codes['R2'], 18)
209                 self.assertEqual(lane.match_codes['NM'], 12)
210             else:
211                 self.assertEqual(lane.reads, 8)
212                 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 2)
213                 self.assertEqual(lane.match_codes['U0'], 2)
214                 self.assertEqual(lane.match_codes['R0'], 6)
215                 self.assertEqual(lane.match_codes['R1'], 8)
216                 self.assertEqual(lane.match_codes['R2'], 6)
217                 self.assertEqual(lane.match_codes['NM'], 4)
218
219             self.assertTrue(lane.sample_name in samples)
220             #self.assertEqual(lane.lane_id, 1)
221             self.assertEqual(len(lane.mapped_reads), 1)
222             self.assertEqual(lane.match_codes['U1'], 0)
223             self.assertEqual(lane.match_codes['U2'], 0)
224             self.assertEqual(lane.match_codes['QC'], 0)
225
226         xml = eland_container.get_elements()
227         # just make sure that element tree can serialize the tree
228         xml_str = ElementTree.tostring(xml)
229         e2 = gerald.ELAND(xml=xml)
230
231         for key in eland_container.results:
232             l1 = eland_container.results[key]
233             l2 = e2.results[key]
234             self.failUnlessEqual(l1.reads, l2.reads)
235             self.failUnlessEqual(l1.sample_name, l2.sample_name)
236             self.failUnlessEqual(l1.lane_id, l2.lane_id)
237             if isinstance(l1, eland.ElandLane):
238               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
239               self.failUnlessEqual(len(l1.mapped_reads), 1)
240               for k in l1.mapped_reads.keys():
241                   self.failUnlessEqual(l1.mapped_reads[k],
242                                        l2.mapped_reads[k])
243
244               self.failUnlessEqual(len(l1.match_codes), 9)
245               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
246               for k in l1.match_codes.keys():
247                   self.failUnlessEqual(l1.match_codes[k],
248                                        l2.match_codes[k])
249             elif isinstance(l1, eland.SequenceLane):
250                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
251
252     def test_runfolder(self):
253         runs = runfolder.get_runs(self.runfolder_dir)
254
255         # do we get the flowcell id from the filename?
256         self.failUnlessEqual(len(runs), 1)
257         self.assertEqual(runs[0].flowcell_id, self.flowcell_id)
258         name = 'run_%s_%s.xml' % ( self.flowcell_id,
259                                    date.today().strftime('%Y-%m-%d'),)
260         self.failUnlessEqual(runs[0].serialization_filename, name)
261
262         bustard_dir = os.path.join(self.runfolder_dir, 'Unaligned')
263         r1 = runs[0]
264         self.failUnlessEqual(r1.bustard.sequence_format, 'fastq')
265         self.failUnlessEqual(r1.bustard.pathname, bustard_dir)
266         self.failUnlessEqual(r1.gerald.runfolder_name, 'Unaligned')
267
268         xml = r1.get_elements()
269         xml_str = ElementTree.tostring(xml)
270
271         r2 = runfolder.PipelineRun(xml=xml)
272         self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
273         self.failIfEqual(r2.image_analysis, None)
274         self.failIfEqual(r2.bustard, None)
275         self.failIfEqual(r2.gerald, None)
276
def suite():
    """Return a TestSuite holding every test defined on RunfolderTests."""
    from unittest import TestSuite, defaultTestLoader
    loaded = defaultTestLoader.loadTestsFromTestCase(RunfolderTests)
    collected = TestSuite()
    collected.addTests(loaded)
    return collected
282
283
if __name__ == "__main__":
    # When executed as a script, run the tests returned by suite().
    import unittest
    unittest.main(defaultTest="suite")