Change unittest2 back into unittest.
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta160.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import os
5 import tempfile
6 import shutil
7 from unittest import TestCase
8
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.samplekey import SampleKey
15 from htsworkflow.pipelines import ElementTree
16
17 from htsworkflow.pipelines.test.simulate_runfolder import *
18
19
def make_runfolder(obj=None):
    """
    Make a fake runfolder, attach all the directories to obj if defined

    A new temporary directory is created and populated with:

    * a runfolder named ``090608_HWI-EAS229_0117_4286GAAXX``
    * a ``Data`` directory inside it
    * an intensities directory (from ``make_rta_intensities_1460``)
    * a base calls directory (from ``make_rta_basecalls_1460``)
    * a ``GERALD_16-06-2009_diane`` directory holding a gerald config,
      a summary xml file, eland multi results for lanes 1-6, a scarf
      file for lane 7 and a fastq file for lane 8

    If obj is not None the following attributes are set on it:
    ``temp_dir``, ``runfolder_dir``, ``data_dir``, ``image_analysis_dir``,
    ``bustard_dir`` and ``gerald_dir``.

    Returns None.  If building the tree fails, the temporary directory
    is removed again and the original exception is re-raised, so a
    failing fixture does not leave stray directories behind.
    """
    # make a fake runfolder directory
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')

    try:
        runfolder_dir = os.path.join(temp_dir,
                                     '090608_HWI-EAS229_0117_4286GAAXX')
        os.mkdir(runfolder_dir)

        data_dir = os.path.join(runfolder_dir, 'Data')
        os.mkdir(data_dir)

        # each helper returns the directory it created
        intensities_dir = make_rta_intensities_1460(data_dir)
        basecalls_dir = make_rta_basecalls_1460(intensities_dir)

        gerald_dir = os.path.join(basecalls_dir,
                                  'GERALD_16-06-2009_diane')
        os.mkdir(gerald_dir)
        make_gerald_config_100(gerald_dir)
        make_summary_rta160_xml(gerald_dir)
        make_eland_multi(gerald_dir, lane_list=[1, 2, 3, 4, 5, 6])
        make_scarf(gerald_dir, lane_list=[7])
        make_fastq(gerald_dir, lane_list=[8])
    except Exception:
        # The caller never learns temp_dir when construction fails
        # (and unittest skips tearDown after a failed setUp), so this
        # is the only place the tree can be cleaned up.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    if obj is not None:
        obj.temp_dir = temp_dir
        obj.runfolder_dir = runfolder_dir
        obj.data_dir = data_dir
        obj.image_analysis_dir = intensities_dir
        obj.bustard_dir = basecalls_dir
        obj.gerald_dir = gerald_dir
57
58
class RunfolderTests(TestCase):
    """
    Test components of the runfolder processing code
    which includes firecrest, bustard, and gerald

    Each test runs against a freshly built fake runfolder (see setUp)
    that is deleted again in tearDown.
    """
    def setUp(self):
        # attaches all the directories to the object passed in
        # (temp_dir, runfolder_dir, data_dir, image_analysis_dir,
        # bustard_dir and gerald_dir)
        make_runfolder(self)

    def tearDown(self):
        # remove the whole temporary tree created by setUp
        shutil.rmtree(self.temp_dir)

    # The only thing different from the previous RTA version is
    # I'm processing the Summary.xml file

    def test_gerald(self):
        # Load the gerald directory, check the parsed values, then
        # round-trip through get_elements()/Gerald(xml=...) and make
        # sure the reloaded object agrees with the first one.
        # need to update gerald and make tests for it
        g = gerald.gerald(self.gerald_dir)

        self.assertEqual(g.software, 'GERALD')
        self.assertEqual(g.version, '1.171')
        self.assertEqual(g.date, datetime(2009, 2, 22, 21, 15, 59))
        self.assertEqual(len(g.lanes), len(g.lanes.keys()))
        self.assertEqual(len(g.lanes), len(g.lanes.items()))

        # list of genomes, matches what was defined up in
        # make_gerald_config.
        # the first None is to offset the genomes list to be 1..9
        # instead of pythons default 0..8
        genomes = [None,
                   '/g/mm9',
                   '/g/mm9',
                   '/g/elegans190',
                   '/g/arabidopsis01222004',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9', ]

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            cur_lane = g.lanes[i]
            self.assertEqual(cur_lane.analysis, 'eland_extended')
            self.assertEqual(cur_lane.eland_genome, genomes[i])
            self.assertEqual(cur_lane.read_length, '37')
            self.assertEqual(cur_lane.use_bases, 'Y'*37)

        # I want to be able to use a simple iterator
        for l in g.lanes.values():
            self.assertEqual(l.analysis, 'eland_extended')
            self.assertEqual(l.read_length, '37')
            self.assertEqual(l.use_bases, 'Y'*37)

        # test data extracted from summary file
        # (leading None makes the list indexable by lane number)
        clusters = [None,
                    (281331, 11169), (203841, 13513),
                    (220889, 15653), (137294, 14666),
                    (129388, 14525), (262092, 10751),
                    (185754, 13503), (233765, 9537), ]

        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            summary_lane = g.summary[0][i]
            self.assertEqual(summary_lane.cluster, clusters[i])
            self.assertEqual(summary_lane.lane, i)

        xml = g.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        g2 = gerald.Gerald(xml=xml)

        # do it all again after extracting from the xml file
        self.assertEqual(g.software, g2.software)
        self.assertEqual(g.version, g2.version)
        self.assertEqual(g.date, g2.date)
        self.assertEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
        self.assertEqual(len(g.lanes.items()), len(g2.lanes.items()))

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            g_lane = g.lanes[i]
            g2_lane = g2.lanes[i]
            self.assertEqual(g_lane.analysis, g2_lane.analysis)
            self.assertEqual(g_lane.eland_genome, g2_lane.eland_genome)
            self.assertEqual(g_lane.read_length, g2_lane.read_length)
            self.assertEqual(g_lane.use_bases, g2_lane.use_bases)

        # test (some) summary elements
        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            g_summary = g.summary[0][i]
            g2_summary = g2.summary[0][i]
            self.assertEqual(g_summary.cluster, g2_summary.cluster)
            self.assertEqual(g_summary.lane, g2_summary.lane)

        # The eland comparison does not depend on the lane index, so it
        # runs once here instead of once per summary lane.
        g_eland = g.eland_results
        g2_eland = g2.eland_results
        for key in g_eland:
            g_results = g_eland[key]
            g2_results = g2_eland[key]
            self.assertEqual(g_results.reads,
                             g2_results.reads)
            if isinstance(g_results, eland.ElandLane):
                self.assertEqual(len(g_results.mapped_reads),
                                 len(g2_results.mapped_reads))
                for k in g_results.mapped_reads.keys():
                    self.assertEqual(g_results.mapped_reads[k],
                                     g2_results.mapped_reads[k])

                self.assertEqual(len(g_results.match_codes),
                                 len(g2_results.match_codes))
                for k in g_results.match_codes.keys():
                    self.assertEqual(g_results.match_codes[k],
                                     g2_results.match_codes[k])

    def test_eland(self):
        # Load the eland/sequence results for all eight lanes, check
        # the counts, then round-trip through get_elements()/ELAND(xml=...)
        hg_map = {'Lambda.fa': 'Lambda.fa'}
        for i in range(1, 22):
            short_name = 'chr%d.fa' % (i,)
            long_name = 'hg18/chr%d.fa' % (i,)
            hg_map[short_name] = long_name

        # every lane uses the same genome map
        genome_maps = {1: hg_map, 2: hg_map, 3: hg_map, 4: hg_map,
                       5: hg_map, 6: hg_map, 7: hg_map, 8: hg_map}
        eland_container = gerald.eland(self.gerald_dir,
                                       genome_maps=genome_maps)

        # I added sequence lanes to the last 2 lanes of this test case
        keys = [SampleKey(lane=i, read=1, sample='s') for i in range(1, 7)]
        for key in keys:
            lane = eland_container[key]
            self.assertEqual(lane.reads, 6)
            self.assertEqual(lane.sample_name, "s")
            self.assertEqual(lane.lane_id, key.lane)
            self.assertEqual(len(lane.mapped_reads), 17)
            self.assertEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
            self.assertEqual(lane.match_codes['U0'], 3)
            self.assertEqual(lane.match_codes['R0'], 2)
            self.assertEqual(lane.match_codes['U1'], 1)
            self.assertEqual(lane.match_codes['R1'], 9)
            self.assertEqual(lane.match_codes['U2'], 0)
            self.assertEqual(lane.match_codes['R2'], 12)
            self.assertEqual(lane.match_codes['NM'], 1)
            self.assertEqual(lane.match_codes['QC'], 0)

        # test scarf
        lane = eland_container[SampleKey(lane=7, read=1, sample='s')]
        self.assertEqual(lane.reads, 5)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 7)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)

        # test fastq
        lane = eland_container[SampleKey(lane=8, read=1, sample='s')]
        self.assertEqual(lane.reads, 3)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 8)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)

        xml = eland_container.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        e2 = gerald.ELAND(xml=xml)

        for key in eland_container:
            l1 = eland_container[key]
            l2 = e2[key]
            self.assertEqual(l1.reads, l2.reads)
            self.assertEqual(l1.sample_name, l2.sample_name)
            self.assertEqual(l1.lane_id, l2.lane_id)
            if isinstance(l1, eland.ElandLane):
                self.assertEqual(len(l1.mapped_reads), len(l2.mapped_reads))
                self.assertEqual(len(l1.mapped_reads), 17)
                for k in l1.mapped_reads.keys():
                    self.assertEqual(l1.mapped_reads[k],
                                     l2.mapped_reads[k])

                self.assertEqual(len(l1.match_codes), 9)
                self.assertEqual(len(l1.match_codes), len(l2.match_codes))
                for k in l1.match_codes.keys():
                    self.assertEqual(l1.match_codes[k],
                                     l2.match_codes[k])
            elif isinstance(l1, eland.SequenceLane):
                self.assertEqual(l1.sequence_type, l2.sequence_type)

    def test_runfolder(self):
        # Discover the run in the fake runfolder, check how the
        # flowcell id ends up in the serialization filename, then
        # round-trip the run through get_elements()/PipelineRun(xml=...)
        runs = runfolder.get_runs(self.runfolder_dir)

        # do we get the flowcell id from the filename?
        self.assertEqual(len(runs), 1)
        name = 'run_4286GAAXX_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].serialization_filename, name)

        # do we get the flowcell id from the FlowcellId.xml file
        make_flowcell_id(self.runfolder_dir, '207BTAAXY')
        runs = runfolder.get_runs(self.runfolder_dir)
        self.assertEqual(len(runs), 1)
        name = 'run_207BTAAXY_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].serialization_filename, name)

        bustard_dir = os.path.join(self.runfolder_dir, 'Data',
                                   'Intensities', 'BaseCalls')
        r1 = runs[0]
        xml = r1.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        self.assertEqual(r1.bustard.sequence_format, 'qseq')
        self.assertEqual(r1.bustard.pathname, bustard_dir)
        self.assertEqual(r1.gerald.runfolder_name,
                         '090220_HWI-EAS229_0093_30VR0AAXX')

        r2 = runfolder.PipelineRun(xml=xml)
        self.assertEqual(r1.serialization_filename,
                         r2.serialization_filename)
        self.assertNotEqual(r2.image_analysis, None)
        self.assertNotEqual(r2.bustard, None)
        self.assertNotEqual(r2.gerald, None)
276
277
def suite():
    """Return a TestSuite containing the tests loaded from RunfolderTests."""
    import unittest

    loader = unittest.defaultTestLoader
    collected = unittest.TestSuite()
    collected.addTests(loader.loadTestsFromTestCase(RunfolderTests))
    return collected
283
284
if __name__ == "__main__":
    # When executed as a script, hand control to unittest and let it
    # resolve the tests to run through the module-level suite() function.
    import unittest
    unittest.main(defaultTest="suite")