Convert to unittest2
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_casava_1_7.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import glob
5 import os
6 import tempfile
7 import shutil
8 from unittest2 import TestCase
9
10 from htsworkflow.pipelines import bustard
11 from htsworkflow.pipelines import eland
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import ipar
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines import srf
16 from htsworkflow.pipelines.runfolder import ElementTree
17
18 from htsworkflow.pipelines.test.simulate_runfolder import *
19
20 FCID = 'AA01AAABXX'
21 RUN_NAME = '110420_SN787_0069_%s' %( FCID,)
22
23 def make_runfolder(obj=None):
24     """
25     Make a fake runfolder, attach all the directories to obj if defined
26     """
27     # make a fake runfolder directory
28     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
29
30     runfolder_dir = os.path.join(temp_dir, RUN_NAME)
31     os.mkdir(runfolder_dir)
32
33     data_dir = os.path.join(runfolder_dir, 'Data')
34     os.mkdir(data_dir)
35
36     intensities_dir = make_rta_intensities_1_10(data_dir)
37
38     basecalls_dir = make_rta_basecalls_1_10(intensities_dir)
39     make_matrix_dir_rta_1_10(basecalls_dir)
40     make_qseqs(basecalls_dir, ABXX_BASE_CALL_INFO)
41
42     gerald_dir = os.path.join(basecalls_dir,
43                               'GERALD_02-05-2011_diane')
44     os.mkdir(gerald_dir)
45     make_gerald_config_1_7(gerald_dir)
46     make_summary_casava1_7_xml(gerald_dir)
47     make_eland_export(gerald_dir, lane_list=[4,5,6,7,8])
48     make_fastq(gerald_dir, lane_list=[1,2,3,])
49
50     if obj is not None:
51         obj.temp_dir = temp_dir
52         obj.runfolder_dir = runfolder_dir
53         obj.data_dir = data_dir
54         obj.image_analysis_dir = intensities_dir
55         obj.bustard_dir = basecalls_dir
56         obj.gerald_dir = gerald_dir
57
58
59 class RunfolderTests(TestCase):
60     """
61     Test components of the runfolder processing code
62     which includes firecrest, bustard, and gerald
63     """
64     def setUp(self):
65         # attaches all the directories to the object passed in
66         make_runfolder(self)
67
68     def tearDown(self):
69         shutil.rmtree(self.temp_dir)
70
71     def test_bustard(self):
72         """Construct a bustard object"""
73         b = bustard.bustard(self.bustard_dir)
74         self.failUnlessEqual(b.software, 'RTA')
75         self.failUnlessEqual(b.version, '1.10.36.0')
76         self.failUnlessEqual(b.date,    None)
77         self.failUnlessEqual(b.user,    None)
78         self.failUnlessEqual(len(b.phasing), 0)
79
80         xml = b.get_elements()
81         b2 = bustard.Bustard(xml=xml)
82         self.failUnlessEqual(b.software, b2.software)
83         self.failUnlessEqual(b.version,  b2.version)
84         self.failUnlessEqual(b.date,     b2.date )
85         self.failUnlessEqual(b.user,     b2.user)
86
87     def test_gerald(self):
88         # need to update gerald and make tests for it
89         g = gerald.gerald(self.gerald_dir)
90
91         self.failUnlessEqual(g.software, 'CASAVA')
92         self.failUnlessEqual(g.version, '1.7.0')
93         self.failUnlessEqual(g.date, datetime(2011,5,2,19,19,49))
94         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
95         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
96
97
98         # list of genomes, matches what was defined up in
99         # make_gerald_config.
100         # the first None is to offset the genomes list to be 1..9
101         # instead of pythons default 0..8
102         genomes = [None, # placeholder
103                    None, # no genome
104                    None, # no genome
105                    None, # no genome
106                    '/g/hg18',
107                    '/g/elegans190',
108                    '/g/elegans190',
109                    '/g/elegans190',
110                    '/g/phi', ]
111         analysis = [None, # placeholder
112                     'sequence_pair',
113                     'sequence_pair',
114                     'sequence_pair',
115                     'eland_pair',
116                     'eland_pair',
117                     'eland_pair',
118                     'eland_pair',
119                     'eland_pair',]
120
121
122         # test lane specific parameters from gerald config file
123         for i in range(1,9):
124             cur_lane = g.lanes[i]
125             self.failUnlessEqual(cur_lane.analysis, analysis[i])
126             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
127             self.failUnlessEqual(cur_lane.read_length, '100')
128             self.failUnlessEqual(cur_lane.use_bases, 'Y'*100+'y'*100)
129
130         # I want to be able to use a simple iterator
131         for l in g.lanes.values():
132           self.failUnless(l.analysis in ('sequence_pair', 'eland_pair'))
133           self.failUnlessEqual(l.read_length, '100')
134           self.failUnlessEqual(l.use_bases, 'Y'*100+'y'*100)
135
136         # test data extracted from summary file
137         clusters = [None,
138                     (1073893, 146344), (1671038, 167728),
139                     (5778484, 83123),  (2411032, 290873),
140                     (2197555, 294381), (2725097, 333724),
141                     (2549849, 313056), (2282159, 232709),]
142
143         self.failUnlessEqual(len(g.summary), 1)
144         for i in range(1,9):
145             summary_lane = g.summary[0][i]
146             self.failUnlessEqual(summary_lane.cluster, clusters[i])
147             self.failUnlessEqual(summary_lane.lane, i)
148
149         xml = g.get_elements()
150         # just make sure that element tree can serialize the tree
151         xml_str = ElementTree.tostring(xml)
152         g2 = gerald.Gerald(xml=xml)
153
154         # do it all again after extracting from the xml file
155         self.failUnlessEqual(g.software, g2.software)
156         self.failUnlessEqual(g.version, g2.version)
157         self.failUnlessEqual(g.date, g2.date)
158         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
159         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
160
161         # test lane specific parameters from gerald config file
162         for i in range(1,9):
163             g_lane = g.lanes[i]
164             g2_lane = g2.lanes[i]
165             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
166             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
167             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
168             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
169
170         # test (some) summary elements
171         self.failUnlessEqual(len(g.summary), 1)
172         for i in range(1,9):
173             g_summary = g.summary[0][i]
174             g2_summary = g2.summary[0][i]
175             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
176             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
177
178             g_eland = g.eland_results
179             g2_eland = g2.eland_results
180             for key in g_eland:
181                 g_results = g_eland[key]
182                 g2_results = g2_eland[key]
183                 self.failUnlessEqual(g_results.reads,
184                                      g2_results.reads)
185                 if isinstance(g_results, eland.ElandLane):
186                   self.failUnlessEqual(len(g_results.mapped_reads),
187                                        len(g2_results.mapped_reads))
188                   for k in g_results.mapped_reads.keys():
189                       self.failUnlessEqual(g_results.mapped_reads[k],
190                                            g2_results.mapped_reads[k])
191
192                   self.failUnlessEqual(len(g_results.match_codes),
193                                        len(g2_results.match_codes))
194                   for k in g_results.match_codes.keys():
195                       self.failUnlessEqual(g_results.match_codes[k],
196                                            g2_results.match_codes[k])
197
198
199     def test_eland(self):
200         ls_tree(self.runfolder_dir)
201         hg_map = {'Lambda.fa': 'Lambda.fa'}
202         for i in range(1,22):
203           short_name = 'chr%d.fa' % (i,)
204           long_name = 'hg18/chr%d.fa' % (i,)
205           hg_map[short_name] = long_name
206
207         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
208                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
209         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
210
211         # test fastq
212         for i in range(1,4):
213             key = eland.SampleKey(lane=i, read=1, sample='s')
214             lane = eland_container[key]
215             self.failUnlessEqual(lane.reads, 3)
216             self.failUnlessEqual(lane.sample_name, 's')
217             self.failUnlessEqual(lane.lane_id, i)
218             self.failUnlessEqual(lane.sequence_type,
219                                  eland.SequenceLane.FASTQ_TYPE)
220
221         # I added sequence lanes to the last 2 lanes of this test case
222         for i in range(4,9):
223             key = eland.SampleKey(lane=i, read=1, sample='s')
224             lane = eland_container[key]
225             self.failUnlessEqual(lane.reads, 28)
226             self.failUnlessEqual(lane.sample_name, "s")
227             self.failUnlessEqual(lane.lane_id, i)
228             self.failUnlessEqual(len(lane.mapped_reads), 7)
229             self.failUnlessEqual(lane.mapped_reads['hg18/chr7.fa'], 4)
230             self.failUnlessEqual(lane.match_codes['U0'], 1)
231             self.failUnlessEqual(lane.match_codes['R0'], 20)
232             self.failUnlessEqual(lane.match_codes['U1'], 1)
233             self.failUnlessEqual(lane.match_codes['R1'], 2)
234             self.failUnlessEqual(lane.match_codes['U2'], 11)
235             self.failUnlessEqual(lane.match_codes['R2'], 0)
236             self.failUnlessEqual(lane.match_codes['NM'], 2)
237             self.failUnlessEqual(lane.match_codes['QC'], 9)
238
239
240         xml = eland_container.get_elements()
241         # just make sure that element tree can serialize the tree
242         xml_str = ElementTree.tostring(xml)
243         e2 = gerald.ELAND(xml=xml)
244
245         for key in eland_container:
246             l1 = eland_container[key]
247             l2 = e2[key]
248             self.failUnlessEqual(l1.reads, l2.reads)
249             self.failUnlessEqual(l1.sample_name, l2.sample_name)
250             self.failUnlessEqual(l1.lane_id, l2.lane_id)
251             if isinstance(l1, eland.ElandLane):
252               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
253               self.failUnlessEqual(len(l1.mapped_reads), 7)
254               for k in l1.mapped_reads.keys():
255                   self.failUnlessEqual(l1.mapped_reads[k],
256                                        l2.mapped_reads[k])
257
258               self.failUnlessEqual(len(l1.match_codes), 9)
259               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
260               for k in l1.match_codes.keys():
261                   self.failUnlessEqual(l1.match_codes[k],
262                                        l2.match_codes[k])
263             elif isinstance(l1, eland.SequenceLane):
264                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
265
266     def test_runfolder(self):
267         runs = runfolder.get_runs(self.runfolder_dir)
268
269         # do we get the flowcell id from the filename?
270         self.failUnlessEqual(len(runs), 1)
271         name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
272         self.failUnlessEqual(runs[0].name, name)
273
274         # do we get the flowcell id from the FlowcellId.xml file
275         make_flowcell_id(self.runfolder_dir, FCID)
276         runs = runfolder.get_runs(self.runfolder_dir)
277         self.failUnlessEqual(len(runs), 1)
278         name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
279         self.failUnlessEqual(runs[0].name, name)
280
281         r1 = runs[0]
282         xml = r1.get_elements()
283         xml_str = ElementTree.tostring(xml)
284
285         r2 = runfolder.PipelineRun(xml=xml)
286         self.failUnlessEqual(r1.name, r2.name)
287         self.failIfEqual(r2.image_analysis, None)
288         self.failIfEqual(r2.bustard, None)
289         self.failIfEqual(r2.gerald, None)
290
291     def test_srf(self):
292         pattern_list = srf.create_qseq_patterns(self.bustard_dir)
293         self.assertEqual(len(pattern_list), 1)
294         pattern = pattern_list[0][1] % (1,)
295         self.assertEqual(
296             glob.fnmatch.fnmatch("s_1_1_1101_qseq.txt", pattern),
297             True)
298         self.assertEqual(
299             glob.fnmatch.fnmatch("s_1_0001_qseq.txt", pattern),
300             False)
301
302         print pattern
303
304
305 def suite():
306     from unittest2 import TestSuite, defaultTestLoader
307     suite = TestSuite()
308     suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
309     return suite
310
311
312 if __name__ == "__main__":
313     from unittest2 import main
314     main(defaultTest="suite")