Add a software (name) property to firecrest, ipar, bustard, gerald
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta1_12.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import logging
5 import os
6 import tempfile
7 import shutil
8 import unittest
9
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.runfolder import ElementTree
16
17 from htsworkflow.pipelines.test.simulate_runfolder import *
18
19
20 def make_runfolder(obj=None):
21     """
22     Make a fake runfolder, attach all the directories to obj if defined
23     """
24     # make a fake runfolder directory
25     flowcell_id = 'D07K6ACXX'
26     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
27
28     runfolder_dir = os.path.join(temp_dir,
29                                  '110815_SN787_0101_A{0}'.format(flowcell_id))
30     os.mkdir(runfolder_dir)
31
32     data_dir = os.path.join(runfolder_dir, 'Data')
33     os.mkdir(data_dir)
34
35     intensities_dir = make_rta_intensities_1_12(data_dir)
36     make_status_rta1_12(data_dir)
37
38     basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
39     make_matrix_dir_rta_1_12(basecalls_dir)
40
41     unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
42     os.mkdir(unaligned_dir)
43     make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
44     make_unaligned_config_1_12(unaligned_dir)
45
46     aligned_dir = os.path.join(runfolder_dir, "Aligned")
47     os.mkdir(aligned_dir)
48     make_aligned_eland_export(aligned_dir, flowcell_id)
49     make_aligned_config_1_12(aligned_dir)
50
51     if obj is not None:
52         obj.temp_dir = temp_dir
53         obj.runfolder_dir = runfolder_dir
54         obj.data_dir = data_dir
55         obj.image_analysis_dir = intensities_dir
56         obj.bustard_dir = unaligned_dir
57         obj.gerald_dir = aligned_dir
58
59
60 class RunfolderTests(unittest.TestCase):
61     """
62     Test components of the runfolder processing code
63     which includes firecrest, bustard, and gerald
64     """
65     def setUp(self):
66         # attaches all the directories to the object passed in
67         make_runfolder(self)
68
69     def tearDown(self):
70         shutil.rmtree(self.temp_dir)
71
72     def test_bustard(self):
73         """Construct a bustard object"""
74         b = bustard.bustard(self.bustard_dir)
75         self.failUnlessEqual(b.software, 'RTA')
76         self.failUnlessEqual(b.version, '1.12.4.2')
77         self.failUnlessEqual(b.date,    None)
78         self.failUnlessEqual(b.user,    None)
79         self.failUnlessEqual(len(b.phasing), 0)
80
81         xml = b.get_elements()
82         b2 = bustard.Bustard(xml=xml)
83         self.failUnlessEqual(b.software, b2.software)
84         self.failUnlessEqual(b.version,  b2.version)
85         self.failUnlessEqual(b.date,     b2.date )
86         self.failUnlessEqual(b.user,     b2.user)
87
88     def test_gerald(self):
89         # need to update gerald and make tests for it
90         g = gerald.gerald(self.gerald_dir)
91
92         self.failUnlessEqual(g.software, 'CASAVA')
93         self.failUnlessEqual(g.version, '1.8.1')
94         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
95         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
96
97         # list of genomes, matches what was defined up in
98         # make_gerald_config.
99         # the first None is to offset the genomes list to be 1..9
100         # instead of pythons default 0..8
101         # test lane specific parameters from gerald config file
102
103         undetermined = g.lanes['Undetermined_indices']
104         self.failUnlessEqual(undetermined.analysis, 'none')
105         self.failUnlessEqual(undetermined.read_length, None)
106         self.failUnlessEqual(undetermined.use_bases, None)
107
108         project = g.lanes['12383']
109         self.failUnlessEqual(project.analysis, 'eland_extended')
110         self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
111         self.failUnlessEqual(project.read_length, '49')
112         self.failUnlessEqual(project.use_bases, 'y'*49+'n')
113
114         # test data extracted from summary file
115         clusters = [None,
116                     (3878755,  579626.0), (3920639, 1027332.4),
117                     (5713049,  876187.3), (5852907,  538640.6),
118                     (4006751, 1265247.4), (5678021,  627070.7),
119                     (1854131,  429053.2), (4777517,  592904.0),
120                    ]
121
122         self.failUnlessEqual(len(g.summary), 2)
123         for i in range(1,9):
124             summary_lane = g.summary[0][i]
125             self.failUnlessEqual(summary_lane.cluster, clusters[i])
126             self.failUnlessEqual(summary_lane.lane, i)
127
128         xml = g.get_elements()
129         # just make sure that element tree can serialize the tree
130         xml_str = ElementTree.tostring(xml)
131         g2 = gerald.Gerald(xml=xml)
132         return
133
134         # do it all again after extracting from the xml file
135         self.failUnlessEqual(g.software, g2.software)
136         self.failUnlessEqual(g.version, g2.version)
137         self.failUnlessEqual(g.date, g2.date)
138         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
139         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
140
141         # test lane specific parameters from gerald config file
142         for i in range(1,9):
143             g_lane = g.lanes[i]
144             g2_lane = g2.lanes[i]
145             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
146             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
147             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
148             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
149
150         # test (some) summary elements
151         self.failUnlessEqual(len(g.summary), 1)
152         for i in range(1,9):
153             g_summary = g.summary[0][i]
154             g2_summary = g2.summary[0][i]
155             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
156             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
157
158             g_eland = g.eland_results
159             g2_eland = g2.eland_results
160             for lane in g_eland.results[0].keys():
161                 g_results = g_eland.results[0][lane]
162                 g2_results = g2_eland.results[0][lane]
163                 self.failUnlessEqual(g_results.reads,
164                                      g2_results.reads)
165                 if isinstance(g_results, eland.ElandLane):
166                   self.failUnlessEqual(len(g_results.mapped_reads),
167                                        len(g2_results.mapped_reads))
168                   for k in g_results.mapped_reads.keys():
169                       self.failUnlessEqual(g_results.mapped_reads[k],
170                                            g2_results.mapped_reads[k])
171
172                   self.failUnlessEqual(len(g_results.match_codes),
173                                        len(g2_results.match_codes))
174                   for k in g_results.match_codes.keys():
175                       self.failUnlessEqual(g_results.match_codes[k],
176                                            g2_results.match_codes[k])
177
178
179     def test_eland(self):
180         return
181         hg_map = {'Lambda.fa': 'Lambda.fa'}
182         for i in range(1,22):
183           short_name = 'chr%d.fa' % (i,)
184           long_name = 'hg18/chr%d.fa' % (i,)
185           hg_map[short_name] = long_name
186
187         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
188                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
189         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
190
191         # I added sequence lanes to the last 2 lanes of this test case
192         for i in range(1,7):
193             lane = eland_container.results[0][i]
194             self.failUnlessEqual(lane.reads, 6)
195             self.failUnlessEqual(lane.sample_name, "s")
196             self.failUnlessEqual(lane.lane_id, i)
197             self.failUnlessEqual(len(lane.mapped_reads), 17)
198             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
199             self.failUnlessEqual(lane.match_codes['U0'], 3)
200             self.failUnlessEqual(lane.match_codes['R0'], 2)
201             self.failUnlessEqual(lane.match_codes['U1'], 1)
202             self.failUnlessEqual(lane.match_codes['R1'], 9)
203             self.failUnlessEqual(lane.match_codes['U2'], 0)
204             self.failUnlessEqual(lane.match_codes['R2'], 12)
205             self.failUnlessEqual(lane.match_codes['NM'], 1)
206             self.failUnlessEqual(lane.match_codes['QC'], 0)
207
208         # test scarf
209         lane = eland_container.results[0][7]
210         self.failUnlessEqual(lane.reads, 5)
211         self.failUnlessEqual(lane.sample_name, 's')
212         self.failUnlessEqual(lane.lane_id, 7)
213         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
214
215         # test fastq
216         lane = eland_container.results[0][8]
217         self.failUnlessEqual(lane.reads, 3)
218         self.failUnlessEqual(lane.sample_name, 's')
219         self.failUnlessEqual(lane.lane_id, 8)
220         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
221
222         xml = eland_container.get_elements()
223         # just make sure that element tree can serialize the tree
224         xml_str = ElementTree.tostring(xml)
225         e2 = gerald.ELAND(xml=xml)
226
227         for i in range(1,9):
228             l1 = eland_container.results[0][i]
229             l2 = e2.results[0][i]
230             self.failUnlessEqual(l1.reads, l2.reads)
231             self.failUnlessEqual(l1.sample_name, l2.sample_name)
232             self.failUnlessEqual(l1.lane_id, l2.lane_id)
233             if isinstance(l1, eland.ElandLane):
234               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
235               self.failUnlessEqual(len(l1.mapped_reads), 17)
236               for k in l1.mapped_reads.keys():
237                   self.failUnlessEqual(l1.mapped_reads[k],
238                                        l2.mapped_reads[k])
239
240               self.failUnlessEqual(len(l1.match_codes), 9)
241               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
242               for k in l1.match_codes.keys():
243                   self.failUnlessEqual(l1.match_codes[k],
244                                        l2.match_codes[k])
245             elif isinstance(l1, eland.SequenceLane):
246                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
247
248     def test_runfolder(self):
249         return
250         runs = runfolder.get_runs(self.runfolder_dir)
251
252         # do we get the flowcell id from the filename?
253         self.failUnlessEqual(len(runs), 1)
254         name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
255         self.failUnlessEqual(runs[0].name, name)
256
257         # do we get the flowcell id from the FlowcellId.xml file
258         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
259         runs = runfolder.get_runs(self.runfolder_dir)
260         self.failUnlessEqual(len(runs), 1)
261         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
262         self.failUnlessEqual(runs[0].name, name)
263
264         r1 = runs[0]
265         xml = r1.get_elements()
266         xml_str = ElementTree.tostring(xml)
267
268         r2 = runfolder.PipelineRun(xml=xml)
269         self.failUnlessEqual(r1.name, r2.name)
270         self.failIfEqual(r2.image_analysis, None)
271         self.failIfEqual(r2.bustard, None)
272         self.failIfEqual(r2.gerald, None)
273
274
275 def suite():
276     return unittest.makeSuite(RunfolderTests,'test')
277
278 if __name__ == "__main__":
279     logging.basicConfig(level=logging.WARN)
280     unittest.main(defaultTest="suite")
281