3 from datetime import datetime, date
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.runfolder import ElementTree
17 from htsworkflow.pipelines.test.simulate_runfolder import *
20 def make_runfolder(obj=None):
22 Make a fake runfolder, attach all the directories to obj if defined
24 # make a fake runfolder directory
25 flowcell_id = 'D07K6ACXX'
26 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
28 runfolder_dir = os.path.join(temp_dir,
29 '110815_SN787_0101_A{0}'.format(flowcell_id))
30 os.mkdir(runfolder_dir)
32 data_dir = os.path.join(runfolder_dir, 'Data')
35 intensities_dir = make_rta_intensities_1_12(data_dir)
37 status_dir = os.path.join(data_dir, 'Status_Files')
39 make_summary_rta1_12(status_dir)
41 basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
42 make_matrix_dir_rta_1_12(basecalls_dir)
44 unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
45 os.mkdir(unaligned_dir)
46 make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
47 make_unaligned_config_1_12(unaligned_dir)
49 aligned_dir = os.path.join(runfolder_dir, "Aligned")
51 make_aligned_eland_export(aligned_dir, flowcell_id)
52 make_aligned_config_1_12(aligned_dir)
55 obj.temp_dir = temp_dir
56 obj.runfolder_dir = runfolder_dir
57 obj.data_dir = data_dir
58 obj.image_analysis_dir = intensities_dir
59 obj.bustard_dir = unaligned_dir
60 obj.gerald_dir = aligned_dir
63 class RunfolderTests(unittest.TestCase):
65 Test components of the runfolder processing code
66 which includes firecrest, bustard, and gerald
69 # attaches all the directories to the object passed in
73 shutil.rmtree(self.temp_dir)
75 def test_bustard(self):
76 """Construct a bustard object"""
77 b = bustard.bustard(self.bustard_dir)
78 self.failUnlessEqual(b.version, '1.12.4.2')
79 self.failUnlessEqual(b.date, None)
80 self.failUnlessEqual(b.user, None)
81 self.failUnlessEqual(len(b.phasing), 0)
83 xml = b.get_elements()
84 b2 = bustard.Bustard(xml=xml)
85 self.failUnlessEqual(b.version, b2.version)
86 self.failUnlessEqual(b.date, b2.date )
87 self.failUnlessEqual(b.user, b2.user)
89 def test_gerald(self):
90 # need to update gerald and make tests for it
91 g = gerald.gerald(self.gerald_dir)
93 self.failUnlessEqual(g.version, 'CASAVA-1.8.1')
94 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
95 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
97 # list of genomes, matches what was defined up in
99 # the first None is to offset the genomes list to be 1..9
100 # instead of pythons default 0..8
105 '/g/arabidopsis01222004',
111 # test lane specific parameters from gerald config file
113 cur_lane = g.lanes[i]
114 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
115 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
116 self.failUnlessEqual(cur_lane.read_length, '37')
117 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
119 # I want to be able to use a simple iterator
120 for l in g.lanes.values():
121 self.failUnlessEqual(l.analysis, 'eland_extended')
122 self.failUnlessEqual(l.read_length, '37')
123 self.failUnlessEqual(l.use_bases, 'Y'*37)
125 # test data extracted from summary file
127 (281331, 11169), (203841, 13513),
128 (220889, 15653), (137294, 14666),
129 (129388, 14525), (262092, 10751),
130 (185754, 13503), (233765, 9537),]
132 self.failUnlessEqual(len(g.summary), 1)
134 summary_lane = g.summary[0][i]
135 self.failUnlessEqual(summary_lane.cluster, clusters[i])
136 self.failUnlessEqual(summary_lane.lane, i)
138 xml = g.get_elements()
139 # just make sure that element tree can serialize the tree
140 xml_str = ElementTree.tostring(xml)
141 g2 = gerald.Gerald(xml=xml)
144 # do it all again after extracting from the xml file
145 self.failUnlessEqual(g.version, g2.version)
146 self.failUnlessEqual(g.date, g2.date)
147 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
148 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
150 # test lane specific parameters from gerald config file
153 g2_lane = g2.lanes[i]
154 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
155 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
156 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
157 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
159 # test (some) summary elements
160 self.failUnlessEqual(len(g.summary), 1)
162 g_summary = g.summary[0][i]
163 g2_summary = g2.summary[0][i]
164 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
165 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
167 g_eland = g.eland_results
168 g2_eland = g2.eland_results
169 for lane in g_eland.results[0].keys():
170 g_results = g_eland.results[0][lane]
171 g2_results = g2_eland.results[0][lane]
172 self.failUnlessEqual(g_results.reads,
174 if isinstance(g_results, eland.ElandLane):
175 self.failUnlessEqual(len(g_results.mapped_reads),
176 len(g2_results.mapped_reads))
177 for k in g_results.mapped_reads.keys():
178 self.failUnlessEqual(g_results.mapped_reads[k],
179 g2_results.mapped_reads[k])
181 self.failUnlessEqual(len(g_results.match_codes),
182 len(g2_results.match_codes))
183 for k in g_results.match_codes.keys():
184 self.failUnlessEqual(g_results.match_codes[k],
185 g2_results.match_codes[k])
188 def test_eland(self):
190 hg_map = {'Lambda.fa': 'Lambda.fa'}
191 for i in range(1,22):
192 short_name = 'chr%d.fa' % (i,)
193 long_name = 'hg18/chr%d.fa' % (i,)
194 hg_map[short_name] = long_name
196 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
197 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
198 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
200 # I added sequence lanes to the last 2 lanes of this test case
202 lane = eland_container.results[0][i]
203 self.failUnlessEqual(lane.reads, 6)
204 self.failUnlessEqual(lane.sample_name, "s")
205 self.failUnlessEqual(lane.lane_id, i)
206 self.failUnlessEqual(len(lane.mapped_reads), 17)
207 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
208 self.failUnlessEqual(lane.match_codes['U0'], 3)
209 self.failUnlessEqual(lane.match_codes['R0'], 2)
210 self.failUnlessEqual(lane.match_codes['U1'], 1)
211 self.failUnlessEqual(lane.match_codes['R1'], 9)
212 self.failUnlessEqual(lane.match_codes['U2'], 0)
213 self.failUnlessEqual(lane.match_codes['R2'], 12)
214 self.failUnlessEqual(lane.match_codes['NM'], 1)
215 self.failUnlessEqual(lane.match_codes['QC'], 0)
218 lane = eland_container.results[0][7]
219 self.failUnlessEqual(lane.reads, 5)
220 self.failUnlessEqual(lane.sample_name, 's')
221 self.failUnlessEqual(lane.lane_id, 7)
222 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
225 lane = eland_container.results[0][8]
226 self.failUnlessEqual(lane.reads, 3)
227 self.failUnlessEqual(lane.sample_name, 's')
228 self.failUnlessEqual(lane.lane_id, 8)
229 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
231 xml = eland_container.get_elements()
232 # just make sure that element tree can serialize the tree
233 xml_str = ElementTree.tostring(xml)
234 e2 = gerald.ELAND(xml=xml)
237 l1 = eland_container.results[0][i]
238 l2 = e2.results[0][i]
239 self.failUnlessEqual(l1.reads, l2.reads)
240 self.failUnlessEqual(l1.sample_name, l2.sample_name)
241 self.failUnlessEqual(l1.lane_id, l2.lane_id)
242 if isinstance(l1, eland.ElandLane):
243 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
244 self.failUnlessEqual(len(l1.mapped_reads), 17)
245 for k in l1.mapped_reads.keys():
246 self.failUnlessEqual(l1.mapped_reads[k],
249 self.failUnlessEqual(len(l1.match_codes), 9)
250 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
251 for k in l1.match_codes.keys():
252 self.failUnlessEqual(l1.match_codes[k],
254 elif isinstance(l1, eland.SequenceLane):
255 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
257 def test_runfolder(self):
259 runs = runfolder.get_runs(self.runfolder_dir)
261 # do we get the flowcell id from the filename?
262 self.failUnlessEqual(len(runs), 1)
263 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
264 self.failUnlessEqual(runs[0].name, name)
266 # do we get the flowcell id from the FlowcellId.xml file
267 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
268 runs = runfolder.get_runs(self.runfolder_dir)
269 self.failUnlessEqual(len(runs), 1)
270 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
271 self.failUnlessEqual(runs[0].name, name)
274 xml = r1.get_elements()
275 xml_str = ElementTree.tostring(xml)
277 r2 = runfolder.PipelineRun(xml=xml)
278 self.failUnlessEqual(r1.name, r2.name)
279 self.failIfEqual(r2.image_analysis, None)
280 self.failIfEqual(r2.bustard, None)
281 self.failIfEqual(r2.gerald, None)
285 return unittest.makeSuite(RunfolderTests,'test')
287 if __name__ == "__main__":
288 logging.basicConfig(level=logging.WARN)
289 unittest.main(defaultTest="suite")