Update summary script to read from the GERALD Summary.xml file
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta160.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import os
5 import tempfile
6 import shutil
7 import unittest
8
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
15
16 from htsworkflow.pipelines.test.simulate_runfolder import *
17
18
19 def make_runfolder(obj=None):
20     """
21     Make a fake runfolder, attach all the directories to obj if defined
22     """
23     # make a fake runfolder directory
24     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
25
26     runfolder_dir = os.path.join(temp_dir,
27                                  '090608_HWI-EAS229_0117_4286GAAXX')
28     os.mkdir(runfolder_dir)
29
30     data_dir = os.path.join(runfolder_dir, 'Data')
31     os.mkdir(data_dir)
32
33     intensities_dir = make_rta_intensities_1460(data_dir)
34
35     basecalls_dir = make_rta_basecalls_1460(intensities_dir)
36
37     #make_phasing_params(bustard_dir)
38     #make_bustard_config132(bustard_dir)
39
40     gerald_dir = os.path.join(basecalls_dir,
41                               'GERALD_16-06-2009_diane')
42     os.mkdir(gerald_dir)
43     make_gerald_config_100(gerald_dir)
44     make_summary_rta160_xml(gerald_dir)
45     make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
46     make_scarf(gerald_dir, lane_list=[7,])
47     make_fastq(gerald_dir, lane_list=[8,])
48
49     if obj is not None:
50         obj.temp_dir = temp_dir
51         obj.runfolder_dir = runfolder_dir
52         obj.data_dir = data_dir
53         obj.image_analysis_dir = intensities_dir
54         obj.bustard_dir = basecalls_dir
55         obj.gerald_dir = gerald_dir
56
57
58 class RunfolderTests(unittest.TestCase):
59     """
60     Test components of the runfolder processing code
61     which includes firecrest, bustard, and gerald
62     """
63     def setUp(self):
64         # attaches all the directories to the object passed in
65         make_runfolder(self)
66
67     def tearDown(self):
68         shutil.rmtree(self.temp_dir)
69
70     # The only thing different from the previous RTA version is
71     # I'm processing the Summary.xml file
72
73     
74     def test_gerald(self):
75         # need to update gerald and make tests for it
76         g = gerald.gerald(self.gerald_dir)
77
78         self.failUnlessEqual(g.version,
79             '@(#) Id: GERALD.pl,v 1.171 2008/05/19 17:36:14 mzerara Exp')
80         self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
81         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
82         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
83
84
85         # list of genomes, matches what was defined up in
86         # make_gerald_config.
87         # the first None is to offset the genomes list to be 1..9
88         # instead of pythons default 0..8
89         genomes = [None, 
90                    '/g/mm9', 
91                    '/g/mm9', 
92                    '/g/elegans190', 
93                    '/g/arabidopsis01222004',
94                    '/g/mm9', 
95                    '/g/mm9', 
96                    '/g/mm9', 
97                    '/g/mm9', ]
98
99         # test lane specific parameters from gerald config file
100         for i in range(1,9):
101             cur_lane = g.lanes[i]
102             self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
103             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
104             self.failUnlessEqual(cur_lane.read_length, '37')
105             self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
106
107         # I want to be able to use a simple iterator
108         for l in g.lanes.values():
109           self.failUnlessEqual(l.analysis, 'eland_extended')
110           self.failUnlessEqual(l.read_length, '37')
111           self.failUnlessEqual(l.use_bases, 'Y'*37)
112
113         # test data extracted from summary file
114         clusters = [None,
115                     (281331, 11169), (203841, 13513),
116                     (220889, 15653), (137294, 14666),
117                     (129388, 14525), (262092, 10751),
118                     (185754, 13503), (233765, 9537),]
119
120         self.failUnlessEqual(len(g.summary), 1)
121         for i in range(1,9):
122             summary_lane = g.summary[0][i]
123             self.failUnlessEqual(summary_lane.cluster, clusters[i])
124             self.failUnlessEqual(summary_lane.lane, i)
125
126         xml = g.get_elements()
127         # just make sure that element tree can serialize the tree
128         xml_str = ElementTree.tostring(xml)
129         g2 = gerald.Gerald(xml=xml)
130         return
131
132         # do it all again after extracting from the xml file
133         self.failUnlessEqual(g.version, g2.version)
134         self.failUnlessEqual(g.date, g2.date)
135         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
136         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
137
138         # test lane specific parameters from gerald config file
139         for i in range(1,9):
140             g_lane = g.lanes[i]
141             g2_lane = g2.lanes[i]
142             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
143             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
144             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
145             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
146
147         # test (some) summary elements
148         self.failUnlessEqual(len(g.summary), 1)
149         for i in range(1,9):
150             g_summary = g.summary[0][i]
151             g2_summary = g2.summary[0][i]
152             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
153             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
154
155             g_eland = g.eland_results
156             g2_eland = g2.eland_results
157             for lane in g_eland.results[0].keys():
158                 g_results = g_eland.results[0][lane]
159                 g2_results = g2_eland.results[0][lane]
160                 self.failUnlessEqual(g_results.reads,
161                                      g2_results.reads)
162                 if isinstance(g_results, eland.ElandLane):
163                   self.failUnlessEqual(len(g_results.mapped_reads),
164                                        len(g2_results.mapped_reads))
165                   for k in g_results.mapped_reads.keys():
166                       self.failUnlessEqual(g_results.mapped_reads[k],
167                                            g2_results.mapped_reads[k])
168
169                   self.failUnlessEqual(len(g_results.match_codes),
170                                        len(g2_results.match_codes))
171                   for k in g_results.match_codes.keys():
172                       self.failUnlessEqual(g_results.match_codes[k],
173                                            g2_results.match_codes[k])
174
175
176     def test_eland(self):
177         return
178         hg_map = {'Lambda.fa': 'Lambda.fa'}
179         for i in range(1,22):
180           short_name = 'chr%d.fa' % (i,)
181           long_name = 'hg18/chr%d.fa' % (i,)
182           hg_map[short_name] = long_name
183
184         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
185                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
186         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
187
188         # I added sequence lanes to the last 2 lanes of this test case
189         for i in range(1,7):
190             lane = eland_container.results[0][i]
191             self.failUnlessEqual(lane.reads, 6)
192             self.failUnlessEqual(lane.sample_name, "s")
193             self.failUnlessEqual(lane.lane_id, i)
194             self.failUnlessEqual(len(lane.mapped_reads), 17)
195             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
196             self.failUnlessEqual(lane.match_codes['U0'], 3)
197             self.failUnlessEqual(lane.match_codes['R0'], 2)
198             self.failUnlessEqual(lane.match_codes['U1'], 1)
199             self.failUnlessEqual(lane.match_codes['R1'], 9)
200             self.failUnlessEqual(lane.match_codes['U2'], 0)
201             self.failUnlessEqual(lane.match_codes['R2'], 12)
202             self.failUnlessEqual(lane.match_codes['NM'], 1)
203             self.failUnlessEqual(lane.match_codes['QC'], 0)
204
205         # test scarf
206         lane = eland_container.results[0][7]
207         self.failUnlessEqual(lane.reads, 5)
208         self.failUnlessEqual(lane.sample_name, 's')
209         self.failUnlessEqual(lane.lane_id, 7)
210         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
211
212         # test fastq
213         lane = eland_container.results[0][8]
214         self.failUnlessEqual(lane.reads, 3)
215         self.failUnlessEqual(lane.sample_name, 's')
216         self.failUnlessEqual(lane.lane_id, 8)
217         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
218
219         xml = eland_container.get_elements()
220         # just make sure that element tree can serialize the tree
221         xml_str = ElementTree.tostring(xml)
222         e2 = gerald.ELAND(xml=xml)
223
224         for i in range(1,9):
225             l1 = eland_container.results[0][i]
226             l2 = e2.results[0][i]
227             self.failUnlessEqual(l1.reads, l2.reads)
228             self.failUnlessEqual(l1.sample_name, l2.sample_name)
229             self.failUnlessEqual(l1.lane_id, l2.lane_id)
230             if isinstance(l1, eland.ElandLane):
231               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
232               self.failUnlessEqual(len(l1.mapped_reads), 17)
233               for k in l1.mapped_reads.keys():
234                   self.failUnlessEqual(l1.mapped_reads[k],
235                                        l2.mapped_reads[k])
236
237               self.failUnlessEqual(len(l1.match_codes), 9)
238               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
239               for k in l1.match_codes.keys():
240                   self.failUnlessEqual(l1.match_codes[k],
241                                        l2.match_codes[k])
242             elif isinstance(l1, eland.SequenceLane):
243                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
244
245     def test_runfolder(self):
246         return
247         runs = runfolder.get_runs(self.runfolder_dir)
248
249         # do we get the flowcell id from the filename?
250         self.failUnlessEqual(len(runs), 1)
251         name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
252         self.failUnlessEqual(runs[0].name, name)
253
254         # do we get the flowcell id from the FlowcellId.xml file
255         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
256         runs = runfolder.get_runs(self.runfolder_dir)
257         self.failUnlessEqual(len(runs), 1)
258         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
259         self.failUnlessEqual(runs[0].name, name)
260
261         r1 = runs[0]
262         xml = r1.get_elements()
263         xml_str = ElementTree.tostring(xml)
264
265         r2 = runfolder.PipelineRun(xml=xml)
266         self.failUnlessEqual(r1.name, r2.name)
267         self.failIfEqual(r2.image_analysis, None)
268         self.failIfEqual(r2.bustard, None)
269         self.failIfEqual(r2.gerald, None)
270
271
272 def suite():
273     return unittest.makeSuite(RunfolderTests,'test')
274
275 if __name__ == "__main__":
276     unittest.main(defaultTest="suite")
277