import pysam
import unittest
-import os
-import itertools
+import os, re, sys
+import itertools, collections
import subprocess
import shutil
-
+import logging
+SAMTOOLS="samtools"
def checkBinaryEqual( filename1, filename2 ):
'''return true if the two files are binary equal.'''
except OSError, e:
print >>sys.stderr, "Execution failed:", e
-
+def getSamtoolsVersion():
+ '''return samtools version'''
+
+ pipe = subprocess.Popen(SAMTOOLS, shell=True, stderr=subprocess.PIPE).stderr
+ lines = "".join(pipe.readlines())
+ return re.search( "Version:\s+(\S+)", lines).groups()[0]
+
class BinaryTest(unittest.TestCase):
'''test samtools command line commands and compare
against pysam commands.
mCommands = \
{ "faidx" : \
(
- ("ex1.fa.fai", "samtools faidx ex1.fa"),
+ ("ex1.fa.fai", "faidx ex1.fa"),
("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ),
),
"import" :
(
- ("ex1.bam", "samtools import ex1.fa.fai ex1.sam.gz ex1.bam" ),
+ ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ),
("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ),
),
"index":
(
- ("ex1.bam.bai", "samtools index ex1.bam" ),
+ ("ex1.bam.bai", "index ex1.bam" ),
("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ),
),
"pileup1" :
(
- ("ex1.pileup", "samtools pileup -cf ex1.fa ex1.bam > ex1.pileup" ),
+ ("ex1.pileup", "pileup -cf ex1.fa ex1.bam > ex1.pileup" ),
("pysam_ex1.pileup", (pysam.pileup, "-c -f ex1.fa ex1.bam" ) )
),
"pileup2" :
(
- ("ex1.glf", "samtools pileup -gf ex1.fa ex1.bam > ex1.glf" ),
+ ("ex1.glf", "pileup -gf ex1.fa ex1.bam > ex1.glf" ),
("pysam_ex1.glf", (pysam.pileup, "-g -f ex1.fa ex1.bam" ) )
),
"glfview" :
(
- ("ex1.glfview", "samtools glfview ex1.glf > ex1.glfview"),
+ ("ex1.glfview", "glfview ex1.glf > ex1.glfview"),
("pysam_ex1.glfview", (pysam.glfview, "ex1.glf" ) ),
),
"view" :
(
- ("ex1.view", "samtools view ex1.bam > ex1.view"),
+ ("ex1.view", "view ex1.bam > ex1.view"),
("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
),
+ "view2" :
+ (
+ ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"),
+ # note that -o ex1.view2 throws exception.
+ ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ),
+ ),
}
# some tests depend on others. The order specifies in which order
# the samtools commands are executed.
- mOrder = ('faidx', 'import', 'index', 'pileup1', 'pileup2', 'glfview', 'view' )
+ mOrder = ('faidx', 'import', 'index',
+ 'pileup1', 'pileup2',
+ # 'glfview', deprecated
+ 'view', 'view2' )
def setUp( self ):
'''setup tests.
command = self.mCommands[label]
samtools_target, samtools_command = command[0]
pysam_target, pysam_command = command[1]
- runSamtools( samtools_command )
+ runSamtools( " ".join( (SAMTOOLS, samtools_command )))
pysam_method, pysam_options = pysam_command
output = pysam_method( *pysam_options.split(" "), raw=True)
if ">" in samtools_command:
- outfile = open( pysam_target, "w" )
+ outfile = open( pysam_target, "wb" )
for line in output: outfile.write( line )
outfile.close()
BinaryTest.first_time = False
+ samtools_version = getSamtoolsVersion()
+
+ def _r( s ):
+ # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> 0.1.12
+ if s.count('-') > 0: s = s[0:s.find('-')]
+ return re.sub( "[^0-9.]", "", s )
+
+ if _r(samtools_version) != _r( pysam.__samtools_version__):
+ raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
+ (pysam.__samtools_version__,
+ samtools_version ))
+
def checkCommand( self, command ):
if command:
samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0]
self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ),
"%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
-
+
def testImport( self ):
self.checkCommand( "import" )
def testPileup2( self ):
self.checkCommand( "pileup2" )
- def testGLFView( self ):
- self.checkCommand( "glfview" )
+ # deprecated
+ # def testGLFView( self ):
+ # self.checkCommand( "glfview" )
def testView( self ):
self.checkCommand( "view" )
def testEmptyIndex( self ):
- self.assertRaises( pysam.SamtoolsError, pysam.index, "exdoesntexist.bam" )
+ self.assertRaises( IOError, pysam.index, "exdoesntexist.bam" )
def __del__(self):
-
+ return
for label, command in self.mCommands.iteritems():
samtools_target, samtools_command = command[0]
pysam_target, pysam_command = command[1]
samfile.close()
self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
- def testPileupFromClosedFile( self ):
+ def testClosedFile( self ):
+ '''test that access to a closed samfile raises ValueError.'''
samfile = pysam.Samfile( "ex1.bam", "rb" )
samfile.close()
+ self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
+ self.assertRaises( ValueError, samfile.getrname, 0 )
+ self.assertRaises( ValueError, samfile.tell )
+ self.assertRaises( ValueError, samfile.seek, 0 )
+ self.assertRaises( ValueError, getattr, samfile, "nreferences" )
+ self.assertRaises( ValueError, getattr, samfile, "references" )
+ self.assertRaises( ValueError, getattr, samfile, "lengths" )
+ self.assertRaises( ValueError, getattr, samfile, "text" )
+ self.assertRaises( ValueError, getattr, samfile, "header" )
+
+ # write on closed file
+ self.assertEqual( 0, samfile.write(None) )
+
+ def testAutoDetection( self ):
+ samfile = pysam.Samfile( "ex3.bam" )
+ samfile.fetch()
+ samfile.close()
- def testBinaryReadFromSamfile( self ):
- pass
- # needs to re-activated, see issue 19
- #samfile = pysam.Samfile( "ex1.bam", "r" )
- #samfile.fetch().next()
+ samfile = pysam.Samfile( "ex3.sam")
+ samfile.fetch()
+ samfile.close()
def testReadingFromFileWithoutIndex( self ):
'''read from bam file without index.'''
def checkRange( self, rnge ):
'''compare results from iterator with those from samtools.'''
ps = list(self.samfile.fetch(region=rnge))
- sa = list(pysam.view( "ex1.bam", rnge , raw = True) )
+ sa = list(pysam.view( "ex1.bam", rnge, raw = True) )
self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) ))
# check if the same reads are returned and in the same order
for line, pair in enumerate( zip( ps, sa ) ):
- data = pair[1].split("\t")
- self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
+ a,b = pair
+ d = b.split("\t")
+ self.assertEqual( a.qname, d[0], "line %i: read id mismatch: %s != %s" % (line, a.rname, d[0]) )
+ self.assertEqual( a.pos, int(d[3])-1, "line %i: read position mismatch: %s != %s, \n%s\n%s\n" % \
+ (line, a.pos, int(d[3])-1,
+ str(a), str(d) ) )
+ self.assertEqual( a.qual, d[10], "line %i: quality mismatch: %s != %s, \n%s\n%s\n" % \
+ (line, a.qual, d[10],
+ str(a), str(d) ) )
def testIteratePerContig(self):
'''check random access per contig'''
self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
elif refcov == 1:
# one read, all columns of the read are returned
- self.assertEqual( len(columns), refcolumns, "pileup incomplete - %i should be %i " % (len(columns), refcolumns))
+ self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
+ (pos, len(columns), refcolumns))
+
+
def tearDown(self):
self.samfile.close()
def testARseq(self):
self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
+ self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
def testARqual(self):
self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
+ self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
+
+ def testARquery(self):
+ self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
+ self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
+ self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") )
+
+ def testARqqual(self):
+ self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
+ self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
+ self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") )
def testPresentOptionalFields(self):
self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
def testHeaders(self):
self.compareHeaders( self.header, self.samfile.header )
self.compareHeaders( self.samfile.header, self.header )
-
+
+ def testNameMapping( self ):
+ for x, y in enumerate( ("chr1", "chr2")):
+ tid = self.samfile.gettid( y )
+ ref = self.samfile.getrname( x )
+ self.assertEqual( tid, x )
+ self.assertEqual( ref, y )
+
+ self.assertEqual( self.samfile.gettid("chr?"), -1 )
+ self.assertRaises( ValueError, self.samfile.getrname, 2 )
+
def tearDown(self):
self.samfile.close()
def tearDown(self):
self.samfile.close()
-
+
+class TestContextManager(unittest.TestCase):
+
+ def testManager( self ):
+ with pysam.Samfile('ex1.bam', 'rb') as samfile:
+ samfile.fetch()
+ self.assertEqual( samfile._isOpen(), False )
+
class TestExceptions(unittest.TestCase):
def setUp(self):
self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 )
self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 )
+ self.assertRaises( ValueError, self.samfile.count, "chr1", 5, -10 )
+ self.assertRaises( ValueError, self.samfile.count, "chr1", 5, 0 )
+ self.assertRaises( ValueError, self.samfile.count, "chr1", -5, -10 )
+
def testOutOfRangeNegativeOldFormat(self):
self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" )
self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" )
self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" )
+ self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-10" )
+ self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-0" )
+ self.assertRaises( ValueError, self.samfile.count, region="chr1:-5--10" )
+
def testOutOfRangNewFormat(self):
self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 )
+ self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999, 99999999999 )
def testOutOfRangeLargeNewFormat(self):
self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
+ self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
def testOutOfRangeLargeOldFormat(self):
self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
+ self.assertRaises( ValueError, self.samfile.count, "chr1:99999999999999999-999999999999999999" )
+
+ def testZeroToZero(self):
+ '''see issue 44'''
+ self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0)
def tearDown(self):
self.samfile.close()
+class TestWrongFormat(unittest.TestCase):
+ '''test cases for opening files not in bam/sam format.'''
+
+ def testOpenSamAsBam( self ):
+ self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' )
+
+ def testOpenBamAsSam( self ):
+ self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' )
+
+ def testOpenFastaAsSam( self ):
+ self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' )
+
+ def testOpenFastaAsBam( self ):
+ self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' )
+
class TestFastaFile(unittest.TestCase):
mSequences = { 'chr1' :
self.assertEqual( seq, self.file.fetch( id ) )
for x in range( 0, len(seq), 10):
self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
+ # test x:end
+ self.assertEqual( seq[x:], self.file.fetch( id, x) )
+ # test 0:x
+ self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
+
+
+ # unknown sequence returns ""
+ self.assertEqual( "", self.file.fetch("chr12") )
+
+ def testOutOfRangeAccess( self ):
+ '''test out of range access.'''
+ # out of range access returns an empty string
+ for contig, s in self.mSequences.iteritems():
+ self.assertEqual( self.file.fetch( contig, len(s), len(s)+1), "" )
+
+ self.assertEqual( self.file.fetch( "chr3", 0 , 100), "" )
def testFetchErrors( self ):
self.assertRaises( ValueError, self.file.fetch )
- self.assertRaises( ValueError, self.file.fetch, "chr1", 0 )
self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
- # the following segfaults:
- # self.assertRaises( IndexError, self.file.fetch, "chr12", )
- pass
+ def testLength( self ):
+ self.assertEqual( len(self.file), 2 )
+
def tearDown(self):
self.file.close()
-
class TestAlignedRead(unittest.TestCase):
'''tests to check if aligned read can be constructed
and manipulated.
a.mpos=200
a.isize=167
a.qual="1234" * 3
-
+ # todo: create tags
return a
def testUpdate( self ):
return a
+ def testTagParsing( self ):
+ '''test for tag parsing
+
+ see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a
+ '''
+ samfile=pysam.Samfile( "ex8.bam","rb" )
+
+ for entry in samfile:
+ before = entry.tags
+ entry.tags = entry.tags
+ after = entry.tags
+ self.assertEqual( after, before )
+
class TestDeNovoConstruction(unittest.TestCase):
'''check BAM/SAM file construction using ex3.sam
others = list(infile)
for denovo, other in zip( others, self.reads):
self.checkFieldEqual( other, denovo )
- self.assertEqual( other, denovo)
+ self.assertEqual( other.compare( denovo ), 0 )
def testSAMPerRead( self ):
'''check if individual reads are binary equal.'''
others = list(infile)
for denovo, other in zip( others, self.reads):
self.checkFieldEqual( other, denovo )
- self.assertEqual( other, denovo)
+ self.assertEqual( other.compare( denovo), 0 )
def testBAMWholeFile( self ):
os.unlink( tmpfilename )
+class TestDoubleFetch(unittest.TestCase):
+ '''check if two iterators on the same bamfile are independent.'''
+
+ def testDoubleFetch( self ):
+
+ samfile1 = pysam.Samfile('ex1.bam', 'rb')
+
+ for a,b in zip(samfile1.fetch(), samfile1.fetch()):
+ self.assertEqual( a.compare( b ), 0 )
+
+ def testDoubleFetchWithRegion( self ):
+
+ samfile1 = pysam.Samfile('ex1.bam', 'rb')
+ chr, start, stop = 'chr1', 200, 3000000
+ self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
+
+ for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
+ self.assertEqual( a.compare( b ), 0 )
+
+ def testDoubleFetchUntilEOF( self ):
+
+ samfile1 = pysam.Samfile('ex1.bam', 'rb')
+
+ for a,b in zip(samfile1.fetch( until_eof = True),
+ samfile1.fetch( until_eof = True )):
+ self.assertEqual( a.compare( b), 0 )
+
+class TestRemoteFileFTP(unittest.TestCase):
+ '''test remote access.
+
+ '''
+
+ # Need to find an ftp server without password on standard
+ # port.
+
+ url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
+ region = "1:1-1000"
+
+ def testFTPView( self ):
+ return
+ result = pysam.view( self.url, self.region )
+ self.assertEqual( len(result), 36 )
+
+ def testFTPFetch( self ):
+ return
+ samfile = pysam.Samfile(self.url, "rb")
+ result = list(samfile.fetch( region = self.region ))
+ self.assertEqual( len(result), 36 )
+
+class TestRemoteFileHTTP( unittest.TestCase):
+
+ url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
+ region = "chr1:1-1000"
+ local = "ex1.bam"
+
+ def testView( self ):
+ samfile_local = pysam.Samfile(self.local, "rb")
+ ref = list(samfile_local.fetch( region = self.region ))
+
+ result = pysam.view( self.url, self.region )
+ self.assertEqual( len(result), len(ref) )
+
+ def testFetch( self ):
+ samfile = pysam.Samfile(self.url, "rb")
+ result = list(samfile.fetch( region = self.region ))
+ samfile_local = pysam.Samfile(self.local, "rb")
+ ref = list(samfile_local.fetch( region = self.region ))
+
+ self.assertEqual( len(ref), len(result) )
+ for x, y in zip(result, ref):
+ self.assertEqual( x.compare( y ), 0 )
+
+ def testFetchAll( self ):
+ samfile = pysam.Samfile(self.url, "rb")
+ result = list(samfile.fetch())
+ samfile_local = pysam.Samfile(self.local, "rb")
+ ref = list(samfile_local.fetch() )
+
+ self.assertEqual( len(ref), len(result) )
+ for x, y in zip(result, ref):
+ self.assertEqual( x.compare( y ), 0 )
+
+class TestLargeOptValues( unittest.TestCase ):
+
+ ints = ( 65536, 214748, 2147484, 2147483647 )
+ floats = ( 65536.0, 214748.0, 2147484.0 )
+
+ def check( self, samfile ):
+
+ i = samfile.fetch()
+ for exp in self.ints:
+ rr = i.next()
+ obs = rr.opt("ZP")
+ self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+ for exp in [ -x for x in self.ints ]:
+ rr = i.next()
+ obs = rr.opt("ZP")
+ self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+ for exp in self.floats:
+ rr = i.next()
+ obs = rr.opt("ZP")
+ self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+ for exp in [ -x for x in self.floats ]:
+ rr = i.next()
+ obs = rr.opt("ZP")
+ self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+ def testSAM( self ):
+ samfile = pysam.Samfile("ex10.sam", "r")
+ self.check( samfile )
+
+ def testBAM( self ):
+ samfile = pysam.Samfile("ex10.bam", "rb")
+ self.check( samfile )
+
+class TestSNPCalls( unittest.TestCase ):
+ '''test pysam SNP calling ability.'''
+
+ def checkEqual( self, a, b ):
+ for x in ("reference_base",
+ "pos",
+ "genotype",
+ "consensus_quality",
+ "snp_quality",
+ "mapping_quality",
+ "coverage" ):
+ self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" %
+ (x, getattr(a, x), getattr(b,x), str(a), str(b)))
+
+ def testAllPositionsViaIterator( self ):
+ samfile = pysam.Samfile( "ex1.bam", "rb")
+ fastafile = pysam.Fastafile( "ex1.fa" )
+ try:
+ refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"]
+ except pysam.SamtoolsError:
+ pass
+
+ i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+ calls = list(pysam.IteratorSNPCalls(i))
+ for x,y in zip( refs, calls ):
+ self.checkEqual( x, y )
+
+ def testPerPositionViaIterator( self ):
+ # test pileup for each position. This is a slow operation
+ # so this test is disabled
+ return
+ samfile = pysam.Samfile( "ex1.bam", "rb")
+ fastafile = pysam.Fastafile( "ex1.fa" )
+ for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
+ if x.reference_base == "*": continue
+ i = samfile.pileup( x.chromosome, x.pos, x.pos+1,
+ fastafile = fastafile,
+ stepper = "samtools" )
+ z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ]
+ self.assertEqual( len(z), 1 )
+ self.checkEqual( x, z[0] )
+
+ def testPerPositionViaCaller( self ):
+ # test pileup for each position. This is a fast operation
+ samfile = pysam.Samfile( "ex1.bam", "rb")
+ fastafile = pysam.Fastafile( "ex1.fa" )
+ i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+ caller = pysam.SNPCaller( i )
+
+ for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
+ if x.reference_base == "*": continue
+ call = caller.call( x.chromosome, x.pos )
+ self.checkEqual( x, call )
+
+class TestIndelCalls( unittest.TestCase ):
+ '''test pysam indel calling.'''
+
+ def checkEqual( self, a, b ):
+
+ for x in ("pos",
+ "genotype",
+ "consensus_quality",
+ "snp_quality",
+ "mapping_quality",
+ "coverage",
+ "first_allele",
+ "second_allele",
+ "reads_first",
+ "reads_second",
+ "reads_diff"):
+ if b.genotype == "*/*" and x == "second_allele":
+ # ignore test for second allele (positions chr2:439 and chr2:1512)
+ continue
+ self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" %
+ (x, getattr(a, x), getattr(b,x), str(a), str(b)))
+
+ def testAllPositionsViaIterator( self ):
+
+ samfile = pysam.Samfile( "ex1.bam", "rb")
+ fastafile = pysam.Fastafile( "ex1.fa" )
+ try:
+ refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"]
+ except pysam.SamtoolsError:
+ pass
+
+ i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+ calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ]
+ for x,y in zip( refs, calls ):
+ self.checkEqual( x, y )
+
+ def testPerPositionViaCaller( self ):
+ # test pileup for each position. This is a fast operation
+ samfile = pysam.Samfile( "ex1.bam", "rb")
+ fastafile = pysam.Fastafile( "ex1.fa" )
+ i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+ caller = pysam.IndelCaller( i )
+
+ for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
+ if x.reference_base != "*": continue
+ call = caller.call( x.chromosome, x.pos )
+ self.checkEqual( x, call )
+
+class TestLogging( unittest.TestCase ):
+ '''test around bug issue 42,
+
+ failed in versions < 0.4
+ '''
+
+ def check( self, bamfile, log ):
+
+ if log:
+ logger = logging.getLogger('franklin')
+ logger.setLevel(logging.INFO)
+ formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
+ log_hand = logging.FileHandler('log.txt')
+ log_hand.setFormatter(formatter)
+ logger.addHandler(log_hand)
+
+ bam = pysam.Samfile(bamfile, 'rb')
+ cols = bam.pileup()
+ self.assert_( True )
+
+ def testFail1( self ):
+ self.check( "ex9_fail.bam", False )
+ self.check( "ex9_fail.bam", True )
+
+ def testNoFail1( self ):
+ self.check( "ex9_nofail.bam", False )
+ self.check( "ex9_nofail.bam", True )
+
+ def testNoFail2( self ):
+ self.check( "ex9_nofail.bam", True )
+ self.check( "ex9_nofail.bam", True )
+
# TODOS
# 1. finish testing all properties within pileup objects
# 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)
+# 3. check: presence of sequence
+
+class TestSamfileUtilityFunctions( unittest.TestCase ):
+
+ def testCount( self ):
+
+ samfile = pysam.Samfile( "ex1.bam", "rb" )
+
+ for contig in ("chr1", "chr2" ):
+ for start in xrange( 0, 2000, 100 ):
+ end = start + 1
+ self.assertEqual( len( list( samfile.fetch( contig, start, end ) ) ),
+ samfile.count( contig, start, end ) )
+
+ # test empty intervals
+ self.assertEqual( len( list( samfile.fetch( contig, start, start ) ) ),
+ samfile.count( contig, start, start ) )
+
+ # test half empty intervals
+ self.assertEqual( len( list( samfile.fetch( contig, start ) ) ),
+ samfile.count( contig, start ) )
+
+ def testMate( self ):
+ '''test mate access.'''
+
+ readnames = [ x.split("\t")[0] for x in open( "ex1.sam", "rb" ).readlines() ]
+ counts = collections.defaultdict( int )
+ for x in readnames: counts[x] += 1
+
+ samfile = pysam.Samfile( "ex1.bam", "rb" )
+ for read in samfile.fetch():
+ if not read.is_paired:
+ self.assertRaises( ValueError, samfile.mate, read )
+ elif read.mate_is_unmapped:
+ self.assertRaises( ValueError, samfile.mate, read )
+ else:
+ if counts[read.qname] == 1:
+ self.assertRaises( ValueError, samfile.mate, read )
+ else:
+ mate = samfile.mate( read )
+ self.assertEqual( read.qname, mate.qname )
+ self.assertEqual( read.is_read1, mate.is_read2 )
+ self.assertEqual( read.is_read2, mate.is_read1 )
+ self.assertEqual( read.pos, mate.mpos )
+ self.assertEqual( read.mpos, mate.pos )
+
+class TestSamtoolsProxy( unittest.TestCase ):
+ '''tests for sanity checking access to samtools functions.'''
+
+ def testIndex( self ):
+ self.assertRaises( IOError, pysam.index, "missing_file" )
+
+ def testView( self ):
+ # note that view still echos "open: No such file or directory"
+ self.assertRaises( pysam.SamtoolsError, pysam.view, "missing_file" )
+
+ def testSort( self ):
+ self.assertRaises( pysam.SamtoolsError, pysam.sort, "missing_file" )
+
+class TestSamfileIndex( unittest.TestCase):
+
+ def testIndex( self ):
+ samfile = pysam.Samfile( "ex1.bam", "rb" )
+ index = pysam.IndexedReads( samfile )
+ index.build()
+
+ reads = collections.defaultdict( int )
+
+ for read in samfile: reads[read.qname] += 1
+
+ for qname, counts in reads.iteritems():
+ found = list(index.find( qname ))
+ self.assertEqual( len(found), counts )
+ for x in found: self.assertEqual( x.qname, qname )
+
if __name__ == "__main__":
# build data files