Imported Upstream version 0.5
[pysam.git] / tests / tabix_test.py
index 8eb8a60a618d4865c07d5c853c7f2e37226e8cd6..207a59c6c125360655ac77192f43640903fc1c1d 100644 (file)
@@ -85,7 +85,7 @@ class TestIteration( unittest.TestCase ):
     def setUp( self ):
 
         self.tabix = pysam.Tabixfile( self.filename )
-        lines = gzip.open(self.filename).readlines()
+        lines = [ x for x in gzip.open(self.filename).readlines() if not x.startswith("#") ]
         # creates index of contig, start, end, adds content without newline.
         self.compare = [ 
             (x[0][0], int(x[0][3]), int(x[0][4]), x[1]) 
@@ -196,6 +196,14 @@ class TestIteration( unittest.TestCase ):
         # check that contigs is read-only
         self.assertRaises( AttributeError, setattr, self.tabix, "contigs", ["chr1", "chr2"] )
 
+    def testHeader( self ):
+        ref = []
+        for x in gzip.open( self.filename ):
+            if not x.startswith("#"): break
+            ref.append( x[:-1] )
+        header = list( self.tabix.header )
+        self.assertEqual( ref, header )
+
 class TestParser( unittest.TestCase ):
 
     filename = "example.gtf.gz" 
@@ -203,23 +211,140 @@ class TestParser( unittest.TestCase ):
     def setUp( self ):
 
         self.tabix = pysam.Tabixfile( self.filename )
-        self.compare = [ x[:-1].split("\t") for x in gzip.open( self.filename, "r") ]
-
-    def testGTF( self ):
-
-        for x, r in enumerate(self.tabix.fetch( parser = pysam.asGTF() )):
-            self.assertEqual( "\t".join( self.compare[x]), str(r) )
+        self.compare = [ x[:-1].split("\t") for x in gzip.open( self.filename, "r") if not x.startswith("#") ]
 
-    def testTuple( self ):
+    def testRead( self ):
 
         for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
             self.assertEqual( self.compare[x], list(r) )
-
             self.assertEqual( len(self.compare[x]), len(r) )
+
             for c in range(0,len(r)):
                 self.assertEqual( self.compare[x][c], r[c] )
 
+    def testWrite( self ):
+        
+        for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
+            self.assertEqual( self.compare[x], list(r) )
+            c = list(r)
+            for y in range(len(r)):
+                r[y] = "test_%05i" % y
+                c[y] = "test_%05i" % y
+            self.assertEqual( c, list(r) )
+            self.assertEqual( "\t".join( c ), str(r) )
+            # check second assignment
+            for y in range(len(r)):
+                r[y] = "test_%05i" % y
+            self.assertEqual( c, list(r) )
+            self.assertEqual( "\t".join( c ), str(r) )
+
+    def testUnset( self ):
+        for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
+            self.assertEqual( self.compare[x], list(r) )
+            c = list(r)
+            e = list(r)
+            for y in range(len(r)):
+                r[y] = c[y] = None
+                e[y] = ""
+                self.assertEqual( c, list(r) )
+                self.assertEqual( "\t".join(e), str(r) )
+
+class TestGTF( TestParser ):
+    def testRead( self ):
+
+        for x, r in enumerate(self.tabix.fetch( parser = pysam.asGTF() )):
+            self.assertEqual( "\t".join( self.compare[x]), str(r) )
+
+class TestBed( unittest.TestCase ):
+    filename = "example.bed.gz"
+
+    def setUp( self ):
+
+        self.tabix = pysam.Tabixfile( self.filename )
+        self.compare = [ x[:-1].split("\t") for x in gzip.open( self.filename, "r") if not x.startswith("#") ]
+
+    def testRead( self ):
+
+        for x, r in enumerate(self.tabix.fetch( parser = pysam.asBed() )):
+            c = self.compare[x]
+            self.assertEqual( "\t".join( c ), str(r) )
+            self.assertEqual( list(c), list(r) )
+            self.assertEqual( c[0], r.contig)
+            self.assertEqual( int(c[1]), r.start)
+            self.assertEqual( int(c[2]), r.end)
+
+    def testWrite( self ):
+
+        for x, r in enumerate(self.tabix.fetch( parser = pysam.asBed() )):
+            c = self.compare[x]
+            self.assertEqual( "\t".join( c ), str(r) )
+            self.assertEqual( list(c), list(r) )
+
+            r.contig = "test"
+            self.assertEqual( "test", r.contig)
+            self.assertEqual( "test", r[0])
+
+            r.start += 1
+            self.assertEqual( int(c[1]) + 1, r.start )
+            self.assertEqual( str(int(c[1]) + 1), r[1] )
+
+            r.end += 1
+            self.assertEqual( int(c[2]) + 1, r.end )
+            self.assertEqual( str(int(c[2]) + 1), r[2] )
+
+class TestVCF( TestParser ):
+
+    filename = "example.vcf40.gz"
+    columns = ("contig", "pos", "id", 
+               "ref", "alt", "qual", 
+               "filter", "info", "format" )
+
+    def testRead( self ):
+        
+        ncolumns = len(self.columns) 
+
+        for x, r in enumerate(self.tabix.fetch( parser = pysam.asVCF() )):
+            c = self.compare[x]
+            for y, field in enumerate( self.columns ):
+                if field == "pos":
+                    self.assertEqual( int(c[y])-1, getattr( r, field ) )
+                    self.assertEqual( int(c[y])-1, r.pos )
+                else:
+                    self.assertEqual( c[y], getattr( r, field ), 
+                                      "mismatch in field %s: %s != %s" %\
+                                          ( field,c[y], getattr( r, field ) ) )
+            self.assertEqual( len(c), len( r ) + ncolumns )
+            
+            for y in range(len(c) - ncolumns):
+                self.assertEqual( c[ncolumns+y], r[y] )
+                
+    def testWrite( self ):
+
+        ncolumns = len(self.columns) 
+
+        for x, r in enumerate(self.tabix.fetch( parser = pysam.asVCF() )):
+            c = self.compare[x]
+            for y, field in enumerate( self.columns ):
+                if field == "pos":
+                    r.pos += 1
+                    self.assertEqual( int(c[y]), getattr( r, field ) )
+                    self.assertEqual( int(c[y]), r.pos )
+                else:
+                    setattr( r, field, "test_%i" % y)
+                    c[y] = "test_%i" % y
+                    self.assertEqual( c[y], getattr( r, field ), 
+                                      "mismatch in field %s: %s != %s" %\
+                                          ( field,c[y], getattr( r, field ) ) )
+
+            self.assertEqual( len(c), len( r ) + ncolumns )
+            
+            for y in range(len(c) - ncolumns):
+                c[ncolumns+y] = "test_%i" % y
+                r[y] = "test_%i" % y
+                self.assertEqual( c[ncolumns+y], r[y] )
+                
 if __name__ == "__main__":
+
     unittest.main()