Imported Upstream version 0.3.1
[pysam.git] / tests / pysam_test.py
index c2ae6fa37b8d34008eb639adc608fc80de7411d2..aefcbb94e2a60fcdd651ae7695c2e22d9834952e 100755 (executable)
@@ -7,7 +7,7 @@ and data files located there.
 
 import pysam
 import unittest
-import os
+import os, re
 import itertools
 import subprocess
 import shutil
@@ -47,7 +47,13 @@ def runSamtools( cmd ):
     except OSError, e:
         print >>sys.stderr, "Execution failed:", e
 
-        
+def getSamtoolsVersion():
+    '''return samtools version'''
+
+    pipe = subprocess.Popen("samtools", shell=True, stderr=subprocess.PIPE).stderr
+    lines = "".join(pipe.readlines())
+    return re.search( "Version:\s+(\S+)", lines).groups()[0]
+
 class BinaryTest(unittest.TestCase):
     '''test samtools command line commands and compare
     against pysam commands.
@@ -94,11 +100,17 @@ class BinaryTest(unittest.TestCase):
                 ("ex1.view", "samtools view ex1.bam > ex1.view"),
                 ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
                 ),
+          "view2" :
+        (
+                ("ex1.view", "samtools view -bT ex1.fa -o ex1.view2 ex1.sam"),
+                # note that -o ex1.view2 throws exception.
+                ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ),
+                ),
         }
 
     # some tests depend on others. The order specifies in which order
     # the samtools commands are executed.
-    mOrder = ('faidx', 'import', 'index', 'pileup1', 'pileup2', 'glfview', 'view' )
+    mOrder = ('faidx', 'import', 'index', 'pileup1', 'pileup2', 'glfview', 'view', 'view2' )
 
     def setUp( self ):
         '''setup tests. 
@@ -125,12 +137,19 @@ class BinaryTest(unittest.TestCase):
 
             BinaryTest.first_time = False
 
+        samtools_version = getSamtoolsVersion()
+        if samtools_version != pysam.__samtools_version__:
+            raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
+                                 (pysam.__samtools_version__,
+                                  samtools_version ))
+
     def checkCommand( self, command ):
+
         if command:
             samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0]
             self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ), 
                              "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
-
+            
     def testImport( self ):
         self.checkCommand( "import" )
 
@@ -153,7 +172,7 @@ class BinaryTest(unittest.TestCase):
         self.assertRaises( pysam.SamtoolsError, pysam.index, "exdoesntexist.bam" )
 
     def __del__(self):
-
+        return
         for label, command in self.mCommands.iteritems():
             samtools_target, samtools_command = command[0]
             pysam_target, pysam_command = command[1]
@@ -234,11 +253,22 @@ class IOTest(unittest.TestCase):
         samfile.close()
         self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
 
-    def testPileupFromClosedFile( self ):
+    def testClosedFile( self ):
+        '''test that access to a closed samfile raises ValueError.'''
 
         samfile = pysam.Samfile( "ex1.bam", "rb" )
         samfile.close()
+        self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
         self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
+        self.assertRaises( ValueError, samfile.getrname, 0 )
+        self.assertRaises( ValueError, samfile.tell )
+        self.assertRaises( ValueError, samfile.write, None )
+        self.assertRaises( ValueError, samfile.seek, 0 )
+        self.assertRaises( ValueError, getattr, samfile, "nreferences" )
+        self.assertRaises( ValueError, getattr, samfile, "references" )
+        self.assertRaises( ValueError, getattr, samfile, "lengths" )
+        self.assertRaises( ValueError, getattr, samfile, "text" )
+        self.assertRaises( ValueError, getattr, samfile, "header" )
 
     def testBinaryReadFromSamfile( self ):
         pass
@@ -283,6 +313,7 @@ class TestIteratorRow(unittest.TestCase):
     def tearDown(self):
         self.samfile.close()
 
+
 class TestIteratorRowAll(unittest.TestCase):
 
     def setUp(self):
@@ -348,7 +379,8 @@ class TestIteratorColumn(unittest.TestCase):
                     self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
                 elif refcov == 1:
                     # one read, all columns of the read are returned
-                    self.assertEqual( len(columns), refcolumns, "pileup incomplete - %i should be %i " % (len(columns), refcolumns))
+                    self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
+                                          (pos, len(columns), refcolumns))
                     
     def tearDown(self):
         self.samfile.close()
@@ -398,10 +430,22 @@ class TestAlignedReadFromBam(unittest.TestCase):
     def testARseq(self):
         self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
         self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
+        self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
 
     def testARqual(self):
         self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
         self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
+        self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
+
+    def testARquery(self):
+        self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
+        self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
+        self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") )
+
+    def testARqqual(self):
+        self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
+        self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
+        self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") )
 
     def testPresentOptionalFields(self):
         self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
@@ -518,7 +562,14 @@ class TestPileupObjects(unittest.TestCase):
 
     def tearDown(self):
         self.samfile.close()
-        
+
+class TestContextManager(unittest.TestCase):
+
+    def testManager( self ):
+        with pysam.Samfile('ex1.bam', 'rb') as samfile:
+            samfile.fetch()
+        self.assertEqual( samfile._isOpen(), False )
+
 class TestExceptions(unittest.TestCase):
 
     def setUp(self):
@@ -562,9 +613,29 @@ class TestExceptions(unittest.TestCase):
     def testOutOfRangeLargeOldFormat(self):
         self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
 
+    def testZeroToZero(self):        
+        '''see issue 44'''
+        self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0)
+
     def tearDown(self):
         self.samfile.close()
 
+
+class TestWrongFormat(unittest.TestCase):
+    '''test cases for opening files not in bam/sam format.'''
+
+    def testOpenSamAsBam( self ):
+        self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' )
+
+    def testOpenBamAsSam( self ):
+        self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' )
+
+    def testOpenFastaAsSam( self ):
+        self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' )
+
+    def testOpenFastaAsBam( self ):
+        self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' )
+
 class TestFastaFile(unittest.TestCase):
 
     mSequences = { 'chr1' :
@@ -581,20 +652,26 @@ class TestFastaFile(unittest.TestCase):
             self.assertEqual( seq, self.file.fetch( id ) )
             for x in range( 0, len(seq), 10):
                 self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
+                # test x:end
+                self.assertEqual( seq[x:], self.file.fetch( id, x) )
+                # test 0:x
+                self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
+
+        
+        # unknown sequence returns ""
+        self.assertEqual( "", self.file.fetch("chr12") )
 
     def testFetchErrors( self ):
         self.assertRaises( ValueError, self.file.fetch )
-        self.assertRaises( ValueError, self.file.fetch, "chr1", 0 )
         self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
         self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
-        # the following segfaults:
-        # self.assertRaises( IndexError, self.file.fetch, "chr12", )
-        pass
 
+    def testLength( self ):
+        self.assertEqual( len(self.file), 2 )
+        
     def tearDown(self):
         self.file.close()
 
-
 class TestAlignedRead(unittest.TestCase):
     '''tests to check if aligned read can be constructed
     and manipulated.
@@ -645,7 +722,7 @@ class TestAlignedRead(unittest.TestCase):
         a.mpos=200
         a.isize=167
        a.qual="1234" * 3
-
+        # todo: create tags
         return a
 
     def testUpdate( self ):
@@ -714,6 +791,19 @@ class TestAlignedRead(unittest.TestCase):
 
         return a
 
+    def testTagParsing( self ):
+        '''test for tag parsing
+
+        see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a
+        '''
+        samfile=pysam.Samfile( "ex8.bam","rb" )
+
+        for entry in samfile:
+            before = entry.tags
+            entry.tags = entry.tags
+            after = entry.tags
+            self.assertEqual( after, before )
+
 class TestDeNovoConstruction(unittest.TestCase):
     '''check BAM/SAM file construction using ex3.sam
     
@@ -803,7 +893,7 @@ class TestDeNovoConstruction(unittest.TestCase):
         others = list(infile)
         for denovo, other in zip( others, self.reads):
             self.checkFieldEqual( other, denovo )
-            self.assertEqual( other, denovo)
+            self.assertEqual( other.compare( denovo ), 0 )
 
     def testSAMPerRead( self ):
         '''check if individual reads are binary equal.'''
@@ -812,7 +902,7 @@ class TestDeNovoConstruction(unittest.TestCase):
         others = list(infile)
         for denovo, other in zip( others, self.reads):
             self.checkFieldEqual( other, denovo )
-            self.assertEqual( other, denovo)
+            self.assertEqual( other.compare( denovo), 0 )
             
     def testBAMWholeFile( self ):
         
@@ -829,6 +919,83 @@ class TestDeNovoConstruction(unittest.TestCase):
         os.unlink( tmpfilename )
 
 
+class TestDoubleFetch(unittest.TestCase):
+    '''check if two iterators on the same bamfile are independent.'''
+    
+    def testDoubleFetch( self ):
+
+        samfile1 = pysam.Samfile('ex1.bam', 'rb')
+
+        for a,b in zip(samfile1.fetch(), samfile1.fetch()):
+            self.assertEqual( a.compare( b ), 0 )
+
+    def testDoubleFetchWithRegion( self ):
+
+        samfile1 = pysam.Samfile('ex1.bam', 'rb')
+        chr, start, stop = 'chr1', 200, 3000000
+        self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
+
+        for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
+            self.assertEqual( a.compare( b ), 0 ) 
+
+    def testDoubleFetchUntilEOF( self ):
+
+        samfile1 = pysam.Samfile('ex1.bam', 'rb')
+
+        for a,b in zip(samfile1.fetch( until_eof = True), 
+                       samfile1.fetch( until_eof = True )):
+            self.assertEqual( a.compare( b), 0 )
+
+class TestRemoteFileFTP(unittest.TestCase):
+    '''test remote access.
+
+    '''
+
+    # Need to find an ftp server without password on standard
+    # port.
+
+    url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
+    region = "1:1-1000"
+
+    def testFTPView( self ):
+        result = pysam.view( self.url, self.region )
+        self.assertEqual( len(result), 36 )
+        
+    def testFTPFetch( self ):
+        samfile = pysam.Samfile(self.url, "rb")  
+        result = list(samfile.fetch( region = self.region ))
+        self.assertEqual( len(result), 36 )
+
+class TestRemoteFileHTTP( unittest.TestCase):
+
+    url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
+    region = "chr1:1-1000"
+    local = "ex1.bam"
+
+    def testView( self ):
+        self.assertRaises( pysam.SamtoolsError, pysam.view, self.url, self.region )
+        
+    def testFetch( self ):
+        samfile = pysam.Samfile(self.url, "rb")  
+        result = list(samfile.fetch( region = self.region ))
+        samfile_local = pysam.Samfile(self.local, "rb")  
+        ref = list(samfile_local.fetch( region = self.region ))
+
+        self.assertEqual( len(ref), len(result) )
+        for x, y in zip(result, ref):
+            self.assertEqual( x.compare( y ), 0 )
+
+    def testFetchAll( self ):
+        samfile = pysam.Samfile(self.url, "rb")  
+        result = list(samfile.fetch())
+        samfile_local = pysam.Samfile(self.local, "rb")  
+        ref = list(samfile_local.fetch() )
+
+        self.assertEqual( len(ref), len(result) )
+        for x, y in zip(result, ref):
+            self.assertEqual( x.compare( y ), 0 )
+
+
 # TODOS
 # 1. finish testing all properties within pileup objects
 # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)