3 from datetime import datetime, date
7 from unittest2 import TestCase
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.samplekey import SampleKey
15 from htsworkflow.pipelines import ElementTree
17 from htsworkflow.pipelines.test.simulate_runfolder import *
20 def make_runfolder(obj=None):
22 Make a fake runfolder, attach all the directories to obj if defined
24 # make a fake runfolder directory
25 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
27 runfolder_dir = os.path.join(temp_dir,
28 '090608_HWI-EAS229_0117_4286GAAXX')
29 os.mkdir(runfolder_dir)
31 data_dir = os.path.join(runfolder_dir, 'Data')
34 intensities_dir = make_rta_intensities_1460(data_dir)
36 basecalls_dir = make_rta_basecalls_1460(intensities_dir)
38 #make_phasing_params(bustard_dir)
39 #make_bustard_config132(bustard_dir)
41 gerald_dir = os.path.join(basecalls_dir,
42 'GERALD_16-06-2009_diane')
44 make_gerald_config_100(gerald_dir)
45 make_summary_ipar130_htm(gerald_dir)
46 make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
47 make_scarf(gerald_dir, lane_list=[7,])
48 make_fastq(gerald_dir, lane_list=[8,])
51 obj.temp_dir = temp_dir
52 obj.runfolder_dir = runfolder_dir
53 obj.data_dir = data_dir
54 obj.image_analysis_dir = intensities_dir
55 obj.bustard_dir = basecalls_dir
56 obj.gerald_dir = gerald_dir
59 class RunfolderTests(TestCase):
61 Test components of the runfolder processing code
62 which includes firecrest, bustard, and gerald
65 # attaches all the directories to the object passed in
69 shutil.rmtree(self.temp_dir)
73 Construct a firecrest object
75 i = ipar.ipar(self.image_analysis_dir)
76 self.failUnlessEqual(i.version, '1.4.6.0')
77 self.failUnlessEqual(i.start, 1)
78 self.failUnlessEqual(i.stop, 38)
80 xml = i.get_elements()
81 # just make sure that element tree can serialize the tree
82 xml_str = ElementTree.tostring(xml)
84 i2 = ipar.IPAR(xml=xml)
85 self.failUnlessEqual(i.version, i2.version)
86 self.failUnlessEqual(i.start, i2.start)
87 self.failUnlessEqual(i.stop, i2.stop)
88 self.failUnlessEqual(i.date, i2.date)
90 def test_bustard(self):
92 construct a bustard object
94 b = bustard.bustard(self.bustard_dir)
95 self.failUnlessEqual(b.version, '1.4.6.0')
96 self.failUnlessEqual(b.date, None)
97 self.failUnlessEqual(b.user, None)
98 self.failUnlessEqual(len(b.phasing), 0)
100 xml = b.get_elements()
101 b2 = bustard.Bustard(xml=xml)
102 self.failUnlessEqual(b.version, b2.version)
103 self.failUnlessEqual(b.date, b2.date )
104 self.failUnlessEqual(b.user, b2.user)
106 def test_gerald(self):
107 # need to update gerald and make tests for it
108 g = gerald.gerald(self.gerald_dir)
110 self.failUnlessEqual(g.version, '1.171')
111 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
112 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
113 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
116 # list of genomes, matches what was defined up in
117 # make_gerald_config.
118 # the first None is to offset the genomes list to be 1..9
119 # instead of pythons default 0..8
124 '/g/arabidopsis01222004',
130 # test lane specific parameters from gerald config file
132 cur_lane = g.lanes[i]
133 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
134 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
135 self.failUnlessEqual(cur_lane.read_length, '37')
136 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
138 # I want to be able to use a simple iterator
139 for l in g.lanes.values():
140 self.failUnlessEqual(l.analysis, 'eland_extended')
141 self.failUnlessEqual(l.read_length, '37')
142 self.failUnlessEqual(l.use_bases, 'Y'*37)
144 # test data extracted from summary file
146 (126910, 4300), (165739, 6792),
147 (196565, 8216), (153897, 8501),
148 (135536, 3908), (154083, 9315),
149 (159991, 9292), (198479, 17671),]
151 self.failUnlessEqual(len(g.summary), 1)
153 summary_lane = g.summary[0][i]
154 self.failUnlessEqual(summary_lane.cluster, clusters[i])
155 self.failUnlessEqual(summary_lane.lane, i)
157 xml = g.get_elements()
158 # just make sure that element tree can serialize the tree
159 xml_str = ElementTree.tostring(xml)
160 g2 = gerald.Gerald(xml=xml)
162 # do it all again after extracting from the xml file
163 self.failUnlessEqual(g.version, g2.version)
164 self.failUnlessEqual(g.date, g2.date)
165 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
166 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
168 # test lane specific parameters from gerald config file
171 g2_lane = g2.lanes[i]
172 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
173 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
174 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
175 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
177 # test (some) summary elements
178 self.failUnlessEqual(len(g.summary), 1)
180 g_summary = g.summary[0][i]
181 g2_summary = g2.summary[0][i]
182 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
183 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
185 g_eland = g.eland_results
186 g2_eland = g2.eland_results
188 g_results = g_eland[lane]
189 g2_results = g2_eland[lane]
190 self.failUnlessEqual(g_results.reads,
192 if isinstance(g_results, eland.ElandLane):
193 self.failUnlessEqual(len(g_results.mapped_reads),
194 len(g2_results.mapped_reads))
195 for k in g_results.mapped_reads.keys():
196 self.failUnlessEqual(g_results.mapped_reads[k],
197 g2_results.mapped_reads[k])
199 self.failUnlessEqual(len(g_results.match_codes),
200 len(g2_results.match_codes))
201 for k in g_results.match_codes.keys():
202 self.failUnlessEqual(g_results.match_codes[k],
203 g2_results.match_codes[k])
206 def test_eland(self):
207 hg_map = {'Lambda.fa': 'Lambda.fa'}
208 for i in range(1,22):
209 short_name = 'chr%d.fa' % (i,)
210 long_name = 'hg18/chr%d.fa' % (i,)
211 hg_map[short_name] = long_name
213 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
214 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
215 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
217 # I added sequence lanes to the last 2 lanes of this test case
218 lanes = [ SampleKey(lane=i, read=1, sample='s') for i in range(1,7) ]
220 lane = eland_container[key]
221 self.failUnlessEqual(lane.reads, 6)
222 self.failUnlessEqual(lane.sample_name, "s")
223 self.failUnlessEqual(lane.lane_id, key.lane)
224 self.failUnlessEqual(len(lane.mapped_reads), 17)
225 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
226 self.failUnlessEqual(lane.match_codes['U0'], 3)
227 self.failUnlessEqual(lane.match_codes['R0'], 2)
228 self.failUnlessEqual(lane.match_codes['U1'], 1)
229 self.failUnlessEqual(lane.match_codes['R1'], 9)
230 self.failUnlessEqual(lane.match_codes['U2'], 0)
231 self.failUnlessEqual(lane.match_codes['R2'], 12)
232 self.failUnlessEqual(lane.match_codes['NM'], 1)
233 self.failUnlessEqual(lane.match_codes['QC'], 0)
236 lane = eland_container[SampleKey(lane=7, read=1, sample='s')]
237 self.failUnlessEqual(lane.reads, 5)
238 self.failUnlessEqual(lane.sample_name, 's')
239 self.failUnlessEqual(lane.lane_id, 7)
240 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
243 lane = eland_container[SampleKey(lane=8, read=1, sample='s')]
244 self.failUnlessEqual(lane.reads, 3)
245 self.failUnlessEqual(lane.sample_name, 's')
246 self.failUnlessEqual(lane.lane_id, 8)
247 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
249 xml = eland_container.get_elements()
250 # just make sure that element tree can serialize the tree
251 xml_str = ElementTree.tostring(xml)
252 e2 = gerald.ELAND(xml=xml)
254 for key in eland_container:
255 l1 = eland_container[key]
257 self.failUnlessEqual(l1.reads, l2.reads)
258 self.failUnlessEqual(l1.sample_name, l2.sample_name)
259 self.failUnlessEqual(l1.lane_id, l2.lane_id)
260 if isinstance(l1, eland.ElandLane):
261 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
262 self.failUnlessEqual(len(l1.mapped_reads), 17)
263 for k in l1.mapped_reads.keys():
264 self.failUnlessEqual(l1.mapped_reads[k],
267 self.failUnlessEqual(len(l1.match_codes), 9)
268 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
269 for k in l1.match_codes.keys():
270 self.failUnlessEqual(l1.match_codes[k],
272 elif isinstance(l1, eland.SequenceLane):
273 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
275 def test_runfolder(self):
276 runs = runfolder.get_runs(self.runfolder_dir)
278 # do we get the flowcell id from the filename?
279 self.failUnlessEqual(len(runs), 1)
280 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
281 self.failUnlessEqual(runs[0].name, name)
283 # do we get the flowcell id from the FlowcellId.xml file
284 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
285 runs = runfolder.get_runs(self.runfolder_dir)
286 self.failUnlessEqual(len(runs), 1)
287 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
288 self.failUnlessEqual(runs[0].name, name)
291 xml = r1.get_elements()
292 xml_str = ElementTree.tostring(xml)
294 r2 = runfolder.PipelineRun(xml=xml)
295 self.failUnlessEqual(r1.name, r2.name)
296 self.failIfEqual(r2.image_analysis, None)
297 self.failIfEqual(r2.bustard, None)
298 self.failIfEqual(r2.gerald, None)
302 from unittest2 import TestSuite, defaultTestLoader
304 suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
308 if __name__ == "__main__":
309 from unittest2 import main
310 main(defaultTest="suite")