2 from __future__ import absolute_import
4 from datetime import datetime, date
8 from unittest import TestCase
10 from htsworkflow.pipelines import firecrest
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines import ElementTree
16 from .simulate_runfolder import *
19 def make_runfolder(obj=None):
21 Make a fake runfolder, attach all the directories to obj if defined
23 # make a fake runfolder directory
24 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
26 runfolder_dir = os.path.join(temp_dir,
27 '081017_HWI-EAS229_0062_30J55AAXX')
28 os.mkdir(runfolder_dir)
30 data_dir = os.path.join(runfolder_dir, 'Data')
33 firecrest_dir = os.path.join(data_dir,
34 'C1-37_Firecrest1.9.6_20-10-2008_diane')
35 os.mkdir(firecrest_dir)
37 bustard_dir = os.path.join(firecrest_dir,
38 'Bustard1.9.6_20-10-2008_diane')
40 make_phasing_params(bustard_dir)
42 matrix_name = os.path.join(bustard_dir, 'matrix1.txt')
43 make_matrix(matrix_name)
46 gerald_dir = os.path.join(bustard_dir,
47 'GERALD_20-10-2008_diane')
49 make_gerald_config_100(gerald_dir)
50 make_summary_htm_110(gerald_dir)
51 make_eland_multi(gerald_dir)
54 obj.temp_dir = temp_dir
55 obj.runfolder_dir = runfolder_dir
56 obj.data_dir = data_dir
57 obj.image_analysis_dir = firecrest_dir
58 obj.bustard_dir = bustard_dir
59 obj.gerald_dir = gerald_dir
62 class RunfolderTests(TestCase):
64 Test components of the runfolder processing code
65 which includes firecrest, bustard, and gerald
68 # attaches all the directories to the object passed in
72 shutil.rmtree(self.temp_dir)
74 def test_firecrest(self):
76 Construct a firecrest object
78 f = firecrest.firecrest(self.image_analysis_dir)
79 self.failUnlessEqual(f.software, 'Firecrest')
80 self.failUnlessEqual(f.version, '1.9.6')
81 self.failUnlessEqual(f.start, 1)
82 self.failUnlessEqual(f.stop, 37)
83 self.failUnlessEqual(f.user, 'diane')
84 self.failUnlessEqual(f.date, date(2008,10,20))
86 xml = f.get_elements()
87 # just make sure that element tree can serialize the tree
88 xml_str = ElementTree.tostring(xml)
90 f2 = firecrest.Firecrest(xml=xml)
91 self.failUnlessEqual(f.software, f2.software)
92 self.failUnlessEqual(f.version, f2.version)
93 self.failUnlessEqual(f.start, f2.start)
94 self.failUnlessEqual(f.stop, f2.stop)
95 self.failUnlessEqual(f.user, f2.user)
97 def test_bustard(self):
99 construct a bustard object
101 b = bustard.bustard(self.bustard_dir)
102 self.failUnlessEqual(b.software, 'Bustard')
103 self.failUnlessEqual(b.version, '1.9.6')
104 self.failUnlessEqual(b.date, date(2008,10,20))
105 self.failUnlessEqual(b.user, 'diane')
106 self.failUnlessEqual(len(b.phasing), 8)
107 self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
109 xml = b.get_elements()
110 b2 = bustard.Bustard(xml=xml)
111 self.failUnlessEqual(b.software, b2.software)
112 self.failUnlessEqual(b.version, b2.version)
113 self.failUnlessEqual(b.date, b2.date )
114 self.failUnlessEqual(b.user, b2.user)
115 self.failUnlessEqual(len(b.phasing), len(b2.phasing))
116 for key in b.phasing.keys():
117 self.failUnlessEqual(b.phasing[key].lane,
118 b2.phasing[key].lane)
119 self.failUnlessEqual(b.phasing[key].phasing,
120 b2.phasing[key].phasing)
121 self.failUnlessEqual(b.phasing[key].prephasing,
122 b2.phasing[key].prephasing)
124 def test_gerald(self):
125 # need to update gerald and make tests for it
126 g = gerald.gerald(self.gerald_dir)
128 self.failUnlessEqual(g.software, 'GERALD')
129 self.failUnlessEqual(g.version, '1.171')
130 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
131 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
132 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
135 # list of genomes, matches what was defined up in
136 # make_gerald_config.
137 # the first None is to offset the genomes list to be 1..9
138 # instead of pythons default 0..8
143 '/g/arabidopsis01222004',
149 # test lane specific parameters from gerald config file
151 cur_lane = g.lanes[i]
152 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
153 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
154 self.failUnlessEqual(cur_lane.read_length, '37')
155 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
157 # I want to be able to use a simple iterator
158 for l in g.lanes.values():
159 self.failUnlessEqual(l.analysis, 'eland_extended')
160 self.failUnlessEqual(l.read_length, '37')
161 self.failUnlessEqual(l.use_bases, 'Y'*37)
163 # raw cluster numbers extracted from summary file
164 # its the first +/- value in the lane results summary
167 (190220, 15118), (190560, 14399),
168 (187597, 12369), (204142, 16877),
169 (247308, 11600), (204298, 15640),
170 (202707, 15404), (198075, 14702),]
172 self.failUnlessEqual(len(g.summary), 1)
174 summary_lane = g.summary[0][i]
175 self.failUnlessEqual(summary_lane.cluster, clusters[i])
176 self.failUnlessEqual(summary_lane.lane, i)
178 xml = g.get_elements()
179 # just make sure that element tree can serialize the tree
180 xml_str = ElementTree.tostring(xml)
181 g2 = gerald.Gerald(xml=xml)
183 # do it all again after extracting from the xml file
184 self.failUnlessEqual(g.software, g2.software)
185 self.failUnlessEqual(g.version, g2.version)
186 self.failUnlessEqual(g.date, g2.date)
187 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
188 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
190 # test lane specific parameters from gerald config file
193 g2_lane = g2.lanes[i]
194 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
195 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
196 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
197 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
199 self.failUnlessEqual(len(g.summary), 1)
200 # test (some) summary elements
202 g_summary = g.summary[0][i]
203 g2_summary = g2.summary[0][i]
204 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
205 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
207 g_eland = g.eland_results
208 g2_eland = g2.eland_results
210 g_results = g_eland[key]
211 g2_results = g2_eland[key]
212 self.failUnlessEqual(g_results.reads,
214 self.failUnlessEqual(len(g_results.mapped_reads),
215 len(g2_results.mapped_reads))
216 for k in g_results.mapped_reads.keys():
217 self.failUnlessEqual(g_results.mapped_reads[k],
218 g2_results.mapped_reads[k])
220 self.failUnlessEqual(len(g_results.match_codes),
221 len(g2_results.match_codes))
222 for k in g_results.match_codes.keys():
223 self.failUnlessEqual(g_results.match_codes[k],
224 g2_results.match_codes[k])
227 def test_eland(self):
228 hg_map = {'Lambda.fa': 'Lambda.fa'}
229 for i in range(1,22):
230 short_name = 'chr%d.fa' % (i,)
231 long_name = 'hg18/chr%d.fa' % (i,)
232 hg_map[short_name] = long_name
234 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
235 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
236 eland = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
240 self.failUnlessEqual(lane.reads, 6)
241 self.failUnlessEqual(lane.sample_name, "s")
242 self.failUnlessEqual(lane.lane_id, key.lane)
243 self.failUnlessEqual(len(lane.mapped_reads), 17)
244 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
245 self.failUnlessEqual(lane.match_codes['U0'], 3)
246 self.failUnlessEqual(lane.match_codes['R0'], 2)
247 self.failUnlessEqual(lane.match_codes['U1'], 1)
248 self.failUnlessEqual(lane.match_codes['R1'], 9)
249 self.failUnlessEqual(lane.match_codes['U2'], 0)
250 self.failUnlessEqual(lane.match_codes['R2'], 12)
251 self.failUnlessEqual(lane.match_codes['NM'], 1)
252 self.failUnlessEqual(lane.match_codes['QC'], 0)
254 xml = eland.get_elements()
255 # just make sure that element tree can serialize the tree
256 xml_str = ElementTree.tostring(xml)
257 e2 = gerald.ELAND(xml=xml)
262 self.failUnlessEqual(l1.reads, l2.reads)
263 self.failUnlessEqual(l1.sample_name, l2.sample_name)
264 self.failUnlessEqual(l1.lane_id, l2.lane_id)
265 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
266 self.failUnlessEqual(len(l1.mapped_reads), 17)
267 for k in l1.mapped_reads.keys():
268 self.failUnlessEqual(l1.mapped_reads[k],
271 self.failUnlessEqual(len(l1.match_codes), 9)
272 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
273 for k in l1.match_codes.keys():
274 self.failUnlessEqual(l1.match_codes[k],
277 def test_runfolder(self):
278 runs = runfolder.get_runs(self.runfolder_dir)
280 # do we get the flowcell id from the filename?
281 self.failUnlessEqual(len(runs), 1)
282 name = 'run_30J55AAXX_2009-02-22.xml'
283 self.failUnlessEqual(runs[0].serialization_filename, name)
285 # do we get the flowcell id from the FlowcellId.xml file
286 make_flowcell_id(self.runfolder_dir, '30J55AAXX')
287 runs = runfolder.get_runs(self.runfolder_dir)
288 self.failUnlessEqual(len(runs), 1)
289 name = 'run_30J55AAXX_2009-02-22.xml'
290 self.failUnlessEqual(runs[0].serialization_filename, name)
293 xml = r1.get_elements()
294 xml_str = ElementTree.tostring(xml)
296 r2 = runfolder.PipelineRun(xml=xml)
297 self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
298 self.failIfEqual(r2.image_analysis, None)
299 self.failIfEqual(r2.bustard, None)
300 self.failIfEqual(r2.gerald, None)
304 from unittest import TestSuite, defaultTestLoader
306 suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
310 if __name__ == "__main__":
311 from unittest import main
312 main(defaultTest="suite")