2 from __future__ import absolute_import
4 from datetime import datetime, date
9 from unittest import TestCase
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import eland
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import ipar
15 from htsworkflow.pipelines import runfolder
16 from htsworkflow.pipelines import srf
17 from htsworkflow.pipelines import ElementTree
19 from .simulate_runfolder import *
22 RUN_NAME = '110420_SN787_0069_%s' %( FCID,)
24 def make_runfolder(obj=None):
26 Make a fake runfolder, attach all the directories to obj if defined
28 # make a fake runfolder directory
29 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
31 runfolder_dir = os.path.join(temp_dir, RUN_NAME)
32 os.mkdir(runfolder_dir)
34 data_dir = os.path.join(runfolder_dir, 'Data')
37 intensities_dir = make_rta_intensities_1_10(data_dir)
39 basecalls_dir = make_rta_basecalls_1_10(intensities_dir)
40 make_matrix_dir_rta_1_10(basecalls_dir)
41 make_qseqs(basecalls_dir, ABXX_BASE_CALL_INFO)
43 gerald_dir = os.path.join(basecalls_dir,
44 'GERALD_02-05-2011_diane')
46 make_gerald_config_1_7(gerald_dir)
47 make_summary_casava1_7_xml(gerald_dir)
48 make_eland_export(gerald_dir, lane_list=[4,5,6,7,8])
49 make_fastq(gerald_dir, lane_list=[1,2,3,])
52 obj.temp_dir = temp_dir
53 obj.runfolder_dir = runfolder_dir
54 obj.data_dir = data_dir
55 obj.image_analysis_dir = intensities_dir
56 obj.bustard_dir = basecalls_dir
57 obj.gerald_dir = gerald_dir
60 class RunfolderTests(TestCase):
62 Test components of the runfolder processing code
63 which includes firecrest, bustard, and gerald
66 # attaches all the directories to the object passed in
70 shutil.rmtree(self.temp_dir)
72 def test_bustard(self):
73 """Construct a bustard object"""
74 b = bustard.bustard(self.bustard_dir)
75 self.failUnlessEqual(b.software, 'RTA')
76 self.failUnlessEqual(b.version, '1.10.36.0')
77 self.failUnlessEqual(b.date, None)
78 self.failUnlessEqual(b.user, None)
79 self.failUnlessEqual(len(b.phasing), 0)
81 xml = b.get_elements()
82 b2 = bustard.Bustard(xml=xml)
83 self.failUnlessEqual(b.software, b2.software)
84 self.failUnlessEqual(b.version, b2.version)
85 self.failUnlessEqual(b.date, b2.date )
86 self.failUnlessEqual(b.user, b2.user)
88 def test_gerald(self):
89 # need to update gerald and make tests for it
90 g = gerald.gerald(self.gerald_dir)
92 self.failUnlessEqual(g.software, 'CASAVA')
93 self.failUnlessEqual(g.version, '1.7.0')
94 self.failUnlessEqual(g.date, datetime(2011,5,2,19,19,49))
95 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
96 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
99 # list of genomes, matches what was defined up in
100 # make_gerald_config.
101 # the first None is to offset the genomes list to be 1..9
102 # instead of pythons default 0..8
103 genomes = [None, # placeholder
112 analysis = [None, # placeholder
123 # test lane specific parameters from gerald config file
125 cur_lane = g.lanes[i]
126 self.failUnlessEqual(cur_lane.analysis, analysis[i])
127 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
128 self.failUnlessEqual(cur_lane.read_length, '100')
129 self.failUnlessEqual(cur_lane.use_bases, 'Y'*100+'y'*100)
131 # I want to be able to use a simple iterator
132 for l in g.lanes.values():
133 self.failUnless(l.analysis in ('sequence_pair', 'eland_pair'))
134 self.failUnlessEqual(l.read_length, '100')
135 self.failUnlessEqual(l.use_bases, 'Y'*100+'y'*100)
137 # test data extracted from summary file
139 (1073893, 146344), (1671038, 167728),
140 (5778484, 83123), (2411032, 290873),
141 (2197555, 294381), (2725097, 333724),
142 (2549849, 313056), (2282159, 232709),]
144 self.failUnlessEqual(len(g.summary), 1)
146 summary_lane = g.summary[0][i]
147 self.failUnlessEqual(summary_lane.cluster, clusters[i])
148 self.failUnlessEqual(summary_lane.lane, i)
150 xml = g.get_elements()
151 # just make sure that element tree can serialize the tree
152 xml_str = ElementTree.tostring(xml)
153 g2 = gerald.Gerald(xml=xml)
155 # do it all again after extracting from the xml file
156 self.failUnlessEqual(g.software, g2.software)
157 self.failUnlessEqual(g.version, g2.version)
158 self.failUnlessEqual(g.date, g2.date)
159 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
160 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
162 # test lane specific parameters from gerald config file
165 g2_lane = g2.lanes[i]
166 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
167 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
168 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
169 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
171 # test (some) summary elements
172 self.failUnlessEqual(len(g.summary), 1)
174 g_summary = g.summary[0][i]
175 g2_summary = g2.summary[0][i]
176 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
177 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
179 g_eland = g.eland_results
180 g2_eland = g2.eland_results
182 g_results = g_eland[key]
183 g2_results = g2_eland[key]
184 self.failUnlessEqual(g_results.reads,
186 if isinstance(g_results, eland.ElandLane):
187 self.failUnlessEqual(len(g_results.mapped_reads),
188 len(g2_results.mapped_reads))
189 for k in g_results.mapped_reads.keys():
190 self.failUnlessEqual(g_results.mapped_reads[k],
191 g2_results.mapped_reads[k])
193 self.failUnlessEqual(len(g_results.match_codes),
194 len(g2_results.match_codes))
195 for k in g_results.match_codes.keys():
196 self.failUnlessEqual(g_results.match_codes[k],
197 g2_results.match_codes[k])
200 def test_eland(self):
201 hg_map = {'Lambda.fa': 'Lambda.fa'}
202 for i in range(1,22):
203 short_name = 'chr%d.fa' % (i,)
204 long_name = 'hg18/chr%d.fa' % (i,)
205 hg_map[short_name] = long_name
207 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
208 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
209 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
213 key = eland.SampleKey(lane=i, read=1, sample='s')
214 lane = eland_container[key]
215 self.failUnlessEqual(lane.reads, 3)
216 self.failUnlessEqual(lane.sample_name, 's')
217 self.failUnlessEqual(lane.lane_id, i)
218 self.failUnlessEqual(lane.sequence_type,
219 eland.SequenceLane.FASTQ_TYPE)
221 # I added sequence lanes to the last 2 lanes of this test case
223 key = eland.SampleKey(lane=i, read=1, sample='s')
224 lane = eland_container[key]
225 self.failUnlessEqual(lane.reads, 28)
226 self.failUnlessEqual(lane.sample_name, "s")
227 self.failUnlessEqual(lane.lane_id, i)
228 self.failUnlessEqual(len(lane.mapped_reads), 7)
229 self.failUnlessEqual(lane.mapped_reads['hg18/chr7.fa'], 4)
230 self.failUnlessEqual(lane.match_codes['U0'], 1)
231 self.failUnlessEqual(lane.match_codes['R0'], 20)
232 self.failUnlessEqual(lane.match_codes['U1'], 1)
233 self.failUnlessEqual(lane.match_codes['R1'], 2)
234 self.failUnlessEqual(lane.match_codes['U2'], 11)
235 self.failUnlessEqual(lane.match_codes['R2'], 0)
236 self.failUnlessEqual(lane.match_codes['NM'], 2)
237 self.failUnlessEqual(lane.match_codes['QC'], 9)
240 xml = eland_container.get_elements()
241 # just make sure that element tree can serialize the tree
242 xml_str = ElementTree.tostring(xml)
243 e2 = gerald.ELAND(xml=xml)
245 for key in eland_container:
246 l1 = eland_container[key]
248 self.failUnlessEqual(l1.reads, l2.reads)
249 self.failUnlessEqual(l1.sample_name, l2.sample_name)
250 self.failUnlessEqual(l1.lane_id, l2.lane_id)
251 if isinstance(l1, eland.ElandLane):
252 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
253 self.failUnlessEqual(len(l1.mapped_reads), 7)
254 for k in l1.mapped_reads.keys():
255 self.failUnlessEqual(l1.mapped_reads[k],
258 self.failUnlessEqual(len(l1.match_codes), 9)
259 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
260 for k in l1.match_codes.keys():
261 self.failUnlessEqual(l1.match_codes[k],
263 elif isinstance(l1, eland.SequenceLane):
264 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
266 def test_runfolder(self):
267 runs = runfolder.get_runs(self.runfolder_dir)
269 # do we get the flowcell id from the filename?
270 self.failUnlessEqual(len(runs), 1)
271 name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
272 self.failUnlessEqual(runs[0].serialization_filename, name)
274 # do we get the flowcell id from the FlowcellId.xml file
275 make_flowcell_id(self.runfolder_dir, FCID)
276 runs = runfolder.get_runs(self.runfolder_dir)
277 self.failUnlessEqual(len(runs), 1)
278 name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
279 self.failUnlessEqual(runs[0].serialization_filename, name)
282 xml = r1.get_elements()
283 xml_str = ElementTree.tostring(xml)
285 r2 = runfolder.PipelineRun(xml=xml)
286 self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
287 self.failIfEqual(r2.image_analysis, None)
288 self.failIfEqual(r2.bustard, None)
289 self.failIfEqual(r2.gerald, None)
292 pattern_list = srf.create_qseq_patterns(self.bustard_dir)
293 self.assertEqual(len(pattern_list), 1)
294 pattern = pattern_list[0][1] % (1,)
296 glob.fnmatch.fnmatch("s_1_1_1101_qseq.txt", pattern),
299 glob.fnmatch.fnmatch("s_1_0001_qseq.txt", pattern),
304 from unittest import TestSuite, defaultTestLoader
306 suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
310 if __name__ == "__main__":
311 from unittest import main
312 main(defaultTest="suite")