Merge branch 'master' of mus.cacr.caltech.edu:htsworkflow
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta1_12.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import logging
5 import os
6 import tempfile
7 import shutil
8 import unittest
9
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.runfolder import ElementTree
16
17 from htsworkflow.pipelines.test.simulate_runfolder import *
18
19
20 def make_runfolder(obj=None):
21     """
22     Make a fake runfolder, attach all the directories to obj if defined
23     """
24     # make a fake runfolder directory
25     flowcell_id = 'D07K6ACXX'
26     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
27
28     runfolder_dir = os.path.join(temp_dir,
29                                  '110815_SN787_0101_A{0}'.format(flowcell_id))
30     os.mkdir(runfolder_dir)
31
32     data_dir = os.path.join(runfolder_dir, 'Data')
33     os.mkdir(data_dir)
34
35     intensities_dir = make_rta_intensities_1_12(data_dir)
36     make_status_rta1_12(data_dir)
37
38     basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
39     make_matrix_dir_rta_1_12(basecalls_dir)
40
41     unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
42     os.mkdir(unaligned_dir)
43     make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
44     make_unaligned_config_1_12(unaligned_dir)
45
46     aligned_dir = os.path.join(runfolder_dir, "Aligned")
47     os.mkdir(aligned_dir)
48     make_aligned_eland_export(aligned_dir, flowcell_id)
49     make_aligned_config_1_12(aligned_dir)
50
51     if obj is not None:
52         obj.temp_dir = temp_dir
53         obj.runfolder_dir = runfolder_dir
54         obj.data_dir = data_dir
55         obj.image_analysis_dir = intensities_dir
56         obj.bustard_dir = unaligned_dir
57         obj.gerald_dir = aligned_dir
58         obj.reads = 2
59
60
61 class RunfolderTests(unittest.TestCase):
62     """
63     Test components of the runfolder processing code
64     which includes firecrest, bustard, and gerald
65     """
66     def setUp(self):
67         # attaches all the directories to the object passed in
68         make_runfolder(self)
69
70     def tearDown(self):
71         shutil.rmtree(self.temp_dir)
72
73     def test_bustard(self):
74         """Construct a bustard object"""
75         b = bustard.bustard(self.bustard_dir)
76         self.failUnlessEqual(b.software, 'RTA')
77         self.failUnlessEqual(b.version, '1.12.4.2')
78         self.failUnlessEqual(b.date,    None)
79         self.failUnlessEqual(b.user,    None)
80         self.failUnlessEqual(len(b.phasing), 0)
81
82         xml = b.get_elements()
83         b2 = bustard.Bustard(xml=xml)
84         self.failUnlessEqual(b.software, b2.software)
85         self.failUnlessEqual(b.version,  b2.version)
86         self.failUnlessEqual(b.date,     b2.date )
87         self.failUnlessEqual(b.user,     b2.user)
88
89     def test_gerald(self):
90         # need to update gerald and make tests for it
91         g = gerald.gerald(self.gerald_dir)
92
93         self.failUnlessEqual(g.software, 'CASAVA')
94         self.failUnlessEqual(g.version, '1.8.1')
95         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
96         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
97
98         # list of genomes, matches what was defined up in
99         # make_gerald_config.
100         # the first None is to offset the genomes list to be 1..9
101         # instead of pythons default 0..8
102         # test lane specific parameters from gerald config file
103
104         undetermined = g.lanes['Undetermined_indices']
105         self.failUnlessEqual(undetermined.analysis, 'none')
106         self.failUnlessEqual(undetermined.read_length, None)
107         self.failUnlessEqual(undetermined.use_bases, None)
108
109         project = g.lanes['12383']
110         self.failUnlessEqual(project.analysis, 'eland_extended')
111         self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
112         self.failUnlessEqual(project.read_length, '49')
113         self.failUnlessEqual(project.use_bases, 'y'*49+'n')
114
115         # test data extracted from summary file
116         clusters = [None,
117                     (3878755,  579626.0), (3920639, 1027332.4),
118                     (5713049,  876187.3), (5852907,  538640.6),
119                     (4006751, 1265247.4), (5678021,  627070.7),
120                     (1854131,  429053.2), (4777517,  592904.0),
121                    ]
122
123         self.failUnlessEqual(len(g.summary), self.reads)
124         for i in range(1,9):
125             summary_lane = g.summary[0][i]
126             self.failUnlessEqual(summary_lane.cluster, clusters[i])
127             self.failUnlessEqual(summary_lane.lane, i)
128
129         xml = g.get_elements()
130         # just make sure that element tree can serialize the tree
131         xml_str = ElementTree.tostring(xml)
132         g2 = gerald.CASAVA(xml=xml)
133
134         # do it all again after extracting from the xml file
135         self.failUnlessEqual(g.software, g2.software)
136         self.failUnlessEqual(g.version, g2.version)
137         self.failUnlessEqual(g.date, g2.date)
138         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
139         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
140
141         # test lane specific parameters from gerald config file
142         for i in g.lanes.keys():
143             g_lane = g.lanes[i]
144             g2_lane = g2.lanes[i]
145             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
146             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
147             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
148             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
149
150         # test (some) summary elements
151         self.failUnlessEqual(len(g.summary), self.reads)
152         for i in range(1,9):
153             g_summary = g.summary[0][i]
154             g2_summary = g2.summary[0][i]
155             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
156             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
157
158             g_eland = g.eland_results
159             g2_eland = g2.eland_results
160             for lane in g_eland.results[0].keys():
161                 g_results = g_eland.results[0][lane]
162                 g2_results = g2_eland.results[0][lane]
163                 self.failUnlessEqual(g_results.reads,
164                                      g2_results.reads)
165                 if isinstance(g_results, eland.ElandLane):
166                   self.failUnlessEqual(len(g_results.mapped_reads),
167                                        len(g2_results.mapped_reads))
168                   for k in g_results.mapped_reads.keys():
169                       self.failUnlessEqual(g_results.mapped_reads[k],
170                                            g2_results.mapped_reads[k])
171
172                   self.failUnlessEqual(len(g_results.match_codes),
173                                        len(g2_results.match_codes))
174                   for k in g_results.match_codes.keys():
175                       self.failUnlessEqual(g_results.match_codes[k],
176                                            g2_results.match_codes[k])
177
178
179     def test_eland(self):
180         hg_map = {'Lambda.fa': 'Lambda.fa'}
181         for i in range(1,22):
182           short_name = 'chr%d.fa' % (i,)
183           long_name = 'hg18/chr%d.fa' % (i,)
184           hg_map[short_name] = long_name
185
186         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
187                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
188         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
189
190         # I added sequence lanes to the last 2 lanes of this test case
191         for i in range(1,7):
192             lane = eland_container.results[0][i]
193             self.failUnlessEqual(lane.reads, 6)
194             self.failUnlessEqual(lane.sample_name, "s")
195             self.failUnlessEqual(lane.lane_id, i)
196             self.failUnlessEqual(len(lane.mapped_reads), 17)
197             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
198             self.failUnlessEqual(lane.match_codes['U0'], 3)
199             self.failUnlessEqual(lane.match_codes['R0'], 2)
200             self.failUnlessEqual(lane.match_codes['U1'], 1)
201             self.failUnlessEqual(lane.match_codes['R1'], 9)
202             self.failUnlessEqual(lane.match_codes['U2'], 0)
203             self.failUnlessEqual(lane.match_codes['R2'], 12)
204             self.failUnlessEqual(lane.match_codes['NM'], 1)
205             self.failUnlessEqual(lane.match_codes['QC'], 0)
206
207         # test scarf
208         lane = eland_container.results[0][7]
209         self.failUnlessEqual(lane.reads, 5)
210         self.failUnlessEqual(lane.sample_name, 's')
211         self.failUnlessEqual(lane.lane_id, 7)
212         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
213
214         # test fastq
215         lane = eland_container.results[0][8]
216         self.failUnlessEqual(lane.reads, 3)
217         self.failUnlessEqual(lane.sample_name, 's')
218         self.failUnlessEqual(lane.lane_id, 8)
219         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
220
221         xml = eland_container.get_elements()
222         # just make sure that element tree can serialize the tree
223         xml_str = ElementTree.tostring(xml)
224         e2 = gerald.ELAND(xml=xml)
225
226         for i in range(1,9):
227             l1 = eland_container.results[0][i]
228             l2 = e2.results[0][i]
229             self.failUnlessEqual(l1.reads, l2.reads)
230             self.failUnlessEqual(l1.sample_name, l2.sample_name)
231             self.failUnlessEqual(l1.lane_id, l2.lane_id)
232             if isinstance(l1, eland.ElandLane):
233               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
234               self.failUnlessEqual(len(l1.mapped_reads), 17)
235               for k in l1.mapped_reads.keys():
236                   self.failUnlessEqual(l1.mapped_reads[k],
237                                        l2.mapped_reads[k])
238
239               self.failUnlessEqual(len(l1.match_codes), 9)
240               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
241               for k in l1.match_codes.keys():
242                   self.failUnlessEqual(l1.match_codes[k],
243                                        l2.match_codes[k])
244             elif isinstance(l1, eland.SequenceLane):
245                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
246
247     def test_runfolder(self):
248         return
249         runs = runfolder.get_runs(self.runfolder_dir)
250
251         # do we get the flowcell id from the filename?
252         self.failUnlessEqual(len(runs), 1)
253         name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
254         self.failUnlessEqual(runs[0].name, name)
255
256         # do we get the flowcell id from the FlowcellId.xml file
257         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
258         runs = runfolder.get_runs(self.runfolder_dir)
259         self.failUnlessEqual(len(runs), 1)
260         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
261         self.failUnlessEqual(runs[0].name, name)
262
263         r1 = runs[0]
264         xml = r1.get_elements()
265         xml_str = ElementTree.tostring(xml)
266
267         r2 = runfolder.PipelineRun(xml=xml)
268         self.failUnlessEqual(r1.name, r2.name)
269         self.failIfEqual(r2.image_analysis, None)
270         self.failIfEqual(r2.bustard, None)
271         self.failIfEqual(r2.gerald, None)
272
273
274 def suite():
275     return unittest.makeSuite(RunfolderTests,'test')
276
277 if __name__ == "__main__":
278     logging.basicConfig(level=logging.WARN)
279     unittest.main(defaultTest="suite")
280