from datetime import datetime, date
import os
import shutil
import tempfile
import unittest

from htsworkflow.pipelines import eland
from htsworkflow.pipelines import ipar
from htsworkflow.pipelines import bustard
from htsworkflow.pipelines import gerald
from htsworkflow.pipelines import runfolder
from htsworkflow.pipelines.runfolder import ElementTree

from htsworkflow.pipelines.test.simulate_runfolder import *
def make_runfolder(obj=None):
    """
    Make a fake runfolder, attach all the directories to obj if defined

    A fresh temporary directory is created with ``tempfile.mkdtemp`` and a
    runfolder named ``110815_SN787_0101_AD07K6ACXX`` is built inside it,
    holding ``Data``, ``Unaligned`` and ``Aligned`` subdirectories that are
    filled in by the ``*_1_12`` / ``make_aligned_eland_export`` helpers
    (presumably supplied by the simulate_runfolder star import).

    :param obj: optional object to decorate.  When it is not None the
        attributes ``temp_dir``, ``runfolder_dir``, ``data_dir``,
        ``image_analysis_dir`` (the intensities directory), ``bustard_dir``
        (the Unaligned directory) and ``gerald_dir`` (the Aligned directory)
        are set on it.
    :return: None.  Nothing is deleted here; the caller removes
        ``obj.temp_dir`` when finished.
    """
    # make a fake runfolder directory
    flowcell_id = 'D07K6ACXX'
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')

    runfolder_dir = os.path.join(temp_dir,
                                 '110815_SN787_0101_A{0}'.format(flowcell_id))
    os.mkdir(runfolder_dir)

    # Create each top-level directory before handing it to its helpers, the
    # same way the Unaligned directory is handled below.
    # NOTE(review): assumes the helpers expect an existing parent directory
    # -- confirm against simulate_runfolder.
    data_dir = os.path.join(runfolder_dir, 'Data')
    os.mkdir(data_dir)

    intensities_dir = make_rta_intensities_1_12(data_dir)

    basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
    make_matrix_dir_rta_1_12(basecalls_dir)

    unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
    os.mkdir(unaligned_dir)
    make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
    make_unaligned_config_1_12(unaligned_dir)

    aligned_dir = os.path.join(runfolder_dir, "Aligned")
    os.mkdir(aligned_dir)
    make_aligned_eland_export(aligned_dir, flowcell_id)
    make_aligned_config_1_12(aligned_dir)

    # Only decorate a caller-supplied object; with the default obj=None the
    # tree is still built but there is nothing to attach the paths to.
    if obj is not None:
        obj.temp_dir = temp_dir
        obj.runfolder_dir = runfolder_dir
        obj.data_dir = data_dir
        obj.image_analysis_dir = intensities_dir
        obj.bustard_dir = unaligned_dir
        obj.gerald_dir = aligned_dir
class RunfolderTests(unittest.TestCase):
    """
    Test components of the runfolder processing code
    which includes firecrest, bustard, and gerald
    """
    def setUp(self):
        """Build a simulated runfolder and record its paths on this test."""
        # attaches all the directories to the object passed in
        make_runfolder(self)

    def tearDown(self):
        """Remove the temporary tree created in setUp."""
        shutil.rmtree(self.temp_dir)

    def test_bustard(self):
        """Construct a bustard object"""
        b = bustard.bustard(self.bustard_dir)
        self.assertEqual(b.version, '1.8.70.0')
        self.assertEqual(b.date, None)
        self.assertEqual(b.user, None)
        self.assertEqual(len(b.phasing), 0)

        # Round-trip through the element tree form and make sure the
        # rebuilt object reports the same metadata.
        xml = b.get_elements()
        b2 = bustard.Bustard(xml=xml)
        self.assertEqual(b.version, b2.version)
        self.assertEqual(b.date, b2.date)
        self.assertEqual(b.user, b2.user)
# Loads the simulated Aligned directory with gerald.gerald(), checks version,
# date, per-lane parameters and summary values, then rebuilds the object from
# get_elements() via gerald.Gerald(xml=...) and compares the two.
# NOTE(review): this span is incomplete as it stands.  `genomes`, `clusters`,
# `g_lane` and the loop variable `i` are read below but never bound in the
# visible text, and the assertion opened on the `g_results.reads,` line is
# never closed.  Restore the missing statements from version control before
# relying on this test.
84 def test_gerald(self):
85 # need to update gerald and make tests for it
86 g = gerald.gerald(self.gerald_dir)
88 self.failUnlessEqual(g.version,
89 '@(#) Id: GERALD.pl,v 1.171 2008/05/19 17:36:14 mzerara Exp')
90 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
91 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
92 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
95 # list of genomes, matches what was defined up in
97 # the first None is to offset the genomes list to be 1..9
98 # instead of pythons default 0..8
# NOTE(review): only this single entry of the expected-genome list is
# visible; the assignment to `genomes` and the other entries are missing.
103 '/g/arabidopsis01222004',
109 # test lane specific parameters from gerald config file
# NOTE(review): presumably executed once per lane index `i`; the loop header
# is not visible.
111 cur_lane = g.lanes[i]
112 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
113 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
114 self.failUnlessEqual(cur_lane.read_length, '37')
115 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
117 # I want to be able to use a simple iterator
118 for l in g.lanes.values():
119 self.failUnlessEqual(l.analysis, 'eland_extended')
120 self.failUnlessEqual(l.read_length, '37')
121 self.failUnlessEqual(l.use_bases, 'Y'*37)
123 # test data extracted from summary file
# NOTE(review): eight 2-tuples compared against summary_lane.cluster below;
# the opening of the list (presumably the `clusters` assignment) is missing.
125 (281331, 11169), (203841, 13513),
126 (220889, 15653), (137294, 14666),
127 (129388, 14525), (262092, 10751),
128 (185754, 13503), (233765, 9537),]
130 self.failUnlessEqual(len(g.summary), 1)
132 summary_lane = g.summary[0][i]
133 self.failUnlessEqual(summary_lane.cluster, clusters[i])
134 self.failUnlessEqual(summary_lane.lane, i)
136 xml = g.get_elements()
137 # just make sure that element tree can serialize the tree
138 xml_str = ElementTree.tostring(xml)
139 g2 = gerald.Gerald(xml=xml)
142 # do it all again after extracting from the xml file
143 self.failUnlessEqual(g.version, g2.version)
144 self.failUnlessEqual(g.date, g2.date)
145 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
146 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
148 # test lane specific parameters from gerald config file
# NOTE(review): `g_lane` is compared below but its assignment is not visible.
151 g2_lane = g2.lanes[i]
152 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
153 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
154 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
155 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
157 # test (some) summary elements
158 self.failUnlessEqual(len(g.summary), 1)
160 g_summary = g.summary[0][i]
161 g2_summary = g2.summary[0][i]
162 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
163 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
# Compare the eland results of the original and the XML-rebuilt object,
# lane by lane.
165 g_eland = g.eland_results
166 g2_eland = g2.eland_results
167 for lane in g_eland.results[0].keys():
168 g_results = g_eland.results[0][lane]
169 g2_results = g2_eland.results[0][lane]
170 self.failUnlessEqual(g_results.reads,
172 if isinstance(g_results, eland.ElandLane):
173 self.failUnlessEqual(len(g_results.mapped_reads),
174 len(g2_results.mapped_reads))
175 for k in g_results.mapped_reads.keys():
176 self.failUnlessEqual(g_results.mapped_reads[k],
177 g2_results.mapped_reads[k])
179 self.failUnlessEqual(len(g_results.match_codes),
180 len(g2_results.match_codes))
181 for k in g_results.match_codes.keys():
182 self.failUnlessEqual(g_results.match_codes[k],
183 g2_results.match_codes[k])
# Loads eland results from the simulated Aligned directory with
# gerald.eland(), checks per-lane read counts and match codes, then rebuilds
# the container from get_elements() via gerald.ELAND(xml=...) and compares.
# NOTE(review): this span is incomplete as it stands.  The loop headers that
# should rebind `i` before the `results[0][i]` lookups are not visible, and
# the two assertions opened on the `l1.mapped_reads[k],` and
# `l1.match_codes[k],` lines are never closed.  Restore the missing
# statements from version control before relying on this test.
186 def test_eland(self):
# Map every short chromosome file name chr1.fa .. chr21.fa to its hg18/
# prefixed form; Lambda.fa maps to itself.
188 hg_map = {'Lambda.fa': 'Lambda.fa'}
189 for i in range(1,22):
190 short_name = 'chr%d.fa' % (i,)
191 long_name = 'hg18/chr%d.fa' % (i,)
192 hg_map[short_name] = long_name
# The same genome map is used for all eight lanes.
194 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
195 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
196 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
198 # I added sequence lanes to the last 2 lanes of this test case
# NOTE(review): presumably run for each of the eland lanes that precede the
# two sequence lanes (7 and 8) checked further down; the loop header is
# missing.
200 lane = eland_container.results[0][i]
201 self.failUnlessEqual(lane.reads, 6)
202 self.failUnlessEqual(lane.sample_name, "s")
203 self.failUnlessEqual(lane.lane_id, i)
204 self.failUnlessEqual(len(lane.mapped_reads), 17)
205 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
206 self.failUnlessEqual(lane.match_codes['U0'], 3)
207 self.failUnlessEqual(lane.match_codes['R0'], 2)
208 self.failUnlessEqual(lane.match_codes['U1'], 1)
209 self.failUnlessEqual(lane.match_codes['R1'], 9)
210 self.failUnlessEqual(lane.match_codes['U2'], 0)
211 self.failUnlessEqual(lane.match_codes['R2'], 12)
212 self.failUnlessEqual(lane.match_codes['NM'], 1)
213 self.failUnlessEqual(lane.match_codes['QC'], 0)
# Lane 7 is expected to be a sequence lane of SCARF type.
216 lane = eland_container.results[0][7]
217 self.failUnlessEqual(lane.reads, 5)
218 self.failUnlessEqual(lane.sample_name, 's')
219 self.failUnlessEqual(lane.lane_id, 7)
220 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
# Lane 8 is expected to be a sequence lane of FASTQ type.
223 lane = eland_container.results[0][8]
224 self.failUnlessEqual(lane.reads, 3)
225 self.failUnlessEqual(lane.sample_name, 's')
226 self.failUnlessEqual(lane.lane_id, 8)
227 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
229 xml = eland_container.get_elements()
230 # just make sure that element tree can serialize the tree
231 xml_str = ElementTree.tostring(xml)
232 e2 = gerald.ELAND(xml=xml)
# NOTE(review): presumably run once per lane index `i`; the loop header is
# missing.
235 l1 = eland_container.results[0][i]
236 l2 = e2.results[0][i]
237 self.failUnlessEqual(l1.reads, l2.reads)
238 self.failUnlessEqual(l1.sample_name, l2.sample_name)
239 self.failUnlessEqual(l1.lane_id, l2.lane_id)
240 if isinstance(l1, eland.ElandLane):
241 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
242 self.failUnlessEqual(len(l1.mapped_reads), 17)
243 for k in l1.mapped_reads.keys():
244 self.failUnlessEqual(l1.mapped_reads[k],
247 self.failUnlessEqual(len(l1.match_codes), 9)
248 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
249 for k in l1.match_codes.keys():
250 self.failUnlessEqual(l1.match_codes[k],
252 elif isinstance(l1, eland.SequenceLane):
253 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
# Runs runfolder.get_runs() on the simulated runfolder, checks the generated
# run name before and after a FlowcellId.xml is written by make_flowcell_id(),
# then rebuilds a PipelineRun from get_elements() and compares it.
# NOTE(review): this span is incomplete as it stands -- `r1` is read below
# but its assignment is not visible.  Restore the missing statements from
# version control before relying on this test.
255 def test_runfolder(self):
257 runs = runfolder.get_runs(self.runfolder_dir)
259 # do we get the flowcell id from the filename?
260 self.failUnlessEqual(len(runs), 1)
# NOTE(review): the expected name uses flowcell id 4286GAAXX, while
# make_runfolder() names the directory after D07K6ACXX -- confirm which value
# is intended.
261 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
262 self.failUnlessEqual(runs[0].name, name)
264 # do we get the flowcell id from the FlowcellId.xml file
265 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
266 runs = runfolder.get_runs(self.runfolder_dir)
267 self.failUnlessEqual(len(runs), 1)
268 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
269 self.failUnlessEqual(runs[0].name, name)
# Serialize the run and rebuild it from the element tree.
272 xml = r1.get_elements()
273 xml_str = ElementTree.tostring(xml)
275 r2 = runfolder.PipelineRun(xml=xml)
276 self.failUnlessEqual(r1.name, r2.name)
277 self.failIfEqual(r2.image_analysis, None)
278 self.failIfEqual(r2.bustard, None)
279 self.failIfEqual(r2.gerald, None)
def suite():
    """Return a TestSuite holding every ``test*`` method of RunfolderTests."""
    # TestLoader's default method prefix is 'test', matching the prefix that
    # was passed to the deprecated unittest.makeSuite helper.
    return unittest.TestLoader().loadTestsFromTestCase(RunfolderTests)
if __name__ == "__main__":
    #unittest.main(defaultTest="suite")
    # Manual inspection mode: build the simulated runfolder on a throwaway
    # holder object and report where it was created.  The tree is left on
    # disk so it can be examined.
    class Test(object):
        pass

    t = Test()
    make_runfolder(t)
    print('path ' + t.runfolder_dir)