Update test to be more robust to changing dictionary order.
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_ipar130.py
1 #!/usr/bin/env python
2 from __future__ import absolute_import
3
4 from datetime import datetime, date
5 import os
6 import tempfile
7 import shutil
8 from unittest import TestCase
9
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines import ElementTree
16
17 from .simulate_runfolder import *
18
19
20 def make_runfolder(obj=None):
21     """
22     Make a fake runfolder, attach all the directories to obj if defined
23     """
24     # make a fake runfolder directory
25     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
26
27     runfolder_dir = os.path.join(temp_dir,
28                                  '090313_HWI-EAS229_0101_3021JAAXX')
29     os.mkdir(runfolder_dir)
30
31     data_dir = os.path.join(runfolder_dir, 'Data')
32     os.mkdir(data_dir)
33
34     ipar_dir = make_ipar_dir(data_dir, '1.30')
35
36     bustard_dir = os.path.join(ipar_dir,
37                                'Bustard1.3.2_15-03-2008_diane')
38     os.mkdir(bustard_dir)
39     make_phasing_params(bustard_dir)
40     make_bustard_config132(bustard_dir)
41
42     gerald_dir = os.path.join(bustard_dir,
43                               'GERALD_15-03-2008_diane')
44     os.mkdir(gerald_dir)
45     make_gerald_config_100(gerald_dir)
46     make_summary_ipar130_htm(gerald_dir)
47     make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
48     make_scarf(gerald_dir, lane_list=[7,])
49     make_fastq(gerald_dir, lane_list=[8,])
50
51     if obj is not None:
52         obj.temp_dir = temp_dir
53         obj.runfolder_dir = runfolder_dir
54         obj.data_dir = data_dir
55         obj.image_analysis_dir = ipar_dir
56         obj.bustard_dir = bustard_dir
57         obj.gerald_dir = gerald_dir
58
59
60 class RunfolderTests(TestCase):
61     """
62     Test components of the runfolder processing code
63     which includes firecrest, bustard, and gerald
64     """
65     def setUp(self):
66         # attaches all the directories to the object passed in
67         make_runfolder(self)
68
69     def tearDown(self):
70         shutil.rmtree(self.temp_dir)
71
72     def test_ipar(self):
73         """
74         Construct a firecrest object
75         """
76         i = ipar.ipar(self.image_analysis_dir)
77         self.failUnlessEqual(i.software, 'IPAR')
78         self.failUnlessEqual(i.version, '2.01.192.0')
79         self.failUnlessEqual(i.start, 1)
80         self.failUnlessEqual(i.stop, 37)
81
82         xml = i.get_elements()
83         # just make sure that element tree can serialize the tree
84         xml_str = ElementTree.tostring(xml)
85
86         i2 = ipar.IPAR(xml=xml)
87         self.failUnlessEqual(i.software, i2.software)
88         self.failUnlessEqual(i.version, i2.version)
89         self.failUnlessEqual(i.start,   i2.start)
90         self.failUnlessEqual(i.stop,    i2.stop)
91         self.failUnlessEqual(i.date,    i2.date)
92         self.failUnlessEqual(i.file_list(), i2.file_list())
93
94     def test_bustard(self):
95         """
96         construct a bustard object
97         """
98         def check_crosstalk(crosstalk):
99             self.failUnlessAlmostEqual(crosstalk.base['A'][0],  1.27)
100             self.failUnlessAlmostEqual(crosstalk.base['A'][1],  0.20999999999999)
101             self.failUnlessAlmostEqual(crosstalk.base['A'][2], -0.02)
102             self.failUnlessAlmostEqual(crosstalk.base['A'][3], -0.03)
103
104             self.failUnlessAlmostEqual(crosstalk.base['C'][0],  0.57)
105             self.failUnlessAlmostEqual(crosstalk.base['C'][1],  0.58)
106             self.failUnlessAlmostEqual(crosstalk.base['C'][2], -0.01)
107             self.failUnlessAlmostEqual(crosstalk.base['C'][3], -0.01)
108
109             self.failUnlessAlmostEqual(crosstalk.base['T'][0], -0.02)
110             self.failUnlessAlmostEqual(crosstalk.base['T'][1], -0.02)
111             self.failUnlessAlmostEqual(crosstalk.base['T'][2],  0.80)
112             self.failUnlessAlmostEqual(crosstalk.base['T'][3],  1.07)
113
114             self.failUnlessAlmostEqual(crosstalk.base['G'][0], -0.03)
115             self.failUnlessAlmostEqual(crosstalk.base['G'][1], -0.04)
116             self.failUnlessAlmostEqual(crosstalk.base['G'][2],  1.51)
117             self.failUnlessAlmostEqual(crosstalk.base['G'][3], -0.02)
118
119         b = bustard.bustard(self.bustard_dir)
120         self.failUnlessEqual(b.software, 'Bustard')
121         self.failUnlessEqual(b.version, '1.3.2')
122         self.failUnlessEqual(b.date,    date(2008,3,15))
123         self.failUnlessEqual(b.user,    'diane')
124         self.failUnlessEqual(len(b.phasing), 8)
125         self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
126         self.assertEqual(set(b.crosstalk.base), set(['A','C','T','G']))
127         check_crosstalk(b.crosstalk)
128
129         xml = b.get_elements()
130         b2 = bustard.Bustard(xml=xml)
131         self.failUnlessEqual(b.software, b2.software)
132         self.failUnlessEqual(b.version, b2.version)
133         self.failUnlessEqual(b.date,    b2.date )
134         self.failUnlessEqual(b.user,    b2.user)
135         self.failUnlessEqual(len(b.phasing), len(b2.phasing))
136         for key in b.phasing.keys():
137             self.failUnlessEqual(b.phasing[key].lane,
138                                  b2.phasing[key].lane)
139             self.failUnlessEqual(b.phasing[key].phasing,
140                                  b2.phasing[key].phasing)
141             self.failUnlessEqual(b.phasing[key].prephasing,
142                                  b2.phasing[key].prephasing)
143         check_crosstalk(b2.crosstalk)
144
145     def test_gerald(self):
146         # need to update gerald and make tests for it
147         g = gerald.gerald(self.gerald_dir)
148
149         self.failUnlessEqual(g.software, 'GERALD')
150         self.failUnlessEqual(g.version, '1.171')
151         self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
152         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
153         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
154
155
156         # list of genomes, matches what was defined up in
157         # make_gerald_config.
158         # the first None is to offset the genomes list to be 1..9
159         # instead of pythons default 0..8
160         genomes = [None,
161                    '/g/mm9',
162                    '/g/mm9',
163                    '/g/elegans190',
164                    '/g/arabidopsis01222004',
165                    '/g/mm9',
166                    '/g/mm9',
167                    '/g/mm9',
168                    '/g/mm9', ]
169
170         # test lane specific parameters from gerald config file
171         for i in range(1,9):
172             cur_lane = g.lanes[i]
173             self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
174             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
175             self.failUnlessEqual(cur_lane.read_length, '37')
176             self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
177
178         # I want to be able to use a simple iterator
179         for l in g.lanes.values():
180           self.failUnlessEqual(l.analysis, 'eland_extended')
181           self.failUnlessEqual(l.read_length, '37')
182           self.failUnlessEqual(l.use_bases, 'Y'*37)
183
184         # test data extracted from summary file
185         clusters = [None,
186                     (126910, 4300), (165739, 6792),
187                     (196565, 8216), (153897, 8501),
188                     (135536, 3908), (154083, 9315),
189                     (159991, 9292), (198479, 17671),]
190
191         self.failUnlessEqual(len(g.summary), 1)
192         for i in range(1,9):
193             summary_lane = g.summary[0][i]
194             self.failUnlessEqual(summary_lane.cluster, clusters[i])
195             self.failUnlessEqual(summary_lane.lane, i)
196
197         xml = g.get_elements()
198         # just make sure that element tree can serialize the tree
199         xml_str = ElementTree.tostring(xml)
200         g2 = gerald.Gerald(xml=xml)
201
202         # do it all again after extracting from the xml file
203         self.failUnlessEqual(g.software, g2.software)
204         self.failUnlessEqual(g.version, g2.version)
205         self.failUnlessEqual(g.date, g2.date)
206         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
207         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
208
209         # test lane specific parameters from gerald config file
210         for i in range(1,9):
211             g_lane = g.lanes[i]
212             g2_lane = g2.lanes[i]
213             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
214             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
215             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
216             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
217
218         # test (some) summary elements
219         self.failUnlessEqual(len(g.summary), 1)
220         for i in range(1,9):
221             g_summary = g.summary[0][i]
222             g2_summary = g2.summary[0][i]
223             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
224             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
225
226             g_eland = g.eland_results
227             g2_eland = g2.eland_results
228             for key in g_eland:
229                 g_results = g_eland[key]
230                 g2_results = g2_eland[key]
231                 self.failUnlessEqual(g_results.reads,
232                                      g2_results.reads)
233                 if isinstance(g_results, eland.ElandLane):
234                   self.failUnlessEqual(len(g_results.mapped_reads),
235                                        len(g2_results.mapped_reads))
236                   for k in g_results.mapped_reads.keys():
237                       self.failUnlessEqual(g_results.mapped_reads[k],
238                                            g2_results.mapped_reads[k])
239
240                   self.failUnlessEqual(len(g_results.match_codes),
241                                        len(g2_results.match_codes))
242                   for k in g_results.match_codes.keys():
243                       self.failUnlessEqual(g_results.match_codes[k],
244                                            g2_results.match_codes[k])
245
246
247     def test_eland(self):
248         hg_map = {'Lambda.fa': 'Lambda.fa'}
249         for i in range(1,22):
250           short_name = 'chr%d.fa' % (i,)
251           long_name = 'hg18/chr%d.fa' % (i,)
252           hg_map[short_name] = long_name
253
254         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
255                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
256         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
257
258         # I added sequence lanes to the last 2 lanes of this test case
259         for key in eland_container:
260             lane = eland_container[key]
261             if key.lane in [1,2,3,4,5,6]:
262                 self.failUnlessEqual(lane.reads, 6)
263                 self.failUnlessEqual(lane.sample_name, "s")
264                 self.failUnlessEqual(lane.lane_id, key.lane)
265                 self.failUnlessEqual(len(lane.mapped_reads), 17)
266                 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
267                 self.failUnlessEqual(lane.match_codes['U0'], 3)
268                 self.failUnlessEqual(lane.match_codes['R0'], 2)
269                 self.failUnlessEqual(lane.match_codes['U1'], 1)
270                 self.failUnlessEqual(lane.match_codes['R1'], 9)
271                 self.failUnlessEqual(lane.match_codes['U2'], 0)
272                 self.failUnlessEqual(lane.match_codes['R2'], 12)
273                 self.failUnlessEqual(lane.match_codes['NM'], 1)
274                 self.failUnlessEqual(lane.match_codes['QC'], 0)
275             elif key.lane == 7:
276                 self.failUnlessEqual(lane.reads, 5)
277                 self.failUnlessEqual(lane.sample_name, 's')
278                 self.failUnlessEqual(lane.lane_id, 7)
279                 self.failUnlessEqual(lane.sequence_type,
280                                      eland.SequenceLane.SCARF_TYPE)
281             elif key.lane == 8:
282                 self.failUnlessEqual(lane.reads, 3)
283                 self.failUnlessEqual(lane.sample_name, 's')
284                 self.failUnlessEqual(lane.lane_id, 8)
285                 self.failUnlessEqual(lane.sequence_type,
286                                      eland.SequenceLane.FASTQ_TYPE)
287
288         xml = eland_container.get_elements()
289         # just make sure that element tree can serialize the tree
290         xml_str = ElementTree.tostring(xml)
291         e2 = gerald.ELAND(xml=xml)
292
293         for key in eland_container:
294             l1 = eland_container[key]
295             l2 = e2[key]
296             self.failUnlessEqual(l1.reads, l2.reads)
297             self.failUnlessEqual(l1.sample_name, l2.sample_name)
298             self.failUnlessEqual(l1.lane_id, l2.lane_id)
299             if isinstance(l1, eland.ElandLane):
300               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
301               self.failUnlessEqual(len(l1.mapped_reads), 17)
302               for k in l1.mapped_reads.keys():
303                   self.failUnlessEqual(l1.mapped_reads[k],
304                                        l2.mapped_reads[k])
305
306               self.failUnlessEqual(len(l1.match_codes), 9)
307               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
308               for k in l1.match_codes.keys():
309                   self.failUnlessEqual(l1.match_codes[k],
310                                        l2.match_codes[k])
311             elif isinstance(l1, eland.SequenceLane):
312                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
313
314     def test_runfolder(self):
315         runs = runfolder.get_runs(self.runfolder_dir)
316
317         # do we get the flowcell id from the filename?
318         self.failUnlessEqual(len(runs), 1)
319         name = 'run_3021JAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
320         self.failUnlessEqual(runs[0].serialization_filename, name)
321
322         # do we get the flowcell id from the FlowcellId.xml file
323         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
324         runs = runfolder.get_runs(self.runfolder_dir)
325         self.failUnlessEqual(len(runs), 1)
326         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
327         self.failUnlessEqual(runs[0].serialization_filename, name)
328
329         r1 = runs[0]
330         xml = r1.get_elements()
331         xml_str = ElementTree.tostring(xml)
332
333         r2 = runfolder.PipelineRun(xml=xml)
334         self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
335         self.failIfEqual(r2.image_analysis, None)
336         self.failIfEqual(r2.bustard, None)
337         self.failIfEqual(r2.gerald, None)
338
339
340 def suite():
341     from unittest import TestSuite, defaultTestLoader
342     suite = TestSuite()
343     suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
344     return suite
345
346
347 if __name__ == "__main__":
348     from unittest import main
349     main(defaultTest="suite")