3 from datetime import datetime, date
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines.samplekey import SampleKey
12 from htsworkflow.pipelines import ipar
13 from htsworkflow.pipelines import bustard
14 from htsworkflow.pipelines import gerald
15 from htsworkflow.pipelines import runfolder
16 from htsworkflow.pipelines.runfolder import ElementTree
18 from htsworkflow.pipelines.test.simulate_runfolder import *
21 def make_runfolder(obj=None):
23 Make a fake runfolder, attach all the directories to obj if defined
25 # make a fake runfolder directory
26 flowcell_id = 'D07K6ACXX'
27 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
29 runfolder_dir = os.path.join(temp_dir,
30 '110815_SN787_0101_A{0}'.format(flowcell_id))
31 os.mkdir(runfolder_dir)
33 data_dir = os.path.join(runfolder_dir, 'Data')
36 intensities_dir = make_rta_intensities_1_12(data_dir)
37 make_status_rta1_12(data_dir)
39 basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
40 make_matrix_dir_rta_1_12(basecalls_dir)
42 unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
43 os.mkdir(unaligned_dir)
44 make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
45 make_unaligned_config_1_12(unaligned_dir)
47 aligned_dir = os.path.join(runfolder_dir, "Aligned")
49 make_aligned_eland_export(aligned_dir, flowcell_id)
50 make_aligned_config_1_12(aligned_dir)
53 obj.temp_dir = temp_dir
54 obj.runfolder_dir = runfolder_dir
55 obj.data_dir = data_dir
56 obj.image_analysis_dir = intensities_dir
57 obj.bustard_dir = unaligned_dir
58 obj.gerald_dir = aligned_dir
62 class RunfolderTests(unittest.TestCase):
64 Test components of the runfolder processing code
65 which includes firecrest, bustard, and gerald
68 # attaches all the directories to the object passed in
72 shutil.rmtree(self.temp_dir)
74 def test_bustard(self):
75 """Construct a bustard object"""
76 b = bustard.bustard(self.bustard_dir)
77 self.failUnlessEqual(b.software, 'RTA')
78 self.failUnlessEqual(b.version, '1.12.4.2')
79 self.failUnlessEqual(b.date, None)
80 self.failUnlessEqual(b.user, None)
81 self.failUnlessEqual(len(b.phasing), 0)
83 xml = b.get_elements()
84 b2 = bustard.Bustard(xml=xml)
85 self.failUnlessEqual(b.software, b2.software)
86 self.failUnlessEqual(b.version, b2.version)
87 self.failUnlessEqual(b.date, b2.date )
88 self.failUnlessEqual(b.user, b2.user)
90 def test_gerald(self):
91 # need to update gerald and make tests for it
92 g = gerald.gerald(self.gerald_dir)
94 self.failUnlessEqual(g.software, 'CASAVA')
95 self.failUnlessEqual(g.version, '1.8.1')
96 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
97 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
99 # list of genomes, matches what was defined up in
100 # make_gerald_config.
101 # the first None is to offset the genomes list to be 1..9
102 # instead of pythons default 0..8
103 # test lane specific parameters from gerald config file
105 undetermined = g.lanes[SampleKey(sample='Undetermined_indices')]
106 self.failUnlessEqual(undetermined.analysis, 'none')
107 self.failUnlessEqual(undetermined.read_length, None)
108 self.failUnlessEqual(undetermined.use_bases, None)
110 project = g.lanes[SampleKey(sample='11115')]
111 self.failUnlessEqual(project.analysis, 'eland_extended')
112 self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
113 self.failUnlessEqual(project.read_length, '49')
114 self.failUnlessEqual(project.use_bases, 'y'*49+'n')
116 # test data extracted from summary file
118 (3878755, 579626.0), (3920639, 1027332.4),
119 (5713049, 876187.3), (5852907, 538640.6),
120 (4006751, 1265247.4), (5678021, 627070.7),
121 (1854131, 429053.2), (4777517, 592904.0),
124 self.failUnlessEqual(len(g.summary), self.reads)
126 summary_lane = g.summary[0][i]
127 self.failUnlessEqual(summary_lane.cluster, clusters[i])
128 self.failUnlessEqual(summary_lane.lane, i)
130 xml = g.get_elements()
131 # just make sure that element tree can serialize the tree
132 xml_str = ElementTree.tostring(xml)
133 g2 = gerald.CASAVA(xml=xml)
135 # do it all again after extracting from the xml file
136 self.failUnlessEqual(g.software, g2.software)
137 self.failUnlessEqual(g.version, g2.version)
138 self.failUnlessEqual(g.date, g2.date)
139 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
140 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
142 # test lane specific parameters from gerald config file
143 for i in g.lanes.keys():
145 g2_lane = g2.lanes[i]
146 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
147 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
148 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
149 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
151 # test (some) summary elements
152 self.failUnlessEqual(len(g.summary), self.reads)
154 g_summary = g.summary[0][i]
155 g2_summary = g2.summary[0][i]
156 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
157 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
159 g_eland = g.eland_results
160 g2_eland = g2.eland_results
162 g_results = g_eland[key]
163 g2_results = g2_eland[key]
164 self.failUnlessEqual(g_results.reads,
166 if isinstance(g_results, eland.ElandLane):
167 self.failUnlessEqual(len(g_results.mapped_reads),
168 len(g2_results.mapped_reads))
169 for k in g_results.mapped_reads.keys():
170 self.failUnlessEqual(g_results.mapped_reads[k],
171 g2_results.mapped_reads[k])
173 self.failUnlessEqual(len(g_results.match_codes),
174 len(g2_results.match_codes))
175 for k in g_results.match_codes.keys():
176 self.failUnlessEqual(g_results.match_codes[k],
177 g2_results.match_codes[k])
180 def test_eland(self):
181 hg_map = {'Lambda.fa': 'Lambda.fa'}
182 for i in range(1,22):
183 short_name = 'chr%d.fa' % (i,)
184 long_name = 'hg18/chr%d.fa' % (i,)
185 hg_map[short_name] = long_name
187 samples = set(('11111', '11112', '11113', '11114', '11115',
188 '11116', '11117', '11118', '11119', '11120'))
191 genome_maps[i] = hg_map
193 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
195 for lane in eland_container.values():
196 # I added sequence lanes to the last 2 lanes of this test case
197 if lane.sample_name == '11113':
198 self.assertEqual(lane.reads, 24)
199 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 6)
200 self.assertEqual(lane.match_codes['U0'], 6)
201 self.assertEqual(lane.match_codes['R0'], 18)
202 self.assertEqual(lane.match_codes['R1'], 24)
203 self.assertEqual(lane.match_codes['R2'], 18)
204 self.assertEqual(lane.match_codes['NM'], 12)
206 self.assertEqual(lane.reads, 8)
207 self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 2)
208 self.assertEqual(lane.match_codes['U0'], 2)
209 self.assertEqual(lane.match_codes['R0'], 6)
210 self.assertEqual(lane.match_codes['R1'], 8)
211 self.assertEqual(lane.match_codes['R2'], 6)
212 self.assertEqual(lane.match_codes['NM'], 4)
214 self.assertTrue(lane.sample_name in samples)
215 #self.assertEqual(lane.lane_id, 1)
216 self.assertEqual(len(lane.mapped_reads), 1)
217 self.assertEqual(lane.match_codes['U1'], 0)
218 self.assertEqual(lane.match_codes['U2'], 0)
219 self.assertEqual(lane.match_codes['QC'], 0)
221 xml = eland_container.get_elements()
222 # just make sure that element tree can serialize the tree
223 xml_str = ElementTree.tostring(xml)
224 e2 = gerald.ELAND(xml=xml)
226 for key in eland_container.results:
227 l1 = eland_container.results[key]
229 self.failUnlessEqual(l1.reads, l2.reads)
230 self.failUnlessEqual(l1.sample_name, l2.sample_name)
231 self.failUnlessEqual(l1.lane_id, l2.lane_id)
232 if isinstance(l1, eland.ElandLane):
233 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
234 self.failUnlessEqual(len(l1.mapped_reads), 1)
235 for k in l1.mapped_reads.keys():
236 self.failUnlessEqual(l1.mapped_reads[k],
239 self.failUnlessEqual(len(l1.match_codes), 9)
240 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
241 for k in l1.match_codes.keys():
242 self.failUnlessEqual(l1.match_codes[k],
244 elif isinstance(l1, eland.SequenceLane):
245 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
247 def test_runfolder(self):
249 runs = runfolder.get_runs(self.runfolder_dir)
251 # do we get the flowcell id from the filename?
252 self.failUnlessEqual(len(runs), 1)
253 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
254 self.failUnlessEqual(runs[0].name, name)
256 # do we get the flowcell id from the FlowcellId.xml file
257 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
258 runs = runfolder.get_runs(self.runfolder_dir)
259 self.failUnlessEqual(len(runs), 1)
260 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
261 self.failUnlessEqual(runs[0].name, name)
264 xml = r1.get_elements()
265 xml_str = ElementTree.tostring(xml)
267 r2 = runfolder.PipelineRun(xml=xml)
268 self.failUnlessEqual(r1.name, r2.name)
269 self.failIfEqual(r2.image_analysis, None)
270 self.failIfEqual(r2.bustard, None)
271 self.failIfEqual(r2.gerald, None)
275 return unittest.makeSuite(RunfolderTests,'test')
277 if __name__ == "__main__":
278 logging.basicConfig(level=logging.WARN)
279 unittest.main(defaultTest="suite")