Change unittest2 back into unittest.
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import os
5 import tempfile
6 import shutil
7 from unittest import TestCase
8
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.samplekey import SampleKey
15 from htsworkflow.pipelines import ElementTree
16
17 from htsworkflow.pipelines.test.simulate_runfolder import *
18
19
20 def make_runfolder(obj=None):
21     """
22     Make a fake runfolder, attach all the directories to obj if defined
23     """
24     # make a fake runfolder directory
25     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
26
27     runfolder_dir = os.path.join(temp_dir,
28                                  '090608_HWI-EAS229_0117_4286GAAXX')
29     os.mkdir(runfolder_dir)
30
31     data_dir = os.path.join(runfolder_dir, 'Data')
32     os.mkdir(data_dir)
33
34     intensities_dir = make_rta_intensities_1460(data_dir)
35
36     basecalls_dir = make_rta_basecalls_1460(intensities_dir)
37
38     #make_phasing_params(bustard_dir)
39     #make_bustard_config132(bustard_dir)
40
41     gerald_dir = os.path.join(basecalls_dir,
42                               'GERALD_16-06-2009_diane')
43     os.mkdir(gerald_dir)
44     make_gerald_config_100(gerald_dir)
45     make_summary_ipar130_htm(gerald_dir)
46     make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
47     make_scarf(gerald_dir, lane_list=[7,])
48     make_fastq(gerald_dir, lane_list=[8,])
49
50     if obj is not None:
51         obj.temp_dir = temp_dir
52         obj.runfolder_dir = runfolder_dir
53         obj.data_dir = data_dir
54         obj.image_analysis_dir = intensities_dir
55         obj.bustard_dir = basecalls_dir
56         obj.gerald_dir = gerald_dir
57
58
59 class RunfolderTests(TestCase):
60     """
61     Test components of the runfolder processing code
62     which includes firecrest, bustard, and gerald
63     """
64     def setUp(self):
65         # attaches all the directories to the object passed in
66         make_runfolder(self)
67
68     def tearDown(self):
69         shutil.rmtree(self.temp_dir)
70
71     def test_ipar(self):
72         """
73         Construct a firecrest object
74         """
75         i = ipar.ipar(self.image_analysis_dir)
76         self.failUnlessEqual(i.version, '1.4.6.0')
77         self.failUnlessEqual(i.start, 1)
78         self.failUnlessEqual(i.stop, 38)
79
80         xml = i.get_elements()
81         # just make sure that element tree can serialize the tree
82         xml_str = ElementTree.tostring(xml)
83
84         i2 = ipar.IPAR(xml=xml)
85         self.failUnlessEqual(i.version, i2.version)
86         self.failUnlessEqual(i.start,   i2.start)
87         self.failUnlessEqual(i.stop,    i2.stop)
88         self.failUnlessEqual(i.date,    i2.date)
89
90     def test_bustard(self):
91         """
92         construct a bustard object
93         """
94         b = bustard.bustard(self.bustard_dir)
95         self.failUnlessEqual(b.version, '1.4.6.0')
96         self.failUnlessEqual(b.date,    None)
97         self.failUnlessEqual(b.user,    None)
98         self.failUnlessEqual(len(b.phasing), 0)
99
100         xml = b.get_elements()
101         b2 = bustard.Bustard(xml=xml)
102         self.failUnlessEqual(b.version, b2.version)
103         self.failUnlessEqual(b.date,    b2.date )
104         self.failUnlessEqual(b.user,    b2.user)
105
106     def test_gerald(self):
107         # need to update gerald and make tests for it
108         g = gerald.gerald(self.gerald_dir)
109
110         self.failUnlessEqual(g.version, '1.171')
111         self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
112         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
113         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
114
115
116         # list of genomes, matches what was defined up in
117         # make_gerald_config.
118         # the first None is to offset the genomes list to be 1..9
119         # instead of pythons default 0..8
120         genomes = [None,
121                    '/g/mm9',
122                    '/g/mm9',
123                    '/g/elegans190',
124                    '/g/arabidopsis01222004',
125                    '/g/mm9',
126                    '/g/mm9',
127                    '/g/mm9',
128                    '/g/mm9', ]
129
130         # test lane specific parameters from gerald config file
131         for i in range(1,9):
132             cur_lane = g.lanes[i]
133             self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
134             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
135             self.failUnlessEqual(cur_lane.read_length, '37')
136             self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
137
138         # I want to be able to use a simple iterator
139         for l in g.lanes.values():
140           self.failUnlessEqual(l.analysis, 'eland_extended')
141           self.failUnlessEqual(l.read_length, '37')
142           self.failUnlessEqual(l.use_bases, 'Y'*37)
143
144         # test data extracted from summary file
145         clusters = [None,
146                     (126910, 4300), (165739, 6792),
147                     (196565, 8216), (153897, 8501),
148                     (135536, 3908), (154083, 9315),
149                     (159991, 9292), (198479, 17671),]
150
151         self.failUnlessEqual(len(g.summary), 1)
152         for i in range(1,9):
153             summary_lane = g.summary[0][i]
154             self.failUnlessEqual(summary_lane.cluster, clusters[i])
155             self.failUnlessEqual(summary_lane.lane, i)
156
157         xml = g.get_elements()
158         # just make sure that element tree can serialize the tree
159         xml_str = ElementTree.tostring(xml)
160         g2 = gerald.Gerald(xml=xml)
161
162         # do it all again after extracting from the xml file
163         self.failUnlessEqual(g.version, g2.version)
164         self.failUnlessEqual(g.date, g2.date)
165         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
166         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
167
168         # test lane specific parameters from gerald config file
169         for i in range(1,9):
170             g_lane = g.lanes[i]
171             g2_lane = g2.lanes[i]
172             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
173             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
174             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
175             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
176
177         # test (some) summary elements
178         self.failUnlessEqual(len(g.summary), 1)
179         for i in range(1,9):
180             g_summary = g.summary[0][i]
181             g2_summary = g2.summary[0][i]
182             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
183             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
184
185             g_eland = g.eland_results
186             g2_eland = g2.eland_results
187             for lane in g_eland:
188                 g_results = g_eland[lane]
189                 g2_results = g2_eland[lane]
190                 self.failUnlessEqual(g_results.reads,
191                                      g2_results.reads)
192                 if isinstance(g_results, eland.ElandLane):
193                   self.failUnlessEqual(len(g_results.mapped_reads),
194                                        len(g2_results.mapped_reads))
195                   for k in g_results.mapped_reads.keys():
196                       self.failUnlessEqual(g_results.mapped_reads[k],
197                                            g2_results.mapped_reads[k])
198
199                   self.failUnlessEqual(len(g_results.match_codes),
200                                        len(g2_results.match_codes))
201                   for k in g_results.match_codes.keys():
202                       self.failUnlessEqual(g_results.match_codes[k],
203                                            g2_results.match_codes[k])
204
205
206     def test_eland(self):
207         hg_map = {'Lambda.fa': 'Lambda.fa'}
208         for i in range(1,22):
209           short_name = 'chr%d.fa' % (i,)
210           long_name = 'hg18/chr%d.fa' % (i,)
211           hg_map[short_name] = long_name
212
213         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
214                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
215         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
216
217         # I added sequence lanes to the last 2 lanes of this test case
218         lanes = [ SampleKey(lane=i, read=1, sample='s') for i in range(1,7) ]
219         for key in lanes:
220             lane = eland_container[key]
221             self.failUnlessEqual(lane.reads, 6)
222             self.failUnlessEqual(lane.sample_name, "s")
223             self.failUnlessEqual(lane.lane_id, key.lane)
224             self.failUnlessEqual(len(lane.mapped_reads), 17)
225             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
226             self.failUnlessEqual(lane.match_codes['U0'], 3)
227             self.failUnlessEqual(lane.match_codes['R0'], 2)
228             self.failUnlessEqual(lane.match_codes['U1'], 1)
229             self.failUnlessEqual(lane.match_codes['R1'], 9)
230             self.failUnlessEqual(lane.match_codes['U2'], 0)
231             self.failUnlessEqual(lane.match_codes['R2'], 12)
232             self.failUnlessEqual(lane.match_codes['NM'], 1)
233             self.failUnlessEqual(lane.match_codes['QC'], 0)
234
235         # test scarf
236         lane = eland_container[SampleKey(lane=7, read=1, sample='s')]
237         self.failUnlessEqual(lane.reads, 5)
238         self.failUnlessEqual(lane.sample_name, 's')
239         self.failUnlessEqual(lane.lane_id, 7)
240         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
241
242         # test fastq
243         lane = eland_container[SampleKey(lane=8, read=1, sample='s')]
244         self.failUnlessEqual(lane.reads, 3)
245         self.failUnlessEqual(lane.sample_name, 's')
246         self.failUnlessEqual(lane.lane_id, 8)
247         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
248
249         xml = eland_container.get_elements()
250         # just make sure that element tree can serialize the tree
251         xml_str = ElementTree.tostring(xml)
252         e2 = gerald.ELAND(xml=xml)
253
254         for key in eland_container:
255             l1 = eland_container[key]
256             l2 = e2[key]
257             self.failUnlessEqual(l1.reads, l2.reads)
258             self.failUnlessEqual(l1.sample_name, l2.sample_name)
259             self.failUnlessEqual(l1.lane_id, l2.lane_id)
260             if isinstance(l1, eland.ElandLane):
261               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
262               self.failUnlessEqual(len(l1.mapped_reads), 17)
263               for k in l1.mapped_reads.keys():
264                   self.failUnlessEqual(l1.mapped_reads[k],
265                                        l2.mapped_reads[k])
266
267               self.failUnlessEqual(len(l1.match_codes), 9)
268               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
269               for k in l1.match_codes.keys():
270                   self.failUnlessEqual(l1.match_codes[k],
271                                        l2.match_codes[k])
272             elif isinstance(l1, eland.SequenceLane):
273                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
274
275     def test_runfolder(self):
276         runs = runfolder.get_runs(self.runfolder_dir)
277
278         # do we get the flowcell id from the filename?
279         self.failUnlessEqual(len(runs), 1)
280         name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
281         self.failUnlessEqual(runs[0].serialization_filename, name)
282
283         # do we get the flowcell id from the FlowcellId.xml file
284         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
285         runs = runfolder.get_runs(self.runfolder_dir)
286         self.failUnlessEqual(len(runs), 1)
287         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
288         self.failUnlessEqual(runs[0].serialization_filename, name)
289
290         r1 = runs[0]
291         xml = r1.get_elements()
292         xml_str = ElementTree.tostring(xml)
293
294         r2 = runfolder.PipelineRun(xml=xml)
295         self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
296         self.failIfEqual(r2.image_analysis, None)
297         self.failIfEqual(r2.bustard, None)
298         self.failIfEqual(r2.gerald, None)
299
300
301 def suite():
302     from unittest import TestSuite, defaultTestLoader
303     suite = TestSuite()
304     suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
305     return suite
306
307
308 if __name__ == "__main__":
309     from unittest import main
310     main(defaultTest="suite")