Change unittest2 back into unittest.
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_ipar130.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import os
5 import tempfile
6 import shutil
7 from unittest import TestCase
8
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines import ElementTree
15
16 from htsworkflow.pipelines.test.simulate_runfolder import *
17
18
19 def make_runfolder(obj=None):
20     """
21     Make a fake runfolder, attach all the directories to obj if defined
22     """
23     # make a fake runfolder directory
24     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
25
26     runfolder_dir = os.path.join(temp_dir,
27                                  '090313_HWI-EAS229_0101_3021JAAXX')
28     os.mkdir(runfolder_dir)
29
30     data_dir = os.path.join(runfolder_dir, 'Data')
31     os.mkdir(data_dir)
32
33     ipar_dir = make_ipar_dir(data_dir, '1.30')
34
35     bustard_dir = os.path.join(ipar_dir,
36                                'Bustard1.3.2_15-03-2008_diane')
37     os.mkdir(bustard_dir)
38     make_phasing_params(bustard_dir)
39     make_bustard_config132(bustard_dir)
40
41     gerald_dir = os.path.join(bustard_dir,
42                               'GERALD_15-03-2008_diane')
43     os.mkdir(gerald_dir)
44     make_gerald_config_100(gerald_dir)
45     make_summary_ipar130_htm(gerald_dir)
46     make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
47     make_scarf(gerald_dir, lane_list=[7,])
48     make_fastq(gerald_dir, lane_list=[8,])
49
50     if obj is not None:
51         obj.temp_dir = temp_dir
52         obj.runfolder_dir = runfolder_dir
53         obj.data_dir = data_dir
54         obj.image_analysis_dir = ipar_dir
55         obj.bustard_dir = bustard_dir
56         obj.gerald_dir = gerald_dir
57
58
59 class RunfolderTests(TestCase):
60     """
61     Test components of the runfolder processing code
62     which includes firecrest, bustard, and gerald
63     """
64     def setUp(self):
65         # attaches all the directories to the object passed in
66         make_runfolder(self)
67
68     def tearDown(self):
69         shutil.rmtree(self.temp_dir)
70
71     def test_ipar(self):
72         """
73         Construct a firecrest object
74         """
75         i = ipar.ipar(self.image_analysis_dir)
76         self.failUnlessEqual(i.software, 'IPAR')
77         self.failUnlessEqual(i.version, '2.01.192.0')
78         self.failUnlessEqual(i.start, 1)
79         self.failUnlessEqual(i.stop, 37)
80
81         xml = i.get_elements()
82         # just make sure that element tree can serialize the tree
83         xml_str = ElementTree.tostring(xml)
84
85         i2 = ipar.IPAR(xml=xml)
86         self.failUnlessEqual(i.software, i2.software)
87         self.failUnlessEqual(i.version, i2.version)
88         self.failUnlessEqual(i.start,   i2.start)
89         self.failUnlessEqual(i.stop,    i2.stop)
90         self.failUnlessEqual(i.date,    i2.date)
91         self.failUnlessEqual(i.file_list(), i2.file_list())
92
93     def test_bustard(self):
94         """
95         construct a bustard object
96         """
97         def check_crosstalk(crosstalk):
98             self.failUnlessAlmostEqual(crosstalk.base['A'][0],  1.27)
99             self.failUnlessAlmostEqual(crosstalk.base['A'][1],  0.20999999999999)
100             self.failUnlessAlmostEqual(crosstalk.base['A'][2], -0.02)
101             self.failUnlessAlmostEqual(crosstalk.base['A'][3], -0.03)
102
103             self.failUnlessAlmostEqual(crosstalk.base['C'][0],  0.57)
104             self.failUnlessAlmostEqual(crosstalk.base['C'][1],  0.58)
105             self.failUnlessAlmostEqual(crosstalk.base['C'][2], -0.01)
106             self.failUnlessAlmostEqual(crosstalk.base['C'][3], -0.01)
107
108             self.failUnlessAlmostEqual(crosstalk.base['T'][0], -0.02)
109             self.failUnlessAlmostEqual(crosstalk.base['T'][1], -0.02)
110             self.failUnlessAlmostEqual(crosstalk.base['T'][2],  0.80)
111             self.failUnlessAlmostEqual(crosstalk.base['T'][3],  1.07)
112
113             self.failUnlessAlmostEqual(crosstalk.base['G'][0], -0.03)
114             self.failUnlessAlmostEqual(crosstalk.base['G'][1], -0.04)
115             self.failUnlessAlmostEqual(crosstalk.base['G'][2],  1.51)
116             self.failUnlessAlmostEqual(crosstalk.base['G'][3], -0.02)
117
118         b = bustard.bustard(self.bustard_dir)
119         self.failUnlessEqual(b.software, 'Bustard')
120         self.failUnlessEqual(b.version, '1.3.2')
121         self.failUnlessEqual(b.date,    date(2008,3,15))
122         self.failUnlessEqual(b.user,    'diane')
123         self.failUnlessEqual(len(b.phasing), 8)
124         self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
125         self.failUnlessEqual(b.crosstalk.base.keys(), ['A','C','T','G'])
126         check_crosstalk(b.crosstalk)
127
128         xml = b.get_elements()
129         b2 = bustard.Bustard(xml=xml)
130         self.failUnlessEqual(b.software, b2.software)
131         self.failUnlessEqual(b.version, b2.version)
132         self.failUnlessEqual(b.date,    b2.date )
133         self.failUnlessEqual(b.user,    b2.user)
134         self.failUnlessEqual(len(b.phasing), len(b2.phasing))
135         for key in b.phasing.keys():
136             self.failUnlessEqual(b.phasing[key].lane,
137                                  b2.phasing[key].lane)
138             self.failUnlessEqual(b.phasing[key].phasing,
139                                  b2.phasing[key].phasing)
140             self.failUnlessEqual(b.phasing[key].prephasing,
141                                  b2.phasing[key].prephasing)
142         check_crosstalk(b2.crosstalk)
143
144     def test_gerald(self):
145         # need to update gerald and make tests for it
146         g = gerald.gerald(self.gerald_dir)
147
148         self.failUnlessEqual(g.software, 'GERALD')
149         self.failUnlessEqual(g.version, '1.171')
150         self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
151         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
152         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
153
154
155         # list of genomes, matches what was defined up in
156         # make_gerald_config.
157         # the first None is to offset the genomes list to be 1..9
158         # instead of pythons default 0..8
159         genomes = [None,
160                    '/g/mm9',
161                    '/g/mm9',
162                    '/g/elegans190',
163                    '/g/arabidopsis01222004',
164                    '/g/mm9',
165                    '/g/mm9',
166                    '/g/mm9',
167                    '/g/mm9', ]
168
169         # test lane specific parameters from gerald config file
170         for i in range(1,9):
171             cur_lane = g.lanes[i]
172             self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
173             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
174             self.failUnlessEqual(cur_lane.read_length, '37')
175             self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
176
177         # I want to be able to use a simple iterator
178         for l in g.lanes.values():
179           self.failUnlessEqual(l.analysis, 'eland_extended')
180           self.failUnlessEqual(l.read_length, '37')
181           self.failUnlessEqual(l.use_bases, 'Y'*37)
182
183         # test data extracted from summary file
184         clusters = [None,
185                     (126910, 4300), (165739, 6792),
186                     (196565, 8216), (153897, 8501),
187                     (135536, 3908), (154083, 9315),
188                     (159991, 9292), (198479, 17671),]
189
190         self.failUnlessEqual(len(g.summary), 1)
191         for i in range(1,9):
192             summary_lane = g.summary[0][i]
193             self.failUnlessEqual(summary_lane.cluster, clusters[i])
194             self.failUnlessEqual(summary_lane.lane, i)
195
196         xml = g.get_elements()
197         # just make sure that element tree can serialize the tree
198         xml_str = ElementTree.tostring(xml)
199         g2 = gerald.Gerald(xml=xml)
200
201         # do it all again after extracting from the xml file
202         self.failUnlessEqual(g.software, g2.software)
203         self.failUnlessEqual(g.version, g2.version)
204         self.failUnlessEqual(g.date, g2.date)
205         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
206         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
207
208         # test lane specific parameters from gerald config file
209         for i in range(1,9):
210             g_lane = g.lanes[i]
211             g2_lane = g2.lanes[i]
212             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
213             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
214             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
215             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
216
217         # test (some) summary elements
218         self.failUnlessEqual(len(g.summary), 1)
219         for i in range(1,9):
220             g_summary = g.summary[0][i]
221             g2_summary = g2.summary[0][i]
222             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
223             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
224
225             g_eland = g.eland_results
226             g2_eland = g2.eland_results
227             for key in g_eland:
228                 g_results = g_eland[key]
229                 g2_results = g2_eland[key]
230                 self.failUnlessEqual(g_results.reads,
231                                      g2_results.reads)
232                 if isinstance(g_results, eland.ElandLane):
233                   self.failUnlessEqual(len(g_results.mapped_reads),
234                                        len(g2_results.mapped_reads))
235                   for k in g_results.mapped_reads.keys():
236                       self.failUnlessEqual(g_results.mapped_reads[k],
237                                            g2_results.mapped_reads[k])
238
239                   self.failUnlessEqual(len(g_results.match_codes),
240                                        len(g2_results.match_codes))
241                   for k in g_results.match_codes.keys():
242                       self.failUnlessEqual(g_results.match_codes[k],
243                                            g2_results.match_codes[k])
244
245
246     def test_eland(self):
247         hg_map = {'Lambda.fa': 'Lambda.fa'}
248         for i in range(1,22):
249           short_name = 'chr%d.fa' % (i,)
250           long_name = 'hg18/chr%d.fa' % (i,)
251           hg_map[short_name] = long_name
252
253         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
254                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
255         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
256
257         # I added sequence lanes to the last 2 lanes of this test case
258         for key in eland_container:
259             lane = eland_container[key]
260             if key.lane in [1,2,3,4,5,6]:
261                 self.failUnlessEqual(lane.reads, 6)
262                 self.failUnlessEqual(lane.sample_name, "s")
263                 self.failUnlessEqual(lane.lane_id, key.lane)
264                 self.failUnlessEqual(len(lane.mapped_reads), 17)
265                 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
266                 self.failUnlessEqual(lane.match_codes['U0'], 3)
267                 self.failUnlessEqual(lane.match_codes['R0'], 2)
268                 self.failUnlessEqual(lane.match_codes['U1'], 1)
269                 self.failUnlessEqual(lane.match_codes['R1'], 9)
270                 self.failUnlessEqual(lane.match_codes['U2'], 0)
271                 self.failUnlessEqual(lane.match_codes['R2'], 12)
272                 self.failUnlessEqual(lane.match_codes['NM'], 1)
273                 self.failUnlessEqual(lane.match_codes['QC'], 0)
274             elif key.lane == 7:
275                 self.failUnlessEqual(lane.reads, 5)
276                 self.failUnlessEqual(lane.sample_name, 's')
277                 self.failUnlessEqual(lane.lane_id, 7)
278                 self.failUnlessEqual(lane.sequence_type,
279                                      eland.SequenceLane.SCARF_TYPE)
280             elif key.lane == 8:
281                 self.failUnlessEqual(lane.reads, 3)
282                 self.failUnlessEqual(lane.sample_name, 's')
283                 self.failUnlessEqual(lane.lane_id, 8)
284                 self.failUnlessEqual(lane.sequence_type,
285                                      eland.SequenceLane.FASTQ_TYPE)
286
287         xml = eland_container.get_elements()
288         # just make sure that element tree can serialize the tree
289         xml_str = ElementTree.tostring(xml)
290         e2 = gerald.ELAND(xml=xml)
291
292         for key in eland_container:
293             l1 = eland_container[key]
294             l2 = e2[key]
295             self.failUnlessEqual(l1.reads, l2.reads)
296             self.failUnlessEqual(l1.sample_name, l2.sample_name)
297             self.failUnlessEqual(l1.lane_id, l2.lane_id)
298             if isinstance(l1, eland.ElandLane):
299               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
300               self.failUnlessEqual(len(l1.mapped_reads), 17)
301               for k in l1.mapped_reads.keys():
302                   self.failUnlessEqual(l1.mapped_reads[k],
303                                        l2.mapped_reads[k])
304
305               self.failUnlessEqual(len(l1.match_codes), 9)
306               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
307               for k in l1.match_codes.keys():
308                   self.failUnlessEqual(l1.match_codes[k],
309                                        l2.match_codes[k])
310             elif isinstance(l1, eland.SequenceLane):
311                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
312
313     def test_runfolder(self):
314         runs = runfolder.get_runs(self.runfolder_dir)
315
316         # do we get the flowcell id from the filename?
317         self.failUnlessEqual(len(runs), 1)
318         name = 'run_3021JAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
319         self.failUnlessEqual(runs[0].serialization_filename, name)
320
321         # do we get the flowcell id from the FlowcellId.xml file
322         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
323         runs = runfolder.get_runs(self.runfolder_dir)
324         self.failUnlessEqual(len(runs), 1)
325         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
326         self.failUnlessEqual(runs[0].serialization_filename, name)
327
328         r1 = runs[0]
329         xml = r1.get_elements()
330         xml_str = ElementTree.tostring(xml)
331
332         r2 = runfolder.PipelineRun(xml=xml)
333         self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
334         self.failIfEqual(r2.image_analysis, None)
335         self.failIfEqual(r2.bustard, None)
336         self.failIfEqual(r2.gerald, None)
337
338
339 def suite():
340     from unittest import TestSuite, defaultTestLoader
341     suite = TestSuite()
342     suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
343     return suite
344
345
346 if __name__ == "__main__":
347     from unittest import main
348     main(defaultTest="suite")