X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=pysam.git;a=blobdiff_plain;f=tests%2Fpysam_test.py;fp=tests%2Fpysam_test.py;h=d2bf4150de58551accb107f1822e85912c7c58d5;hp=aefcbb94e2a60fcdd651ae7695c2e22d9834952e;hb=d02fe5283ed7a93a2f76a5d6dc6e37b40c11b9b1;hpb=d828f9c9aa78e3d1687265b52de841f3f3852089 diff --git a/tests/pysam_test.py b/tests/pysam_test.py index aefcbb9..d2bf415 100755 --- a/tests/pysam_test.py +++ b/tests/pysam_test.py @@ -7,11 +7,12 @@ and data files located there. import pysam import unittest -import os, re -import itertools +import os, re, sys +import itertools, collections import subprocess import shutil - +import logging +SAMTOOLS="samtools" def checkBinaryEqual( filename1, filename2 ): '''return true if the two files are binary equal.''' @@ -50,7 +51,7 @@ def runSamtools( cmd ): def getSamtoolsVersion(): '''return samtools version''' - pipe = subprocess.Popen("samtools", shell=True, stderr=subprocess.PIPE).stderr + pipe = subprocess.Popen(SAMTOOLS, shell=True, stderr=subprocess.PIPE).stderr lines = "".join(pipe.readlines()) return re.search( "Version:\s+(\S+)", lines).groups()[0] @@ -67,42 +68,42 @@ class BinaryTest(unittest.TestCase): mCommands = \ { "faidx" : \ ( - ("ex1.fa.fai", "samtools faidx ex1.fa"), + ("ex1.fa.fai", "faidx ex1.fa"), ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ), ), "import" : ( - ("ex1.bam", "samtools import ex1.fa.fai ex1.sam.gz ex1.bam" ), + ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ), ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ), ), "index": ( - ("ex1.bam.bai", "samtools index ex1.bam" ), + ("ex1.bam.bai", "index ex1.bam" ), ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ), ), "pileup1" : ( - ("ex1.pileup", "samtools pileup -cf ex1.fa ex1.bam > ex1.pileup" ), + ("ex1.pileup", "pileup -cf ex1.fa ex1.bam > ex1.pileup" ), ("pysam_ex1.pileup", (pysam.pileup, "-c -f ex1.fa ex1.bam" ) ) ), "pileup2" : ( - ("ex1.glf", "samtools pileup -gf ex1.fa ex1.bam > ex1.glf" ), + ("ex1.glf", "pileup -gf ex1.fa ex1.bam > ex1.glf" ), ("pysam_ex1.glf", (pysam.pileup, "-g -f ex1.fa ex1.bam" ) ) ), "glfview" : ( - ("ex1.glfview", "samtools glfview ex1.glf > ex1.glfview"), + ("ex1.glfview", "glfview ex1.glf > ex1.glfview"), ("pysam_ex1.glfview", (pysam.glfview, "ex1.glf" ) ), ), "view" : ( - ("ex1.view", "samtools view ex1.bam > ex1.view"), + ("ex1.view", "view ex1.bam > ex1.view"), ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ), ), "view2" : ( - ("ex1.view", "samtools view -bT ex1.fa -o ex1.view2 ex1.sam"), + ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"), # note that -o ex1.view2 throws exception. ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ), ), @@ -110,7 +111,10 @@ class BinaryTest(unittest.TestCase): # some tests depend on others. The order specifies in which order # the samtools commands are executed. - mOrder = ('faidx', 'import', 'index', 'pileup1', 'pileup2', 'glfview', 'view', 'view2' ) + mOrder = ('faidx', 'import', 'index', + 'pileup1', 'pileup2', + # 'glfview', deprecated + 'view', 'view2' ) def setUp( self ): '''setup tests. @@ -127,24 +131,29 @@ class BinaryTest(unittest.TestCase): command = self.mCommands[label] samtools_target, samtools_command = command[0] pysam_target, pysam_command = command[1] - runSamtools( samtools_command ) + runSamtools( " ".join( (SAMTOOLS, samtools_command ))) pysam_method, pysam_options = pysam_command output = pysam_method( *pysam_options.split(" "), raw=True) if ">" in samtools_command: - outfile = open( pysam_target, "w" ) + outfile = open( pysam_target, "wb" ) for line in output: outfile.write( line ) outfile.close() BinaryTest.first_time = False samtools_version = getSamtoolsVersion() - if samtools_version != pysam.__samtools_version__: + + def _r( s ): + # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> 0.1.12 + if s.count('-') > 0: s = s[0:s.find('-')] + return re.sub( "[^0-9.]", "", s ) + + if _r(samtools_version) != _r( pysam.__samtools_version__): raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \ (pysam.__samtools_version__, samtools_version )) def checkCommand( self, command ): - if command: samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0] self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ), @@ -162,14 +171,15 @@ class BinaryTest(unittest.TestCase): def testPileup2( self ): self.checkCommand( "pileup2" ) - def testGLFView( self ): - self.checkCommand( "glfview" ) + # deprecated + # def testGLFView( self ): + # self.checkCommand( "glfview" ) def testView( self ): self.checkCommand( "view" ) def testEmptyIndex( self ): - self.assertRaises( pysam.SamtoolsError, pysam.index, "exdoesntexist.bam" ) + self.assertRaises( IOError, pysam.index, "exdoesntexist.bam" ) def __del__(self): return @@ -262,7 +272,6 @@ class IOTest(unittest.TestCase): self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120) self.assertRaises( ValueError, samfile.getrname, 0 ) self.assertRaises( ValueError, samfile.tell ) - self.assertRaises( ValueError, samfile.write, None ) self.assertRaises( ValueError, samfile.seek, 0 ) self.assertRaises( ValueError, getattr, samfile, "nreferences" ) self.assertRaises( ValueError, getattr, samfile, "references" ) @@ -270,11 +279,17 @@ class IOTest(unittest.TestCase): self.assertRaises( ValueError, getattr, samfile, "text" ) self.assertRaises( ValueError, getattr, samfile, "header" ) - def testBinaryReadFromSamfile( self ): - pass - # needs to re-activated, see issue 19 - #samfile = pysam.Samfile( "ex1.bam", "r" ) - #samfile.fetch().next() + # write on closed file + self.assertEqual( 0, samfile.write(None) ) + + def testAutoDetection( self ): + samfile = pysam.Samfile( "ex3.bam" ) + samfile.fetch() + samfile.close() + + samfile = pysam.Samfile( "ex3.sam") + samfile.fetch() + samfile.close() def testReadingFromFileWithoutIndex( self ): '''read from bam file without index.''' @@ -292,12 +307,19 @@ class TestIteratorRow(unittest.TestCase): def checkRange( self, rnge ): '''compare results from iterator with those from samtools.''' ps = list(self.samfile.fetch(region=rnge)) - sa = list(pysam.view( "ex1.bam", rnge , raw = True) ) + sa = list(pysam.view( "ex1.bam", rnge, raw = True) ) self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) )) # check if the same reads are returned and in the same order for line, pair in enumerate( zip( ps, sa ) ): - data = pair[1].split("\t") - self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) ) + a,b = pair + d = b.split("\t") + self.assertEqual( a.qname, d[0], "line %i: read id mismatch: %s != %s" % (line, a.rname, d[0]) ) + self.assertEqual( a.pos, int(d[3])-1, "line %i: read position mismatch: %s != %s, \n%s\n%s\n" % \ + (line, a.pos, int(d[3])-1, + str(a), str(d) ) ) + self.assertEqual( a.qual, d[10], "line %i: quality mismatch: %s != %s, \n%s\n%s\n" % \ + (line, a.qual, d[10], + str(a), str(d) ) ) def testIteratePerContig(self): '''check random access per contig''' @@ -313,7 +335,6 @@ class TestIteratorRow(unittest.TestCase): def tearDown(self): self.samfile.close() - class TestIteratorRowAll(unittest.TestCase): def setUp(self): @@ -381,7 +402,9 @@ class TestIteratorColumn(unittest.TestCase): # one read, all columns of the read are returned self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\ (pos, len(columns), refcolumns)) + + def tearDown(self): self.samfile.close() @@ -516,7 +539,17 @@ class TestHeaderSam(unittest.TestCase): def testHeaders(self): self.compareHeaders( self.header, self.samfile.header ) self.compareHeaders( self.samfile.header, self.header ) - + + def testNameMapping( self ): + for x, y in enumerate( ("chr1", "chr2")): + tid = self.samfile.gettid( y ) + ref = self.samfile.getrname( x ) + self.assertEqual( tid, x ) + self.assertEqual( ref, y ) + + self.assertEqual( self.samfile.gettid("chr?"), -1 ) + self.assertRaises( ValueError, self.samfile.getrname, 2 ) + def tearDown(self): self.samfile.close() @@ -599,19 +632,30 @@ class TestExceptions(unittest.TestCase): self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 ) self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 ) + self.assertRaises( ValueError, self.samfile.count, "chr1", 5, -10 ) + self.assertRaises( ValueError, self.samfile.count, "chr1", 5, 0 ) + self.assertRaises( ValueError, self.samfile.count, "chr1", -5, -10 ) + def testOutOfRangeNegativeOldFormat(self): self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" ) self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" ) self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" ) + self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-10" ) + self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-0" ) + self.assertRaises( ValueError, self.samfile.count, region="chr1:-5--10" ) + def testOutOfRangNewFormat(self): self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 ) + self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999, 99999999999 ) def testOutOfRangeLargeNewFormat(self): self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 ) + self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 ) def testOutOfRangeLargeOldFormat(self): self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" ) + self.assertRaises( ValueError, self.samfile.count, "chr1:99999999999999999-999999999999999999" ) def testZeroToZero(self): '''see issue 44''' @@ -620,7 +664,6 @@ class TestExceptions(unittest.TestCase): def tearDown(self): self.samfile.close() - class TestWrongFormat(unittest.TestCase): '''test cases for opening files not in bam/sam format.''' @@ -661,6 +704,14 @@ class TestFastaFile(unittest.TestCase): # unknown sequence returns "" self.assertEqual( "", self.file.fetch("chr12") ) + def testOutOfRangeAccess( self ): + '''test out of range access.''' + # out of range access returns an empty string + for contig, s in self.mSequences.iteritems(): + self.assertEqual( self.file.fetch( contig, len(s), len(s)+1), "" ) + + self.assertEqual( self.file.fetch( "chr3", 0 , 100), "" ) + def testFetchErrors( self ): self.assertRaises( ValueError, self.file.fetch ) self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 ) @@ -958,10 +1009,12 @@ class TestRemoteFileFTP(unittest.TestCase): region = "1:1-1000" def testFTPView( self ): + return result = pysam.view( self.url, self.region ) self.assertEqual( len(result), 36 ) def testFTPFetch( self ): + return samfile = pysam.Samfile(self.url, "rb") result = list(samfile.fetch( region = self.region )) self.assertEqual( len(result), 36 ) @@ -973,7 +1026,11 @@ class TestRemoteFileHTTP( unittest.TestCase): local = "ex1.bam" def testView( self ): - self.assertRaises( pysam.SamtoolsError, pysam.view, self.url, self.region ) + samfile_local = pysam.Samfile(self.local, "rb") + ref = list(samfile_local.fetch( region = self.region )) + + result = pysam.view( self.url, self.region ) + self.assertEqual( len(result), len(ref) ) def testFetch( self ): samfile = pysam.Samfile(self.url, "rb") @@ -995,10 +1052,254 @@ class TestRemoteFileHTTP( unittest.TestCase): for x, y in zip(result, ref): self.assertEqual( x.compare( y ), 0 ) +class TestLargeOptValues( unittest.TestCase ): + ints = ( 65536, 214748, 2147484, 2147483647 ) + floats = ( 65536.0, 214748.0, 2147484.0 ) + + def check( self, samfile ): + + i = samfile.fetch() + for exp in self.ints: + rr = i.next() + obs = rr.opt("ZP") + self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr))) + + for exp in [ -x for x in self.ints ]: + rr = i.next() + obs = rr.opt("ZP") + self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr))) + + for exp in self.floats: + rr = i.next() + obs = rr.opt("ZP") + self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr))) + + for exp in [ -x for x in self.floats ]: + rr = i.next() + obs = rr.opt("ZP") + self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr))) + + def testSAM( self ): + samfile = pysam.Samfile("ex10.sam", "r") + self.check( samfile ) + + def testBAM( self ): + samfile = pysam.Samfile("ex10.bam", "rb") + self.check( samfile ) + +class TestSNPCalls( unittest.TestCase ): + '''test pysam SNP calling ability.''' + + def checkEqual( self, a, b ): + for x in ("reference_base", + "pos", + "genotype", + "consensus_quality", + "snp_quality", + "mapping_quality", + "coverage" ): + self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % + (x, getattr(a, x), getattr(b,x), str(a), str(b))) + + def testAllPositionsViaIterator( self ): + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + try: + refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"] + except pysam.SamtoolsError: + pass + + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + calls = list(pysam.IteratorSNPCalls(i)) + for x,y in zip( refs, calls ): + self.checkEqual( x, y ) + + def testPerPositionViaIterator( self ): + # test pileup for each position. This is a slow operation + # so this test is disabled + return + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): + if x.reference_base == "*": continue + i = samfile.pileup( x.chromosome, x.pos, x.pos+1, + fastafile = fastafile, + stepper = "samtools" ) + z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ] + self.assertEqual( len(z), 1 ) + self.checkEqual( x, z[0] ) + + def testPerPositionViaCaller( self ): + # test pileup for each position. This is a fast operation + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + caller = pysam.SNPCaller( i ) + + for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): + if x.reference_base == "*": continue + call = caller.call( x.chromosome, x.pos ) + self.checkEqual( x, call ) + +class TestIndelCalls( unittest.TestCase ): + '''test pysam indel calling.''' + + def checkEqual( self, a, b ): + + for x in ("pos", + "genotype", + "consensus_quality", + "snp_quality", + "mapping_quality", + "coverage", + "first_allele", + "second_allele", + "reads_first", + "reads_second", + "reads_diff"): + if b.genotype == "*/*" and x == "second_allele": + # ignore test for second allele (positions chr2:439 and chr2:1512) + continue + self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % + (x, getattr(a, x), getattr(b,x), str(a), str(b))) + + def testAllPositionsViaIterator( self ): + + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + try: + refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"] + except pysam.SamtoolsError: + pass + + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ] + for x,y in zip( refs, calls ): + self.checkEqual( x, y ) + + def testPerPositionViaCaller( self ): + # test pileup for each position. This is a fast operation + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + caller = pysam.IndelCaller( i ) + + for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): + if x.reference_base != "*": continue + call = caller.call( x.chromosome, x.pos ) + self.checkEqual( x, call ) + +class TestLogging( unittest.TestCase ): + '''test around bug issue 42, + + failed in versions < 0.4 + ''' + + def check( self, bamfile, log ): + + if log: + logger = logging.getLogger('franklin') + logger.setLevel(logging.INFO) + formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + log_hand = logging.FileHandler('log.txt') + log_hand.setFormatter(formatter) + logger.addHandler(log_hand) + + bam = pysam.Samfile(bamfile, 'rb') + cols = bam.pileup() + self.assert_( True ) + + def testFail1( self ): + self.check( "ex9_fail.bam", False ) + self.check( "ex9_fail.bam", True ) + + def testNoFail1( self ): + self.check( "ex9_nofail.bam", False ) + self.check( "ex9_nofail.bam", True ) + + def testNoFail2( self ): + self.check( "ex9_nofail.bam", True ) + self.check( "ex9_nofail.bam", True ) + # TODOS # 1. finish testing all properties within pileup objects # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...) +# 3. check: presence of sequence + +class TestSamfileUtilityFunctions( unittest.TestCase ): + + def testCount( self ): + + samfile = pysam.Samfile( "ex1.bam", "rb" ) + + for contig in ("chr1", "chr2" ): + for start in xrange( 0, 2000, 100 ): + end = start + 1 + self.assertEqual( len( list( samfile.fetch( contig, start, end ) ) ), + samfile.count( contig, start, end ) ) + + # test empty intervals + self.assertEqual( len( list( samfile.fetch( contig, start, start ) ) ), + samfile.count( contig, start, start ) ) + + # test half empty intervals + self.assertEqual( len( list( samfile.fetch( contig, start ) ) ), + samfile.count( contig, start ) ) + + def testMate( self ): + '''test mate access.''' + + readnames = [ x.split("\t")[0] for x in open( "ex1.sam", "rb" ).readlines() ] + counts = collections.defaultdict( int ) + for x in readnames: counts[x] += 1 + + samfile = pysam.Samfile( "ex1.bam", "rb" ) + for read in samfile.fetch(): + if not read.is_paired: + self.assertRaises( ValueError, samfile.mate, read ) + elif read.mate_is_unmapped: + self.assertRaises( ValueError, samfile.mate, read ) + else: + if counts[read.qname] == 1: + self.assertRaises( ValueError, samfile.mate, read ) + else: + mate = samfile.mate( read ) + self.assertEqual( read.qname, mate.qname ) + self.assertEqual( read.is_read1, mate.is_read2 ) + self.assertEqual( read.is_read2, mate.is_read1 ) + self.assertEqual( read.pos, mate.mpos ) + self.assertEqual( read.mpos, mate.pos ) + +class TestSamtoolsProxy( unittest.TestCase ): + '''tests for sanity checking access to samtools functions.''' + + def testIndex( self ): + self.assertRaises( IOError, pysam.index, "missing_file" ) + + def testView( self ): + # note that view still echos "open: No such file or directory" + self.assertRaises( pysam.SamtoolsError, pysam.view, "missing_file" ) + + def testSort( self ): + self.assertRaises( pysam.SamtoolsError, pysam.sort, "missing_file" ) + +class TestSamfileIndex( unittest.TestCase): + + def testIndex( self ): + samfile = pysam.Samfile( "ex1.bam", "rb" ) + index = pysam.IndexedReads( samfile ) + index.build() + + reads = collections.defaultdict( int ) + + for read in samfile: reads[read.qname] += 1 + + for qname, counts in reads.iteritems(): + found = list(index.find( qname )) + self.assertEqual( len(found), counts ) + for x in found: self.assertEqual( x.qname, qname ) + if __name__ == "__main__": # build data files