3 from datetime import datetime, date
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.runfolder import ElementTree
17 from htsworkflow.pipelines.test.simulate_runfolder import *
20 def make_runfolder(obj=None):
22 Make a fake runfolder, attach all the directories to obj if defined
24 # make a fake runfolder directory
25 flowcell_id = 'D07K6ACXX'
26 temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
28 runfolder_dir = os.path.join(temp_dir,
29 '110815_SN787_0101_A{0}'.format(flowcell_id))
30 os.mkdir(runfolder_dir)
32 data_dir = os.path.join(runfolder_dir, 'Data')
35 intensities_dir = make_rta_intensities_1_12(data_dir)
36 make_status_rta1_12(data_dir)
38 basecalls_dir = make_rta_basecalls_1_12(intensities_dir)
39 make_matrix_dir_rta_1_12(basecalls_dir)
41 unaligned_dir = os.path.join(runfolder_dir, "Unaligned")
42 os.mkdir(unaligned_dir)
43 make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
44 make_unaligned_config_1_12(unaligned_dir)
46 aligned_dir = os.path.join(runfolder_dir, "Aligned")
48 make_aligned_eland_export(aligned_dir, flowcell_id)
49 make_aligned_config_1_12(aligned_dir)
52 obj.temp_dir = temp_dir
53 obj.runfolder_dir = runfolder_dir
54 obj.data_dir = data_dir
55 obj.image_analysis_dir = intensities_dir
56 obj.bustard_dir = unaligned_dir
57 obj.gerald_dir = aligned_dir
60 class RunfolderTests(unittest.TestCase):
62 Test components of the runfolder processing code
63 which includes firecrest, bustard, and gerald
66 # attaches all the directories to the object passed in
70 shutil.rmtree(self.temp_dir)
72 def test_bustard(self):
73 """Construct a bustard object"""
74 b = bustard.bustard(self.bustard_dir)
75 self.failUnlessEqual(b.version, '1.12.4.2')
76 self.failUnlessEqual(b.date, None)
77 self.failUnlessEqual(b.user, None)
78 self.failUnlessEqual(len(b.phasing), 0)
80 xml = b.get_elements()
81 b2 = bustard.Bustard(xml=xml)
82 self.failUnlessEqual(b.version, b2.version)
83 self.failUnlessEqual(b.date, b2.date )
84 self.failUnlessEqual(b.user, b2.user)
86 def test_gerald(self):
87 # need to update gerald and make tests for it
88 g = gerald.gerald(self.gerald_dir)
90 self.failUnlessEqual(g.version, 'CASAVA-1.8.1')
91 self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
92 self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
94 # list of genomes, matches what was defined up in
96 # the first None is to offset the genomes list to be 1..9
97 # instead of pythons default 0..8
98 # test lane specific parameters from gerald config file
100 undetermined = g.lanes['Undetermined_indices']
101 self.failUnlessEqual(undetermined.analysis, 'none')
102 self.failUnlessEqual(undetermined.read_length, None)
103 self.failUnlessEqual(undetermined.use_bases, None)
105 project = g.lanes['12383']
106 self.failUnlessEqual(project.analysis, 'eland_extended')
107 self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
108 self.failUnlessEqual(project.read_length, '49')
109 self.failUnlessEqual(project.use_bases, 'y'*49+'n')
111 # test data extracted from summary file
113 (3878755, 579626.0), (3920639, 1027332.4),
114 (5713049, 876187.3), (5852907, 538640.6),
115 (4006751, 1265247.4), (5678021, 627070.7),
116 (1854131, 429053.2), (4777517, 592904.0),
119 self.failUnlessEqual(len(g.summary), 2)
121 summary_lane = g.summary[0][i]
122 self.failUnlessEqual(summary_lane.cluster, clusters[i])
123 self.failUnlessEqual(summary_lane.lane, i)
125 xml = g.get_elements()
126 # just make sure that element tree can serialize the tree
127 xml_str = ElementTree.tostring(xml)
128 g2 = gerald.Gerald(xml=xml)
131 # do it all again after extracting from the xml file
132 self.failUnlessEqual(g.version, g2.version)
133 self.failUnlessEqual(g.date, g2.date)
134 self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
135 self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
137 # test lane specific parameters from gerald config file
140 g2_lane = g2.lanes[i]
141 self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
142 self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
143 self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
144 self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
146 # test (some) summary elements
147 self.failUnlessEqual(len(g.summary), 1)
149 g_summary = g.summary[0][i]
150 g2_summary = g2.summary[0][i]
151 self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
152 self.failUnlessEqual(g_summary.lane, g2_summary.lane)
154 g_eland = g.eland_results
155 g2_eland = g2.eland_results
156 for lane in g_eland.results[0].keys():
157 g_results = g_eland.results[0][lane]
158 g2_results = g2_eland.results[0][lane]
159 self.failUnlessEqual(g_results.reads,
161 if isinstance(g_results, eland.ElandLane):
162 self.failUnlessEqual(len(g_results.mapped_reads),
163 len(g2_results.mapped_reads))
164 for k in g_results.mapped_reads.keys():
165 self.failUnlessEqual(g_results.mapped_reads[k],
166 g2_results.mapped_reads[k])
168 self.failUnlessEqual(len(g_results.match_codes),
169 len(g2_results.match_codes))
170 for k in g_results.match_codes.keys():
171 self.failUnlessEqual(g_results.match_codes[k],
172 g2_results.match_codes[k])
175 def test_eland(self):
177 hg_map = {'Lambda.fa': 'Lambda.fa'}
178 for i in range(1,22):
179 short_name = 'chr%d.fa' % (i,)
180 long_name = 'hg18/chr%d.fa' % (i,)
181 hg_map[short_name] = long_name
183 genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
184 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
185 eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
187 # I added sequence lanes to the last 2 lanes of this test case
189 lane = eland_container.results[0][i]
190 self.failUnlessEqual(lane.reads, 6)
191 self.failUnlessEqual(lane.sample_name, "s")
192 self.failUnlessEqual(lane.lane_id, i)
193 self.failUnlessEqual(len(lane.mapped_reads), 17)
194 self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
195 self.failUnlessEqual(lane.match_codes['U0'], 3)
196 self.failUnlessEqual(lane.match_codes['R0'], 2)
197 self.failUnlessEqual(lane.match_codes['U1'], 1)
198 self.failUnlessEqual(lane.match_codes['R1'], 9)
199 self.failUnlessEqual(lane.match_codes['U2'], 0)
200 self.failUnlessEqual(lane.match_codes['R2'], 12)
201 self.failUnlessEqual(lane.match_codes['NM'], 1)
202 self.failUnlessEqual(lane.match_codes['QC'], 0)
205 lane = eland_container.results[0][7]
206 self.failUnlessEqual(lane.reads, 5)
207 self.failUnlessEqual(lane.sample_name, 's')
208 self.failUnlessEqual(lane.lane_id, 7)
209 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
212 lane = eland_container.results[0][8]
213 self.failUnlessEqual(lane.reads, 3)
214 self.failUnlessEqual(lane.sample_name, 's')
215 self.failUnlessEqual(lane.lane_id, 8)
216 self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
218 xml = eland_container.get_elements()
219 # just make sure that element tree can serialize the tree
220 xml_str = ElementTree.tostring(xml)
221 e2 = gerald.ELAND(xml=xml)
224 l1 = eland_container.results[0][i]
225 l2 = e2.results[0][i]
226 self.failUnlessEqual(l1.reads, l2.reads)
227 self.failUnlessEqual(l1.sample_name, l2.sample_name)
228 self.failUnlessEqual(l1.lane_id, l2.lane_id)
229 if isinstance(l1, eland.ElandLane):
230 self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
231 self.failUnlessEqual(len(l1.mapped_reads), 17)
232 for k in l1.mapped_reads.keys():
233 self.failUnlessEqual(l1.mapped_reads[k],
236 self.failUnlessEqual(len(l1.match_codes), 9)
237 self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
238 for k in l1.match_codes.keys():
239 self.failUnlessEqual(l1.match_codes[k],
241 elif isinstance(l1, eland.SequenceLane):
242 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
244 def test_runfolder(self):
246 runs = runfolder.get_runs(self.runfolder_dir)
248 # do we get the flowcell id from the filename?
249 self.failUnlessEqual(len(runs), 1)
250 name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
251 self.failUnlessEqual(runs[0].name, name)
253 # do we get the flowcell id from the FlowcellId.xml file
254 make_flowcell_id(self.runfolder_dir, '207BTAAXY')
255 runs = runfolder.get_runs(self.runfolder_dir)
256 self.failUnlessEqual(len(runs), 1)
257 name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
258 self.failUnlessEqual(runs[0].name, name)
261 xml = r1.get_elements()
262 xml_str = ElementTree.tostring(xml)
264 r2 = runfolder.PipelineRun(xml=xml)
265 self.failUnlessEqual(r1.name, r2.name)
266 self.failIfEqual(r2.image_analysis, None)
267 self.failIfEqual(r2.bustard, None)
268 self.failIfEqual(r2.gerald, None)
272 return unittest.makeSuite(RunfolderTests,'test')
274 if __name__ == "__main__":
275 logging.basicConfig(level=logging.WARN)
276 unittest.main(defaultTest="suite")