Use sample keys when looking up lane parameters.
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta1_12.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import logging
5 import os
6 import tempfile
7 import shutil
8 import unittest
9
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines.samplekey import SampleKey
12 from htsworkflow.pipelines import ipar
13 from htsworkflow.pipelines import bustard
14 from htsworkflow.pipelines import gerald
15 from htsworkflow.pipelines import runfolder
16 from htsworkflow.pipelines.runfolder import ElementTree
17
18 from htsworkflow.pipelines.test.simulate_runfolder import *
19
20
21 def make_runfolder(obj=None):
22     """
23     Make a fake runfolder, attach all the directories to obj if defined
24     """
25     # make a fake runfolder directory
26     flowcell_id = 'D07K6ACXX'
27     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
28
29     runfolder_dir = os.path.join(temp_dir,
30                                  '110815_SN787_0101_A{0}'.format(flowcell_id))
31     os.mkdir(runfolder_dir)
32
33     data_dir = os.path.join(runfolder_dir, 'Data')
34     os.mkdir(data_dir)
35
36     intensities_dir = make_rta_intensities_1_12(data_dir)
37     make_status_rta1_12(data_dir)
38
39     basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
40     make_matrix_dir_rta_1_12(basecalls_dir)
41
42     unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
43     os.mkdir(unaligned_dir)
44     make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
45     make_unaligned_config_1_12(unaligned_dir)
46
47     aligned_dir = os.path.join(runfolder_dir, "Aligned")
48     os.mkdir(aligned_dir)
49     make_aligned_eland_export(aligned_dir, flowcell_id)
50     make_aligned_config_1_12(aligned_dir)
51
52     if obj is not None:
53         obj.temp_dir = temp_dir
54         obj.runfolder_dir = runfolder_dir
55         obj.data_dir = data_dir
56         obj.image_analysis_dir = intensities_dir
57         obj.bustard_dir = unaligned_dir
58         obj.gerald_dir = aligned_dir
59         obj.reads = 2
60
61
62 class RunfolderTests(unittest.TestCase):
63     """
64     Test components of the runfolder processing code
65     which includes firecrest, bustard, and gerald
66     """
67     def setUp(self):
68         # attaches all the directories to the object passed in
69         make_runfolder(self)
70
71     def tearDown(self):
72         shutil.rmtree(self.temp_dir)
73
74     def test_bustard(self):
75         """Construct a bustard object"""
76         b = bustard.bustard(self.bustard_dir)
77         self.failUnlessEqual(b.software, 'RTA')
78         self.failUnlessEqual(b.version, '1.12.4.2')
79         self.failUnlessEqual(b.date,    None)
80         self.failUnlessEqual(b.user,    None)
81         self.failUnlessEqual(len(b.phasing), 0)
82
83         xml = b.get_elements()
84         b2 = bustard.Bustard(xml=xml)
85         self.failUnlessEqual(b.software, b2.software)
86         self.failUnlessEqual(b.version,  b2.version)
87         self.failUnlessEqual(b.date,     b2.date )
88         self.failUnlessEqual(b.user,     b2.user)
89
90     def test_gerald(self):
91         # need to update gerald and make tests for it
92         g = gerald.gerald(self.gerald_dir)
93
94         self.failUnlessEqual(g.software, 'CASAVA')
95         self.failUnlessEqual(g.version, '1.8.1')
96         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
97         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
98
99         # list of genomes, matches what was defined up in
100         # make_gerald_config.
101         # the first None is to offset the genomes list to be 1..9
102         # instead of pythons default 0..8
103         # test lane specific parameters from gerald config file
104
105         undetermined = g.lanes[SampleKey(sample='Undetermined_indices')]
106         self.failUnlessEqual(undetermined.analysis, 'none')
107         self.failUnlessEqual(undetermined.read_length, None)
108         self.failUnlessEqual(undetermined.use_bases, None)
109
110         project = g.lanes[SampleKey(sample='11115')]
111         self.failUnlessEqual(project.analysis, 'eland_extended')
112         self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
113         self.failUnlessEqual(project.read_length, '49')
114         self.failUnlessEqual(project.use_bases, 'y'*49+'n')
115
116         # test data extracted from summary file
117         clusters = [None,
118                     (3878755,  579626.0), (3920639, 1027332.4),
119                     (5713049,  876187.3), (5852907,  538640.6),
120                     (4006751, 1265247.4), (5678021,  627070.7),
121                     (1854131,  429053.2), (4777517,  592904.0),
122                    ]
123
124         self.failUnlessEqual(len(g.summary), self.reads)
125         for i in range(1,9):
126             summary_lane = g.summary[0][i]
127             self.failUnlessEqual(summary_lane.cluster, clusters[i])
128             self.failUnlessEqual(summary_lane.lane, i)
129
130         xml = g.get_elements()
131         # just make sure that element tree can serialize the tree
132         xml_str = ElementTree.tostring(xml)
133         g2 = gerald.CASAVA(xml=xml)
134
135         # do it all again after extracting from the xml file
136         self.failUnlessEqual(g.software, g2.software)
137         self.failUnlessEqual(g.version, g2.version)
138         self.failUnlessEqual(g.date, g2.date)
139         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
140         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
141
142         # test lane specific parameters from gerald config file
143         for i in g.lanes.keys():
144             g_lane = g.lanes[i]
145             g2_lane = g2.lanes[i]
146             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
147             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
148             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
149             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
150
151         # test (some) summary elements
152         self.failUnlessEqual(len(g.summary), self.reads)
153         for i in range(1,9):
154             g_summary = g.summary[0][i]
155             g2_summary = g2.summary[0][i]
156             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
157             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
158
159             g_eland = g.eland_results
160             g2_eland = g2.eland_results
161             for key in g_eland:
162                 g_results = g_eland[key]
163                 g2_results = g2_eland[key]
164                 self.failUnlessEqual(g_results.reads,
165                                      g2_results.reads)
166                 if isinstance(g_results, eland.ElandLane):
167                   self.failUnlessEqual(len(g_results.mapped_reads),
168                                        len(g2_results.mapped_reads))
169                   for k in g_results.mapped_reads.keys():
170                       self.failUnlessEqual(g_results.mapped_reads[k],
171                                            g2_results.mapped_reads[k])
172
173                   self.failUnlessEqual(len(g_results.match_codes),
174                                        len(g2_results.match_codes))
175                   for k in g_results.match_codes.keys():
176                       self.failUnlessEqual(g_results.match_codes[k],
177                                            g2_results.match_codes[k])
178
179
180     def test_eland(self):
181         hg_map = {'Lambda.fa': 'Lambda.fa'}
182         for i in range(1,22):
183           short_name = 'chr%d.fa' % (i,)
184           long_name = 'hg18/chr%d.fa' % (i,)
185           hg_map[short_name] = long_name
186
187         samples = set(('11111', '11112', '11113', '11114', '11115',
188                        '11116', '11117', '11118', '11119', '11120'))
189         genome_maps = {}
190         for i in range(1,9):
191             genome_maps[i] = hg_map
192
193         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
194
195         for lane in eland_container.values():
196             # I added sequence lanes to the last 2 lanes of this test case
197             if lane.sample_name == '11113':
198                 self.assertEqual(lane.reads, 24)
199                 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 6)
200                 self.assertEqual(lane.match_codes['U0'], 6)
201                 self.assertEqual(lane.match_codes['R0'], 18)
202                 self.assertEqual(lane.match_codes['R1'], 24)
203                 self.assertEqual(lane.match_codes['R2'], 18)
204                 self.assertEqual(lane.match_codes['NM'], 12)
205             else:
206                 self.assertEqual(lane.reads, 8)
207                 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 2)
208                 self.assertEqual(lane.match_codes['U0'], 2)
209                 self.assertEqual(lane.match_codes['R0'], 6)
210                 self.assertEqual(lane.match_codes['R1'], 8)
211                 self.assertEqual(lane.match_codes['R2'], 6)
212                 self.assertEqual(lane.match_codes['NM'], 4)
213
214             self.assertTrue(lane.sample_name in samples)
215             #self.assertEqual(lane.lane_id, 1)
216             self.assertEqual(len(lane.mapped_reads), 1)
217             self.assertEqual(lane.match_codes['U1'], 0)
218             self.assertEqual(lane.match_codes['U2'], 0)
219             self.assertEqual(lane.match_codes['QC'], 0)
220
221         xml = eland_container.get_elements()
222         # just make sure that element tree can serialize the tree
223         xml_str = ElementTree.tostring(xml)
224         e2 = gerald.ELAND(xml=xml)
225
226         for key in eland_container.results:
227             l1 = eland_container.results[key]
228             l2 = e2.results[key]
229             self.failUnlessEqual(l1.reads, l2.reads)
230             self.failUnlessEqual(l1.sample_name, l2.sample_name)
231             self.failUnlessEqual(l1.lane_id, l2.lane_id)
232             if isinstance(l1, eland.ElandLane):
233               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
234               self.failUnlessEqual(len(l1.mapped_reads), 1)
235               for k in l1.mapped_reads.keys():
236                   self.failUnlessEqual(l1.mapped_reads[k],
237                                        l2.mapped_reads[k])
238
239               self.failUnlessEqual(len(l1.match_codes), 9)
240               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
241               for k in l1.match_codes.keys():
242                   self.failUnlessEqual(l1.match_codes[k],
243                                        l2.match_codes[k])
244             elif isinstance(l1, eland.SequenceLane):
245                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
246
247     def test_runfolder(self):
248         return
249         runs = runfolder.get_runs(self.runfolder_dir)
250
251         # do we get the flowcell id from the filename?
252         self.failUnlessEqual(len(runs), 1)
253         name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
254         self.failUnlessEqual(runs[0].name, name)
255
256         # do we get the flowcell id from the FlowcellId.xml file
257         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
258         runs = runfolder.get_runs(self.runfolder_dir)
259         self.failUnlessEqual(len(runs), 1)
260         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
261         self.failUnlessEqual(runs[0].name, name)
262
263         r1 = runs[0]
264         xml = r1.get_elements()
265         xml_str = ElementTree.tostring(xml)
266
267         r2 = runfolder.PipelineRun(xml=xml)
268         self.failUnlessEqual(r1.name, r2.name)
269         self.failIfEqual(r2.image_analysis, None)
270         self.failIfEqual(r2.bustard, None)
271         self.failIfEqual(r2.gerald, None)
272
273
274 def suite():
275     return unittest.makeSuite(RunfolderTests,'test')
276
277 if __name__ == "__main__":
278     logging.basicConfig(level=logging.WARN)
279     unittest.main(defaultTest="suite")
280