3 from datetime import datetime, date
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
16 from htsworkflow.pipelines.test.simulate_runfolder import *
19 def make_runfolder(obj=None):
21 Make a fake runfolder, attach all the directories to obj if defined
23 # make a fake runfolder directory
24 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
26 runfolder_dir = os.path.join(temp_dir,
27 '090313_HWI-EAS229_0101_3021JAAXX')
28 os.mkdir(runfolder_dir)
30 data_dir = os.path.join(runfolder_dir, 'Data')
33 ipar_dir = make_ipar_dir(data_dir, '1.30')
35 bustard_dir = os.path.join(ipar_dir,
36 'Bustard1.3.2_15-03-2008_diane')
38 make_phasing_params(bustard_dir)
39 make_bustard_config132(bustard_dir)
41 gerald_dir = os.path.join(bustard_dir,
42 'GERALD_15-03-2008_diane')
44 make_gerald_config_100(gerald_dir)
45 make_summary_ipar130_htm(gerald_dir)
46 make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
47 make_scarf(gerald_dir, lane_list=[7,])
48 make_fastq(gerald_dir, lane_list=[8,])
51 obj.temp_dir = temp_dir
52 obj.runfolder_dir = runfolder_dir
53 obj.data_dir = data_dir
54 obj.image_analysis_dir = ipar_dir
55 obj.bustard_dir = bustard_dir
56 obj.gerald_dir = gerald_dir
59 class RunfolderTests(unittest.TestCase):
61 Test components of the runfolder processing code
62 which includes firecrest, bustard, and gerald
65 # attaches all the directories to the object passed in
69 shutil.rmtree(self.temp_dir)
73 Construct a firecrest object
75 i = ipar.ipar(self.image_analysis_dir)
76 self.failUnlessEqual(i.version, '2.01.192.0')
77 self.failUnlessEqual(i.start, 1)
78 self.failUnlessEqual(i.stop, 37)
80 xml = i.get_elements()
81 # just make sure that element tree can serialize the tree
82 xml_str = ElementTree.tostring(xml)
84 i2 = ipar.IPAR(xml=xml)
85 self.failUnlessEqual(i.version, i2.version)
86 self.failUnlessEqual(i.start, i2.start)
87 self.failUnlessEqual(i.stop, i2.stop)
88 self.failUnlessEqual(i.date, i2.date)
89 self.failUnlessEqual(i.file_list(), i2.file_list())
91 def test_bustard(self):
93 construct a bustard object
95 def check_crosstalk(crosstalk):
96 self.failUnlessAlmostEqual(crosstalk.base['A'][0], 1.27)
97 self.failUnlessAlmostEqual(crosstalk.base['A'][1], 0.20999999999999)
98 self.failUnlessAlmostEqual(crosstalk.base['A'][2], -0.02)
99 self.failUnlessAlmostEqual(crosstalk.base['A'][3], -0.03)
101 self.failUnlessAlmostEqual(crosstalk.base['C'][0], 0.57)
102 self.failUnlessAlmostEqual(crosstalk.base['C'][1], 0.58)
103 self.failUnlessAlmostEqual(crosstalk.base['C'][2], -0.01)
104 self.failUnlessAlmostEqual(crosstalk.base['C'][3], -0.01)
106 self.failUnlessAlmostEqual(crosstalk.base['T'][0], -0.02)
107 self.failUnlessAlmostEqual(crosstalk.base['T'][1], -0.02)
108 self.failUnlessAlmostEqual(crosstalk.base['T'][2], 0.80)
109 self.failUnlessAlmostEqual(crosstalk.base['T'][3], 1.07)
111 self.failUnlessAlmostEqual(crosstalk.base['G'][0], -0.03)
112 self.failUnlessAlmostEqual(crosstalk.base['G'][1], -0.04)
113 self.failUnlessAlmostEqual(crosstalk.base['G'][2], 1.51)
114 self.failUnlessAlmostEqual(crosstalk.base['G'][3], -0.02)
116 b = bustard.bustard(self.bustard_dir)
117 self.failUnlessEqual(b.version, '1.3.2')
118 self.failUnlessEqual(b.date, date(2008,3,15))
119 self.failUnlessEqual(b.user, 'diane')
120 self.failUnlessEqual(len(b.phasing), 8)
121 self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
122 self.failUnlessEqual(b.crosstalk.base.keys(), ['A','C','T','G'])
123 check_crosstalk(b.crosstalk)
125 xml = b.get_elements()
126 b2 = bustard.Bustard(xml=xml)
127 self.failUnlessEqual(b.version, b2.version)
128 self.failUnlessEqual(b.date, b2.date )
129 self.failUnlessEqual(b.user, b2.user)
130 self.failUnlessEqual(len(b.phasing), len(b2.phasing))
131 for key in b.phasing.keys():
132 self.failUnlessEqual(b.phasing[key].lane,
133 b2.phasing[key].lane)
134 self.failUnlessEqual(b.phasing[key].phasing,
135 b2.phasing[key].phasing)
136 self.failUnlessEqual(b.phasing[key].prephasing,
137 b2.phasing[key].prephasing)
138 check_crosstalk(b2.crosstalk)
140 def test_gerald(self):
141 # need to update gerald and make tests for it
142 g = gerald.gerald(self.gerald_dir)
144 self.failUnlessEqual(g.version,
145 '@(#) Id: GERALD.pl,v 1.171 2008/05/19 17:36:14 mzerara Exp')
146 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
147 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
148 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
151 # list of genomes, matches what was defined up in
152 # make_gerald_config.
153 # the first None is to offset the genomes list to be 1..9
154 # instead of pythons default 0..8
159 '/g/arabidopsis01222004',
165 # test lane specific parameters from gerald config file
167 cur_lane = g.lanes[i]
168 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
169 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
170 self.failUnlessEqual(cur_lane.read_length, '37')
171 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
173 # I want to be able to use a simple iterator
174 for l in g.lanes.values():
175 self.failUnlessEqual(l.analysis, 'eland_extended')
176 self.failUnlessEqual(l.read_length, '37')
177 self.failUnlessEqual(l.use_bases, 'Y'*37)
179 # test data extracted from summary file
181 (126910, 4300), (165739, 6792),
182 (196565, 8216), (153897, 8501),
183 (135536, 3908), (154083, 9315),
184 (159991, 9292), (198479, 17671),]
186 self.failUnlessEqual(len(g.summary), 1)
188 summary_lane = g.summary[0][i]
189 self.failUnlessEqual(summary_lane.cluster, clusters[i])
190 self.failUnlessEqual(summary_lane.lane, i)
192 xml = g.get_elements()
193 # just make sure that element tree can serialize the tree
194 xml_str = ElementTree.tostring(xml)
195 g2 = gerald.Gerald(xml=xml)
197 # do it all again after extracting from the xml file
198 self.failUnlessEqual(g.version, g2.version)
199 self.failUnlessEqual(g.date, g2.date)
200 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
201 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
203 # test lane specific parameters from gerald config file
206 g2_lane = g2.lanes[i]
207 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
208 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
209 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
210 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
212 # test (some) summary elements
213 self.failUnlessEqual(len(g.summary), 1)
215 g_summary = g.summary[0][i]
216 g2_summary = g2.summary[0][i]
217 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
218 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
220 g_eland = g.eland_results
221 g2_eland = g2.eland_results
222 for lane in g_eland.results[0].keys():
223 g_results = g_eland.results[0][lane]
224 g2_results = g2_eland.results[0][lane]
225 self.failUnlessEqual(g_results.reads,
227 if isinstance(g_results, eland.ElandLane):
228 self.failUnlessEqual(len(g_results.mapped_reads),
229 len(g2_results.mapped_reads))
230 for k in g_results.mapped_reads.keys():
231 self.failUnlessEqual(g_results.mapped_reads[k],
232 g2_results.mapped_reads[k])
234 self.failUnlessEqual(len(g_results.match_codes),
235 len(g2_results.match_codes))
236 for k in g_results.match_codes.keys():
237 self.failUnlessEqual(g_results.match_codes[k],
238 g2_results.match_codes[k])
241 def test_eland(self):
242 hg_map = {'Lambda.fa': 'Lambda.fa'}
243 for i in range(1,22):
244 short_name = 'chr%d.fa' % (i,)
245 long_name = 'hg18/chr%d.fa' % (i,)
246 hg_map[short_name] = long_name
248 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
249 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
250 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
252 # I added sequence lanes to the last 2 lanes of this test case
254 lane = eland_container.results[0][i]
255 self.failUnlessEqual(lane.reads, 6)
256 self.failUnlessEqual(lane.sample_name, "s")
257 self.failUnlessEqual(lane.lane_id, i)
258 self.failUnlessEqual(len(lane.mapped_reads), 17)
259 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
260 self.failUnlessEqual(lane.match_codes['U0'], 3)
261 self.failUnlessEqual(lane.match_codes['R0'], 2)
262 self.failUnlessEqual(lane.match_codes['U1'], 1)
263 self.failUnlessEqual(lane.match_codes['R1'], 9)
264 self.failUnlessEqual(lane.match_codes['U2'], 0)
265 self.failUnlessEqual(lane.match_codes['R2'], 12)
266 self.failUnlessEqual(lane.match_codes['NM'], 1)
267 self.failUnlessEqual(lane.match_codes['QC'], 0)
270 lane = eland_container.results[0][7]
271 self.failUnlessEqual(lane.reads, 5)
272 self.failUnlessEqual(lane.sample_name, 's')
273 self.failUnlessEqual(lane.lane_id, 7)
274 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
277 lane = eland_container.results[0][8]
278 self.failUnlessEqual(lane.reads, 3)
279 self.failUnlessEqual(lane.sample_name, 's')
280 self.failUnlessEqual(lane.lane_id, 8)
281 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
283 xml = eland_container.get_elements()
284 # just make sure that element tree can serialize the tree
285 xml_str = ElementTree.tostring(xml)
286 e2 = gerald.ELAND(xml=xml)
289 l1 = eland_container.results[0][i]
290 l2 = e2.results[0][i]
291 self.failUnlessEqual(l1.reads, l2.reads)
292 self.failUnlessEqual(l1.sample_name, l2.sample_name)
293 self.failUnlessEqual(l1.lane_id, l2.lane_id)
294 if isinstance(l1, eland.ElandLane):
295 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
296 self.failUnlessEqual(len(l1.mapped_reads), 17)
297 for k in l1.mapped_reads.keys():
298 self.failUnlessEqual(l1.mapped_reads[k],
301 self.failUnlessEqual(len(l1.match_codes), 9)
302 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
303 for k in l1.match_codes.keys():
304 self.failUnlessEqual(l1.match_codes[k],
306 elif isinstance(l1, eland.SequenceLane):
307 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
309 def test_runfolder(self):
310 runs = runfolder.get_runs(self.runfolder_dir)
312 # do we get the flowcell id from the filename?
313 self.failUnlessEqual(len(runs), 1)
314 name = 'run_3021JAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
315 self.failUnlessEqual(runs[0].name, name)
317 # do we get the flowcell id from the FlowcellId.xml file
318 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
319 runs = runfolder.get_runs(self.runfolder_dir)
320 self.failUnlessEqual(len(runs), 1)
321 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
322 self.failUnlessEqual(runs[0].name, name)
325 xml = r1.get_elements()
326 xml_str = ElementTree.tostring(xml)
328 r2 = runfolder.PipelineRun(xml=xml)
329 self.failUnlessEqual(r1.name, r2.name)
330 self.failIfEqual(r2.image_analysis, None)
331 self.failIfEqual(r2.bustard, None)
332 self.failIfEqual(r2.gerald, None)
336 return unittest.makeSuite(RunfolderTests,'test')
338 if __name__ == "__main__":
339 unittest.main(defaultTest="suite")