3 from datetime import datetime, date
7 from unittest2 import TestCase
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.samplekey import SampleKey
15 from htsworkflow.pipelines import ElementTree
17 from htsworkflow.pipelines.test.simulate_runfolder import *
20 def make_runfolder(obj=None):
22 Make a fake runfolder, attach all the directories to obj if defined
24 # make a fake runfolder directory
25 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
27 flowcell_id = '4286GAAXX'
28 runfolder_dir = os.path.join(
30 '090608_HWI-EAS229_0117_{0}'.format(flowcell_id))
31 os.mkdir(runfolder_dir)
33 data_dir = os.path.join(runfolder_dir, 'Data')
36 intensities_dir = make_rta_intensities_1870(data_dir)
38 basecalls_dir = make_rta_basecalls_1870(intensities_dir)
39 make_matrix_dir_rta160(basecalls_dir)
41 gerald_dir = os.path.join(basecalls_dir,
42 'GERALD_07-09-2010_diane')
44 make_gerald_config_100(gerald_dir)
45 make_summary_rta160_xml(gerald_dir)
46 make_eland_export(gerald_dir, lane_list=[1,2,3,4,5,6,])
47 make_scarf(gerald_dir, lane_list=[7,])
48 make_fastq(gerald_dir, lane_list=[8,])
51 obj.flowcell_id = flowcell_id
52 obj.temp_dir = temp_dir
53 obj.runfolder_dir = runfolder_dir
54 obj.data_dir = data_dir
55 obj.image_analysis_dir = intensities_dir
56 obj.bustard_dir = basecalls_dir
57 obj.gerald_dir = gerald_dir
60 class RunfolderTests(TestCase):
62 Test components of the runfolder processing code
63 which includes firecrest, bustard, and gerald
66 # attaches all the directories to the object passed in
70 shutil.rmtree(self.temp_dir)
def test_bustard(self):
    """Construct a bustard object and round-trip it through XML.

    Builds a bustard object from the simulated base-call directory
    stored in ``self.bustard_dir``, checks the metadata it reports, then
    rebuilds a second object from the element tree returned by
    ``get_elements()`` and checks that the two agree.
    """
    b = bustard.bustard(self.bustard_dir)
    self.assertEqual(b.software, 'RTA')
    self.assertEqual(b.version, '1.8.70.0')
    # this simulated run folder yields neither a date nor a user
    self.assertIsNone(b.date)
    self.assertIsNone(b.user)
    self.assertEqual(len(b.phasing), 0)

    # serialize to an element tree and load it back
    xml = b.get_elements()
    b2 = bustard.Bustard(xml=xml)
    self.assertEqual(b.software, b2.software)
    self.assertEqual(b.version, b2.version)
    self.assertEqual(b.date, b2.date)
    self.assertEqual(b.user, b2.user)
88 def test_gerald(self):
89 # need to update gerald and make tests for it
90 g = gerald.gerald(self.gerald_dir)
92 self.failUnlessEqual(g.software, 'GERALD')
93 self.failUnlessEqual(g.version, '1.171')
94 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
95 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
96 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
99 # list of genomes, matches what was defined up in
100 # make_gerald_config.
101 # the first None is to offset the genomes list to be 1..9
102 # instead of pythons default 0..8
107 '/g/arabidopsis01222004',
113 # test lane specific parameters from gerald config file
115 cur_lane = g.lanes[i]
116 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
117 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
118 self.failUnlessEqual(cur_lane.read_length, '37')
119 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
121 # I want to be able to use a simple iterator
122 for l in g.lanes.values():
123 self.failUnlessEqual(l.analysis, 'eland_extended')
124 self.failUnlessEqual(l.read_length, '37')
125 self.failUnlessEqual(l.use_bases, 'Y'*37)
127 # test data extracted from summary file
129 (281331, 11169), (203841, 13513),
130 (220889, 15653), (137294, 14666),
131 (129388, 14525), (262092, 10751),
132 (185754, 13503), (233765, 9537),]
134 self.failUnlessEqual(len(g.summary), 1)
136 summary_lane = g.summary[0][i]
137 self.failUnlessEqual(summary_lane.cluster, clusters[i])
138 self.failUnlessEqual(summary_lane.lane, i)
140 xml = g.get_elements()
141 # just make sure that element tree can serialize the tree
142 xml_str = ElementTree.tostring(xml)
143 g2 = gerald.Gerald(xml=xml)
145 # do it all again after extracting from the xml file
146 self.failUnlessEqual(g.software, g2.software)
147 self.failUnlessEqual(g.version, g2.version)
148 self.failUnlessEqual(g.date, g2.date)
149 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
150 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
152 # test lane specific parameters from gerald config file
155 g2_lane = g2.lanes[i]
156 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
157 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
158 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
159 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
161 # test (some) summary elements
162 self.failUnlessEqual(len(g.summary), 1)
164 g_summary = g.summary[0][i]
165 g2_summary = g2.summary[0][i]
166 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
167 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
169 g_eland = g.eland_results
170 g2_eland = g2.eland_results
172 g_results = g_eland[lane]
173 g2_results = g2_eland[lane]
174 self.failUnlessEqual(g_results.reads,
176 if isinstance(g_results, eland.ElandLane):
177 self.failUnlessEqual(len(g_results.mapped_reads),
178 len(g2_results.mapped_reads))
179 for k in g_results.mapped_reads.keys():
180 self.failUnlessEqual(g_results.mapped_reads[k],
181 g2_results.mapped_reads[k])
183 self.failUnlessEqual(len(g_results.match_codes),
184 len(g2_results.match_codes))
185 for k in g_results.match_codes.keys():
186 self.failUnlessEqual(g_results.match_codes[k],
187 g2_results.match_codes[k])
190 def test_eland(self):
191 hg_map = {'Lambda.fa': 'Lambda.fa'}
192 for i in range(1,22):
193 short_name = 'chr%d.fa' % (i,)
194 long_name = 'hg18/chr%d.fa' % (i,)
195 hg_map[short_name] = long_name
197 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
198 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
199 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
201 # I added sequence lanes to the last 2 lanes of this test case
202 keys = [SampleKey(lane=i, read=1, sample='s') for i in range(1,7)]
204 lane = eland_container[key]
205 self.failUnlessEqual(lane.reads, 28)
206 self.failUnlessEqual(lane.sample_name, "s")
207 self.failUnlessEqual(lane.lane_id, key.lane)
208 self.failUnlessEqual(len(lane.mapped_reads), 7)
209 self.failUnlessEqual(lane.mapped_reads['hg18/chr7.fa'], 4)
210 self.failUnlessEqual(lane.mapped_reads['Lambda_1-1_11936nts.fa'], 1)
211 self.failUnlessEqual(lane.match_codes['U0'], 1)
212 self.failUnlessEqual(lane.match_codes['R0'], 20)
213 self.failUnlessEqual(lane.match_codes['U1'], 1)
214 self.failUnlessEqual(lane.match_codes['R1'], 2)
215 self.failUnlessEqual(lane.match_codes['U2'], 11)
216 self.failUnlessEqual(lane.match_codes['R2'], 0)
217 self.failUnlessEqual(lane.match_codes['NM'], 2)
218 self.failUnlessEqual(lane.match_codes['QC'], 9)
221 lane = eland_container[SampleKey(lane=7, read=1, sample='s')]
222 self.failUnlessEqual(lane.reads, 5)
223 self.failUnlessEqual(lane.sample_name, 's')
224 self.failUnlessEqual(lane.lane_id, 7)
225 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
228 lane = eland_container[SampleKey(lane=8, read=1, sample='s')]
229 self.failUnlessEqual(lane.reads, 3)
230 self.failUnlessEqual(lane.sample_name, 's')
231 self.failUnlessEqual(lane.lane_id, 8)
232 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
234 xml = eland_container.get_elements()
235 # just make sure that element tree can serialize the tree
236 xml_str = ElementTree.tostring(xml)
237 e2 = gerald.ELAND(xml=xml)
239 for key in eland_container:
240 l1 = eland_container[key]
242 self.failUnlessEqual(l1.reads, l2.reads)
243 self.failUnlessEqual(l1.sample_name, l2.sample_name)
244 self.failUnlessEqual(l1.lane_id, l2.lane_id)
245 if isinstance(l1, eland.ElandLane):
246 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
247 self.failUnlessEqual(len(l1.mapped_reads), 7)
248 for k in l1.mapped_reads.keys():
249 self.failUnlessEqual(l1.mapped_reads[k],
252 self.failUnlessEqual(len(l1.match_codes), 9)
253 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
254 for k in l1.match_codes.keys():
255 self.failUnlessEqual(l1.match_codes[k],
257 elif isinstance(l1, eland.SequenceLane):
258 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
260 def test_runfolder(self):
261 runs = runfolder.get_runs(self.runfolder_dir)
263 # do we get the flowcell id from the filename?
264 self.failUnlessEqual(len(runs), 1)
265 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
266 self.failUnlessEqual(runs[0].name, name)
268 # do we get the flowcell id from the FlowcellId.xml file
269 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
270 runs = runfolder.get_runs(self.runfolder_dir)
271 self.failUnlessEqual(len(runs), 1)
272 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
273 self.failUnlessEqual(runs[0].name, name)
276 xml = r1.get_elements()
277 xml_str = ElementTree.tostring(xml)
279 r2 = runfolder.PipelineRun(xml=xml)
280 self.failUnlessEqual(r1.name, r2.name)
281 self.failIfEqual(r2.image_analysis, None)
282 self.failIfEqual(r2.bustard, None)
283 self.failIfEqual(r2.gerald, None)
287 from unittest2 import TestSuite, defaultTestLoader
289 suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
# Run this module's tests when it is executed directly as a script;
# the runner looks up the name "suite" in this module.
if __name__ == "__main__":
    import unittest2
    unittest2.main(defaultTest="suite")