3 from datetime import datetime, date
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
16 from htsworkflow.pipelines.test.simulate_runfolder import *
19 def make_runfolder(obj=None):
21 Make a fake runfolder, attach all the directories to obj if defined
23 # make a fake runfolder directory
24 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
26 runfolder_dir = os.path.join(temp_dir,
27 '090313_HWI-EAS229_0101_3021JAAXX')
28 os.mkdir(runfolder_dir)
30 data_dir = os.path.join(runfolder_dir, 'Data')
33 ipar_dir = make_ipar_dir(data_dir, '1.30')
35 bustard_dir = os.path.join(ipar_dir,
36 'Bustard1.3.2_15-03-2008_diane')
38 make_phasing_params(bustard_dir)
39 make_bustard_config132(bustard_dir)
41 gerald_dir = os.path.join(bustard_dir,
42 'GERALD_15-03-2008_diane')
44 make_gerald_config_100(gerald_dir)
45 make_summary_ipar130_htm(gerald_dir)
46 make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
47 make_scarf(gerald_dir, lane_list=[7,])
48 make_fastq(gerald_dir, lane_list=[8,])
51 obj.temp_dir = temp_dir
52 obj.runfolder_dir = runfolder_dir
53 obj.data_dir = data_dir
54 obj.image_analysis_dir = ipar_dir
55 obj.bustard_dir = bustard_dir
56 obj.gerald_dir = gerald_dir
59 class RunfolderTests(unittest.TestCase):
61 Test components of the runfolder processing code
62 which includes firecrest, bustard, and gerald
65 # attaches all the directories to the object passed in
69 shutil.rmtree(self.temp_dir)
73 Construct a firecrest object
75 i = ipar.ipar(self.image_analysis_dir)
76 self.failUnlessEqual(i.version, '2.01.192.0')
77 self.failUnlessEqual(i.start, 1)
78 self.failUnlessEqual(i.stop, 37)
80 xml = i.get_elements()
81 # just make sure that element tree can serialize the tree
82 xml_str = ElementTree.tostring(xml)
84 i2 = ipar.IPAR(xml=xml)
85 self.failUnlessEqual(i.version, i2.version)
86 self.failUnlessEqual(i.start, i2.start)
87 self.failUnlessEqual(i.stop, i2.stop)
88 self.failUnlessEqual(i.date, i2.date)
89 self.failUnlessEqual(i.file_list(), i2.file_list())
91 def test_bustard(self):
93 construct a bustard object
95 def check_crosstalk(crosstalk):
96 self.failUnlessAlmostEqual(crosstalk.base['A'][0], 1.27)
97 self.failUnlessAlmostEqual(crosstalk.base['A'][1], 0.20999999999999)
98 self.failUnlessAlmostEqual(crosstalk.base['A'][2], -0.02)
99 self.failUnlessAlmostEqual(crosstalk.base['A'][3], -0.03)
101 self.failUnlessAlmostEqual(crosstalk.base['C'][0], 0.57)
102 self.failUnlessAlmostEqual(crosstalk.base['C'][1], 0.58)
103 self.failUnlessAlmostEqual(crosstalk.base['C'][2], -0.01)
104 self.failUnlessAlmostEqual(crosstalk.base['C'][3], -0.01)
106 self.failUnlessAlmostEqual(crosstalk.base['T'][0], -0.02)
107 self.failUnlessAlmostEqual(crosstalk.base['T'][1], -0.02)
108 self.failUnlessAlmostEqual(crosstalk.base['T'][2], 0.80)
109 self.failUnlessAlmostEqual(crosstalk.base['T'][3], 1.07)
111 self.failUnlessAlmostEqual(crosstalk.base['G'][0], -0.03)
112 self.failUnlessAlmostEqual(crosstalk.base['G'][1], -0.04)
113 self.failUnlessAlmostEqual(crosstalk.base['G'][2], 1.51)
114 self.failUnlessAlmostEqual(crosstalk.base['G'][3], -0.02)
116 b = bustard.bustard(self.bustard_dir)
117 self.failUnlessEqual(b.version, '1.3.2')
118 self.failUnlessEqual(b.date, date(2008,3,15))
119 self.failUnlessEqual(b.user, 'diane')
120 self.failUnlessEqual(len(b.phasing), 8)
121 self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
122 self.failUnlessEqual(b.crosstalk.base.keys(), ['A','C','T','G'])
123 check_crosstalk(b.crosstalk)
125 xml = b.get_elements()
126 print ElementTree.dump(xml)
127 b2 = bustard.Bustard(xml=xml)
128 self.failUnlessEqual(b.version, b2.version)
129 self.failUnlessEqual(b.date, b2.date )
130 self.failUnlessEqual(b.user, b2.user)
131 self.failUnlessEqual(len(b.phasing), len(b2.phasing))
132 for key in b.phasing.keys():
133 self.failUnlessEqual(b.phasing[key].lane,
134 b2.phasing[key].lane)
135 self.failUnlessEqual(b.phasing[key].phasing,
136 b2.phasing[key].phasing)
137 self.failUnlessEqual(b.phasing[key].prephasing,
138 b2.phasing[key].prephasing)
139 check_crosstalk(b2.crosstalk)
141 def test_gerald(self):
142 # need to update gerald and make tests for it
143 g = gerald.gerald(self.gerald_dir)
145 self.failUnlessEqual(g.version,
146 '@(#) Id: GERALD.pl,v 1.171 2008/05/19 17:36:14 mzerara Exp')
147 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
148 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
149 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
152 # list of genomes, matches what was defined up in
153 # make_gerald_config.
154 # the first None is to offset the genomes list to be 1..9
155 # instead of pythons default 0..8
160 '/g/arabidopsis01222004',
166 # test lane specific parameters from gerald config file
168 cur_lane = g.lanes[i]
169 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
170 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
171 self.failUnlessEqual(cur_lane.read_length, '37')
172 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
174 # I want to be able to use a simple iterator
175 for l in g.lanes.values():
176 self.failUnlessEqual(l.analysis, 'eland_extended')
177 self.failUnlessEqual(l.read_length, '37')
178 self.failUnlessEqual(l.use_bases, 'Y'*37)
180 # test data extracted from summary file
182 (126910, 4300), (165739, 6792),
183 (196565, 8216), (153897, 8501),
184 (135536, 3908), (154083, 9315),
185 (159991, 9292), (198479, 17671),]
187 self.failUnlessEqual(len(g.summary), 1)
189 summary_lane = g.summary[0][i]
190 self.failUnlessEqual(summary_lane.cluster, clusters[i])
191 self.failUnlessEqual(summary_lane.lane, i)
193 xml = g.get_elements()
194 # just make sure that element tree can serialize the tree
195 xml_str = ElementTree.tostring(xml)
196 g2 = gerald.Gerald(xml=xml)
198 # do it all again after extracting from the xml file
199 self.failUnlessEqual(g.version, g2.version)
200 self.failUnlessEqual(g.date, g2.date)
201 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
202 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
204 # test lane specific parameters from gerald config file
207 g2_lane = g2.lanes[i]
208 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
209 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
210 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
211 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
213 # test (some) summary elements
214 self.failUnlessEqual(len(g.summary), 1)
216 g_summary = g.summary[0][i]
217 g2_summary = g2.summary[0][i]
218 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
219 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
221 g_eland = g.eland_results
222 g2_eland = g2.eland_results
223 for lane in g_eland.results[0].keys():
224 g_results = g_eland.results[0][lane]
225 g2_results = g2_eland.results[0][lane]
226 self.failUnlessEqual(g_results.reads,
228 if isinstance(g_results, eland.ElandLane):
229 self.failUnlessEqual(len(g_results.mapped_reads),
230 len(g2_results.mapped_reads))
231 for k in g_results.mapped_reads.keys():
232 self.failUnlessEqual(g_results.mapped_reads[k],
233 g2_results.mapped_reads[k])
235 self.failUnlessEqual(len(g_results.match_codes),
236 len(g2_results.match_codes))
237 for k in g_results.match_codes.keys():
238 self.failUnlessEqual(g_results.match_codes[k],
239 g2_results.match_codes[k])
242 def test_eland(self):
243 hg_map = {'Lambda.fa': 'Lambda.fa'}
244 for i in range(1,22):
245 short_name = 'chr%d.fa' % (i,)
246 long_name = 'hg18/chr%d.fa' % (i,)
247 hg_map[short_name] = long_name
249 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
250 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
251 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
253 # I added sequence lanes to the last 2 lanes of this test case
255 lane = eland_container.results[0][i]
256 self.failUnlessEqual(lane.reads, 6)
257 self.failUnlessEqual(lane.sample_name, "s")
258 self.failUnlessEqual(lane.lane_id, i)
259 self.failUnlessEqual(len(lane.mapped_reads), 17)
260 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
261 self.failUnlessEqual(lane.match_codes['U0'], 3)
262 self.failUnlessEqual(lane.match_codes['R0'], 2)
263 self.failUnlessEqual(lane.match_codes['U1'], 1)
264 self.failUnlessEqual(lane.match_codes['R1'], 9)
265 self.failUnlessEqual(lane.match_codes['U2'], 0)
266 self.failUnlessEqual(lane.match_codes['R2'], 12)
267 self.failUnlessEqual(lane.match_codes['NM'], 1)
268 self.failUnlessEqual(lane.match_codes['QC'], 0)
271 lane = eland_container.results[0][7]
272 self.failUnlessEqual(lane.reads, 5)
273 self.failUnlessEqual(lane.sample_name, 's')
274 self.failUnlessEqual(lane.lane_id, 7)
275 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
278 lane = eland_container.results[0][8]
279 self.failUnlessEqual(lane.reads, 3)
280 self.failUnlessEqual(lane.sample_name, 's')
281 self.failUnlessEqual(lane.lane_id, 8)
282 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
284 xml = eland_container.get_elements()
285 # just make sure that element tree can serialize the tree
286 xml_str = ElementTree.tostring(xml)
287 e2 = gerald.ELAND(xml=xml)
290 l1 = eland_container.results[0][i]
291 l2 = e2.results[0][i]
292 self.failUnlessEqual(l1.reads, l2.reads)
293 self.failUnlessEqual(l1.sample_name, l2.sample_name)
294 self.failUnlessEqual(l1.lane_id, l2.lane_id)
295 if isinstance(l1, eland.ElandLane):
296 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
297 self.failUnlessEqual(len(l1.mapped_reads), 17)
298 for k in l1.mapped_reads.keys():
299 self.failUnlessEqual(l1.mapped_reads[k],
302 self.failUnlessEqual(len(l1.match_codes), 9)
303 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
304 for k in l1.match_codes.keys():
305 self.failUnlessEqual(l1.match_codes[k],
307 elif isinstance(l1, eland.SequenceLane):
308 print 'l1', l1.__dict__
309 print 'l2', l2.__dict__
310 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
312 def test_runfolder(self):
313 runs = runfolder.get_runs(self.runfolder_dir)
315 # do we get the flowcell id from the filename?
316 self.failUnlessEqual(len(runs), 1)
317 name = 'run_3021JAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
318 self.failUnlessEqual(runs[0].name, name)
320 # do we get the flowcell id from the FlowcellId.xml file
321 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
322 runs = runfolder.get_runs(self.runfolder_dir)
323 self.failUnlessEqual(len(runs), 1)
324 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
325 self.failUnlessEqual(runs[0].name, name)
328 xml = r1.get_elements()
329 xml_str = ElementTree.tostring(xml)
331 r2 = runfolder.PipelineRun(xml=xml)
332 self.failUnlessEqual(r1.name, r2.name)
333 self.failIfEqual(r2.image_analysis, None)
334 self.failIfEqual(r2.bustard, None)
335 self.failIfEqual(r2.gerald, None)
339 return unittest.makeSuite(RunfolderTests,'test')
341 if __name__ == "__main__":
342 unittest.main(defaultTest="suite")