Fix bugs introduduced by the improved HiSeq runfolder scanning.
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta1_12.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import logging
5 import os
6 import tempfile
7 import shutil
8 import unittest
9
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.runfolder import ElementTree
16
17 from htsworkflow.pipelines.test.simulate_runfolder import *
18
19
20 def make_runfolder(obj=None):
21     """
22     Make a fake runfolder, attach all the directories to obj if defined
23     """
24     # make a fake runfolder directory
25     flowcell_id = 'D07K6ACXX'
26     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
27
28     runfolder_dir = os.path.join(temp_dir,
29                                  '110815_SN787_0101_A{0}'.format(flowcell_id))
30     os.mkdir(runfolder_dir)
31
32     data_dir = os.path.join(runfolder_dir, 'Data')
33     os.mkdir(data_dir)
34
35     intensities_dir = make_rta_intensities_1_12(data_dir)
36
37     status_dir = os.path.join(data_dir, 'Status_Files')
38     os.mkdir(status_dir)
39     make_summary_rta1_12(status_dir)
40
41     basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
42     make_matrix_dir_rta_1_12(basecalls_dir)
43
44     unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
45     os.mkdir(unaligned_dir)
46     make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
47     make_unaligned_config_1_12(unaligned_dir)
48
49     aligned_dir = os.path.join(runfolder_dir, "Aligned")
50     os.mkdir(aligned_dir)
51     make_aligned_eland_export(aligned_dir, flowcell_id)
52     make_aligned_config_1_12(aligned_dir)
53
54     if obj is not None:
55         obj.temp_dir = temp_dir
56         obj.runfolder_dir = runfolder_dir
57         obj.data_dir = data_dir
58         obj.image_analysis_dir = intensities_dir
59         obj.bustard_dir = unaligned_dir
60         obj.gerald_dir = aligned_dir
61
62
63 class RunfolderTests(unittest.TestCase):
64     """
65     Test components of the runfolder processing code
66     which includes firecrest, bustard, and gerald
67     """
68     def setUp(self):
69         # attaches all the directories to the object passed in
70         make_runfolder(self)
71
72     def tearDown(self):
73         shutil.rmtree(self.temp_dir)
74
75     def test_bustard(self):
76         """Construct a bustard object"""
77         b = bustard.bustard(self.bustard_dir)
78         self.failUnlessEqual(b.version, '1.12.4.2')
79         self.failUnlessEqual(b.date,    None)
80         self.failUnlessEqual(b.user,    None)
81         self.failUnlessEqual(len(b.phasing), 0)
82
83         xml = b.get_elements()
84         b2 = bustard.Bustard(xml=xml)
85         self.failUnlessEqual(b.version, b2.version)
86         self.failUnlessEqual(b.date,    b2.date )
87         self.failUnlessEqual(b.user,    b2.user)
88
89     def test_gerald(self):
90         # need to update gerald and make tests for it
91         g = gerald.gerald(self.gerald_dir)
92
93         self.failUnlessEqual(g.version, 'CASAVA-1.8.1')
94         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
95         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
96
97         # list of genomes, matches what was defined up in
98         # make_gerald_config.
99         # the first None is to offset the genomes list to be 1..9
100         # instead of pythons default 0..8
101         genomes = [None,
102                    '/g/mm9',
103                    '/g/mm9',
104                    '/g/elegans190',
105                    '/g/arabidopsis01222004',
106                    '/g/mm9',
107                    '/g/mm9',
108                    '/g/mm9',
109                    '/g/mm9', ]
110
111         # test lane specific parameters from gerald config file
112         for i in range(1,9):
113             cur_lane = g.lanes[i]
114             self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
115             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
116             self.failUnlessEqual(cur_lane.read_length, '37')
117             self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
118
119         # I want to be able to use a simple iterator
120         for l in g.lanes.values():
121           self.failUnlessEqual(l.analysis, 'eland_extended')
122           self.failUnlessEqual(l.read_length, '37')
123           self.failUnlessEqual(l.use_bases, 'Y'*37)
124
125         # test data extracted from summary file
126         clusters = [None,
127                     (281331, 11169), (203841, 13513),
128                     (220889, 15653), (137294, 14666),
129                     (129388, 14525), (262092, 10751),
130                     (185754, 13503), (233765, 9537),]
131
132         self.failUnlessEqual(len(g.summary), 1)
133         for i in range(1,9):
134             summary_lane = g.summary[0][i]
135             self.failUnlessEqual(summary_lane.cluster, clusters[i])
136             self.failUnlessEqual(summary_lane.lane, i)
137
138         xml = g.get_elements()
139         # just make sure that element tree can serialize the tree
140         xml_str = ElementTree.tostring(xml)
141         g2 = gerald.Gerald(xml=xml)
142         return
143
144         # do it all again after extracting from the xml file
145         self.failUnlessEqual(g.version, g2.version)
146         self.failUnlessEqual(g.date, g2.date)
147         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
148         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
149
150         # test lane specific parameters from gerald config file
151         for i in range(1,9):
152             g_lane = g.lanes[i]
153             g2_lane = g2.lanes[i]
154             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
155             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
156             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
157             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
158
159         # test (some) summary elements
160         self.failUnlessEqual(len(g.summary), 1)
161         for i in range(1,9):
162             g_summary = g.summary[0][i]
163             g2_summary = g2.summary[0][i]
164             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
165             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
166
167             g_eland = g.eland_results
168             g2_eland = g2.eland_results
169             for lane in g_eland.results[0].keys():
170                 g_results = g_eland.results[0][lane]
171                 g2_results = g2_eland.results[0][lane]
172                 self.failUnlessEqual(g_results.reads,
173                                      g2_results.reads)
174                 if isinstance(g_results, eland.ElandLane):
175                   self.failUnlessEqual(len(g_results.mapped_reads),
176                                        len(g2_results.mapped_reads))
177                   for k in g_results.mapped_reads.keys():
178                       self.failUnlessEqual(g_results.mapped_reads[k],
179                                            g2_results.mapped_reads[k])
180
181                   self.failUnlessEqual(len(g_results.match_codes),
182                                        len(g2_results.match_codes))
183                   for k in g_results.match_codes.keys():
184                       self.failUnlessEqual(g_results.match_codes[k],
185                                            g2_results.match_codes[k])
186
187
188     def test_eland(self):
189         return
190         hg_map = {'Lambda.fa': 'Lambda.fa'}
191         for i in range(1,22):
192           short_name = 'chr%d.fa' % (i,)
193           long_name = 'hg18/chr%d.fa' % (i,)
194           hg_map[short_name] = long_name
195
196         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
197                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
198         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
199
200         # I added sequence lanes to the last 2 lanes of this test case
201         for i in range(1,7):
202             lane = eland_container.results[0][i]
203             self.failUnlessEqual(lane.reads, 6)
204             self.failUnlessEqual(lane.sample_name, "s")
205             self.failUnlessEqual(lane.lane_id, i)
206             self.failUnlessEqual(len(lane.mapped_reads), 17)
207             self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
208             self.failUnlessEqual(lane.match_codes['U0'], 3)
209             self.failUnlessEqual(lane.match_codes['R0'], 2)
210             self.failUnlessEqual(lane.match_codes['U1'], 1)
211             self.failUnlessEqual(lane.match_codes['R1'], 9)
212             self.failUnlessEqual(lane.match_codes['U2'], 0)
213             self.failUnlessEqual(lane.match_codes['R2'], 12)
214             self.failUnlessEqual(lane.match_codes['NM'], 1)
215             self.failUnlessEqual(lane.match_codes['QC'], 0)
216
217         # test scarf
218         lane = eland_container.results[0][7]
219         self.failUnlessEqual(lane.reads, 5)
220         self.failUnlessEqual(lane.sample_name, 's')
221         self.failUnlessEqual(lane.lane_id, 7)
222         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
223
224         # test fastq
225         lane = eland_container.results[0][8]
226         self.failUnlessEqual(lane.reads, 3)
227         self.failUnlessEqual(lane.sample_name, 's')
228         self.failUnlessEqual(lane.lane_id, 8)
229         self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
230
231         xml = eland_container.get_elements()
232         # just make sure that element tree can serialize the tree
233         xml_str = ElementTree.tostring(xml)
234         e2 = gerald.ELAND(xml=xml)
235
236         for i in range(1,9):
237             l1 = eland_container.results[0][i]
238             l2 = e2.results[0][i]
239             self.failUnlessEqual(l1.reads, l2.reads)
240             self.failUnlessEqual(l1.sample_name, l2.sample_name)
241             self.failUnlessEqual(l1.lane_id, l2.lane_id)
242             if isinstance(l1, eland.ElandLane):
243               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
244               self.failUnlessEqual(len(l1.mapped_reads), 17)
245               for k in l1.mapped_reads.keys():
246                   self.failUnlessEqual(l1.mapped_reads[k],
247                                        l2.mapped_reads[k])
248
249               self.failUnlessEqual(len(l1.match_codes), 9)
250               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
251               for k in l1.match_codes.keys():
252                   self.failUnlessEqual(l1.match_codes[k],
253                                        l2.match_codes[k])
254             elif isinstance(l1, eland.SequenceLane):
255                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
256
257     def test_runfolder(self):
258         return
259         runs = runfolder.get_runs(self.runfolder_dir)
260
261         # do we get the flowcell id from the filename?
262         self.failUnlessEqual(len(runs), 1)
263         name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
264         self.failUnlessEqual(runs[0].name, name)
265
266         # do we get the flowcell id from the FlowcellId.xml file
267         make_flowcell_id(self.runfolder_dir, '207BTAAXY')
268         runs = runfolder.get_runs(self.runfolder_dir)
269         self.failUnlessEqual(len(runs), 1)
270         name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
271         self.failUnlessEqual(runs[0].name, name)
272
273         r1 = runs[0]
274         xml = r1.get_elements()
275         xml_str = ElementTree.tostring(xml)
276
277         r2 = runfolder.PipelineRun(xml=xml)
278         self.failUnlessEqual(r1.name, r2.name)
279         self.failIfEqual(r2.image_analysis, None)
280         self.failIfEqual(r2.bustard, None)
281         self.failIfEqual(r2.gerald, None)
282
283
284 def suite():
285     return unittest.makeSuite(RunfolderTests,'test')
286
287 if __name__ == "__main__":
288     logging.basicConfig(level=logging.WARN)
289     unittest.main(defaultTest="suite")
290