import os
import shutil
import tempfile
import unittest
from datetime import datetime, date

from htsworkflow.pipelines import eland
from htsworkflow.pipelines import ipar
from htsworkflow.pipelines import bustard
from htsworkflow.pipelines import gerald
from htsworkflow.pipelines import runfolder
from htsworkflow.pipelines.runfolder import ElementTree

from htsworkflow.pipelines.test.simulate_runfolder import *
def make_runfolder(obj=None):
    """
    Make a fake runfolder, attach all the directories to obj if defined

    A temporary directory is created with tempfile.mkdtemp and a runfolder
    tree (Data / intensities / basecalls / GERALD) is built beneath it.
    Lanes 1-6 get eland export files, lane 7 a scarf file and lane 8 a
    fastq file.

    obj -- optional object (typically a TestCase instance). When it is not
           None the attributes temp_dir, runfolder_dir, data_dir,
           image_analysis_dir, bustard_dir and gerald_dir are set on it.
           The caller is responsible for removing temp_dir afterwards.

    Returns None.
    """
    # make a fake runfolder directory
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')

    runfolder_dir = os.path.join(temp_dir,
                                 '090608_HWI-EAS229_0117_4286GAAXX')
    os.mkdir(runfolder_dir)

    # the Data directory has to exist before anything is created inside it
    data_dir = os.path.join(runfolder_dir, 'Data')
    os.mkdir(data_dir)

    # the make_* helpers are not defined in this file; presumably they come
    # from the simulate_runfolder star import
    intensities_dir = make_rta_intensities_1870(data_dir)

    basecalls_dir = make_rta_basecalls_1870(intensities_dir)
    make_matrix_dir_rta160(basecalls_dir)

    gerald_dir = os.path.join(basecalls_dir,
                              'GERALD_07-09-2010_diane')
    # the gerald helpers below are handed this path, so make sure it exists
    if not os.path.isdir(gerald_dir):
        os.mkdir(gerald_dir)
    make_gerald_config_100(gerald_dir)
    make_summary_rta160_xml(gerald_dir)
    make_eland_export(gerald_dir, lane_list=[1, 2, 3, 4, 5, 6])
    make_scarf(gerald_dir, lane_list=[7])
    make_fastq(gerald_dir, lane_list=[8])

    # obj defaults to None, so only attach the paths when we were given
    # something to attach them to
    if obj is not None:
        obj.temp_dir = temp_dir
        obj.runfolder_dir = runfolder_dir
        obj.data_dir = data_dir
        obj.image_analysis_dir = intensities_dir
        obj.bustard_dir = basecalls_dir
        obj.gerald_dir = gerald_dir
class RunfolderTests(unittest.TestCase):
    """
    Test components of the runfolder processing code
    which includes firecrest, bustard, and gerald

    Each test runs against a freshly simulated runfolder that is built in
    setUp and deleted again in tearDown.
    """
    def setUp(self):
        # attaches all the directories to the object passed in
        make_runfolder(self)

    def tearDown(self):
        shutil.rmtree(self.temp_dir)

    def test_bustard(self):
        """Construct a bustard object"""
        b = bustard.bustard(self.bustard_dir)
        self.assertEqual(b.software, 'RTA')
        self.assertEqual(b.version, '1.8.70.0')
        self.assertEqual(b.date, None)
        self.assertEqual(b.user, None)
        self.assertEqual(len(b.phasing), 0)

        # round trip through the xml representation
        xml = b.get_elements()
        b2 = bustard.Bustard(xml=xml)
        self.assertEqual(b.software, b2.software)
        self.assertEqual(b.version, b2.version)
        self.assertEqual(b.date, b2.date)
        self.assertEqual(b.user, b2.user)

    def test_gerald(self):
        """Parse the gerald directory and round trip it through xml"""
        # need to update gerald and make tests for it
        g = gerald.gerald(self.gerald_dir)

        self.assertEqual(g.software, 'GERALD')
        self.assertEqual(g.version, '1.171')
        self.assertEqual(g.date, datetime(2009, 2, 22, 21, 15, 59))
        self.assertEqual(len(g.lanes), len(g.lanes.keys()))
        self.assertEqual(len(g.lanes), len(g.lanes.items()))

        # list of genomes, expected to match the simulated gerald config
        # the first None is to offset the genomes list to be 1..9
        # instead of pythons default 0..8
        # NOTE(review): only the lane 4 entry ('/g/arabidopsis01222004') was
        # visible in the reviewed listing; the other entries are assumed
        # from the gerald config fixture -- TODO confirm against
        # make_gerald_config_100 before relying on them.
        genomes = [None,
                   '/g/mm9',
                   '/g/mm9',
                   '/g/elegans190',
                   '/g/arabidopsis01222004',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9',
                   ]

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            cur_lane = g.lanes[i]
            self.assertEqual(cur_lane.analysis, 'eland_extended')
            self.assertEqual(cur_lane.eland_genome, genomes[i])
            self.assertEqual(cur_lane.read_length, '37')
            self.assertEqual(cur_lane.use_bases, 'Y'*37)

        # I want to be able to use a simple iterator
        for lane in g.lanes.values():
            self.assertEqual(lane.analysis, 'eland_extended')
            self.assertEqual(lane.read_length, '37')
            self.assertEqual(lane.use_bases, 'Y'*37)

        # test data extracted from summary file
        # (leading None so the list can be indexed by lane number)
        clusters = [None,
                    (281331, 11169), (203841, 13513),
                    (220889, 15653), (137294, 14666),
                    (129388, 14525), (262092, 10751),
                    (185754, 13503), (233765, 9537), ]

        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            summary_lane = g.summary[0][i]
            self.assertEqual(summary_lane.cluster, clusters[i])
            self.assertEqual(summary_lane.lane, i)

        xml = g.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        g2 = gerald.Gerald(xml=xml)

        # do it all again after extracting from the xml file
        # (software must be compared with software, not with the version)
        self.assertEqual(g.software, g2.software)
        self.assertEqual(g.version, g2.version)
        self.assertEqual(g.date, g2.date)
        self.assertEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
        self.assertEqual(len(g.lanes.items()), len(g2.lanes.items()))

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            g_lane = g.lanes[i]
            g2_lane = g2.lanes[i]
            self.assertEqual(g_lane.analysis, g2_lane.analysis)
            self.assertEqual(g_lane.eland_genome, g2_lane.eland_genome)
            self.assertEqual(g_lane.read_length, g2_lane.read_length)
            self.assertEqual(g_lane.use_bases, g2_lane.use_bases)

        # test (some) summary elements
        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            g_summary = g.summary[0][i]
            g2_summary = g2.summary[0][i]
            self.assertEqual(g_summary.cluster, g2_summary.cluster)
            self.assertEqual(g_summary.lane, g2_summary.lane)

        g_eland = g.eland_results
        g2_eland = g2.eland_results
        for lane in g_eland.results[0].keys():
            g_results = g_eland.results[0][lane]
            g2_results = g2_eland.results[0][lane]
            self.assertEqual(g_results.reads,
                             g2_results.reads)
            # mapped_reads / match_codes are only compared for eland lanes
            if isinstance(g_results, eland.ElandLane):
                self.assertEqual(len(g_results.mapped_reads),
                                 len(g2_results.mapped_reads))
                for k in g_results.mapped_reads.keys():
                    self.assertEqual(g_results.mapped_reads[k],
                                     g2_results.mapped_reads[k])

                self.assertEqual(len(g_results.match_codes),
                                 len(g2_results.match_codes))
                for k in g_results.match_codes.keys():
                    self.assertEqual(g_results.match_codes[k],
                                     g2_results.match_codes[k])

    def test_eland(self):
        """Parse the eland / sequence lanes and round trip them through xml"""
        hg_map = {'Lambda.fa': 'Lambda.fa'}
        for i in range(1, 22):
            short_name = 'chr%d.fa' % (i,)
            long_name = 'hg18/chr%d.fa' % (i,)
            hg_map[short_name] = long_name

        # every lane shares the same genome map
        genome_maps = {1: hg_map, 2: hg_map, 3: hg_map, 4: hg_map,
                       5: hg_map, 6: hg_map, 7: hg_map, 8: hg_map}
        eland_container = gerald.eland(self.gerald_dir,
                                       genome_maps=genome_maps)

        # I added sequence lanes to the last 2 lanes of this test case
        # so only lanes 1-6 carry eland results
        for i in range(1, 7):
            lane = eland_container.results[0][i]
            self.assertEqual(lane.reads, 6)
            self.assertEqual(lane.sample_name, "s")
            self.assertEqual(lane.lane_id, i)
            self.assertEqual(len(lane.mapped_reads), 17)
            self.assertEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
            self.assertEqual(lane.match_codes['U0'], 3)
            self.assertEqual(lane.match_codes['R0'], 2)
            self.assertEqual(lane.match_codes['U1'], 1)
            self.assertEqual(lane.match_codes['R1'], 9)
            self.assertEqual(lane.match_codes['U2'], 0)
            self.assertEqual(lane.match_codes['R2'], 12)
            self.assertEqual(lane.match_codes['NM'], 1)
            self.assertEqual(lane.match_codes['QC'], 0)

        # lane 7 is the scarf sequence lane
        lane = eland_container.results[0][7]
        self.assertEqual(lane.reads, 5)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 7)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)

        # lane 8 is the fastq sequence lane
        lane = eland_container.results[0][8]
        self.assertEqual(lane.reads, 3)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 8)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)

        xml = eland_container.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        e2 = gerald.ELAND(xml=xml)

        # compare every lane with its copy restored from xml
        for i in range(1, 9):
            l1 = eland_container.results[0][i]
            l2 = e2.results[0][i]
            self.assertEqual(l1.reads, l2.reads)
            self.assertEqual(l1.sample_name, l2.sample_name)
            self.assertEqual(l1.lane_id, l2.lane_id)
            if isinstance(l1, eland.ElandLane):
                self.assertEqual(len(l1.mapped_reads), len(l2.mapped_reads))
                self.assertEqual(len(l1.mapped_reads), 17)
                for k in l1.mapped_reads.keys():
                    self.assertEqual(l1.mapped_reads[k],
                                     l2.mapped_reads[k])

                self.assertEqual(len(l1.match_codes), 9)
                self.assertEqual(len(l1.match_codes), len(l2.match_codes))
                for k in l1.match_codes.keys():
                    self.assertEqual(l1.match_codes[k],
                                     l2.match_codes[k])
            elif isinstance(l1, eland.SequenceLane):
                self.assertEqual(l1.sequence_type, l2.sequence_type)

    def test_runfolder(self):
        """Discover the run in the runfolder and round trip it through xml"""
        runs = runfolder.get_runs(self.runfolder_dir)

        # do we get the flowcell id from the filename?
        self.assertEqual(len(runs), 1)
        name = 'run_4286GAAXX_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].name, name)

        # do we get the flowcell id from the FlowcellId.xml file
        make_flowcell_id(self.runfolder_dir, '207BTAAXY')
        runs = runfolder.get_runs(self.runfolder_dir)
        self.assertEqual(len(runs), 1)
        name = 'run_207BTAAXY_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].name, name)

        r1 = runs[0]
        xml = r1.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)

        r2 = runfolder.PipelineRun(xml=xml)
        self.assertEqual(r1.name, r2.name)
        self.assertNotEqual(r2.image_analysis, None)
        self.assertNotEqual(r2.bustard, None)
        self.assertNotEqual(r2.gerald, None)
def suite():
    """Return a suite holding every test* method of RunfolderTests."""
    # the default loader collects methods starting with 'test', which is
    # what the deprecated unittest.makeSuite(RunfolderTests, 'test') did
    return unittest.defaultTestLoader.loadTestsFromTestCase(RunfolderTests)


if __name__ == "__main__":
    # run the suite() above when the module is executed directly
    unittest.main(defaultTest="suite")