2 from __future__ import absolute_import
4 from datetime import datetime, date
8 from unittest import TestCase
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines import ElementTree
16 from .simulate_runfolder import *
def make_runfolder(obj=None):
    """
    Make a fake runfolder, attach all the directories to obj if defined

    A temporary directory is created with tempfile.mkdtemp and a runfolder
    named '080102_HWI-EAS229_0010_207BTAAXX' is created inside it.  The
    IPAR, matrix, Bustard and GERALD fixtures are then produced by the
    make_* helpers.

    :param obj: optional object that receives the attributes temp_dir,
        runfolder_dir, data_dir, image_analysis_dir, matrix_dir,
        bustard_dir and gerald_dir.  When it is None nothing is attached.
    """
    # make a fake runfolder directory
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')

    runfolder_dir = os.path.join(temp_dir,
                                 '080102_HWI-EAS229_0010_207BTAAXX')
    os.mkdir(runfolder_dir)

    data_dir = os.path.join(runfolder_dir, 'Data')

    # NOTE(review): only runfolder_dir is created explicitly in this
    # function; this assumes the make_* helpers create any directory they
    # write into -- confirm against the helpers' definitions.
    ipar_dir = make_ipar_dir(data_dir)

    matrix_dir = os.path.join(ipar_dir, 'Matrix')
    matrix_name = os.path.join(matrix_dir, 's_matrix.txt')
    make_matrix(matrix_name)

    bustard_dir = os.path.join(ipar_dir,
                               'Bustard1.8.28_12-04-2008_diane')
    make_phasing_params(bustard_dir)

    gerald_dir = os.path.join(bustard_dir,
                              'GERALD_12-04-2008_diane')
    make_gerald_config_100(gerald_dir)
    make_summary_htm_100(gerald_dir)
    make_eland_multi(gerald_dir)

    # obj defaults to None, so only attach the paths when a target exists;
    # an unguarded attribute assignment would raise AttributeError.
    if obj is not None:
        obj.temp_dir = temp_dir
        obj.runfolder_dir = runfolder_dir
        obj.data_dir = data_dir
        obj.image_analysis_dir = ipar_dir
        obj.matrix_dir = matrix_dir
        obj.bustard_dir = bustard_dir
        obj.gerald_dir = gerald_dir
# Test case covering the ipar, bustard, gerald and runfolder pipeline modules.
# NOTE(review): the docstring delimiters and the def lines of the first
# methods are absent from this span; the statements are left exactly as found.
62 class RunfolderTests(TestCase):
64 Test components of the runfolder processing code
65 which includes firecrest, bustard, and gerald
68 # attaches all the directories to the object passed in
# Deletes the temporary directory tree recorded in self.temp_dir.
72 shutil.rmtree(self.temp_dir)
76 Construct a firecrest object
# NOTE(review): the text above says firecrest, but the object built below
# comes from ipar.ipar and is expected to report software 'IPAR'.
# Build the image-analysis object from self.image_analysis_dir and check its
# software name, version string and start/stop values.
78 i = ipar.ipar(self.image_analysis_dir)
79 self.failUnlessEqual(i.software, 'IPAR')
80 self.failUnlessEqual(i.version, '2.01.192.0')
81 self.failUnlessEqual(i.start, 1)
82 self.failUnlessEqual(i.stop, 37)
# Round trip: export the object as an element tree, rebuild an IPAR object
# from that tree, and check each compared attribute matches the original.
84 xml = i.get_elements()
85 # just make sure that element tree can serialize the tree
86 xml_str = ElementTree.tostring(xml)
88 i2 = ipar.IPAR(xml=xml)
89 self.failUnlessEqual(i.software, i2.software)
90 self.failUnlessEqual(i.version, i2.version)
91 self.failUnlessEqual(i.start, i2.start)
92 self.failUnlessEqual(i.stop, i2.stop)
93 self.failUnlessEqual(i.date, i2.date)
94 self.failUnlessEqual(i.file_list(), i2.file_list())
def test_bustard(self):
    """
    construct a bustard object

    The object is built from self.bustard_dir, its header fields and
    phasing table are checked, and it is then rebuilt from its own XML
    elements to confirm the round trip preserves every compared value.
    """
    b = bustard.bustard(self.bustard_dir)
    self.assertEqual(b.software, 'Bustard')
    self.assertEqual(b.version, '1.8.28')
    self.assertEqual(b.date, date(2008, 4, 12))
    self.assertEqual(b.user, 'diane')
    self.assertEqual(len(b.phasing), 8)
    self.assertAlmostEqual(b.phasing[8].phasing, 0.0099)

    # Rebuild a Bustard object from the exported elements and compare it
    # field by field with the one parsed from disk.
    xml = b.get_elements()
    b2 = bustard.Bustard(xml=xml)
    self.assertEqual(b.software, b2.software)
    self.assertEqual(b.version, b2.version)
    self.assertEqual(b.date, b2.date)
    self.assertEqual(b.user, b2.user)
    self.assertEqual(len(b.phasing), len(b2.phasing))
    for key in b.phasing.keys():
        self.assertEqual(b.phasing[key].lane,
                         b2.phasing[key].lane)
        self.assertEqual(b.phasing[key].phasing,
                         b2.phasing[key].phasing)
        self.assertEqual(b.phasing[key].prephasing,
                         b2.phasing[key].prephasing)
# Checks a GERALD object built from self.gerald_dir: header fields, per-lane
# configuration values, summary cluster data, and an XML round trip through
# gerald.Gerald(xml=...).
# NOTE(review): several statements this method depends on (list openers,
# loop headers, one call's second argument) are not present in this span;
# the visible statements are left exactly as found.
123 def test_gerald(self):
124 # need to update gerald and make tests for it
125 g = gerald.gerald(self.gerald_dir)
# Header fields, and that the lane container's len() agrees with the
# number of keys and items it reports.
127 self.failUnlessEqual(g.software, 'GERALD')
128 self.failUnlessEqual(g.version, '1.171')
129 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
130 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
131 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
134 # list of genomes, matches what was defined up in
135 # make_gerald_config.
136 # the first None is to offset the genomes list to be 1..9
137 # instead of pythons default 0..8
# NOTE(review): the assignment that opens the genomes list and its other
# entries are not present here; only this one entry is visible.
142 '/g/arabidopsis01222004',
148 # test lane specific parameters from gerald config file
# NOTE(review): the loop header that binds i is not present here.
150 cur_lane = g.lanes[i]
151 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
152 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
153 self.failUnlessEqual(cur_lane.read_length, '37')
154 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
156 # I want to be able to use a simple iterator
157 for l in g.lanes.values():
158 self.failUnlessEqual(l.analysis, 'eland_extended')
159 self.failUnlessEqual(l.read_length, '37')
160 self.failUnlessEqual(l.use_bases, 'Y'*37)
162 # test data extracted from summary file
# NOTE(review): the assignment that opens this list is not present here;
# the tuples are later compared against summary_lane.cluster via clusters[i].
164 (96483, 9074), (133738, 7938),
165 (152142, 10002), (15784, 2162),
166 (119735, 8465), (152177, 8146),
167 (84649, 7325), (54622, 4812),]
169 self.failUnlessEqual(len(g.summary), 1)
# NOTE(review): the loop header that binds i is not present here.
171 summary_lane = g.summary[0][i]
172 self.failUnlessEqual(summary_lane.cluster, clusters[i])
173 self.failUnlessEqual(summary_lane.lane, i)
# Round trip: export to an element tree, rebuild a Gerald object from it.
175 xml = g.get_elements()
176 # just make sure that element tree can serialize the tree
177 xml_str = ElementTree.tostring(xml)
178 g2 = gerald.Gerald(xml=xml)
180 # do it all again after extracting from the xml file
181 self.failUnlessEqual(g.software, g2.software)
182 self.failUnlessEqual(g.version, g2.version)
183 self.failUnlessEqual(g.date, g2.date)
184 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
185 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
187 # test lane specific parameters from gerald config file
# NOTE(review): the loop header binding i and the assignment of g_lane are
# not present here; g_lane is compared against g2_lane below.
190 g2_lane = g2.lanes[i]
191 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
192 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
193 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
194 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
196 # test (some) summary elements
197 self.failUnlessEqual(len(g.summary), 1)
# NOTE(review): the loop header that binds i is not present here.
199 g_summary = g.summary[0][i]
200 g2_summary = g2.summary[0][i]
201 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
202 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
# Compare the eland results of the original and the rebuilt object entry by
# entry: read counts, mapped_reads and match_codes.
204 g_eland = g.eland_results
205 g2_eland = g2.eland_results
# NOTE(review): the loop header that binds key is not present here.
207 g_results = g_eland[key]
208 g2_results = g2_eland[key]
# NOTE(review): the second argument and closing parenthesis of the next call
# are not present here.
209 self.failUnlessEqual(g_results.reads,
211 self.failUnlessEqual(len(g_results.mapped_reads),
212 len(g2_results.mapped_reads))
213 for k in g_results.mapped_reads.keys():
214 self.failUnlessEqual(g_results.mapped_reads[k],
215 g2_results.mapped_reads[k])
217 self.failUnlessEqual(len(g_results.match_codes),
218 len(g2_results.match_codes))
219 for k in g_results.match_codes.keys():
220 self.failUnlessEqual(g_results.match_codes[k],
221 g2_results.match_codes[k])
# Checks the eland results loaded from self.gerald_dir with a genome map
# applied to lanes 1-8, then repeats the comparison against an ELAND object
# rebuilt from the exported XML elements.
# NOTE(review): the loop headers that bind key, lane, l1 and l2, and two
# continuation lines, are not present in this span; the visible statements
# are left exactly as found.
224 def test_eland(self):
# Map short chromosome file names ('chrN.fa' for N in 1..21) to their
# 'hg18/'-prefixed names; 'Lambda.fa' maps to itself.
225 hg_map = {'Lambda.fa': 'Lambda.fa'}
226 for i in range(1,22):
227 short_name = 'chr%d.fa' % (i,)
228 long_name = 'hg18/chr%d.fa' % (i,)
229 hg_map[short_name] = long_name
# The same mapping dict is shared by every lane key 1..8.
231 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
232 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
233 eland = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
# NOTE(review): the statements that bind key and lane are not present here.
# Expected per-lane values: read count, sample name, lane id, the number of
# mapped_reads entries with selected counts, and each match code count.
237 self.failUnlessEqual(lane.reads, 6)
238 self.failUnlessEqual(lane.sample_name, "s")
239 self.failUnlessEqual(lane.lane_id, key.lane)
240 self.failUnlessEqual(len(lane.mapped_reads), 17)
241 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
242 self.failUnlessEqual(lane.mapped_reads['spike.fa/sample1'], 1)
243 self.failUnlessEqual(lane.mapped_reads['spike.fa/sample2'], 1)
244 self.failUnlessEqual(lane.match_codes['U0'], 3)
245 self.failUnlessEqual(lane.match_codes['R0'], 2)
246 self.failUnlessEqual(lane.match_codes['U1'], 1)
247 self.failUnlessEqual(lane.match_codes['R1'], 9)
248 self.failUnlessEqual(lane.match_codes['U2'], 0)
249 self.failUnlessEqual(lane.match_codes['R2'], 12)
250 self.failUnlessEqual(lane.match_codes['NM'], 1)
251 self.failUnlessEqual(lane.match_codes['QC'], 0)
# Round trip: export to an element tree and rebuild an ELAND object from it.
253 xml = eland.get_elements()
254 # just make sure that element tree can serialize the tree
255 xml_str = ElementTree.tostring(xml)
256 e2 = gerald.ELAND(xml=xml)
# NOTE(review): the statements that bind l1 and l2 are not present here;
# presumably l1 comes from eland and l2 from e2 -- confirm.
261 self.failUnlessEqual(l1.reads, l2.reads)
262 self.failUnlessEqual(l1.sample_name, l2.sample_name)
263 self.failUnlessEqual(l1.lane_id, l2.lane_id)
264 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
265 self.failUnlessEqual(len(l1.mapped_reads), 17)
266 for k in l1.mapped_reads.keys():
# NOTE(review): the second argument of the next call is not present here.
267 self.failUnlessEqual(l1.mapped_reads[k],
270 self.failUnlessEqual(len(l1.match_codes), 9)
271 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
272 for k in l1.match_codes.keys():
# NOTE(review): the second argument of the next call is not present here.
273 self.failUnlessEqual(l1.match_codes[k],
def test_runfolder(self):
    """
    Check run discovery, flowcell id detection and the XML round trip.

    The flowcell id is first expected to come from the runfolder name and,
    once a flowcell id file has been written, from that file instead.  The
    discovered run is then rebuilt from its own XML elements.
    """
    runs = runfolder.get_runs(self.runfolder_dir)

    # do we get the flowcell id from the filename?
    self.assertEqual(len(runs), 1)
    name = 'run_207BTAAXX_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
    self.assertEqual(runs[0].serialization_filename, name)

    # do we get the flowcell id from the FlowcellId.xml file
    make_flowcell_id(self.runfolder_dir, '207BTAAXY')
    runs = runfolder.get_runs(self.runfolder_dir)
    self.assertEqual(len(runs), 1)
    name = 'run_207BTAAXY_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
    self.assertEqual(runs[0].serialization_filename, name)

    # The assertions above guarantee exactly one run; use it for the
    # round trip.
    r1 = runs[0]
    xml = r1.get_elements()
    # just make sure that element tree can serialize the tree
    ElementTree.tostring(xml)

    r2 = runfolder.PipelineRun(xml=xml)
    self.assertEqual(r1.serialization_filename, r2.serialization_filename)
    # Equality (not identity) comparison against None is kept on purpose.
    self.assertNotEqual(r2.image_analysis, None)
    self.assertNotEqual(r2.bustard, None)
    self.assertNotEqual(r2.gerald, None)
def suite():
    """
    Build the test suite for this module.

    :return: a unittest.TestSuite holding every test of RunfolderTests.
    """
    from unittest import TestSuite, defaultTestLoader
    tests = TestSuite()
    tests.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
    return tests


if __name__ == "__main__":
    from unittest import main
    # defaultTest names the module-level suite() callable defined above.
    main(defaultTest="suite")