take advantage of absolute_import to simplify import statements
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta.py
1 #!/usr/bin/env python
2 from __future__ import absolute_import
3
4 from datetime import datetime, date
5 import os
6 import tempfile
7 import shutil
8 from unittest import TestCase
9
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.samplekey import SampleKey
16 from htsworkflow.pipelines import ElementTree
17
18 from .simulate_runfolder import *
19
20
21 def make_runfolder(obj=None):
22     """
23     Make a fake runfolder, attach all the directories to obj if defined
24     """
25     # make a fake runfolder directory
26     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
27
28     runfolder_dir = os.path.join(temp_dir,
29                                  '090608_HWI-EAS229_0117_4286GAAXX')
30     os.mkdir(runfolder_dir)
31
32     data_dir = os.path.join(runfolder_dir, 'Data')
33     os.mkdir(data_dir)
34
35     intensities_dir = make_rta_intensities_1460(data_dir)
36
37     basecalls_dir = make_rta_basecalls_1460(intensities_dir)
38
39     #make_phasing_params(bustard_dir)
40     #make_bustard_config132(bustard_dir)
41
42     gerald_dir = os.path.join(basecalls_dir,
43                               'GERALD_16-06-2009_diane')
44     os.mkdir(gerald_dir)
45     make_gerald_config_100(gerald_dir)
46     make_summary_ipar130_htm(gerald_dir)
47     make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
48     make_scarf(gerald_dir, lane_list=[7,])
49     make_fastq(gerald_dir, lane_list=[8,])
50
51     if obj is not None:
52         obj.temp_dir = temp_dir
53         obj.runfolder_dir = runfolder_dir
54         obj.data_dir = data_dir
55         obj.image_analysis_dir = intensities_dir
56         obj.bustard_dir = basecalls_dir
57         obj.gerald_dir = gerald_dir
58
59
60 class RunfolderTests(TestCase):
61     """
62     Test components of the runfolder processing code
63     which includes firecrest, bustard, and gerald
64     """
65     def setUp(self):
66         # attaches all the directories to the object passed in
67         make_runfolder(self)
68
69     def tearDown(self):
70         shutil.rmtree(self.temp_dir)
71
72     def test_ipar(self):
73         """
74         Construct a firecrest object
75         """
76         i = ipar.ipar(self.image_analysis_dir)
77         self.failUnlessEqual(i.version, '1.4.6.0')
78         self.failUnlessEqual(i.start, 1)
79         self.failUnlessEqual(i.stop, 38)
80
81         xml = i.get_elements()
82         # just make sure that element tree can serialize the tree
83         xml_str = ElementTree.tostring(xml)
84
85         i2 = ipar.IPAR(xml=xml)
86         self.failUnlessEqual(i.version, i2.version)
87         self.failUnlessEqual(i.start,   i2.start)
88         self.failUnlessEqual(i.stop,    i2.stop)
89         self.failUnlessEqual(i.date,    i2.date)
90
91     def test_bustard(self):
92         """
93         construct a bustard object
94         """
95         b = bustard.bustard(self.bustard_dir)
96         self.failUnlessEqual(b.version, '1.4.6.0')
97         self.failUnlessEqual(b.date,    None)
98         self.failUnlessEqual(b.user,    None)
99         self.failUnlessEqual(len(b.phasing), 0)
100
101         xml = b.get_elements()
102         b2 = bustard.Bustard(xml=xml)
103         self.failUnlessEqual(b.version, b2.version)
104         self.failUnlessEqual(b.date,    b2.date )
105         self.failUnlessEqual(b.user,    b2.user)
106
107     def test_gerald(self):
108         # need to update gerald and make tests for it
109         g = gerald.gerald(self.gerald_dir)
110
111         self.failUnlessEqual(g.version, '1.171')
112         self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
113         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
114         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
115
116
117         # list of genomes, matches what was defined up in
118         # make_gerald_config.
119         # the first None is to offset the genomes list to be 1..9
120         # instead of pythons default 0..8
121         genomes = [None,
122                    '/g/mm9',
123                    '/g/mm9',
124                    '/g/elegans190',
125                    '/g/arabidopsis01222004',
126                    '/g/mm9',
127                    '/g/mm9',
128                    '/g/mm9',
129                    '/g/mm9', ]
130
131         # test lane specific parameters from gerald config file
132         for i in range(1,9):
133             cur_lane = g.lanes[i]
134             self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
135             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
136             self.failUnlessEqual(cur_lane.read_length, '37')
137             self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
138
139         # I want to be able to use a simple iterator
140         for l in g.lanes.values():
141           self.failUnlessEqual(l.analysis, 'eland_extended')
142           self.failUnlessEqual(l.read_length, '37')
143           self.failUnlessEqual(l.use_bases, 'Y'*37)
144
145         # test data extracted from summary file
146         clusters = [None,
147                     (126910, 4300), (165739, 6792),
148                     (196565, 8216), (153897, 8501),
149                     (135536, 3908), (154083, 9315),
150                     (159991, 9292), (198479, 17671),]
151
152         self.failUnlessEqual(len(g.summary), 1)
153         for i in range(1,9):
154             summary_lane = g.summary[0][i]
155             self.failUnlessEqual(summary_lane.cluster, clusters[i])
156             self.failUnlessEqual(summary_lane.lane, i)
157
158         xml = g.get_elements()
159         # just make sure that element tree can serialize the tree
160         xml_str = ElementTree.tostring(xml)
161         g2 = gerald.Gerald(xml=xml)
162
163         # do it all again after extracting from the xml file
164         self.failUnlessEqual(g.version, g2.version)
165         self.failUnlessEqual(g.date, g2.date)
166         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
167         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
168
169         # test lane specific parameters from gerald config file
170         for i in range(1,9):
171             g_lane = g.lanes[i]
172             g2_lane = g2.lanes[i]
173             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
174             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
175             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
176             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
177
178         # test (some) summary elements
179         self.failUnlessEqual(len(g.summary), 1)
180         for i in range(1,9):
181             g_summary = g.summary[0][i]
182             g2_summary = g2.summary[0][i]
183             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
184             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
185
186             g_eland = g.eland_results
187             g2_eland = g2.eland_results
188             for lane in g_eland:
189                 g_results = g_eland[lane]
190                 g2_results = g2_eland[lane]
191                 self.failUnlessEqual(g_results.reads,
192                                      g2_results.reads)
193                 if isinstance(g_results, eland.ElandLane):
194                   self.failUnlessEqual(len(g_results.mapped_reads),
195                                        len(g2_results.mapped_reads))
196                   for k in g_results.mapped_reads.keys():
197                       self.failUnlessEqual(g_results.mapped_reads[k],
198                                            g2_results.mapped_reads[k])
199
200                   self.failUnlessEqual(len(g_results.match_codes),
201                                        len(g2_results.match_codes))
202                   for k in g_results.match_codes.keys():
203                       self.failUnlessEqual(g_results.match_codes[k],
204                                            g2_results.match_codes[k])
205
206
207     def test_eland(self):
208         hg_map = {'Lambda.fa': 'Lambda.fa'}
209         for i in range(1,22):
210           short_name = 'chr%d.fa' % (i,)
211           long_name = 'hg18/chr%d.fa' % (i,)
212           hg_map[short_name] = long_name
213
214         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
215                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
216         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
217
218         # I added sequence lanes to the last 2 lanes of this test case
219         lanes = [ SampleKey(lane=i, read=1, sample='s') for i in range(1,7) ]
220         for key in lanes:
221             lane = eland_container[key]
222             self.failUnlessEqual(lane.reads, 6)
223             self.failUnlessEqual(lane.sample_name, "s")
224             self.failUnlessEqual(lane.lane_id, key.lane)
225             self.failUnlessEqual(len(lane.mapped_reads), 17)
226             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
227             self.failUnlessEqual(lane.match_codes['U0'], 3)
228             self.failUnlessEqual(lane.match_codes['R0'], 2)
229             self.failUnlessEqual(lane.match_codes['U1'], 1)
230             self.failUnlessEqual(lane.match_codes['R1'], 9)
231             self.failUnlessEqual(lane.match_codes['U2'], 0)
232             self.failUnlessEqual(lane.match_codes['R2'], 12)
233             self.failUnlessEqual(lane.match_codes['NM'], 1)
234             self.failUnlessEqual(lane.match_codes['QC'], 0)
235
236         # test scarf
237         lane = eland_container[SampleKey(lane=7, read=1, sample='s')]
238         self.failUnlessEqual(lane.reads, 5)
239         self.failUnlessEqual(lane.sample_name, 's')
240         self.failUnlessEqual(lane.lane_id, 7)
241         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
242
243         # test fastq
244         lane = eland_container[SampleKey(lane=8, read=1, sample='s')]
245         self.failUnlessEqual(lane.reads, 3)
246         self.failUnlessEqual(lane.sample_name, 's')
247         self.failUnlessEqual(lane.lane_id, 8)
248         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
249
250         xml = eland_container.get_elements()
251         # just make sure that element tree can serialize the tree
252         xml_str = ElementTree.tostring(xml)
253         e2 = gerald.ELAND(xml=xml)
254
255         for key in eland_container:
256             l1 = eland_container[key]
257             l2 = e2[key]
258             self.failUnlessEqual(l1.reads, l2.reads)
259             self.failUnlessEqual(l1.sample_name, l2.sample_name)
260             self.failUnlessEqual(l1.lane_id, l2.lane_id)
261             if isinstance(l1, eland.ElandLane):
262               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
263               self.failUnlessEqual(len(l1.mapped_reads), 17)
264               for k in l1.mapped_reads.keys():
265                   self.failUnlessEqual(l1.mapped_reads[k],
266                                        l2.mapped_reads[k])
267
268               self.failUnlessEqual(len(l1.match_codes), 9)
269               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
270               for k in l1.match_codes.keys():
271                   self.failUnlessEqual(l1.match_codes[k],
272                                        l2.match_codes[k])
273             elif isinstance(l1, eland.SequenceLane):
274                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
275
276     def test_runfolder(self):
277         runs = runfolder.get_runs(self.runfolder_dir)
278
279         # do we get the flowcell id from the filename?
280         self.failUnlessEqual(len(runs), 1)
281         name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
282         self.failUnlessEqual(runs[0].serialization_filename, name)
283
284         # do we get the flowcell id from the FlowcellId.xml file
285         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
286         runs = runfolder.get_runs(self.runfolder_dir)
287         self.failUnlessEqual(len(runs), 1)
288         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
289         self.failUnlessEqual(runs[0].serialization_filename, name)
290
291         r1 = runs[0]
292         xml = r1.get_elements()
293         xml_str = ElementTree.tostring(xml)
294
295         r2 = runfolder.PipelineRun(xml=xml)
296         self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
297         self.failIfEqual(r2.image_analysis, None)
298         self.failIfEqual(r2.bustard, None)
299         self.failIfEqual(r2.gerald, None)
300
301
302 def suite():
303     from unittest import TestSuite, defaultTestLoader
304     suite = TestSuite()
305     suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
306     return suite
307
308
309 if __name__ == "__main__":
310     from unittest import main
311     main(defaultTest="suite")