927cf61072f038eb626f4a07deecd937dbbd71b6
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta1_12.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import os
5 import tempfile
6 import shutil
7 import unittest
8
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
15
16 from htsworkflow.pipelines.test.simulate_runfolder import *
17
18
def make_runfolder(obj=None):
    """
    Make a fake runfolder, attach all the directories to obj if defined

    A fresh temporary directory is created and a runfolder named
    ``110815_SN787_0101_A<flowcell_id>`` is built inside it, containing
    ``Data``, ``Unaligned`` and ``Aligned`` subdirectories that are
    populated by the helpers from ``simulate_runfolder``.

    :param obj: optional object; when it is not None the attributes
        ``temp_dir``, ``runfolder_dir``, ``data_dir``,
        ``image_analysis_dir``, ``bustard_dir`` and ``gerald_dir``
        are set on it.
    :return: None
    :raises: any exception raised while building the tree is re-raised
        after the temporary directory has been removed.

    On success the caller owns ``temp_dir`` and is responsible for
    deleting it (e.g. ``shutil.rmtree(obj.temp_dir)``).
    """
    # make a fake runfolder directory
    flowcell_id = 'D07K6ACXX'
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')

    try:
        runfolder_dir = os.path.join(
            temp_dir, '110815_SN787_0101_A{0}'.format(flowcell_id))
        os.mkdir(runfolder_dir)

        data_dir = os.path.join(runfolder_dir, 'Data')
        os.mkdir(data_dir)

        intensities_dir = make_rta_intensities_1_12(data_dir)

        basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
        make_matrix_dir_rta_1_12(basecalls_dir)

        unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
        os.mkdir(unaligned_dir)
        make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
        make_unaligned_config_1_12(unaligned_dir)

        aligned_dir = os.path.join(runfolder_dir, "Aligned")
        os.mkdir(aligned_dir)
        make_aligned_eland_export(aligned_dir, flowcell_id)
        make_aligned_config_1_12(aligned_dir)

        if obj is not None:
            obj.temp_dir = temp_dir
            obj.runfolder_dir = runfolder_dir
            obj.data_dir = data_dir
            obj.image_analysis_dir = intensities_dir
            # the "bustard" and "gerald" directories handed to the tests
            # are the Unaligned and Aligned directories respectively
            obj.bustard_dir = unaligned_dir
            obj.gerald_dir = aligned_dir
    except BaseException:
        # Nobody else knows about temp_dir yet (a failing setUp never
        # reaches tearDown), so remove the partial tree before re-raising.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
56
57
class RunfolderTests(unittest.TestCase):
    """
    Test components of the runfolder processing code
    which includes firecrest, bustard, and gerald

    Every test runs against a simulated runfolder that is built in
    setUp by make_runfolder and deleted again in tearDown.
    """
    def setUp(self):
        # attaches all the directories to the object passed in
        make_runfolder(self)

    def tearDown(self):
        # remove the whole temporary tree created by make_runfolder
        shutil.rmtree(self.temp_dir)

    def test_bustard(self):
        """Construct a bustard object"""
        b = bustard.bustard(self.bustard_dir)
        self.assertEqual(b.version, '1.8.70.0')
        self.assertEqual(b.date, None)
        self.assertEqual(b.user, None)
        self.assertEqual(len(b.phasing), 0)

        # round trip through the xml representation
        xml = b.get_elements()
        b2 = bustard.Bustard(xml=xml)
        self.assertEqual(b.version, b2.version)
        self.assertEqual(b.date, b2.date)
        self.assertEqual(b.user, b2.user)

    def test_gerald(self):
        """Construct a gerald object and check its config and summary"""
        # need to update gerald and make tests for it
        g = gerald.gerald(self.gerald_dir)

        self.assertEqual(
            g.version,
            '@(#) Id: GERALD.pl,v 1.171 2008/05/19 17:36:14 mzerara Exp')
        self.assertEqual(g.date, datetime(2009, 2, 22, 21, 15, 59))
        self.assertEqual(len(g.lanes), len(g.lanes.keys()))
        self.assertEqual(len(g.lanes), len(g.lanes.items()))

        # list of genomes, matches what was defined up in
        # make_gerald_config.
        # the first None is to offset the genomes list to be 1..9
        # instead of pythons default 0..8
        genomes = [None,
                   '/g/mm9',
                   '/g/mm9',
                   '/g/elegans190',
                   '/g/arabidopsis01222004',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9', ]

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            cur_lane = g.lanes[i]
            self.assertEqual(cur_lane.analysis, 'eland_extended')
            self.assertEqual(cur_lane.eland_genome, genomes[i])
            self.assertEqual(cur_lane.read_length, '37')
            self.assertEqual(cur_lane.use_bases, 'Y' * 37)

        # I want to be able to use a simple iterator
        for l in g.lanes.values():
            self.assertEqual(l.analysis, 'eland_extended')
            self.assertEqual(l.read_length, '37')
            self.assertEqual(l.use_bases, 'Y' * 37)

        # test data extracted from summary file
        clusters = [None,
                    (281331, 11169), (203841, 13513),
                    (220889, 15653), (137294, 14666),
                    (129388, 14525), (262092, 10751),
                    (185754, 13503), (233765, 9537), ]

        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            summary_lane = g.summary[0][i]
            self.assertEqual(summary_lane.cluster, clusters[i])
            self.assertEqual(summary_lane.lane, i)

        xml = g.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        g2 = gerald.Gerald(xml=xml)
        # NOTE: this early return disables the xml round-trip comparison
        # below; it is kept so the test outcome stays unchanged.
        return

        # do it all again after extracting from the xml file
        self.assertEqual(g.version, g2.version)
        self.assertEqual(g.date, g2.date)
        self.assertEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
        self.assertEqual(len(g.lanes.items()), len(g2.lanes.items()))

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            g_lane = g.lanes[i]
            g2_lane = g2.lanes[i]
            self.assertEqual(g_lane.analysis, g2_lane.analysis)
            self.assertEqual(g_lane.eland_genome, g2_lane.eland_genome)
            self.assertEqual(g_lane.read_length, g2_lane.read_length)
            self.assertEqual(g_lane.use_bases, g2_lane.use_bases)

        # test (some) summary elements
        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            g_summary = g.summary[0][i]
            g2_summary = g2.summary[0][i]
            self.assertEqual(g_summary.cluster, g2_summary.cluster)
            self.assertEqual(g_summary.lane, g2_summary.lane)

            # NOTE(review): the eland comparison below does not use i,
            # yet sits inside the per-lane loop as in the original.
            g_eland = g.eland_results
            g2_eland = g2.eland_results
            for lane in g_eland.results[0].keys():
                g_results = g_eland.results[0][lane]
                g2_results = g2_eland.results[0][lane]
                self.assertEqual(g_results.reads,
                                 g2_results.reads)
                if isinstance(g_results, eland.ElandLane):
                    self.assertEqual(len(g_results.mapped_reads),
                                     len(g2_results.mapped_reads))
                    for k in g_results.mapped_reads.keys():
                        self.assertEqual(g_results.mapped_reads[k],
                                         g2_results.mapped_reads[k])

                    self.assertEqual(len(g_results.match_codes),
                                     len(g2_results.match_codes))
                    for k in g_results.match_codes.keys():
                        self.assertEqual(g_results.match_codes[k],
                                         g2_results.match_codes[k])

    def test_eland(self):
        """Check eland results (currently disabled by an early return)"""
        # NOTE: the test body below is unreachable; the immediate return
        # is kept so the test keeps passing without being exercised.
        return
        hg_map = {'Lambda.fa': 'Lambda.fa'}
        for i in range(1, 22):
            short_name = 'chr%d.fa' % (i,)
            long_name = 'hg18/chr%d.fa' % (i,)
            hg_map[short_name] = long_name

        genome_maps = {1: hg_map, 2: hg_map, 3: hg_map, 4: hg_map,
                       5: hg_map, 6: hg_map, 7: hg_map, 8: hg_map}
        eland_container = gerald.eland(self.gerald_dir,
                                       genome_maps=genome_maps)

        # I added sequence lanes to the last 2 lanes of this test case
        for i in range(1, 7):
            lane = eland_container.results[0][i]
            self.assertEqual(lane.reads, 6)
            self.assertEqual(lane.sample_name, "s")
            self.assertEqual(lane.lane_id, i)
            self.assertEqual(len(lane.mapped_reads), 17)
            self.assertEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
            self.assertEqual(lane.match_codes['U0'], 3)
            self.assertEqual(lane.match_codes['R0'], 2)
            self.assertEqual(lane.match_codes['U1'], 1)
            self.assertEqual(lane.match_codes['R1'], 9)
            self.assertEqual(lane.match_codes['U2'], 0)
            self.assertEqual(lane.match_codes['R2'], 12)
            self.assertEqual(lane.match_codes['NM'], 1)
            self.assertEqual(lane.match_codes['QC'], 0)

        # test scarf
        lane = eland_container.results[0][7]
        self.assertEqual(lane.reads, 5)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 7)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)

        # test fastq
        lane = eland_container.results[0][8]
        self.assertEqual(lane.reads, 3)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 8)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)

        xml = eland_container.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        e2 = gerald.ELAND(xml=xml)

        for i in range(1, 9):
            l1 = eland_container.results[0][i]
            l2 = e2.results[0][i]
            self.assertEqual(l1.reads, l2.reads)
            self.assertEqual(l1.sample_name, l2.sample_name)
            self.assertEqual(l1.lane_id, l2.lane_id)
            if isinstance(l1, eland.ElandLane):
                self.assertEqual(len(l1.mapped_reads), len(l2.mapped_reads))
                self.assertEqual(len(l1.mapped_reads), 17)
                for k in l1.mapped_reads.keys():
                    self.assertEqual(l1.mapped_reads[k],
                                     l2.mapped_reads[k])

                self.assertEqual(len(l1.match_codes), 9)
                self.assertEqual(len(l1.match_codes), len(l2.match_codes))
                for k in l1.match_codes.keys():
                    self.assertEqual(l1.match_codes[k],
                                     l2.match_codes[k])
            elif isinstance(l1, eland.SequenceLane):
                self.assertEqual(l1.sequence_type, l2.sequence_type)

    def test_runfolder(self):
        """Check runfolder.get_runs (currently disabled by an early return)"""
        # NOTE: the test body below is unreachable; the immediate return
        # is kept so the test keeps passing without being exercised.
        return
        runs = runfolder.get_runs(self.runfolder_dir)

        # do we get the flowcell id from the filename?
        self.assertEqual(len(runs), 1)
        name = 'run_4286GAAXX_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].name, name)

        # do we get the flowcell id from the FlowcellId.xml file
        make_flowcell_id(self.runfolder_dir, '207BTAAXY')
        runs = runfolder.get_runs(self.runfolder_dir)
        self.assertEqual(len(runs), 1)
        name = 'run_207BTAAXY_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].name, name)

        r1 = runs[0]
        xml = r1.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)

        r2 = runfolder.PipelineRun(xml=xml)
        self.assertEqual(r1.name, r2.name)
        self.assertNotEqual(r2.image_analysis, None)
        self.assertNotEqual(r2.bustard, None)
        self.assertNotEqual(r2.gerald, None)
280
281
def suite():
    """
    Return a TestSuite containing every ``test*`` method of RunfolderTests.
    """
    # unittest.makeSuite is deprecated (and removed in Python 3.13);
    # the default loader already uses the 'test' method prefix.
    return unittest.defaultTestLoader.loadTestsFromTestCase(RunfolderTests)
284
if __name__ == "__main__":
    # Running this file directly does not run the tests (the
    # unittest.main(defaultTest="suite") call is disabled); it builds a
    # simulated runfolder and prints where it was created. The directory
    # is not removed afterwards.
    class Holder(object):
        """Empty attribute container for make_runfolder to fill in."""

    holder = Holder()
    make_runfolder(holder)
    print ('path ' + holder.runfolder_dir)
291