import pysam
import unittest
-import os
+import os, re
import itertools
import subprocess
import shutil
except OSError, e:
print >>sys.stderr, "Execution failed:", e
-
+def getSamtoolsVersion():
+ '''return samtools version'''
+
+ pipe = subprocess.Popen("samtools", shell=True, stderr=subprocess.PIPE).stderr
+ lines = "".join(pipe.readlines())
+ return re.search( "Version:\s+(\S+)", lines).groups()[0]
+
class BinaryTest(unittest.TestCase):
'''test samtools command line commands and compare
against pysam commands.
BinaryTest.first_time = False
+ samtools_version = getSamtoolsVersion()
+ if samtools_version != pysam.__samtools_version__:
+ raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
+ (pysam.__samtools_version__,
+ samtools_version ))
+
def checkCommand( self, command ):
+
if command:
samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0]
self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ),
"%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
-
+
def testImport( self ):
self.checkCommand( "import" )
self.assertRaises( pysam.SamtoolsError, pysam.index, "exdoesntexist.bam" )
def __del__(self):
-
+ return
for label, command in self.mCommands.iteritems():
samtools_target, samtools_command = command[0]
pysam_target, pysam_command = command[1]
self.assertRaises( ValueError, samfile.fetch )
self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 )
+ def testReadingFromFileWithWrongMode( self ):
+
+ assert not os.path.exists( "ex2.bam.bai" )
+ samfile = pysam.Samfile( "ex2.bam", "r" )
+ self.assertRaises( ValueError, samfile.fetch )
+
class TestIteratorRow(unittest.TestCase):
def setUp(self):
def tearDown(self):
self.samfile.close()
+
class TestIteratorRowAll(unittest.TestCase):
def setUp(self):
self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
elif refcov == 1:
# one read, all columns of the read are returned
- self.assertEqual( len(columns), refcolumns, "pileup incomplete - %i should be %i " % (len(columns), refcolumns))
+ self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
+ (pos, len(columns), refcolumns))
def tearDown(self):
self.samfile.close()
def testARseq(self):
self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
+ self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
def testARqual(self):
self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
+ self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
+
+ def testARquery(self):
+ self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
+ self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
+ self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") )
+
+ def testARqqual(self):
+ self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
+ self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
+ self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") )
def testPresentOptionalFields(self):
self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
def tearDown(self):
self.samfile.close()
-
+
+class TestContextManager(unittest.TestCase):
+
+ def testManager( self ):
+ with pysam.Samfile('ex1.bam', 'rb') as samfile:
+ samfile.fetch()
+ self.assertEqual( samfile._isOpen(), False )
+
class TestExceptions(unittest.TestCase):
def setUp(self):
self.assertEqual( seq, self.file.fetch( id ) )
for x in range( 0, len(seq), 10):
self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
+ # test x:end
+ self.assertEqual( seq[x:], self.file.fetch( id, x) )
+ # test 0:x
+ self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
+
+
+ # unknown sequence returns ""
+ self.assertEqual( "", self.file.fetch("chr12") )
def testFetchErrors( self ):
self.assertRaises( ValueError, self.file.fetch )
- self.assertRaises( ValueError, self.file.fetch, "chr1", 0 )
self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
- # the following segfaults:
- # self.assertRaises( IndexError, self.file.fetch, "chr12", )
- pass
+ def testLength( self ):
+ self.assertEqual( len(self.file), 2 )
+
def tearDown(self):
self.file.close()
-
class TestAlignedRead(unittest.TestCase):
'''tests to check if aligned read can be constructed
and manipulated.
others = list(infile)
for denovo, other in zip( others, self.reads):
self.checkFieldEqual( other, denovo )
- self.assertEqual( other, denovo)
+ self.assertEqual( other.compare( denovo ), 0 )
def testSAMPerRead( self ):
'''check if individual reads are binary equal.'''
others = list(infile)
for denovo, other in zip( others, self.reads):
self.checkFieldEqual( other, denovo )
- self.assertEqual( other, denovo)
+ self.assertEqual( other.compare( denovo), 0 )
def testBAMWholeFile( self ):
os.unlink( tmpfilename )
+class TestDoubleFetch(unittest.TestCase):
+ '''check if two iterators on the same bamfile are independent.'''
+
+ def testDoubleFetch( self ):
+
+ samfile1 = pysam.Samfile('ex1.bam', 'rb')
+
+ for a,b in zip(samfile1.fetch(), samfile1.fetch()):
+ self.assertEqual( a.compare( b ), 0 )
+
+ def testDoubleFetchWithRegion( self ):
+
+ samfile1 = pysam.Samfile('ex1.bam', 'rb')
+ chr, start, stop = 'chr1', 200, 3000000
+ self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
+
+ for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
+ self.assertEqual( a.compare( b ), 0 )
+
+ def testDoubleFetchUntilEOF( self ):
+
+ samfile1 = pysam.Samfile('ex1.bam', 'rb')
+
+ for a,b in zip(samfile1.fetch( until_eof = True),
+ samfile1.fetch( until_eof = True )):
+ self.assertEqual( a.compare( b), 0 )
+
+class TestRemoteFileFTP(unittest.TestCase):
+ '''test remote access.
+
+ '''
+
+ # Need to find an ftp server without password on standard
+ # port.
+
+ url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
+ region = "1:1-1000"
+
+ def testFTPView( self ):
+ result = pysam.view( self.url, self.region )
+ self.assertEqual( len(result), 36 )
+
+ def testFTPFetch( self ):
+ samfile = pysam.Samfile(self.url, "rb")
+ result = list(samfile.fetch( region = self.region ))
+ self.assertEqual( len(result), 36 )
+
+class TestRemoteFileHTTP( unittest.TestCase):
+
+ url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
+ region = "chr1:1-1000"
+ local = "ex1.bam"
+
+ def testView( self ):
+ self.assertRaises( pysam.SamtoolsError, pysam.view, self.url, self.region )
+
+ def testFetch( self ):
+ samfile = pysam.Samfile(self.url, "rb")
+ result = list(samfile.fetch( region = self.region ))
+ samfile_local = pysam.Samfile(self.local, "rb")
+ ref = list(samfile_local.fetch( region = self.region ))
+
+ self.assertEqual( len(ref), len(result) )
+ for x, y in zip(result, ref):
+ self.assertEqual( x.compare( y ), 0 )
+
+ def testFetchAll( self ):
+ samfile = pysam.Samfile(self.url, "rb")
+ result = list(samfile.fetch())
+ samfile_local = pysam.Samfile(self.local, "rb")
+ ref = list(samfile_local.fetch() )
+
+ self.assertEqual( len(ref), len(result) )
+ for x, y in zip(result, ref):
+ self.assertEqual( x.compare( y ), 0 )
+
# TODOS
# 1. finish testing all properties within pileup objects