Recent IPAR xml config blocks include the runfolder name
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta180.py
1 #!/usr/bin/env python
2
3 from datetime import datetime, date
4 import os
5 import tempfile
6 import shutil
7 from unittest2 import TestCase
8
9 from htsworkflow.pipelines import eland
10 from htsworkflow.pipelines import ipar
11 from htsworkflow.pipelines import bustard
12 from htsworkflow.pipelines import gerald
13 from htsworkflow.pipelines import runfolder
14 from htsworkflow.pipelines.samplekey import SampleKey
15 from htsworkflow.pipelines import ElementTree
16
17 from htsworkflow.pipelines.test.simulate_runfolder import *
18
19
def make_runfolder(obj=None):
    """
    Make a fake runfolder, attach all the directories to obj if defined

    A fresh temporary directory is created and populated with a simulated
    runfolder: Data/, the RTA intensities and basecalls directories, a
    matrix directory and a GERALD directory holding a config file, a
    summary file, eland export files for lanes 1-6, a scarf file for
    lane 7 and a fastq file for lane 8.

    :param obj: optional object (typically a TestCase) that receives the
        attributes flowcell_id, temp_dir, runfolder, runfolder_dir,
        data_dir, image_analysis_dir, bustard_dir and gerald_dir.
    :return: None.  When obj is None the caller gets no handle to the
        created tree.
    :raises: whatever the fixture helpers raise; the temporary tree is
        removed before the exception propagates.
    """
    # make a fake runfolder directory
    temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')

    try:
        flowcell_id = '4286GAAXX'
        runfolder = '090608_HWI-EAS229_0117_{0}'.format(flowcell_id)
        runfolder_dir = os.path.join(temp_dir, runfolder)
        os.mkdir(runfolder_dir)

        data_dir = os.path.join(runfolder_dir, 'Data')
        os.mkdir(data_dir)

        intensities_dir = make_rta_intensities_1870(data_dir)

        basecalls_dir = make_rta_basecalls_1870(intensities_dir)
        make_matrix_dir_rta160(basecalls_dir)

        gerald_dir = os.path.join(basecalls_dir,
                                  'GERALD_07-09-2010_diane')
        os.mkdir(gerald_dir)
        make_gerald_config_100(gerald_dir)
        make_summary_rta160_xml(gerald_dir)
        make_eland_export(gerald_dir, lane_list=[1, 2, 3, 4, 5, 6])
        make_scarf(gerald_dir, lane_list=[7])
        make_fastq(gerald_dir, lane_list=[8])

        if obj is not None:
            obj.flowcell_id = flowcell_id
            obj.temp_dir = temp_dir
            obj.runfolder = runfolder
            obj.runfolder_dir = runfolder_dir
            obj.data_dir = data_dir
            obj.image_analysis_dir = intensities_dir
            obj.bustard_dir = basecalls_dir
            obj.gerald_dir = gerald_dir
    except Exception:
        # Nothing else knows about temp_dir yet (a failing setUp never
        # reaches tearDown), so remove the half-built tree here instead
        # of leaking it, then let the original error propagate.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
58
59
class RunfolderTests(TestCase):
    """
    Test components of the runfolder processing code
    which includes firecrest, bustard, and gerald

    Every test runs against the simulated runfolder built by
    make_runfolder(); the tree is created in setUp and deleted in
    tearDown.
    """
    def setUp(self):
        # attaches all the directories to the object passed in
        make_runfolder(self)

    def tearDown(self):
        # remove the temporary runfolder tree created in setUp
        shutil.rmtree(self.temp_dir)

    def test_bustard(self):
        """Construct a bustard object"""
        b = bustard.bustard(self.bustard_dir)
        self.assertEqual(b.software, 'RTA')
        self.assertEqual(b.version, '1.8.70.0')
        self.assertEqual(b.date, None)
        self.assertEqual(b.user, None)
        self.assertEqual(len(b.phasing), 0)

        # round trip through the xml representation
        xml = b.get_elements()
        b2 = bustard.Bustard(xml=xml)
        self.assertEqual(b.software, b2.software)
        self.assertEqual(b.version, b2.version)
        self.assertEqual(b.date, b2.date)
        self.assertEqual(b.user, b2.user)

    def test_gerald(self):
        """Load the gerald directory and round trip it through xml"""
        # need to update gerald and make tests for it
        g = gerald.gerald(self.gerald_dir)

        self.assertEqual(g.software, 'GERALD')
        self.assertEqual(g.version, '1.171')
        self.assertEqual(g.date, datetime(2009, 2, 22, 21, 15, 59))
        self.assertEqual(len(g.lanes), len(g.lanes.keys()))
        self.assertEqual(len(g.lanes), len(g.lanes.items()))

        # list of genomes, matches what was defined up in
        # make_gerald_config.
        # the first None is to offset the genomes list to be 1..9
        # instead of pythons default 0..8
        genomes = [None,
                   '/g/mm9',
                   '/g/mm9',
                   '/g/elegans190',
                   '/g/arabidopsis01222004',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9',
                   '/g/mm9', ]

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            cur_lane = g.lanes[i]
            self.assertEqual(cur_lane.analysis, 'eland_extended')
            self.assertEqual(cur_lane.eland_genome, genomes[i])
            self.assertEqual(cur_lane.read_length, '37')
            self.assertEqual(cur_lane.use_bases, 'Y' * 37)

        # I want to be able to use a simple iterator
        for l in g.lanes.values():
            self.assertEqual(l.analysis, 'eland_extended')
            self.assertEqual(l.read_length, '37')
            self.assertEqual(l.use_bases, 'Y' * 37)

        # test data extracted from summary file
        # (leading None keeps the list indexable by lane number)
        clusters = [None,
                    (281331, 11169), (203841, 13513),
                    (220889, 15653), (137294, 14666),
                    (129388, 14525), (262092, 10751),
                    (185754, 13503), (233765, 9537), ]

        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            summary_lane = g.summary[0][i]
            self.assertEqual(summary_lane.cluster, clusters[i])
            self.assertEqual(summary_lane.lane, i)

        xml = g.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        g2 = gerald.Gerald(xml=xml)

        # do it all again after extracting from the xml file
        self.assertEqual(g.software, g2.software)
        self.assertEqual(g.version, g2.version)
        self.assertEqual(g.date, g2.date)
        self.assertEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
        self.assertEqual(len(g.lanes.items()), len(g2.lanes.items()))

        # test lane specific parameters from gerald config file
        for i in range(1, 9):
            g_lane = g.lanes[i]
            g2_lane = g2.lanes[i]
            self.assertEqual(g_lane.analysis, g2_lane.analysis)
            self.assertEqual(g_lane.eland_genome, g2_lane.eland_genome)
            self.assertEqual(g_lane.read_length, g2_lane.read_length)
            self.assertEqual(g_lane.use_bases, g2_lane.use_bases)

        # test (some) summary elements
        self.assertEqual(len(g.summary), 1)
        for i in range(1, 9):
            g_summary = g.summary[0][i]
            g2_summary = g2.summary[0][i]
            self.assertEqual(g_summary.cluster, g2_summary.cluster)
            self.assertEqual(g_summary.lane, g2_summary.lane)

        # The eland comparison does not depend on the summary lane index,
        # so it is done once here rather than once per summary lane.
        g_eland = g.eland_results
        g2_eland = g2.eland_results
        for lane in g_eland:
            g_results = g_eland[lane]
            g2_results = g2_eland[lane]
            self.assertEqual(g_results.reads, g2_results.reads)
            if isinstance(g_results, eland.ElandLane):
                self.assertEqual(len(g_results.mapped_reads),
                                 len(g2_results.mapped_reads))
                for k in g_results.mapped_reads.keys():
                    self.assertEqual(g_results.mapped_reads[k],
                                     g2_results.mapped_reads[k])

                self.assertEqual(len(g_results.match_codes),
                                 len(g2_results.match_codes))
                for k in g_results.match_codes.keys():
                    self.assertEqual(g_results.match_codes[k],
                                     g2_results.match_codes[k])

    def test_eland(self):
        """Load the eland/sequence lanes and round trip them through xml"""
        # map short fasta names onto genome-qualified names
        hg_map = {'Lambda.fa': 'Lambda.fa'}
        for i in range(1, 22):
            short_name = 'chr%d.fa' % (i,)
            long_name = 'hg18/chr%d.fa' % (i,)
            hg_map[short_name] = long_name

        genome_maps = {1: hg_map, 2: hg_map, 3: hg_map, 4: hg_map,
                       5: hg_map, 6: hg_map, 7: hg_map, 8: hg_map}
        eland_container = gerald.eland(self.gerald_dir,
                                       genome_maps=genome_maps)

        # I added sequence lanes to the last 2 lanes of this test case
        keys = [SampleKey(lane=i, read=1, sample='s') for i in range(1, 7)]
        for key in keys:
            lane = eland_container[key]
            self.assertEqual(lane.reads, 28)
            self.assertEqual(lane.sample_name, "s")
            self.assertEqual(lane.lane_id, key.lane)
            self.assertEqual(len(lane.mapped_reads), 7)
            self.assertEqual(lane.mapped_reads['hg18/chr7.fa'], 4)
            self.assertEqual(lane.mapped_reads['Lambda_1-1_11936nts.fa'], 1)
            self.assertEqual(lane.match_codes['U0'], 1)
            self.assertEqual(lane.match_codes['R0'], 20)
            self.assertEqual(lane.match_codes['U1'], 1)
            self.assertEqual(lane.match_codes['R1'], 2)
            self.assertEqual(lane.match_codes['U2'], 11)
            self.assertEqual(lane.match_codes['R2'], 0)
            self.assertEqual(lane.match_codes['NM'], 2)
            self.assertEqual(lane.match_codes['QC'], 9)

        # test scarf
        lane = eland_container[SampleKey(lane=7, read=1, sample='s')]
        self.assertEqual(lane.reads, 5)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 7)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)

        # test fastq
        lane = eland_container[SampleKey(lane=8, read=1, sample='s')]
        self.assertEqual(lane.reads, 3)
        self.assertEqual(lane.sample_name, 's')
        self.assertEqual(lane.lane_id, 8)
        self.assertEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)

        xml = eland_container.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)
        e2 = gerald.ELAND(xml=xml)

        # everything loaded from disk must survive the xml round trip
        for key in eland_container:
            l1 = eland_container[key]
            l2 = e2.results[key]
            self.assertEqual(l1.reads, l2.reads)
            self.assertEqual(l1.sample_name, l2.sample_name)
            self.assertEqual(l1.lane_id, l2.lane_id)
            if isinstance(l1, eland.ElandLane):
                self.assertEqual(len(l1.mapped_reads), len(l2.mapped_reads))
                self.assertEqual(len(l1.mapped_reads), 7)
                for k in l1.mapped_reads.keys():
                    self.assertEqual(l1.mapped_reads[k],
                                     l2.mapped_reads[k])

                self.assertEqual(len(l1.match_codes), 9)
                self.assertEqual(len(l1.match_codes), len(l2.match_codes))
                for k in l1.match_codes.keys():
                    self.assertEqual(l1.match_codes[k],
                                     l2.match_codes[k])
            elif isinstance(l1, eland.SequenceLane):
                self.assertEqual(l1.sequence_type, l2.sequence_type)

    def test_runfolder(self):
        """Discover the run, check its names, and round trip it through xml"""
        runs = runfolder.get_runs(self.runfolder_dir)

        # do we get the flowcell id from the filename?
        self.assertEqual(len(runs), 1)
        name = 'run_4286GAAXX_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].serialization_filename, name)
        # NOTE(review): this differs from the directory name created by
        # make_runfolder; presumably it comes from the simulated config
        # files -- confirm against simulate_runfolder.
        self.assertEqual(runs[0].runfolder_name,
                         '090220_HWI-EAS229_0093_30VR0AAXX')

        # do we get the flowcell id from the FlowcellId.xml file
        make_flowcell_id(self.runfolder_dir, '207BTAAXY')
        runs = runfolder.get_runs(self.runfolder_dir)
        self.assertEqual(len(runs), 1)
        name = 'run_207BTAAXY_%s.xml' % (date.today().strftime('%Y-%m-%d'),)
        self.assertEqual(runs[0].serialization_filename, name)

        r1 = runs[0]
        xml = r1.get_elements()
        # just make sure that element tree can serialize the tree
        ElementTree.tostring(xml)

        r2 = runfolder.PipelineRun(xml=xml)
        self.assertEqual(r1.serialization_filename, r2.serialization_filename)
        self.assertNotEqual(r2.image_analysis, None)
        self.assertNotEqual(r2.bustard, None)
        self.assertNotEqual(r2.gerald, None)
286
287
def suite():
    """Return a TestSuite holding every test loaded from RunfolderTests."""
    import unittest2

    loaded = unittest2.defaultTestLoader.loadTestsFromTestCase(RunfolderTests)
    collected = unittest2.TestSuite()
    collected.addTests(loaded)
    return collected
293
294
# When executed as a script, run the tests collected by suite().
if __name__ == "__main__":
    import unittest2
    unittest2.main(defaultTest="suite")