Imported Upstream version 0.6
[pysam.git] / tests / tabix_test.py
index 8eb8a60a618d4865c07d5c853c7f2e37226e8cd6..2caa628660f74a4389a23bf8b09c8c9e55266dee 100644 (file)
@@ -85,7 +85,7 @@ class TestIteration( unittest.TestCase ):
     def setUp( self ):
 
         self.tabix = pysam.Tabixfile( self.filename )
-        lines = gzip.open(self.filename).readlines()
+        lines = [ x for x in gzip.open(self.filename).readlines() if not x.startswith("#") ]
         # creates index of contig, start, end, adds content without newline.
         self.compare = [ 
             (x[0][0], int(x[0][3]), int(x[0][4]), x[1]) 
@@ -127,7 +127,6 @@ class TestIteration( unittest.TestCase ):
                                  b.difference(a) ))
 
         for x, d in enumerate( zip( result, ref )):
-            
             self.assertEqual( d[0], d[1],
                               "unexpected results in pair %i: '%s', expected '%s'" % \
                                   (x, 
@@ -196,6 +195,25 @@ class TestIteration( unittest.TestCase ):
         # check that contigs is read-only
         self.assertRaises( AttributeError, setattr, self.tabix, "contigs", ["chr1", "chr2"] )
 
+    def testHeader( self ):
+        '''the header reported by tabix equals the leading "#" lines of the file.'''
+        expected = []
+        for line in gzip.open( self.filename ):
+            if not line.startswith("#"): break
+            expected.append( line[:-1] )
+        self.assertEqual( expected, list( self.tabix.header ) )
+
+    def testReopening( self ):
+        '''open the same tabix file 10000 times without closing it explicitly.'''
+        def open_once():
+            # the handle is only bound locally and goes out of scope on return
+            handle = pysam.Tabixfile( self.filename )
+
+        attempts = 10000
+        for attempt in range( attempts ):
+            open_once()
+
+
 class TestParser( unittest.TestCase ):
 
     filename = "example.gtf.gz" 
@@ -203,23 +221,183 @@ class TestParser( unittest.TestCase ):
     def setUp( self ):
 
         self.tabix = pysam.Tabixfile( self.filename )
-        self.compare = [ x[:-1].split("\t") for x in gzip.open( self.filename, "r") ]
-
-    def testGTF( self ):
+        self.compare = [ x[:-1].split("\t") for x in gzip.open( self.filename, "r") if not x.startswith("#") ]  # "#" lines are header lines, not records
 
-        for x, r in enumerate(self.tabix.fetch( parser = pysam.asGTF() )):
-            self.assertEqual( "\t".join( self.compare[x]), str(r) )
-
-    def testTuple( self ):
+    def testRead( self ):
 
         for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
             self.assertEqual( self.compare[x], list(r) )
-
             self.assertEqual( len(self.compare[x]), len(r) )
+
+            # each field must be readable through its integer index
             for c in range(0,len(r)):
                 self.assertEqual( self.compare[x][c], r[c] )
 
+            # each slice [c:cc] with 0 <= c < cc < len(r) must equal the reference slice
+            for c in range(0, len(r)-1):
+                for cc in range(c+1, len(r)):
+                    self.assertEqual( self.compare[x][c:cc],
+                                      r[c:cc] )
+
+    def testWrite( self ):
+        '''assign every field of each asTuple row twice and compare with a list copy.'''
+        records = self.tabix.fetch( parser = pysam.asTuple() )
+        for row, record in enumerate( records ):
+            self.assertEqual( self.compare[row], list(record) )
+            mirror = list(record)
+            for col in range(len(record)):
+                record[col] = mirror[col] = "test_%05i" % col
+            self.assertEqual( mirror, list(record) )
+            self.assertEqual( "\t".join( mirror ), str(record) )
+            # assigning the same values a second time must leave the row unchanged
+            for col in range(len(record)):
+                record[col] = "test_%05i" % col
+            self.assertEqual( mirror, list(record) )
+            self.assertEqual( "\t".join( mirror ), str(record) )
+
+    def testUnset( self ):
+        '''fields set to None must read back as None and print as empty strings.'''
+        for row, record in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
+            self.assertEqual( self.compare[row], list(record) )
+            as_list, as_text = list(record), list(record)
+            for col in range(len(record)):
+                record[col] = as_list[col] = None
+                as_text[col] = ""
+                self.assertEqual( as_list, list(record) )
+                self.assertEqual( "\t".join(as_text), str(record) )
+
+class TestGTF( TestParser ):
+    '''TestParser variant whose testRead parses rows with pysam.asGTF().'''
+
+    def testRead( self ):
+        '''compare each asGTF record with the matching reference row.'''
+        for index, record in enumerate(self.tabix.fetch( parser = pysam.asGTF() )):
+            fields = self.compare[index]
+            self.assertEqual( len(fields), len(record) )
+            self.assertEqual( "\t".join(fields), str(record) )
+            self.assertTrue( record.gene_id.startswith("ENSG") )
+            if record.feature != "gene":
+                self.assertTrue( record.transcript_id.startswith("ENST") )
+            self.assertEqual( fields[0], record.contig )
+
+class TestBed( unittest.TestCase ):
+    '''read and modify records of a bed file parsed with pysam.asBed().'''
+    filename = "example.bed.gz"
+
+    def setUp( self ):
+        '''open the tabix file and load its non-"#" rows as lists of fields.'''
+        self.tabix = pysam.Tabixfile( self.filename )
+        rows = gzip.open( self.filename, "r")
+        self.compare = [ row[:-1].split("\t") for row in rows if not row.startswith("#") ]
+
+    def testRead( self ):
+        '''string form, field list, contig, start and end match the reference.'''
+        for index, record in enumerate(self.tabix.fetch( parser = pysam.asBed() )):
+            fields = self.compare[index]
+            self.assertEqual( "\t".join( fields ), str(record) )
+            self.assertEqual( list(fields), list(record) )
+            self.assertEqual( fields[0], record.contig)
+            self.assertEqual( int(fields[1]), record.start)
+            self.assertEqual( int(fields[2]), record.end)
+
+    def testWrite( self ):
+        '''attribute writes must show up in both attribute and index access.'''
+        for index, record in enumerate(self.tabix.fetch( parser = pysam.asBed() )):
+            fields = self.compare[index]
+            self.assertEqual( "\t".join( fields ), str(record) )
+            self.assertEqual( list(fields), list(record) )
+            # each write is checked through the attribute and through the index
+            record.contig = "test"
+            self.assertEqual( "test", record.contig)
+            self.assertEqual( "test", record[0])
+            record.start += 1
+            self.assertEqual( int(fields[1]) + 1, record.start )
+            self.assertEqual( str(int(fields[1]) + 1), record[1] )
+            record.end += 1
+            self.assertEqual( int(fields[2]) + 1, record.end )
+            self.assertEqual( str(int(fields[2]) + 1), record[2] )
+
+class TestVCF( TestParser ):
+    '''check pysam.asVCF() records against the rows of example.vcf40.gz.
+
+    The first len(columns) fields of a row are reached through named
+    attributes; indexing a record addresses the fields after them.
+
+    All VCF tests live in this one class: a second class statement with
+    the same name would rebind TestVCF, and unittest would then collect
+    only the last definition.
+    '''
+
+    filename = "example.vcf40.gz"
+    columns = ("contig", "pos", "id",
+               "ref", "alt", "qual",
+               "filter", "info", "format" )
+
+    def testRead( self ):
+        '''named attributes and indexed fields agree with the reference row.'''
+        ncolumns = len(self.columns)
+
+        for x, r in enumerate(self.tabix.fetch( parser = pysam.asVCF() )):
+            c = self.compare[x]
+            for y, field in enumerate( self.columns ):
+                if field == "pos":
+                    # the record reports pos one less than the value in the file
+                    self.assertEqual( int(c[y]) - 1, getattr( r, field ) )
+                    self.assertEqual( int(c[y]) - 1, r.pos )
+                else:
+                    self.assertEqual( c[y], getattr( r, field ),
+                                      "mismatch in field %s: %s != %s" %\
+                                          ( field,c[y], getattr( r, field ) ) )
+            self.assertEqual( len(c), len( r ) + ncolumns )
+
+            for y in range(len(c) - ncolumns):
+                self.assertEqual( c[ncolumns+y], r[y] )
+
+    def testWrite( self ):
+        '''values written through attributes and indices are read back unchanged.'''
+        ncolumns = len(self.columns)
+
+        for x, r in enumerate(self.tabix.fetch( parser = pysam.asVCF() )):
+            c = self.compare[x]
+
+            # check unmodified string
+            ref_string = "\t".join( c )
+            cmp_string = str(r)
+            self.assertEqual( ref_string, cmp_string )
+
+            # set fields and compare field-wise
+            for y, field in enumerate( self.columns ):
+                if field == "pos":
+                    rpos = getattr( r, field )
+                    self.assertEqual( int(c[y]) - 1, rpos )
+                    self.assertEqual( int(c[y]) - 1, r.pos )
+                    # increment pos by 1
+                    setattr( r, field, rpos + 1 )
+                    self.assertEqual( getattr( r, field ), rpos + 1 )
+                    c[y] = str(int(c[y]) + 1 )
+                else:
+                    setattr( r, field, "test_%i" % y)
+                    c[y] = "test_%i" % y
+                    self.assertEqual( c[y], getattr( r, field ),
+                                      "mismatch in field %s: %s != %s" %\
+                                          ( field,c[y], getattr( r, field ) ) )
+
+            self.assertEqual( len(c), len( r ) + ncolumns )
+
+            for y in range(len(c) - ncolumns):
+                c[ncolumns+y] = "test_%i" % y
+                r[y] = "test_%i" % y
+                self.assertEqual( c[ncolumns+y], r[y] )
+
+    def testOpening( self ):
+        '''open and explicitly close the file many times in a row.'''
+        # a fixed number of rounds, so that the test is guaranteed to finish
+        for i in range(10000):
+            infile = pysam.Tabixfile( self.filename )
+            infile.close()
+
 if __name__ == "__main__":
+    # run every test case defined in this module
     unittest.main()