3 from datetime import datetime, date
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.runfolder import ElementTree
16 from htsworkflow.pipelines.test.simulate_runfolder import *
19 def make_runfolder(obj=None):
21 Make a fake runfolder, attach all the directories to obj if defined
23 # make a fake runfolder directory
24 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
26 runfolder_dir = os.path.join(temp_dir,
27 '090608_HWI-EAS229_0117_4286GAAXX')
28 os.mkdir(runfolder_dir)
30 data_dir = os.path.join(runfolder_dir, 'Data')
33 intensities_dir = make_rta_intensities_1460(data_dir)
35 basecalls_dir = make_rta_basecalls_1460(intensities_dir)
37 #make_phasing_params(bustard_dir)
38 #make_bustard_config132(bustard_dir)
40 gerald_dir = os.path.join(basecalls_dir,
41 'GERALD_16-06-2009_diane')
43 make_gerald_config_100(gerald_dir)
44 make_summary_rta160_xml(gerald_dir)
45 make_eland_multi(gerald_dir, lane_list=[1,2,3,4,5,6,])
46 make_scarf(gerald_dir, lane_list=[7,])
47 make_fastq(gerald_dir, lane_list=[8,])
50 obj.temp_dir = temp_dir
51 obj.runfolder_dir = runfolder_dir
52 obj.data_dir = data_dir
53 obj.image_analysis_dir = intensities_dir
54 obj.bustard_dir = basecalls_dir
55 obj.gerald_dir = gerald_dir
58 class RunfolderTests(unittest.TestCase):
60 Test components of the runfolder processing code
61 which includes firecrest, bustard, and gerald
64 # attaches all the directories to the object passed in
68 shutil.rmtree(self.temp_dir)
70 # The only thing different from the previous RTA version is
71 # I'm processing the Summary.xml file
74 def test_gerald(self):
75 # need to update gerald and make tests for it
76 g = gerald.gerald(self.gerald_dir)
78 self.failUnlessEqual(g.version,
79 '@(#) Id: GERALD.pl,v 1.171 2008/05/19 17:36:14 mzerara Exp')
80 self.failUnlessEqual(g.date, datetime(2009,2,22,21,15,59))
81 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
82 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
85 # list of genomes, matches what was defined up in
87 # the first None is to offset the genomes list to be 1..9
88 # instead of pythons default 0..8
93 '/g/arabidopsis01222004',
99 # test lane specific parameters from gerald config file
101 cur_lane = g.lanes[i]
102 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
103 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
104 self.failUnlessEqual(cur_lane.read_length, '37')
105 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
107 # I want to be able to use a simple iterator
108 for l in g.lanes.values():
109 self.failUnlessEqual(l.analysis, 'eland_extended')
110 self.failUnlessEqual(l.read_length, '37')
111 self.failUnlessEqual(l.use_bases, 'Y'*37)
113 # test data extracted from summary file
115 (281331, 11169), (203841, 13513),
116 (220889, 15653), (137294, 14666),
117 (129388, 14525), (262092, 10751),
118 (185754, 13503), (233765, 9537),]
120 self.failUnlessEqual(len(g.summary), 1)
122 summary_lane = g.summary[0][i]
123 self.failUnlessEqual(summary_lane.cluster, clusters[i])
124 self.failUnlessEqual(summary_lane.lane, i)
126 xml = g.get_elements()
127 # just make sure that element tree can serialize the tree
128 xml_str = ElementTree.tostring(xml)
129 g2 = gerald.Gerald(xml=xml)
132 # do it all again after extracting from the xml file
133 self.failUnlessEqual(g.version, g2.version)
134 self.failUnlessEqual(g.date, g2.date)
135 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
136 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
138 # test lane specific parameters from gerald config file
141 g2_lane = g2.lanes[i]
142 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
143 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
144 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
145 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
147 # test (some) summary elements
148 self.failUnlessEqual(len(g.summary), 1)
150 g_summary = g.summary[0][i]
151 g2_summary = g2.summary[0][i]
152 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
153 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
155 g_eland = g.eland_results
156 g2_eland = g2.eland_results
157 for lane in g_eland.results[0].keys():
158 g_results = g_eland.results[0][lane]
159 g2_results = g2_eland.results[0][lane]
160 self.failUnlessEqual(g_results.reads,
162 if isinstance(g_results, eland.ElandLane):
163 self.failUnlessEqual(len(g_results.mapped_reads),
164 len(g2_results.mapped_reads))
165 for k in g_results.mapped_reads.keys():
166 self.failUnlessEqual(g_results.mapped_reads[k],
167 g2_results.mapped_reads[k])
169 self.failUnlessEqual(len(g_results.match_codes),
170 len(g2_results.match_codes))
171 for k in g_results.match_codes.keys():
172 self.failUnlessEqual(g_results.match_codes[k],
173 g2_results.match_codes[k])
176 def test_eland(self):
178 hg_map = {'Lambda.fa': 'Lambda.fa'}
179 for i in range(1,22):
180 short_name = 'chr%d.fa' % (i,)
181 long_name = 'hg18/chr%d.fa' % (i,)
182 hg_map[short_name] = long_name
184 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
185 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
186 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
188 # I added sequence lanes to the last 2 lanes of this test case
190 lane = eland_container.results[0][i]
191 self.failUnlessEqual(lane.reads, 6)
192 self.failUnlessEqual(lane.sample_name, "s")
193 self.failUnlessEqual(lane.lane_id, i)
194 self.failUnlessEqual(len(lane.mapped_reads), 17)
195 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
196 self.failUnlessEqual(lane.match_codes['U0'], 3)
197 self.failUnlessEqual(lane.match_codes['R0'], 2)
198 self.failUnlessEqual(lane.match_codes['U1'], 1)
199 self.failUnlessEqual(lane.match_codes['R1'], 9)
200 self.failUnlessEqual(lane.match_codes['U2'], 0)
201 self.failUnlessEqual(lane.match_codes['R2'], 12)
202 self.failUnlessEqual(lane.match_codes['NM'], 1)
203 self.failUnlessEqual(lane.match_codes['QC'], 0)
206 lane = eland_container.results[0][7]
207 self.failUnlessEqual(lane.reads, 5)
208 self.failUnlessEqual(lane.sample_name, 's')
209 self.failUnlessEqual(lane.lane_id, 7)
210 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
213 lane = eland_container.results[0][8]
214 self.failUnlessEqual(lane.reads, 3)
215 self.failUnlessEqual(lane.sample_name, 's')
216 self.failUnlessEqual(lane.lane_id, 8)
217 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
219 xml = eland_container.get_elements()
220 # just make sure that element tree can serialize the tree
221 xml_str = ElementTree.tostring(xml)
222 e2 = gerald.ELAND(xml=xml)
225 l1 = eland_container.results[0][i]
226 l2 = e2.results[0][i]
227 self.failUnlessEqual(l1.reads, l2.reads)
228 self.failUnlessEqual(l1.sample_name, l2.sample_name)
229 self.failUnlessEqual(l1.lane_id, l2.lane_id)
230 if isinstance(l1, eland.ElandLane):
231 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
232 self.failUnlessEqual(len(l1.mapped_reads), 17)
233 for k in l1.mapped_reads.keys():
234 self.failUnlessEqual(l1.mapped_reads[k],
237 self.failUnlessEqual(len(l1.match_codes), 9)
238 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
239 for k in l1.match_codes.keys():
240 self.failUnlessEqual(l1.match_codes[k],
242 elif isinstance(l1, eland.SequenceLane):
243 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
245 def test_runfolder(self):
247 runs = runfolder.get_runs(self.runfolder_dir)
249 # do we get the flowcell id from the filename?
250 self.failUnlessEqual(len(runs), 1)
251 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
252 self.failUnlessEqual(runs[0].name, name)
254 # do we get the flowcell id from the FlowcellId.xml file
255 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
256 runs = runfolder.get_runs(self.runfolder_dir)
257 self.failUnlessEqual(len(runs), 1)
258 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
259 self.failUnlessEqual(runs[0].name, name)
262 xml = r1.get_elements()
263 xml_str = ElementTree.tostring(xml)
265 r2 = runfolder.PipelineRun(xml=xml)
266 self.failUnlessEqual(r1.name, r2.name)
267 self.failIfEqual(r2.image_analysis, None)
268 self.failIfEqual(r2.bustard, None)
269 self.failIfEqual(r2.gerald, None)
273 return unittest.makeSuite(RunfolderTests,'test')
275 if __name__ == "__main__":
276 unittest.main(defaultTest="suite")