take advantage of absolute_import to simplify import statements
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta1_12.py
1 #!/usr/bin/env python
2 from __future__ import absolute_import
3
4 from datetime import datetime, date
5 import logging
6 import os
7 import tempfile
8 import shutil
9 from unittest import TestCase
10
11 from htsworkflow.pipelines import eland
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import ipar
14 from htsworkflow.pipelines import bustard
15 from htsworkflow.pipelines import gerald
16 from htsworkflow.pipelines import runfolder
17 from htsworkflow.pipelines import ElementTree
18
19 from .simulate_runfolder import *
20
21
22 def make_runfolder(obj=None):
23     """
24     Make a fake runfolder, attach all the directories to obj if defined
25     """
26     # make a fake runfolder directory
27     flowcell_id = 'D07K6ACXX'
28     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
29
30     runfolder_dir = os.path.join(
31         temp_dir,
32         '110815_SN787_0101_A{0}'.format(flowcell_id))
33     os.mkdir(runfolder_dir)
34
35     make_runinfo(runfolder_dir, flowcell_id)
36
37     data_dir = os.path.join(runfolder_dir, 'Data')
38     os.mkdir(data_dir)
39
40     intensities_dir = make_rta_intensities_1_12(data_dir)
41     make_status_rta1_12(data_dir)
42
43     basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
44     make_matrix_dir_rta_1_12(basecalls_dir)
45
46     unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
47     os.mkdir(unaligned_dir)
48     make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
49     make_unaligned_config_1_12(unaligned_dir)
50     make_unaligned_status_1_12(unaligned_dir, flowcell_id)
51
52     aligned_dir = os.path.join(runfolder_dir, "Aligned")
53     os.mkdir(aligned_dir)
54     make_aligned_eland_export(aligned_dir, flowcell_id)
55     make_aligned_config_1_12(aligned_dir)
56
57     if obj is not None:
58         obj.flowcell_id = flowcell_id
59         obj.temp_dir = temp_dir
60         obj.runfolder_dir = runfolder_dir
61         obj.data_dir = data_dir
62         obj.image_analysis_dir = intensities_dir
63         obj.bustard_dir = unaligned_dir
64         obj.gerald_dir = aligned_dir
65         obj.reads = 2
66
67
68 class RunfolderTests(TestCase):
69     """
70     Test components of the runfolder processing code
71     which includes firecrest, bustard, and gerald
72     """
73     def setUp(self):
74         # attaches all the directories to the object passed in
75         make_runfolder(self)
76
77     def tearDown(self):
78         shutil.rmtree(self.temp_dir)
79
80     def test_bustard(self):
81         """Construct a bustard object"""
82         b = bustard.bustard(self.bustard_dir)
83         self.failUnlessEqual(b.software, 'RTA')
84         self.failUnlessEqual(b.version, '1.12.4.2')
85         self.failUnlessEqual(b.date,    None)
86         self.failUnlessEqual(b.user,    None)
87         self.failUnlessEqual(len(b.phasing), 0)
88
89         xml = b.get_elements()
90         b2 = bustard.Bustard(xml=xml)
91         self.failUnlessEqual(b.software, b2.software)
92         self.failUnlessEqual(b.version,  b2.version)
93         self.failUnlessEqual(b.date,     b2.date )
94         self.failUnlessEqual(b.user,     b2.user)
95
96     def test_gerald(self):
97         # need to update gerald and make tests for it
98         g = gerald.gerald(self.gerald_dir)
99
100         self.failUnlessEqual(g.software, 'CASAVA')
101         self.failUnlessEqual(g.version, '1.8.1')
102         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
103         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
104
105         # list of genomes, matches what was defined up in
106         # make_gerald_config.
107         # the first None is to offset the genomes list to be 1..9
108         # instead of pythons default 0..8
109         # test lane specific parameters from gerald config file
110
111         undetermined = g.lanes[SampleKey(sample='Undetermined_indices')]
112         self.failUnlessEqual(undetermined.analysis, 'none')
113         self.failUnlessEqual(undetermined.read_length, None)
114         self.failUnlessEqual(undetermined.use_bases, None)
115
116         project = g.lanes[SampleKey(sample='11115')]
117         self.failUnlessEqual(project.analysis, 'eland_extended')
118         self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
119         self.failUnlessEqual(project.read_length, '49')
120         self.failUnlessEqual(project.use_bases, 'y'*49+'n')
121
122         # test data extracted from summary file
123         clusters = [None,
124                     (3878755,  579626.0), (3920639, 1027332.4),
125                     (5713049,  876187.3), (5852907,  538640.6),
126                     (4006751, 1265247.4), (5678021,  627070.7),
127                     (1854131,  429053.2), (4777517,  592904.0),
128                    ]
129
130         self.failUnlessEqual(len(g.summary), self.reads)
131         for i in range(1,9):
132             summary_lane = g.summary[0][i]
133             self.failUnlessEqual(summary_lane.cluster, clusters[i])
134             self.failUnlessEqual(summary_lane.lane, i)
135
136         xml = g.get_elements()
137         # just make sure that element tree can serialize the tree
138         xml_str = ElementTree.tostring(xml)
139         g2 = gerald.CASAVA(xml=xml)
140
141         # do it all again after extracting from the xml file
142         self.failUnlessEqual(g.software, g2.software)
143         self.failUnlessEqual(g.version, g2.version)
144         self.failUnlessEqual(g.date, g2.date)
145         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
146         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
147
148         # test lane specific parameters from gerald config file
149         for i in g.lanes.keys():
150             g_lane = g.lanes[i]
151             g2_lane = g2.lanes[i]
152             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
153             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
154             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
155             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
156
157         # test (some) summary elements
158         self.failUnlessEqual(len(g.summary), self.reads)
159         for i in range(1,9):
160             g_summary = g.summary[0][i]
161             g2_summary = g2.summary[0][i]
162             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
163             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
164
165             g_eland = g.eland_results
166             g2_eland = g2.eland_results
167             for key in g_eland:
168                 g_results = g_eland[key]
169                 g2_results = g2_eland[key]
170                 self.failUnlessEqual(g_results.reads,
171                                      g2_results.reads)
172                 if isinstance(g_results, eland.ElandLane):
173                   self.failUnlessEqual(len(g_results.mapped_reads),
174                                        len(g2_results.mapped_reads))
175                   for k in g_results.mapped_reads.keys():
176                       self.failUnlessEqual(g_results.mapped_reads[k],
177                                            g2_results.mapped_reads[k])
178
179                   self.failUnlessEqual(len(g_results.match_codes),
180                                        len(g2_results.match_codes))
181                   for k in g_results.match_codes.keys():
182                       self.failUnlessEqual(g_results.match_codes[k],
183                                            g2_results.match_codes[k])
184
185
186     def test_eland(self):
187         hg_map = {'Lambda.fa': 'Lambda.fa'}
188         for i in range(1,22):
189           short_name = 'chr%d.fa' % (i,)
190           long_name = 'hg18/chr%d.fa' % (i,)
191           hg_map[short_name] = long_name
192
193         samples = set(('11111', '11112', '11113', '11114', '11115',
194                        '11116', '11117', '11118', '11119', '11120'))
195         genome_maps = {}
196         for i in range(1,9):
197             genome_maps[i] = hg_map
198
199         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
200
201         for lane in eland_container.values():
202             # I added sequence lanes to the last 2 lanes of this test case
203             if lane.sample_name == '11113':
204                 self.assertEqual(lane.reads, 24)
205                 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 6)
206                 self.assertEqual(lane.match_codes['U0'], 6)
207                 self.assertEqual(lane.match_codes['R0'], 18)
208                 self.assertEqual(lane.match_codes['R1'], 24)
209                 self.assertEqual(lane.match_codes['R2'], 18)
210                 self.assertEqual(lane.match_codes['NM'], 12)
211             else:
212                 self.assertEqual(lane.reads, 8)
213                 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 2)
214                 self.assertEqual(lane.match_codes['U0'], 2)
215                 self.assertEqual(lane.match_codes['R0'], 6)
216                 self.assertEqual(lane.match_codes['R1'], 8)
217                 self.assertEqual(lane.match_codes['R2'], 6)
218                 self.assertEqual(lane.match_codes['NM'], 4)
219
220             self.assertTrue(lane.sample_name in samples)
221             #self.assertEqual(lane.lane_id, 1)
222             self.assertEqual(len(lane.mapped_reads), 1)
223             self.assertEqual(lane.match_codes['U1'], 0)
224             self.assertEqual(lane.match_codes['U2'], 0)
225             self.assertEqual(lane.match_codes['QC'], 0)
226
227         xml = eland_container.get_elements()
228         # just make sure that element tree can serialize the tree
229         xml_str = ElementTree.tostring(xml)
230         e2 = gerald.ELAND(xml=xml)
231
232         for key in eland_container.results:
233             l1 = eland_container.results[key]
234             l2 = e2.results[key]
235             self.failUnlessEqual(l1.reads, l2.reads)
236             self.failUnlessEqual(l1.sample_name, l2.sample_name)
237             self.failUnlessEqual(l1.lane_id, l2.lane_id)
238             if isinstance(l1, eland.ElandLane):
239               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
240               self.failUnlessEqual(len(l1.mapped_reads), 1)
241               for k in l1.mapped_reads.keys():
242                   self.failUnlessEqual(l1.mapped_reads[k],
243                                        l2.mapped_reads[k])
244
245               self.failUnlessEqual(len(l1.match_codes), 9)
246               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
247               for k in l1.match_codes.keys():
248                   self.failUnlessEqual(l1.match_codes[k],
249                                        l2.match_codes[k])
250             elif isinstance(l1, eland.SequenceLane):
251                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
252
253     def test_runfolder(self):
254         runs = runfolder.get_runs(self.runfolder_dir)
255
256         # do we get the flowcell id from the filename?
257         self.failUnlessEqual(len(runs), 1)
258         self.assertEqual(runs[0].flowcell_id, self.flowcell_id)
259         name = 'run_%s_%s.xml' % ( self.flowcell_id,
260                                    date.today().strftime('%Y-%m-%d'),)
261         self.failUnlessEqual(runs[0].serialization_filename, name)
262
263         bustard_dir = os.path.join(self.runfolder_dir, 'Unaligned')
264         r1 = runs[0]
265         self.failUnlessEqual(r1.bustard.sequence_format, 'fastq')
266         self.failUnlessEqual(r1.bustard.pathname, bustard_dir)
267         self.failUnlessEqual(r1.gerald.runfolder_name, 'Unaligned')
268
269         xml = r1.get_elements()
270         xml_str = ElementTree.tostring(xml)
271
272         r2 = runfolder.PipelineRun(xml=xml)
273         self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
274         self.failIfEqual(r2.image_analysis, None)
275         self.failIfEqual(r2.bustard, None)
276         self.failIfEqual(r2.gerald, None)
277
278 def suite():
279     from unittest import TestSuite, defaultTestLoader
280     suite = TestSuite()
281     suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
282     return suite
283
284
285 if __name__ == "__main__":
286     from unittest import main
287     main(defaultTest="suite")