Merge commit 'upstream/0.3'
[pysam.git] / tests / pysam_test.py
index c2ae6fa37b8d34008eb639adc608fc80de7411d2..5be5ca4c783f74cef3ebf775681ef1f95a619269 100755 (executable)
@@ -7,7 +7,7 @@ and data files located there.
 
 import pysam
 import unittest
-import os
+import os, re
 import itertools
 import subprocess
 import shutil
@@ -47,7 +47,13 @@ def runSamtools( cmd ):
     except OSError, e:
         print >>sys.stderr, "Execution failed:", e
 
-        
+def getSamtoolsVersion():
+    '''return samtools version'''
+
+    pipe = subprocess.Popen("samtools", shell=True, stderr=subprocess.PIPE).stderr
+    lines = "".join(pipe.readlines())
+    return re.search( "Version:\s+(\S+)", lines).groups()[0]
+
 class BinaryTest(unittest.TestCase):
     '''test samtools command line commands and compare
     against pysam commands.
@@ -125,12 +131,19 @@ class BinaryTest(unittest.TestCase):
 
             BinaryTest.first_time = False
 
+        samtools_version = getSamtoolsVersion()
+        if samtools_version != pysam.__samtools_version__:
+            raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
+                                 (pysam.__samtools_version__,
+                                  samtools_version ))
+
     def checkCommand( self, command ):
+
         if command:
             samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0]
             self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ), 
                              "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
-
+            
     def testImport( self ):
         self.checkCommand( "import" )
 
@@ -153,7 +166,7 @@ class BinaryTest(unittest.TestCase):
         self.assertRaises( pysam.SamtoolsError, pysam.index, "exdoesntexist.bam" )
 
     def __del__(self):
-
+        return
         for label, command in self.mCommands.iteritems():
             samtools_target, samtools_command = command[0]
             pysam_target, pysam_command = command[1]
@@ -254,6 +267,12 @@ class IOTest(unittest.TestCase):
         self.assertRaises( ValueError, samfile.fetch )
         self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 )
 
+    def testReadingFromFileWithWrongMode( self ):
+
+        assert not os.path.exists( "ex2.bam.bai" )
+        samfile = pysam.Samfile( "ex2.bam", "r" )
+        self.assertRaises( ValueError, samfile.fetch )
+
 class TestIteratorRow(unittest.TestCase):
 
     def setUp(self):
@@ -283,6 +302,7 @@ class TestIteratorRow(unittest.TestCase):
     def tearDown(self):
         self.samfile.close()
 
+
 class TestIteratorRowAll(unittest.TestCase):
 
     def setUp(self):
@@ -348,7 +368,8 @@ class TestIteratorColumn(unittest.TestCase):
                     self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
                 elif refcov == 1:
                     # one read, all columns of the read are returned
-                    self.assertEqual( len(columns), refcolumns, "pileup incomplete - %i should be %i " % (len(columns), refcolumns))
+                    self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
+                                          (pos, len(columns), refcolumns))
                     
     def tearDown(self):
         self.samfile.close()
@@ -398,10 +419,22 @@ class TestAlignedReadFromBam(unittest.TestCase):
     def testARseq(self):
         self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
         self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
+        self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
 
     def testARqual(self):
         self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
         self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
+        self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
+
+    def testARquery(self):
+        self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
+        self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
+        self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") )
+
+    def testARqqual(self):
+        self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
+        self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
+        self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") )
 
     def testPresentOptionalFields(self):
         self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
@@ -518,7 +551,14 @@ class TestPileupObjects(unittest.TestCase):
 
     def tearDown(self):
         self.samfile.close()
-        
+
+class TestContextManager(unittest.TestCase):
+
+    def testManager( self ):
+        with pysam.Samfile('ex1.bam', 'rb') as samfile:
+            samfile.fetch()
+        self.assertEqual( samfile._isOpen(), False )
+
 class TestExceptions(unittest.TestCase):
 
     def setUp(self):
@@ -581,20 +621,26 @@ class TestFastaFile(unittest.TestCase):
             self.assertEqual( seq, self.file.fetch( id ) )
             for x in range( 0, len(seq), 10):
                 self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
+                # test x:end
+                self.assertEqual( seq[x:], self.file.fetch( id, x) )
+                # test 0:x
+                self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
+
+        
+        # unknown sequence returns ""
+        self.assertEqual( "", self.file.fetch("chr12") )
 
     def testFetchErrors( self ):
         self.assertRaises( ValueError, self.file.fetch )
-        self.assertRaises( ValueError, self.file.fetch, "chr1", 0 )
         self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
         self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
-        # the following segfaults:
-        # self.assertRaises( IndexError, self.file.fetch, "chr12", )
-        pass
 
+    def testLength( self ):
+        self.assertEqual( len(self.file), 2 )
+        
     def tearDown(self):
         self.file.close()
 
-
 class TestAlignedRead(unittest.TestCase):
     '''tests to check if aligned read can be constructed
     and manipulated.
@@ -803,7 +849,7 @@ class TestDeNovoConstruction(unittest.TestCase):
         others = list(infile)
         for denovo, other in zip( others, self.reads):
             self.checkFieldEqual( other, denovo )
-            self.assertEqual( other, denovo)
+            self.assertEqual( other.compare( denovo ), 0 )
 
     def testSAMPerRead( self ):
         '''check if individual reads are binary equal.'''
@@ -812,7 +858,7 @@ class TestDeNovoConstruction(unittest.TestCase):
         others = list(infile)
         for denovo, other in zip( others, self.reads):
             self.checkFieldEqual( other, denovo )
-            self.assertEqual( other, denovo)
+            self.assertEqual( other.compare( denovo), 0 )
             
     def testBAMWholeFile( self ):
         
@@ -828,6 +874,82 @@ class TestDeNovoConstruction(unittest.TestCase):
         
         os.unlink( tmpfilename )
 
+class TestDoubleFetch(unittest.TestCase):
+    '''check if two iterators on the same bamfile are independent.'''
+    
+    def testDoubleFetch( self ):
+
+        samfile1 = pysam.Samfile('ex1.bam', 'rb')
+
+        for a,b in zip(samfile1.fetch(), samfile1.fetch()):
+            self.assertEqual( a.compare( b ), 0 )
+
+    def testDoubleFetchWithRegion( self ):
+
+        samfile1 = pysam.Samfile('ex1.bam', 'rb')
+        chr, start, stop = 'chr1', 200, 3000000
+        self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
+
+        for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
+            self.assertEqual( a.compare( b ), 0 ) 
+
+    def testDoubleFetchUntilEOF( self ):
+
+        samfile1 = pysam.Samfile('ex1.bam', 'rb')
+
+        for a,b in zip(samfile1.fetch( until_eof = True), 
+                       samfile1.fetch( until_eof = True )):
+            self.assertEqual( a.compare( b), 0 )
+
+class TestRemoteFileFTP(unittest.TestCase):
+    '''test remote access.
+
+    '''
+
+    # Need to find an ftp server without password on standard
+    # port.
+
+    url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
+    region = "1:1-1000"
+
+    def testFTPView( self ):
+        result = pysam.view( self.url, self.region )
+        self.assertEqual( len(result), 36 )
+        
+    def testFTPFetch( self ):
+        samfile = pysam.Samfile(self.url, "rb")  
+        result = list(samfile.fetch( region = self.region ))
+        self.assertEqual( len(result), 36 )
+
+class TestRemoteFileHTTP( unittest.TestCase):
+
+    url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
+    region = "chr1:1-1000"
+    local = "ex1.bam"
+
+    def testView( self ):
+        self.assertRaises( pysam.SamtoolsError, pysam.view, self.url, self.region )
+        
+    def testFetch( self ):
+        samfile = pysam.Samfile(self.url, "rb")  
+        result = list(samfile.fetch( region = self.region ))
+        samfile_local = pysam.Samfile(self.local, "rb")  
+        ref = list(samfile_local.fetch( region = self.region ))
+
+        self.assertEqual( len(ref), len(result) )
+        for x, y in zip(result, ref):
+            self.assertEqual( x.compare( y ), 0 )
+
+    def testFetchAll( self ):
+        samfile = pysam.Samfile(self.url, "rb")  
+        result = list(samfile.fetch())
+        samfile_local = pysam.Samfile(self.local, "rb")  
+        ref = list(samfile_local.fetch() )
+
+        self.assertEqual( len(ref), len(result) )
+        for x, y in zip(result, ref):
+            self.assertEqual( x.compare( y ), 0 )
+
 
 # TODOS
 # 1. finish testing all properties within pileup objects