3 from datetime import datetime, date
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
16 from htsworkflow.pipelines.test.simulate_runfolder import *
# Build a throwaway runfolder tree under a fresh temporary directory and
# record the interesting paths as attributes on `obj`.
#
# The tree is rooted at <tmp>/090608_HWI-EAS229_0117_4286GAAXX and is filled
# in by the make_* helpers. Those helpers are not defined in this file;
# presumably they arrive through the star import of simulate_runfolder
# (TODO confirm).
#
# NOTE(review): this copy of the block still carries listing line numbers at
# the start of every line, has lost its indentation, and the embedded
# numbering skips values (19->21, 21->23, 30->33, 39->41, 45->48). The
# docstring delimiters, any creation of data_dir / gerald_dir, and any guard
# around the `obj` assignments are therefore not visible here. Restore the
# block from the upstream file rather than guessing at the missing lines.
19 def make_runfolder(obj=None):
21 Make a fake runfolder, attach all the directories to obj if defined
23 # make a fake runfolder directory
24 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
# The runfolder directory name ends in 4286GAAXX, the same id that
# test_runfolder later expects in the run name.
26 runfolder_dir = os.path.join(temp_dir,
27 '090608_HWI-EAS229_0117_4286GAAXX')
28 os.mkdir(runfolder_dir)
# Only the path of the Data directory is computed in the visible lines.
30 data_dir = os.path.join(runfolder_dir, 'Data')
# Each helper returns the directory that the next step works in.
33 intensities_dir = make_rta_intensities_1870(data_dir)
35 basecalls_dir = make_rta_basecalls_1870(intensities_dir)
36 make_matrix_dir_rta160(basecalls_dir)
38 gerald_dir = os.path.join(basecalls_dir,
39 'GERALD_07-09-2010_diane')
# Populate the GERALD directory: eland export files are requested for lanes
# 1-6, a scarf file for lane 7 and a fastq file for lane 8.
41 make_gerald_config_100(gerald_dir)
42 make_summary_rta160_xml(gerald_dir)
43 make_eland_export(gerald_dir, lane_list=[1,2,3,4,5,6,])
44 make_scarf(gerald_dir, lane_list=[7,])
45 make_fastq(gerald_dir, lane_list=[8,])
# Publish the paths on the caller's object. As visible here nothing checks
# `obj` before use even though it defaults to None.
# NOTE(review): a guard line was probably lost at listing line 46/47 - confirm.
48 obj.temp_dir = temp_dir
49 obj.runfolder_dir = runfolder_dir
50 obj.data_dir = data_dir
51 obj.image_analysis_dir = intensities_dir
52 obj.bustard_dir = basecalls_dir
53 obj.gerald_dir = gerald_dir
# Test case exercising the bustard, gerald, eland and runfolder modules
# against the directory paths stored on the test instance (self.bustard_dir,
# self.gerald_dir, self.runfolder_dir, self.temp_dir).
#
# NOTE(review): this copy of the class still carries listing line numbers at
# the start of every line, has lost its indentation, and the embedded
# numbering skips values, so a number of source lines (docstring delimiters,
# `def` / `for` headers, list openers, continuation lines) are missing.
# The gaps are called out below; restore the class from the upstream file
# rather than guessing at them.
# NOTE(review): failUnlessEqual / failIfEqual are deprecated unittest aliases
# (removed in Python 3.12); assertEqual / assertNotEqual are the supported
# spellings to use once the file is restored.
56 class RunfolderTests(unittest.TestCase):
58 Test components of the runfolder processing code
59 which includes firecrest, bustard, and gerald
# NOTE(review): the `def` lines of the fixture methods are not visible
# (numbering skips 60-61 and 63-65). What remains suggests a setUp that has
# the fake-runfolder builder attach its directories to this instance and a
# tearDown that deletes self.temp_dir - confirm against upstream.
62 # attaches all the directories to the object passed in
66 shutil.rmtree(self.temp_dir)
# test_bustard: build a bustard object from self.bustard_dir, check its
# version / date / user / phasing, then rebuild it from its own XML elements
# and check that the copy reports the same version, date and user.
68 def test_bustard(self):
69 """Construct a bustard object"""
70 b = bustard.bustard(self.bustard_dir)
71 self.failUnlessEqual(b.version, '1.8.70.0')
72 self.failUnlessEqual(b.date, None)
73 self.failUnlessEqual(b.user, None)
74 self.failUnlessEqual(len(b.phasing), 0)
# Round trip through the XML representation.
76 xml = b.get_elements()
77 b2 = bustard.Bustard(xml=xml)
78 self.failUnlessEqual(b.version, b2.version)
79 self.failUnlessEqual(b.date, b2.date )
80 self.failUnlessEqual(b.user, b2.user)
# test_gerald: build a gerald object from self.gerald_dir, check version,
# date, per-lane configuration and summary data, then rebuild it from its own
# XML elements and compare the copy field by field, including eland results.
#
# NOTE(review): several lines of this method are missing from this copy:
#   - most of the `genomes` list (only one entry survives) and its opener,
#   - the opener of the `clusters` list,
#   - every `for i in ...` header, so `i` is unbound in the visible lines,
#   - the assignment of `g_lane`,
#   - the second argument of the call that starts at listing line 168.
82 def test_gerald(self):
83 # need to update gerald and make tests for it
84 g = gerald.gerald(self.gerald_dir)
86 self.failUnlessEqual(g.version,
87 '@(#) Id: GERALD.pl,v 1.171 2008/05/19 17:36:14 mzerara Exp')
88 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
# g.lanes is used like a mapping: len(), keys(), items(), values(), indexing.
89 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
90 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
93 # list of genomes, matches what was defined up in
95 # the first None is to offset the genomes list to be 1..9
96 # instead of pythons default 0..8
101 '/g/arabidopsis01222004',
107 # test lane specific parameters from gerald config file
# read_length is compared against the string '37', not the integer.
109 cur_lane = g.lanes[i]
110 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
111 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
112 self.failUnlessEqual(cur_lane.read_length, '37')
113 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
115 # I want to be able to use a simple iterator
116 for l in g.lanes.values():
117 self.failUnlessEqual(l.analysis, 'eland_extended')
118 self.failUnlessEqual(l.read_length, '37')
119 self.failUnlessEqual(l.use_bases, 'Y'*37)
121 # test data extracted from summary file
123 (281331, 11169), (203841, 13513),
124 (220889, 15653), (137294, 14666),
125 (129388, 14525), (262092, 10751),
126 (185754, 13503), (233765, 9537),]
# Exactly one summary entry is expected; it is indexed by lane number and
# each lane's `cluster` is compared with a pair from `clusters`.
128 self.failUnlessEqual(len(g.summary), 1)
130 summary_lane = g.summary[0][i]
131 self.failUnlessEqual(summary_lane.cluster, clusters[i])
132 self.failUnlessEqual(summary_lane.lane, i)
# Round trip through the XML representation.
134 xml = g.get_elements()
135 # just make sure that element tree can serialize the tree
136 xml_str = ElementTree.tostring(xml)
137 g2 = gerald.Gerald(xml=xml)
140 # do it all again after extracting from the xml file
141 self.failUnlessEqual(g.version, g2.version)
142 self.failUnlessEqual(g.date, g2.date)
143 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
144 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
146 # test lane specific parameters from gerald config file
149 g2_lane = g2.lanes[i]
150 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
151 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
152 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
153 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
155 # test (some) summary elements
156 self.failUnlessEqual(len(g.summary), 1)
158 g_summary = g.summary[0][i]
159 g2_summary = g2.summary[0][i]
160 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
161 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
# Compare the eland results of the original and the XML copy lane by lane;
# mapped_reads / match_codes are only compared for eland.ElandLane results.
163 g_eland = g.eland_results
164 g2_eland = g2.eland_results
165 for lane in g_eland.results[0].keys():
166 g_results = g_eland.results[0][lane]
167 g2_results = g2_eland.results[0][lane]
168 self.failUnlessEqual(g_results.reads,
170 if isinstance(g_results, eland.ElandLane):
171 self.failUnlessEqual(len(g_results.mapped_reads),
172 len(g2_results.mapped_reads))
173 for k in g_results.mapped_reads.keys():
174 self.failUnlessEqual(g_results.mapped_reads[k],
175 g2_results.mapped_reads[k])
177 self.failUnlessEqual(len(g_results.match_codes),
178 len(g2_results.match_codes))
179 for k in g_results.match_codes.keys():
180 self.failUnlessEqual(g_results.match_codes[k],
181 g2_results.match_codes[k])
# test_eland: load the eland results from self.gerald_dir with a genome map
# for every lane, check the per-lane statistics, then rebuild the container
# from its own XML elements and compare lane by lane.
#
# NOTE(review): the `for i in ...` headers (listing lines 197 and 232) and
# the second arguments of the calls starting at listing lines 242 and 248 are
# missing from this copy.
184 def test_eland(self):
# Map short names 'chr1.fa'..'chr21.fa' to 'hg18/chrN.fa'; 'Lambda.fa' maps
# to itself.
186 hg_map = {'Lambda.fa': 'Lambda.fa'}
187 for i in range(1,22):
188 short_name = 'chr%d.fa' % (i,)
189 long_name = 'hg18/chr%d.fa' % (i,)
190 hg_map[short_name] = long_name
# All eight lanes share the same mapping object.
192 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
193 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
194 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
196 # I added sequence lanes to the last 2 lanes of this test case
# Expected statistics for a lane indexed by `i` (loop header not visible).
198 lane = eland_container.results[0][i]
199 self.failUnlessEqual(lane.reads, 6)
200 self.failUnlessEqual(lane.sample_name, "s")
201 self.failUnlessEqual(lane.lane_id, i)
202 self.failUnlessEqual(len(lane.mapped_reads), 17)
203 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
204 self.failUnlessEqual(lane.match_codes['U0'], 3)
205 self.failUnlessEqual(lane.match_codes['R0'], 2)
206 self.failUnlessEqual(lane.match_codes['U1'], 1)
207 self.failUnlessEqual(lane.match_codes['R1'], 9)
208 self.failUnlessEqual(lane.match_codes['U2'], 0)
209 self.failUnlessEqual(lane.match_codes['R2'], 12)
210 self.failUnlessEqual(lane.match_codes['NM'], 1)
211 self.failUnlessEqual(lane.match_codes['QC'], 0)
# Lane 7 is expected to be a scarf sequence lane with 5 reads.
214 lane = eland_container.results[0][7]
215 self.failUnlessEqual(lane.reads, 5)
216 self.failUnlessEqual(lane.sample_name, 's')
217 self.failUnlessEqual(lane.lane_id, 7)
218 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
# Lane 8 is expected to be a fastq sequence lane with 3 reads.
221 lane = eland_container.results[0][8]
222 self.failUnlessEqual(lane.reads, 3)
223 self.failUnlessEqual(lane.sample_name, 's')
224 self.failUnlessEqual(lane.lane_id, 8)
225 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
# Round trip through the XML representation.
227 xml = eland_container.get_elements()
228 # just make sure that element tree can serialize the tree
229 xml_str = ElementTree.tostring(xml)
230 e2 = gerald.ELAND(xml=xml)
# Compare each original lane (l1) with its XML copy (l2); which fields are
# compared depends on whether the lane is an ElandLane or a SequenceLane.
233 l1 = eland_container.results[0][i]
234 l2 = e2.results[0][i]
235 self.failUnlessEqual(l1.reads, l2.reads)
236 self.failUnlessEqual(l1.sample_name, l2.sample_name)
237 self.failUnlessEqual(l1.lane_id, l2.lane_id)
238 if isinstance(l1, eland.ElandLane):
239 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
240 self.failUnlessEqual(len(l1.mapped_reads), 17)
241 for k in l1.mapped_reads.keys():
242 self.failUnlessEqual(l1.mapped_reads[k],
245 self.failUnlessEqual(len(l1.match_codes), 9)
246 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
247 for k in l1.match_codes.keys():
248 self.failUnlessEqual(l1.match_codes[k],
250 elif isinstance(l1, eland.SequenceLane):
251 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
# test_runfolder: runfolder.get_runs on self.runfolder_dir must yield exactly
# one run, named 'run_<flowcell id>_<today as YYYY-MM-DD>.xml'. The flowcell
# id is first taken from the directory name, then from the value written by
# make_flowcell_id. Finally the run is rebuilt from its own XML elements.
#
# NOTE(review): the assignment of `r1` (listing lines 268-269) is missing
# from this copy.
253 def test_runfolder(self):
255 runs = runfolder.get_runs(self.runfolder_dir)
257 # do we get the flowcell id from the filename?
258 self.failUnlessEqual(len(runs), 1)
259 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
260 self.failUnlessEqual(runs[0].name, name)
262 # do we get the flowcell id from the FlowcellId.xml file
263 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
264 runs = runfolder.get_runs(self.runfolder_dir)
265 self.failUnlessEqual(len(runs), 1)
266 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
267 self.failUnlessEqual(runs[0].name, name)
# Round trip through the XML representation; the copy must keep the name and
# have image_analysis, bustard and gerald populated (not None).
270 xml = r1.get_elements()
271 xml_str = ElementTree.tostring(xml)
273 r2 = runfolder.PipelineRun(xml=xml)
274 self.failUnlessEqual(r1.name, r2.name)
275 self.failIfEqual(r2.image_analysis, None)
276 self.failIfEqual(r2.bustard, None)
277 self.failIfEqual(r2.gerald, None)
# Collect every RunfolderTests method whose name starts with 'test' into a
# suite, and run that suite when the module is executed as a script.
# NOTE(review): the `def` line that owns this `return` is not visible in this
# copy (numbering skips 278-280); defaultTest="suite" below suggests the
# function is called `suite` - confirm against upstream.
281 return unittest.makeSuite(RunfolderTests,'test')
283 if __name__ == "__main__":
284 unittest.main(defaultTest="suite")