3 from datetime import datetime, date
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.runfolder import ElementTree
17 from htsworkflow.pipelines.test.simulate_runfolder import *
20 def make_runfolder(obj=None):
22 Make a fake runfolder, attach all the directories to obj if defined
24 # make a fake runfolder directory
25 flowcell_id = 'D07K6ACXX'
26 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
28 runfolder_dir = os.path.join(temp_dir,
29 '110815_SN787_0101_A{0}'.format(flowcell_id))
30 os.mkdir(runfolder_dir)
32 data_dir = os.path.join(runfolder_dir, 'Data')
35 intensities_dir = make_rta_intensities_1_12(data_dir)
36 make_status_rta1_12(data_dir)
38 basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
39 make_matrix_dir_rta_1_12(basecalls_dir)
41 unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
42 os.mkdir(unaligned_dir)
43 make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
44 make_unaligned_config_1_12(unaligned_dir)
46 aligned_dir = os.path.join(runfolder_dir, "Aligned")
48 make_aligned_eland_export(aligned_dir, flowcell_id)
49 make_aligned_config_1_12(aligned_dir)
52 obj.temp_dir = temp_dir
53 obj.runfolder_dir = runfolder_dir
54 obj.data_dir = data_dir
55 obj.image_analysis_dir = intensities_dir
56 obj.bustard_dir = unaligned_dir
57 obj.gerald_dir = aligned_dir
61 class RunfolderTests(unittest.TestCase):
63 Test components of the runfolder processing code
64 which includes firecrest, bustard, and gerald
67 # attaches all the directories to the object passed in
71 shutil.rmtree(self.temp_dir)
73 def test_bustard(self):
74 """Construct a bustard object"""
75 b = bustard.bustard(self.bustard_dir)
76 self.failUnlessEqual(b.software, 'RTA')
77 self.failUnlessEqual(b.version, '1.12.4.2')
78 self.failUnlessEqual(b.date, None)
79 self.failUnlessEqual(b.user, None)
80 self.failUnlessEqual(len(b.phasing), 0)
82 xml = b.get_elements()
83 b2 = bustard.Bustard(xml=xml)
84 self.failUnlessEqual(b.software, b2.software)
85 self.failUnlessEqual(b.version, b2.version)
86 self.failUnlessEqual(b.date, b2.date )
87 self.failUnlessEqual(b.user, b2.user)
89 def test_gerald(self):
90 # need to update gerald and make tests for it
91 g = gerald.gerald(self.gerald_dir)
93 self.failUnlessEqual(g.software, 'CASAVA')
94 self.failUnlessEqual(g.version, '1.8.1')
95 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
96 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
98 # list of genomes, matches what was defined up in
100 # the first None is to offset the genomes list to be 1..9
101 # instead of pythons default 0..8
102 # test lane specific parameters from gerald config file
104 undetermined = g.lanes['Undetermined_indices']
105 self.failUnlessEqual(undetermined.analysis, 'none')
106 self.failUnlessEqual(undetermined.read_length, None)
107 self.failUnlessEqual(undetermined.use_bases, None)
109 project = g.lanes['12383']
110 self.failUnlessEqual(project.analysis, 'eland_extended')
111 self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
112 self.failUnlessEqual(project.read_length, '49')
113 self.failUnlessEqual(project.use_bases, 'y'*49+'n')
115 # test data extracted from summary file
117 (3878755, 579626.0), (3920639, 1027332.4),
118 (5713049, 876187.3), (5852907, 538640.6),
119 (4006751, 1265247.4), (5678021, 627070.7),
120 (1854131, 429053.2), (4777517, 592904.0),
123 self.failUnlessEqual(len(g.summary), self.reads)
125 summary_lane = g.summary[0][i]
126 self.failUnlessEqual(summary_lane.cluster, clusters[i])
127 self.failUnlessEqual(summary_lane.lane, i)
129 xml = g.get_elements()
130 # just make sure that element tree can serialize the tree
131 xml_str = ElementTree.tostring(xml)
132 g2 = gerald.CASAVA(xml=xml)
134 # do it all again after extracting from the xml file
135 self.failUnlessEqual(g.software, g2.software)
136 self.failUnlessEqual(g.version, g2.version)
137 self.failUnlessEqual(g.date, g2.date)
138 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
139 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
141 # test lane specific parameters from gerald config file
142 for i in g.lanes.keys():
144 g2_lane = g2.lanes[i]
145 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
146 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
147 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
148 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
150 # test (some) summary elements
151 self.failUnlessEqual(len(g.summary), self.reads)
153 g_summary = g.summary[0][i]
154 g2_summary = g2.summary[0][i]
155 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
156 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
158 g_eland = g.eland_results
159 g2_eland = g2.eland_results
160 for lane in g_eland.results[0].keys():
161 g_results = g_eland.results[0][lane]
162 g2_results = g2_eland.results[0][lane]
163 self.failUnlessEqual(g_results.reads,
165 if isinstance(g_results, eland.ElandLane):
166 self.failUnlessEqual(len(g_results.mapped_reads),
167 len(g2_results.mapped_reads))
168 for k in g_results.mapped_reads.keys():
169 self.failUnlessEqual(g_results.mapped_reads[k],
170 g2_results.mapped_reads[k])
172 self.failUnlessEqual(len(g_results.match_codes),
173 len(g2_results.match_codes))
174 for k in g_results.match_codes.keys():
175 self.failUnlessEqual(g_results.match_codes[k],
176 g2_results.match_codes[k])
179 def test_eland(self):
180 hg_map = {'Lambda.fa': 'Lambda.fa'}
181 for i in range(1,22):
182 short_name = 'chr%d.fa' % (i,)
183 long_name = 'hg18/chr%d.fa' % (i,)
184 hg_map[short_name] = long_name
186 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
187 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
188 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
190 # I added sequence lanes to the last 2 lanes of this test case
192 lane = eland_container.results[0][i]
193 self.failUnlessEqual(lane.reads, 6)
194 self.failUnlessEqual(lane.sample_name, "s")
195 self.failUnlessEqual(lane.lane_id, i)
196 self.failUnlessEqual(len(lane.mapped_reads), 17)
197 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
198 self.failUnlessEqual(lane.match_codes['U0'], 3)
199 self.failUnlessEqual(lane.match_codes['R0'], 2)
200 self.failUnlessEqual(lane.match_codes['U1'], 1)
201 self.failUnlessEqual(lane.match_codes['R1'], 9)
202 self.failUnlessEqual(lane.match_codes['U2'], 0)
203 self.failUnlessEqual(lane.match_codes['R2'], 12)
204 self.failUnlessEqual(lane.match_codes['NM'], 1)
205 self.failUnlessEqual(lane.match_codes['QC'], 0)
208 lane = eland_container.results[0][7]
209 self.failUnlessEqual(lane.reads, 5)
210 self.failUnlessEqual(lane.sample_name, 's')
211 self.failUnlessEqual(lane.lane_id, 7)
212 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
215 lane = eland_container.results[0][8]
216 self.failUnlessEqual(lane.reads, 3)
217 self.failUnlessEqual(lane.sample_name, 's')
218 self.failUnlessEqual(lane.lane_id, 8)
219 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
221 xml = eland_container.get_elements()
222 # just make sure that element tree can serialize the tree
223 xml_str = ElementTree.tostring(xml)
224 e2 = gerald.ELAND(xml=xml)
227 l1 = eland_container.results[0][i]
228 l2 = e2.results[0][i]
229 self.failUnlessEqual(l1.reads, l2.reads)
230 self.failUnlessEqual(l1.sample_name, l2.sample_name)
231 self.failUnlessEqual(l1.lane_id, l2.lane_id)
232 if isinstance(l1, eland.ElandLane):
233 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
234 self.failUnlessEqual(len(l1.mapped_reads), 17)
235 for k in l1.mapped_reads.keys():
236 self.failUnlessEqual(l1.mapped_reads[k],
239 self.failUnlessEqual(len(l1.match_codes), 9)
240 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
241 for k in l1.match_codes.keys():
242 self.failUnlessEqual(l1.match_codes[k],
244 elif isinstance(l1, eland.SequenceLane):
245 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
247 def test_runfolder(self):
249 runs = runfolder.get_runs(self.runfolder_dir)
251 # do we get the flowcell id from the filename?
252 self.failUnlessEqual(len(runs), 1)
253 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
254 self.failUnlessEqual(runs[0].name, name)
256 # do we get the flowcell id from the FlowcellId.xml file
257 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
258 runs = runfolder.get_runs(self.runfolder_dir)
259 self.failUnlessEqual(len(runs), 1)
260 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
261 self.failUnlessEqual(runs[0].name, name)
264 xml = r1.get_elements()
265 xml_str = ElementTree.tostring(xml)
267 r2 = runfolder.PipelineRun(xml=xml)
268 self.failUnlessEqual(r1.name, r2.name)
269 self.failIfEqual(r2.image_analysis, None)
270 self.failIfEqual(r2.bustard, None)
271 self.failIfEqual(r2.gerald, None)
275 return unittest.makeSuite(RunfolderTests,'test')
277 if __name__ == "__main__":
278 logging.basicConfig(level=logging.WARN)
279 unittest.main(defaultTest="suite")