3 from datetime import datetime, date
10 from htsworkflow.pipelines import bustard
11 from htsworkflow.pipelines import eland
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import ipar
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines import srf
16 from htsworkflow.pipelines.runfolder import ElementTree
18 from htsworkflow.pipelines.test.simulate_runfolder import *
21 RUN_NAME = '110420_SN787_0069_%s' %( FCID,)
23 def make_runfolder(obj=None):
25 Make a fake runfolder, attach all the directories to obj if defined
27 # make a fake runfolder directory
28 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
30 runfolder_dir = os.path.join(temp_dir, RUN_NAME)
31 os.mkdir(runfolder_dir)
33 data_dir = os.path.join(runfolder_dir, 'Data')
36 intensities_dir = make_rta_intensities_1_10(data_dir)
38 basecalls_dir = make_rta_basecalls_1_10(intensities_dir)
39 make_matrix_dir_rta_1_10(basecalls_dir)
40 make_qseqs(basecalls_dir, ABXX_BASE_CALL_INFO)
42 gerald_dir = os.path.join(basecalls_dir,
43 'GERALD_02-05-2011_diane')
45 make_gerald_config_1_7(gerald_dir)
46 make_summary_casava1_7_xml(gerald_dir)
47 make_eland_export(gerald_dir, lane_list=[4,5,6,7,8])
48 make_fastq(gerald_dir, lane_list=[1,2,3,])
51 obj.temp_dir = temp_dir
52 obj.runfolder_dir = runfolder_dir
53 obj.data_dir = data_dir
54 obj.image_analysis_dir = intensities_dir
55 obj.bustard_dir = basecalls_dir
56 obj.gerald_dir = gerald_dir
59 class RunfolderTests(unittest.TestCase):
61 Test components of the runfolder processing code
62 which includes firecrest, bustard, and gerald
65 # attaches all the directories to the object passed in
69 shutil.rmtree(self.temp_dir)
71 def test_bustard(self):
72 """Construct a bustard object"""
73 b = bustard.bustard(self.bustard_dir)
74 self.failUnlessEqual(b.version, '1.10.36.0')
75 self.failUnlessEqual(b.date, None)
76 self.failUnlessEqual(b.user, None)
77 self.failUnlessEqual(len(b.phasing), 0)
79 xml = b.get_elements()
80 b2 = bustard.Bustard(xml=xml)
81 self.failUnlessEqual(b.version, b2.version)
82 self.failUnlessEqual(b.date, b2.date )
83 self.failUnlessEqual(b.user, b2.user)
85 def test_gerald(self):
86 # need to update gerald and make tests for it
87 g = gerald.gerald(self.gerald_dir)
89 self.failUnlessEqual(g.version,
91 self.failUnlessEqual(g.date, datetime(2011,5,2,19,19,49))
92 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
93 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
96 # list of genomes, matches what was defined up in
98 # the first None is to offset the genomes list to be 1..9
99 # instead of pythons default 0..8
100 genomes = [None, # placeholder
109 analysis = [None, # placeholder
120 # test lane specific parameters from gerald config file
122 cur_lane = g.lanes[i]
123 self.failUnlessEqual(cur_lane.analysis, analysis[i])
124 self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
125 self.failUnlessEqual(cur_lane.read_length, '100')
126 self.failUnlessEqual(cur_lane.use_bases, 'Y'*100+'y'*100)
128 # I want to be able to use a simple iterator
129 for l in g.lanes.values():
130 self.failUnless(l.analysis in ('sequence_pair', 'eland_pair'))
131 self.failUnlessEqual(l.read_length, '100')
132 self.failUnlessEqual(l.use_bases, 'Y'*100+'y'*100)
134 # test data extracted from summary file
136 (1073893, 146344), (1671038, 167728),
137 (5778484, 83123), (2411032, 290873),
138 (2197555, 294381), (2725097, 333724),
139 (2549849, 313056), (2282159, 232709),]
141 self.failUnlessEqual(len(g.summary), 1)
143 summary_lane = g.summary[0][i]
144 self.failUnlessEqual(summary_lane.cluster, clusters[i])
145 self.failUnlessEqual(summary_lane.lane, i)
147 xml = g.get_elements()
148 # just make sure that element tree can serialize the tree
149 xml_str = ElementTree.tostring(xml)
150 g2 = gerald.Gerald(xml=xml)
152 # do it all again after extracting from the xml file
153 self.failUnlessEqual(g.version, g2.version)
154 self.failUnlessEqual(g.date, g2.date)
155 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
156 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
158 # test lane specific parameters from gerald config file
161 g2_lane = g2.lanes[i]
162 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
163 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
164 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
165 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
167 # test (some) summary elements
168 self.failUnlessEqual(len(g.summary), 1)
170 g_summary = g.summary[0][i]
171 g2_summary = g2.summary[0][i]
172 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
173 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
175 g_eland = g.eland_results
176 g2_eland = g2.eland_results
177 for lane in g_eland.results[0].keys():
178 g_results = g_eland.results[0][lane]
179 g2_results = g2_eland.results[0][lane]
180 self.failUnlessEqual(g_results.reads,
182 if isinstance(g_results, eland.ElandLane):
183 self.failUnlessEqual(len(g_results.mapped_reads),
184 len(g2_results.mapped_reads))
185 for k in g_results.mapped_reads.keys():
186 self.failUnlessEqual(g_results.mapped_reads[k],
187 g2_results.mapped_reads[k])
189 self.failUnlessEqual(len(g_results.match_codes),
190 len(g2_results.match_codes))
191 for k in g_results.match_codes.keys():
192 self.failUnlessEqual(g_results.match_codes[k],
193 g2_results.match_codes[k])
196 def test_eland(self):
197 ls_tree(self.runfolder_dir)
198 hg_map = {'Lambda.fa': 'Lambda.fa'}
199 for i in range(1,22):
200 short_name = 'chr%d.fa' % (i,)
201 long_name = 'hg18/chr%d.fa' % (i,)
202 hg_map[short_name] = long_name
204 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
205 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
206 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
210 lane = eland_container.results[0][i]
211 self.failUnlessEqual(lane.reads, 3)
212 self.failUnlessEqual(lane.sample_name, 's')
213 self.failUnlessEqual(lane.lane_id, i)
214 self.failUnlessEqual(lane.sequence_type,
215 eland.SequenceLane.FASTQ_TYPE)
217 # I added sequence lanes to the last 2 lanes of this test case
219 lane = eland_container.results[0][i]
220 self.failUnlessEqual(lane.reads, 28)
221 self.failUnlessEqual(lane.sample_name, "s")
222 self.failUnlessEqual(lane.lane_id, i)
223 self.failUnlessEqual(len(lane.mapped_reads), 7)
224 self.failUnlessEqual(lane.mapped_reads['hg18/chr7.fa'], 4)
225 self.failUnlessEqual(lane.match_codes['U0'], 1)
226 self.failUnlessEqual(lane.match_codes['R0'], 20)
227 self.failUnlessEqual(lane.match_codes['U1'], 1)
228 self.failUnlessEqual(lane.match_codes['R1'], 2)
229 self.failUnlessEqual(lane.match_codes['U2'], 11)
230 self.failUnlessEqual(lane.match_codes['R2'], 0)
231 self.failUnlessEqual(lane.match_codes['NM'], 2)
232 self.failUnlessEqual(lane.match_codes['QC'], 9)
235 xml = eland_container.get_elements()
236 # just make sure that element tree can serialize the tree
237 xml_str = ElementTree.tostring(xml)
238 e2 = gerald.ELAND(xml=xml)
241 l1 = eland_container.results[0][i]
242 l2 = e2.results[0][i]
243 self.failUnlessEqual(l1.reads, l2.reads)
244 self.failUnlessEqual(l1.sample_name, l2.sample_name)
245 self.failUnlessEqual(l1.lane_id, l2.lane_id)
246 if isinstance(l1, eland.ElandLane):
247 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
248 self.failUnlessEqual(len(l1.mapped_reads), 7)
249 for k in l1.mapped_reads.keys():
250 self.failUnlessEqual(l1.mapped_reads[k],
253 self.failUnlessEqual(len(l1.match_codes), 9)
254 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
255 for k in l1.match_codes.keys():
256 self.failUnlessEqual(l1.match_codes[k],
258 elif isinstance(l1, eland.SequenceLane):
259 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
261 def test_runfolder(self):
262 runs = runfolder.get_runs(self.runfolder_dir)
264 # do we get the flowcell id from the filename?
265 self.failUnlessEqual(len(runs), 1)
266 name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
267 self.failUnlessEqual(runs[0].name, name)
269 # do we get the flowcell id from the FlowcellId.xml file
270 make_flowcell_id(self.runfolder_dir, FCID)
271 runs = runfolder.get_runs(self.runfolder_dir)
272 self.failUnlessEqual(len(runs), 1)
273 name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
274 self.failUnlessEqual(runs[0].name, name)
277 xml = r1.get_elements()
278 xml_str = ElementTree.tostring(xml)
280 r2 = runfolder.PipelineRun(xml=xml)
281 self.failUnlessEqual(r1.name, r2.name)
282 self.failIfEqual(r2.image_analysis, None)
283 self.failIfEqual(r2.bustard, None)
284 self.failIfEqual(r2.gerald, None)
287 pattern_list = srf.create_qseq_patterns(self.bustard_dir)
288 self.assertEqual(len(pattern_list), 1)
289 pattern = pattern_list[0][1] % (1,)
291 glob.fnmatch.fnmatch("s_1_1_1101_qseq.txt", pattern),
294 glob.fnmatch.fnmatch("s_1_0001_qseq.txt", pattern),
301 return unittest.makeSuite(RunfolderTests,'test')
304 if __name__ == "__main__":
305 unittest.main(defaultTest="suite")