3 from datetime import datetime, date
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
16 from htsworkflow.pipelines.test.simulate_runfolder import *
19 def make_runfolder(obj=None):
21 Make a fake runfolder, attach all the directories to obj if defined
23 # make a fake runfolder directory
24 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
26 runfolder_dir = os.path.join(temp_dir,
27 '090608_HWI-EAS229_0117_4286GAAXX')
28 os.mkdir(runfolder_dir)
30 data_dir = os.path.join(runfolder_dir, 'Data')
33 intensities_dir = make_rta_intensities_1460(data_dir)
35 basecalls_dir = make_rta_basecalls_1460(intensities_dir)
37 #make_phasing_params(bustard_dir)
38 #make_bustard_config132(bustard_dir)
40 gerald_dir = os.path.join(basecalls_dir,
41 'GERALD_16-06-2009_diane')
43 make_gerald_config_100(gerald_dir)
44 make_summary_ipar130_htm(gerald_dir)
45 make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
46 make_scarf(gerald_dir, lane_list=[7,])
47 make_fastq(gerald_dir, lane_list=[8,])
50 obj.temp_dir = temp_dir
51 obj.runfolder_dir = runfolder_dir
52 obj.data_dir = data_dir
53 obj.image_analysis_dir = intensities_dir
54 obj.bustard_dir = basecalls_dir
55 obj.gerald_dir = gerald_dir
58 class RunfolderTests(unittest.TestCase):
60 Test components of the runfolder processing code
61 which includes firecrest, bustard, and gerald
64 # attaches all the directories to the object passed in
68 shutil.rmtree(self.temp_dir)
72 Construct a firecrest object
74 i = ipar.ipar(self.image_analysis_dir)
75 self.failUnlessEqual(i.version, '1.4.6.0')
76 self.failUnlessEqual(i.start, 1)
77 self.failUnlessEqual(i.stop, 38)
79 xml = i.get_elements()
80 # just make sure that element tree can serialize the tree
81 xml_str = ElementTree.tostring(xml)
83 i2 = ipar.IPAR(xml=xml)
84 self.failUnlessEqual(i.version, i2.version)
85 self.failUnlessEqual(i.start, i2.start)
86 self.failUnlessEqual(i.stop, i2.stop)
87 self.failUnlessEqual(i.date, i2.date)
89 def test_bustard(self):
91 construct a bustard object
93 b = bustard.bustard(self.bustard_dir)
94 self.failUnlessEqual(b.version, '1.4.6.0')
95 self.failUnlessEqual(b.date, None)
96 self.failUnlessEqual(b.user, None)
97 self.failUnlessEqual(len(b.phasing), 0)
99 xml = b.get_elements()
100 b2 = bustard.Bustard(xml=xml)
101 self.failUnlessEqual(b.version, b2.version)
102 self.failUnlessEqual(b.date, b2.date )
103 self.failUnlessEqual(b.user, b2.user)
105 def test_gerald(self):
106 # need to update gerald and make tests for it
107 g = gerald.gerald(self.gerald_dir)
109 self.failUnlessEqual(g.version,
110 '@(#) Id: GERALD.pl,v 1.171 2008/05/19 17:36:14 mzerara Exp')
111 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
112 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
113 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
116 # list of genomes, matches what was defined up in
117 # make_gerald_config.
118 # the first None is to offset the genomes list to be 1..9
119 # instead of pythons default 0..8
124 '/g/arabidopsis01222004',
130 # test lane specific parameters from gerald config file
132 cur_lane = g.lanes[i]
133 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
134 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
135 self.failUnlessEqual(cur_lane.read_length, '37')
136 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
138 # I want to be able to use a simple iterator
139 for l in g.lanes.values():
140 self.failUnlessEqual(l.analysis, 'eland_extended')
141 self.failUnlessEqual(l.read_length, '37')
142 self.failUnlessEqual(l.use_bases, 'Y'*37)
144 # test data extracted from summary file
146 (126910, 4300), (165739, 6792),
147 (196565, 8216), (153897, 8501),
148 (135536, 3908), (154083, 9315),
149 (159991, 9292), (198479, 17671),]
151 self.failUnlessEqual(len(g.summary), 1)
153 summary_lane = g.summary[0][i]
154 self.failUnlessEqual(summary_lane.cluster, clusters[i])
155 self.failUnlessEqual(summary_lane.lane, i)
157 xml = g.get_elements()
158 # just make sure that element tree can serialize the tree
159 xml_str = ElementTree.tostring(xml)
160 g2 = gerald.Gerald(xml=xml)
163 # do it all again after extracting from the xml file
164 self.failUnlessEqual(g.version, g2.version)
165 self.failUnlessEqual(g.date, g2.date)
166 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
167 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
169 # test lane specific parameters from gerald config file
172 g2_lane = g2.lanes[i]
173 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
174 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
175 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
176 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
178 # test (some) summary elements
179 self.failUnlessEqual(len(g.summary), 1)
181 g_summary = g.summary[0][i]
182 g2_summary = g2.summary[0][i]
183 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
184 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
186 g_eland = g.eland_results
187 g2_eland = g2.eland_results
188 for lane in g_eland.results[0].keys():
189 g_results = g_eland.results[0][lane]
190 g2_results = g2_eland.results[0][lane]
191 self.failUnlessEqual(g_results.reads,
193 if isinstance(g_results, eland.ElandLane):
194 self.failUnlessEqual(len(g_results.mapped_reads),
195 len(g2_results.mapped_reads))
196 for k in g_results.mapped_reads.keys():
197 self.failUnlessEqual(g_results.mapped_reads[k],
198 g2_results.mapped_reads[k])
200 self.failUnlessEqual(len(g_results.match_codes),
201 len(g2_results.match_codes))
202 for k in g_results.match_codes.keys():
203 self.failUnlessEqual(g_results.match_codes[k],
204 g2_results.match_codes[k])
207 def test_eland(self):
209 hg_map = {'Lambda.fa': 'Lambda.fa'}
210 for i in range(1,22):
211 short_name = 'chr%d.fa' % (i,)
212 long_name = 'hg18/chr%d.fa' % (i,)
213 hg_map[short_name] = long_name
215 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
216 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
217 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
219 # I added sequence lanes to the last 2 lanes of this test case
221 lane = eland_container.results[0][i]
222 self.failUnlessEqual(lane.reads, 6)
223 self.failUnlessEqual(lane.sample_name, "s")
224 self.failUnlessEqual(lane.lane_id, i)
225 self.failUnlessEqual(len(lane.mapped_reads), 17)
226 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
227 self.failUnlessEqual(lane.match_codes['U0'], 3)
228 self.failUnlessEqual(lane.match_codes['R0'], 2)
229 self.failUnlessEqual(lane.match_codes['U1'], 1)
230 self.failUnlessEqual(lane.match_codes['R1'], 9)
231 self.failUnlessEqual(lane.match_codes['U2'], 0)
232 self.failUnlessEqual(lane.match_codes['R2'], 12)
233 self.failUnlessEqual(lane.match_codes['NM'], 1)
234 self.failUnlessEqual(lane.match_codes['QC'], 0)
237 lane = eland_container.results[0][7]
238 self.failUnlessEqual(lane.reads, 5)
239 self.failUnlessEqual(lane.sample_name, 's')
240 self.failUnlessEqual(lane.lane_id, 7)
241 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
244 lane = eland_container.results[0][8]
245 self.failUnlessEqual(lane.reads, 3)
246 self.failUnlessEqual(lane.sample_name, 's')
247 self.failUnlessEqual(lane.lane_id, 8)
248 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
250 xml = eland_container.get_elements()
251 # just make sure that element tree can serialize the tree
252 xml_str = ElementTree.tostring(xml)
253 e2 = gerald.ELAND(xml=xml)
256 l1 = eland_container.results[0][i]
257 l2 = e2.results[0][i]
258 self.failUnlessEqual(l1.reads, l2.reads)
259 self.failUnlessEqual(l1.sample_name, l2.sample_name)
260 self.failUnlessEqual(l1.lane_id, l2.lane_id)
261 if isinstance(l1, eland.ElandLane):
262 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
263 self.failUnlessEqual(len(l1.mapped_reads), 17)
264 for k in l1.mapped_reads.keys():
265 self.failUnlessEqual(l1.mapped_reads[k],
268 self.failUnlessEqual(len(l1.match_codes), 9)
269 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
270 for k in l1.match_codes.keys():
271 self.failUnlessEqual(l1.match_codes[k],
273 elif isinstance(l1, eland.SequenceLane):
274 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
276 def test_runfolder(self):
278 runs = runfolder.get_runs(self.runfolder_dir)
280 # do we get the flowcell id from the filename?
281 self.failUnlessEqual(len(runs), 1)
282 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
283 self.failUnlessEqual(runs[0].name, name)
285 # do we get the flowcell id from the FlowcellId.xml file
286 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
287 runs = runfolder.get_runs(self.runfolder_dir)
288 self.failUnlessEqual(len(runs), 1)
289 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
290 self.failUnlessEqual(runs[0].name, name)
293 xml = r1.get_elements()
294 xml_str = ElementTree.tostring(xml)
296 r2 = runfolder.PipelineRun(xml=xml)
297 self.failUnlessEqual(r1.name, r2.name)
298 self.failIfEqual(r2.image_analysis, None)
299 self.failIfEqual(r2.bustard, None)
300 self.failIfEqual(r2.gerald, None)
304 return unittest.makeSuite(RunfolderTests,'test')
306 if __name__ == "__main__":
307 unittest.main(defaultTest="suite")