Merge branch 'master' of mus.cacr.caltech.edu:htsworkflow
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_casava_1_7.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import glob
5 import os
6 import tempfile
7 import shutil
8 import unittest
9
10 from htsworkflow.pipelines import bustard
11 from htsworkflow.pipelines import eland
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import ipar
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines import srf
16 from htsworkflow.pipelines.runfolder import ElementTree
17
18 from htsworkflow.pipelines.test.simulate_runfolder import *
19
20 FCID = 'AA01AAABXX'
21 RUN_NAME = '110420_SN787_0069_%s' %( FCID,)
22
23 def make_runfolder(obj=None):
24     """
25     Make a fake runfolder, attach all the directories to obj if defined
26     """
27     # make a fake runfolder directory
28     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
29
30     runfolder_dir = os.path.join(temp_dir, RUN_NAME)
31     os.mkdir(runfolder_dir)
32
33     data_dir = os.path.join(runfolder_dir, 'Data')
34     os.mkdir(data_dir)
35
36     intensities_dir = make_rta_intensities_1_10(data_dir)
37
38     basecalls_dir = make_rta_basecalls_1_10(intensities_dir)
39     make_matrix_dir_rta_1_10(basecalls_dir)
40     make_qseqs(basecalls_dir, ABXX_BASE_CALL_INFO)
41
42     gerald_dir = os.path.join(basecalls_dir,
43                               'GERALD_02-05-2011_diane')
44     os.mkdir(gerald_dir)
45     make_gerald_config_1_7(gerald_dir)
46     make_summary_casava1_7_xml(gerald_dir)
47     make_eland_export(gerald_dir, lane_list=[4,5,6,7,8])
48     make_fastq(gerald_dir, lane_list=[1,2,3,])
49
50     if obj is not None:
51         obj.temp_dir = temp_dir
52         obj.runfolder_dir = runfolder_dir
53         obj.data_dir = data_dir
54         obj.image_analysis_dir = intensities_dir
55         obj.bustard_dir = basecalls_dir
56         obj.gerald_dir = gerald_dir
57
58
59 class RunfolderTests(unittest.TestCase):
60     """
61     Test components of the runfolder processing code
62     which includes firecrest, bustard, and gerald
63     """
64     def setUp(self):
65         # attaches all the directories to the object passed in
66         make_runfolder(self)
67
68     def tearDown(self):
69         shutil.rmtree(self.temp_dir)
70
71     def test_bustard(self):
72         """Construct a bustard object"""
73         b = bustard.bustard(self.bustard_dir)
74         self.failUnlessEqual(b.version, '1.10.36.0')
75         self.failUnlessEqual(b.date,    None)
76         self.failUnlessEqual(b.user,    None)
77         self.failUnlessEqual(len(b.phasing), 0)
78
79         xml = b.get_elements()
80         b2 = bustard.Bustard(xml=xml)
81         self.failUnlessEqual(b.version, b2.version)
82         self.failUnlessEqual(b.date,    b2.date )
83         self.failUnlessEqual(b.user,    b2.user)
84
85     def test_gerald(self):
86         # need to update gerald and make tests for it
87         g = gerald.gerald(self.gerald_dir)
88
89         self.failUnlessEqual(g.version,
90             'CASAVA-1.7.0')
91         self.failUnlessEqual(g.date, datetime(2011,5,2,19,19,49))
92         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
93         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
94
95
96         # list of genomes, matches what was defined up in
97         # make_gerald_config.
98         # the first None is to offset the genomes list to be 1..9
99         # instead of pythons default 0..8
100         genomes = [None, # placeholder
101                    None, # no genome
102                    None, # no genome
103                    None, # no genome
104                    '/g/hg18',
105                    '/g/elegans190',
106                    '/g/elegans190',
107                    '/g/elegans190',
108                    '/g/phi', ]
109         analysis = [None, # placeholder
110                     'sequence_pair',
111                     'sequence_pair',
112                     'sequence_pair',
113                     'eland_pair',
114                     'eland_pair',
115                     'eland_pair',
116                     'eland_pair',
117                     'eland_pair',]
118
119
120         # test lane specific parameters from gerald config file
121         for i in range(1,9):
122             cur_lane = g.lanes[i]
123             self.failUnlessEqual(cur_lane.analysis, analysis[i])
124             self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
125             self.failUnlessEqual(cur_lane.read_length, '100')
126             self.failUnlessEqual(cur_lane.use_bases, 'Y'*100+'y'*100)
127
128         # I want to be able to use a simple iterator
129         for l in g.lanes.values():
130           self.failUnless(l.analysis in ('sequence_pair', 'eland_pair'))
131           self.failUnlessEqual(l.read_length, '100')
132           self.failUnlessEqual(l.use_bases, 'Y'*100+'y'*100)
133
134         # test data extracted from summary file
135         clusters = [None,
136                     (1073893, 146344), (1671038, 167728),
137                     (5778484, 83123),  (2411032, 290873),
138                     (2197555, 294381), (2725097, 333724),
139                     (2549849, 313056), (2282159, 232709),]
140
141         self.failUnlessEqual(len(g.summary), 1)
142         for i in range(1,9):
143             summary_lane = g.summary[0][i]
144             self.failUnlessEqual(summary_lane.cluster, clusters[i])
145             self.failUnlessEqual(summary_lane.lane, i)
146
147         xml = g.get_elements()
148         # just make sure that element tree can serialize the tree
149         xml_str = ElementTree.tostring(xml)
150         g2 = gerald.Gerald(xml=xml)
151
152         # do it all again after extracting from the xml file
153         self.failUnlessEqual(g.version, g2.version)
154         self.failUnlessEqual(g.date, g2.date)
155         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
156         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
157
158         # test lane specific parameters from gerald config file
159         for i in range(1,9):
160             g_lane = g.lanes[i]
161             g2_lane = g2.lanes[i]
162             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
163             self.failUnlessEqual(g_lane.eland_genome, g2_lane.eland_genome)
164             self.failUnlessEqual(g_lane.read_length, g2_lane.read_length)
165             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
166
167         # test (some) summary elements
168         self.failUnlessEqual(len(g.summary), 1)
169         for i in range(1,9):
170             g_summary = g.summary[0][i]
171             g2_summary = g2.summary[0][i]
172             self.failUnlessEqual(g_summary.cluster, g2_summary.cluster)
173             self.failUnlessEqual(g_summary.lane, g2_summary.lane)
174
175             g_eland = g.eland_results
176             g2_eland = g2.eland_results
177             for lane in g_eland.results[0].keys():
178                 g_results = g_eland.results[0][lane]
179                 g2_results = g2_eland.results[0][lane]
180                 self.failUnlessEqual(g_results.reads,
181                                      g2_results.reads)
182                 if isinstance(g_results, eland.ElandLane):
183                   self.failUnlessEqual(len(g_results.mapped_reads),
184                                        len(g2_results.mapped_reads))
185                   for k in g_results.mapped_reads.keys():
186                       self.failUnlessEqual(g_results.mapped_reads[k],
187                                            g2_results.mapped_reads[k])
188
189                   self.failUnlessEqual(len(g_results.match_codes),
190                                        len(g2_results.match_codes))
191                   for k in g_results.match_codes.keys():
192                       self.failUnlessEqual(g_results.match_codes[k],
193                                            g2_results.match_codes[k])
194
195
196     def test_eland(self):
197         ls_tree(self.runfolder_dir)
198         hg_map = {'Lambda.fa': 'Lambda.fa'}
199         for i in range(1,22):
200           short_name = 'chr%d.fa' % (i,)
201           long_name = 'hg18/chr%d.fa' % (i,)
202           hg_map[short_name] = long_name
203
204         genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
205                         5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
206         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
207
208         # test fastq
209         for i in range(1,4):
210             lane = eland_container.results[0][i]
211             self.failUnlessEqual(lane.reads, 3)
212             self.failUnlessEqual(lane.sample_name, 's')
213             self.failUnlessEqual(lane.lane_id, i)
214             self.failUnlessEqual(lane.sequence_type,
215                                  eland.SequenceLane.FASTQ_TYPE)
216
217         # I added sequence lanes to the last 2 lanes of this test case
218         for i in range(4,9):
219             lane = eland_container.results[0][i]
220             self.failUnlessEqual(lane.reads, 28)
221             self.failUnlessEqual(lane.sample_name, "s")
222             self.failUnlessEqual(lane.lane_id, i)
223             self.failUnlessEqual(len(lane.mapped_reads), 7)
224             self.failUnlessEqual(lane.mapped_reads['hg18/chr7.fa'], 4)
225             self.failUnlessEqual(lane.match_codes['U0'], 1)
226             self.failUnlessEqual(lane.match_codes['R0'], 20)
227             self.failUnlessEqual(lane.match_codes['U1'], 1)
228             self.failUnlessEqual(lane.match_codes['R1'], 2)
229             self.failUnlessEqual(lane.match_codes['U2'], 11)
230             self.failUnlessEqual(lane.match_codes['R2'], 0)
231             self.failUnlessEqual(lane.match_codes['NM'], 2)
232             self.failUnlessEqual(lane.match_codes['QC'], 9)
233
234
235         xml = eland_container.get_elements()
236         # just make sure that element tree can serialize the tree
237         xml_str = ElementTree.tostring(xml)
238         e2 = gerald.ELAND(xml=xml)
239
240         for i in range(1,9):
241             l1 = eland_container.results[0][i]
242             l2 = e2.results[0][i]
243             self.failUnlessEqual(l1.reads, l2.reads)
244             self.failUnlessEqual(l1.sample_name, l2.sample_name)
245             self.failUnlessEqual(l1.lane_id, l2.lane_id)
246             if isinstance(l1, eland.ElandLane):
247               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
248               self.failUnlessEqual(len(l1.mapped_reads), 7)
249               for k in l1.mapped_reads.keys():
250                   self.failUnlessEqual(l1.mapped_reads[k],
251                                        l2.mapped_reads[k])
252
253               self.failUnlessEqual(len(l1.match_codes), 9)
254               self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
255               for k in l1.match_codes.keys():
256                   self.failUnlessEqual(l1.match_codes[k],
257                                        l2.match_codes[k])
258             elif isinstance(l1, eland.SequenceLane):
259                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
260
261     def test_runfolder(self):
262         runs = runfolder.get_runs(self.runfolder_dir)
263
264         # do we get the flowcell id from the filename?
265         self.failUnlessEqual(len(runs), 1)
266         name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
267         self.failUnlessEqual(runs[0].name, name)
268
269         # do we get the flowcell id from the FlowcellId.xml file
270         make_flowcell_id(self.runfolder_dir, FCID)
271         runs = runfolder.get_runs(self.runfolder_dir)
272         self.failUnlessEqual(len(runs), 1)
273         name = 'run_%s_%s.xml' % ( FCID, date.today().strftime('%Y-%m-%d'),)
274         self.failUnlessEqual(runs[0].name, name)
275
276         r1 = runs[0]
277         xml = r1.get_elements()
278         xml_str = ElementTree.tostring(xml)
279
280         r2 = runfolder.PipelineRun(xml=xml)
281         self.failUnlessEqual(r1.name, r2.name)
282         self.failIfEqual(r2.image_analysis, None)
283         self.failIfEqual(r2.bustard, None)
284         self.failIfEqual(r2.gerald, None)
285
286     def test_srf(self):
287         pattern_list = srf.create_qseq_patterns(self.bustard_dir)
288         self.assertEqual(len(pattern_list), 1)
289         pattern = pattern_list[0][1] % (1,)
290         self.assertEqual(
291             glob.fnmatch.fnmatch("s_1_1_1101_qseq.txt", pattern),
292             True)
293         self.assertEqual(
294             glob.fnmatch.fnmatch("s_1_0001_qseq.txt", pattern),
295             False)
296
297         print pattern
298
299
300 def suite():
301     return unittest.makeSuite(RunfolderTests,'test')
302
303
304 if __name__ == "__main__":
305     unittest.main(defaultTest="suite")
306