3 from datetime import datetime, date
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.runfolder import ElementTree
17 from htsworkflow.pipelines.test.simulate_runfolder import *
20 def make_runfolder(obj=None):
22 Make a fake runfolder, attach all the directories to obj if defined
24 # make a fake runfolder directory
25 flowcell_id = 'D07K6ACXX'
26 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
28 runfolder_dir = os.path.join(temp_dir,
29 '110815_SN787_0101_A{0}'.format(flowcell_id))
30 os.mkdir(runfolder_dir)
32 data_dir = os.path.join(runfolder_dir, 'Data')
35 intensities_dir = make_rta_intensities_1_12(data_dir)
36 make_status_rta1_12(data_dir)
38 basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
39 make_matrix_dir_rta_1_12(basecalls_dir)
41 unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
42 os.mkdir(unaligned_dir)
43 make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
44 make_unaligned_config_1_12(unaligned_dir)
46 aligned_dir = os.path.join(runfolder_dir, "Aligned")
48 make_aligned_eland_export(aligned_dir, flowcell_id)
49 make_aligned_config_1_12(aligned_dir)
52 obj.temp_dir = temp_dir
53 obj.runfolder_dir = runfolder_dir
54 obj.data_dir = data_dir
55 obj.image_analysis_dir = intensities_dir
56 obj.bustard_dir = unaligned_dir
57 obj.gerald_dir = aligned_dir
60 class RunfolderTests(unittest.TestCase):
62 Test components of the runfolder processing code
63 which includes firecrest, bustard, and gerald
66 # attaches all the directories to the object passed in
70 shutil.rmtree(self.temp_dir)
72 def test_bustard(self):
73 """Construct a bustard object"""
74 b = bustard.bustard(self.bustard_dir)
75 self.failUnlessEqual(b.version, '1.12.4.2')
76 self.failUnlessEqual(b.date, None)
77 self.failUnlessEqual(b.user, None)
78 self.failUnlessEqual(len(b.phasing), 0)
80 xml = b.get_elements()
81 b2 = bustard.Bustard(xml=xml)
82 self.failUnlessEqual(b.version, b2.version)
83 self.failUnlessEqual(b.date, b2.date )
84 self.failUnlessEqual(b.user, b2.user)
86 def test_gerald(self):
87 # need to update gerald and make tests for it
88 g = gerald.gerald(self.gerald_dir)
90 self.failUnlessEqual(g.version, 'CASAVA-1.8.1')
91 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
92 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
94 # list of genomes, matches what was defined up in
96 # the first None is to offset the genomes list to be 1..9
97 # instead of pythons default 0..8
102 '/g/arabidopsis01222004',
108 # test lane specific parameters from gerald config file
110 cur_lane = g.lanes[i]
111 self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
112 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
113 self.failUnlessEqual(cur_lane.read_length, '37')
114 self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
116 # I want to be able to use a simple iterator
117 for l in g.lanes.values():
118 self.failUnlessEqual(l.analysis, 'eland_extended')
119 self.failUnlessEqual(l.read_length, '37')
120 self.failUnlessEqual(l.use_bases, 'Y'*37)
122 # test data extracted from summary file
124 (281331, 11169), (203841, 13513),
125 (220889, 15653), (137294, 14666),
126 (129388, 14525), (262092, 10751),
127 (185754, 13503), (233765, 9537),]
129 self.failUnlessEqual(len(g.summary), 1)
131 summary_lane = g.summary[0][i]
132 self.failUnlessEqual(summary_lane.cluster, clusters[i])
133 self.failUnlessEqual(summary_lane.lane, i)
135 xml = g.get_elements()
136 # just make sure that element tree can serialize the tree
137 xml_str = ElementTree.tostring(xml)
138 g2 = gerald.Gerald(xml=xml)
141 # do it all again after extracting from the xml file
142 self.failUnlessEqual(g.version, g2.version)
143 self.failUnlessEqual(g.date, g2.date)
144 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
145 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
147 # test lane specific parameters from gerald config file
150 g2_lane = g2.lanes[i]
151 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
152 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
153 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
154 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
156 # test (some) summary elements
157 self.failUnlessEqual(len(g.summary), 1)
159 g_summary = g.summary[0][i]
160 g2_summary = g2.summary[0][i]
161 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
162 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
164 g_eland = g.eland_results
165 g2_eland = g2.eland_results
166 for lane in g_eland.results[0].keys():
167 g_results = g_eland.results[0][lane]
168 g2_results = g2_eland.results[0][lane]
169 self.failUnlessEqual(g_results.reads,
171 if isinstance(g_results, eland.ElandLane):
172 self.failUnlessEqual(len(g_results.mapped_reads),
173 len(g2_results.mapped_reads))
174 for k in g_results.mapped_reads.keys():
175 self.failUnlessEqual(g_results.mapped_reads[k],
176 g2_results.mapped_reads[k])
178 self.failUnlessEqual(len(g_results.match_codes),
179 len(g2_results.match_codes))
180 for k in g_results.match_codes.keys():
181 self.failUnlessEqual(g_results.match_codes[k],
182 g2_results.match_codes[k])
185 def test_eland(self):
187 hg_map = {'Lambda.fa': 'Lambda.fa'}
188 for i in range(1,22):
189 short_name = 'chr%d.fa' % (i,)
190 long_name = 'hg18/chr%d.fa' % (i,)
191 hg_map[short_name] = long_name
193 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
194 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
195 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
197 # I added sequence lanes to the last 2 lanes of this test case
199 lane = eland_container.results[0][i]
200 self.failUnlessEqual(lane.reads, 6)
201 self.failUnlessEqual(lane.sample_name, "s")
202 self.failUnlessEqual(lane.lane_id, i)
203 self.failUnlessEqual(len(lane.mapped_reads), 17)
204 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
205 self.failUnlessEqual(lane.match_codes['U0'], 3)
206 self.failUnlessEqual(lane.match_codes['R0'], 2)
207 self.failUnlessEqual(lane.match_codes['U1'], 1)
208 self.failUnlessEqual(lane.match_codes['R1'], 9)
209 self.failUnlessEqual(lane.match_codes['U2'], 0)
210 self.failUnlessEqual(lane.match_codes['R2'], 12)
211 self.failUnlessEqual(lane.match_codes['NM'], 1)
212 self.failUnlessEqual(lane.match_codes['QC'], 0)
215 lane = eland_container.results[0][7]
216 self.failUnlessEqual(lane.reads, 5)
217 self.failUnlessEqual(lane.sample_name, 's')
218 self.failUnlessEqual(lane.lane_id, 7)
219 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
222 lane = eland_container.results[0][8]
223 self.failUnlessEqual(lane.reads, 3)
224 self.failUnlessEqual(lane.sample_name, 's')
225 self.failUnlessEqual(lane.lane_id, 8)
226 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
228 xml = eland_container.get_elements()
229 # just make sure that element tree can serialize the tree
230 xml_str = ElementTree.tostring(xml)
231 e2 = gerald.ELAND(xml=xml)
234 l1 = eland_container.results[0][i]
235 l2 = e2.results[0][i]
236 self.failUnlessEqual(l1.reads, l2.reads)
237 self.failUnlessEqual(l1.sample_name, l2.sample_name)
238 self.failUnlessEqual(l1.lane_id, l2.lane_id)
239 if isinstance(l1, eland.ElandLane):
240 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
241 self.failUnlessEqual(len(l1.mapped_reads), 17)
242 for k in l1.mapped_reads.keys():
243 self.failUnlessEqual(l1.mapped_reads[k],
246 self.failUnlessEqual(len(l1.match_codes), 9)
247 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
248 for k in l1.match_codes.keys():
249 self.failUnlessEqual(l1.match_codes[k],
251 elif isinstance(l1, eland.SequenceLane):
252 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
254 def test_runfolder(self):
256 runs = runfolder.get_runs(self.runfolder_dir)
258 # do we get the flowcell id from the filename?
259 self.failUnlessEqual(len(runs), 1)
260 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
261 self.failUnlessEqual(runs[0].name, name)
263 # do we get the flowcell id from the FlowcellId.xml file
264 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
265 runs = runfolder.get_runs(self.runfolder_dir)
266 self.failUnlessEqual(len(runs), 1)
267 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
268 self.failUnlessEqual(runs[0].name, name)
271 xml = r1.get_elements()
272 xml_str = ElementTree.tostring(xml)
274 r2 = runfolder.PipelineRun(xml=xml)
275 self.failUnlessEqual(r1.name, r2.name)
276 self.failIfEqual(r2.image_analysis, None)
277 self.failIfEqual(r2.bustard, None)
278 self.failIfEqual(r2.gerald, None)
282 return unittest.makeSuite(RunfolderTests,'test')
284 if __name__ == "__main__":
285 logging.basicConfig(level=logging.WARN)
286 unittest.main(defaultTest="suite")