take advantage of absolute_import to simplify import statements
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta160.py
1 #!/usr/bin/env python
2 from __future__ import absolute_import
3
4 from datetime import datetime, date
5 import os
6 import tempfile
7 import shutil
8 from unittest import TestCase
9
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.samplekey import SampleKey
16 from htsworkflow.pipelines import ElementTree
17
18 from .simulate_runfolder import *
19
20
21 def make_runfolder(obj=None):
22     """
23     Make a fake runfolder, attach all the directories to obj if defined
24     """
25     # make a fake runfolder directory
26     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
27
28     runfolder_dir = os.path.join(temp_dir,
29                                  '090608_HWI-EAS229_0117_4286GAAXX')
30     os.mkdir(runfolder_dir)
31
32     data_dir = os.path.join(runfolder_dir, 'Data')
33     os.mkdir(data_dir)
34
35     intensities_dir = make_rta_intensities_1460(data_dir)
36
37     basecalls_dir = make_rta_basecalls_1460(intensities_dir)
38
39     #make_phasing_params(bustard_dir)
40     #make_bustard_config132(bustard_dir)
41
42     gerald_dir = os.path.join(basecalls_dir,
43                               'GERALD_16-06-2009_diane')
44     os.mkdir(gerald_dir)
45     make_gerald_config_100(gerald_dir)
46     make_summary_rta160_xml(gerald_dir)
47     make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
48     make_scarf(gerald_dir, lane_list=[7,])
49     make_fastq(gerald_dir, lane_list=[8,])
50
51     if obj is not None:
52         obj.temp_dir = temp_dir
53         obj.runfolder_dir = runfolder_dir
54         obj.data_dir = data_dir
55         obj.image_analysis_dir = intensities_dir
56         obj.bustard_dir = basecalls_dir
57         obj.gerald_dir = gerald_dir
58
59
60 class RunfolderTests(TestCase):
61     """
62     Test components of the runfolder processing code
63     which includes firecrest, bustard, and gerald
64     """
65     def setUp(self):
66         # attaches all the directories to the object passed in
67         make_runfolder(self)
68
69     def tearDown(self):
70         shutil.rmtree(self.temp_dir)
71
72     # The only thing different from the previous RTA version is
73     # I'm processing the Summary.xml file
74
75
76     def test_gerald(self):
77         # need to update gerald and make tests for it
78         g = gerald.gerald(self.gerald_dir)
79
80         self.failUnlessEqual(g.software, 'GERALD')
81         self.failUnlessEqual(g.version, '1.171')
82         self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
83         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
84         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
85
86
87         # list of genomes, matches what was defined up in
88         # make_gerald_config.
89         # the first None is to offset the genomes list to be 1..9
90         # instead of pythons default 0..8
91         genomes = [None,
92                    '/g/mm9',
93                    '/g/mm9',
94                    '/g/elegans190',
95                    '/g/arabidopsis01222004',
96                    '/g/mm9',
97                    '/g/mm9',
98                    '/g/mm9',
99                    '/g/mm9', ]
100
101         # test lane specific parameters from gerald config file
102         for i in range(1,9):
103             cur_lane = g.lanes[i]
104             self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
105             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
106             self.failUnlessEqual(cur_lane.read_length, '37')
107             self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
108
109         # I want to be able to use a simple iterator
110         for l in g.lanes.values():
111           self.failUnlessEqual(l.analysis, 'eland_extended')
112           self.failUnlessEqual(l.read_length, '37')
113           self.failUnlessEqual(l.use_bases, 'Y'*37)
114
115         # test data extracted from summary file
116         clusters = [None,
117                     (281331, 11169), (203841, 13513),
118                     (220889, 15653), (137294, 14666),
119                     (129388, 14525), (262092, 10751),
120                     (185754, 13503), (233765, 9537),]
121
122         self.failUnlessEqual(len(g.summary), 1)
123         for i in range(1,9):
124             summary_lane = g.summary[0][i]
125             self.failUnlessEqual(summary_lane.cluster, clusters[i])
126             self.failUnlessEqual(summary_lane.lane, i)
127
128         xml = g.get_elements()
129         # just make sure that element tree can serialize the tree
130         xml_str = ElementTree.tostring(xml)
131         g2 = gerald.Gerald(xml=xml)
132
133         # do it all again after extracting from the xml file
134         self.failUnlessEqual(g.software, g2.software)
135         self.failUnlessEqual(g.version, g2.version)
136         self.failUnlessEqual(g.date, g2.date)
137         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
138         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
139
140         # test lane specific parameters from gerald config file
141         for i in range(1,9):
142             g_lane = g.lanes[i]
143             g2_lane = g2.lanes[i]
144             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
145             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
146             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
147             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
148
149         # test (some) summary elements
150         self.failUnlessEqual(len(g.summary), 1)
151         for i in range(1,9):
152             g_summary = g.summary[0][i]
153             g2_summary = g2.summary[0][i]
154             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
155             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
156
157             g_eland = g.eland_results
158             g2_eland = g2.eland_results
159             for key in g_eland:
160                 g_results = g_eland[key]
161                 g2_results = g2_eland[key]
162                 self.failUnlessEqual(g_results.reads,
163                                      g2_results.reads)
164                 if isinstance(g_results, eland.ElandLane):
165                   self.failUnlessEqual(len(g_results.mapped_reads),
166                                        len(g2_results.mapped_reads))
167                   for k in g_results.mapped_reads.keys():
168                       self.failUnlessEqual(g_results.mapped_reads[k],
169                                            g2_results.mapped_reads[k])
170
171                   self.failUnlessEqual(len(g_results.match_codes),
172                                        len(g2_results.match_codes))
173                   for k in g_results.match_codes.keys():
174                       self.failUnlessEqual(g_results.match_codes[k],
175                                            g2_results.match_codes[k])
176
177
178     def test_eland(self):
179         hg_map = {'Lambda.fa': 'Lambda.fa'}
180         for i in range(1,22):
181           short_name = 'chr%d.fa' % (i,)
182           long_name = 'hg18/chr%d.fa' % (i,)
183           hg_map[short_name] = long_name
184
185         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
186                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
187         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
188
189         # I added sequence lanes to the last 2 lanes of this test case
190         keys = [ SampleKey(lane=i, read=1, sample='s') for i in range(1,7)]
191         for key in keys:
192             lane = eland_container[key]
193             self.failUnlessEqual(lane.reads, 6)
194             self.failUnlessEqual(lane.sample_name, "s")
195             self.failUnlessEqual(lane.lane_id, key.lane)
196             self.failUnlessEqual(len(lane.mapped_reads), 17)
197             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
198             self.failUnlessEqual(lane.match_codes['U0'], 3)
199             self.failUnlessEqual(lane.match_codes['R0'], 2)
200             self.failUnlessEqual(lane.match_codes['U1'], 1)
201             self.failUnlessEqual(lane.match_codes['R1'], 9)
202             self.failUnlessEqual(lane.match_codes['U2'], 0)
203             self.failUnlessEqual(lane.match_codes['R2'], 12)
204             self.failUnlessEqual(lane.match_codes['NM'], 1)
205             self.failUnlessEqual(lane.match_codes['QC'], 0)
206
207         # test scarf
208         lane = eland_container[SampleKey(lane=7, read=1, sample='s')]
209         self.failUnlessEqual(lane.reads, 5)
210         self.failUnlessEqual(lane.sample_name, 's')
211         self.failUnlessEqual(lane.lane_id, 7)
212         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
213
214         # test fastq
215         lane = eland_container[SampleKey(lane=8, read=1, sample='s')]
216         self.failUnlessEqual(lane.reads, 3)
217         self.failUnlessEqual(lane.sample_name, 's')
218         self.failUnlessEqual(lane.lane_id, 8)
219         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
220
221         xml = eland_container.get_elements()
222         # just make sure that element tree can serialize the tree
223         xml_str = ElementTree.tostring(xml)
224         e2 = gerald.ELAND(xml=xml)
225
226         for key in eland_container:
227             l1 = eland_container[key]
228             l2 = e2[key]
229             self.failUnlessEqual(l1.reads, l2.reads)
230             self.failUnlessEqual(l1.sample_name, l2.sample_name)
231             self.failUnlessEqual(l1.lane_id, l2.lane_id)
232             if isinstance(l1, eland.ElandLane):
233               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
234               self.failUnlessEqual(len(l1.mapped_reads), 17)
235               for k in l1.mapped_reads.keys():
236                   self.failUnlessEqual(l1.mapped_reads[k],
237                                        l2.mapped_reads[k])
238
239               self.failUnlessEqual(len(l1.match_codes), 9)
240               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
241               for k in l1.match_codes.keys():
242                   self.failUnlessEqual(l1.match_codes[k],
243                                        l2.match_codes[k])
244             elif isinstance(l1, eland.SequenceLane):
245                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
246
247     def test_runfolder(self):
248         runs = runfolder.get_runs(self.runfolder_dir)
249
250         # do we get the flowcell id from the filename?
251         self.failUnlessEqual(len(runs), 1)
252         name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
253         self.failUnlessEqual(runs[0].serialization_filename, name)
254
255         # do we get the flowcell id from the FlowcellId.xml file
256         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
257         runs = runfolder.get_runs(self.runfolder_dir)
258         self.failUnlessEqual(len(runs), 1)
259         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
260         self.failUnlessEqual(runs[0].serialization_filename, name)
261
262         bustard_dir = os.path.join(self.runfolder_dir, 'Data',
263                                    'Intensities', 'BaseCalls')
264         r1 = runs[0]
265         xml = r1.get_elements()
266         xml_str = ElementTree.tostring(xml)
267         self.failUnlessEqual(r1.bustard.sequence_format, 'qseq')
268         self.failUnlessEqual(r1.bustard.pathname, bustard_dir)
269         self.failUnlessEqual(r1.gerald.runfolder_name,
270                              '090220_HWI-EAS229_0093_30VR0AAXX')
271
272         r2 = runfolder.PipelineRun(xml=xml)
273         self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
274         self.failIfEqual(r2.image_analysis, None)
275         self.failIfEqual(r2.bustard, None)
276         self.failIfEqual(r2.gerald, None)
277
278
279 def suite():
280     from unittest import TestSuite, defaultTestLoader
281     suite = TestSuite()
282     suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
283     return suite
284
285
286 if __name__ == "__main__":
287     from unittest import main
288     main(defaultTest="suite")