Merge ssh://jumpgate.caltech.edu/var/htsworkflow/htsworkflow
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_casava_1_7.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import glob
5 import os
6 import tempfile
7 import shutil
8 import unittest
9
10 from htsworkflow.pipelines import bustard
11 from htsworkflow.pipelines import eland
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import ipar
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines import srf
16 from htsworkflow.pipelines.runfolder import ElementTree
17
18 from htsworkflow.pipelines.test.simulate_runfolder import *
19
20 FCID = 'AA01AAABXX'
21 RUN_NAME = '110420_SN787_0069_%s' %( FCID,)
22
23 def make_runfolder(obj=None):
24     """
25     Make a fake runfolder, attach all the directories to obj if defined
26     """
27     # make a fake runfolder directory
28     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
29
30     runfolder_dir = os.path.join(temp_dir, RUN_NAME)
31     os.mkdir(runfolder_dir)
32
33     data_dir = os.path.join(runfolder_dir, 'Data')
34     os.mkdir(data_dir)
35
36     intensities_dir = make_rta_intensities_1_10(data_dir)
37
38     basecalls_dir = make_rta_basecalls_1_10(intensities_dir)
39     make_matrix_dir_rta_1_10(basecalls_dir)
40     make_qseqs(basecalls_dir, ABXX_BASE_CALL_INFO)
41
42     gerald_dir = os.path.join(basecalls_dir,
43                               'GERALD_02-05-2011_diane')
44     os.mkdir(gerald_dir)
45     make_gerald_config_1_7(gerald_dir)
46     make_summary_casava1_7_xml(gerald_dir)
47     make_eland_export(gerald_dir, lane_list=[4,5,6,7,8])
48     make_fastq(gerald_dir, lane_list=[1,2,3,])
49
50     if obj is not None:
51         obj.temp_dir = temp_dir
52         obj.runfolder_dir = runfolder_dir
53         obj.data_dir = data_dir
54         obj.image_analysis_dir = intensities_dir
55         obj.bustard_dir = basecalls_dir
56         obj.gerald_dir = gerald_dir
57
58
59 class RunfolderTests(unittest.TestCase):
60     """
61     Test components of the runfolder processing code
62     which includes firecrest, bustard, and gerald
63     """
64     def setUp(self):
65         # attaches all the directories to the object passed in
66         make_runfolder(self)
67
68     def tearDown(self):
69         shutil.rmtree(self.temp_dir)
70
71     def test_bustard(self):
72         """Construct a bustard object"""
73         b = bustard.bustard(self.bustard_dir)
74         self.failUnlessEqual(b.software, 'RTA')
75         self.failUnlessEqual(b.version, '1.10.36.0')
76         self.failUnlessEqual(b.date,    None)
77         self.failUnlessEqual(b.user,    None)
78         self.failUnlessEqual(len(b.phasing), 0)
79
80         xml = b.get_elements()
81         b2 = bustard.Bustard(xml=xml)
82         self.failUnlessEqual(b.software, b2.software)
83         self.failUnlessEqual(b.version,  b2.version)
84         self.failUnlessEqual(b.date,     b2.date )
85         self.failUnlessEqual(b.user,     b2.user)
86
87     def test_gerald(self):
88         # need to update gerald and make tests for it
89         g = gerald.gerald(self.gerald_dir)
90
91         self.failUnlessEqual(g.software, 'CASAVA')
92         self.failUnlessEqual(g.version, '1.7.0')
93         self.failUnlessEqual(g.date, datetime(2011,5,2,19,19,49))
94         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
95         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
96
97
98         # list of genomes, matches what was defined up in
99         # make_gerald_config.
100         # the first None is to offset the genomes list to be 1..9
101         # instead of pythons default 0..8
102         genomes = [None, # placeholder
103                    None, # no genome
104                    None, # no genome
105                    None, # no genome
106                    '/g/hg18',
107                    '/g/elegans190',
108                    '/g/elegans190',
109                    '/g/elegans190',
110                    '/g/phi', ]
111         analysis = [None, # placeholder
112                     'sequence_pair',
113                     'sequence_pair',
114                     'sequence_pair',
115                     'eland_pair',
116                     'eland_pair',
117                     'eland_pair',
118                     'eland_pair',
119                     'eland_pair',]
120
121
122         # test lane specific parameters from gerald config file
123         for i in range(1,9):
124             cur_lane = g.lanes[i]
125             self.failUnlessEqual(cur_lane.analysis, analysis[i])
126             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
127             self.failUnlessEqual(cur_lane.read_length, '100')
128             self.failUnlessEqual(cur_lane.use_bases, 'Y'*100+'y'*100)
129
130         # I want to be able to use a simple iterator
131         for l in g.lanes.values():
132           self.failUnless(l.analysis in ('sequence_pair', 'eland_pair'))
133           self.failUnlessEqual(l.read_length, '100')
134           self.failUnlessEqual(l.use_bases, 'Y'*100+'y'*100)
135
136         # test data extracted from summary file
137         clusters = [None,
138                     (1073893, 146344), (1671038, 167728),
139                     (5778484, 83123),  (2411032, 290873),
140                     (2197555, 294381), (2725097, 333724),
141                     (2549849, 313056), (2282159, 232709),]
142
143         self.failUnlessEqual(len(g.summary), 1)
144         for i in range(1,9):
145             summary_lane = g.summary[0][i]
146             self.failUnlessEqual(summary_lane.cluster, clusters[i])
147             self.failUnlessEqual(summary_lane.lane, i)
148
149         xml = g.get_elements()
150         # just make sure that element tree can serialize the tree
151         xml_str = ElementTree.tostring(xml)
152         g2 = gerald.Gerald(xml=xml)
153
154         # do it all again after extracting from the xml file
155         self.failUnlessEqual(g.software, g2.software)
156         self.failUnlessEqual(g.version, g2.version)
157         self.failUnlessEqual(g.date, g2.date)
158         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
159         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
160
161         # test lane specific parameters from gerald config file
162         for i in range(1,9):
163             g_lane = g.lanes[i]
164             g2_lane = g2.lanes[i]
165             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
166             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
167             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
168             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
169
170         # test (some) summary elements
171         self.failUnlessEqual(len(g.summary), 1)
172         for i in range(1,9):
173             g_summary = g.summary[0][i]
174             g2_summary = g2.summary[0][i]
175             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
176             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
177
178             g_eland = g.eland_results
179             g2_eland = g2.eland_results
180             for lane in g_eland.results[0].keys():
181                 g_results = g_eland.results[0][lane]
182                 g2_results = g2_eland.results[0][lane]
183                 self.failUnlessEqual(g_results.reads,
184                                      g2_results.reads)
185                 if isinstance(g_results, eland.ElandLane):
186                   self.failUnlessEqual(len(g_results.mapped_reads),
187                                        len(g2_results.mapped_reads))
188                   for k in g_results.mapped_reads.keys():
189                       self.failUnlessEqual(g_results.mapped_reads[k],
190                                            g2_results.mapped_reads[k])
191
192                   self.failUnlessEqual(len(g_results.match_codes),
193                                        len(g2_results.match_codes))
194                   for k in g_results.match_codes.keys():
195                       self.failUnlessEqual(g_results.match_codes[k],
196                                            g2_results.match_codes[k])
197
198
199     def test_eland(self):
200         ls_tree(self.runfolder_dir)
201         hg_map = {'Lambda.fa': 'Lambda.fa'}
202         for i in range(1,22):
203           short_name = 'chr%d.fa' % (i,)
204           long_name = 'hg18/chr%d.fa' % (i,)
205           hg_map[short_name] = long_name
206
207         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
208                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
209         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
210
211         # test fastq
212         for i in range(1,4):
213             lane = eland_container.results[0][i]
214             self.failUnlessEqual(lane.reads, 3)
215             self.failUnlessEqual(lane.sample_name, 's')
216             self.failUnlessEqual(lane.lane_id, i)
217             self.failUnlessEqual(lane.sequence_type,
218                                  eland.SequenceLane.FASTQ_TYPE)
219
220         # I added sequence lanes to the last 2 lanes of this test case
221         for i in range(4,9):
222             lane = eland_container.results[0][i]
223             self.failUnlessEqual(lane.reads, 28)
224             self.failUnlessEqual(lane.sample_name, "s")
225             self.failUnlessEqual(lane.lane_id, i)
226             self.failUnlessEqual(len(lane.mapped_reads), 7)
227             self.failUnlessEqual(lane.mapped_reads['hg18/chr7.fa'], 4)
228             self.failUnlessEqual(lane.match_codes['U0'], 1)
229             self.failUnlessEqual(lane.match_codes['R0'], 20)
230             self.failUnlessEqual(lane.match_codes['U1'], 1)
231             self.failUnlessEqual(lane.match_codes['R1'], 2)
232             self.failUnlessEqual(lane.match_codes['U2'], 11)
233             self.failUnlessEqual(lane.match_codes['R2'], 0)
234             self.failUnlessEqual(lane.match_codes['NM'], 2)
235             self.failUnlessEqual(lane.match_codes['QC'], 9)
236
237
238         xml = eland_container.get_elements()
239         # just make sure that element tree can serialize the tree
240         xml_str = ElementTree.tostring(xml)
241         e2 = gerald.ELAND(xml=xml)
242
243         for i in range(1,9):
244             l1 = eland_container.results[0][i]
245             l2 = e2.results[0][i]
246             self.failUnlessEqual(l1.reads, l2.reads)
247             self.failUnlessEqual(l1.sample_name, l2.sample_name)
248             self.failUnlessEqual(l1.lane_id, l2.lane_id)
249             if isinstance(l1, eland.ElandLane):
250               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
251               self.failUnlessEqual(len(l1.mapped_reads), 7)
252               for k in l1.mapped_reads.keys():
253                   self.failUnlessEqual(l1.mapped_reads[k],
254                                        l2.mapped_reads[k])
255
256               self.failUnlessEqual(len(l1.match_codes), 9)
257               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
258               for k in l1.match_codes.keys():
259                   self.failUnlessEqual(l1.match_codes[k],
260                                        l2.match_codes[k])
261             elif isinstance(l1, eland.SequenceLane):
262                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
263
264     def test_runfolder(self):
265         runs = runfolder.get_runs(self.runfolder_dir)
266
267         # do we get the flowcell id from the filename?
268         self.failUnlessEqual(len(runs), 1)
269         name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
270         self.failUnlessEqual(runs[0].name, name)
271
272         # do we get the flowcell id from the FlowcellId.xml file
273         make_flowcell_id(self.runfolder_dir, FCID)
274         runs = runfolder.get_runs(self.runfolder_dir)
275         self.failUnlessEqual(len(runs), 1)
276         name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
277         self.failUnlessEqual(runs[0].name, name)
278
279         r1 = runs[0]
280         xml = r1.get_elements()
281         xml_str = ElementTree.tostring(xml)
282
283         r2 = runfolder.PipelineRun(xml=xml)
284         self.failUnlessEqual(r1.name, r2.name)
285         self.failIfEqual(r2.image_analysis, None)
286         self.failIfEqual(r2.bustard, None)
287         self.failIfEqual(r2.gerald, None)
288
289     def test_srf(self):
290         pattern_list = srf.create_qseq_patterns(self.bustard_dir)
291         self.assertEqual(len(pattern_list), 1)
292         pattern = pattern_list[0][1] % (1,)
293         self.assertEqual(
294             glob.fnmatch.fnmatch("s_1_1_1101_qseq.txt", pattern),
295             True)
296         self.assertEqual(
297             glob.fnmatch.fnmatch("s_1_0001_qseq.txt", pattern),
298             False)
299
300         print pattern
301
302
303 def suite():
304     return unittest.makeSuite(RunfolderTests,'test')
305
306
307 if __name__ == "__main__":
308     unittest.main(defaultTest="suite")
309