take advantage of absolute_import to simplify import statements
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta180.py
1 #!/usr/bin/env python
2 from __future__ import absolute_import
3
4 from datetime import datetime, date
5 import os
6 import tempfile
7 import shutil
8 from unittest import TestCase
9
10 from htsworkflow.pipelines import eland
11 from htsworkflow.pipelines import ipar
12 from htsworkflow.pipelines import bustard
13 from htsworkflow.pipelines import gerald
14 from htsworkflow.pipelines import runfolder
15 from htsworkflow.pipelines.samplekey import SampleKey
16 from htsworkflow.pipelines import ElementTree
17
18 from .simulate_runfolder import *
19
20
def make_runfolder(obj=None):
    """
    Build a simulated runfolder tree inside a new temporary directory.

    The layout is <tmp>/<runfolder>/Data/<intensities>/<basecalls>/GERALD_...,
    populated by the simulate_runfolder helpers: eland export files for
    lanes 1-6, a scarf file for lane 7 and a fastq file for lane 8.

    If obj is not None, the flowcell id, the runfolder name and every
    directory path created here are stored on it as attributes.
    The caller is responsible for removing obj.temp_dir afterwards.
    """
    flowcell_id = '4286GAAXX'
    runfolder = '090608_HWI-EAS229_0117_{0}'.format(flowcell_id)

    # temporary root -> runfolder -> Data
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
    runfolder_dir = os.path.join(temp_dir, runfolder)
    data_dir = os.path.join(runfolder_dir, 'Data')
    for new_dir in (runfolder_dir, data_dir):
        os.mkdir(new_dir)

    # image analysis and base calling output
    intensities_dir = make_rta_intensities_1870(data_dir)
    basecalls_dir = make_rta_basecalls_1870(intensities_dir)
    make_matrix_dir_rta160(basecalls_dir)

    # alignment (GERALD) output
    gerald_dir = os.path.join(basecalls_dir, 'GERALD_07-09-2010_diane')
    os.mkdir(gerald_dir)
    make_gerald_config_100(gerald_dir)
    make_summary_rta160_xml(gerald_dir)
    make_eland_export(gerald_dir, lane_list=[1, 2, 3, 4, 5, 6])
    make_scarf(gerald_dir, lane_list=[7])
    make_fastq(gerald_dir, lane_list=[8])

    if obj is None:
        return

    created = (
        ('flowcell_id', flowcell_id),
        ('temp_dir', temp_dir),
        ('runfolder', runfolder),
        ('runfolder_dir', runfolder_dir),
        ('data_dir', data_dir),
        ('image_analysis_dir', intensities_dir),
        ('bustard_dir', basecalls_dir),
        ('gerald_dir', gerald_dir),
    )
    for attribute, value in created:
        setattr(obj, attribute, value)
59
60
class RunfolderTests(TestCase):
    """
    Test components of the runfolder processing code
    which includes firecrest, bustard, and gerald

    Every test runs against the simulated runfolder built by
    make_runfolder() in setUp and removed again in tearDown.
    """
    def setUp(self):
        # attaches all the directories to the object passed in
        make_runfolder(self)

    def tearDown(self):
        shutil.rmtree(self.temp_dir)

    def test_bustard(self):
        """Construct a bustard object"""
        b = bustard.bustard(self.bustard_dir)
        self.assertEqual(b.software, 'RTA')
        self.assertEqual(b.version, '1.8.70.0')
        self.assertEqual(b.date, None)
        self.assertEqual(b.user, None)
        self.assertEqual(len(b.phasing), 0)

        # round trip through the xml representation
        xml = b.get_elements()
        b2 = bustard.Bustard(xml=xml)
        self.assertEqual(b.software, b2.software)
        self.assertEqual(b.version, b2.version)
        self.assertEqual(b.date, b2.date)
        self.assertEqual(b.user, b2.user)

    def test_gerald(self):
        """Load the gerald directory and round trip it through xml"""
        # need to update gerald and make tests for it
        g = gerald.gerald(self.gerald_dir)

        self.assertEqual(g.software, 'GERALD')
        self.assertEqual(g.version, '1.171')
        self.assertEqual(g.date, datetime(2009, 2, 22, 21, 15, 59))
        self.assertEqual(len(g.lanes), len(g.lanes.keys()))
        self.assertEqual(len(g.lanes), len(g.lanes.items()))

        # list of genomes, matches what was defined up in
        # make_gerald_config.
        # the first None is to offset the genomes list to be 1..8
        # instead of pythons default 0..7
        genomes = [None,
                   '/g/mm9',
                   '/g/mm9',
                   '/g/elegans190',
                   '/g/arabidopsis01222004',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9', ]

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            cur_lane = g.lanes[i]
            self.assertEqual(cur_lane.analysis, 'eland_extended')
            self.assertEqual(cur_lane.eland_genome, genomes[i])
            self.assertEqual(cur_lane.read_length, '37')
            self.assertEqual(cur_lane.use_bases, 'Y'*37)

        # I want to be able to use a simple iterator
        for l in g.lanes.values():
            self.assertEqual(l.analysis, 'eland_extended')
            self.assertEqual(l.read_length, '37')
            self.assertEqual(l.use_bases, 'Y'*37)

        # test data extracted from summary file
        # (leading None again makes the list indexable by lane number)
        clusters = [None,
                    (281331, 11169), (203841, 13513),
                    (220889, 15653), (137294, 14666),
                    (129388, 14525), (262092, 10751),
                    (185754, 13503), (233765, 9537), ]

        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            summary_lane = g.summary[0][i]
            self.assertEqual(summary_lane.cluster, clusters[i])
            self.assertEqual(summary_lane.lane, i)

        xml = g.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        g2 = gerald.Gerald(xml=xml)

        # do it all again after extracting from the xml file
        self.assertEqual(g.software, g2.software)
        self.assertEqual(g.version, g2.version)
        self.assertEqual(g.date, g2.date)
        self.assertEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
        self.assertEqual(len(g.lanes.items()), len(g2.lanes.items()))

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            g_lane = g.lanes[i]
            g2_lane = g2.lanes[i]
            self.assertEqual(g_lane.analysis, g2_lane.analysis)
            self.assertEqual(g_lane.eland_genome, g2_lane.eland_genome)
            self.assertEqual(g_lane.read_length, g2_lane.read_length)
            self.assertEqual(g_lane.use_bases, g2_lane.use_bases)

        # test (some) summary elements
        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            g_summary = g.summary[0][i]
            g2_summary = g2.summary[0][i]
            self.assertEqual(g_summary.cluster, g2_summary.cluster)
            self.assertEqual(g_summary.lane, g2_summary.lane)

        # The eland comparison does not depend on the summary lane index,
        # so it is done once here rather than once per summary lane.
        g_eland = g.eland_results
        g2_eland = g2.eland_results
        for lane in g_eland:
            g_results = g_eland[lane]
            g2_results = g2_eland[lane]
            self.assertEqual(g_results.reads, g2_results.reads)
            if isinstance(g_results, eland.ElandLane):
                self.assertEqual(len(g_results.mapped_reads),
                                 len(g2_results.mapped_reads))
                for k in g_results.mapped_reads.keys():
                    self.assertEqual(g_results.mapped_reads[k],
                                     g2_results.mapped_reads[k])

                self.assertEqual(len(g_results.match_codes),
                                 len(g2_results.match_codes))
                for k in g_results.match_codes.keys():
                    self.assertEqual(g_results.match_codes[k],
                                     g2_results.match_codes[k])

    def test_eland(self):
        """Load the eland results and round trip them through xml"""
        hg_map = {'Lambda.fa': 'Lambda.fa'}
        for i in range(1, 22):
            short_name = 'chr%d.fa' % (i,)
            long_name = 'hg18/chr%d.fa' % (i,)
            hg_map[short_name] = long_name

        # every lane shares the same genome map
        genome_maps = {1: hg_map, 2: hg_map, 3: hg_map, 4: hg_map,
                       5: hg_map, 6: hg_map, 7: hg_map, 8: hg_map}
        eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)

        # I added sequence lanes to the last 2 lanes of this test case
        keys = [SampleKey(lane=i, read=1, sample='s') for i in range(1, 7)]
        for key in keys:
            lane = eland_container[key]
            self.assertEqual(lane.reads, 28)
            self.assertEqual(lane.sample_name, "s")
            self.assertEqual(lane.lane_id, key.lane)
            self.assertEqual(len(lane.mapped_reads), 7)
            self.assertEqual(lane.mapped_reads['hg18/chr7.fa'], 4)
            self.assertEqual(lane.mapped_reads['Lambda_1-1_11936nts.fa'], 1)
            self.assertEqual(lane.match_codes['U0'], 1)
            self.assertEqual(lane.match_codes['R0'], 20)
            self.assertEqual(lane.match_codes['U1'], 1)
            self.assertEqual(lane.match_codes['R1'], 2)
            self.assertEqual(lane.match_codes['U2'], 11)
            self.assertEqual(lane.match_codes['R2'], 0)
            self.assertEqual(lane.match_codes['NM'], 2)
            self.assertEqual(lane.match_codes['QC'], 9)

        # test scarf
        lane = eland_container[SampleKey(lane=7, read=1, sample='s')]
        self.assertEqual(lane.reads, 5)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 7)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)

        # test fastq
        lane = eland_container[SampleKey(lane=8, read=1, sample='s')]
        self.assertEqual(lane.reads, 3)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 8)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)

        xml = eland_container.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        e2 = gerald.ELAND(xml=xml)

        for key in eland_container:
            l1 = eland_container[key]
            l2 = e2.results[key]
            self.assertEqual(l1.reads, l2.reads)
            self.assertEqual(l1.sample_name, l2.sample_name)
            self.assertEqual(l1.lane_id, l2.lane_id)
            if isinstance(l1, eland.ElandLane):
                self.assertEqual(len(l1.mapped_reads), len(l2.mapped_reads))
                self.assertEqual(len(l1.mapped_reads), 7)
                for k in l1.mapped_reads.keys():
                    self.assertEqual(l1.mapped_reads[k],
                                     l2.mapped_reads[k])

                self.assertEqual(len(l1.match_codes), 9)
                self.assertEqual(len(l1.match_codes), len(l2.match_codes))
                for k in l1.match_codes.keys():
                    self.assertEqual(l1.match_codes[k],
                                     l2.match_codes[k])
            elif isinstance(l1, eland.SequenceLane):
                self.assertEqual(l1.sequence_type, l2.sequence_type)

    def test_runfolder(self):
        """Discover the run from the runfolder and round trip it through xml"""
        runs = runfolder.get_runs(self.runfolder_dir)

        # do we get the flowcell id from the filename?
        self.assertEqual(len(runs), 1)
        name = 'run_4286GAAXX_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].serialization_filename, name)
        # NOTE(review): this differs from the directory name created by
        # make_runfolder; presumably it is read from the simulated config
        # files -- confirm against simulate_runfolder.
        self.assertEqual(runs[0].runfolder_name, '090220_HWI-EAS229_0093_30VR0AAXX')

        # do we get the flowcell id from the FlowcellId.xml file
        make_flowcell_id(self.runfolder_dir, '207BTAAXY')
        runs = runfolder.get_runs(self.runfolder_dir)
        self.assertEqual(len(runs), 1)
        name = 'run_207BTAAXY_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].serialization_filename, name)

        r1 = runs[0]
        xml = r1.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)

        r2 = runfolder.PipelineRun(xml=xml)
        self.assertEqual(r1.serialization_filename, r2.serialization_filename)
        self.assertNotEqual(r2.image_analysis, None)
        self.assertNotEqual(r2.bustard, None)
        self.assertNotEqual(r2.gerald, None)
288
289
def suite():
    """Return a TestSuite containing every test method of RunfolderTests."""
    from unittest import TestSuite, defaultTestLoader
    collected = defaultTestLoader.loadTestsFromTestCase(RunfolderTests)
    all_tests = TestSuite()
    all_tests.addTests(collected)
    return all_tests
295
296
if __name__ == "__main__":
    # When executed as a script, run the tests returned by suite().
    import unittest
    unittest.main(defaultTest="suite")