2 from __future__ import absolute_import
4 from datetime import datetime, date
8 from unittest import TestCase
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.samplekey import SampleKey
16 from htsworkflow.pipelines import ElementTree
18 from .simulate_runfolder import *
21 def make_runfolder(obj=None):
23 Make a fake runfolder, attach all the directories to obj if defined
25 # make a fake runfolder directory
26 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
28 flowcell_id = '4286GAAXX'
29 runfolder = '090608_HWI-EAS229_0117_{0}'.format(flowcell_id)
30 runfolder_dir = os.path.join(temp_dir, runfolder)
31 os.mkdir(runfolder_dir)
33 data_dir = os.path.join(runfolder_dir, 'Data')
36 intensities_dir = make_rta_intensities_1870(data_dir)
38 basecalls_dir = make_rta_basecalls_1870(intensities_dir)
39 make_matrix_dir_rta160(basecalls_dir)
41 gerald_dir = os.path.join(basecalls_dir,
42 'GERALD_07-09-2010_diane')
44 make_gerald_config_100(gerald_dir)
45 make_summary_rta160_xml(gerald_dir)
46 make_eland_export(gerald_dir, lane_list=[1,2,3,4,5,6,])
47 make_scarf(gerald_dir, lane_list=[7,])
48 make_fastq(gerald_dir, lane_list=[8,])
51 obj.flowcell_id = flowcell_id
52 obj.temp_dir = temp_dir
53 obj.runfolder = runfolder
54 obj.runfolder_dir = runfolder_dir
55 obj.data_dir = data_dir
56 obj.image_analysis_dir = intensities_dir
57 obj.bustard_dir = basecalls_dir
58 obj.gerald_dir = gerald_dir
61 class RunfolderTests(TestCase):
63 Test components of the runfolder processing code
64 which includes firecrest, bustard, and gerald
67 # attaches all the directories to the object passed in
71 shutil.rmtree(self.temp_dir)
73 def test_bustard(self):
74 """Construct a bustard object"""
75 b = bustard.bustard(self.bustard_dir)
76 self.failUnlessEqual(b.software, 'RTA')
77 self.failUnlessEqual(b.version, '1.8.70.0')
78 self.failUnlessEqual(b.date, None)
79 self.failUnlessEqual(b.user, None)
80 self.failUnlessEqual(len(b.phasing), 0)
82 xml = b.get_elements()
83 b2 = bustard.Bustard(xml=xml)
84 self.failUnlessEqual(b.software, b2.software)
85 self.failUnlessEqual(b.version, b2.version)
86 self.failUnlessEqual(b.date, b2.date )
87 self.failUnlessEqual(b.user, b2.user)
89 def test_gerald(self):
90 # need to update gerald and make tests for it
91 g = gerald.gerald(self.gerald_dir)
93 self.failUnlessEqual(g.software, 'GERALD')
94 self.failUnlessEqual(g.version, '1.171')
95 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
96 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
97 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
100 # list of genomes, matches what was defined up in
101 # make_gerald_config.
102 # the first None is to offset the genomes list to be 1..9
103 # instead of pythons default 0..8
108 '/g/arabidopsis01222004',
114 # test lane specific parameters from gerald config file
116 cur_lane = g.lanes[i]
117 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
118 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
119 self.failUnlessEqual(cur_lane.read_length, '37')
120 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
122 # I want to be able to use a simple iterator
123 for l in g.lanes.values():
124 self.failUnlessEqual(l.analysis, 'eland_extended')
125 self.failUnlessEqual(l.read_length, '37')
126 self.failUnlessEqual(l.use_bases, 'Y'*37)
128 # test data extracted from summary file
130 (281331, 11169), (203841, 13513),
131 (220889, 15653), (137294, 14666),
132 (129388, 14525), (262092, 10751),
133 (185754, 13503), (233765, 9537),]
135 self.failUnlessEqual(len(g.summary), 1)
137 summary_lane = g.summary[0][i]
138 self.failUnlessEqual(summary_lane.cluster, clusters[i])
139 self.failUnlessEqual(summary_lane.lane, i)
141 xml = g.get_elements()
142 # just make sure that element tree can serialize the tree
143 xml_str = ElementTree.tostring(xml)
144 g2 = gerald.Gerald(xml=xml)
146 # do it all again after extracting from the xml file
147 self.failUnlessEqual(g.software, g2.software)
148 self.failUnlessEqual(g.version, g2.version)
149 self.failUnlessEqual(g.date, g2.date)
150 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
151 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
153 # test lane specific parameters from gerald config file
156 g2_lane = g2.lanes[i]
157 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
158 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
159 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
160 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
162 # test (some) summary elements
163 self.failUnlessEqual(len(g.summary), 1)
165 g_summary = g.summary[0][i]
166 g2_summary = g2.summary[0][i]
167 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
168 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
170 g_eland = g.eland_results
171 g2_eland = g2.eland_results
173 g_results = g_eland[lane]
174 g2_results = g2_eland[lane]
175 self.failUnlessEqual(g_results.reads,
177 if isinstance(g_results, eland.ElandLane):
178 self.failUnlessEqual(len(g_results.mapped_reads),
179 len(g2_results.mapped_reads))
180 for k in g_results.mapped_reads.keys():
181 self.failUnlessEqual(g_results.mapped_reads[k],
182 g2_results.mapped_reads[k])
184 self.failUnlessEqual(len(g_results.match_codes),
185 len(g2_results.match_codes))
186 for k in g_results.match_codes.keys():
187 self.failUnlessEqual(g_results.match_codes[k],
188 g2_results.match_codes[k])
191 def test_eland(self):
192 hg_map = {'Lambda.fa': 'Lambda.fa'}
193 for i in range(1,22):
194 short_name = 'chr%d.fa' % (i,)
195 long_name = 'hg18/chr%d.fa' % (i,)
196 hg_map[short_name] = long_name
198 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
199 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
200 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
202 # I added sequence lanes to the last 2 lanes of this test case
203 keys = [SampleKey(lane=i, read=1, sample='s') for i in range(1,7)]
205 lane = eland_container[key]
206 self.failUnlessEqual(lane.reads, 28)
207 self.failUnlessEqual(lane.sample_name, "s")
208 self.failUnlessEqual(lane.lane_id, key.lane)
209 self.failUnlessEqual(len(lane.mapped_reads), 7)
210 self.failUnlessEqual(lane.mapped_reads['hg18/chr7.fa'], 4)
211 self.failUnlessEqual(lane.mapped_reads['Lambda_1-1_11936nts.fa'], 1)
212 self.failUnlessEqual(lane.match_codes['U0'], 1)
213 self.failUnlessEqual(lane.match_codes['R0'], 20)
214 self.failUnlessEqual(lane.match_codes['U1'], 1)
215 self.failUnlessEqual(lane.match_codes['R1'], 2)
216 self.failUnlessEqual(lane.match_codes['U2'], 11)
217 self.failUnlessEqual(lane.match_codes['R2'], 0)
218 self.failUnlessEqual(lane.match_codes['NM'], 2)
219 self.failUnlessEqual(lane.match_codes['QC'], 9)
222 lane = eland_container[SampleKey(lane=7, read=1, sample='s')]
223 self.failUnlessEqual(lane.reads, 5)
224 self.failUnlessEqual(lane.sample_name, 's')
225 self.failUnlessEqual(lane.lane_id, 7)
226 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
229 lane = eland_container[SampleKey(lane=8, read=1, sample='s')]
230 self.failUnlessEqual(lane.reads, 3)
231 self.failUnlessEqual(lane.sample_name, 's')
232 self.failUnlessEqual(lane.lane_id, 8)
233 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
235 xml = eland_container.get_elements()
236 # just make sure that element tree can serialize the tree
237 xml_str = ElementTree.tostring(xml)
238 e2 = gerald.ELAND(xml=xml)
240 for key in eland_container:
241 l1 = eland_container[key]
243 self.failUnlessEqual(l1.reads, l2.reads)
244 self.failUnlessEqual(l1.sample_name, l2.sample_name)
245 self.failUnlessEqual(l1.lane_id, l2.lane_id)
246 if isinstance(l1, eland.ElandLane):
247 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
248 self.failUnlessEqual(len(l1.mapped_reads), 7)
249 for k in l1.mapped_reads.keys():
250 self.failUnlessEqual(l1.mapped_reads[k],
253 self.failUnlessEqual(len(l1.match_codes), 9)
254 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
255 for k in l1.match_codes.keys():
256 self.failUnlessEqual(l1.match_codes[k],
258 elif isinstance(l1, eland.SequenceLane):
259 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
261 def test_runfolder(self):
262 runs = runfolder.get_runs(self.runfolder_dir)
264 # do we get the flowcell id from the filename?
265 self.failUnlessEqual(len(runs), 1)
266 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
267 self.failUnlessEqual(runs[0].serialization_filename, name)
268 self.assertEqual(runs[0].runfolder_name, '090220_HWI-EAS229_0093_30VR0AAXX')
270 # do we get the flowcell id from the FlowcellId.xml file
271 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
272 runs = runfolder.get_runs(self.runfolder_dir)
273 self.failUnlessEqual(len(runs), 1)
274 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
275 self.failUnlessEqual(runs[0].serialization_filename, name)
280 xml = r1.get_elements()
281 xml_str = ElementTree.tostring(xml)
283 r2 = runfolder.PipelineRun(xml=xml)
284 self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
285 self.failIfEqual(r2.image_analysis, None)
286 self.failIfEqual(r2.bustard, None)
287 self.failIfEqual(r2.gerald, None)
291 from unittest import TestSuite, defaultTestLoader
293 suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
297 if __name__ == "__main__":
298 from unittest import main
299 main(defaultTest="suite")