3 from datetime import datetime, date
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.runfolder import ElementTree
17 from htsworkflow.pipelines.test.simulate_runfolder import *
def make_runfolder(obj=None):
    """
    Make a fake runfolder, attach all the directories to obj if defined.

    A new temporary directory is created and a runfolder named after
    flowcell D07K6ACXX is built inside it with Data/, Unaligned/ and
    Aligned/ sub-trees filled in by the simulate_runfolder helpers.

    :param obj: optional object that receives the attributes ``temp_dir``,
        ``runfolder_dir``, ``data_dir``, ``image_analysis_dir``,
        ``bustard_dir`` and ``gerald_dir``.  With the default of ``None``
        the tree is still built but nothing is attached.

    The temporary tree is not removed here; the caller is expected to
    delete ``temp_dir`` when done.
    """
    # make a fake runfolder directory
    flowcell_id = 'D07K6ACXX'
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')

    runfolder_dir = os.path.join(temp_dir,
                                 '110815_SN787_0101_A{0}'.format(flowcell_id))
    os.mkdir(runfolder_dir)

    data_dir = os.path.join(runfolder_dir, 'Data')
    # NOTE(review): created explicitly, mirroring the Unaligned handling
    # below; assumes the RTA helpers expect Data/ to exist -- confirm.
    os.mkdir(data_dir)

    intensities_dir = make_rta_intensities_1_12(data_dir)
    make_status_rta1_12(data_dir)

    basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
    make_matrix_dir_rta_1_12(basecalls_dir)

    unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
    os.mkdir(unaligned_dir)
    make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
    make_unaligned_config_1_12(unaligned_dir)

    aligned_dir = os.path.join(runfolder_dir, "Aligned")
    # NOTE(review): same assumption as for Data/ above -- confirm that the
    # aligned helpers do not create this directory themselves.
    os.mkdir(aligned_dir)
    make_aligned_eland_export(aligned_dir, flowcell_id)
    make_aligned_config_1_12(aligned_dir)

    # obj defaults to None, so only attach the paths when a real object
    # was supplied; otherwise the assignments raise AttributeError.
    if obj is not None:
        obj.temp_dir = temp_dir
        obj.runfolder_dir = runfolder_dir
        obj.data_dir = data_dir
        obj.image_analysis_dir = intensities_dir
        obj.bustard_dir = unaligned_dir
        obj.gerald_dir = aligned_dir
# NOTE(review): this class survives only as a damaged listing: every line
# still carries a stale line number, indentation has been stripped, and
# some lines are absent (flagged individually below).  It cannot be
# imported as-is; recover the original from version control before use.
60 class RunfolderTests(unittest.TestCase):
# Class docstring text; its opening and closing triple quotes are not
# visible in this copy.
62 Test components of the runfolder processing code
63 which includes firecrest, bustard, and gerald
# Fixture code.  The enclosing "def" lines are not visible; presumably the
# comment below belongs to a setUp() that calls make_runfolder(self) and
# the rmtree() to a tearDown() -- TODO confirm.
66 # attaches all the directories to the object passed in
# Deletes the temporary tree rooted at self.temp_dir.
70 shutil.rmtree(self.temp_dir)
# Builds a bustard object from self.bustard_dir, checks its software,
# version, date, user and phasing attributes, then rebuilds a second
# object from the first one's XML elements and compares the two.
72 def test_bustard(self):
73 """Construct a bustard object"""
74 b = bustard.bustard(self.bustard_dir)
75 self.failUnlessEqual(b.software, 'RTA')
76 self.failUnlessEqual(b.version, '1.12.4.2')
77 self.failUnlessEqual(b.date, None)
78 self.failUnlessEqual(b.user, None)
79 self.failUnlessEqual(len(b.phasing), 0)
# Round trip: serialise to elements and load them back.
81 xml = b.get_elements()
82 b2 = bustard.Bustard(xml=xml)
83 self.failUnlessEqual(b.software, b2.software)
84 self.failUnlessEqual(b.version, b2.version)
85 self.failUnlessEqual(b.date, b2.date )
86 self.failUnlessEqual(b.user, b2.user)
# Builds a gerald object from self.gerald_dir, checks software/version and
# per-lane configuration, compares summary data, then repeats the
# comparisons against a copy rebuilt from the XML elements.
88 def test_gerald(self):
89 # need to update gerald and make tests for it
90 g = gerald.gerald(self.gerald_dir)
92 self.failUnlessEqual(g.software, 'CASAVA')
93 self.failUnlessEqual(g.version, '1.8.1')
# The lanes container must report one consistent size through len(),
# keys() and items().
94 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
95 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
# NOTE(review): the comment below breaks off mid-sentence (one of its
# lines is missing) and describes a genomes list that is not visible here.
97 # list of genomes, matches what was defined up in
99 # the first None is to offset the genomes list to be 1..9
100 # instead of pythons default 0..8
101 # test lane specific parameters from gerald config file
# Lanes are looked up by name: first the 'Undetermined_indices' entry ...
103 undetermined = g.lanes['Undetermined_indices']
104 self.failUnlessEqual(undetermined.analysis, 'none')
105 self.failUnlessEqual(undetermined.read_length, None)
106 self.failUnlessEqual(undetermined.use_bases, None)
# ... then the entry keyed '12383'.
108 project = g.lanes['12383']
109 self.failUnlessEqual(project.analysis, 'eland_extended')
110 self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
111 self.failUnlessEqual(project.read_length, '49')
# Expected mask: 49 'y' characters followed by one 'n'.
112 self.failUnlessEqual(project.use_bases, 'y'*49+'n')
114 # test data extracted from summary file
# NOTE(review): these tuple rows have lost their enclosing assignment;
# presumably they are the body of the "clusters" sequence indexed further
# down -- TODO confirm.
116 (3878755, 579626.0), (3920639, 1027332.4),
117 (5713049, 876187.3), (5852907, 538640.6),
118 (4006751, 1265247.4), (5678021, 627070.7),
119 (1854131, 429053.2), (4777517, 592904.0),
# NOTE(review): this expects 2 summaries, but the same g.summary is
# asserted to have length 1 further down; both cannot hold.
122 self.failUnlessEqual(len(g.summary), 2)
# NOTE(review): "i" is used below with no visible loop header.
124 summary_lane = g.summary[0][i]
125 self.failUnlessEqual(summary_lane.cluster, clusters[i])
126 self.failUnlessEqual(summary_lane.lane, i)
# Round trip through the XML element tree.
128 xml = g.get_elements()
129 # just make sure that element tree can serialize the tree
130 xml_str = ElementTree.tostring(xml)
131 g2 = gerald.Gerald(xml=xml)
# NOTE(review): the listing's numbering skips lines at this point; check
# the original for a statement between the round trip and the checks below.
134 # do it all again after extracting from the xml file
135 self.failUnlessEqual(g.software, g2.software)
136 self.failUnlessEqual(g.version, g2.version)
137 self.failUnlessEqual(g.date, g2.date)
138 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
139 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
141 # test lane specific parameters from gerald config file
# NOTE(review): the loop header and the binding of "g_lane" are missing;
# only the g2 side of the per-lane comparison is visible.
144 g2_lane = g2.lanes[i]
145 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
146 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
147 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
148 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
150 # test (some) summary elements
# NOTE(review): conflicts with the earlier assertion that len(g.summary)
# is 2.
151 self.failUnlessEqual(len(g.summary), 1)
# NOTE(review): "i" is used below with no visible loop header.
153 g_summary = g.summary[0][i]
154 g2_summary = g2.summary[0][i]
155 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
156 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
# Compare the eland results of the original and the reloaded object, lane
# by lane, using the first entry of .results.
158 g_eland = g.eland_results
159 g2_eland = g2.eland_results
160 for lane in g_eland.results[0].keys():
161 g_results = g_eland.results[0][lane]
162 g2_results = g2_eland.results[0][lane]
# NOTE(review): the second argument and closing parenthesis of this call
# are missing.
163 self.failUnlessEqual(g_results.reads,
# Mapped-read and match-code tables are compared key by key; the
# isinstance() test restricts at least the mapped-read checks to
# ElandLane results (the extent of the branch cannot be recovered without
# indentation).
165 if isinstance(g_results, eland.ElandLane):
166 self.failUnlessEqual(len(g_results.mapped_reads),
167 len(g2_results.mapped_reads))
168 for k in g_results.mapped_reads.keys():
169 self.failUnlessEqual(g_results.mapped_reads[k],
170 g2_results.mapped_reads[k])
172 self.failUnlessEqual(len(g_results.match_codes),
173 len(g2_results.match_codes))
174 for k in g_results.match_codes.keys():
175 self.failUnlessEqual(g_results.match_codes[k],
176 g2_results.match_codes[k])
# Loads eland results from self.gerald_dir with a genome map, checks
# per-lane read counts and match codes, then compares against a copy
# rebuilt from the XML elements.
# NOTE(review): the listing's numbering skips a line directly after the
# "def"; check the original for a statement there.
179 def test_eland(self):
# Short-name -> long-name map: Lambda.fa maps to itself, chr1..chr21 map
# to their hg18/ paths.
181 hg_map = {'Lambda.fa': 'Lambda.fa'}
182 for i in range(1,22):
183 short_name = 'chr%d.fa' % (i,)
184 long_name = 'hg18/chr%d.fa' % (i,)
185 hg_map[short_name] = long_name
# The same map object is shared by keys 1 through 8.
187 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
188 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
189 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
191 # I added sequence lanes to the last 2 lanes of this test case
# NOTE(review): "i" is used below with no visible loop header, so the
# range of lanes these assertions cover cannot be determined here.
193 lane = eland_container.results[0][i]
194 self.failUnlessEqual(lane.reads, 6)
195 self.failUnlessEqual(lane.sample_name, "s")
196 self.failUnlessEqual(lane.lane_id, i)
197 self.failUnlessEqual(len(lane.mapped_reads), 17)
198 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
199 self.failUnlessEqual(lane.match_codes['U0'], 3)
200 self.failUnlessEqual(lane.match_codes['R0'], 2)
201 self.failUnlessEqual(lane.match_codes['U1'], 1)
202 self.failUnlessEqual(lane.match_codes['R1'], 9)
203 self.failUnlessEqual(lane.match_codes['U2'], 0)
204 self.failUnlessEqual(lane.match_codes['R2'], 12)
205 self.failUnlessEqual(lane.match_codes['NM'], 1)
206 self.failUnlessEqual(lane.match_codes['QC'], 0)
# Lane 7 is expected to be a sequence lane of SCARF type.
209 lane = eland_container.results[0][7]
210 self.failUnlessEqual(lane.reads, 5)
211 self.failUnlessEqual(lane.sample_name, 's')
212 self.failUnlessEqual(lane.lane_id, 7)
213 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
# Lane 8 is expected to be a sequence lane of FASTQ type.
216 lane = eland_container.results[0][8]
217 self.failUnlessEqual(lane.reads, 3)
218 self.failUnlessEqual(lane.sample_name, 's')
219 self.failUnlessEqual(lane.lane_id, 8)
220 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
# Round trip through the XML element tree.
222 xml = eland_container.get_elements()
223 # just make sure that element tree can serialize the tree
224 xml_str = ElementTree.tostring(xml)
225 e2 = gerald.ELAND(xml=xml)
# NOTE(review): "i" is used below with no visible loop header.
228 l1 = eland_container.results[0][i]
229 l2 = e2.results[0][i]
230 self.failUnlessEqual(l1.reads, l2.reads)
231 self.failUnlessEqual(l1.sample_name, l2.sample_name)
232 self.failUnlessEqual(l1.lane_id, l2.lane_id)
# Aligned lanes compare their mapped-read and match-code tables; sequence
# lanes compare only their sequence type.
233 if isinstance(l1, eland.ElandLane):
234 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
235 self.failUnlessEqual(len(l1.mapped_reads), 17)
236 for k in l1.mapped_reads.keys():
# NOTE(review): the second argument and closing parenthesis of this call
# are missing.
237 self.failUnlessEqual(l1.mapped_reads[k],
240 self.failUnlessEqual(len(l1.match_codes), 9)
241 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
242 for k in l1.match_codes.keys():
# NOTE(review): the second argument and closing parenthesis of this call
# are missing.
243 self.failUnlessEqual(l1.match_codes[k],
245 elif isinstance(l1, eland.SequenceLane):
246 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
# Discovers runs under self.runfolder_dir, checks the generated run name
# before and after a flowcell id file is written, then round-trips a run
# through its XML elements.
# NOTE(review): the listing's numbering skips a line directly after the
# "def"; check the original for a statement there.
248 def test_runfolder(self):
250 runs = runfolder.get_runs(self.runfolder_dir)
252 # do we get the flowcell id from the filename?
253 self.failUnlessEqual(len(runs), 1)
# NOTE(review): the expected name embeds flowcell 4286GAAXX, whereas
# make_runfolder() names the directory after D07K6ACXX; if the id really
# is derived from the directory name this assertion looks stale -- verify.
254 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
255 self.failUnlessEqual(runs[0].name, name)
257 # do we get the flowcell id from the FlowcellId.xml file
258 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
259 runs = runfolder.get_runs(self.runfolder_dir)
260 self.failUnlessEqual(len(runs), 1)
261 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
262 self.failUnlessEqual(runs[0].name, name)
# NOTE(review): "r1" is never bound in the visible code; presumably it is
# runs[0] -- TODO confirm.
265 xml = r1.get_elements()
266 xml_str = ElementTree.tostring(xml)
# A run rebuilt from XML must keep its name and have all three pipeline
# stages populated.
268 r2 = runfolder.PipelineRun(xml=xml)
269 self.failUnlessEqual(r1.name, r2.name)
270 self.failIfEqual(r2.image_analysis, None)
271 self.failIfEqual(r2.bustard, None)
272 self.failIfEqual(r2.gerald, None)
def suite():
    """
    Return a TestSuite holding every ``test*`` method of RunfolderTests.

    This is the factory that ``unittest.main(defaultTest="suite")`` looks
    up by name when the module is run as a script.
    """
    # unittest.makeSuite() is deprecated and removed in Python 3.13.  The
    # default loader already uses 'test' as its method prefix, which is
    # the prefix the old call passed explicitly.
    return unittest.defaultTestLoader.loadTestsFromTestCase(RunfolderTests)
if __name__ == '__main__':
    # Keep log output down to warnings and errors while the tests run.
    logging.basicConfig(level=logging.WARNING)
    # Run the tests produced by the module-level callable named "suite".
    unittest.main(defaultTest='suite')