3 from datetime import datetime, date
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
16 from htsworkflow.pipelines.test.simulate_runfolder import *
19 def make_runfolder(obj=None):
21 Make a fake runfolder, attach all the directories to obj if defined
23 # make a fake runfolder directory
24 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
26 runfolder_dir = os.path.join(temp_dir,
27 '090313_HWI-EAS229_0101_3021JAAXX')
28 os.mkdir(runfolder_dir)
30 data_dir = os.path.join(runfolder_dir, 'Data')
33 ipar_dir = make_ipar_dir(data_dir, '1.30')
35 bustard_dir = os.path.join(ipar_dir,
36 'Bustard1.3.2_15-03-2008_diane')
38 make_phasing_params(bustard_dir)
39 make_bustard_config132(bustard_dir)
41 gerald_dir = os.path.join(bustard_dir,
42 'GERALD_15-03-2008_diane')
44 make_gerald_config_100(gerald_dir)
45 make_summary_ipar130_htm(gerald_dir)
46 make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
47 make_scarf(gerald_dir, lane_list=[7,])
48 make_fastq(gerald_dir, lane_list=[8,])
51 obj.temp_dir = temp_dir
52 obj.runfolder_dir = runfolder_dir
53 obj.data_dir = data_dir
54 obj.image_analysis_dir = ipar_dir
55 obj.bustard_dir = bustard_dir
56 obj.gerald_dir = gerald_dir
59 class RunfolderTests(unittest.TestCase):
61 Test components of the runfolder processing code
62 which includes firecrest, bustard, and gerald
65 # attaches all the directories to the object passed in
69 shutil.rmtree(self.temp_dir)
73 Construct a firecrest object
75 i = ipar.ipar(self.image_analysis_dir)
76 self.failUnlessEqual(i.version, '2.01.192.0')
77 self.failUnlessEqual(i.start, 1)
78 self.failUnlessEqual(i.stop, 37)
80 xml = i.get_elements()
81 # just make sure that element tree can serialize the tree
82 xml_str = ElementTree.tostring(xml)
84 i2 = ipar.IPAR(xml=xml)
85 self.failUnlessEqual(i.version, i2.version)
86 self.failUnlessEqual(i.start, i2.start)
87 self.failUnlessEqual(i.stop, i2.stop)
88 self.failUnlessEqual(i.date, i2.date)
89 self.failUnlessEqual(i.file_list(), i2.file_list())
91 def test_bustard(self):
93 construct a bustard object
95 def check_crosstalk(crosstalk):
96 self.failUnlessAlmostEqual(crosstalk.base['A'][0], 1.27)
97 self.failUnlessAlmostEqual(crosstalk.base['A'][1], 0.20999999999999)
98 self.failUnlessAlmostEqual(crosstalk.base['A'][2], -0.02)
99 self.failUnlessAlmostEqual(crosstalk.base['A'][3], -0.03)
101 self.failUnlessAlmostEqual(crosstalk.base['C'][0], 0.57)
102 self.failUnlessAlmostEqual(crosstalk.base['C'][1], 0.58)
103 self.failUnlessAlmostEqual(crosstalk.base['C'][2], -0.01)
104 self.failUnlessAlmostEqual(crosstalk.base['C'][3], -0.01)
106 self.failUnlessAlmostEqual(crosstalk.base['T'][0], -0.02)
107 self.failUnlessAlmostEqual(crosstalk.base['T'][1], -0.02)
108 self.failUnlessAlmostEqual(crosstalk.base['T'][2], 0.80)
109 self.failUnlessAlmostEqual(crosstalk.base['T'][3], 1.07)
111 self.failUnlessAlmostEqual(crosstalk.base['G'][0], -0.03)
112 self.failUnlessAlmostEqual(crosstalk.base['G'][1], -0.04)
113 self.failUnlessAlmostEqual(crosstalk.base['G'][2], 1.51)
114 self.failUnlessAlmostEqual(crosstalk.base['G'][3], -0.02)
116 b = bustard.bustard(self.bustard_dir)
117 self.failUnlessEqual(b.version, '1.3.2')
118 self.failUnlessEqual(b.date, date(2008,3,15))
119 self.failUnlessEqual(b.user, 'diane')
120 self.failUnlessEqual(len(b.phasing), 8)
121 self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
122 self.failUnlessEqual(b.crosstalk.base.keys(), ['A','C','T','G'])
123 check_crosstalk(b.crosstalk)
125 xml = b.get_elements()
126 b2 = bustard.Bustard(xml=xml)
127 self.failUnlessEqual(b.version, b2.version)
128 self.failUnlessEqual(b.date, b2.date )
129 self.failUnlessEqual(b.user, b2.user)
130 self.failUnlessEqual(len(b.phasing), len(b2.phasing))
131 for key in b.phasing.keys():
132 self.failUnlessEqual(b.phasing[key].lane,
133 b2.phasing[key].lane)
134 self.failUnlessEqual(b.phasing[key].phasing,
135 b2.phasing[key].phasing)
136 self.failUnlessEqual(b.phasing[key].prephasing,
137 b2.phasing[key].prephasing)
138 check_crosstalk(b2.crosstalk)
140 def test_gerald(self):
141 # need to update gerald and make tests for it
142 g = gerald.gerald(self.gerald_dir)
144 self.failUnlessEqual(g.version, '1.171')
145 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
146 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
147 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
150 # list of genomes, matches what was defined up in
151 # make_gerald_config.
152 # the first None is to offset the genomes list to be 1..9
153 # instead of pythons default 0..8
158 '/g/arabidopsis01222004',
164 # test lane specific parameters from gerald config file
166 cur_lane = g.lanes[i]
167 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
168 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
169 self.failUnlessEqual(cur_lane.read_length, '37')
170 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
172 # I want to be able to use a simple iterator
173 for l in g.lanes.values():
174 self.failUnlessEqual(l.analysis, 'eland_extended')
175 self.failUnlessEqual(l.read_length, '37')
176 self.failUnlessEqual(l.use_bases, 'Y'*37)
178 # test data extracted from summary file
180 (126910, 4300), (165739, 6792),
181 (196565, 8216), (153897, 8501),
182 (135536, 3908), (154083, 9315),
183 (159991, 9292), (198479, 17671),]
185 self.failUnlessEqual(len(g.summary), 1)
187 summary_lane = g.summary[0][i]
188 self.failUnlessEqual(summary_lane.cluster, clusters[i])
189 self.failUnlessEqual(summary_lane.lane, i)
191 xml = g.get_elements()
192 # just make sure that element tree can serialize the tree
193 xml_str = ElementTree.tostring(xml)
194 g2 = gerald.Gerald(xml=xml)
196 # do it all again after extracting from the xml file
197 self.failUnlessEqual(g.version, g2.version)
198 self.failUnlessEqual(g.date, g2.date)
199 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
200 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
202 # test lane specific parameters from gerald config file
205 g2_lane = g2.lanes[i]
206 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
207 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
208 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
209 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
211 # test (some) summary elements
212 self.failUnlessEqual(len(g.summary), 1)
214 g_summary = g.summary[0][i]
215 g2_summary = g2.summary[0][i]
216 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
217 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
219 g_eland = g.eland_results
220 g2_eland = g2.eland_results
221 for lane in g_eland.results[0].keys():
222 g_results = g_eland.results[0][lane]
223 g2_results = g2_eland.results[0][lane]
224 self.failUnlessEqual(g_results.reads,
226 if isinstance(g_results, eland.ElandLane):
227 self.failUnlessEqual(len(g_results.mapped_reads),
228 len(g2_results.mapped_reads))
229 for k in g_results.mapped_reads.keys():
230 self.failUnlessEqual(g_results.mapped_reads[k],
231 g2_results.mapped_reads[k])
233 self.failUnlessEqual(len(g_results.match_codes),
234 len(g2_results.match_codes))
235 for k in g_results.match_codes.keys():
236 self.failUnlessEqual(g_results.match_codes[k],
237 g2_results.match_codes[k])
240 def test_eland(self):
241 hg_map = {'Lambda.fa': 'Lambda.fa'}
242 for i in range(1,22):
243 short_name = 'chr%d.fa' % (i,)
244 long_name = 'hg18/chr%d.fa' % (i,)
245 hg_map[short_name] = long_name
247 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
248 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
249 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
251 # I added sequence lanes to the last 2 lanes of this test case
253 lane = eland_container.results[0][i]
254 self.failUnlessEqual(lane.reads, 6)
255 self.failUnlessEqual(lane.sample_name, "s")
256 self.failUnlessEqual(lane.lane_id, i)
257 self.failUnlessEqual(len(lane.mapped_reads), 17)
258 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
259 self.failUnlessEqual(lane.match_codes['U0'], 3)
260 self.failUnlessEqual(lane.match_codes['R0'], 2)
261 self.failUnlessEqual(lane.match_codes['U1'], 1)
262 self.failUnlessEqual(lane.match_codes['R1'], 9)
263 self.failUnlessEqual(lane.match_codes['U2'], 0)
264 self.failUnlessEqual(lane.match_codes['R2'], 12)
265 self.failUnlessEqual(lane.match_codes['NM'], 1)
266 self.failUnlessEqual(lane.match_codes['QC'], 0)
269 lane = eland_container.results[0][7]
270 self.failUnlessEqual(lane.reads, 5)
271 self.failUnlessEqual(lane.sample_name, 's')
272 self.failUnlessEqual(lane.lane_id, 7)
273 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
276 lane = eland_container.results[0][8]
277 self.failUnlessEqual(lane.reads, 3)
278 self.failUnlessEqual(lane.sample_name, 's')
279 self.failUnlessEqual(lane.lane_id, 8)
280 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
282 xml = eland_container.get_elements()
283 # just make sure that element tree can serialize the tree
284 xml_str = ElementTree.tostring(xml)
285 e2 = gerald.ELAND(xml=xml)
288 l1 = eland_container.results[0][i]
289 l2 = e2.results[0][i]
290 self.failUnlessEqual(l1.reads, l2.reads)
291 self.failUnlessEqual(l1.sample_name, l2.sample_name)
292 self.failUnlessEqual(l1.lane_id, l2.lane_id)
293 if isinstance(l1, eland.ElandLane):
294 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
295 self.failUnlessEqual(len(l1.mapped_reads), 17)
296 for k in l1.mapped_reads.keys():
297 self.failUnlessEqual(l1.mapped_reads[k],
300 self.failUnlessEqual(len(l1.match_codes), 9)
301 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
302 for k in l1.match_codes.keys():
303 self.failUnlessEqual(l1.match_codes[k],
305 elif isinstance(l1, eland.SequenceLane):
306 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
308 def test_runfolder(self):
309 runs = runfolder.get_runs(self.runfolder_dir)
311 # do we get the flowcell id from the filename?
312 self.failUnlessEqual(len(runs), 1)
313 name = 'run_3021JAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
314 self.failUnlessEqual(runs[0].name, name)
316 # do we get the flowcell id from the FlowcellId.xml file
317 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
318 runs = runfolder.get_runs(self.runfolder_dir)
319 self.failUnlessEqual(len(runs), 1)
320 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
321 self.failUnlessEqual(runs[0].name, name)
324 xml = r1.get_elements()
325 xml_str = ElementTree.tostring(xml)
327 r2 = runfolder.PipelineRun(xml=xml)
328 self.failUnlessEqual(r1.name, r2.name)
329 self.failIfEqual(r2.image_analysis, None)
330 self.failIfEqual(r2.bustard, None)
331 self.failIfEqual(r2.gerald, None)
335 return unittest.makeSuite(RunfolderTests,'test')
337 if __name__ == "__main__":
338 unittest.main(defaultTest="suite")