Change unittest2 back into unittest.
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_casava_1_7.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import glob
5 import os
6 import tempfile
7 import shutil
8 from unittest import TestCase
9
10 from htsworkflow.pipelines import bustard
11 from htsworkflow.pipelines import eland
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import ipar
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines import srf
16 from htsworkflow.pipelines import ElementTree
17
18 from htsworkflow.pipelines.test.simulate_runfolder import *
19
20 FCID = 'AA01AAABXX'
21 RUN_NAME = '110420_SN787_0069_%s' %( FCID,)
22
23 def make_runfolder(obj=None):
24     """
25     Make a fake runfolder, attach all the directories to obj if defined
26     """
27     # make a fake runfolder directory
28     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
29
30     runfolder_dir = os.path.join(temp_dir, RUN_NAME)
31     os.mkdir(runfolder_dir)
32
33     data_dir = os.path.join(runfolder_dir, 'Data')
34     os.mkdir(data_dir)
35
36     intensities_dir = make_rta_intensities_1_10(data_dir)
37
38     basecalls_dir = make_rta_basecalls_1_10(intensities_dir)
39     make_matrix_dir_rta_1_10(basecalls_dir)
40     make_qseqs(basecalls_dir, ABXX_BASE_CALL_INFO)
41
42     gerald_dir = os.path.join(basecalls_dir,
43                               'GERALD_02-05-2011_diane')
44     os.mkdir(gerald_dir)
45     make_gerald_config_1_7(gerald_dir)
46     make_summary_casava1_7_xml(gerald_dir)
47     make_eland_export(gerald_dir, lane_list=[4,5,6,7,8])
48     make_fastq(gerald_dir, lane_list=[1,2,3,])
49
50     if obj is not None:
51         obj.temp_dir = temp_dir
52         obj.runfolder_dir = runfolder_dir
53         obj.data_dir = data_dir
54         obj.image_analysis_dir = intensities_dir
55         obj.bustard_dir = basecalls_dir
56         obj.gerald_dir = gerald_dir
57
58
59 class RunfolderTests(TestCase):
60     """
61     Test components of the runfolder processing code
62     which includes firecrest, bustard, and gerald
63     """
64     def setUp(self):
65         # attaches all the directories to the object passed in
66         make_runfolder(self)
67
68     def tearDown(self):
69         shutil.rmtree(self.temp_dir)
70
71     def test_bustard(self):
72         """Construct a bustard object"""
73         b = bustard.bustard(self.bustard_dir)
74         self.failUnlessEqual(b.software, 'RTA')
75         self.failUnlessEqual(b.version, '1.10.36.0')
76         self.failUnlessEqual(b.date,    None)
77         self.failUnlessEqual(b.user,    None)
78         self.failUnlessEqual(len(b.phasing), 0)
79
80         xml = b.get_elements()
81         b2 = bustard.Bustard(xml=xml)
82         self.failUnlessEqual(b.software, b2.software)
83         self.failUnlessEqual(b.version,  b2.version)
84         self.failUnlessEqual(b.date,     b2.date )
85         self.failUnlessEqual(b.user,     b2.user)
86
87     def test_gerald(self):
88         # need to update gerald and make tests for it
89         g = gerald.gerald(self.gerald_dir)
90
91         self.failUnlessEqual(g.software, 'CASAVA')
92         self.failUnlessEqual(g.version, '1.7.0')
93         self.failUnlessEqual(g.date, datetime(2011,5,2,19,19,49))
94         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
95         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
96
97
98         # list of genomes, matches what was defined up in
99         # make_gerald_config.
100         # the first None is to offset the genomes list to be 1..9
101         # instead of pythons default 0..8
102         genomes = [None, # placeholder
103                    None, # no genome
104                    None, # no genome
105                    None, # no genome
106                    '/g/hg18',
107                    '/g/elegans190',
108                    '/g/elegans190',
109                    '/g/elegans190',
110                    '/g/phi', ]
111         analysis = [None, # placeholder
112                     'sequence_pair',
113                     'sequence_pair',
114                     'sequence_pair',
115                     'eland_pair',
116                     'eland_pair',
117                     'eland_pair',
118                     'eland_pair',
119                     'eland_pair',]
120
121
122         # test lane specific parameters from gerald config file
123         for i in range(1,9):
124             cur_lane = g.lanes[i]
125             self.failUnlessEqual(cur_lane.analysis, analysis[i])
126             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
127             self.failUnlessEqual(cur_lane.read_length, '100')
128             self.failUnlessEqual(cur_lane.use_bases, 'Y'*100+'y'*100)
129
130         # I want to be able to use a simple iterator
131         for l in g.lanes.values():
132           self.failUnless(l.analysis in ('sequence_pair', 'eland_pair'))
133           self.failUnlessEqual(l.read_length, '100')
134           self.failUnlessEqual(l.use_bases, 'Y'*100+'y'*100)
135
136         # test data extracted from summary file
137         clusters = [None,
138                     (1073893, 146344), (1671038, 167728),
139                     (5778484, 83123),  (2411032, 290873),
140                     (2197555, 294381), (2725097, 333724),
141                     (2549849, 313056), (2282159, 232709),]
142
143         self.failUnlessEqual(len(g.summary), 1)
144         for i in range(1,9):
145             summary_lane = g.summary[0][i]
146             self.failUnlessEqual(summary_lane.cluster, clusters[i])
147             self.failUnlessEqual(summary_lane.lane, i)
148
149         xml = g.get_elements()
150         # just make sure that element tree can serialize the tree
151         xml_str = ElementTree.tostring(xml)
152         g2 = gerald.Gerald(xml=xml)
153
154         # do it all again after extracting from the xml file
155         self.failUnlessEqual(g.software, g2.software)
156         self.failUnlessEqual(g.version, g2.version)
157         self.failUnlessEqual(g.date, g2.date)
158         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
159         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
160
161         # test lane specific parameters from gerald config file
162         for i in range(1,9):
163             g_lane = g.lanes[i]
164             g2_lane = g2.lanes[i]
165             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
166             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
167             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
168             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
169
170         # test (some) summary elements
171         self.failUnlessEqual(len(g.summary), 1)
172         for i in range(1,9):
173             g_summary = g.summary[0][i]
174             g2_summary = g2.summary[0][i]
175             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
176             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
177
178             g_eland = g.eland_results
179             g2_eland = g2.eland_results
180             for key in g_eland:
181                 g_results = g_eland[key]
182                 g2_results = g2_eland[key]
183                 self.failUnlessEqual(g_results.reads,
184                                      g2_results.reads)
185                 if isinstance(g_results, eland.ElandLane):
186                   self.failUnlessEqual(len(g_results.mapped_reads),
187                                        len(g2_results.mapped_reads))
188                   for k in g_results.mapped_reads.keys():
189                       self.failUnlessEqual(g_results.mapped_reads[k],
190                                            g2_results.mapped_reads[k])
191
192                   self.failUnlessEqual(len(g_results.match_codes),
193                                        len(g2_results.match_codes))
194                   for k in g_results.match_codes.keys():
195                       self.failUnlessEqual(g_results.match_codes[k],
196                                            g2_results.match_codes[k])
197
198
199     def test_eland(self):
200         hg_map = {'Lambda.fa': 'Lambda.fa'}
201         for i in range(1,22):
202           short_name = 'chr%d.fa' % (i,)
203           long_name = 'hg18/chr%d.fa' % (i,)
204           hg_map[short_name] = long_name
205
206         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
207                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
208         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
209
210         # test fastq
211         for i in range(1,4):
212             key = eland.SampleKey(lane=i, read=1, sample='s')
213             lane = eland_container[key]
214             self.failUnlessEqual(lane.reads, 3)
215             self.failUnlessEqual(lane.sample_name, 's')
216             self.failUnlessEqual(lane.lane_id, i)
217             self.failUnlessEqual(lane.sequence_type,
218                                  eland.SequenceLane.FASTQ_TYPE)
219
220         # I added sequence lanes to the last 2 lanes of this test case
221         for i in range(4,9):
222             key = eland.SampleKey(lane=i, read=1, sample='s')
223             lane = eland_container[key]
224             self.failUnlessEqual(lane.reads, 28)
225             self.failUnlessEqual(lane.sample_name, "s")
226             self.failUnlessEqual(lane.lane_id, i)
227             self.failUnlessEqual(len(lane.mapped_reads), 7)
228             self.failUnlessEqual(lane.mapped_reads['hg18/chr7.fa'], 4)
229             self.failUnlessEqual(lane.match_codes['U0'], 1)
230             self.failUnlessEqual(lane.match_codes['R0'], 20)
231             self.failUnlessEqual(lane.match_codes['U1'], 1)
232             self.failUnlessEqual(lane.match_codes['R1'], 2)
233             self.failUnlessEqual(lane.match_codes['U2'], 11)
234             self.failUnlessEqual(lane.match_codes['R2'], 0)
235             self.failUnlessEqual(lane.match_codes['NM'], 2)
236             self.failUnlessEqual(lane.match_codes['QC'], 9)
237
238
239         xml = eland_container.get_elements()
240         # just make sure that element tree can serialize the tree
241         xml_str = ElementTree.tostring(xml)
242         e2 = gerald.ELAND(xml=xml)
243
244         for key in eland_container:
245             l1 = eland_container[key]
246             l2 = e2[key]
247             self.failUnlessEqual(l1.reads, l2.reads)
248             self.failUnlessEqual(l1.sample_name, l2.sample_name)
249             self.failUnlessEqual(l1.lane_id, l2.lane_id)
250             if isinstance(l1, eland.ElandLane):
251               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
252               self.failUnlessEqual(len(l1.mapped_reads), 7)
253               for k in l1.mapped_reads.keys():
254                   self.failUnlessEqual(l1.mapped_reads[k],
255                                        l2.mapped_reads[k])
256
257               self.failUnlessEqual(len(l1.match_codes), 9)
258               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
259               for k in l1.match_codes.keys():
260                   self.failUnlessEqual(l1.match_codes[k],
261                                        l2.match_codes[k])
262             elif isinstance(l1, eland.SequenceLane):
263                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
264
265     def test_runfolder(self):
266         runs = runfolder.get_runs(self.runfolder_dir)
267
268         # do we get the flowcell id from the filename?
269         self.failUnlessEqual(len(runs), 1)
270         name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
271         self.failUnlessEqual(runs[0].serialization_filename, name)
272
273         # do we get the flowcell id from the FlowcellId.xml file
274         make_flowcell_id(self.runfolder_dir, FCID)
275         runs = runfolder.get_runs(self.runfolder_dir)
276         self.failUnlessEqual(len(runs), 1)
277         name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
278         self.failUnlessEqual(runs[0].serialization_filename, name)
279
280         r1 = runs[0]
281         xml = r1.get_elements()
282         xml_str = ElementTree.tostring(xml)
283
284         r2 = runfolder.PipelineRun(xml=xml)
285         self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
286         self.failIfEqual(r2.image_analysis, None)
287         self.failIfEqual(r2.bustard, None)
288         self.failIfEqual(r2.gerald, None)
289
290     def test_srf(self):
291         pattern_list = srf.create_qseq_patterns(self.bustard_dir)
292         self.assertEqual(len(pattern_list), 1)
293         pattern = pattern_list[0][1] % (1,)
294         self.assertEqual(
295             glob.fnmatch.fnmatch("s_1_1_1101_qseq.txt", pattern),
296             True)
297         self.assertEqual(
298             glob.fnmatch.fnmatch("s_1_0001_qseq.txt", pattern),
299             False)
300
301
302 def suite():
303     from unittest import TestSuite, defaultTestLoader
304     suite = TestSuite()
305     suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
306     return suite
307
308
309 if __name__ == "__main__":
310     from unittest import main
311     main(defaultTest="suite")