Merge commit 'upstream/0.3'
[pysam.git] / tests / tabix_test.py
diff --git a/tests/tabix_test.py b/tests/tabix_test.py
new file mode 100644 (file)
index 0000000..8eb8a60
--- /dev/null
@@ -0,0 +1,225 @@
+#!/usr/bin/env python
+'''unit testing code for pysam.
+
+Execute in the :file:`tests` directory as it requires the Makefile
+and data files located there.
+'''
+
+import sys, os, shutil, gzip
+import pysam
+import unittest
+import itertools
+import subprocess
+
+def checkBinaryEqual( filename1, filename2 ):
+    '''return true if the two files are binary equal.'''
+    if os.path.getsize( filename1 ) !=  os.path.getsize( filename2 ):
+        return False
+
+    infile1 = open(filename1, "rb")
+    infile2 = open(filename2, "rb")
+
+    def chariter( infile ):
+        while 1:
+            c = infile.read(1)
+            if c == "": break
+            yield c
+
+    found = False
+    for c1,c2 in itertools.izip( chariter( infile1), chariter( infile2) ):
+        if c1 != c2: break
+    else:
+        found = True
+
+    infile1.close()
+    infile2.close()
+    return found
+
+class TestIndexing(unittest.TestCase):
+    filename = "example.gtf.gz" 
+    filename_idx = "example.gtf.gz.tbi" 
+
+    def setUp( self ):
+        
+        self.tmpfilename = "tmp_%i.gtf.gz" % id(self)
+        shutil.copyfile( self.filename, self.tmpfilename )
+
+    def testIndexPreset( self ):
+        '''test indexing via preset.'''
+
+        pysam.tabix_index( self.tmpfilename, preset = "gff" )
+        checkBinaryEqual( self.tmpfilename + ".tbi", self.filename_idx )
+
+    def tearDown( self ):
+        os.unlink( self.tmpfilename )
+        os.unlink( self.tmpfilename + ".tbi" )
+
+class TestCompression(unittest.TestCase):
+    filename = "example.gtf.gz" 
+    filename_idx = "example.gtf.gz.tbi" 
+
+    def setUp( self ):
+        
+        self.tmpfilename = "tmp_%i.gtf" % id(self)
+        infile = gzip.open( self.filename, "r")
+        outfile = open( self.tmpfilename, "w" )
+        outfile.write( "".join(infile.readlines()) )
+        outfile.close()
+        infile.close()
+
+    def testIndexPreset( self ):
+        '''test indexing via preset.'''
+
+        pysam.tabix_index( self.tmpfilename, preset = "gff" )
+        checkBinaryEqual( self.tmpfilename + ".gz", self.filename )
+        checkBinaryEqual( self.tmpfilename + ".gz.tbi", self.filename_idx )
+
+    def tearDown( self ):
+        os.unlink( self.tmpfilename + ".gz" )
+        os.unlink( self.tmpfilename + ".gz.tbi" )
+
+class TestIteration( unittest.TestCase ):
+
+    filename = "example.gtf.gz" 
+
+    def setUp( self ):
+
+        self.tabix = pysam.Tabixfile( self.filename )
+        lines = gzip.open(self.filename).readlines()
+        # creates index of contig, start, end, adds content without newline.
+        self.compare = [ 
+            (x[0][0], int(x[0][3]), int(x[0][4]), x[1]) 
+            for x in [ (y.split("\t"), y[:-1]) for y in lines ] ]
+                         
+    def getSubset( self, contig = None, start = None, end = None):
+        
+        if contig == None:
+            # all lines
+            subset = [ x[3] for x in self.compare ]
+        else:
+            if start != None and end == None:
+                # until end of contig
+                subset = [ x[3] for x in self.compare if x[0] == contig and x[2] > start ]
+            elif start == None and end != None:
+                # from start of contig
+                subset = [ x[3] for x in self.compare if x[0] == contig and x[1] <= end ]
+            elif start == None and end == None:
+                subset = [ x[3] for x in self.compare if x[0] == contig ]
+            else:
+                # all within interval
+                subset = [ x[3] for x in self.compare if x[0] == contig and \
+                               min( x[2], end) - max(x[1], start) > 0 ]
+            
+        return subset
+
+    def checkPairwise( self, result, ref ):
+
+        result.sort()
+        ref.sort()
+
+        a = set(result)
+        b = set(ref)
+
+        self.assertEqual( len(result), len(ref),
+                          "unexpected number of results: %i, expected %i, differences are %s: %s" \
+                              % (len(result), len(ref),
+                                 a.difference(b), 
+                                 b.difference(a) ))
+
+        for x, d in enumerate( zip( result, ref )):
+            
+            self.assertEqual( d[0], d[1],
+                              "unexpected results in pair %i: '%s', expected '%s'" % \
+                                  (x, 
+                                   d[0], 
+                                   d[1]) )
+
+
+    def testAll( self ):
+        result = list(self.tabix.fetch())
+        ref = self.getSubset( )
+        self.checkPairwise( result, ref )
+
+    def testPerContig( self ):
+        for contig in ("chr1", "chr2", "chr1", "chr2" ):
+            result = list(self.tabix.fetch( contig ))
+            ref = self.getSubset( contig )
+            self.checkPairwise( result, ref )
+            
+    def testPerContigToEnd( self ):
+        
+        end = None
+        for contig in ("chr1", "chr2", "chr1", "chr2" ):
+            for start in range( 0, 200000, 1000):
+                result = list(self.tabix.fetch( contig, start, end ))
+                ref = self.getSubset( contig, start, end )
+                self.checkPairwise( result, ref )
+
+    def testPerContigFromStart( self ):
+        
+        start = None
+        for contig in ("chr1", "chr2", "chr1", "chr2" ):
+            for end in range( 0, 200000, 1000):
+                result = list(self.tabix.fetch( contig, start, end ))
+                ref = self.getSubset( contig, start, end )
+                self.checkPairwise( result, ref )
+
+    def testPerContig( self ):
+        
+        start, end  = None, None
+        for contig in ("chr1", "chr2", "chr1", "chr2" ):
+            result = list(self.tabix.fetch( contig, start, end ))
+            ref = self.getSubset( contig, start, end )
+            self.checkPairwise( result, ref )
+                
+    def testPerInterval( self ):
+        
+        start, end  = None, None
+        for contig in ("chr1", "chr2", "chr1", "chr2" ):
+            for start in range( 0, 200000, 2000):
+                for end in range( start, start + 2000, 500):
+                    result = list(self.tabix.fetch( contig, start, end ))
+                    ref = self.getSubset( contig, start, end )
+                    self.checkPairwise( result, ref )
+                
+
+    def testInvalidIntervals( self ):
+        
+        self.assertRaises( ValueError, self.tabix.fetch, "chr1", 0, -10)
+        self.assertRaises( ValueError, self.tabix.fetch, "chr1", -10, 200)
+        self.assertRaises( ValueError, self.tabix.fetch, "chr1", 200, 0)
+        self.assertRaises( ValueError, self.tabix.fetch, "chr1", -10, -20)
+        self.assertRaises( ValueError, self.tabix.fetch, "chrUn" )
+
+    def testGetContigs( self ):
+        self.assertEqual( sorted(self.tabix.contigs), ["chr1", "chr2"] )
+        # check that contigs is read-only
+        self.assertRaises( AttributeError, setattr, self.tabix, "contigs", ["chr1", "chr2"] )
+
+class TestParser( unittest.TestCase ):
+
+    filename = "example.gtf.gz" 
+
+    def setUp( self ):
+
+        self.tabix = pysam.Tabixfile( self.filename )
+        self.compare = [ x[:-1].split("\t") for x in gzip.open( self.filename, "r") ]
+
+    def testGTF( self ):
+
+        for x, r in enumerate(self.tabix.fetch( parser = pysam.asGTF() )):
+            self.assertEqual( "\t".join( self.compare[x]), str(r) )
+
+    def testTuple( self ):
+
+        for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
+            self.assertEqual( self.compare[x], list(r) )
+
+            self.assertEqual( len(self.compare[x]), len(r) )
+            for c in range(0,len(r)):
+                self.assertEqual( self.compare[x][c], r[c] )
+
+if __name__ == "__main__":
+    unittest.main()
+
+