take advantage of absolute_import to simplify import statements
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_pair.py
1 #!/usr/bin/env python
2 from __future__ import absolute_import
3
4 from datetime import datetime, date
5 import os
6 import tempfile
7 import shutil
8 from unittest import TestCase
9
10 from htsworkflow.pipelines import firecrest
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines.eland import SampleKey
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines import ElementTree
16
17 from .simulate_runfolder import *
18
19
20 def make_runfolder(obj=None):
21     """
22     Make a fake runfolder, attach all the directories to obj if defined
23     """
24     # make a fake runfolder directory
25     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
26
27     runfolder_dir = os.path.join(temp_dir,
28                                  '080102_HWI-EAS229_0010_207BTAAXX')
29     os.mkdir(runfolder_dir)
30
31     data_dir = os.path.join(runfolder_dir, 'Data')
32     os.mkdir(data_dir)
33
34     ipar_dir = make_firecrest_dir(data_dir, "1.9.6", 1, 152)
35
36     matrix_dir = os.path.join(ipar_dir, 'Matrix')
37     os.mkdir(matrix_dir)
38     matrix_name = os.path.join(matrix_dir, 's_matrix.txt')
39     make_matrix(matrix_name)
40
41     bustard_dir = os.path.join(ipar_dir,
42                                'Bustard1.8.28_12-04-2008_diane')
43     os.mkdir(bustard_dir)
44     make_phasing_params(bustard_dir)
45
46     gerald_dir = os.path.join(bustard_dir,
47                               'GERALD_12-04-2008_diane')
48     os.mkdir(gerald_dir)
49     make_gerald_config_100(gerald_dir)
50     make_summary_paired_htm(gerald_dir)
51     make_eland_multi(gerald_dir, paired=True)
52
53     if obj is not None:
54         obj.temp_dir = temp_dir
55         obj.runfolder_dir = runfolder_dir
56         obj.data_dir = data_dir
57         obj.image_analysis_dir = ipar_dir
58         obj.matrix_dir = matrix_dir
59         obj.bustard_dir = bustard_dir
60         obj.gerald_dir = gerald_dir
61
62
63 class RunfolderTests(TestCase):
64     """
65     Test components of the runfolder processing code
66     which includes firecrest, bustard, and gerald
67     """
68     def setUp(self):
69         # attaches all the directories to the object passed in
70         make_runfolder(self)
71
72     def tearDown(self):
73         shutil.rmtree(self.temp_dir)
74
75     def test_firecrest(self):
76         """
77         Construct a firecrest object
78         """
79         f = firecrest.firecrest(self.image_analysis_dir)
80         self.failUnlessEqual(f.software, 'Firecrest')
81         self.failUnlessEqual(f.version, '1.9.6')
82         self.failUnlessEqual(f.start, 1)
83         self.failUnlessEqual(f.stop, 152)
84         self.failUnlessEqual(f.user, 'diane')
85         # As of 2008-12-8, the date was being set in
86         # simulate_runfolder.make_firecrest_dir
87         self.failUnlessEqual(f.date, date(2008,4,12))
88
89         xml = f.get_elements()
90         # just make sure that element tree can serialize the tree
91         xml_str = ElementTree.tostring(xml)
92
93         f2 = firecrest.Firecrest(xml=xml)
94         self.failUnlessEqual(f.software, f2.software)
95         self.failUnlessEqual(f.version, f2.version)
96         self.failUnlessEqual(f.start,   f2.start)
97         self.failUnlessEqual(f.stop,    f2.stop)
98         self.failUnlessEqual(f.user,    f2.user)
99
100     def test_bustard(self):
101         """
102         construct a bustard object
103         """
104         b = bustard.bustard(self.bustard_dir)
105         self.failUnlessEqual(b.software, 'Bustard')
106         self.failUnlessEqual(b.version, '1.8.28')
107         self.failUnlessEqual(b.date,    date(2008,4,12))
108         self.failUnlessEqual(b.user,    'diane')
109         self.failUnlessEqual(len(b.phasing), 8)
110         self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
111
112         xml = b.get_elements()
113         b2 = bustard.Bustard(xml=xml)
114         self.failUnlessEqual(b.software, b2.software)
115         self.failUnlessEqual(b.version, b2.version)
116         self.failUnlessEqual(b.date,    b2.date )
117         self.failUnlessEqual(b.user,    b2.user)
118         self.failUnlessEqual(len(b.phasing), len(b2.phasing))
119         for key in b.phasing.keys():
120             self.failUnlessEqual(b.phasing[key].lane,
121                                  b2.phasing[key].lane)
122             self.failUnlessEqual(b.phasing[key].phasing,
123                                  b2.phasing[key].phasing)
124             self.failUnlessEqual(b.phasing[key].prephasing,
125                                  b2.phasing[key].prephasing)
126
127     def test_gerald(self):
128         # need to update gerald and make tests for it
129         g = gerald.gerald(self.gerald_dir)
130
131         self.failUnlessEqual(g.software, 'GERALD')
132         self.failUnlessEqual(g.version, '1.171')
133         self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
134         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
135         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
136
137
138         # list of genomes, matches what was defined up in
139         # make_gerald_config.
140         # the first None is to offset the genomes list to be 1..9
141         # instead of pythons default 0..8
142         genomes = [None,
143                    '/g/mm9',
144                    '/g/mm9',
145                    '/g/elegans190',
146                    '/g/arabidopsis01222004',
147                    '/g/mm9',
148                    '/g/mm9',
149                    '/g/mm9',
150                    '/g/mm9', ]
151
152         # test lane specific parameters from gerald config file
153         for i in range(1,9):
154             cur_lane = g.lanes[i]
155             self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
156             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
157             self.failUnlessEqual(cur_lane.read_length, '37')
158             self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
159
160         # I want to be able to use a simple iterator
161         for l in g.lanes.values():
162           self.failUnlessEqual(l.analysis, 'eland_extended')
163           self.failUnlessEqual(l.read_length, '37')
164           self.failUnlessEqual(l.use_bases, 'Y'*37)
165
166         # test data extracted from summary file
167         clusters = [[None,
168                     (103646, 4515), (106678, 4652),
169                     (84583, 5963), (68813, 4782),
170                     (104854, 4664), (43555, 1632),
171                     (54265, 1588), (64363, 2697),],
172                     [None,
173                     (103647, 4516), (106679, 4653),
174                     (84584, 5964), (68814, 4783),
175                     (104855, 4665), (43556, 1633),
176                     (54266, 1589), (64364, 2698),],]
177
178         for end in [0,1]:
179             for lane in range(1,9):
180                 summary_lane = g.summary[end][lane]
181                 self.failUnlessEqual(summary_lane.cluster, clusters[end][lane])
182                 self.failUnlessEqual(summary_lane.lane, lane)
183
184         xml = g.get_elements()
185         # just make sure that element tree can serialize the tree
186         xml_str = ElementTree.tostring(xml)
187         g2 = gerald.Gerald(xml=xml)
188
189         # do it all again after extracting from the xml file
190         self.failUnlessEqual(g.software, g2.software)
191         self.failUnlessEqual(g.version, g2.version)
192         self.failUnlessEqual(g.date, g2.date)
193         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
194         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
195
196         # test lane specific parameters from gerald config file
197         for i in range(1,9):
198             g_lane = g.lanes[i]
199             g2_lane = g2.lanes[i]
200             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
201             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
202             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
203             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
204
205         # test (some) summary elements
206         for end in [0,1]:
207             for i in range(1,9):
208                 g_summary = g.summary[end][i]
209                 g2_summary = g2.summary[end][i]
210                 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
211                 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
212
213                 g_eland = g.eland_results
214                 g2_eland = g2.eland_results
215                 for key in g_eland:
216                     g_results = g_eland[key]
217                     g2_results = g2_eland[key]
218                     self.failUnlessEqual(g_results.reads,
219                                          g2_results.reads)
220                     self.failUnlessEqual(len(g_results.mapped_reads),
221                                          len(g2_results.mapped_reads))
222                     for k in g_results.mapped_reads.keys():
223                         self.failUnlessEqual(g_results.mapped_reads[k],
224                                              g2_results.mapped_reads[k])
225
226                     self.failUnlessEqual(len(g_results.match_codes),
227                                          len(g2_results.match_codes))
228                     for k in g_results.match_codes.keys():
229                         self.failUnlessEqual(g_results.match_codes[k],
230                                              g2_results.match_codes[k])
231
232
233     def test_eland(self):
234         hg_map = {'Lambda.fa': 'Lambda.fa'}
235         for i in range(1,22):
236           short_name = 'chr%d.fa' % (i,)
237           long_name = 'hg18/chr%d.fa' % (i,)
238           hg_map[short_name] = long_name
239
240         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
241                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
242         eland = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
243
244         # check first end
245         for key in eland.find_keys(SampleKey(read=1)):
246             lane = eland[key]
247             self.failUnlessEqual(lane.reads, 6)
248             self.failUnlessEqual(lane.sample_name, "s")
249             self.failUnlessEqual(lane.lane_id, key.lane)
250             self.failUnlessEqual(len(lane.mapped_reads), 17)
251             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
252             self.failUnlessEqual(lane.match_codes['U0'], 3)
253             self.failUnlessEqual(lane.match_codes['R0'], 2)
254             self.failUnlessEqual(lane.match_codes['U1'], 1)
255             self.failUnlessEqual(lane.match_codes['R1'], 9)
256             self.failUnlessEqual(lane.match_codes['U2'], 0)
257             self.failUnlessEqual(lane.match_codes['R2'], 12)
258             self.failUnlessEqual(lane.match_codes['NM'], 1)
259             self.failUnlessEqual(lane.match_codes['QC'], 0)
260
261         # check second end
262         for key in eland.find_keys(SampleKey(read=2)):
263             lane = eland[key]
264             self.failUnlessEqual(lane.reads, 7)
265             self.failUnlessEqual(lane.sample_name, "s")
266             self.failUnlessEqual(lane.lane_id, key.lane)
267             self.failUnlessEqual(len(lane.mapped_reads), 17)
268             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
269             self.failUnlessEqual(lane.match_codes['U0'], 3)
270             self.failUnlessEqual(lane.match_codes['R0'], 2)
271             self.failUnlessEqual(lane.match_codes['U1'], 1)
272             self.failUnlessEqual(lane.match_codes['R1'], 9)
273             self.failUnlessEqual(lane.match_codes['U2'], 0)
274             self.failUnlessEqual(lane.match_codes['R2'], 12)
275             self.failUnlessEqual(lane.match_codes['NM'], 1)
276             self.failUnlessEqual(lane.match_codes['QC'], 1)
277
278         xml = eland.get_elements()
279         # just make sure that element tree can serialize the tree
280         xml_str = ElementTree.tostring(xml)
281         e2 = gerald.ELAND(xml=xml)
282
283         for key in eland:
284             l1 = eland[key]
285             l2 = e2[key]
286             self.failUnlessEqual(l1.reads, l2.reads)
287             self.failUnlessEqual(l1.sample_name, l2.sample_name)
288             self.failUnlessEqual(l1.lane_id, l2.lane_id)
289             self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
290             self.failUnlessEqual(len(l1.mapped_reads), 17)
291             for k in l1.mapped_reads.keys():
292                 self.failUnlessEqual(l1.mapped_reads[k],
293                                      l2.mapped_reads[k])
294
295             self.failUnlessEqual(len(l1.match_codes), 9)
296             self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
297             for k in l1.match_codes.keys():
298                 self.failUnlessEqual(l1.match_codes[k],
299                                      l2.match_codes[k])
300
301     def test_runfolder(self):
302         runs = runfolder.get_runs(self.runfolder_dir)
303
304         # do we get the flowcell id from the filename?
305         self.failUnlessEqual(len(runs), 1)
306         # firecrest's date depends on filename not the create time.
307         name = 'run_207BTAAXX_2009-02-22.xml'
308         self.failUnlessEqual(runs[0].serialization_filename, name)
309
310         # do we get the flowcell id from the FlowcellId.xml file
311         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
312         runs = runfolder.get_runs(self.runfolder_dir)
313         self.failUnlessEqual(len(runs), 1)
314         name = 'run_207BTAAXY_2009-02-22.xml'
315         self.failUnlessEqual(runs[0].serialization_filename, name)
316
317         r1 = runs[0]
318         xml = r1.get_elements()
319         xml_str = ElementTree.tostring(xml)
320
321         r2 = runfolder.PipelineRun(xml=xml)
322         self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
323         self.failIfEqual(r2.image_analysis, None)
324         self.failIfEqual(r2.bustard, None)
325         self.failIfEqual(r2.gerald, None)
326
327
328 def suite():
329     from unittest import TestSuite, defaultTestLoader
330     suite = TestSuite()
331     suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
332     return suite
333
334
335 if __name__ == "__main__":
336     from unittest import main
337     main(defaultTest="suite")