Change unittest2 back into unittest.
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_pair.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import os
5 import tempfile
6 import shutil
7 from unittest import TestCase
8
9 from htsworkflow.pipelines import firecrest
10 from htsworkflow.pipelines import bustard
11 from htsworkflow.pipelines import gerald
12 from htsworkflow.pipelines.eland import SampleKey
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines import ElementTree
15
16 from htsworkflow.pipelines.test.simulate_runfolder import *
17
18
def make_runfolder(obj=None):
    """
    Make a fake runfolder, attach all the directories to obj if defined

    The tree is created under a fresh temporary directory:

        <temp_dir>/080102_HWI-EAS229_0010_207BTAAXX/Data/
            <image analysis dir>/Matrix/s_matrix.txt
            <image analysis dir>/Bustard1.8.28_12-04-2008_diane/
                GERALD_12-04-2008_diane/

    The image analysis directory and the file contents are produced by
    the make_* helpers (presumably the ones star-imported from
    simulate_runfolder).

    :param obj: optional object that receives the attributes temp_dir,
                runfolder_dir, data_dir, image_analysis_dir, matrix_dir,
                bustard_dir and gerald_dir.
    :return: the temporary directory holding the runfolder; the caller
             is responsible for removing it.
    """
    # make a fake runfolder directory
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')

    try:
        runfolder_dir = os.path.join(temp_dir,
                                     '080102_HWI-EAS229_0010_207BTAAXX')
        os.mkdir(runfolder_dir)

        data_dir = os.path.join(runfolder_dir, 'Data')
        os.mkdir(data_dir)

        ipar_dir = make_firecrest_dir(data_dir, "1.9.6", 1, 152)

        matrix_dir = os.path.join(ipar_dir, 'Matrix')
        os.mkdir(matrix_dir)
        matrix_name = os.path.join(matrix_dir, 's_matrix.txt')
        make_matrix(matrix_name)

        bustard_dir = os.path.join(ipar_dir,
                                   'Bustard1.8.28_12-04-2008_diane')
        os.mkdir(bustard_dir)
        make_phasing_params(bustard_dir)

        gerald_dir = os.path.join(bustard_dir,
                                  'GERALD_12-04-2008_diane')
        os.mkdir(gerald_dir)
        make_gerald_config_100(gerald_dir)
        make_summary_paired_htm(gerald_dir)
        make_eland_multi(gerald_dir, paired=True)
    except Exception:
        # Nothing has been attached to obj yet, so a caller's tearDown
        # could not find this directory; remove it here instead of
        # leaking it, then let the original error propagate.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    if obj is not None:
        obj.temp_dir = temp_dir
        obj.runfolder_dir = runfolder_dir
        obj.data_dir = data_dir
        obj.image_analysis_dir = ipar_dir
        obj.matrix_dir = matrix_dir
        obj.bustard_dir = bustard_dir
        obj.gerald_dir = gerald_dir

    # Returned so callers that pass obj=None can still clean up.
    return temp_dir
60
61
62 class RunfolderTests(TestCase):
63     """
64     Test components of the runfolder processing code
65     which includes firecrest, bustard, and gerald
66     """
67     def setUp(self):
68         # attaches all the directories to the object passed in
69         make_runfolder(self)
70
71     def tearDown(self):
72         shutil.rmtree(self.temp_dir)
73
74     def test_firecrest(self):
75         """
76         Construct a firecrest object
77         """
78         f = firecrest.firecrest(self.image_analysis_dir)
79         self.failUnlessEqual(f.software, 'Firecrest')
80         self.failUnlessEqual(f.version, '1.9.6')
81         self.failUnlessEqual(f.start, 1)
82         self.failUnlessEqual(f.stop, 152)
83         self.failUnlessEqual(f.user, 'diane')
84         # As of 2008-12-8, the date was being set in
85         # simulate_runfolder.make_firecrest_dir
86         self.failUnlessEqual(f.date, date(2008,4,12))
87
88         xml = f.get_elements()
89         # just make sure that element tree can serialize the tree
90         xml_str = ElementTree.tostring(xml)
91
92         f2 = firecrest.Firecrest(xml=xml)
93         self.failUnlessEqual(f.software, f2.software)
94         self.failUnlessEqual(f.version, f2.version)
95         self.failUnlessEqual(f.start,   f2.start)
96         self.failUnlessEqual(f.stop,    f2.stop)
97         self.failUnlessEqual(f.user,    f2.user)
98
99     def test_bustard(self):
100         """
101         construct a bustard object
102         """
103         b = bustard.bustard(self.bustard_dir)
104         self.failUnlessEqual(b.software, 'Bustard')
105         self.failUnlessEqual(b.version, '1.8.28')
106         self.failUnlessEqual(b.date,    date(2008,4,12))
107         self.failUnlessEqual(b.user,    'diane')
108         self.failUnlessEqual(len(b.phasing), 8)
109         self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
110
111         xml = b.get_elements()
112         b2 = bustard.Bustard(xml=xml)
113         self.failUnlessEqual(b.software, b2.software)
114         self.failUnlessEqual(b.version, b2.version)
115         self.failUnlessEqual(b.date,    b2.date )
116         self.failUnlessEqual(b.user,    b2.user)
117         self.failUnlessEqual(len(b.phasing), len(b2.phasing))
118         for key in b.phasing.keys():
119             self.failUnlessEqual(b.phasing[key].lane,
120                                  b2.phasing[key].lane)
121             self.failUnlessEqual(b.phasing[key].phasing,
122                                  b2.phasing[key].phasing)
123             self.failUnlessEqual(b.phasing[key].prephasing,
124                                  b2.phasing[key].prephasing)
125
126     def test_gerald(self):
127         # need to update gerald and make tests for it
128         g = gerald.gerald(self.gerald_dir)
129
130         self.failUnlessEqual(g.software, 'GERALD')
131         self.failUnlessEqual(g.version, '1.171')
132         self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
133         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
134         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
135
136
137         # list of genomes, matches what was defined up in
138         # make_gerald_config.
139         # the first None is to offset the genomes list to be 1..9
140         # instead of pythons default 0..8
141         genomes = [None,
142                    '/g/mm9',
143                    '/g/mm9',
144                    '/g/elegans190',
145                    '/g/arabidopsis01222004',
146                    '/g/mm9',
147                    '/g/mm9',
148                    '/g/mm9',
149                    '/g/mm9', ]
150
151         # test lane specific parameters from gerald config file
152         for i in range(1,9):
153             cur_lane = g.lanes[i]
154             self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
155             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
156             self.failUnlessEqual(cur_lane.read_length, '37')
157             self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
158
159         # I want to be able to use a simple iterator
160         for l in g.lanes.values():
161           self.failUnlessEqual(l.analysis, 'eland_extended')
162           self.failUnlessEqual(l.read_length, '37')
163           self.failUnlessEqual(l.use_bases, 'Y'*37)
164
165         # test data extracted from summary file
166         clusters = [[None,
167                     (103646, 4515), (106678, 4652),
168                     (84583, 5963), (68813, 4782),
169                     (104854, 4664), (43555, 1632),
170                     (54265, 1588), (64363, 2697),],
171                     [None,
172                     (103647, 4516), (106679, 4653),
173                     (84584, 5964), (68814, 4783),
174                     (104855, 4665), (43556, 1633),
175                     (54266, 1589), (64364, 2698),],]
176
177         for end in [0,1]:
178             for lane in range(1,9):
179                 summary_lane = g.summary[end][lane]
180                 self.failUnlessEqual(summary_lane.cluster, clusters[end][lane])
181                 self.failUnlessEqual(summary_lane.lane, lane)
182
183         xml = g.get_elements()
184         # just make sure that element tree can serialize the tree
185         xml_str = ElementTree.tostring(xml)
186         g2 = gerald.Gerald(xml=xml)
187
188         # do it all again after extracting from the xml file
189         self.failUnlessEqual(g.software, g2.software)
190         self.failUnlessEqual(g.version, g2.version)
191         self.failUnlessEqual(g.date, g2.date)
192         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
193         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
194
195         # test lane specific parameters from gerald config file
196         for i in range(1,9):
197             g_lane = g.lanes[i]
198             g2_lane = g2.lanes[i]
199             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
200             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
201             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
202             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
203
204         # test (some) summary elements
205         for end in [0,1]:
206             for i in range(1,9):
207                 g_summary = g.summary[end][i]
208                 g2_summary = g2.summary[end][i]
209                 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
210                 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
211
212                 g_eland = g.eland_results
213                 g2_eland = g2.eland_results
214                 for key in g_eland:
215                     g_results = g_eland[key]
216                     g2_results = g2_eland[key]
217                     self.failUnlessEqual(g_results.reads,
218                                          g2_results.reads)
219                     self.failUnlessEqual(len(g_results.mapped_reads),
220                                          len(g2_results.mapped_reads))
221                     for k in g_results.mapped_reads.keys():
222                         self.failUnlessEqual(g_results.mapped_reads[k],
223                                              g2_results.mapped_reads[k])
224
225                     self.failUnlessEqual(len(g_results.match_codes),
226                                          len(g2_results.match_codes))
227                     for k in g_results.match_codes.keys():
228                         self.failUnlessEqual(g_results.match_codes[k],
229                                              g2_results.match_codes[k])
230
231
232     def test_eland(self):
233         hg_map = {'Lambda.fa': 'Lambda.fa'}
234         for i in range(1,22):
235           short_name = 'chr%d.fa' % (i,)
236           long_name = 'hg18/chr%d.fa' % (i,)
237           hg_map[short_name] = long_name
238
239         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
240                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
241         eland = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
242
243         # check first end
244         for key in eland.find_keys(SampleKey(read=1)):
245             lane = eland[key]
246             self.failUnlessEqual(lane.reads, 6)
247             self.failUnlessEqual(lane.sample_name, "s")
248             self.failUnlessEqual(lane.lane_id, key.lane)
249             self.failUnlessEqual(len(lane.mapped_reads), 17)
250             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
251             self.failUnlessEqual(lane.match_codes['U0'], 3)
252             self.failUnlessEqual(lane.match_codes['R0'], 2)
253             self.failUnlessEqual(lane.match_codes['U1'], 1)
254             self.failUnlessEqual(lane.match_codes['R1'], 9)
255             self.failUnlessEqual(lane.match_codes['U2'], 0)
256             self.failUnlessEqual(lane.match_codes['R2'], 12)
257             self.failUnlessEqual(lane.match_codes['NM'], 1)
258             self.failUnlessEqual(lane.match_codes['QC'], 0)
259
260         # check second end
261         for key in eland.find_keys(SampleKey(read=2)):
262             lane = eland[key]
263             self.failUnlessEqual(lane.reads, 7)
264             self.failUnlessEqual(lane.sample_name, "s")
265             self.failUnlessEqual(lane.lane_id, key.lane)
266             self.failUnlessEqual(len(lane.mapped_reads), 17)
267             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
268             self.failUnlessEqual(lane.match_codes['U0'], 3)
269             self.failUnlessEqual(lane.match_codes['R0'], 2)
270             self.failUnlessEqual(lane.match_codes['U1'], 1)
271             self.failUnlessEqual(lane.match_codes['R1'], 9)
272             self.failUnlessEqual(lane.match_codes['U2'], 0)
273             self.failUnlessEqual(lane.match_codes['R2'], 12)
274             self.failUnlessEqual(lane.match_codes['NM'], 1)
275             self.failUnlessEqual(lane.match_codes['QC'], 1)
276
277         xml = eland.get_elements()
278         # just make sure that element tree can serialize the tree
279         xml_str = ElementTree.tostring(xml)
280         e2 = gerald.ELAND(xml=xml)
281
282         for key in eland:
283             l1 = eland[key]
284             l2 = e2[key]
285             self.failUnlessEqual(l1.reads, l2.reads)
286             self.failUnlessEqual(l1.sample_name, l2.sample_name)
287             self.failUnlessEqual(l1.lane_id, l2.lane_id)
288             self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
289             self.failUnlessEqual(len(l1.mapped_reads), 17)
290             for k in l1.mapped_reads.keys():
291                 self.failUnlessEqual(l1.mapped_reads[k],
292                                      l2.mapped_reads[k])
293
294             self.failUnlessEqual(len(l1.match_codes), 9)
295             self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
296             for k in l1.match_codes.keys():
297                 self.failUnlessEqual(l1.match_codes[k],
298                                      l2.match_codes[k])
299
300     def test_runfolder(self):
301         runs = runfolder.get_runs(self.runfolder_dir)
302
303         # do we get the flowcell id from the filename?
304         self.failUnlessEqual(len(runs), 1)
305         # firecrest's date depends on filename not the create time.
306         name = 'run_207BTAAXX_2009-02-22.xml'
307         self.failUnlessEqual(runs[0].serialization_filename, name)
308
309         # do we get the flowcell id from the FlowcellId.xml file
310         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
311         runs = runfolder.get_runs(self.runfolder_dir)
312         self.failUnlessEqual(len(runs), 1)
313         name = 'run_207BTAAXY_2009-02-22.xml'
314         self.failUnlessEqual(runs[0].serialization_filename, name)
315
316         r1 = runs[0]
317         xml = r1.get_elements()
318         xml_str = ElementTree.tostring(xml)
319
320         r2 = runfolder.PipelineRun(xml=xml)
321         self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
322         self.failIfEqual(r2.image_analysis, None)
323         self.failIfEqual(r2.bustard, None)
324         self.failIfEqual(r2.gerald, None)
325
326
def suite():
    """Return a TestSuite holding every test defined on RunfolderTests."""
    from unittest import TestSuite, defaultTestLoader
    runfolder_tests = defaultTestLoader.loadTestsFromTestCase(RunfolderTests)
    collected = TestSuite()
    collected.addTests(runfolder_tests)
    return collected
332
333
if __name__ == "__main__":
    # Executed as a script: run the tests collected by suite().
    import unittest
    unittest.main(defaultTest="suite")