3 from datetime import datetime, date
9 from htsworkflow.pipelines import firecrest
10 from htsworkflow.pipelines import bustard
11 from htsworkflow.pipelines import gerald
12 from htsworkflow.pipelines.eland import SampleKey
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
16 from htsworkflow.pipelines.test.simulate_runfolder import *
19 def make_runfolder(obj=None):
21 Make a fake runfolder, attach all the directories to obj if defined
23 # make a fake runfolder directory
24 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
26 runfolder_dir = os.path.join(temp_dir,
27 '080102_HWI-EAS229_0010_207BTAAXX')
28 os.mkdir(runfolder_dir)
30 data_dir = os.path.join(runfolder_dir, 'Data')
33 ipar_dir = make_firecrest_dir(data_dir, "1.9.6", 1, 152)
35 matrix_dir = os.path.join(ipar_dir, 'Matrix')
37 matrix_name = os.path.join(matrix_dir, 's_matrix.txt')
38 make_matrix(matrix_name)
40 bustard_dir = os.path.join(ipar_dir,
41 'Bustard1.8.28_12-04-2008_diane')
43 make_phasing_params(bustard_dir)
45 gerald_dir = os.path.join(bustard_dir,
46 'GERALD_12-04-2008_diane')
48 make_gerald_config_100(gerald_dir)
49 make_summary_paired_htm(gerald_dir)
50 make_eland_multi(gerald_dir, paired=True)
53 obj.temp_dir = temp_dir
54 obj.runfolder_dir = runfolder_dir
55 obj.data_dir = data_dir
56 obj.image_analysis_dir = ipar_dir
57 obj.matrix_dir = matrix_dir
58 obj.bustard_dir = bustard_dir
59 obj.gerald_dir = gerald_dir
62 class RunfolderTests(unittest.TestCase):
64 Test components of the runfolder processing code
65 which includes firecrest, bustard, and gerald
68 # attaches all the directories to the object passed in
72 shutil.rmtree(self.temp_dir)
74 def test_firecrest(self):
76 Construct a firecrest object
78 f = firecrest.firecrest(self.image_analysis_dir)
79 self.failUnlessEqual(f.software, 'Firecrest')
80 self.failUnlessEqual(f.version, '1.9.6')
81 self.failUnlessEqual(f.start, 1)
82 self.failUnlessEqual(f.stop, 152)
83 self.failUnlessEqual(f.user, 'diane')
84 # As of 2008-12-8, the date was being set in
85 # simulate_runfolder.make_firecrest_dir
86 self.failUnlessEqual(f.date, date(2008,4,12))
88 xml = f.get_elements()
89 # just make sure that element tree can serialize the tree
90 xml_str = ElementTree.tostring(xml)
92 f2 = firecrest.Firecrest(xml=xml)
93 self.failUnlessEqual(f.software, f2.software)
94 self.failUnlessEqual(f.version, f2.version)
95 self.failUnlessEqual(f.start, f2.start)
96 self.failUnlessEqual(f.stop, f2.stop)
97 self.failUnlessEqual(f.user, f2.user)
99 def test_bustard(self):
101 construct a bustard object
103 b = bustard.bustard(self.bustard_dir)
104 self.failUnlessEqual(b.software, 'Bustard')
105 self.failUnlessEqual(b.version, '1.8.28')
106 self.failUnlessEqual(b.date, date(2008,4,12))
107 self.failUnlessEqual(b.user, 'diane')
108 self.failUnlessEqual(len(b.phasing), 8)
109 self.failUnlessAlmostEqual(b.phasing[8].phasing, 0.0099)
111 xml = b.get_elements()
112 b2 = bustard.Bustard(xml=xml)
113 self.failUnlessEqual(b.software, b2.software)
114 self.failUnlessEqual(b.version, b2.version)
115 self.failUnlessEqual(b.date, b2.date )
116 self.failUnlessEqual(b.user, b2.user)
117 self.failUnlessEqual(len(b.phasing), len(b2.phasing))
118 for key in b.phasing.keys():
119 self.failUnlessEqual(b.phasing[key].lane,
120 b2.phasing[key].lane)
121 self.failUnlessEqual(b.phasing[key].phasing,
122 b2.phasing[key].phasing)
123 self.failUnlessEqual(b.phasing[key].prephasing,
124 b2.phasing[key].prephasing)
126 def test_gerald(self):
127 # need to update gerald and make tests for it
128 g = gerald.gerald(self.gerald_dir)
130 self.failUnlessEqual(g.software, 'GERALD')
131 self.failUnlessEqual(g.version, '1.171')
132 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
133 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
134 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
137 # list of genomes, matches what was defined up in
138 # make_gerald_config.
139 # the first None is to offset the genomes list to be 1..9
140 # instead of pythons default 0..8
145 '/g/arabidopsis01222004',
151 # test lane specific parameters from gerald config file
153 cur_lane = g.lanes[i]
154 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
155 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
156 self.failUnlessEqual(cur_lane.read_length, '37')
157 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
159 # I want to be able to use a simple iterator
160 for l in g.lanes.values():
161 self.failUnlessEqual(l.analysis, 'eland_extended')
162 self.failUnlessEqual(l.read_length, '37')
163 self.failUnlessEqual(l.use_bases, 'Y'*37)
165 # test data extracted from summary file
167 (103646, 4515), (106678, 4652),
168 (84583, 5963), (68813, 4782),
169 (104854, 4664), (43555, 1632),
170 (54265, 1588), (64363, 2697),],
172 (103647, 4516), (106679, 4653),
173 (84584, 5964), (68814, 4783),
174 (104855, 4665), (43556, 1633),
175 (54266, 1589), (64364, 2698),],]
178 for lane in range(1,9):
179 summary_lane = g.summary[end][lane]
180 self.failUnlessEqual(summary_lane.cluster, clusters[end][lane])
181 self.failUnlessEqual(summary_lane.lane, lane)
183 xml = g.get_elements()
184 # just make sure that element tree can serialize the tree
185 xml_str = ElementTree.tostring(xml)
186 g2 = gerald.Gerald(xml=xml)
188 # do it all again after extracting from the xml file
189 self.failUnlessEqual(g.software, g2.software)
190 self.failUnlessEqual(g.version, g2.version)
191 self.failUnlessEqual(g.date, g2.date)
192 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
193 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
195 # test lane specific parameters from gerald config file
198 g2_lane = g2.lanes[i]
199 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
200 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
201 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
202 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
204 # test (some) summary elements
207 g_summary = g.summary[end][i]
208 g2_summary = g2.summary[end][i]
209 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
210 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
212 g_eland = g.eland_results
213 g2_eland = g2.eland_results
215 g_results = g_eland[key]
216 g2_results = g2_eland[key]
217 self.failUnlessEqual(g_results.reads,
219 self.failUnlessEqual(len(g_results.mapped_reads),
220 len(g2_results.mapped_reads))
221 for k in g_results.mapped_reads.keys():
222 self.failUnlessEqual(g_results.mapped_reads[k],
223 g2_results.mapped_reads[k])
225 self.failUnlessEqual(len(g_results.match_codes),
226 len(g2_results.match_codes))
227 for k in g_results.match_codes.keys():
228 self.failUnlessEqual(g_results.match_codes[k],
229 g2_results.match_codes[k])
232 def test_eland(self):
233 hg_map = {'Lambda.fa': 'Lambda.fa'}
234 for i in range(1,22):
235 short_name = 'chr%d.fa' % (i,)
236 long_name = 'hg18/chr%d.fa' % (i,)
237 hg_map[short_name] = long_name
239 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
240 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
241 eland = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
244 for key in eland.find_keys(SampleKey(read=1)):
246 self.failUnlessEqual(lane.reads, 6)
247 self.failUnlessEqual(lane.sample_name, "s")
248 self.failUnlessEqual(lane.lane_id, key.lane)
249 self.failUnlessEqual(len(lane.mapped_reads), 17)
250 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
251 self.failUnlessEqual(lane.match_codes['U0'], 3)
252 self.failUnlessEqual(lane.match_codes['R0'], 2)
253 self.failUnlessEqual(lane.match_codes['U1'], 1)
254 self.failUnlessEqual(lane.match_codes['R1'], 9)
255 self.failUnlessEqual(lane.match_codes['U2'], 0)
256 self.failUnlessEqual(lane.match_codes['R2'], 12)
257 self.failUnlessEqual(lane.match_codes['NM'], 1)
258 self.failUnlessEqual(lane.match_codes['QC'], 0)
261 for key in eland.find_keys(SampleKey(read=2)):
263 self.failUnlessEqual(lane.reads, 7)
264 self.failUnlessEqual(lane.sample_name, "s")
265 self.failUnlessEqual(lane.lane_id, key.lane)
266 self.failUnlessEqual(len(lane.mapped_reads), 17)
267 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
268 self.failUnlessEqual(lane.match_codes['U0'], 3)
269 self.failUnlessEqual(lane.match_codes['R0'], 2)
270 self.failUnlessEqual(lane.match_codes['U1'], 1)
271 self.failUnlessEqual(lane.match_codes['R1'], 9)
272 self.failUnlessEqual(lane.match_codes['U2'], 0)
273 self.failUnlessEqual(lane.match_codes['R2'], 12)
274 self.failUnlessEqual(lane.match_codes['NM'], 1)
275 self.failUnlessEqual(lane.match_codes['QC'], 1)
277 xml = eland.get_elements()
278 # just make sure that element tree can serialize the tree
279 xml_str = ElementTree.tostring(xml)
280 e2 = gerald.ELAND(xml=xml)
285 self.failUnlessEqual(l1.reads, l2.reads)
286 self.failUnlessEqual(l1.sample_name, l2.sample_name)
287 self.failUnlessEqual(l1.lane_id, l2.lane_id)
288 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
289 self.failUnlessEqual(len(l1.mapped_reads), 17)
290 for k in l1.mapped_reads.keys():
291 self.failUnlessEqual(l1.mapped_reads[k],
294 self.failUnlessEqual(len(l1.match_codes), 9)
295 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
296 for k in l1.match_codes.keys():
297 self.failUnlessEqual(l1.match_codes[k],
300 def test_runfolder(self):
301 runs = runfolder.get_runs(self.runfolder_dir)
303 # do we get the flowcell id from the filename?
304 self.failUnlessEqual(len(runs), 1)
305 # firecrest's date depends on filename not the create time.
306 name = 'run_207BTAAXX_2009-02-22.xml'
307 self.failUnlessEqual(runs[0].name, name)
309 # do we get the flowcell id from the FlowcellId.xml file
310 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
311 runs = runfolder.get_runs(self.runfolder_dir)
312 self.failUnlessEqual(len(runs), 1)
313 name = 'run_207BTAAXY_2009-02-22.xml'
314 self.failUnlessEqual(runs[0].name, name)
317 xml = r1.get_elements()
318 xml_str = ElementTree.tostring(xml)
320 r2 = runfolder.PipelineRun(xml=xml)
321 self.failUnlessEqual(r1.name, r2.name)
322 self.failIfEqual(r2.image_analysis, None)
323 self.failIfEqual(r2.bustard, None)
324 self.failIfEqual(r2.gerald, None)
328 return unittest.makeSuite(RunfolderTests,'test')
330 if __name__ == "__main__":
331 unittest.main(defaultTest="suite")