Imported Upstream version 0.5
[pysam.git] / tests / pysam_test.py
index c2ae6fa37b8d34008eb639adc608fc80de7411d2..d2bf4150de58551accb107f1822e85912c7c58d5 100755 (executable)
@@ -7,11 +7,12 @@ and data files located there.
 
 import pysam
 import unittest
-import os
-import itertools
+import os, re, sys
+import itertools, collections
 import subprocess
 import shutil
-
+import logging
+SAMTOOLS="samtools"
 
 def checkBinaryEqual( filename1, filename2 ):
     '''return true if the two files are binary equal.'''
@@ -47,7 +48,13 @@ def runSamtools( cmd ):
     except OSError, e:
         print >>sys.stderr, "Execution failed:", e
 
-        
+def getSamtoolsVersion():
+    '''return samtools version'''
+
+    pipe = subprocess.Popen(SAMTOOLS, shell=True, stderr=subprocess.PIPE).stderr
+    lines = "".join(pipe.readlines())
+    return re.search( "Version:\s+(\S+)", lines).groups()[0]
+
 class BinaryTest(unittest.TestCase):
     '''test samtools command line commands and compare
     against pysam commands.
@@ -61,44 +68,53 @@ class BinaryTest(unittest.TestCase):
     mCommands = \
         { "faidx" : \
         ( 
-            ("ex1.fa.fai", "samtools faidx ex1.fa"), 
+            ("ex1.fa.fai", "faidx ex1.fa"), 
             ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ),
             ),
           "import" :
               (
-                ("ex1.bam", "samtools import ex1.fa.fai ex1.sam.gz ex1.bam" ),
+                ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ),
                 ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ),
                 ),
           "index":
               (
-                ("ex1.bam.bai", "samtools index ex1.bam" ),
+                ("ex1.bam.bai", "index ex1.bam" ),
                 ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ),
                 ),
           "pileup1" :
           (
-                ("ex1.pileup", "samtools pileup -cf ex1.fa ex1.bam > ex1.pileup" ),
+                ("ex1.pileup", "pileup -cf ex1.fa ex1.bam > ex1.pileup" ),
                 ("pysam_ex1.pileup", (pysam.pileup, "-c -f ex1.fa ex1.bam" ) )
                 ),
           "pileup2" :
           (
-                ("ex1.glf", "samtools pileup -gf ex1.fa ex1.bam > ex1.glf" ),
+                ("ex1.glf", "pileup -gf ex1.fa ex1.bam > ex1.glf" ),
                 ("pysam_ex1.glf", (pysam.pileup, "-g -f ex1.fa ex1.bam" ) )
                 ),
           "glfview" :
         (
-                ("ex1.glfview", "samtools glfview ex1.glf > ex1.glfview"),
+                ("ex1.glfview", "glfview ex1.glf > ex1.glfview"),
                 ("pysam_ex1.glfview", (pysam.glfview, "ex1.glf" ) ),
                 ),
           "view" :
         (
-                ("ex1.view", "samtools view ex1.bam > ex1.view"),
+                ("ex1.view", "view ex1.bam > ex1.view"),
                 ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
                 ),
+          "view2" :
+        (
+                ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"),
+                # note that -o ex1.view2 throws exception.
+                ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ),
+                ),
         }
 
     # some tests depend on others. The order specifies in which order
     # the samtools commands are executed.
-    mOrder = ('faidx', 'import', 'index', 'pileup1', 'pileup2', 'glfview', 'view' )
+    mOrder = ('faidx', 'import', 'index', 
+              'pileup1', 'pileup2', 
+              # 'glfview', deprecated
+              'view', 'view2' )
 
     def setUp( self ):
         '''setup tests. 
@@ -115,22 +131,34 @@ class BinaryTest(unittest.TestCase):
                 command = self.mCommands[label]
                 samtools_target, samtools_command = command[0]
                 pysam_target, pysam_command = command[1]
-                runSamtools( samtools_command )
+                runSamtools( " ".join( (SAMTOOLS, samtools_command )))
                 pysam_method, pysam_options = pysam_command
                 output = pysam_method( *pysam_options.split(" "), raw=True)
                 if ">" in samtools_command:
-                    outfile = open( pysam_target, "w" )
+                    outfile = open( pysam_target, "wb" )
                     for line in output: outfile.write( line )
                     outfile.close()
 
             BinaryTest.first_time = False
 
+        samtools_version = getSamtoolsVersion()
+
+        def _r( s ):
+            # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> 0.1.12
+            if s.count('-') > 0: s = s[0:s.find('-')]
+            return re.sub( "[^0-9.]", "", s )
+
+        if _r(samtools_version) != _r( pysam.__samtools_version__):
+            raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
+                                 (pysam.__samtools_version__,
+                                  samtools_version ))
+
     def checkCommand( self, command ):
         if command:
             samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0]
             self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ), 
                              "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
-
+            
     def testImport( self ):
         self.checkCommand( "import" )
 
@@ -143,17 +171,18 @@ class BinaryTest(unittest.TestCase):
     def testPileup2( self ):
         self.checkCommand( "pileup2" )
 
-    def testGLFView( self ):
-        self.checkCommand( "glfview" )
+    # deprecated
+    # def testGLFView( self ):
+    #     self.checkCommand( "glfview" )
 
     def testView( self ):
         self.checkCommand( "view" )
 
     def testEmptyIndex( self ):
-        self.assertRaises( pysam.SamtoolsError, pysam.index, "exdoesntexist.bam" )
+        self.assertRaises( IOError, pysam.index, "exdoesntexist.bam" )
 
     def __del__(self):
-
+        return
         for label, command in self.mCommands.iteritems():
             samtools_target, samtools_command = command[0]
             pysam_target, pysam_command = command[1]
@@ -234,17 +263,33 @@ class IOTest(unittest.TestCase):
         samfile.close()
         self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
 
-    def testPileupFromClosedFile( self ):
+    def testClosedFile( self ):
+        '''test that access to a closed samfile raises ValueError.'''
 
         samfile = pysam.Samfile( "ex1.bam", "rb" )
         samfile.close()
+        self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
         self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
+        self.assertRaises( ValueError, samfile.getrname, 0 )
+        self.assertRaises( ValueError, samfile.tell )
+        self.assertRaises( ValueError, samfile.seek, 0 )
+        self.assertRaises( ValueError, getattr, samfile, "nreferences" )
+        self.assertRaises( ValueError, getattr, samfile, "references" )
+        self.assertRaises( ValueError, getattr, samfile, "lengths" )
+        self.assertRaises( ValueError, getattr, samfile, "text" )
+        self.assertRaises( ValueError, getattr, samfile, "header" )
+
+        # write on closed file 
+        self.assertEqual( 0, samfile.write(None) )
+
+    def testAutoDetection( self ):
+        samfile = pysam.Samfile( "ex3.bam" )
+        samfile.fetch()
+        samfile.close()
 
-    def testBinaryReadFromSamfile( self ):
-        pass
-        # needs to re-activated, see issue 19
-        #samfile = pysam.Samfile( "ex1.bam", "r" )
-        #samfile.fetch().next()
+        samfile = pysam.Samfile( "ex3.sam")
+        samfile.fetch()
+        samfile.close()
 
     def testReadingFromFileWithoutIndex( self ):
         '''read from bam file without index.'''
@@ -262,12 +307,19 @@ class TestIteratorRow(unittest.TestCase):
     def checkRange( self, rnge ):
         '''compare results from iterator with those from samtools.'''
         ps = list(self.samfile.fetch(region=rnge))
-        sa = list(pysam.view( "ex1.bam", rnge , raw = True) )
+        sa = list(pysam.view( "ex1.bam", rnge, raw = True) )
         self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) ))
         # check if the same reads are returned and in the same order
         for line, pair in enumerate( zip( ps, sa ) ):
-            data = pair[1].split("\t")
-            self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
+            a,b = pair
+            d = b.split("\t")
+            self.assertEqual( a.qname, d[0], "line %i: read id mismatch: %s != %s" % (line, a.rname, d[0]) )
+            self.assertEqual( a.pos, int(d[3])-1, "line %i: read position mismatch: %s != %s, \n%s\n%s\n" % \
+                                  (line, a.pos, int(d[3])-1,
+                                   str(a), str(d) ) )
+            self.assertEqual( a.qual, d[10], "line %i: quality mismatch: %s != %s, \n%s\n%s\n" % \
+                                  (line, a.qual, d[10],
+                                   str(a), str(d) ) )
 
     def testIteratePerContig(self):
         '''check random access per contig'''
@@ -348,8 +400,11 @@ class TestIteratorColumn(unittest.TestCase):
                     self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
                 elif refcov == 1:
                     # one read, all columns of the read are returned
-                    self.assertEqual( len(columns), refcolumns, "pileup incomplete - %i should be %i " % (len(columns), refcolumns))
+                    self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
+                                          (pos, len(columns), refcolumns))
+
                     
+    
     def tearDown(self):
         self.samfile.close()
     
@@ -398,10 +453,22 @@ class TestAlignedReadFromBam(unittest.TestCase):
     def testARseq(self):
         self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
         self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
+        self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
 
     def testARqual(self):
         self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
         self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
+        self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
+
+    def testARquery(self):
+        self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
+        self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
+        self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") )
+
+    def testARqqual(self):
+        self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
+        self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
+        self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") )
 
     def testPresentOptionalFields(self):
         self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
@@ -472,7 +539,17 @@ class TestHeaderSam(unittest.TestCase):
     def testHeaders(self):
         self.compareHeaders( self.header, self.samfile.header )
         self.compareHeaders( self.samfile.header, self.header )
-        
+
+    def testNameMapping( self ):
+        for x, y in enumerate( ("chr1", "chr2")):
+            tid = self.samfile.gettid( y )
+            ref = self.samfile.getrname( x )
+            self.assertEqual( tid, x )
+            self.assertEqual( ref, y )
+
+        self.assertEqual( self.samfile.gettid("chr?"), -1 )
+        self.assertRaises( ValueError, self.samfile.getrname, 2 )
+
     def tearDown(self):
         self.samfile.close()
 
@@ -518,7 +595,14 @@ class TestPileupObjects(unittest.TestCase):
 
     def tearDown(self):
         self.samfile.close()
-        
+
+class TestContextManager(unittest.TestCase):
+
+    def testManager( self ):
+        with pysam.Samfile('ex1.bam', 'rb') as samfile:
+            samfile.fetch()
+        self.assertEqual( samfile._isOpen(), False )
+
 class TestExceptions(unittest.TestCase):
 
     def setUp(self):
@@ -548,23 +632,53 @@ class TestExceptions(unittest.TestCase):
         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 )
         self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 )
 
+        self.assertRaises( ValueError, self.samfile.count, "chr1", 5, -10 )
+        self.assertRaises( ValueError, self.samfile.count, "chr1", 5, 0 )        
+        self.assertRaises( ValueError, self.samfile.count, "chr1", -5, -10 )
+
     def testOutOfRangeNegativeOldFormat(self):
         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" )
         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" )
         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" )
 
+        self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-10" )
+        self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-0" )
+        self.assertRaises( ValueError, self.samfile.count, region="chr1:-5--10" )
+
     def testOutOfRangNewFormat(self):
         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 )
+        self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999, 99999999999 )
 
     def testOutOfRangeLargeNewFormat(self):
         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
+        self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
 
     def testOutOfRangeLargeOldFormat(self):
         self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
+        self.assertRaises( ValueError, self.samfile.count, "chr1:99999999999999999-999999999999999999" )
+
+    def testZeroToZero(self):        
+        '''see issue 44'''
+        self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0)
 
     def tearDown(self):
         self.samfile.close()
 
+class TestWrongFormat(unittest.TestCase):
+    '''test cases for opening files not in bam/sam format.'''
+
+    def testOpenSamAsBam( self ):
+        self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' )
+
+    def testOpenBamAsSam( self ):
+        self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' )
+
+    def testOpenFastaAsSam( self ):
+        self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' )
+
+    def testOpenFastaAsBam( self ):
+        self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' )
+
 class TestFastaFile(unittest.TestCase):
 
     mSequences = { 'chr1' :
@@ -581,20 +695,34 @@ class TestFastaFile(unittest.TestCase):
             self.assertEqual( seq, self.file.fetch( id ) )
             for x in range( 0, len(seq), 10):
                 self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
+                # test x:end
+                self.assertEqual( seq[x:], self.file.fetch( id, x) )
+                # test 0:x
+                self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
+
+        
+        # unknown sequence returns ""
+        self.assertEqual( "", self.file.fetch("chr12") )
+
+    def testOutOfRangeAccess( self ):
+        '''test out of range access.'''
+        # out of range access returns an empty string
+        for contig, s in self.mSequences.iteritems():
+            self.assertEqual( self.file.fetch( contig, len(s), len(s)+1), "" )
+
+        self.assertEqual( self.file.fetch( "chr3", 0 , 100), "" ) 
 
     def testFetchErrors( self ):
         self.assertRaises( ValueError, self.file.fetch )
-        self.assertRaises( ValueError, self.file.fetch, "chr1", 0 )
         self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
         self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
-        # the following segfaults:
-        # self.assertRaises( IndexError, self.file.fetch, "chr12", )
-        pass
 
+    def testLength( self ):
+        self.assertEqual( len(self.file), 2 )
+        
     def tearDown(self):
         self.file.close()
 
-
 class TestAlignedRead(unittest.TestCase):
     '''tests to check if aligned read can be constructed
     and manipulated.
@@ -645,7 +773,7 @@ class TestAlignedRead(unittest.TestCase):
         a.mpos=200
         a.isize=167
        a.qual="1234" * 3
-
+        # todo: create tags
         return a
 
     def testUpdate( self ):
@@ -714,6 +842,19 @@ class TestAlignedRead(unittest.TestCase):
 
         return a
 
+    def testTagParsing( self ):
+        '''test for tag parsing
+
+        see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a
+        '''
+        samfile=pysam.Samfile( "ex8.bam","rb" )
+
+        for entry in samfile:
+            before = entry.tags
+            entry.tags = entry.tags
+            after = entry.tags
+            self.assertEqual( after, before )
+
 class TestDeNovoConstruction(unittest.TestCase):
     '''check BAM/SAM file construction using ex3.sam
     
@@ -803,7 +944,7 @@ class TestDeNovoConstruction(unittest.TestCase):
         others = list(infile)
         for denovo, other in zip( others, self.reads):
             self.checkFieldEqual( other, denovo )
-            self.assertEqual( other, denovo)
+            self.assertEqual( other.compare( denovo ), 0 )
 
     def testSAMPerRead( self ):
         '''check if individual reads are binary equal.'''
@@ -812,7 +953,7 @@ class TestDeNovoConstruction(unittest.TestCase):
         others = list(infile)
         for denovo, other in zip( others, self.reads):
             self.checkFieldEqual( other, denovo )
-            self.assertEqual( other, denovo)
+            self.assertEqual( other.compare( denovo), 0 )
             
     def testBAMWholeFile( self ):
         
@@ -829,9 +970,336 @@ class TestDeNovoConstruction(unittest.TestCase):
         os.unlink( tmpfilename )
 
 
+class TestDoubleFetch(unittest.TestCase):
+    '''check if two iterators on the same bamfile are independent.'''
+    
+    def testDoubleFetch( self ):
+
+        samfile1 = pysam.Samfile('ex1.bam', 'rb')
+
+        for a,b in zip(samfile1.fetch(), samfile1.fetch()):
+            self.assertEqual( a.compare( b ), 0 )
+
+    def testDoubleFetchWithRegion( self ):
+
+        samfile1 = pysam.Samfile('ex1.bam', 'rb')
+        chr, start, stop = 'chr1', 200, 3000000
+        self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
+
+        for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
+            self.assertEqual( a.compare( b ), 0 ) 
+
+    def testDoubleFetchUntilEOF( self ):
+
+        samfile1 = pysam.Samfile('ex1.bam', 'rb')
+
+        for a,b in zip(samfile1.fetch( until_eof = True), 
+                       samfile1.fetch( until_eof = True )):
+            self.assertEqual( a.compare( b), 0 )
+
+class TestRemoteFileFTP(unittest.TestCase):
+    '''test remote access.
+
+    '''
+
+    # Need to find an ftp server without password on standard
+    # port.
+
+    url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
+    region = "1:1-1000"
+
+    def testFTPView( self ):
+        return
+        result = pysam.view( self.url, self.region )
+        self.assertEqual( len(result), 36 )
+        
+    def testFTPFetch( self ):
+        return
+        samfile = pysam.Samfile(self.url, "rb")  
+        result = list(samfile.fetch( region = self.region ))
+        self.assertEqual( len(result), 36 )
+
+class TestRemoteFileHTTP( unittest.TestCase):
+
+    url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
+    region = "chr1:1-1000"
+    local = "ex1.bam"
+
+    def testView( self ):
+        samfile_local = pysam.Samfile(self.local, "rb")  
+        ref = list(samfile_local.fetch( region = self.region ))
+        
+        result = pysam.view( self.url, self.region )
+        self.assertEqual( len(result), len(ref) )
+        
+    def testFetch( self ):
+        samfile = pysam.Samfile(self.url, "rb")  
+        result = list(samfile.fetch( region = self.region ))
+        samfile_local = pysam.Samfile(self.local, "rb")  
+        ref = list(samfile_local.fetch( region = self.region ))
+
+        self.assertEqual( len(ref), len(result) )
+        for x, y in zip(result, ref):
+            self.assertEqual( x.compare( y ), 0 )
+
+    def testFetchAll( self ):
+        samfile = pysam.Samfile(self.url, "rb")  
+        result = list(samfile.fetch())
+        samfile_local = pysam.Samfile(self.local, "rb")  
+        ref = list(samfile_local.fetch() )
+
+        self.assertEqual( len(ref), len(result) )
+        for x, y in zip(result, ref):
+            self.assertEqual( x.compare( y ), 0 )
+
+class TestLargeOptValues( unittest.TestCase ):
+
+    ints = ( 65536, 214748, 2147484, 2147483647 )
+    floats = ( 65536.0, 214748.0, 2147484.0 )
+
+    def check( self, samfile ):
+        
+        i = samfile.fetch()
+        for exp in self.ints:
+            rr = i.next()
+            obs = rr.opt("ZP")
+            self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+        for exp in [ -x for x in self.ints ]:
+            rr = i.next()
+            obs = rr.opt("ZP")
+            self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+        for exp in self.floats:
+            rr = i.next()
+            obs = rr.opt("ZP")
+            self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+        for exp in [ -x for x in self.floats ]:
+            rr = i.next()
+            obs = rr.opt("ZP")
+            self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+    def testSAM( self ):
+        samfile = pysam.Samfile("ex10.sam", "r")
+        self.check( samfile )
+
+    def testBAM( self ):
+        samfile = pysam.Samfile("ex10.bam", "rb")
+        self.check( samfile )
+
+class TestSNPCalls( unittest.TestCase ):
+    '''test pysam SNP calling ability.'''
+
+    def checkEqual( self, a, b ):
+        for x in ("reference_base", 
+                  "pos",
+                  "genotype",
+                  "consensus_quality",
+                  "snp_quality",
+                  "mapping_quality",
+                  "coverage" ):
+            self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % 
+                              (x, getattr(a, x), getattr(b,x), str(a), str(b)))
+
+    def testAllPositionsViaIterator( self ):
+        samfile = pysam.Samfile( "ex1.bam", "rb")  
+        fastafile = pysam.Fastafile( "ex1.fa" )
+        try: 
+            refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"]
+        except pysam.SamtoolsError:
+            pass
+
+        i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+        calls = list(pysam.IteratorSNPCalls(i))
+        for x,y in zip( refs, calls ):
+            self.checkEqual( x, y )
+
+    def testPerPositionViaIterator( self ):
+        # test pileup for each position. This is a slow operation
+        # so this test is disabled 
+        return
+        samfile = pysam.Samfile( "ex1.bam", "rb")  
+        fastafile = pysam.Fastafile( "ex1.fa" )
+        for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
+            if x.reference_base == "*": continue
+            i = samfile.pileup( x.chromosome, x.pos, x.pos+1,
+                                fastafile = fastafile,
+                                stepper = "samtools" )
+            z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ]
+            self.assertEqual( len(z), 1 )
+            self.checkEqual( x, z[0] )
+
+    def testPerPositionViaCaller( self ):
+        # test pileup for each position. This is a fast operation
+        samfile = pysam.Samfile( "ex1.bam", "rb")  
+        fastafile = pysam.Fastafile( "ex1.fa" )
+        i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+        caller = pysam.SNPCaller( i )
+
+        for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
+            if x.reference_base == "*": continue
+            call = caller.call( x.chromosome, x.pos )
+            self.checkEqual( x, call )
+
+class TestIndelCalls( unittest.TestCase ):
+    '''test pysam indel calling.'''
+
+    def checkEqual( self, a, b ):
+
+        for x in ("pos",
+                  "genotype",
+                  "consensus_quality",
+                  "snp_quality",
+                  "mapping_quality",
+                  "coverage",
+                  "first_allele",
+                  "second_allele",
+                  "reads_first",
+                  "reads_second",
+                  "reads_diff"):
+            if b.genotype == "*/*" and x == "second_allele":
+                # ignore test for second allele (positions chr2:439 and chr2:1512)
+                continue
+            self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % 
+                              (x, getattr(a, x), getattr(b,x), str(a), str(b)))
+
+    def testAllPositionsViaIterator( self ):
+
+        samfile = pysam.Samfile( "ex1.bam", "rb")  
+        fastafile = pysam.Fastafile( "ex1.fa" )
+        try: 
+            refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"]
+        except pysam.SamtoolsError:
+            pass
+
+        i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+        calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ]
+        for x,y in zip( refs, calls ):
+            self.checkEqual( x, y )
+
+    def testPerPositionViaCaller( self ):
+        # test pileup for each position. This is a fast operation
+        samfile = pysam.Samfile( "ex1.bam", "rb")  
+        fastafile = pysam.Fastafile( "ex1.fa" )
+        i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+        caller = pysam.IndelCaller( i )
+
+        for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
+            if x.reference_base != "*": continue
+            call = caller.call( x.chromosome, x.pos )
+            self.checkEqual( x, call )
+
+class TestLogging( unittest.TestCase ):
+    '''test around bug issue 42,
+
+    failed in versions < 0.4
+    '''
+
+    def check( self, bamfile, log ):
+
+        if log:
+            logger = logging.getLogger('franklin')
+            logger.setLevel(logging.INFO)
+            formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
+            log_hand  = logging.FileHandler('log.txt')
+            log_hand.setFormatter(formatter)
+            logger.addHandler(log_hand)
+
+        bam  = pysam.Samfile(bamfile, 'rb')
+        cols = bam.pileup()
+        self.assert_( True )
+
+    def testFail1( self ):
+        self.check( "ex9_fail.bam", False )
+        self.check( "ex9_fail.bam", True )
+
+    def testNoFail1( self ):
+        self.check( "ex9_nofail.bam", False )
+        self.check( "ex9_nofail.bam", True )
+
+    def testNoFail2( self ):
+        self.check( "ex9_nofail.bam", True )
+        self.check( "ex9_nofail.bam", True )
+        
 # TODOS
 # 1. finish testing all properties within pileup objects
 # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)
+# 3. check: presence of sequence
+
+class TestSamfileUtilityFunctions( unittest.TestCase ):
+
+    def testCount( self ):
+
+        samfile = pysam.Samfile( "ex1.bam", "rb" )
+
+        for contig in ("chr1", "chr2" ):
+            for start in xrange( 0, 2000, 100 ):
+                end = start + 1
+                self.assertEqual( len( list( samfile.fetch( contig, start, end ) ) ),
+                                  samfile.count( contig, start, end ) )
+
+                # test empty intervals
+                self.assertEqual( len( list( samfile.fetch( contig, start, start ) ) ),
+                                  samfile.count( contig, start, start ) )
+
+                # test half empty intervals
+                self.assertEqual( len( list( samfile.fetch( contig, start ) ) ),
+                                  samfile.count( contig, start ) )
+
+    def testMate( self ):
+        '''test mate access.'''
+
+        readnames = [ x.split("\t")[0] for x in open( "ex1.sam", "rb" ).readlines() ]
+        counts = collections.defaultdict( int )
+        for x in readnames: counts[x] += 1
+
+        samfile = pysam.Samfile( "ex1.bam", "rb" )
+        for read in samfile.fetch():
+            if not read.is_paired:
+                self.assertRaises( ValueError, samfile.mate, read )
+            elif read.mate_is_unmapped:
+                self.assertRaises( ValueError, samfile.mate, read )
+            else:
+                if counts[read.qname] == 1:
+                    self.assertRaises( ValueError, samfile.mate, read )
+                else:
+                    mate = samfile.mate( read )
+                    self.assertEqual( read.qname, mate.qname )
+                    self.assertEqual( read.is_read1, mate.is_read2 )
+                    self.assertEqual( read.is_read2, mate.is_read1 )
+                    self.assertEqual( read.pos, mate.mpos )
+                    self.assertEqual( read.mpos, mate.pos )
+
+class TestSamtoolsProxy( unittest.TestCase ):
+    '''tests for sanity checking access to samtools functions.'''
+
+    def testIndex( self ):
+        self.assertRaises( IOError, pysam.index, "missing_file" )
+
+    def testView( self ):
+        # note that view still echos "open: No such file or directory"
+        self.assertRaises( pysam.SamtoolsError, pysam.view, "missing_file" )
+
+    def testSort( self ):
+        self.assertRaises( pysam.SamtoolsError, pysam.sort, "missing_file" )
+
+class TestSamfileIndex( unittest.TestCase):
+    
+    def testIndex( self ):
+        samfile = pysam.Samfile( "ex1.bam", "rb" )
+        index = pysam.IndexedReads( samfile )
+        index.build()
+
+        reads = collections.defaultdict( int )
+
+        for read in samfile: reads[read.qname] += 1
+            
+        for qname, counts in reads.iteritems():
+            found = list(index.find( qname ))
+            self.assertEqual( len(found), counts )
+            for x in found: self.assertEqual( x.qname, qname )
+            
 
 if __name__ == "__main__":
     # build data files