Merge ssh://jumpgate.caltech.edu/var/htsworkflow/htsworkflow
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta180.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import os
5 import tempfile
6 import shutil
7 import unittest
8
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
15
16 from htsworkflow.pipelines.test.simulate_runfolder import *
17
18
def make_runfolder(obj=None):
    """
    Build a simulated runfolder tree inside a fresh temporary directory.

    When obj is given, the created paths are stored on it as the
    attributes temp_dir, runfolder_dir, data_dir, image_analysis_dir,
    bustard_dir and gerald_dir.
    """
    # everything lives below a throw-away temporary directory
    tmp_root = tempfile.mkdtemp(prefix='tmp_runfolder_')

    run_path = os.path.join(tmp_root, '090608_HWI-EAS229_0117_4286GAAXX')
    os.mkdir(run_path)

    data_path = os.path.join(run_path, 'Data')
    os.mkdir(data_path)

    # image analysis / base calling layers
    intensities_path = make_rta_intensities_1870(data_path)
    basecalls_path = make_rta_basecalls_1870(intensities_path)
    make_matrix_dir_rta160(basecalls_path)

    # alignment layer: lanes 1-6 eland exports, lane 7 scarf, lane 8 fastq
    gerald_path = os.path.join(basecalls_path, 'GERALD_07-09-2010_diane')
    os.mkdir(gerald_path)
    make_gerald_config_100(gerald_path)
    make_summary_rta160_xml(gerald_path)
    make_eland_export(gerald_path, lane_list=[1, 2, 3, 4, 5, 6])
    make_scarf(gerald_path, lane_list=[7])
    make_fastq(gerald_path, lane_list=[8])

    if obj is None:
        return

    created = (
        ('temp_dir', tmp_root),
        ('runfolder_dir', run_path),
        ('data_dir', data_path),
        ('image_analysis_dir', intensities_path),
        ('bustard_dir', basecalls_path),
        ('gerald_dir', gerald_path),
    )
    for attr_name, path in created:
        setattr(obj, attr_name, path)
54
55
class RunfolderTests(unittest.TestCase):
    """
    Test components of the runfolder processing code
    (bustard, gerald, eland and runfolder) against the simulated
    runfolder created by make_runfolder.
    """
    def setUp(self):
        # attaches all the directories (temp_dir, runfolder_dir, data_dir,
        # image_analysis_dir, bustard_dir, gerald_dir) to this test case
        make_runfolder(self)

    def tearDown(self):
        # remove the whole simulated runfolder tree
        shutil.rmtree(self.temp_dir)

    def test_bustard(self):
        """Construct a bustard object"""
        b = bustard.bustard(self.bustard_dir)
        self.assertEqual(b.software, 'RTA')
        self.assertEqual(b.version, '1.8.70.0')
        self.assertEqual(b.date, None)
        self.assertEqual(b.user, None)
        self.assertEqual(len(b.phasing), 0)

        # round trip through the xml representation
        xml = b.get_elements()
        b2 = bustard.Bustard(xml=xml)
        self.assertEqual(b.software, b2.software)
        self.assertEqual(b.version, b2.version)
        self.assertEqual(b.date, b2.date)
        self.assertEqual(b.user, b2.user)

    def test_gerald(self):
        """Construct a gerald object and check config and summary data"""
        # need to update gerald and make tests for it
        g = gerald.gerald(self.gerald_dir)

        self.assertEqual(g.software, 'GERALD')
        self.assertEqual(g.version, '1.171')
        self.assertEqual(g.date, datetime(2009, 2, 22, 21, 15, 59))
        self.assertEqual(len(g.lanes), len(g.lanes.keys()))
        self.assertEqual(len(g.lanes), len(g.lanes.items()))

        # list of genomes, matches what was defined up in
        # make_gerald_config.
        # the first None is to offset the genomes list to be 1..9
        # instead of pythons default 0..8
        genomes = [None,
                   '/g/mm9',
                   '/g/mm9',
                   '/g/elegans190',
                   '/g/arabidopsis01222004',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9', ]

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            cur_lane = g.lanes[i]
            self.assertEqual(cur_lane.analysis, 'eland_extended')
            self.assertEqual(cur_lane.eland_genome, genomes[i])
            self.assertEqual(cur_lane.read_length, '37')
            self.assertEqual(cur_lane.use_bases, 'Y'*37)

        # I want to be able to use a simple iterator
        for lane in g.lanes.values():
            self.assertEqual(lane.analysis, 'eland_extended')
            self.assertEqual(lane.read_length, '37')
            self.assertEqual(lane.use_bases, 'Y'*37)

        # test data extracted from summary file
        # (leading None so the list can be indexed by lane number)
        clusters = [None,
                    (281331, 11169), (203841, 13513),
                    (220889, 15653), (137294, 14666),
                    (129388, 14525), (262092, 10751),
                    (185754, 13503), (233765, 9537), ]

        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            summary_lane = g.summary[0][i]
            self.assertEqual(summary_lane.cluster, clusters[i])
            self.assertEqual(summary_lane.lane, i)

        xml = g.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        g2 = gerald.Gerald(xml=xml)
        # NOTE: the xml round-trip comparison below is disabled by this
        # early return; it is kept so it can be re-enabled later.
        return

        # do it all again after extracting from the xml file
        self.assertEqual(g.software, g2.software)
        self.assertEqual(g.version, g2.version)
        self.assertEqual(g.date, g2.date)
        self.assertEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
        self.assertEqual(len(g.lanes.items()), len(g2.lanes.items()))

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            g_lane = g.lanes[i]
            g2_lane = g2.lanes[i]
            self.assertEqual(g_lane.analysis, g2_lane.analysis)
            self.assertEqual(g_lane.eland_genome, g2_lane.eland_genome)
            self.assertEqual(g_lane.read_length, g2_lane.read_length)
            self.assertEqual(g_lane.use_bases, g2_lane.use_bases)

        # test (some) summary elements
        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            g_summary = g.summary[0][i]
            g2_summary = g2.summary[0][i]
            self.assertEqual(g_summary.cluster, g2_summary.cluster)
            self.assertEqual(g_summary.lane, g2_summary.lane)

        # the eland comparison does not depend on the summary lane,
        # so it only needs to run once
        g_eland = g.eland_results
        g2_eland = g2.eland_results
        for lane in g_eland.results[0].keys():
            g_results = g_eland.results[0][lane]
            g2_results = g2_eland.results[0][lane]
            self.assertEqual(g_results.reads,
                             g2_results.reads)
            if isinstance(g_results, eland.ElandLane):
                self.assertEqual(len(g_results.mapped_reads),
                                 len(g2_results.mapped_reads))
                for k in g_results.mapped_reads.keys():
                    self.assertEqual(g_results.mapped_reads[k],
                                     g2_results.mapped_reads[k])

                self.assertEqual(len(g_results.match_codes),
                                 len(g2_results.match_codes))
                for k in g_results.match_codes.keys():
                    self.assertEqual(g_results.match_codes[k],
                                     g2_results.match_codes[k])

    def test_eland(self):
        """Check eland/sequence lane parsing (currently disabled)"""
        # NOTE: this test is disabled by the early return; the body is
        # kept so it can be re-enabled later.
        return
        hg_map = {'Lambda.fa': 'Lambda.fa'}
        for i in range(1, 22):
            short_name = 'chr%d.fa' % (i,)
            long_name = 'hg18/chr%d.fa' % (i,)
            hg_map[short_name] = long_name

        genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
                        5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
        eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)

        # I added sequence lanes to the last 2 lanes of this test case
        for i in range(1, 7):
            lane = eland_container.results[0][i]
            self.assertEqual(lane.reads, 6)
            self.assertEqual(lane.sample_name, "s")
            self.assertEqual(lane.lane_id, i)
            self.assertEqual(len(lane.mapped_reads), 17)
            self.assertEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
            self.assertEqual(lane.match_codes['U0'], 3)
            self.assertEqual(lane.match_codes['R0'], 2)
            self.assertEqual(lane.match_codes['U1'], 1)
            self.assertEqual(lane.match_codes['R1'], 9)
            self.assertEqual(lane.match_codes['U2'], 0)
            self.assertEqual(lane.match_codes['R2'], 12)
            self.assertEqual(lane.match_codes['NM'], 1)
            self.assertEqual(lane.match_codes['QC'], 0)

        # test scarf
        lane = eland_container.results[0][7]
        self.assertEqual(lane.reads, 5)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 7)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)

        # test fastq
        lane = eland_container.results[0][8]
        self.assertEqual(lane.reads, 3)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 8)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)

        xml = eland_container.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        e2 = gerald.ELAND(xml=xml)

        # compare the original container against its xml round trip
        for i in range(1, 9):
            l1 = eland_container.results[0][i]
            l2 = e2.results[0][i]
            self.assertEqual(l1.reads, l2.reads)
            self.assertEqual(l1.sample_name, l2.sample_name)
            self.assertEqual(l1.lane_id, l2.lane_id)
            if isinstance(l1, eland.ElandLane):
                self.assertEqual(len(l1.mapped_reads), len(l2.mapped_reads))
                self.assertEqual(len(l1.mapped_reads), 17)
                for k in l1.mapped_reads.keys():
                    self.assertEqual(l1.mapped_reads[k],
                                     l2.mapped_reads[k])

                self.assertEqual(len(l1.match_codes), 9)
                self.assertEqual(len(l1.match_codes), len(l2.match_codes))
                for k in l1.match_codes.keys():
                    self.assertEqual(l1.match_codes[k],
                                     l2.match_codes[k])
            elif isinstance(l1, eland.SequenceLane):
                self.assertEqual(l1.sequence_type, l2.sequence_type)

    def test_runfolder(self):
        """Check run discovery and naming (currently disabled)"""
        # NOTE: this test is disabled by the early return; the body is
        # kept so it can be re-enabled later.
        return
        runs = runfolder.get_runs(self.runfolder_dir)

        # do we get the flowcell id from the filename?
        self.assertEqual(len(runs), 1)
        name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].name, name)

        # do we get the flowcell id from the FlowcellId.xml file
        make_flowcell_id(self.runfolder_dir, '207BTAAXY')
        runs = runfolder.get_runs(self.runfolder_dir)
        self.assertEqual(len(runs), 1)
        name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].name, name)

        r1 = runs[0]
        xml = r1.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)

        r2 = runfolder.PipelineRun(xml=xml)
        self.assertEqual(r1.name, r2.name)
        self.assertNotEqual(r2.image_analysis, None)
        self.assertNotEqual(r2.bustard, None)
        self.assertNotEqual(r2.gerald, None)
281
282
def suite():
    """
    Return a test suite holding every test method of RunfolderTests.

    Uses the default test loader (method prefix 'test') rather than
    unittest.makeSuite, which is deprecated and removed in newer
    Python releases.
    """
    return unittest.defaultTestLoader.loadTestsFromTestCase(RunfolderTests)
285
# When executed as a script, run the tests collected by suite().
if __name__ == '__main__':
    unittest.main(defaultTest='suite')
288