3 from datetime import datetime, date
8 from unittest2 import TestCase
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines.samplekey import SampleKey
12 from htsworkflow.pipelines import ipar
13 from htsworkflow.pipelines import bustard
14 from htsworkflow.pipelines import gerald
15 from htsworkflow.pipelines import runfolder
16 from htsworkflow.pipelines.runfolder import ElementTree
18 from htsworkflow.pipelines.test.simulate_runfolder import *
21 def make_runfolder(obj=None):
23 Make a fake runfolder, attach all the directories to obj if defined
25 # make a fake runfolder directory
26 flowcell_id = 'D07K6ACXX'
27 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
29 runfolder_dir = os.path.join(
31 '110815_SN787_0101_A{0}'.format(flowcell_id))
32 os.mkdir(runfolder_dir)
34 make_runinfo(runfolder_dir, flowcell_id)
36 data_dir = os.path.join(runfolder_dir, 'Data')
39 intensities_dir = make_rta_intensities_1_12(data_dir)
40 make_status_rta1_12(data_dir)
42 basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
43 make_matrix_dir_rta_1_12(basecalls_dir)
45 unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
46 os.mkdir(unaligned_dir)
47 make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
48 make_unaligned_config_1_12(unaligned_dir)
50 aligned_dir = os.path.join(runfolder_dir, "Aligned")
52 make_aligned_eland_export(aligned_dir, flowcell_id)
53 make_aligned_config_1_12(aligned_dir)
56 obj.flowcell_id = flowcell_id
57 obj.temp_dir = temp_dir
58 obj.runfolder_dir = runfolder_dir
59 obj.data_dir = data_dir
60 obj.image_analysis_dir = intensities_dir
61 obj.bustard_dir = unaligned_dir
62 obj.gerald_dir = aligned_dir
66 class RunfolderTests(TestCase):
68 Test components of the runfolder processing code
69 which includes firecrest, bustard, and gerald
72 # attaches all the directories to the object passed in
76 shutil.rmtree(self.temp_dir)
78 def test_bustard(self):
79 """Construct a bustard object"""
80 b = bustard.bustard(self.bustard_dir)
81 self.failUnlessEqual(b.software, 'RTA')
82 self.failUnlessEqual(b.version, '1.12.4.2')
83 self.failUnlessEqual(b.date, None)
84 self.failUnlessEqual(b.user, None)
85 self.failUnlessEqual(len(b.phasing), 0)
87 xml = b.get_elements()
88 b2 = bustard.Bustard(xml=xml)
89 self.failUnlessEqual(b.software, b2.software)
90 self.failUnlessEqual(b.version, b2.version)
91 self.failUnlessEqual(b.date, b2.date )
92 self.failUnlessEqual(b.user, b2.user)
94 def test_gerald(self):
95 # need to update gerald and make tests for it
96 g = gerald.gerald(self.gerald_dir)
98 self.failUnlessEqual(g.software, 'CASAVA')
99 self.failUnlessEqual(g.version, '1.8.1')
100 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
101 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
103 # list of genomes, matches what was defined up in
104 # make_gerald_config.
105 # the first None is to offset the genomes list to be 1..9
106 # instead of pythons default 0..8
107 # test lane specific parameters from gerald config file
109 undetermined = g.lanes[SampleKey(sample='Undetermined_indices')]
110 self.failUnlessEqual(undetermined.analysis, 'none')
111 self.failUnlessEqual(undetermined.read_length, None)
112 self.failUnlessEqual(undetermined.use_bases, None)
114 project = g.lanes[SampleKey(sample='11115')]
115 self.failUnlessEqual(project.analysis, 'eland_extended')
116 self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
117 self.failUnlessEqual(project.read_length, '49')
118 self.failUnlessEqual(project.use_bases, 'y'*49+'n')
120 # test data extracted from summary file
122 (3878755, 579626.0), (3920639, 1027332.4),
123 (5713049, 876187.3), (5852907, 538640.6),
124 (4006751, 1265247.4), (5678021, 627070.7),
125 (1854131, 429053.2), (4777517, 592904.0),
128 self.failUnlessEqual(len(g.summary), self.reads)
130 summary_lane = g.summary[0][i]
131 self.failUnlessEqual(summary_lane.cluster, clusters[i])
132 self.failUnlessEqual(summary_lane.lane, i)
134 xml = g.get_elements()
135 # just make sure that element tree can serialize the tree
136 xml_str = ElementTree.tostring(xml)
137 g2 = gerald.CASAVA(xml=xml)
139 # do it all again after extracting from the xml file
140 self.failUnlessEqual(g.software, g2.software)
141 self.failUnlessEqual(g.version, g2.version)
142 self.failUnlessEqual(g.date, g2.date)
143 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
144 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
146 # test lane specific parameters from gerald config file
147 for i in g.lanes.keys():
149 g2_lane = g2.lanes[i]
150 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
151 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
152 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
153 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
155 # test (some) summary elements
156 self.failUnlessEqual(len(g.summary), self.reads)
158 g_summary = g.summary[0][i]
159 g2_summary = g2.summary[0][i]
160 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
161 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
163 g_eland = g.eland_results
164 g2_eland = g2.eland_results
166 g_results = g_eland[key]
167 g2_results = g2_eland[key]
168 self.failUnlessEqual(g_results.reads,
170 if isinstance(g_results, eland.ElandLane):
171 self.failUnlessEqual(len(g_results.mapped_reads),
172 len(g2_results.mapped_reads))
173 for k in g_results.mapped_reads.keys():
174 self.failUnlessEqual(g_results.mapped_reads[k],
175 g2_results.mapped_reads[k])
177 self.failUnlessEqual(len(g_results.match_codes),
178 len(g2_results.match_codes))
179 for k in g_results.match_codes.keys():
180 self.failUnlessEqual(g_results.match_codes[k],
181 g2_results.match_codes[k])
184 def test_eland(self):
185 hg_map = {'Lambda.fa': 'Lambda.fa'}
186 for i in range(1,22):
187 short_name = 'chr%d.fa' % (i,)
188 long_name = 'hg18/chr%d.fa' % (i,)
189 hg_map[short_name] = long_name
191 samples = set(('11111', '11112', '11113', '11114', '11115',
192 '11116', '11117', '11118', '11119', '11120'))
195 genome_maps[i] = hg_map
197 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
199 for lane in eland_container.values():
200 # I added sequence lanes to the last 2 lanes of this test case
201 if lane.sample_name == '11113':
202 self.assertEqual(lane.reads, 24)
203 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 6)
204 self.assertEqual(lane.match_codes['U0'], 6)
205 self.assertEqual(lane.match_codes['R0'], 18)
206 self.assertEqual(lane.match_codes['R1'], 24)
207 self.assertEqual(lane.match_codes['R2'], 18)
208 self.assertEqual(lane.match_codes['NM'], 12)
210 self.assertEqual(lane.reads, 8)
211 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 2)
212 self.assertEqual(lane.match_codes['U0'], 2)
213 self.assertEqual(lane.match_codes['R0'], 6)
214 self.assertEqual(lane.match_codes['R1'], 8)
215 self.assertEqual(lane.match_codes['R2'], 6)
216 self.assertEqual(lane.match_codes['NM'], 4)
218 self.assertTrue(lane.sample_name in samples)
219 #self.assertEqual(lane.lane_id, 1)
220 self.assertEqual(len(lane.mapped_reads), 1)
221 self.assertEqual(lane.match_codes['U1'], 0)
222 self.assertEqual(lane.match_codes['U2'], 0)
223 self.assertEqual(lane.match_codes['QC'], 0)
225 xml = eland_container.get_elements()
226 # just make sure that element tree can serialize the tree
227 xml_str = ElementTree.tostring(xml)
228 e2 = gerald.ELAND(xml=xml)
230 for key in eland_container.results:
231 l1 = eland_container.results[key]
233 self.failUnlessEqual(l1.reads, l2.reads)
234 self.failUnlessEqual(l1.sample_name, l2.sample_name)
235 self.failUnlessEqual(l1.lane_id, l2.lane_id)
236 if isinstance(l1, eland.ElandLane):
237 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
238 self.failUnlessEqual(len(l1.mapped_reads), 1)
239 for k in l1.mapped_reads.keys():
240 self.failUnlessEqual(l1.mapped_reads[k],
243 self.failUnlessEqual(len(l1.match_codes), 9)
244 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
245 for k in l1.match_codes.keys():
246 self.failUnlessEqual(l1.match_codes[k],
248 elif isinstance(l1, eland.SequenceLane):
249 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
251 def test_runfolder(self):
252 runs = runfolder.get_runs(self.runfolder_dir)
254 # do we get the flowcell id from the filename?
255 self.failUnlessEqual(len(runs), 1)
256 self.assertEqual(runs[0].flowcell_id, self.flowcell_id)
257 name = 'run_%s_%s.xml' % ( self.flowcell_id,
258 date.today().strftime('%Y-%m-%d'),)
259 self.failUnlessEqual(runs[0].name, name)
261 bustard_dir = os.path.join(self.runfolder_dir, 'Unaligned')
263 self.failUnlessEqual(r1.bustard.sequence_format, 'fastq')
264 self.failUnlessEqual(r1.bustard.pathname, bustard_dir)
265 self.failUnlessEqual(r1.gerald.runfolder_name, 'Unaligned')
267 xml = r1.get_elements()
268 xml_str = ElementTree.tostring(xml)
270 r2 = runfolder.PipelineRun(xml=xml)
271 self.failUnlessEqual(r1.name, r2.name)
272 self.failIfEqual(r2.image_analysis, None)
273 self.failIfEqual(r2.bustard, None)
274 self.failIfEqual(r2.gerald, None)
277 from unittest2 import TestSuite, defaultTestLoader
279 suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
283 if __name__ == "__main__":
284 from unittest2 import main
285 main(defaultTest="suite")