Return a gerald version number as a number and not a cvs string.
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta180.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import os
5 import tempfile
6 import shutil
7 import unittest
8
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
15
16 from htsworkflow.pipelines.test.simulate_runfolder import *
17
18
def make_runfolder(obj=None):
    """
    Make a fake runfolder, attach all the directories to obj if defined

    A fresh temporary directory is created and populated with a run folder
    tree (Data / intensities / basecalls / GERALD) using the simulate_runfolder
    helpers. Lanes 1-6 get eland export files, lane 7 a scarf file and
    lane 8 a fastq file.

    If obj is not None the following attributes are set on it:
    temp_dir, runfolder_dir, data_dir, image_analysis_dir, bustard_dir
    and gerald_dir. The caller is responsible for removing temp_dir.

    If building the tree fails, the temporary directory is removed before
    the exception is re-raised, so a broken fixture does not leave
    directories behind (unittest does not call tearDown when setUp fails).
    """
    # make a fake runfolder directory
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')

    try:
        runfolder_dir = os.path.join(temp_dir,
                                     '090608_HWI-EAS229_0117_4286GAAXX')
        os.mkdir(runfolder_dir)

        data_dir = os.path.join(runfolder_dir, 'Data')
        os.mkdir(data_dir)

        intensities_dir = make_rta_intensities_1870(data_dir)

        basecalls_dir = make_rta_basecalls_1870(intensities_dir)
        make_matrix_dir_rta160(basecalls_dir)

        gerald_dir = os.path.join(basecalls_dir,
                                  'GERALD_07-09-2010_diane')
        os.mkdir(gerald_dir)
        make_gerald_config_100(gerald_dir)
        make_summary_rta160_xml(gerald_dir)
        make_eland_export(gerald_dir, lane_list=[1, 2, 3, 4, 5, 6])
        make_scarf(gerald_dir, lane_list=[7])
        make_fastq(gerald_dir, lane_list=[8])
    except Exception:
        # nobody else knows about temp_dir yet, so clean it up here
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    if obj is not None:
        obj.temp_dir = temp_dir
        obj.runfolder_dir = runfolder_dir
        obj.data_dir = data_dir
        obj.image_analysis_dir = intensities_dir
        obj.bustard_dir = basecalls_dir
        obj.gerald_dir = gerald_dir
54
55
class RunfolderTests(unittest.TestCase):
    """
    Test components of the runfolder processing code
    (bustard, gerald, eland and runfolder) against the simulated
    run folder built by make_runfolder.
    """
    def setUp(self):
        # attaches all the directories to the object passed in
        make_runfolder(self)

    def tearDown(self):
        # remove the whole temporary tree created in setUp
        shutil.rmtree(self.temp_dir)

    def test_bustard(self):
        """Construct a bustard object"""
        b = bustard.bustard(self.bustard_dir)
        self.assertEqual(b.version, '1.8.70.0')
        self.assertEqual(b.date, None)
        self.assertEqual(b.user, None)
        self.assertEqual(len(b.phasing), 0)

        # round trip through the xml representation
        xml = b.get_elements()
        b2 = bustard.Bustard(xml=xml)
        self.assertEqual(b.version, b2.version)
        self.assertEqual(b.date, b2.date)
        self.assertEqual(b.user, b2.user)

    def test_gerald(self):
        """Load the gerald directory and check config and summary data"""
        # need to update gerald and make tests for it
        g = gerald.gerald(self.gerald_dir)

        self.assertEqual(g.version, '1.171')
        self.assertEqual(g.date, datetime(2009, 2, 22, 21, 15, 59))
        self.assertEqual(len(g.lanes), len(g.lanes.keys()))
        self.assertEqual(len(g.lanes), len(g.lanes.items()))

        # list of genomes, matches what was defined up in
        # make_gerald_config.
        # the leading None pads index 0 so the list can be indexed
        # directly with the 1-based lane numbers (1..8)
        genomes = [None,
                   '/g/mm9',
                   '/g/mm9',
                   '/g/elegans190',
                   '/g/arabidopsis01222004',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9', ]

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            cur_lane = g.lanes[i]
            self.assertEqual(cur_lane.analysis, 'eland_extended')
            self.assertEqual(cur_lane.eland_genome, genomes[i])
            self.assertEqual(cur_lane.read_length, '37')
            self.assertEqual(cur_lane.use_bases, 'Y' * 37)

        # I want to be able to use a simple iterator
        for l in g.lanes.values():
            self.assertEqual(l.analysis, 'eland_extended')
            self.assertEqual(l.read_length, '37')
            self.assertEqual(l.use_bases, 'Y' * 37)

        # test data extracted from summary file
        # (index 0 is padding, see genomes above)
        clusters = [None,
                    (281331, 11169), (203841, 13513),
                    (220889, 15653), (137294, 14666),
                    (129388, 14525), (262092, 10751),
                    (185754, 13503), (233765, 9537), ]

        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            summary_lane = g.summary[0][i]
            self.assertEqual(summary_lane.cluster, clusters[i])
            self.assertEqual(summary_lane.lane, i)

        xml = g.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        g2 = gerald.Gerald(xml=xml)
        # NOTE(review): this return disables every comparison below;
        # the reason is not recorded in this file. Remove it to
        # re-enable the xml round trip checks.
        return

        # do it all again after extracting from the xml file
        self.assertEqual(g.version, g2.version)
        self.assertEqual(g.date, g2.date)
        self.assertEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
        self.assertEqual(len(g.lanes.items()), len(g2.lanes.items()))

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            g_lane = g.lanes[i]
            g2_lane = g2.lanes[i]
            self.assertEqual(g_lane.analysis, g2_lane.analysis)
            self.assertEqual(g_lane.eland_genome, g2_lane.eland_genome)
            self.assertEqual(g_lane.read_length, g2_lane.read_length)
            self.assertEqual(g_lane.use_bases, g2_lane.use_bases)

        # test (some) summary elements
        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            g_summary = g.summary[0][i]
            g2_summary = g2.summary[0][i]
            self.assertEqual(g_summary.cluster, g2_summary.cluster)
            self.assertEqual(g_summary.lane, g2_summary.lane)

        # compare the eland results once; this does not depend on the
        # summary lane index so it lives outside the loop above
        g_eland = g.eland_results
        g2_eland = g2.eland_results
        for lane in g_eland.results[0].keys():
            g_results = g_eland.results[0][lane]
            g2_results = g2_eland.results[0][lane]
            self.assertEqual(g_results.reads, g2_results.reads)
            if isinstance(g_results, eland.ElandLane):
                self.assertEqual(len(g_results.mapped_reads),
                                 len(g2_results.mapped_reads))
                for k in g_results.mapped_reads.keys():
                    self.assertEqual(g_results.mapped_reads[k],
                                     g2_results.mapped_reads[k])

                self.assertEqual(len(g_results.match_codes),
                                 len(g2_results.match_codes))
                for k in g_results.match_codes.keys():
                    self.assertEqual(g_results.match_codes[k],
                                     g2_results.match_codes[k])

    def test_eland(self):
        """Check eland / sequence lane parsing (currently disabled)"""
        # NOTE(review): the whole test is disabled by this return;
        # the reason is not recorded in this file.
        return
        hg_map = {'Lambda.fa': 'Lambda.fa'}
        for i in range(1, 22):
            short_name = 'chr%d.fa' % (i,)
            long_name = 'hg18/chr%d.fa' % (i,)
            hg_map[short_name] = long_name

        # every lane uses the same genome map
        genome_maps = {1: hg_map, 2: hg_map, 3: hg_map, 4: hg_map,
                       5: hg_map, 6: hg_map, 7: hg_map, 8: hg_map}
        eland_container = gerald.eland(self.gerald_dir,
                                       genome_maps=genome_maps)

        # I added sequence lanes to the last 2 lanes of this test case
        for i in range(1, 7):
            lane = eland_container.results[0][i]
            self.assertEqual(lane.reads, 6)
            self.assertEqual(lane.sample_name, "s")
            self.assertEqual(lane.lane_id, i)
            self.assertEqual(len(lane.mapped_reads), 17)
            self.assertEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
            self.assertEqual(lane.match_codes['U0'], 3)
            self.assertEqual(lane.match_codes['R0'], 2)
            self.assertEqual(lane.match_codes['U1'], 1)
            self.assertEqual(lane.match_codes['R1'], 9)
            self.assertEqual(lane.match_codes['U2'], 0)
            self.assertEqual(lane.match_codes['R2'], 12)
            self.assertEqual(lane.match_codes['NM'], 1)
            self.assertEqual(lane.match_codes['QC'], 0)

        # test scarf
        lane = eland_container.results[0][7]
        self.assertEqual(lane.reads, 5)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 7)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)

        # test fastq
        lane = eland_container.results[0][8]
        self.assertEqual(lane.reads, 3)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 8)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)

        xml = eland_container.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        e2 = gerald.ELAND(xml=xml)

        # the object rebuilt from xml must match the one read from disk
        for i in range(1, 9):
            l1 = eland_container.results[0][i]
            l2 = e2.results[0][i]
            self.assertEqual(l1.reads, l2.reads)
            self.assertEqual(l1.sample_name, l2.sample_name)
            self.assertEqual(l1.lane_id, l2.lane_id)
            if isinstance(l1, eland.ElandLane):
                self.assertEqual(len(l1.mapped_reads), len(l2.mapped_reads))
                self.assertEqual(len(l1.mapped_reads), 17)
                for k in l1.mapped_reads.keys():
                    self.assertEqual(l1.mapped_reads[k],
                                     l2.mapped_reads[k])

                self.assertEqual(len(l1.match_codes), 9)
                self.assertEqual(len(l1.match_codes), len(l2.match_codes))
                for k in l1.match_codes.keys():
                    self.assertEqual(l1.match_codes[k],
                                     l2.match_codes[k])
            elif isinstance(l1, eland.SequenceLane):
                self.assertEqual(l1.sequence_type, l2.sequence_type)

    def test_runfolder(self):
        """Check run discovery and naming (currently disabled)"""
        # NOTE(review): the whole test is disabled by this return;
        # the reason is not recorded in this file.
        return
        runs = runfolder.get_runs(self.runfolder_dir)

        # do we get the flowcell id from the filename?
        self.assertEqual(len(runs), 1)
        name = 'run_4286GAAXX_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].name, name)

        # do we get the flowcell id from the FlowcellId.xml file
        make_flowcell_id(self.runfolder_dir, '207BTAAXY')
        runs = runfolder.get_runs(self.runfolder_dir)
        self.assertEqual(len(runs), 1)
        name = 'run_207BTAAXY_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].name, name)

        r1 = runs[0]
        xml = r1.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)

        r2 = runfolder.PipelineRun(xml=xml)
        self.assertEqual(r1.name, r2.name)
        self.assertNotEqual(r2.image_analysis, None)
        self.assertNotEqual(r2.bustard, None)
        self.assertNotEqual(r2.gerald, None)
276         self.failIfEqual(r2.gerald, None)
277
278
def suite():
    """
    Return a TestSuite holding every test method of RunfolderTests.

    Uses the default test loader, which selects methods whose names
    start with 'test' -- the same prefix the previous
    unittest.makeSuite(RunfolderTests, 'test') call asked for.
    makeSuite itself is deprecated and missing from recent Python
    releases.
    """
    return unittest.defaultTestLoader.loadTestsFromTestCase(RunfolderTests)
281
if __name__ == "__main__":
    # executed as a script: run the tests returned by suite()
    unittest.main(defaultTest="suite")
284