Add a software (name) property to firecrest, ipar, bustard, gerald
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta160.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import os
5 import tempfile
6 import shutil
7 import unittest
8
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
15
16 from htsworkflow.pipelines.test.simulate_runfolder import *
17
18
19 def make_runfolder(obj=None):
20     """
21     Make a fake runfolder, attach all the directories to obj if defined
22     """
23     # make a fake runfolder directory
24     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
25
26     runfolder_dir = os.path.join(temp_dir,
27                                  '090608_HWI-EAS229_0117_4286GAAXX')
28     os.mkdir(runfolder_dir)
29
30     data_dir = os.path.join(runfolder_dir, 'Data')
31     os.mkdir(data_dir)
32
33     intensities_dir = make_rta_intensities_1460(data_dir)
34
35     basecalls_dir = make_rta_basecalls_1460(intensities_dir)
36
37     #make_phasing_params(bustard_dir)
38     #make_bustard_config132(bustard_dir)
39
40     gerald_dir = os.path.join(basecalls_dir,
41                               'GERALD_16-06-2009_diane')
42     os.mkdir(gerald_dir)
43     make_gerald_config_100(gerald_dir)
44     make_summary_rta160_xml(gerald_dir)
45     make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
46     make_scarf(gerald_dir, lane_list=[7,])
47     make_fastq(gerald_dir, lane_list=[8,])
48
49     if obj is not None:
50         obj.temp_dir = temp_dir
51         obj.runfolder_dir = runfolder_dir
52         obj.data_dir = data_dir
53         obj.image_analysis_dir = intensities_dir
54         obj.bustard_dir = basecalls_dir
55         obj.gerald_dir = gerald_dir
56
57
58 class RunfolderTests(unittest.TestCase):
59     """
60     Test components of the runfolder processing code
61     which includes firecrest, bustard, and gerald
62     """
63     def setUp(self):
64         # attaches all the directories to the object passed in
65         make_runfolder(self)
66
67     def tearDown(self):
68         shutil.rmtree(self.temp_dir)
69
70     # The only thing different from the previous RTA version is
71     # I'm processing the Summary.xml file
72
73
74     def test_gerald(self):
75         # need to update gerald and make tests for it
76         g = gerald.gerald(self.gerald_dir)
77
78         self.failUnlessEqual(g.software, 'GERALD')
79         self.failUnlessEqual(g.version, '1.171')
80         self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
81         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
82         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
83
84
85         # list of genomes, matches what was defined up in
86         # make_gerald_config.
87         # the first None is to offset the genomes list to be 1..9
88         # instead of pythons default 0..8
89         genomes = [None,
90                    '/g/mm9',
91                    '/g/mm9',
92                    '/g/elegans190',
93                    '/g/arabidopsis01222004',
94                    '/g/mm9',
95                    '/g/mm9',
96                    '/g/mm9',
97                    '/g/mm9', ]
98
99         # test lane specific parameters from gerald config file
100         for i in range(1,9):
101             cur_lane = g.lanes[i]
102             self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
103             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
104             self.failUnlessEqual(cur_lane.read_length, '37')
105             self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
106
107         # I want to be able to use a simple iterator
108         for l in g.lanes.values():
109           self.failUnlessEqual(l.analysis, 'eland_extended')
110           self.failUnlessEqual(l.read_length, '37')
111           self.failUnlessEqual(l.use_bases, 'Y'*37)
112
113         # test data extracted from summary file
114         clusters = [None,
115                     (281331, 11169), (203841, 13513),
116                     (220889, 15653), (137294, 14666),
117                     (129388, 14525), (262092, 10751),
118                     (185754, 13503), (233765, 9537),]
119
120         self.failUnlessEqual(len(g.summary), 1)
121         for i in range(1,9):
122             summary_lane = g.summary[0][i]
123             self.failUnlessEqual(summary_lane.cluster, clusters[i])
124             self.failUnlessEqual(summary_lane.lane, i)
125
126         xml = g.get_elements()
127         # just make sure that element tree can serialize the tree
128         xml_str = ElementTree.tostring(xml)
129         g2 = gerald.Gerald(xml=xml)
130         return
131
132         # do it all again after extracting from the xml file
133         self.failUnlessEqual(g.software, g2.software)
134         self.failUnlessEqual(g.version, g2.version)
135         self.failUnlessEqual(g.date, g2.date)
136         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
137         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
138
139         # test lane specific parameters from gerald config file
140         for i in range(1,9):
141             g_lane = g.lanes[i]
142             g2_lane = g2.lanes[i]
143             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
144             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
145             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
146             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
147
148         # test (some) summary elements
149         self.failUnlessEqual(len(g.summary), 1)
150         for i in range(1,9):
151             g_summary = g.summary[0][i]
152             g2_summary = g2.summary[0][i]
153             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
154             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
155
156             g_eland = g.eland_results
157             g2_eland = g2.eland_results
158             for lane in g_eland.results[0].keys():
159                 g_results = g_eland.results[0][lane]
160                 g2_results = g2_eland.results[0][lane]
161                 self.failUnlessEqual(g_results.reads,
162                                      g2_results.reads)
163                 if isinstance(g_results, eland.ElandLane):
164                   self.failUnlessEqual(len(g_results.mapped_reads),
165                                        len(g2_results.mapped_reads))
166                   for k in g_results.mapped_reads.keys():
167                       self.failUnlessEqual(g_results.mapped_reads[k],
168                                            g2_results.mapped_reads[k])
169
170                   self.failUnlessEqual(len(g_results.match_codes),
171                                        len(g2_results.match_codes))
172                   for k in g_results.match_codes.keys():
173                       self.failUnlessEqual(g_results.match_codes[k],
174                                            g2_results.match_codes[k])
175
176
177     def test_eland(self):
178         return
179         hg_map = {'Lambda.fa': 'Lambda.fa'}
180         for i in range(1,22):
181           short_name = 'chr%d.fa' % (i,)
182           long_name = 'hg18/chr%d.fa' % (i,)
183           hg_map[short_name] = long_name
184
185         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
186                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
187         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
188
189         # I added sequence lanes to the last 2 lanes of this test case
190         for i in range(1,7):
191             lane = eland_container.results[0][i]
192             self.failUnlessEqual(lane.reads, 6)
193             self.failUnlessEqual(lane.sample_name, "s")
194             self.failUnlessEqual(lane.lane_id, i)
195             self.failUnlessEqual(len(lane.mapped_reads), 17)
196             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
197             self.failUnlessEqual(lane.match_codes['U0'], 3)
198             self.failUnlessEqual(lane.match_codes['R0'], 2)
199             self.failUnlessEqual(lane.match_codes['U1'], 1)
200             self.failUnlessEqual(lane.match_codes['R1'], 9)
201             self.failUnlessEqual(lane.match_codes['U2'], 0)
202             self.failUnlessEqual(lane.match_codes['R2'], 12)
203             self.failUnlessEqual(lane.match_codes['NM'], 1)
204             self.failUnlessEqual(lane.match_codes['QC'], 0)
205
206         # test scarf
207         lane = eland_container.results[0][7]
208         self.failUnlessEqual(lane.reads, 5)
209         self.failUnlessEqual(lane.sample_name, 's')
210         self.failUnlessEqual(lane.lane_id, 7)
211         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
212
213         # test fastq
214         lane = eland_container.results[0][8]
215         self.failUnlessEqual(lane.reads, 3)
216         self.failUnlessEqual(lane.sample_name, 's')
217         self.failUnlessEqual(lane.lane_id, 8)
218         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
219
220         xml = eland_container.get_elements()
221         # just make sure that element tree can serialize the tree
222         xml_str = ElementTree.tostring(xml)
223         e2 = gerald.ELAND(xml=xml)
224
225         for i in range(1,9):
226             l1 = eland_container.results[0][i]
227             l2 = e2.results[0][i]
228             self.failUnlessEqual(l1.reads, l2.reads)
229             self.failUnlessEqual(l1.sample_name, l2.sample_name)
230             self.failUnlessEqual(l1.lane_id, l2.lane_id)
231             if isinstance(l1, eland.ElandLane):
232               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
233               self.failUnlessEqual(len(l1.mapped_reads), 17)
234               for k in l1.mapped_reads.keys():
235                   self.failUnlessEqual(l1.mapped_reads[k],
236                                        l2.mapped_reads[k])
237
238               self.failUnlessEqual(len(l1.match_codes), 9)
239               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
240               for k in l1.match_codes.keys():
241                   self.failUnlessEqual(l1.match_codes[k],
242                                        l2.match_codes[k])
243             elif isinstance(l1, eland.SequenceLane):
244                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
245
246     def test_runfolder(self):
247         return
248         runs = runfolder.get_runs(self.runfolder_dir)
249
250         # do we get the flowcell id from the filename?
251         self.failUnlessEqual(len(runs), 1)
252         name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
253         self.failUnlessEqual(runs[0].name, name)
254
255         # do we get the flowcell id from the FlowcellId.xml file
256         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
257         runs = runfolder.get_runs(self.runfolder_dir)
258         self.failUnlessEqual(len(runs), 1)
259         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
260         self.failUnlessEqual(runs[0].name, name)
261
262         r1 = runs[0]
263         xml = r1.get_elements()
264         xml_str = ElementTree.tostring(xml)
265
266         r2 = runfolder.PipelineRun(xml=xml)
267         self.failUnlessEqual(r1.name, r2.name)
268         self.failIfEqual(r2.image_analysis, None)
269         self.failIfEqual(r2.bustard, None)
270         self.failIfEqual(r2.gerald, None)
271
272
273 def suite():
274     return unittest.makeSuite(RunfolderTests,'test')
275
276 if __name__ == "__main__":
277     unittest.main(defaultTest="suite")
278