Add support for CASAVA 1.7
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta180.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import os
5 import tempfile
6 import shutil
7 import unittest
8
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
15
16 from htsworkflow.pipelines.test.simulate_runfolder import *
17
18
def make_runfolder(obj=None):
    """
    Make a fake runfolder, attach all the directories to obj if defined

    A fresh temporary directory is created and populated with::

        090608_HWI-EAS229_0117_4286GAAXX/
            Data/
                <intensities dir from make_rta_intensities_1870>/
                    <basecalls dir from make_rta_basecalls_1870>/
                        GERALD_07-09-2010_diane/

    The GERALD directory receives a gerald config, a summary xml,
    eland export files for lanes 1-6, a scarf file for lane 7 and a
    fastq file for lane 8.

    :param obj: optional object; when it is not None the attributes
                temp_dir, runfolder_dir, data_dir, image_analysis_dir,
                bustard_dir and gerald_dir are set on it.

    If anything fails while the tree is being built the temporary
    directory is removed before the exception is re-raised.  A failing
    setUp() never reaches tearDown(), so nothing else would clean it up.
    """
    # make a fake runfolder directory
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
    try:
        runfolder_dir = os.path.join(temp_dir,
                                     '090608_HWI-EAS229_0117_4286GAAXX')
        os.mkdir(runfolder_dir)

        data_dir = os.path.join(runfolder_dir, 'Data')
        os.mkdir(data_dir)

        intensities_dir = make_rta_intensities_1870(data_dir)

        basecalls_dir = make_rta_basecalls_1870(intensities_dir)
        make_matrix_dir_rta160(basecalls_dir)

        gerald_dir = os.path.join(basecalls_dir,
                                  'GERALD_07-09-2010_diane')
        os.mkdir(gerald_dir)
        make_gerald_config_100(gerald_dir)
        make_summary_rta160_xml(gerald_dir)
        make_eland_export(gerald_dir, lane_list=[1, 2, 3, 4, 5, 6])
        make_scarf(gerald_dir, lane_list=[7])
        make_fastq(gerald_dir, lane_list=[8])

        if obj is not None:
            obj.temp_dir = temp_dir
            obj.runfolder_dir = runfolder_dir
            obj.data_dir = data_dir
            obj.image_analysis_dir = intensities_dir
            obj.bustard_dir = basecalls_dir
            obj.gerald_dir = gerald_dir
    except Exception:
        # don't leave a half-built tree behind; the caller never learned
        # where it lives, so it could not remove it itself
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
54
55
class RunfolderTests(unittest.TestCase):
    """
    Test components of the runfolder processing code
    (bustard, gerald, eland and runfolder) against the simulated
    runfolder built by make_runfolder().

    The assertions use assertEqual/assertNotEqual rather than the
    deprecated failUnlessEqual/failIfEqual aliases, which newer
    versions of unittest no longer provide.
    """
    def setUp(self):
        """Build a fresh simulated runfolder for every test."""
        # attaches all the directories to the object passed in
        make_runfolder(self)

    def tearDown(self):
        """Remove the temporary tree created by setUp."""
        shutil.rmtree(self.temp_dir)

    def test_bustard(self):
        """Construct a bustard object"""
        b = bustard.bustard(self.bustard_dir)
        self.assertEqual(b.version, '1.8.70.0')
        self.assertEqual(b.date, None)
        self.assertEqual(b.user, None)
        self.assertEqual(len(b.phasing), 0)

        # rebuild the object from its own xml and compare
        xml = b.get_elements()
        b2 = bustard.Bustard(xml=xml)
        self.assertEqual(b.version, b2.version)
        self.assertEqual(b.date, b2.date)
        self.assertEqual(b.user, b2.user)

    def test_gerald(self):
        """Construct a gerald object and check config and summary data"""
        # need to update gerald and make tests for it
        g = gerald.gerald(self.gerald_dir)

        self.assertEqual(
            g.version,
            '@(#) Id: GERALD.pl,v 1.171 2008/05/19 17:36:14 mzerara Exp')
        self.assertEqual(g.date, datetime(2009, 2, 22, 21, 15, 59))
        self.assertEqual(len(g.lanes), len(g.lanes.keys()))
        self.assertEqual(len(g.lanes), len(g.lanes.items()))

        # list of genomes, matches what was defined up in
        # make_gerald_config.
        # the first None is to offset the genomes list to be 1..9
        # instead of pythons default 0..8
        genomes = [None,
                   '/g/mm9',
                   '/g/mm9',
                   '/g/elegans190',
                   '/g/arabidopsis01222004',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9', ]

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            cur_lane = g.lanes[i]
            self.assertEqual(cur_lane.analysis, 'eland_extended')
            self.assertEqual(cur_lane.eland_genome, genomes[i])
            self.assertEqual(cur_lane.read_length, '37')
            self.assertEqual(cur_lane.use_bases, 'Y' * 37)

        # I want to be able to use a simple iterator
        for l in g.lanes.values():
            self.assertEqual(l.analysis, 'eland_extended')
            self.assertEqual(l.read_length, '37')
            self.assertEqual(l.use_bases, 'Y' * 37)

        # test data extracted from summary file
        # (leading None so the list can be indexed by lane number)
        clusters = [None,
                    (281331, 11169), (203841, 13513),
                    (220889, 15653), (137294, 14666),
                    (129388, 14525), (262092, 10751),
                    (185754, 13503), (233765, 9537), ]

        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            summary_lane = g.summary[0][i]
            self.assertEqual(summary_lane.cluster, clusters[i])
            self.assertEqual(summary_lane.lane, i)

        xml = g.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        g2 = gerald.Gerald(xml=xml)
        # NOTE(review): this early return disables the xml round-trip
        # comparison below; presumably it does not pass yet for this
        # runfolder layout -- confirm before removing it.
        return

        # do it all again after extracting from the xml file
        self.assertEqual(g.version, g2.version)
        self.assertEqual(g.date, g2.date)
        self.assertEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
        self.assertEqual(len(g.lanes.items()), len(g2.lanes.items()))

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            g_lane = g.lanes[i]
            g2_lane = g2.lanes[i]
            self.assertEqual(g_lane.analysis, g2_lane.analysis)
            self.assertEqual(g_lane.eland_genome, g2_lane.eland_genome)
            self.assertEqual(g_lane.read_length, g2_lane.read_length)
            self.assertEqual(g_lane.use_bases, g2_lane.use_bases)

        # test (some) summary elements
        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            g_summary = g.summary[0][i]
            g2_summary = g2.summary[0][i]
            self.assertEqual(g_summary.cluster, g2_summary.cluster)
            self.assertEqual(g_summary.lane, g2_summary.lane)

        # the eland comparison does not depend on the summary lane, so
        # it is done once rather than once per summary lane
        g_eland = g.eland_results
        g2_eland = g2.eland_results
        for lane in g_eland.results[0].keys():
            g_results = g_eland.results[0][lane]
            g2_results = g2_eland.results[0][lane]
            self.assertEqual(g_results.reads, g2_results.reads)
            if isinstance(g_results, eland.ElandLane):
                self.assertEqual(len(g_results.mapped_reads),
                                 len(g2_results.mapped_reads))
                for k in g_results.mapped_reads.keys():
                    self.assertEqual(g_results.mapped_reads[k],
                                     g2_results.mapped_reads[k])

                self.assertEqual(len(g_results.match_codes),
                                 len(g2_results.match_codes))
                for k in g_results.match_codes.keys():
                    self.assertEqual(g_results.match_codes[k],
                                     g2_results.match_codes[k])

    def test_eland(self):
        """Check eland, scarf and fastq lane results (currently disabled)"""
        # NOTE(review): the whole test is switched off by this return;
        # presumably pending support for this runfolder layout -- confirm
        # before re-enabling.
        return
        hg_map = {'Lambda.fa': 'Lambda.fa'}
        for i in range(1, 22):
            short_name = 'chr%d.fa' % (i,)
            long_name = 'hg18/chr%d.fa' % (i,)
            hg_map[short_name] = long_name

        # every lane shares the same genome map
        genome_maps = {1: hg_map, 2: hg_map, 3: hg_map, 4: hg_map,
                       5: hg_map, 6: hg_map, 7: hg_map, 8: hg_map}
        eland_container = gerald.eland(self.gerald_dir,
                                       genome_maps=genome_maps)

        # I added sequence lanes to the last 2 lanes of this test case
        for i in range(1, 7):
            lane = eland_container.results[0][i]
            self.assertEqual(lane.reads, 6)
            self.assertEqual(lane.sample_name, "s")
            self.assertEqual(lane.lane_id, i)
            self.assertEqual(len(lane.mapped_reads), 17)
            self.assertEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
            self.assertEqual(lane.match_codes['U0'], 3)
            self.assertEqual(lane.match_codes['R0'], 2)
            self.assertEqual(lane.match_codes['U1'], 1)
            self.assertEqual(lane.match_codes['R1'], 9)
            self.assertEqual(lane.match_codes['U2'], 0)
            self.assertEqual(lane.match_codes['R2'], 12)
            self.assertEqual(lane.match_codes['NM'], 1)
            self.assertEqual(lane.match_codes['QC'], 0)

        # test scarf
        lane = eland_container.results[0][7]
        self.assertEqual(lane.reads, 5)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 7)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)

        # test fastq
        lane = eland_container.results[0][8]
        self.assertEqual(lane.reads, 3)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 8)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)

        xml = eland_container.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        e2 = gerald.ELAND(xml=xml)

        # compare the original container with the one rebuilt from xml
        for i in range(1, 9):
            l1 = eland_container.results[0][i]
            l2 = e2.results[0][i]
            self.assertEqual(l1.reads, l2.reads)
            self.assertEqual(l1.sample_name, l2.sample_name)
            self.assertEqual(l1.lane_id, l2.lane_id)
            if isinstance(l1, eland.ElandLane):
                self.assertEqual(len(l1.mapped_reads), len(l2.mapped_reads))
                self.assertEqual(len(l1.mapped_reads), 17)
                for k in l1.mapped_reads.keys():
                    self.assertEqual(l1.mapped_reads[k],
                                     l2.mapped_reads[k])

                self.assertEqual(len(l1.match_codes), 9)
                self.assertEqual(len(l1.match_codes), len(l2.match_codes))
                for k in l1.match_codes.keys():
                    self.assertEqual(l1.match_codes[k],
                                     l2.match_codes[k])
            elif isinstance(l1, eland.SequenceLane):
                self.assertEqual(l1.sequence_type, l2.sequence_type)

    def test_runfolder(self):
        """Check runfolder.get_runs and PipelineRun (currently disabled)"""
        # NOTE(review): the whole test is switched off by this return;
        # presumably pending support for this runfolder layout -- confirm
        # before re-enabling.
        return
        runs = runfolder.get_runs(self.runfolder_dir)

        # do we get the flowcell id from the filename?
        self.assertEqual(len(runs), 1)
        name = 'run_4286GAAXX_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].name, name)

        # do we get the flowcell id from the FlowcellId.xml file
        make_flowcell_id(self.runfolder_dir, '207BTAAXY')
        runs = runfolder.get_runs(self.runfolder_dir)
        self.assertEqual(len(runs), 1)
        name = 'run_207BTAAXY_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].name, name)

        r1 = runs[0]
        xml = r1.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)

        r2 = runfolder.PipelineRun(xml=xml)
        self.assertEqual(r1.name, r2.name)
        self.assertNotEqual(r2.image_analysis, None)
        self.assertNotEqual(r2.bustard, None)
        self.assertNotEqual(r2.gerald, None)
278
279
def suite():
    """
    Return a test suite holding every ``test*`` method of RunfolderTests.

    The default loader is used instead of unittest.makeSuite, which is
    deprecated and no longer available in newer Python versions; the
    loader uses the same 'test' method-name prefix.
    """
    return unittest.defaultTestLoader.loadTestsFromTestCase(RunfolderTests)
282
if __name__ == '__main__':
    # executed as a script: let unittest run whatever suite() returns
    unittest.main(defaultTest='suite')
285