3 from datetime import datetime, date
7 from unittest2 import TestCase
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
16 from htsworkflow.pipelines.test.simulate_runfolder import *
19 def make_runfolder(obj=None):
21 Make a fake runfolder, attach all the directories to obj if defined
23 # make a fake runfolder directory
24 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
26 runfolder_dir = os.path.join(temp_dir,
27 '090313_HWI-EAS229_0101_3021JAAXX')
28 os.mkdir(runfolder_dir)
30 data_dir = os.path.join(runfolder_dir, 'Data')
33 ipar_dir = make_ipar_dir(data_dir, '1.30')
35 bustard_dir = os.path.join(ipar_dir,
36 'Bustard1.3.2_15-03-2008_diane')
38 make_phasing_params(bustard_dir)
39 make_bustard_config132(bustard_dir)
41 gerald_dir = os.path.join(bustard_dir,
42 'GERALD_15-03-2008_diane')
44 make_gerald_config_100(gerald_dir)
45 make_summary_ipar130_htm(gerald_dir)
46 make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
47 make_scarf(gerald_dir, lane_list=[7,])
48 make_fastq(gerald_dir, lane_list=[8,])
51 obj.temp_dir = temp_dir
52 obj.runfolder_dir = runfolder_dir
53 obj.data_dir = data_dir
54 obj.image_analysis_dir = ipar_dir
55 obj.bustard_dir = bustard_dir
56 obj.gerald_dir = gerald_dir
59 class RunfolderTests(TestCase):
61 Test components of the runfolder processing code
62 which includes firecrest, bustard, and gerald
65 # attaches all the directories to the object passed in
69 shutil.rmtree(self.temp_dir)
73 Construct a firecrest object
75 i = ipar.ipar(self.image_analysis_dir)
76 self.failUnlessEqual(i.software, 'IPAR')
77 self.failUnlessEqual(i.version, '2.01.192.0')
78 self.failUnlessEqual(i.start, 1)
79 self.failUnlessEqual(i.stop, 37)
81 xml = i.get_elements()
82 # just make sure that element tree can serialize the tree
83 xml_str = ElementTree.tostring(xml)
85 i2 = ipar.IPAR(xml=xml)
86 self.failUnlessEqual(i.software, i2.software)
87 self.failUnlessEqual(i.version, i2.version)
88 self.failUnlessEqual(i.start, i2.start)
89 self.failUnlessEqual(i.stop, i2.stop)
90 self.failUnlessEqual(i.date, i2.date)
91 self.failUnlessEqual(i.file_list(), i2.file_list())
93 def test_bustard(self):
95 construct a bustard object
97 def check_crosstalk(crosstalk):
98 self.failUnlessAlmostEqual(crosstalk.base['A'][0], 1.27)
99 self.failUnlessAlmostEqual(crosstalk.base['A'][1], 0.20999999999999)
100 self.failUnlessAlmostEqual(crosstalk.base['A'][2], -0.02)
101 self.failUnlessAlmostEqual(crosstalk.base['A'][3], -0.03)
103 self.failUnlessAlmostEqual(crosstalk.base['C'][0], 0.57)
104 self.failUnlessAlmostEqual(crosstalk.base['C'][1], 0.58)
105 self.failUnlessAlmostEqual(crosstalk.base['C'][2], -0.01)
106 self.failUnlessAlmostEqual(crosstalk.base['C'][3], -0.01)
108 self.failUnlessAlmostEqual(crosstalk.base['T'][0], -0.02)
109 self.failUnlessAlmostEqual(crosstalk.base['T'][1], -0.02)
110 self.failUnlessAlmostEqual(crosstalk.base['T'][2], 0.80)
111 self.failUnlessAlmostEqual(crosstalk.base['T'][3], 1.07)
113 self.failUnlessAlmostEqual(crosstalk.base['G'][0], -0.03)
114 self.failUnlessAlmostEqual(crosstalk.base['G'][1], -0.04)
115 self.failUnlessAlmostEqual(crosstalk.base['G'][2], 1.51)
116 self.failUnlessAlmostEqual(crosstalk.base['G'][3], -0.02)
118 b = bustard.bustard(self.bustard_dir)
119 self.failUnlessEqual(b.software, 'Bustard')
120 self.failUnlessEqual(b.version, '1.3.2')
121 self.failUnlessEqual(b.date, date(2008,3,15))
122 self.failUnlessEqual(b.user, 'diane')
123 self.failUnlessEqual(len(b.phasing), 8)
124 self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
125 self.failUnlessEqual(b.crosstalk.base.keys(), ['A','C','T','G'])
126 check_crosstalk(b.crosstalk)
128 xml = b.get_elements()
129 b2 = bustard.Bustard(xml=xml)
130 self.failUnlessEqual(b.software, b2.software)
131 self.failUnlessEqual(b.version, b2.version)
132 self.failUnlessEqual(b.date, b2.date )
133 self.failUnlessEqual(b.user, b2.user)
134 self.failUnlessEqual(len(b.phasing), len(b2.phasing))
135 for key in b.phasing.keys():
136 self.failUnlessEqual(b.phasing[key].lane,
137 b2.phasing[key].lane)
138 self.failUnlessEqual(b.phasing[key].phasing,
139 b2.phasing[key].phasing)
140 self.failUnlessEqual(b.phasing[key].prephasing,
141 b2.phasing[key].prephasing)
142 check_crosstalk(b2.crosstalk)
144 def test_gerald(self):
145 # need to update gerald and make tests for it
146 g = gerald.gerald(self.gerald_dir)
148 self.failUnlessEqual(g.software, 'GERALD')
149 self.failUnlessEqual(g.version, '1.171')
150 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
151 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
152 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
155 # list of genomes, matches what was defined up in
156 # make_gerald_config.
157 # the first None is to offset the genomes list to be 1..9
158 # instead of pythons default 0..8
163 '/g/arabidopsis01222004',
169 # test lane specific parameters from gerald config file
171 cur_lane = g.lanes[i]
172 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
173 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
174 self.failUnlessEqual(cur_lane.read_length, '37')
175 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
177 # I want to be able to use a simple iterator
178 for l in g.lanes.values():
179 self.failUnlessEqual(l.analysis, 'eland_extended')
180 self.failUnlessEqual(l.read_length, '37')
181 self.failUnlessEqual(l.use_bases, 'Y'*37)
183 # test data extracted from summary file
185 (126910, 4300), (165739, 6792),
186 (196565, 8216), (153897, 8501),
187 (135536, 3908), (154083, 9315),
188 (159991, 9292), (198479, 17671),]
190 self.failUnlessEqual(len(g.summary), 1)
192 summary_lane = g.summary[0][i]
193 self.failUnlessEqual(summary_lane.cluster, clusters[i])
194 self.failUnlessEqual(summary_lane.lane, i)
196 xml = g.get_elements()
197 # just make sure that element tree can serialize the tree
198 xml_str = ElementTree.tostring(xml)
199 g2 = gerald.Gerald(xml=xml)
201 # do it all again after extracting from the xml file
202 self.failUnlessEqual(g.software, g2.software)
203 self.failUnlessEqual(g.version, g2.version)
204 self.failUnlessEqual(g.date, g2.date)
205 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
206 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
208 # test lane specific parameters from gerald config file
211 g2_lane = g2.lanes[i]
212 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
213 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
214 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
215 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
217 # test (some) summary elements
218 self.failUnlessEqual(len(g.summary), 1)
220 g_summary = g.summary[0][i]
221 g2_summary = g2.summary[0][i]
222 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
223 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
225 g_eland = g.eland_results
226 g2_eland = g2.eland_results
228 g_results = g_eland[key]
229 g2_results = g2_eland[key]
230 self.failUnlessEqual(g_results.reads,
232 if isinstance(g_results, eland.ElandLane):
233 self.failUnlessEqual(len(g_results.mapped_reads),
234 len(g2_results.mapped_reads))
235 for k in g_results.mapped_reads.keys():
236 self.failUnlessEqual(g_results.mapped_reads[k],
237 g2_results.mapped_reads[k])
239 self.failUnlessEqual(len(g_results.match_codes),
240 len(g2_results.match_codes))
241 for k in g_results.match_codes.keys():
242 self.failUnlessEqual(g_results.match_codes[k],
243 g2_results.match_codes[k])
246 def test_eland(self):
247 hg_map = {'Lambda.fa': 'Lambda.fa'}
248 for i in range(1,22):
249 short_name = 'chr%d.fa' % (i,)
250 long_name = 'hg18/chr%d.fa' % (i,)
251 hg_map[short_name] = long_name
253 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
254 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
255 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
257 # I added sequence lanes to the last 2 lanes of this test case
258 for key in eland_container:
259 lane = eland_container[key]
260 if key.lane in [1,2,3,4,5,6]:
261 self.failUnlessEqual(lane.reads, 6)
262 self.failUnlessEqual(lane.sample_name, "s")
263 self.failUnlessEqual(lane.lane_id, key.lane)
264 self.failUnlessEqual(len(lane.mapped_reads), 17)
265 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
266 self.failUnlessEqual(lane.match_codes['U0'], 3)
267 self.failUnlessEqual(lane.match_codes['R0'], 2)
268 self.failUnlessEqual(lane.match_codes['U1'], 1)
269 self.failUnlessEqual(lane.match_codes['R1'], 9)
270 self.failUnlessEqual(lane.match_codes['U2'], 0)
271 self.failUnlessEqual(lane.match_codes['R2'], 12)
272 self.failUnlessEqual(lane.match_codes['NM'], 1)
273 self.failUnlessEqual(lane.match_codes['QC'], 0)
275 self.failUnlessEqual(lane.reads, 5)
276 self.failUnlessEqual(lane.sample_name, 's')
277 self.failUnlessEqual(lane.lane_id, 7)
278 self.failUnlessEqual(lane.sequence_type,
279 eland.SequenceLane.SCARF_TYPE)
281 self.failUnlessEqual(lane.reads, 3)
282 self.failUnlessEqual(lane.sample_name, 's')
283 self.failUnlessEqual(lane.lane_id, 8)
284 self.failUnlessEqual(lane.sequence_type,
285 eland.SequenceLane.FASTQ_TYPE)
287 xml = eland_container.get_elements()
288 # just make sure that element tree can serialize the tree
289 xml_str = ElementTree.tostring(xml)
290 e2 = gerald.ELAND(xml=xml)
292 for key in eland_container:
293 l1 = eland_container[key]
295 self.failUnlessEqual(l1.reads, l2.reads)
296 self.failUnlessEqual(l1.sample_name, l2.sample_name)
297 self.failUnlessEqual(l1.lane_id, l2.lane_id)
298 if isinstance(l1, eland.ElandLane):
299 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
300 self.failUnlessEqual(len(l1.mapped_reads), 17)
301 for k in l1.mapped_reads.keys():
302 self.failUnlessEqual(l1.mapped_reads[k],
305 self.failUnlessEqual(len(l1.match_codes), 9)
306 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
307 for k in l1.match_codes.keys():
308 self.failUnlessEqual(l1.match_codes[k],
310 elif isinstance(l1, eland.SequenceLane):
311 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
313 def test_runfolder(self):
314 runs = runfolder.get_runs(self.runfolder_dir)
316 # do we get the flowcell id from the filename?
317 self.failUnlessEqual(len(runs), 1)
318 name = 'run_3021JAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
319 self.failUnlessEqual(runs[0].name, name)
321 # do we get the flowcell id from the FlowcellId.xml file
322 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
323 runs = runfolder.get_runs(self.runfolder_dir)
324 self.failUnlessEqual(len(runs), 1)
325 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
326 self.failUnlessEqual(runs[0].name, name)
329 xml = r1.get_elements()
330 xml_str = ElementTree.tostring(xml)
332 r2 = runfolder.PipelineRun(xml=xml)
333 self.failUnlessEqual(r1.name, r2.name)
334 self.failIfEqual(r2.image_analysis, None)
335 self.failIfEqual(r2.bustard, None)
336 self.failIfEqual(r2.gerald, None)
340 from unittest2 import TestSuite, defaultTestLoader
342 suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
346 if __name__ == "__main__":
347 from unittest2 import main
348 main(defaultTest="suite")