2 from __future__ import absolute_import
4 from datetime import datetime, date
8 from unittest import TestCase
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines import ElementTree
17 from .simulate_runfolder import *
20 def make_runfolder(obj=None):
22 Make a fake runfolder, attach all the directories to obj if defined
24 # make a fake runfolder directory
25 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
27 runfolder_dir = os.path.join(temp_dir,
28 '090313_HWI-EAS229_0101_3021JAAXX')
29 os.mkdir(runfolder_dir)
31 data_dir = os.path.join(runfolder_dir, 'Data')
34 ipar_dir = make_ipar_dir(data_dir, '1.30')
36 bustard_dir = os.path.join(ipar_dir,
37 'Bustard1.3.2_15-03-2008_diane')
39 make_phasing_params(bustard_dir)
40 make_bustard_config132(bustard_dir)
42 gerald_dir = os.path.join(bustard_dir,
43 'GERALD_15-03-2008_diane')
45 make_gerald_config_100(gerald_dir)
46 make_summary_ipar130_htm(gerald_dir)
47 make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
48 make_scarf(gerald_dir, lane_list=[7,])
49 make_fastq(gerald_dir, lane_list=[8,])
52 obj.temp_dir = temp_dir
53 obj.runfolder_dir = runfolder_dir
54 obj.data_dir = data_dir
55 obj.image_analysis_dir = ipar_dir
56 obj.bustard_dir = bustard_dir
57 obj.gerald_dir = gerald_dir
60 class RunfolderTests(TestCase):
62 Test components of the runfolder processing code
63 which includes firecrest, bustard, and gerald
66 # attaches all the directories to the object passed in
70 shutil.rmtree(self.temp_dir)
74 Construct a firecrest object
76 i = ipar.ipar(self.image_analysis_dir)
77 self.failUnlessEqual(i.software, 'IPAR')
78 self.failUnlessEqual(i.version, '2.01.192.0')
79 self.failUnlessEqual(i.start, 1)
80 self.failUnlessEqual(i.stop, 37)
82 xml = i.get_elements()
83 # just make sure that element tree can serialize the tree
84 xml_str = ElementTree.tostring(xml)
86 i2 = ipar.IPAR(xml=xml)
87 self.failUnlessEqual(i.software, i2.software)
88 self.failUnlessEqual(i.version, i2.version)
89 self.failUnlessEqual(i.start, i2.start)
90 self.failUnlessEqual(i.stop, i2.stop)
91 self.failUnlessEqual(i.date, i2.date)
92 self.failUnlessEqual(i.file_list(), i2.file_list())
94 def test_bustard(self):
96 construct a bustard object
98 def check_crosstalk(crosstalk):
99 self.failUnlessAlmostEqual(crosstalk.base['A'][0], 1.27)
100 self.failUnlessAlmostEqual(crosstalk.base['A'][1], 0.20999999999999)
101 self.failUnlessAlmostEqual(crosstalk.base['A'][2], -0.02)
102 self.failUnlessAlmostEqual(crosstalk.base['A'][3], -0.03)
104 self.failUnlessAlmostEqual(crosstalk.base['C'][0], 0.57)
105 self.failUnlessAlmostEqual(crosstalk.base['C'][1], 0.58)
106 self.failUnlessAlmostEqual(crosstalk.base['C'][2], -0.01)
107 self.failUnlessAlmostEqual(crosstalk.base['C'][3], -0.01)
109 self.failUnlessAlmostEqual(crosstalk.base['T'][0], -0.02)
110 self.failUnlessAlmostEqual(crosstalk.base['T'][1], -0.02)
111 self.failUnlessAlmostEqual(crosstalk.base['T'][2], 0.80)
112 self.failUnlessAlmostEqual(crosstalk.base['T'][3], 1.07)
114 self.failUnlessAlmostEqual(crosstalk.base['G'][0], -0.03)
115 self.failUnlessAlmostEqual(crosstalk.base['G'][1], -0.04)
116 self.failUnlessAlmostEqual(crosstalk.base['G'][2], 1.51)
117 self.failUnlessAlmostEqual(crosstalk.base['G'][3], -0.02)
119 b = bustard.bustard(self.bustard_dir)
120 self.failUnlessEqual(b.software, 'Bustard')
121 self.failUnlessEqual(b.version, '1.3.2')
122 self.failUnlessEqual(b.date, date(2008,3,15))
123 self.failUnlessEqual(b.user, 'diane')
124 self.failUnlessEqual(len(b.phasing), 8)
125 self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
126 self.assertEqual(set(b.crosstalk.base), set(['A','C','T','G']))
127 check_crosstalk(b.crosstalk)
129 xml = b.get_elements()
130 b2 = bustard.Bustard(xml=xml)
131 self.failUnlessEqual(b.software, b2.software)
132 self.failUnlessEqual(b.version, b2.version)
133 self.failUnlessEqual(b.date, b2.date )
134 self.failUnlessEqual(b.user, b2.user)
135 self.failUnlessEqual(len(b.phasing), len(b2.phasing))
136 for key in b.phasing.keys():
137 self.failUnlessEqual(b.phasing[key].lane,
138 b2.phasing[key].lane)
139 self.failUnlessEqual(b.phasing[key].phasing,
140 b2.phasing[key].phasing)
141 self.failUnlessEqual(b.phasing[key].prephasing,
142 b2.phasing[key].prephasing)
143 check_crosstalk(b2.crosstalk)
145 def test_gerald(self):
146 # need to update gerald and make tests for it
147 g = gerald.gerald(self.gerald_dir)
149 self.failUnlessEqual(g.software, 'GERALD')
150 self.failUnlessEqual(g.version, '1.171')
151 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
152 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
153 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
156 # list of genomes, matches what was defined up in
157 # make_gerald_config.
158 # the first None is to offset the genomes list to be 1..9
159 # instead of pythons default 0..8
164 '/g/arabidopsis01222004',
170 # test lane specific parameters from gerald config file
172 cur_lane = g.lanes[i]
173 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
174 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
175 self.failUnlessEqual(cur_lane.read_length, '37')
176 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
178 # I want to be able to use a simple iterator
179 for l in g.lanes.values():
180 self.failUnlessEqual(l.analysis, 'eland_extended')
181 self.failUnlessEqual(l.read_length, '37')
182 self.failUnlessEqual(l.use_bases, 'Y'*37)
184 # test data extracted from summary file
186 (126910, 4300), (165739, 6792),
187 (196565, 8216), (153897, 8501),
188 (135536, 3908), (154083, 9315),
189 (159991, 9292), (198479, 17671),]
191 self.failUnlessEqual(len(g.summary), 1)
193 summary_lane = g.summary[0][i]
194 self.failUnlessEqual(summary_lane.cluster, clusters[i])
195 self.failUnlessEqual(summary_lane.lane, i)
197 xml = g.get_elements()
198 # just make sure that element tree can serialize the tree
199 xml_str = ElementTree.tostring(xml)
200 g2 = gerald.Gerald(xml=xml)
202 # do it all again after extracting from the xml file
203 self.failUnlessEqual(g.software, g2.software)
204 self.failUnlessEqual(g.version, g2.version)
205 self.failUnlessEqual(g.date, g2.date)
206 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
207 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
209 # test lane specific parameters from gerald config file
212 g2_lane = g2.lanes[i]
213 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
214 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
215 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
216 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
218 # test (some) summary elements
219 self.failUnlessEqual(len(g.summary), 1)
221 g_summary = g.summary[0][i]
222 g2_summary = g2.summary[0][i]
223 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
224 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
226 g_eland = g.eland_results
227 g2_eland = g2.eland_results
229 g_results = g_eland[key]
230 g2_results = g2_eland[key]
231 self.failUnlessEqual(g_results.reads,
233 if isinstance(g_results, eland.ElandLane):
234 self.failUnlessEqual(len(g_results.mapped_reads),
235 len(g2_results.mapped_reads))
236 for k in g_results.mapped_reads.keys():
237 self.failUnlessEqual(g_results.mapped_reads[k],
238 g2_results.mapped_reads[k])
240 self.failUnlessEqual(len(g_results.match_codes),
241 len(g2_results.match_codes))
242 for k in g_results.match_codes.keys():
243 self.failUnlessEqual(g_results.match_codes[k],
244 g2_results.match_codes[k])
247 def test_eland(self):
248 hg_map = {'Lambda.fa': 'Lambda.fa'}
249 for i in range(1,22):
250 short_name = 'chr%d.fa' % (i,)
251 long_name = 'hg18/chr%d.fa' % (i,)
252 hg_map[short_name] = long_name
254 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
255 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
256 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
258 # I added sequence lanes to the last 2 lanes of this test case
259 for key in eland_container:
260 lane = eland_container[key]
261 if key.lane in [1,2,3,4,5,6]:
262 self.failUnlessEqual(lane.reads, 6)
263 self.failUnlessEqual(lane.sample_name, "s")
264 self.failUnlessEqual(lane.lane_id, key.lane)
265 self.failUnlessEqual(len(lane.mapped_reads), 17)
266 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
267 self.failUnlessEqual(lane.match_codes['U0'], 3)
268 self.failUnlessEqual(lane.match_codes['R0'], 2)
269 self.failUnlessEqual(lane.match_codes['U1'], 1)
270 self.failUnlessEqual(lane.match_codes['R1'], 9)
271 self.failUnlessEqual(lane.match_codes['U2'], 0)
272 self.failUnlessEqual(lane.match_codes['R2'], 12)
273 self.failUnlessEqual(lane.match_codes['NM'], 1)
274 self.failUnlessEqual(lane.match_codes['QC'], 0)
276 self.failUnlessEqual(lane.reads, 5)
277 self.failUnlessEqual(lane.sample_name, 's')
278 self.failUnlessEqual(lane.lane_id, 7)
279 self.failUnlessEqual(lane.sequence_type,
280 eland.SequenceLane.SCARF_TYPE)
282 self.failUnlessEqual(lane.reads, 3)
283 self.failUnlessEqual(lane.sample_name, 's')
284 self.failUnlessEqual(lane.lane_id, 8)
285 self.failUnlessEqual(lane.sequence_type,
286 eland.SequenceLane.FASTQ_TYPE)
288 xml = eland_container.get_elements()
289 # just make sure that element tree can serialize the tree
290 xml_str = ElementTree.tostring(xml)
291 e2 = gerald.ELAND(xml=xml)
293 for key in eland_container:
294 l1 = eland_container[key]
296 self.failUnlessEqual(l1.reads, l2.reads)
297 self.failUnlessEqual(l1.sample_name, l2.sample_name)
298 self.failUnlessEqual(l1.lane_id, l2.lane_id)
299 if isinstance(l1, eland.ElandLane):
300 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
301 self.failUnlessEqual(len(l1.mapped_reads), 17)
302 for k in l1.mapped_reads.keys():
303 self.failUnlessEqual(l1.mapped_reads[k],
306 self.failUnlessEqual(len(l1.match_codes), 9)
307 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
308 for k in l1.match_codes.keys():
309 self.failUnlessEqual(l1.match_codes[k],
311 elif isinstance(l1, eland.SequenceLane):
312 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
314 def test_runfolder(self):
315 runs = runfolder.get_runs(self.runfolder_dir)
317 # do we get the flowcell id from the filename?
318 self.failUnlessEqual(len(runs), 1)
319 name = 'run_3021JAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
320 self.failUnlessEqual(runs[0].serialization_filename, name)
322 # do we get the flowcell id from the FlowcellId.xml file
323 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
324 runs = runfolder.get_runs(self.runfolder_dir)
325 self.failUnlessEqual(len(runs), 1)
326 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
327 self.failUnlessEqual(runs[0].serialization_filename, name)
330 xml = r1.get_elements()
331 xml_str = ElementTree.tostring(xml)
333 r2 = runfolder.PipelineRun(xml=xml)
334 self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
335 self.failIfEqual(r2.image_analysis, None)
336 self.failIfEqual(r2.bustard, None)
337 self.failIfEqual(r2.gerald, None)
341 from unittest import TestSuite, defaultTestLoader
343 suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
347 if __name__ == "__main__":
348 from unittest import main
349 main(defaultTest="suite")