2 from __future__ import absolute_import
4 from datetime import datetime, date
8 from unittest import TestCase
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.samplekey import SampleKey
16 from htsworkflow.pipelines import ElementTree
18 from .simulate_runfolder import *
21 def make_runfolder(obj=None):
23 Make a fake runfolder, attach all the directories to obj if defined
25 # make a fake runfolder directory
26 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
28 runfolder_dir = os.path.join(temp_dir,
29 '090608_HWI-EAS229_0117_4286GAAXX')
30 os.mkdir(runfolder_dir)
32 data_dir = os.path.join(runfolder_dir, 'Data')
35 intensities_dir = make_rta_intensities_1460(data_dir)
37 basecalls_dir = make_rta_basecalls_1460(intensities_dir)
39 #make_phasing_params(bustard_dir)
40 #make_bustard_config132(bustard_dir)
42 gerald_dir = os.path.join(basecalls_dir,
43 'GERALD_16-06-2009_diane')
45 make_gerald_config_100(gerald_dir)
46 make_summary_ipar130_htm(gerald_dir)
47 make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
48 make_scarf(gerald_dir, lane_list=[7,])
49 make_fastq(gerald_dir, lane_list=[8,])
52 obj.temp_dir = temp_dir
53 obj.runfolder_dir = runfolder_dir
54 obj.data_dir = data_dir
55 obj.image_analysis_dir = intensities_dir
56 obj.bustard_dir = basecalls_dir
57 obj.gerald_dir = gerald_dir
60 class RunfolderTests(TestCase):
62 Test components of the runfolder processing code
63 which includes firecrest, bustard, and gerald
66 # attaches all the directories to the object passed in
70 shutil.rmtree(self.temp_dir)
74 Construct a firecrest object
76 i = ipar.ipar(self.image_analysis_dir)
77 self.failUnlessEqual(i.version, '1.4.6.0')
78 self.failUnlessEqual(i.start, 1)
79 self.failUnlessEqual(i.stop, 38)
81 xml = i.get_elements()
82 # just make sure that element tree can serialize the tree
83 xml_str = ElementTree.tostring(xml)
85 i2 = ipar.IPAR(xml=xml)
86 self.failUnlessEqual(i.version, i2.version)
87 self.failUnlessEqual(i.start, i2.start)
88 self.failUnlessEqual(i.stop, i2.stop)
89 self.failUnlessEqual(i.date, i2.date)
91 def test_bustard(self):
93 construct a bustard object
95 b = bustard.bustard(self.bustard_dir)
96 self.failUnlessEqual(b.version, '1.4.6.0')
97 self.failUnlessEqual(b.date, None)
98 self.failUnlessEqual(b.user, None)
99 self.failUnlessEqual(len(b.phasing), 0)
101 xml = b.get_elements()
102 b2 = bustard.Bustard(xml=xml)
103 self.failUnlessEqual(b.version, b2.version)
104 self.failUnlessEqual(b.date, b2.date )
105 self.failUnlessEqual(b.user, b2.user)
107 def test_gerald(self):
108 # need to update gerald and make tests for it
109 g = gerald.gerald(self.gerald_dir)
111 self.failUnlessEqual(g.version, '1.171')
112 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
113 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
114 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
117 # list of genomes, matches what was defined up in
118 # make_gerald_config.
119 # the first None is to offset the genomes list to be 1..9
120 # instead of pythons default 0..8
125 '/g/arabidopsis01222004',
131 # test lane specific parameters from gerald config file
133 cur_lane = g.lanes[i]
134 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
135 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
136 self.failUnlessEqual(cur_lane.read_length, '37')
137 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
139 # I want to be able to use a simple iterator
140 for l in g.lanes.values():
141 self.failUnlessEqual(l.analysis, 'eland_extended')
142 self.failUnlessEqual(l.read_length, '37')
143 self.failUnlessEqual(l.use_bases, 'Y'*37)
145 # test data extracted from summary file
147 (126910, 4300), (165739, 6792),
148 (196565, 8216), (153897, 8501),
149 (135536, 3908), (154083, 9315),
150 (159991, 9292), (198479, 17671),]
152 self.failUnlessEqual(len(g.summary), 1)
154 summary_lane = g.summary[0][i]
155 self.failUnlessEqual(summary_lane.cluster, clusters[i])
156 self.failUnlessEqual(summary_lane.lane, i)
158 xml = g.get_elements()
159 # just make sure that element tree can serialize the tree
160 xml_str = ElementTree.tostring(xml)
161 g2 = gerald.Gerald(xml=xml)
163 # do it all again after extracting from the xml file
164 self.failUnlessEqual(g.version, g2.version)
165 self.failUnlessEqual(g.date, g2.date)
166 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
167 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
169 # test lane specific parameters from gerald config file
172 g2_lane = g2.lanes[i]
173 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
174 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
175 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
176 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
178 # test (some) summary elements
179 self.failUnlessEqual(len(g.summary), 1)
181 g_summary = g.summary[0][i]
182 g2_summary = g2.summary[0][i]
183 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
184 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
186 g_eland = g.eland_results
187 g2_eland = g2.eland_results
189 g_results = g_eland[lane]
190 g2_results = g2_eland[lane]
191 self.failUnlessEqual(g_results.reads,
193 if isinstance(g_results, eland.ElandLane):
194 self.failUnlessEqual(len(g_results.mapped_reads),
195 len(g2_results.mapped_reads))
196 for k in g_results.mapped_reads.keys():
197 self.failUnlessEqual(g_results.mapped_reads[k],
198 g2_results.mapped_reads[k])
200 self.failUnlessEqual(len(g_results.match_codes),
201 len(g2_results.match_codes))
202 for k in g_results.match_codes.keys():
203 self.failUnlessEqual(g_results.match_codes[k],
204 g2_results.match_codes[k])
207 def test_eland(self):
208 hg_map = {'Lambda.fa': 'Lambda.fa'}
209 for i in range(1,22):
210 short_name = 'chr%d.fa' % (i,)
211 long_name = 'hg18/chr%d.fa' % (i,)
212 hg_map[short_name] = long_name
214 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
215 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
216 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
218 # I added sequence lanes to the last 2 lanes of this test case
219 lanes = [ SampleKey(lane=i, read=1, sample='s') for i in range(1,7) ]
221 lane = eland_container[key]
222 self.failUnlessEqual(lane.reads, 6)
223 self.failUnlessEqual(lane.sample_name, "s")
224 self.failUnlessEqual(lane.lane_id, key.lane)
225 self.failUnlessEqual(len(lane.mapped_reads), 17)
226 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
227 self.failUnlessEqual(lane.match_codes['U0'], 3)
228 self.failUnlessEqual(lane.match_codes['R0'], 2)
229 self.failUnlessEqual(lane.match_codes['U1'], 1)
230 self.failUnlessEqual(lane.match_codes['R1'], 9)
231 self.failUnlessEqual(lane.match_codes['U2'], 0)
232 self.failUnlessEqual(lane.match_codes['R2'], 12)
233 self.failUnlessEqual(lane.match_codes['NM'], 1)
234 self.failUnlessEqual(lane.match_codes['QC'], 0)
237 lane = eland_container[SampleKey(lane=7, read=1, sample='s')]
238 self.failUnlessEqual(lane.reads, 5)
239 self.failUnlessEqual(lane.sample_name, 's')
240 self.failUnlessEqual(lane.lane_id, 7)
241 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
244 lane = eland_container[SampleKey(lane=8, read=1, sample='s')]
245 self.failUnlessEqual(lane.reads, 3)
246 self.failUnlessEqual(lane.sample_name, 's')
247 self.failUnlessEqual(lane.lane_id, 8)
248 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
250 xml = eland_container.get_elements()
251 # just make sure that element tree can serialize the tree
252 xml_str = ElementTree.tostring(xml)
253 e2 = gerald.ELAND(xml=xml)
255 for key in eland_container:
256 l1 = eland_container[key]
258 self.failUnlessEqual(l1.reads, l2.reads)
259 self.failUnlessEqual(l1.sample_name, l2.sample_name)
260 self.failUnlessEqual(l1.lane_id, l2.lane_id)
261 if isinstance(l1, eland.ElandLane):
262 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
263 self.failUnlessEqual(len(l1.mapped_reads), 17)
264 for k in l1.mapped_reads.keys():
265 self.failUnlessEqual(l1.mapped_reads[k],
268 self.failUnlessEqual(len(l1.match_codes), 9)
269 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
270 for k in l1.match_codes.keys():
271 self.failUnlessEqual(l1.match_codes[k],
273 elif isinstance(l1, eland.SequenceLane):
274 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
276 def test_runfolder(self):
277 runs = runfolder.get_runs(self.runfolder_dir)
279 # do we get the flowcell id from the filename?
280 self.failUnlessEqual(len(runs), 1)
281 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
282 self.failUnlessEqual(runs[0].serialization_filename, name)
284 # do we get the flowcell id from the FlowcellId.xml file
285 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
286 runs = runfolder.get_runs(self.runfolder_dir)
287 self.failUnlessEqual(len(runs), 1)
288 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
289 self.failUnlessEqual(runs[0].serialization_filename, name)
292 xml = r1.get_elements()
293 xml_str = ElementTree.tostring(xml)
295 r2 = runfolder.PipelineRun(xml=xml)
296 self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
297 self.failIfEqual(r2.image_analysis, None)
298 self.failIfEqual(r2.bustard, None)
299 self.failIfEqual(r2.gerald, None)
303 from unittest import TestSuite, defaultTestLoader
305 suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
309 if __name__ == "__main__":
310 from unittest import main
311 main(defaultTest="suite")