Imported Upstream version 0.6
[pysam.git] / tests / tabix_test.py
1 #!/usr/bin/env python
2 '''unit testing code for pysam.
3
4 Execute in the :file:`tests` directory as it requires the Makefile
5 and data files located there.
6 '''
7
8 import sys, os, shutil, gzip
9 import pysam
10 import unittest
11 import itertools
12 import subprocess
13
14 def checkBinaryEqual( filename1, filename2 ):
15     '''return true if the two files are binary equal.'''
16     if os.path.getsize( filename1 ) !=  os.path.getsize( filename2 ):
17         return False
18
19     infile1 = open(filename1, "rb")
20     infile2 = open(filename2, "rb")
21
22     def chariter( infile ):
23         while 1:
24             c = infile.read(1)
25             if c == "": break
26             yield c
27
28     found = False
29     for c1,c2 in itertools.izip( chariter( infile1), chariter( infile2) ):
30         if c1 != c2: break
31     else:
32         found = True
33
34     infile1.close()
35     infile2.close()
36     return found
37
38 class TestIndexing(unittest.TestCase):
39     filename = "example.gtf.gz" 
40     filename_idx = "example.gtf.gz.tbi" 
41
42     def setUp( self ):
43         
44         self.tmpfilename = "tmp_%i.gtf.gz" % id(self)
45         shutil.copyfile( self.filename, self.tmpfilename )
46
47     def testIndexPreset( self ):
48         '''test indexing via preset.'''
49
50         pysam.tabix_index( self.tmpfilename, preset = "gff" )
51         checkBinaryEqual( self.tmpfilename + ".tbi", self.filename_idx )
52
53     def tearDown( self ):
54         os.unlink( self.tmpfilename )
55         os.unlink( self.tmpfilename + ".tbi" )
56
57 class TestCompression(unittest.TestCase):
58     filename = "example.gtf.gz" 
59     filename_idx = "example.gtf.gz.tbi" 
60
61     def setUp( self ):
62         
63         self.tmpfilename = "tmp_%i.gtf" % id(self)
64         infile = gzip.open( self.filename, "r")
65         outfile = open( self.tmpfilename, "w" )
66         outfile.write( "".join(infile.readlines()) )
67         outfile.close()
68         infile.close()
69
70     def testIndexPreset( self ):
71         '''test indexing via preset.'''
72
73         pysam.tabix_index( self.tmpfilename, preset = "gff" )
74         checkBinaryEqual( self.tmpfilename + ".gz", self.filename )
75         checkBinaryEqual( self.tmpfilename + ".gz.tbi", self.filename_idx )
76
77     def tearDown( self ):
78         os.unlink( self.tmpfilename + ".gz" )
79         os.unlink( self.tmpfilename + ".gz.tbi" )
80
81 class TestIteration( unittest.TestCase ):
82
83     filename = "example.gtf.gz" 
84
85     def setUp( self ):
86
87         self.tabix = pysam.Tabixfile( self.filename )
88         lines = [ x for x in gzip.open(self.filename).readlines() if not x.startswith("#") ]
89         # creates index of contig, start, end, adds content without newline.
90         self.compare = [ 
91             (x[0][0], int(x[0][3]), int(x[0][4]), x[1]) 
92             for x in [ (y.split("\t"), y[:-1]) for y in lines ] ]
93                          
94     def getSubset( self, contig = None, start = None, end = None):
95         
96         if contig == None:
97             # all lines
98             subset = [ x[3] for x in self.compare ]
99         else:
100             if start != None and end == None:
101                 # until end of contig
102                 subset = [ x[3] for x in self.compare if x[0] == contig and x[2] > start ]
103             elif start == None and end != None:
104                 # from start of contig
105                 subset = [ x[3] for x in self.compare if x[0] == contig and x[1] <= end ]
106             elif start == None and end == None:
107                 subset = [ x[3] for x in self.compare if x[0] == contig ]
108             else:
109                 # all within interval
110                 subset = [ x[3] for x in self.compare if x[0] == contig and \
111                                min( x[2], end) - max(x[1], start) > 0 ]
112             
113         return subset
114
115     def checkPairwise( self, result, ref ):
116
117         result.sort()
118         ref.sort()
119
120         a = set(result)
121         b = set(ref)
122
123         self.assertEqual( len(result), len(ref),
124                           "unexpected number of results: %i, expected %i, differences are %s: %s" \
125                               % (len(result), len(ref),
126                                  a.difference(b), 
127                                  b.difference(a) ))
128
129         for x, d in enumerate( zip( result, ref )):
130             self.assertEqual( d[0], d[1],
131                               "unexpected results in pair %i: '%s', expected '%s'" % \
132                                   (x, 
133                                    d[0], 
134                                    d[1]) )
135
136
137     def testAll( self ):
138         result = list(self.tabix.fetch())
139         ref = self.getSubset( )
140         self.checkPairwise( result, ref )
141
142     def testPerContig( self ):
143         for contig in ("chr1", "chr2", "chr1", "chr2" ):
144             result = list(self.tabix.fetch( contig ))
145             ref = self.getSubset( contig )
146             self.checkPairwise( result, ref )
147             
148     def testPerContigToEnd( self ):
149         
150         end = None
151         for contig in ("chr1", "chr2", "chr1", "chr2" ):
152             for start in range( 0, 200000, 1000):
153                 result = list(self.tabix.fetch( contig, start, end ))
154                 ref = self.getSubset( contig, start, end )
155                 self.checkPairwise( result, ref )
156
157     def testPerContigFromStart( self ):
158         
159         start = None
160         for contig in ("chr1", "chr2", "chr1", "chr2" ):
161             for end in range( 0, 200000, 1000):
162                 result = list(self.tabix.fetch( contig, start, end ))
163                 ref = self.getSubset( contig, start, end )
164                 self.checkPairwise( result, ref )
165
166     def testPerContig( self ):
167         
168         start, end  = None, None
169         for contig in ("chr1", "chr2", "chr1", "chr2" ):
170             result = list(self.tabix.fetch( contig, start, end ))
171             ref = self.getSubset( contig, start, end )
172             self.checkPairwise( result, ref )
173                 
174     def testPerInterval( self ):
175         
176         start, end  = None, None
177         for contig in ("chr1", "chr2", "chr1", "chr2" ):
178             for start in range( 0, 200000, 2000):
179                 for end in range( start, start + 2000, 500):
180                     result = list(self.tabix.fetch( contig, start, end ))
181                     ref = self.getSubset( contig, start, end )
182                     self.checkPairwise( result, ref )
183                 
184
185     def testInvalidIntervals( self ):
186         
187         self.assertRaises( ValueError, self.tabix.fetch, "chr1", 0, -10)
188         self.assertRaises( ValueError, self.tabix.fetch, "chr1", -10, 200)
189         self.assertRaises( ValueError, self.tabix.fetch, "chr1", 200, 0)
190         self.assertRaises( ValueError, self.tabix.fetch, "chr1", -10, -20)
191         self.assertRaises( ValueError, self.tabix.fetch, "chrUn" )
192
193     def testGetContigs( self ):
194         self.assertEqual( sorted(self.tabix.contigs), ["chr1", "chr2"] )
195         # check that contigs is read-only
196         self.assertRaises( AttributeError, setattr, self.tabix, "contigs", ["chr1", "chr2"] )
197
198     def testHeader( self ):
199         ref = []
200         for x in gzip.open( self.filename ):
201             if not x.startswith("#"): break
202             ref.append( x[:-1] )
203         header = list( self.tabix.header )
204         self.assertEqual( ref, header )
205
206     def testReopening( self ):
207         '''test repeated opening of the same file.'''
208         def func1():
209             # opens any tabix file
210             inf = pysam.Tabixfile(self.filename)
211             return
212
213         for i in range(10000):
214             func1()
215
216
217 class TestParser( unittest.TestCase ):
218
219     filename = "example.gtf.gz" 
220
221     def setUp( self ):
222
223         self.tabix = pysam.Tabixfile( self.filename )
224         self.compare = [ x[:-1].split("\t") for x in gzip.open( self.filename, "r") if not x.startswith("#") ]
225
226     def testRead( self ):
227
228         for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
229             self.assertEqual( self.compare[x], list(r) )
230             self.assertEqual( len(self.compare[x]), len(r) )
231
232             # test indexing
233             for c in range(0,len(r)):
234                 self.assertEqual( self.compare[x][c], r[c] )
235
236             # test slicing access
237             for c in range(0, len(r)-1):
238                 for cc in range(c+1, len(r)):
239                     self.assertEqual( self.compare[x][c:cc],
240                                       r[c:cc] )
241
242     def testWrite( self ):
243         
244         for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
245             self.assertEqual( self.compare[x], list(r) )
246             c = list(r)
247             for y in range(len(r)):
248                 r[y] = "test_%05i" % y
249                 c[y] = "test_%05i" % y
250             self.assertEqual( c, list(r) )
251             self.assertEqual( "\t".join( c ), str(r) )
252             # check second assignment
253             for y in range(len(r)):
254                 r[y] = "test_%05i" % y
255             self.assertEqual( c, list(r) )
256             self.assertEqual( "\t".join( c ), str(r) )
257
258     def testUnset( self ):
259         for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
260             self.assertEqual( self.compare[x], list(r) )
261             c = list(r)
262             e = list(r)
263             for y in range(len(r)):
264                 r[y] = c[y] = None
265                 e[y] = ""
266                 self.assertEqual( c, list(r) )
267                 self.assertEqual( "\t".join(e), str(r) )
268
269 class TestGTF( TestParser ):
270
271     def testRead( self ):
272
273         for x, r in enumerate(self.tabix.fetch( parser = pysam.asGTF() )):
274             c =  self.compare[x]
275             
276             self.assertEqual( len(c), len(r) )
277             self.assertEqual( "\t".join(c), str(r) )
278             self.assertTrue( r.gene_id.startswith("ENSG") )
279             if r.feature != "gene":
280                 self.assertTrue( r.transcript_id.startswith("ENST") )
281             self.assertEqual( c[0], r.contig )
282
283 class TestBed( unittest.TestCase ):
284     filename = "example.bed.gz"
285
286     def setUp( self ):
287
288         self.tabix = pysam.Tabixfile( self.filename )
289         self.compare = [ x[:-1].split("\t") for x in gzip.open( self.filename, "r") if not x.startswith("#") ]
290
291     def testRead( self ):
292
293         for x, r in enumerate(self.tabix.fetch( parser = pysam.asBed() )):
294             c = self.compare[x]
295             self.assertEqual( "\t".join( c ), str(r) )
296             self.assertEqual( list(c), list(r) )
297             self.assertEqual( c[0], r.contig)
298             self.assertEqual( int(c[1]), r.start)
299             self.assertEqual( int(c[2]), r.end)
300
301     def testWrite( self ):
302
303         for x, r in enumerate(self.tabix.fetch( parser = pysam.asBed() )):
304             c = self.compare[x]
305             self.assertEqual( "\t".join( c ), str(r) )
306             self.assertEqual( list(c), list(r) )
307
308             r.contig = "test"
309             self.assertEqual( "test", r.contig)
310             self.assertEqual( "test", r[0])
311
312             r.start += 1
313             self.assertEqual( int(c[1]) + 1, r.start )
314             self.assertEqual( str(int(c[1]) + 1), r[1] )
315
316             r.end += 1
317             self.assertEqual( int(c[2]) + 1, r.end )
318             self.assertEqual( str(int(c[2]) + 1), r[2] )
319
320 class TestVCF( TestParser ):
321
322     filename = "example.vcf40.gz"
323     columns = ("contig", "pos", "id", 
324                "ref", "alt", "qual", 
325                "filter", "info", "format" )
326
327     def testRead( self ):
328         
329         ncolumns = len(self.columns) 
330
331         for x, r in enumerate(self.tabix.fetch( parser = pysam.asVCF() )):
332             c = self.compare[x]
333             for y, field in enumerate( self.columns ):
334                 if field == "pos":
335                     self.assertEqual( int(c[y]) - 1, getattr( r, field ) )
336                     self.assertEqual( int(c[y]) - 1, r.pos )
337                 else:
338                     self.assertEqual( c[y], getattr( r, field ), 
339                                       "mismatch in field %s: %s != %s" %\
340                                           ( field,c[y], getattr( r, field ) ) )
341             self.assertEqual( len(c), len( r ) + ncolumns )
342             
343             for y in range(len(c) - ncolumns):
344                 self.assertEqual( c[ncolumns+y], r[y] )
345                 
346     def testWrite( self ):
347
348         ncolumns = len(self.columns) 
349
350         for x, r in enumerate(self.tabix.fetch( parser = pysam.asVCF() )):
351
352             c = self.compare[x]
353
354             # check unmodified string
355             ref_string = "\t".join( c )
356             cmp_string = str(r)
357             self.assertEqual( ref_string, cmp_string )
358
359             # set fields and compare field-wise
360             for y, field in enumerate( self.columns ):
361                 if field == "pos":
362                     rpos = getattr( r, field )
363                     self.assertEqual( int(c[y]) - 1, rpos )
364                     self.assertEqual( int(c[y]) - 1, r.pos )
365                     # increment pos by 1
366                     setattr( r, field, rpos + 1 )
367                     self.assertEqual( getattr( r, field ), rpos + 1 )
368                     c[y] = str(int(c[y]) + 1 ) 
369                 else:
370                     setattr( r, field, "test_%i" % y)
371                     c[y] = "test_%i" % y
372                     self.assertEqual( c[y], getattr( r, field ), 
373                                       "mismatch in field %s: %s != %s" %\
374                                           ( field,c[y], getattr( r, field ) ) )
375
376             self.assertEqual( len(c), len( r ) + ncolumns )
377             
378             for y in range(len(c) - ncolumns):
379                 c[ncolumns+y] = "test_%i" % y
380                 r[y] = "test_%i" % y
381                 self.assertEqual( c[ncolumns+y], r[y] )
382
383 class TestVCF( TestParser ):
384
385     filename = "example.vcf40.gz"
386
387     def testOpening( self ):
388         while 1:
389             infile = pysam.Tabixfile( self.filename )
390             infile.close()
391
392                 
393             # check strings
394             ref_string = "\t".join( c )
395             cmp_string = str(r)
396             
397             self.assertEqual( ref_string, cmp_string )
398
399 if __name__ == "__main__":
400
401     unittest.main()
402
403