2 from __future__ import absolute_import
4 from datetime import datetime, date
8 from unittest import TestCase
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.samplekey import SampleKey
16 from htsworkflow.pipelines import ElementTree
18 from .simulate_runfolder import *
21 def make_runfolder(obj=None):
23 Make a fake runfolder, attach all the directories to obj if defined
25 # make a fake runfolder directory
26 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
28 runfolder_dir = os.path.join(temp_dir,
29 '090608_HWI-EAS229_0117_4286GAAXX')
30 os.mkdir(runfolder_dir)
32 data_dir = os.path.join(runfolder_dir, 'Data')
35 intensities_dir = make_rta_intensities_1460(data_dir)
37 basecalls_dir = make_rta_basecalls_1460(intensities_dir)
39 #make_phasing_params(bustard_dir)
40 #make_bustard_config132(bustard_dir)
42 gerald_dir = os.path.join(basecalls_dir,
43 'GERALD_16-06-2009_diane')
45 make_gerald_config_100(gerald_dir)
46 make_summary_rta160_xml(gerald_dir)
47 make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
48 make_scarf(gerald_dir, lane_list=[7,])
49 make_fastq(gerald_dir, lane_list=[8,])
52 obj.temp_dir = temp_dir
53 obj.runfolder_dir = runfolder_dir
54 obj.data_dir = data_dir
55 obj.image_analysis_dir = intensities_dir
56 obj.bustard_dir = basecalls_dir
57 obj.gerald_dir = gerald_dir
60 class RunfolderTests(TestCase):
62 Test components of the runfolder processing code
63 which includes firecrest, bustard, and gerald
66 # attaches all the directories to the object passed in
70 shutil.rmtree(self.temp_dir)
72 # The only thing different from the previous RTA version is
73 # I'm processing the Summary.xml file
76 def test_gerald(self):
77 # need to update gerald and make tests for it
78 g = gerald.gerald(self.gerald_dir)
80 self.failUnlessEqual(g.software, 'GERALD')
81 self.failUnlessEqual(g.version, '1.171')
82 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
83 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
84 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
87 # list of genomes, matches what was defined up in
89 # the first None is to offset the genomes list to be 1..9
90 # instead of pythons default 0..8
95 '/g/arabidopsis01222004',
101 # test lane specific parameters from gerald config file
103 cur_lane = g.lanes[i]
104 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
105 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
106 self.failUnlessEqual(cur_lane.read_length, '37')
107 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
109 # I want to be able to use a simple iterator
110 for l in g.lanes.values():
111 self.failUnlessEqual(l.analysis, 'eland_extended')
112 self.failUnlessEqual(l.read_length, '37')
113 self.failUnlessEqual(l.use_bases, 'Y'*37)
115 # test data extracted from summary file
117 (281331, 11169), (203841, 13513),
118 (220889, 15653), (137294, 14666),
119 (129388, 14525), (262092, 10751),
120 (185754, 13503), (233765, 9537),]
122 self.failUnlessEqual(len(g.summary), 1)
124 summary_lane = g.summary[0][i]
125 self.failUnlessEqual(summary_lane.cluster, clusters[i])
126 self.failUnlessEqual(summary_lane.lane, i)
128 xml = g.get_elements()
129 # just make sure that element tree can serialize the tree
130 xml_str = ElementTree.tostring(xml)
131 g2 = gerald.Gerald(xml=xml)
133 # do it all again after extracting from the xml file
134 self.failUnlessEqual(g.software, g2.software)
135 self.failUnlessEqual(g.version, g2.version)
136 self.failUnlessEqual(g.date, g2.date)
137 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
138 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
140 # test lane specific parameters from gerald config file
143 g2_lane = g2.lanes[i]
144 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
145 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
146 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
147 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
149 # test (some) summary elements
150 self.failUnlessEqual(len(g.summary), 1)
152 g_summary = g.summary[0][i]
153 g2_summary = g2.summary[0][i]
154 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
155 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
157 g_eland = g.eland_results
158 g2_eland = g2.eland_results
160 g_results = g_eland[key]
161 g2_results = g2_eland[key]
162 self.failUnlessEqual(g_results.reads,
164 if isinstance(g_results, eland.ElandLane):
165 self.failUnlessEqual(len(g_results.mapped_reads),
166 len(g2_results.mapped_reads))
167 for k in g_results.mapped_reads.keys():
168 self.failUnlessEqual(g_results.mapped_reads[k],
169 g2_results.mapped_reads[k])
171 self.failUnlessEqual(len(g_results.match_codes),
172 len(g2_results.match_codes))
173 for k in g_results.match_codes.keys():
174 self.failUnlessEqual(g_results.match_codes[k],
175 g2_results.match_codes[k])
178 def test_eland(self):
179 hg_map = {'Lambda.fa': 'Lambda.fa'}
180 for i in range(1,22):
181 short_name = 'chr%d.fa' % (i,)
182 long_name = 'hg18/chr%d.fa' % (i,)
183 hg_map[short_name] = long_name
185 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
186 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
187 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
189 # I added sequence lanes to the last 2 lanes of this test case
190 keys = [ SampleKey(lane=i, read=1, sample='s') for i in range(1,7)]
192 lane = eland_container[key]
193 self.failUnlessEqual(lane.reads, 6)
194 self.failUnlessEqual(lane.sample_name, "s")
195 self.failUnlessEqual(lane.lane_id, key.lane)
196 self.failUnlessEqual(len(lane.mapped_reads), 17)
197 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
198 self.failUnlessEqual(lane.match_codes['U0'], 3)
199 self.failUnlessEqual(lane.match_codes['R0'], 2)
200 self.failUnlessEqual(lane.match_codes['U1'], 1)
201 self.failUnlessEqual(lane.match_codes['R1'], 9)
202 self.failUnlessEqual(lane.match_codes['U2'], 0)
203 self.failUnlessEqual(lane.match_codes['R2'], 12)
204 self.failUnlessEqual(lane.match_codes['NM'], 1)
205 self.failUnlessEqual(lane.match_codes['QC'], 0)
208 lane = eland_container[SampleKey(lane=7, read=1, sample='s')]
209 self.failUnlessEqual(lane.reads, 5)
210 self.failUnlessEqual(lane.sample_name, 's')
211 self.failUnlessEqual(lane.lane_id, 7)
212 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
215 lane = eland_container[SampleKey(lane=8, read=1, sample='s')]
216 self.failUnlessEqual(lane.reads, 3)
217 self.failUnlessEqual(lane.sample_name, 's')
218 self.failUnlessEqual(lane.lane_id, 8)
219 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
221 xml = eland_container.get_elements()
222 # just make sure that element tree can serialize the tree
223 xml_str = ElementTree.tostring(xml)
224 e2 = gerald.ELAND(xml=xml)
226 for key in eland_container:
227 l1 = eland_container[key]
229 self.failUnlessEqual(l1.reads, l2.reads)
230 self.failUnlessEqual(l1.sample_name, l2.sample_name)
231 self.failUnlessEqual(l1.lane_id, l2.lane_id)
232 if isinstance(l1, eland.ElandLane):
233 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
234 self.failUnlessEqual(len(l1.mapped_reads), 17)
235 for k in l1.mapped_reads.keys():
236 self.failUnlessEqual(l1.mapped_reads[k],
239 self.failUnlessEqual(len(l1.match_codes), 9)
240 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
241 for k in l1.match_codes.keys():
242 self.failUnlessEqual(l1.match_codes[k],
244 elif isinstance(l1, eland.SequenceLane):
245 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
247 def test_runfolder(self):
248 runs = runfolder.get_runs(self.runfolder_dir)
250 # do we get the flowcell id from the filename?
251 self.failUnlessEqual(len(runs), 1)
252 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
253 self.failUnlessEqual(runs[0].serialization_filename, name)
255 # do we get the flowcell id from the FlowcellId.xml file
256 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
257 runs = runfolder.get_runs(self.runfolder_dir)
258 self.failUnlessEqual(len(runs), 1)
259 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
260 self.failUnlessEqual(runs[0].serialization_filename, name)
262 bustard_dir = os.path.join(self.runfolder_dir, 'Data',
263 'Intensities', 'BaseCalls')
265 xml = r1.get_elements()
266 xml_str = ElementTree.tostring(xml)
267 self.failUnlessEqual(r1.bustard.sequence_format, 'qseq')
268 self.failUnlessEqual(r1.bustard.pathname, bustard_dir)
269 self.failUnlessEqual(r1.gerald.runfolder_name,
270 '090220_HWI-EAS229_0093_30VR0AAXX')
272 r2 = runfolder.PipelineRun(xml=xml)
273 self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
274 self.failIfEqual(r2.image_analysis, None)
275 self.failIfEqual(r2.bustard, None)
276 self.failIfEqual(r2.gerald, None)
280 from unittest import TestSuite, defaultTestLoader
282 suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
286 if __name__ == "__main__":
287 from unittest import main
288 main(defaultTest="suite")