take advantage of absolute_import to simplify import statements
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_casava_1_7.py
1 #!/usr/bin/env python
2 from __future__ import absolute_import
3
4 from datetime import datetime, date
5 import glob
6 import os
7 import tempfile
8 import shutil
9 from unittest import TestCase
10
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import eland
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import ipar
15 from htsworkflow.pipelines import runfolder
16 from htsworkflow.pipelines import srf
17 from htsworkflow.pipelines import ElementTree
18
19 from .simulate_runfolder import *
20
21 FCID = 'AA01AAABXX'
22 RUN_NAME = '110420_SN787_0069_%s' %( FCID,)
23
24 def make_runfolder(obj=None):
25     """
26     Make a fake runfolder, attach all the directories to obj if defined
27     """
28     # make a fake runfolder directory
29     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
30
31     runfolder_dir = os.path.join(temp_dir, RUN_NAME)
32     os.mkdir(runfolder_dir)
33
34     data_dir = os.path.join(runfolder_dir, 'Data')
35     os.mkdir(data_dir)
36
37     intensities_dir = make_rta_intensities_1_10(data_dir)
38
39     basecalls_dir = make_rta_basecalls_1_10(intensities_dir)
40     make_matrix_dir_rta_1_10(basecalls_dir)
41     make_qseqs(basecalls_dir, ABXX_BASE_CALL_INFO)
42
43     gerald_dir = os.path.join(basecalls_dir,
44                               'GERALD_02-05-2011_diane')
45     os.mkdir(gerald_dir)
46     make_gerald_config_1_7(gerald_dir)
47     make_summary_casava1_7_xml(gerald_dir)
48     make_eland_export(gerald_dir, lane_list=[4,5,6,7,8])
49     make_fastq(gerald_dir, lane_list=[1,2,3,])
50
51     if obj is not None:
52         obj.temp_dir = temp_dir
53         obj.runfolder_dir = runfolder_dir
54         obj.data_dir = data_dir
55         obj.image_analysis_dir = intensities_dir
56         obj.bustard_dir = basecalls_dir
57         obj.gerald_dir = gerald_dir
58
59
60 class RunfolderTests(TestCase):
61     """
62     Test components of the runfolder processing code
63     which includes firecrest, bustard, and gerald
64     """
65     def setUp(self):
66         # attaches all the directories to the object passed in
67         make_runfolder(self)
68
69     def tearDown(self):
70         shutil.rmtree(self.temp_dir)
71
72     def test_bustard(self):
73         """Construct a bustard object"""
74         b = bustard.bustard(self.bustard_dir)
75         self.failUnlessEqual(b.software, 'RTA')
76         self.failUnlessEqual(b.version, '1.10.36.0')
77         self.failUnlessEqual(b.date,    None)
78         self.failUnlessEqual(b.user,    None)
79         self.failUnlessEqual(len(b.phasing), 0)
80
81         xml = b.get_elements()
82         b2 = bustard.Bustard(xml=xml)
83         self.failUnlessEqual(b.software, b2.software)
84         self.failUnlessEqual(b.version,  b2.version)
85         self.failUnlessEqual(b.date,     b2.date )
86         self.failUnlessEqual(b.user,     b2.user)
87
88     def test_gerald(self):
89         # need to update gerald and make tests for it
90         g = gerald.gerald(self.gerald_dir)
91
92         self.failUnlessEqual(g.software, 'CASAVA')
93         self.failUnlessEqual(g.version, '1.7.0')
94         self.failUnlessEqual(g.date, datetime(2011,5,2,19,19,49))
95         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
96         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
97
98
99         # list of genomes, matches what was defined up in
100         # make_gerald_config.
101         # the first None is to offset the genomes list to be 1..9
102         # instead of pythons default 0..8
103         genomes = [None, # placeholder
104                    None, # no genome
105                    None, # no genome
106                    None, # no genome
107                    '/g/hg18',
108                    '/g/elegans190',
109                    '/g/elegans190',
110                    '/g/elegans190',
111                    '/g/phi', ]
112         analysis = [None, # placeholder
113                     'sequence_pair',
114                     'sequence_pair',
115                     'sequence_pair',
116                     'eland_pair',
117                     'eland_pair',
118                     'eland_pair',
119                     'eland_pair',
120                     'eland_pair',]
121
122
123         # test lane specific parameters from gerald config file
124         for i in range(1,9):
125             cur_lane = g.lanes[i]
126             self.failUnlessEqual(cur_lane.analysis, analysis[i])
127             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
128             self.failUnlessEqual(cur_lane.read_length, '100')
129             self.failUnlessEqual(cur_lane.use_bases, 'Y'*100+'y'*100)
130
131         # I want to be able to use a simple iterator
132         for l in g.lanes.values():
133           self.failUnless(l.analysis in ('sequence_pair', 'eland_pair'))
134           self.failUnlessEqual(l.read_length, '100')
135           self.failUnlessEqual(l.use_bases, 'Y'*100+'y'*100)
136
137         # test data extracted from summary file
138         clusters = [None,
139                     (1073893, 146344), (1671038, 167728),
140                     (5778484, 83123),  (2411032, 290873),
141                     (2197555, 294381), (2725097, 333724),
142                     (2549849, 313056), (2282159, 232709),]
143
144         self.failUnlessEqual(len(g.summary), 1)
145         for i in range(1,9):
146             summary_lane = g.summary[0][i]
147             self.failUnlessEqual(summary_lane.cluster, clusters[i])
148             self.failUnlessEqual(summary_lane.lane, i)
149
150         xml = g.get_elements()
151         # just make sure that element tree can serialize the tree
152         xml_str = ElementTree.tostring(xml)
153         g2 = gerald.Gerald(xml=xml)
154
155         # do it all again after extracting from the xml file
156         self.failUnlessEqual(g.software, g2.software)
157         self.failUnlessEqual(g.version, g2.version)
158         self.failUnlessEqual(g.date, g2.date)
159         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
160         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
161
162         # test lane specific parameters from gerald config file
163         for i in range(1,9):
164             g_lane = g.lanes[i]
165             g2_lane = g2.lanes[i]
166             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
167             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
168             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
169             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
170
171         # test (some) summary elements
172         self.failUnlessEqual(len(g.summary), 1)
173         for i in range(1,9):
174             g_summary = g.summary[0][i]
175             g2_summary = g2.summary[0][i]
176             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
177             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
178
179             g_eland = g.eland_results
180             g2_eland = g2.eland_results
181             for key in g_eland:
182                 g_results = g_eland[key]
183                 g2_results = g2_eland[key]
184                 self.failUnlessEqual(g_results.reads,
185                                      g2_results.reads)
186                 if isinstance(g_results, eland.ElandLane):
187                   self.failUnlessEqual(len(g_results.mapped_reads),
188                                        len(g2_results.mapped_reads))
189                   for k in g_results.mapped_reads.keys():
190                       self.failUnlessEqual(g_results.mapped_reads[k],
191                                            g2_results.mapped_reads[k])
192
193                   self.failUnlessEqual(len(g_results.match_codes),
194                                        len(g2_results.match_codes))
195                   for k in g_results.match_codes.keys():
196                       self.failUnlessEqual(g_results.match_codes[k],
197                                            g2_results.match_codes[k])
198
199
200     def test_eland(self):
201         hg_map = {'Lambda.fa': 'Lambda.fa'}
202         for i in range(1,22):
203           short_name = 'chr%d.fa' % (i,)
204           long_name = 'hg18/chr%d.fa' % (i,)
205           hg_map[short_name] = long_name
206
207         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
208                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
209         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
210
211         # test fastq
212         for i in range(1,4):
213             key = eland.SampleKey(lane=i, read=1, sample='s')
214             lane = eland_container[key]
215             self.failUnlessEqual(lane.reads, 3)
216             self.failUnlessEqual(lane.sample_name, 's')
217             self.failUnlessEqual(lane.lane_id, i)
218             self.failUnlessEqual(lane.sequence_type,
219                                  eland.SequenceLane.FASTQ_TYPE)
220
221         # I added sequence lanes to the last 2 lanes of this test case
222         for i in range(4,9):
223             key = eland.SampleKey(lane=i, read=1, sample='s')
224             lane = eland_container[key]
225             self.failUnlessEqual(lane.reads, 28)
226             self.failUnlessEqual(lane.sample_name, "s")
227             self.failUnlessEqual(lane.lane_id, i)
228             self.failUnlessEqual(len(lane.mapped_reads), 7)
229             self.failUnlessEqual(lane.mapped_reads['hg18/chr7.fa'], 4)
230             self.failUnlessEqual(lane.match_codes['U0'], 1)
231             self.failUnlessEqual(lane.match_codes['R0'], 20)
232             self.failUnlessEqual(lane.match_codes['U1'], 1)
233             self.failUnlessEqual(lane.match_codes['R1'], 2)
234             self.failUnlessEqual(lane.match_codes['U2'], 11)
235             self.failUnlessEqual(lane.match_codes['R2'], 0)
236             self.failUnlessEqual(lane.match_codes['NM'], 2)
237             self.failUnlessEqual(lane.match_codes['QC'], 9)
238
239
240         xml = eland_container.get_elements()
241         # just make sure that element tree can serialize the tree
242         xml_str = ElementTree.tostring(xml)
243         e2 = gerald.ELAND(xml=xml)
244
245         for key in eland_container:
246             l1 = eland_container[key]
247             l2 = e2[key]
248             self.failUnlessEqual(l1.reads, l2.reads)
249             self.failUnlessEqual(l1.sample_name, l2.sample_name)
250             self.failUnlessEqual(l1.lane_id, l2.lane_id)
251             if isinstance(l1, eland.ElandLane):
252               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
253               self.failUnlessEqual(len(l1.mapped_reads), 7)
254               for k in l1.mapped_reads.keys():
255                   self.failUnlessEqual(l1.mapped_reads[k],
256                                        l2.mapped_reads[k])
257
258               self.failUnlessEqual(len(l1.match_codes), 9)
259               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
260               for k in l1.match_codes.keys():
261                   self.failUnlessEqual(l1.match_codes[k],
262                                        l2.match_codes[k])
263             elif isinstance(l1, eland.SequenceLane):
264                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
265
266     def test_runfolder(self):
267         runs = runfolder.get_runs(self.runfolder_dir)
268
269         # do we get the flowcell id from the filename?
270         self.failUnlessEqual(len(runs), 1)
271         name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
272         self.failUnlessEqual(runs[0].serialization_filename, name)
273
274         # do we get the flowcell id from the FlowcellId.xml file
275         make_flowcell_id(self.runfolder_dir, FCID)
276         runs = runfolder.get_runs(self.runfolder_dir)
277         self.failUnlessEqual(len(runs), 1)
278         name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
279         self.failUnlessEqual(runs[0].serialization_filename, name)
280
281         r1 = runs[0]
282         xml = r1.get_elements()
283         xml_str = ElementTree.tostring(xml)
284
285         r2 = runfolder.PipelineRun(xml=xml)
286         self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
287         self.failIfEqual(r2.image_analysis, None)
288         self.failIfEqual(r2.bustard, None)
289         self.failIfEqual(r2.gerald, None)
290
291     def test_srf(self):
292         pattern_list = srf.create_qseq_patterns(self.bustard_dir)
293         self.assertEqual(len(pattern_list), 1)
294         pattern = pattern_list[0][1] % (1,)
295         self.assertEqual(
296             glob.fnmatch.fnmatch("s_1_1_1101_qseq.txt", pattern),
297             True)
298         self.assertEqual(
299             glob.fnmatch.fnmatch("s_1_0001_qseq.txt", pattern),
300             False)
301
302
303 def suite():
304     from unittest import TestSuite, defaultTestLoader
305     suite = TestSuite()
306     suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
307     return suite
308
309
310 if __name__ == "__main__":
311     from unittest import main
312     main(defaultTest="suite")