ed225bfd36b9ca4dbf5cf618e04eddd50d3af1af
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta1_12.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import logging
5 import os
6 import tempfile
7 import shutil
8 import unittest
9
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.runfolder import ElementTree
16
17 from htsworkflow.pipelines.test.simulate_runfolder import *
18
19
20 def make_runfolder(obj=None):
21     """
22     Make a fake runfolder, attach all the directories to obj if defined
23     """
24     # make a fake runfolder directory
25     flowcell_id = 'D07K6ACXX'
26     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
27
28     runfolder_dir = os.path.join(temp_dir,
29                                  '110815_SN787_0101_A{0}'.format(flowcell_id))
30     os.mkdir(runfolder_dir)
31
32     data_dir = os.path.join(runfolder_dir, 'Data')
33     os.mkdir(data_dir)
34
35     intensities_dir = make_rta_intensities_1_12(data_dir)
36     make_status_rta1_12(data_dir)
37
38     basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
39     make_matrix_dir_rta_1_12(basecalls_dir)
40
41     unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
42     os.mkdir(unaligned_dir)
43     make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
44     make_unaligned_config_1_12(unaligned_dir)
45
46     aligned_dir = os.path.join(runfolder_dir, "Aligned")
47     os.mkdir(aligned_dir)
48     make_aligned_eland_export(aligned_dir, flowcell_id)
49     make_aligned_config_1_12(aligned_dir)
50
51     if obj is not None:
52         obj.temp_dir = temp_dir
53         obj.runfolder_dir = runfolder_dir
54         obj.data_dir = data_dir
55         obj.image_analysis_dir = intensities_dir
56         obj.bustard_dir = unaligned_dir
57         obj.gerald_dir = aligned_dir
58
59
60 class RunfolderTests(unittest.TestCase):
61     """
62     Test components of the runfolder processing code
63     which includes firecrest, bustard, and gerald
64     """
65     def setUp(self):
66         # attaches all the directories to the object passed in
67         make_runfolder(self)
68
69     def tearDown(self):
70         shutil.rmtree(self.temp_dir)
71
72     def test_bustard(self):
73         """Construct a bustard object"""
74         b = bustard.bustard(self.bustard_dir)
75         self.failUnlessEqual(b.version, '1.12.4.2')
76         self.failUnlessEqual(b.date,    None)
77         self.failUnlessEqual(b.user,    None)
78         self.failUnlessEqual(len(b.phasing), 0)
79
80         xml = b.get_elements()
81         b2 = bustard.Bustard(xml=xml)
82         self.failUnlessEqual(b.version, b2.version)
83         self.failUnlessEqual(b.date,    b2.date )
84         self.failUnlessEqual(b.user,    b2.user)
85
86     def test_gerald(self):
87         # need to update gerald and make tests for it
88         g = gerald.gerald(self.gerald_dir)
89
90         self.failUnlessEqual(g.version, 'CASAVA-1.8.1')
91         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
92         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
93
94         # list of genomes, matches what was defined up in
95         # make_gerald_config.
96         # the first None is to offset the genomes list to be 1..9
97         # instead of pythons default 0..8
98         genomes = [None,
99                    '/g/mm9',
100                    '/g/mm9',
101                    '/g/elegans190',
102                    '/g/arabidopsis01222004',
103                    '/g/mm9',
104                    '/g/mm9',
105                    '/g/mm9',
106                    '/g/mm9', ]
107
108         # test lane specific parameters from gerald config file
109         for i in range(1,9):
110             cur_lane = g.lanes[i]
111             self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
112             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
113             self.failUnlessEqual(cur_lane.read_length, '37')
114             self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
115
116         # I want to be able to use a simple iterator
117         for l in g.lanes.values():
118           self.failUnlessEqual(l.analysis, 'eland_extended')
119           self.failUnlessEqual(l.read_length, '37')
120           self.failUnlessEqual(l.use_bases, 'Y'*37)
121
122         # test data extracted from summary file
123         clusters = [None,
124                     (281331, 11169), (203841, 13513),
125                     (220889, 15653), (137294, 14666),
126                     (129388, 14525), (262092, 10751),
127                     (185754, 13503), (233765, 9537),]
128
129         self.failUnlessEqual(len(g.summary), 1)
130         for i in range(1,9):
131             summary_lane = g.summary[0][i]
132             self.failUnlessEqual(summary_lane.cluster, clusters[i])
133             self.failUnlessEqual(summary_lane.lane, i)
134
135         xml = g.get_elements()
136         # just make sure that element tree can serialize the tree
137         xml_str = ElementTree.tostring(xml)
138         g2 = gerald.Gerald(xml=xml)
139         return
140
141         # do it all again after extracting from the xml file
142         self.failUnlessEqual(g.version, g2.version)
143         self.failUnlessEqual(g.date, g2.date)
144         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
145         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
146
147         # test lane specific parameters from gerald config file
148         for i in range(1,9):
149             g_lane = g.lanes[i]
150             g2_lane = g2.lanes[i]
151             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
152             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
153             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
154             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
155
156         # test (some) summary elements
157         self.failUnlessEqual(len(g.summary), 1)
158         for i in range(1,9):
159             g_summary = g.summary[0][i]
160             g2_summary = g2.summary[0][i]
161             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
162             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
163
164             g_eland = g.eland_results
165             g2_eland = g2.eland_results
166             for lane in g_eland.results[0].keys():
167                 g_results = g_eland.results[0][lane]
168                 g2_results = g2_eland.results[0][lane]
169                 self.failUnlessEqual(g_results.reads,
170                                      g2_results.reads)
171                 if isinstance(g_results, eland.ElandLane):
172                   self.failUnlessEqual(len(g_results.mapped_reads),
173                                        len(g2_results.mapped_reads))
174                   for k in g_results.mapped_reads.keys():
175                       self.failUnlessEqual(g_results.mapped_reads[k],
176                                            g2_results.mapped_reads[k])
177
178                   self.failUnlessEqual(len(g_results.match_codes),
179                                        len(g2_results.match_codes))
180                   for k in g_results.match_codes.keys():
181                       self.failUnlessEqual(g_results.match_codes[k],
182                                            g2_results.match_codes[k])
183
184
185     def test_eland(self):
186         return
187         hg_map = {'Lambda.fa': 'Lambda.fa'}
188         for i in range(1,22):
189           short_name = 'chr%d.fa' % (i,)
190           long_name = 'hg18/chr%d.fa' % (i,)
191           hg_map[short_name] = long_name
192
193         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
194                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
195         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
196
197         # I added sequence lanes to the last 2 lanes of this test case
198         for i in range(1,7):
199             lane = eland_container.results[0][i]
200             self.failUnlessEqual(lane.reads, 6)
201             self.failUnlessEqual(lane.sample_name, "s")
202             self.failUnlessEqual(lane.lane_id, i)
203             self.failUnlessEqual(len(lane.mapped_reads), 17)
204             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
205             self.failUnlessEqual(lane.match_codes['U0'], 3)
206             self.failUnlessEqual(lane.match_codes['R0'], 2)
207             self.failUnlessEqual(lane.match_codes['U1'], 1)
208             self.failUnlessEqual(lane.match_codes['R1'], 9)
209             self.failUnlessEqual(lane.match_codes['U2'], 0)
210             self.failUnlessEqual(lane.match_codes['R2'], 12)
211             self.failUnlessEqual(lane.match_codes['NM'], 1)
212             self.failUnlessEqual(lane.match_codes['QC'], 0)
213
214         # test scarf
215         lane = eland_container.results[0][7]
216         self.failUnlessEqual(lane.reads, 5)
217         self.failUnlessEqual(lane.sample_name, 's')
218         self.failUnlessEqual(lane.lane_id, 7)
219         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
220
221         # test fastq
222         lane = eland_container.results[0][8]
223         self.failUnlessEqual(lane.reads, 3)
224         self.failUnlessEqual(lane.sample_name, 's')
225         self.failUnlessEqual(lane.lane_id, 8)
226         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
227
228         xml = eland_container.get_elements()
229         # just make sure that element tree can serialize the tree
230         xml_str = ElementTree.tostring(xml)
231         e2 = gerald.ELAND(xml=xml)
232
233         for i in range(1,9):
234             l1 = eland_container.results[0][i]
235             l2 = e2.results[0][i]
236             self.failUnlessEqual(l1.reads, l2.reads)
237             self.failUnlessEqual(l1.sample_name, l2.sample_name)
238             self.failUnlessEqual(l1.lane_id, l2.lane_id)
239             if isinstance(l1, eland.ElandLane):
240               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
241               self.failUnlessEqual(len(l1.mapped_reads), 17)
242               for k in l1.mapped_reads.keys():
243                   self.failUnlessEqual(l1.mapped_reads[k],
244                                        l2.mapped_reads[k])
245
246               self.failUnlessEqual(len(l1.match_codes), 9)
247               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
248               for k in l1.match_codes.keys():
249                   self.failUnlessEqual(l1.match_codes[k],
250                                        l2.match_codes[k])
251             elif isinstance(l1, eland.SequenceLane):
252                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
253
254     def test_runfolder(self):
255         return
256         runs = runfolder.get_runs(self.runfolder_dir)
257
258         # do we get the flowcell id from the filename?
259         self.failUnlessEqual(len(runs), 1)
260         name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
261         self.failUnlessEqual(runs[0].name, name)
262
263         # do we get the flowcell id from the FlowcellId.xml file
264         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
265         runs = runfolder.get_runs(self.runfolder_dir)
266         self.failUnlessEqual(len(runs), 1)
267         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
268         self.failUnlessEqual(runs[0].name, name)
269
270         r1 = runs[0]
271         xml = r1.get_elements()
272         xml_str = ElementTree.tostring(xml)
273
274         r2 = runfolder.PipelineRun(xml=xml)
275         self.failUnlessEqual(r1.name, r2.name)
276         self.failIfEqual(r2.image_analysis, None)
277         self.failIfEqual(r2.bustard, None)
278         self.failIfEqual(r2.gerald, None)
279
280
281 def suite():
282     return unittest.makeSuite(RunfolderTests,'test')
283
284 if __name__ == "__main__":
285     logging.basicConfig(level=logging.WARN)
286     unittest.main(defaultTest="suite")
287