fdb60011df9f12785ba224ccc29a62617c0a30d1
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta1_12.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import logging
5 import os
6 import tempfile
7 import shutil
8 import unittest
9
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines.samplekey import SampleKey
12 from htsworkflow.pipelines import ipar
13 from htsworkflow.pipelines import bustard
14 from htsworkflow.pipelines import gerald
15 from htsworkflow.pipelines import runfolder
16 from htsworkflow.pipelines.runfolder import ElementTree
17
18 from htsworkflow.pipelines.test.simulate_runfolder import *
19
20
21 def make_runfolder(obj=None):
22     """
23     Make a fake runfolder, attach all the directories to obj if defined
24     """
25     # make a fake runfolder directory
26     flowcell_id = 'D07K6ACXX'
27     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
28
29     runfolder_dir = os.path.join(
30         temp_dir,
31         '110815_SN787_0101_A{0}'.format(flowcell_id))
32     os.mkdir(runfolder_dir)
33
34     make_runinfo(runfolder_dir, flowcell_id)
35
36     data_dir = os.path.join(runfolder_dir, 'Data')
37     os.mkdir(data_dir)
38
39     intensities_dir = make_rta_intensities_1_12(data_dir)
40     make_status_rta1_12(data_dir)
41
42     basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
43     make_matrix_dir_rta_1_12(basecalls_dir)
44
45     unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
46     os.mkdir(unaligned_dir)
47     make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
48     make_unaligned_config_1_12(unaligned_dir)
49
50     aligned_dir = os.path.join(runfolder_dir, "Aligned")
51     os.mkdir(aligned_dir)
52     make_aligned_eland_export(aligned_dir, flowcell_id)
53     make_aligned_config_1_12(aligned_dir)
54
55     if obj is not None:
56         obj.flowcell_id = flowcell_id
57         obj.temp_dir = temp_dir
58         obj.runfolder_dir = runfolder_dir
59         obj.data_dir = data_dir
60         obj.image_analysis_dir = intensities_dir
61         obj.bustard_dir = unaligned_dir
62         obj.gerald_dir = aligned_dir
63         obj.reads = 2
64
65
66 class RunfolderTests(unittest.TestCase):
67     """
68     Test components of the runfolder processing code
69     which includes firecrest, bustard, and gerald
70     """
71     def setUp(self):
72         # attaches all the directories to the object passed in
73         make_runfolder(self)
74
75     def tearDown(self):
76         shutil.rmtree(self.temp_dir)
77
78     def test_bustard(self):
79         """Construct a bustard object"""
80         b = bustard.bustard(self.bustard_dir)
81         self.failUnlessEqual(b.software, 'RTA')
82         self.failUnlessEqual(b.version, '1.12.4.2')
83         self.failUnlessEqual(b.date,    None)
84         self.failUnlessEqual(b.user,    None)
85         self.failUnlessEqual(len(b.phasing), 0)
86
87         xml = b.get_elements()
88         b2 = bustard.Bustard(xml=xml)
89         self.failUnlessEqual(b.software, b2.software)
90         self.failUnlessEqual(b.version,  b2.version)
91         self.failUnlessEqual(b.date,     b2.date )
92         self.failUnlessEqual(b.user,     b2.user)
93
94     def test_gerald(self):
95         # need to update gerald and make tests for it
96         g = gerald.gerald(self.gerald_dir)
97
98         self.failUnlessEqual(g.software, 'CASAVA')
99         self.failUnlessEqual(g.version, '1.8.1')
100         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
101         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
102
103         # list of genomes, matches what was defined up in
104         # make_gerald_config.
105         # the first None is to offset the genomes list to be 1..9
106         # instead of pythons default 0..8
107         # test lane specific parameters from gerald config file
108
109         undetermined = g.lanes[SampleKey(sample='Undetermined_indices')]
110         self.failUnlessEqual(undetermined.analysis, 'none')
111         self.failUnlessEqual(undetermined.read_length, None)
112         self.failUnlessEqual(undetermined.use_bases, None)
113
114         project = g.lanes[SampleKey(sample='11115')]
115         self.failUnlessEqual(project.analysis, 'eland_extended')
116         self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
117         self.failUnlessEqual(project.read_length, '49')
118         self.failUnlessEqual(project.use_bases, 'y'*49+'n')
119
120         # test data extracted from summary file
121         clusters = [None,
122                     (3878755,  579626.0), (3920639, 1027332.4),
123                     (5713049,  876187.3), (5852907,  538640.6),
124                     (4006751, 1265247.4), (5678021,  627070.7),
125                     (1854131,  429053.2), (4777517,  592904.0),
126                    ]
127
128         self.failUnlessEqual(len(g.summary), self.reads)
129         for i in range(1,9):
130             summary_lane = g.summary[0][i]
131             self.failUnlessEqual(summary_lane.cluster, clusters[i])
132             self.failUnlessEqual(summary_lane.lane, i)
133
134         xml = g.get_elements()
135         # just make sure that element tree can serialize the tree
136         xml_str = ElementTree.tostring(xml)
137         g2 = gerald.CASAVA(xml=xml)
138
139         # do it all again after extracting from the xml file
140         self.failUnlessEqual(g.software, g2.software)
141         self.failUnlessEqual(g.version, g2.version)
142         self.failUnlessEqual(g.date, g2.date)
143         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
144         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
145
146         # test lane specific parameters from gerald config file
147         for i in g.lanes.keys():
148             g_lane = g.lanes[i]
149             g2_lane = g2.lanes[i]
150             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
151             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
152             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
153             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
154
155         # test (some) summary elements
156         self.failUnlessEqual(len(g.summary), self.reads)
157         for i in range(1,9):
158             g_summary = g.summary[0][i]
159             g2_summary = g2.summary[0][i]
160             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
161             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
162
163             g_eland = g.eland_results
164             g2_eland = g2.eland_results
165             for key in g_eland:
166                 g_results = g_eland[key]
167                 g2_results = g2_eland[key]
168                 self.failUnlessEqual(g_results.reads,
169                                      g2_results.reads)
170                 if isinstance(g_results, eland.ElandLane):
171                   self.failUnlessEqual(len(g_results.mapped_reads),
172                                        len(g2_results.mapped_reads))
173                   for k in g_results.mapped_reads.keys():
174                       self.failUnlessEqual(g_results.mapped_reads[k],
175                                            g2_results.mapped_reads[k])
176
177                   self.failUnlessEqual(len(g_results.match_codes),
178                                        len(g2_results.match_codes))
179                   for k in g_results.match_codes.keys():
180                       self.failUnlessEqual(g_results.match_codes[k],
181                                            g2_results.match_codes[k])
182
183
184     def test_eland(self):
185         hg_map = {'Lambda.fa': 'Lambda.fa'}
186         for i in range(1,22):
187           short_name = 'chr%d.fa' % (i,)
188           long_name = 'hg18/chr%d.fa' % (i,)
189           hg_map[short_name] = long_name
190
191         samples = set(('11111', '11112', '11113', '11114', '11115',
192                        '11116', '11117', '11118', '11119', '11120'))
193         genome_maps = {}
194         for i in range(1,9):
195             genome_maps[i] = hg_map
196
197         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
198
199         for lane in eland_container.values():
200             # I added sequence lanes to the last 2 lanes of this test case
201             if lane.sample_name == '11113':
202                 self.assertEqual(lane.reads, 24)
203                 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 6)
204                 self.assertEqual(lane.match_codes['U0'], 6)
205                 self.assertEqual(lane.match_codes['R0'], 18)
206                 self.assertEqual(lane.match_codes['R1'], 24)
207                 self.assertEqual(lane.match_codes['R2'], 18)
208                 self.assertEqual(lane.match_codes['NM'], 12)
209             else:
210                 self.assertEqual(lane.reads, 8)
211                 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 2)
212                 self.assertEqual(lane.match_codes['U0'], 2)
213                 self.assertEqual(lane.match_codes['R0'], 6)
214                 self.assertEqual(lane.match_codes['R1'], 8)
215                 self.assertEqual(lane.match_codes['R2'], 6)
216                 self.assertEqual(lane.match_codes['NM'], 4)
217
218             self.assertTrue(lane.sample_name in samples)
219             #self.assertEqual(lane.lane_id, 1)
220             self.assertEqual(len(lane.mapped_reads), 1)
221             self.assertEqual(lane.match_codes['U1'], 0)
222             self.assertEqual(lane.match_codes['U2'], 0)
223             self.assertEqual(lane.match_codes['QC'], 0)
224
225         xml = eland_container.get_elements()
226         # just make sure that element tree can serialize the tree
227         xml_str = ElementTree.tostring(xml)
228         e2 = gerald.ELAND(xml=xml)
229
230         for key in eland_container.results:
231             l1 = eland_container.results[key]
232             l2 = e2.results[key]
233             self.failUnlessEqual(l1.reads, l2.reads)
234             self.failUnlessEqual(l1.sample_name, l2.sample_name)
235             self.failUnlessEqual(l1.lane_id, l2.lane_id)
236             if isinstance(l1, eland.ElandLane):
237               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
238               self.failUnlessEqual(len(l1.mapped_reads), 1)
239               for k in l1.mapped_reads.keys():
240                   self.failUnlessEqual(l1.mapped_reads[k],
241                                        l2.mapped_reads[k])
242
243               self.failUnlessEqual(len(l1.match_codes), 9)
244               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
245               for k in l1.match_codes.keys():
246                   self.failUnlessEqual(l1.match_codes[k],
247                                        l2.match_codes[k])
248             elif isinstance(l1, eland.SequenceLane):
249                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
250
251     def test_runfolder(self):
252         runs = runfolder.get_runs(self.runfolder_dir)
253
254         # do we get the flowcell id from the filename?
255         self.failUnlessEqual(len(runs), 1)
256         self.assertEqual(runs[0].flowcell_id, self.flowcell_id)
257         name = 'run_%s_%s.xml' % ( self.flowcell_id,
258                                    date.today().strftime('%Y-%m-%d'),)
259         self.failUnlessEqual(runs[0].name, name)
260
261         bustard_dir = os.path.join(self.runfolder_dir, 'Unaligned')
262         r1 = runs[0]
263         self.failUnlessEqual(r1.bustard.sequence_format, 'fastq')
264         self.failUnlessEqual(r1.bustard.pathname, bustard_dir)
265         self.failUnlessEqual(r1.gerald.runfolder_name, 'Unaligned')
266
267         xml = r1.get_elements()
268         xml_str = ElementTree.tostring(xml)
269
270         r2 = runfolder.PipelineRun(xml=xml)
271         self.failUnlessEqual(r1.name, r2.name)
272         self.failIfEqual(r2.image_analysis, None)
273         self.failIfEqual(r2.bustard, None)
274         self.failIfEqual(r2.gerald, None)
275
276 def suite():
277     return unittest.makeSuite(RunfolderTests,'test')
278
279 if __name__ == "__main__":
280     logging.basicConfig(level=logging.WARN)
281     unittest.main(defaultTest="suite")
282