2 from __future__ import absolute_import
4 from datetime import datetime, date
8 from unittest import TestCase
10 from htsworkflow.pipelines import firecrest
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines.eland import SampleKey
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines import ElementTree
17 from .simulate_runfolder import *
20 def make_runfolder(obj=None):
22 Make a fake runfolder, attach all the directories to obj if defined
24 # make a fake runfolder directory
25 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
27 runfolder_dir = os.path.join(temp_dir,
28 '080102_HWI-EAS229_0010_207BTAAXX')
29 os.mkdir(runfolder_dir)
31 data_dir = os.path.join(runfolder_dir, 'Data')
34 ipar_dir = make_firecrest_dir(data_dir, "1.9.6", 1, 152)
36 matrix_dir = os.path.join(ipar_dir, 'Matrix')
38 matrix_name = os.path.join(matrix_dir, 's_matrix.txt')
39 make_matrix(matrix_name)
41 bustard_dir = os.path.join(ipar_dir,
42 'Bustard1.8.28_12-04-2008_diane')
44 make_phasing_params(bustard_dir)
46 gerald_dir = os.path.join(bustard_dir,
47 'GERALD_12-04-2008_diane')
49 make_gerald_config_100(gerald_dir)
50 make_summary_paired_htm(gerald_dir)
51 make_eland_multi(gerald_dir, paired=True)
54 obj.temp_dir = temp_dir
55 obj.runfolder_dir = runfolder_dir
56 obj.data_dir = data_dir
57 obj.image_analysis_dir = ipar_dir
58 obj.matrix_dir = matrix_dir
59 obj.bustard_dir = bustard_dir
60 obj.gerald_dir = gerald_dir
63 class RunfolderTests(TestCase):
65 Test components of the runfolder processing code
66 which includes firecrest, bustard, and gerald
69 # attaches all the directories to the object passed in
73 shutil.rmtree(self.temp_dir)
75 def test_firecrest(self):
77 Construct a firecrest object
79 f = firecrest.firecrest(self.image_analysis_dir)
80 self.failUnlessEqual(f.software, 'Firecrest')
81 self.failUnlessEqual(f.version, '1.9.6')
82 self.failUnlessEqual(f.start, 1)
83 self.failUnlessEqual(f.stop, 152)
84 self.failUnlessEqual(f.user, 'diane')
85 # As of 2008-12-8, the date was being set in
86 # simulate_runfolder.make_firecrest_dir
87 self.failUnlessEqual(f.date, date(2008,4,12))
89 xml = f.get_elements()
90 # just make sure that element tree can serialize the tree
91 xml_str = ElementTree.tostring(xml)
93 f2 = firecrest.Firecrest(xml=xml)
94 self.failUnlessEqual(f.software, f2.software)
95 self.failUnlessEqual(f.version, f2.version)
96 self.failUnlessEqual(f.start, f2.start)
97 self.failUnlessEqual(f.stop, f2.stop)
98 self.failUnlessEqual(f.user, f2.user)
100 def test_bustard(self):
102 construct a bustard object
104 b = bustard.bustard(self.bustard_dir)
105 self.failUnlessEqual(b.software, 'Bustard')
106 self.failUnlessEqual(b.version, '1.8.28')
107 self.failUnlessEqual(b.date, date(2008,4,12))
108 self.failUnlessEqual(b.user, 'diane')
109 self.failUnlessEqual(len(b.phasing), 8)
110 self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
112 xml = b.get_elements()
113 b2 = bustard.Bustard(xml=xml)
114 self.failUnlessEqual(b.software, b2.software)
115 self.failUnlessEqual(b.version, b2.version)
116 self.failUnlessEqual(b.date, b2.date )
117 self.failUnlessEqual(b.user, b2.user)
118 self.failUnlessEqual(len(b.phasing), len(b2.phasing))
119 for key in b.phasing.keys():
120 self.failUnlessEqual(b.phasing[key].lane,
121 b2.phasing[key].lane)
122 self.failUnlessEqual(b.phasing[key].phasing,
123 b2.phasing[key].phasing)
124 self.failUnlessEqual(b.phasing[key].prephasing,
125 b2.phasing[key].prephasing)
127 def test_gerald(self):
128 # need to update gerald and make tests for it
129 g = gerald.gerald(self.gerald_dir)
131 self.failUnlessEqual(g.software, 'GERALD')
132 self.failUnlessEqual(g.version, '1.171')
133 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
134 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
135 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
138 # list of genomes, matches what was defined up in
139 # make_gerald_config.
140 # the first None is to offset the genomes list to be 1..9
141 # instead of pythons default 0..8
146 '/g/arabidopsis01222004',
152 # test lane specific parameters from gerald config file
154 cur_lane = g.lanes[i]
155 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
156 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
157 self.failUnlessEqual(cur_lane.read_length, '37')
158 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
160 # I want to be able to use a simple iterator
161 for l in g.lanes.values():
162 self.failUnlessEqual(l.analysis, 'eland_extended')
163 self.failUnlessEqual(l.read_length, '37')
164 self.failUnlessEqual(l.use_bases, 'Y'*37)
166 # test data extracted from summary file
168 (103646, 4515), (106678, 4652),
169 (84583, 5963), (68813, 4782),
170 (104854, 4664), (43555, 1632),
171 (54265, 1588), (64363, 2697),],
173 (103647, 4516), (106679, 4653),
174 (84584, 5964), (68814, 4783),
175 (104855, 4665), (43556, 1633),
176 (54266, 1589), (64364, 2698),],]
179 for lane in range(1,9):
180 summary_lane = g.summary[end][lane]
181 self.failUnlessEqual(summary_lane.cluster, clusters[end][lane])
182 self.failUnlessEqual(summary_lane.lane, lane)
184 xml = g.get_elements()
185 # just make sure that element tree can serialize the tree
186 xml_str = ElementTree.tostring(xml)
187 g2 = gerald.Gerald(xml=xml)
189 # do it all again after extracting from the xml file
190 self.failUnlessEqual(g.software, g2.software)
191 self.failUnlessEqual(g.version, g2.version)
192 self.failUnlessEqual(g.date, g2.date)
193 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
194 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
196 # test lane specific parameters from gerald config file
199 g2_lane = g2.lanes[i]
200 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
201 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
202 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
203 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
205 # test (some) summary elements
208 g_summary = g.summary[end][i]
209 g2_summary = g2.summary[end][i]
210 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
211 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
213 g_eland = g.eland_results
214 g2_eland = g2.eland_results
216 g_results = g_eland[key]
217 g2_results = g2_eland[key]
218 self.failUnlessEqual(g_results.reads,
220 self.failUnlessEqual(len(g_results.mapped_reads),
221 len(g2_results.mapped_reads))
222 for k in g_results.mapped_reads.keys():
223 self.failUnlessEqual(g_results.mapped_reads[k],
224 g2_results.mapped_reads[k])
226 self.failUnlessEqual(len(g_results.match_codes),
227 len(g2_results.match_codes))
228 for k in g_results.match_codes.keys():
229 self.failUnlessEqual(g_results.match_codes[k],
230 g2_results.match_codes[k])
233 def test_eland(self):
234 hg_map = {'Lambda.fa': 'Lambda.fa'}
235 for i in range(1,22):
236 short_name = 'chr%d.fa' % (i,)
237 long_name = 'hg18/chr%d.fa' % (i,)
238 hg_map[short_name] = long_name
240 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
241 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
242 eland = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
245 for key in eland.find_keys(SampleKey(read=1)):
247 self.failUnlessEqual(lane.reads, 6)
248 self.failUnlessEqual(lane.sample_name, "s")
249 self.failUnlessEqual(lane.lane_id, key.lane)
250 self.failUnlessEqual(len(lane.mapped_reads), 17)
251 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
252 self.failUnlessEqual(lane.match_codes['U0'], 3)
253 self.failUnlessEqual(lane.match_codes['R0'], 2)
254 self.failUnlessEqual(lane.match_codes['U1'], 1)
255 self.failUnlessEqual(lane.match_codes['R1'], 9)
256 self.failUnlessEqual(lane.match_codes['U2'], 0)
257 self.failUnlessEqual(lane.match_codes['R2'], 12)
258 self.failUnlessEqual(lane.match_codes['NM'], 1)
259 self.failUnlessEqual(lane.match_codes['QC'], 0)
262 for key in eland.find_keys(SampleKey(read=2)):
264 self.failUnlessEqual(lane.reads, 7)
265 self.failUnlessEqual(lane.sample_name, "s")
266 self.failUnlessEqual(lane.lane_id, key.lane)
267 self.failUnlessEqual(len(lane.mapped_reads), 17)
268 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
269 self.failUnlessEqual(lane.match_codes['U0'], 3)
270 self.failUnlessEqual(lane.match_codes['R0'], 2)
271 self.failUnlessEqual(lane.match_codes['U1'], 1)
272 self.failUnlessEqual(lane.match_codes['R1'], 9)
273 self.failUnlessEqual(lane.match_codes['U2'], 0)
274 self.failUnlessEqual(lane.match_codes['R2'], 12)
275 self.failUnlessEqual(lane.match_codes['NM'], 1)
276 self.failUnlessEqual(lane.match_codes['QC'], 1)
278 xml = eland.get_elements()
279 # just make sure that element tree can serialize the tree
280 xml_str = ElementTree.tostring(xml)
281 e2 = gerald.ELAND(xml=xml)
286 self.failUnlessEqual(l1.reads, l2.reads)
287 self.failUnlessEqual(l1.sample_name, l2.sample_name)
288 self.failUnlessEqual(l1.lane_id, l2.lane_id)
289 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
290 self.failUnlessEqual(len(l1.mapped_reads), 17)
291 for k in l1.mapped_reads.keys():
292 self.failUnlessEqual(l1.mapped_reads[k],
295 self.failUnlessEqual(len(l1.match_codes), 9)
296 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
297 for k in l1.match_codes.keys():
298 self.failUnlessEqual(l1.match_codes[k],
301 def test_runfolder(self):
302 runs = runfolder.get_runs(self.runfolder_dir)
304 # do we get the flowcell id from the filename?
305 self.failUnlessEqual(len(runs), 1)
306 # firecrest's date depends on filename not the create time.
307 name = 'run_207BTAAXX_2009-02-22.xml'
308 self.failUnlessEqual(runs[0].serialization_filename, name)
310 # do we get the flowcell id from the FlowcellId.xml file
311 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
312 runs = runfolder.get_runs(self.runfolder_dir)
313 self.failUnlessEqual(len(runs), 1)
314 name = 'run_207BTAAXY_2009-02-22.xml'
315 self.failUnlessEqual(runs[0].serialization_filename, name)
318 xml = r1.get_elements()
319 xml_str = ElementTree.tostring(xml)
321 r2 = runfolder.PipelineRun(xml=xml)
322 self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
323 self.failIfEqual(r2.image_analysis, None)
324 self.failIfEqual(r2.bustard, None)
325 self.failIfEqual(r2.gerald, None)
329 from unittest import TestSuite, defaultTestLoader
331 suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
335 if __name__ == "__main__":
336 from unittest import main
337 main(defaultTest="suite")