X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=pysam.git;a=blobdiff_plain;f=tests%2Fpysam_test.py;h=d2bf4150de58551accb107f1822e85912c7c58d5;hp=c2ae6fa37b8d34008eb639adc608fc80de7411d2;hb=e1756c41e7a1d7cc01fb95e42df9dd04da2d2991;hpb=aa8ecff068edbb09a03bd874fce716e93e22e53c diff --git a/tests/pysam_test.py b/tests/pysam_test.py index c2ae6fa..d2bf415 100755 --- a/tests/pysam_test.py +++ b/tests/pysam_test.py @@ -7,11 +7,12 @@ and data files located there. import pysam import unittest -import os -import itertools +import os, re, sys +import itertools, collections import subprocess import shutil - +import logging +SAMTOOLS="samtools" def checkBinaryEqual( filename1, filename2 ): '''return true if the two files are binary equal.''' @@ -47,7 +48,13 @@ def runSamtools( cmd ): except OSError, e: print >>sys.stderr, "Execution failed:", e - +def getSamtoolsVersion(): + '''return samtools version''' + + pipe = subprocess.Popen(SAMTOOLS, shell=True, stderr=subprocess.PIPE).stderr + lines = "".join(pipe.readlines()) + return re.search( "Version:\s+(\S+)", lines).groups()[0] + class BinaryTest(unittest.TestCase): '''test samtools command line commands and compare against pysam commands. @@ -61,44 +68,53 @@ class BinaryTest(unittest.TestCase): mCommands = \ { "faidx" : \ ( - ("ex1.fa.fai", "samtools faidx ex1.fa"), + ("ex1.fa.fai", "faidx ex1.fa"), ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ), ), "import" : ( - ("ex1.bam", "samtools import ex1.fa.fai ex1.sam.gz ex1.bam" ), + ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ), ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ), ), "index": ( - ("ex1.bam.bai", "samtools index ex1.bam" ), + ("ex1.bam.bai", "index ex1.bam" ), ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ), ), "pileup1" : ( - ("ex1.pileup", "samtools pileup -cf ex1.fa ex1.bam > ex1.pileup" ), + ("ex1.pileup", "pileup -cf ex1.fa ex1.bam > ex1.pileup" ), ("pysam_ex1.pileup", (pysam.pileup, "-c -f ex1.fa ex1.bam" ) ) ), "pileup2" : ( - ("ex1.glf", "samtools pileup -gf ex1.fa ex1.bam > ex1.glf" ), + ("ex1.glf", "pileup -gf ex1.fa ex1.bam > ex1.glf" ), ("pysam_ex1.glf", (pysam.pileup, "-g -f ex1.fa ex1.bam" ) ) ), "glfview" : ( - ("ex1.glfview", "samtools glfview ex1.glf > ex1.glfview"), + ("ex1.glfview", "glfview ex1.glf > ex1.glfview"), ("pysam_ex1.glfview", (pysam.glfview, "ex1.glf" ) ), ), "view" : ( - ("ex1.view", "samtools view ex1.bam > ex1.view"), + ("ex1.view", "view ex1.bam > ex1.view"), ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ), ), + "view2" : + ( + ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"), + # note that -o ex1.view2 throws exception. + ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ), + ), } # some tests depend on others. The order specifies in which order # the samtools commands are executed. - mOrder = ('faidx', 'import', 'index', 'pileup1', 'pileup2', 'glfview', 'view' ) + mOrder = ('faidx', 'import', 'index', + 'pileup1', 'pileup2', + # 'glfview', deprecated + 'view', 'view2' ) def setUp( self ): '''setup tests. @@ -115,22 +131,34 @@ class BinaryTest(unittest.TestCase): command = self.mCommands[label] samtools_target, samtools_command = command[0] pysam_target, pysam_command = command[1] - runSamtools( samtools_command ) + runSamtools( " ".join( (SAMTOOLS, samtools_command ))) pysam_method, pysam_options = pysam_command output = pysam_method( *pysam_options.split(" "), raw=True) if ">" in samtools_command: - outfile = open( pysam_target, "w" ) + outfile = open( pysam_target, "wb" ) for line in output: outfile.write( line ) outfile.close() BinaryTest.first_time = False + samtools_version = getSamtoolsVersion() + + def _r( s ): + # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> 0.1.12 + if s.count('-') > 0: s = s[0:s.find('-')] + return re.sub( "[^0-9.]", "", s ) + + if _r(samtools_version) != _r( pysam.__samtools_version__): + raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \ + (pysam.__samtools_version__, + samtools_version )) + def checkCommand( self, command ): if command: samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0] self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ), "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) ) - + def testImport( self ): self.checkCommand( "import" ) @@ -143,17 +171,18 @@ class BinaryTest(unittest.TestCase): def testPileup2( self ): self.checkCommand( "pileup2" ) - def testGLFView( self ): - self.checkCommand( "glfview" ) + # deprecated + # def testGLFView( self ): + # self.checkCommand( "glfview" ) def testView( self ): self.checkCommand( "view" ) def testEmptyIndex( self ): - self.assertRaises( pysam.SamtoolsError, pysam.index, "exdoesntexist.bam" ) + self.assertRaises( IOError, pysam.index, "exdoesntexist.bam" ) def __del__(self): - + return for label, command in self.mCommands.iteritems(): samtools_target, samtools_command = command[0] pysam_target, pysam_command = command[1] @@ -234,17 +263,33 @@ class IOTest(unittest.TestCase): samfile.close() self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120) - def testPileupFromClosedFile( self ): + def testClosedFile( self ): + '''test that access to a closed samfile raises ValueError.''' samfile = pysam.Samfile( "ex1.bam", "rb" ) samfile.close() + self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120) self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120) + self.assertRaises( ValueError, samfile.getrname, 0 ) + self.assertRaises( ValueError, samfile.tell ) + self.assertRaises( ValueError, samfile.seek, 0 ) + self.assertRaises( ValueError, getattr, samfile, "nreferences" ) + self.assertRaises( ValueError, getattr, samfile, "references" ) + self.assertRaises( ValueError, getattr, samfile, "lengths" ) + self.assertRaises( ValueError, getattr, samfile, "text" ) + self.assertRaises( ValueError, getattr, samfile, "header" ) + + # write on closed file + self.assertEqual( 0, samfile.write(None) ) + + def testAutoDetection( self ): + samfile = pysam.Samfile( "ex3.bam" ) + samfile.fetch() + samfile.close() - def testBinaryReadFromSamfile( self ): - pass - # needs to re-activated, see issue 19 - #samfile = pysam.Samfile( "ex1.bam", "r" ) - #samfile.fetch().next() + samfile = pysam.Samfile( "ex3.sam") + samfile.fetch() + samfile.close() def testReadingFromFileWithoutIndex( self ): '''read from bam file without index.''' @@ -262,12 +307,19 @@ class TestIteratorRow(unittest.TestCase): def checkRange( self, rnge ): '''compare results from iterator with those from samtools.''' ps = list(self.samfile.fetch(region=rnge)) - sa = list(pysam.view( "ex1.bam", rnge , raw = True) ) + sa = list(pysam.view( "ex1.bam", rnge, raw = True) ) self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) )) # check if the same reads are returned and in the same order for line, pair in enumerate( zip( ps, sa ) ): - data = pair[1].split("\t") - self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) ) + a,b = pair + d = b.split("\t") + self.assertEqual( a.qname, d[0], "line %i: read id mismatch: %s != %s" % (line, a.rname, d[0]) ) + self.assertEqual( a.pos, int(d[3])-1, "line %i: read position mismatch: %s != %s, \n%s\n%s\n" % \ + (line, a.pos, int(d[3])-1, + str(a), str(d) ) ) + self.assertEqual( a.qual, d[10], "line %i: quality mismatch: %s != %s, \n%s\n%s\n" % \ + (line, a.qual, d[10], + str(a), str(d) ) ) def testIteratePerContig(self): '''check random access per contig''' @@ -348,8 +400,11 @@ class TestIteratorColumn(unittest.TestCase): self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) ) elif refcov == 1: # one read, all columns of the read are returned - self.assertEqual( len(columns), refcolumns, "pileup incomplete - %i should be %i " % (len(columns), refcolumns)) + self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\ + (pos, len(columns), refcolumns)) + + def tearDown(self): self.samfile.close() @@ -398,10 +453,22 @@ class TestAlignedReadFromBam(unittest.TestCase): def testARseq(self): self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") ) self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") ) + self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") ) def testARqual(self): self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") ) self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") ) + self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") ) + + def testARquery(self): + self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") ) + self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") ) + self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") ) + + def testARqqual(self): + self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") ) + self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") ) + self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") ) def testPresentOptionalFields(self): self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) ) @@ -472,7 +539,17 @@ class TestHeaderSam(unittest.TestCase): def testHeaders(self): self.compareHeaders( self.header, self.samfile.header ) self.compareHeaders( self.samfile.header, self.header ) - + + def testNameMapping( self ): + for x, y in enumerate( ("chr1", "chr2")): + tid = self.samfile.gettid( y ) + ref = self.samfile.getrname( x ) + self.assertEqual( tid, x ) + self.assertEqual( ref, y ) + + self.assertEqual( self.samfile.gettid("chr?"), -1 ) + self.assertRaises( ValueError, self.samfile.getrname, 2 ) + def tearDown(self): self.samfile.close() @@ -518,7 +595,14 @@ class TestPileupObjects(unittest.TestCase): def tearDown(self): self.samfile.close() - + +class TestContextManager(unittest.TestCase): + + def testManager( self ): + with pysam.Samfile('ex1.bam', 'rb') as samfile: + samfile.fetch() + self.assertEqual( samfile._isOpen(), False ) + class TestExceptions(unittest.TestCase): def setUp(self): @@ -548,23 +632,53 @@ class TestExceptions(unittest.TestCase): self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 ) self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 ) + self.assertRaises( ValueError, self.samfile.count, "chr1", 5, -10 ) + self.assertRaises( ValueError, self.samfile.count, "chr1", 5, 0 ) + self.assertRaises( ValueError, self.samfile.count, "chr1", -5, -10 ) + def testOutOfRangeNegativeOldFormat(self): self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" ) self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" ) self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" ) + self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-10" ) + self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-0" ) + self.assertRaises( ValueError, self.samfile.count, region="chr1:-5--10" ) + def testOutOfRangNewFormat(self): self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 ) + self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999, 99999999999 ) def testOutOfRangeLargeNewFormat(self): self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 ) + self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 ) def testOutOfRangeLargeOldFormat(self): self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" ) + self.assertRaises( ValueError, self.samfile.count, "chr1:99999999999999999-999999999999999999" ) + + def testZeroToZero(self): + '''see issue 44''' + self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0) def tearDown(self): self.samfile.close() +class TestWrongFormat(unittest.TestCase): + '''test cases for opening files not in bam/sam format.''' + + def testOpenSamAsBam( self ): + self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' ) + + def testOpenBamAsSam( self ): + self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' ) + + def testOpenFastaAsSam( self ): + self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' ) + + def testOpenFastaAsBam( self ): + self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' ) + class TestFastaFile(unittest.TestCase): mSequences = { 'chr1' : @@ -581,20 +695,34 @@ class TestFastaFile(unittest.TestCase): self.assertEqual( seq, self.file.fetch( id ) ) for x in range( 0, len(seq), 10): self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) ) + # test x:end + self.assertEqual( seq[x:], self.file.fetch( id, x) ) + # test 0:x + self.assertEqual( seq[:x], self.file.fetch( id, None, x) ) + + + # unknown sequence returns "" + self.assertEqual( "", self.file.fetch("chr12") ) + + def testOutOfRangeAccess( self ): + '''test out of range access.''' + # out of range access returns an empty string + for contig, s in self.mSequences.iteritems(): + self.assertEqual( self.file.fetch( contig, len(s), len(s)+1), "" ) + + self.assertEqual( self.file.fetch( "chr3", 0 , 100), "" ) def testFetchErrors( self ): self.assertRaises( ValueError, self.file.fetch ) - self.assertRaises( ValueError, self.file.fetch, "chr1", 0 ) self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 ) self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 ) - # the following segfaults: - # self.assertRaises( IndexError, self.file.fetch, "chr12", ) - pass + def testLength( self ): + self.assertEqual( len(self.file), 2 ) + def tearDown(self): self.file.close() - class TestAlignedRead(unittest.TestCase): '''tests to check if aligned read can be constructed and manipulated. @@ -645,7 +773,7 @@ class TestAlignedRead(unittest.TestCase): a.mpos=200 a.isize=167 a.qual="1234" * 3 - + # todo: create tags return a def testUpdate( self ): @@ -714,6 +842,19 @@ class TestAlignedRead(unittest.TestCase): return a + def testTagParsing( self ): + '''test for tag parsing + + see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a + ''' + samfile=pysam.Samfile( "ex8.bam","rb" ) + + for entry in samfile: + before = entry.tags + entry.tags = entry.tags + after = entry.tags + self.assertEqual( after, before ) + class TestDeNovoConstruction(unittest.TestCase): '''check BAM/SAM file construction using ex3.sam @@ -803,7 +944,7 @@ class TestDeNovoConstruction(unittest.TestCase): others = list(infile) for denovo, other in zip( others, self.reads): self.checkFieldEqual( other, denovo ) - self.assertEqual( other, denovo) + self.assertEqual( other.compare( denovo ), 0 ) def testSAMPerRead( self ): '''check if individual reads are binary equal.''' @@ -812,7 +953,7 @@ class TestDeNovoConstruction(unittest.TestCase): others = list(infile) for denovo, other in zip( others, self.reads): self.checkFieldEqual( other, denovo ) - self.assertEqual( other, denovo) + self.assertEqual( other.compare( denovo), 0 ) def testBAMWholeFile( self ): @@ -829,9 +970,336 @@ class TestDeNovoConstruction(unittest.TestCase): os.unlink( tmpfilename ) +class TestDoubleFetch(unittest.TestCase): + '''check if two iterators on the same bamfile are independent.''' + + def testDoubleFetch( self ): + + samfile1 = pysam.Samfile('ex1.bam', 'rb') + + for a,b in zip(samfile1.fetch(), samfile1.fetch()): + self.assertEqual( a.compare( b ), 0 ) + + def testDoubleFetchWithRegion( self ): + + samfile1 = pysam.Samfile('ex1.bam', 'rb') + chr, start, stop = 'chr1', 200, 3000000 + self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch + + for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)): + self.assertEqual( a.compare( b ), 0 ) + + def testDoubleFetchUntilEOF( self ): + + samfile1 = pysam.Samfile('ex1.bam', 'rb') + + for a,b in zip(samfile1.fetch( until_eof = True), + samfile1.fetch( until_eof = True )): + self.assertEqual( a.compare( b), 0 ) + +class TestRemoteFileFTP(unittest.TestCase): + '''test remote access. + + ''' + + # Need to find an ftp server without password on standard + # port. + + url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam" + region = "1:1-1000" + + def testFTPView( self ): + return + result = pysam.view( self.url, self.region ) + self.assertEqual( len(result), 36 ) + + def testFTPFetch( self ): + return + samfile = pysam.Samfile(self.url, "rb") + result = list(samfile.fetch( region = self.region )) + self.assertEqual( len(result), 36 ) + +class TestRemoteFileHTTP( unittest.TestCase): + + url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam" + region = "chr1:1-1000" + local = "ex1.bam" + + def testView( self ): + samfile_local = pysam.Samfile(self.local, "rb") + ref = list(samfile_local.fetch( region = self.region )) + + result = pysam.view( self.url, self.region ) + self.assertEqual( len(result), len(ref) ) + + def testFetch( self ): + samfile = pysam.Samfile(self.url, "rb") + result = list(samfile.fetch( region = self.region )) + samfile_local = pysam.Samfile(self.local, "rb") + ref = list(samfile_local.fetch( region = self.region )) + + self.assertEqual( len(ref), len(result) ) + for x, y in zip(result, ref): + self.assertEqual( x.compare( y ), 0 ) + + def testFetchAll( self ): + samfile = pysam.Samfile(self.url, "rb") + result = list(samfile.fetch()) + samfile_local = pysam.Samfile(self.local, "rb") + ref = list(samfile_local.fetch() ) + + self.assertEqual( len(ref), len(result) ) + for x, y in zip(result, ref): + self.assertEqual( x.compare( y ), 0 ) + +class TestLargeOptValues( unittest.TestCase ): + + ints = ( 65536, 214748, 2147484, 2147483647 ) + floats = ( 65536.0, 214748.0, 2147484.0 ) + + def check( self, samfile ): + + i = samfile.fetch() + for exp in self.ints: + rr = i.next() + obs = rr.opt("ZP") + self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr))) + + for exp in [ -x for x in self.ints ]: + rr = i.next() + obs = rr.opt("ZP") + self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr))) + + for exp in self.floats: + rr = i.next() + obs = rr.opt("ZP") + self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr))) + + for exp in [ -x for x in self.floats ]: + rr = i.next() + obs = rr.opt("ZP") + self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr))) + + def testSAM( self ): + samfile = pysam.Samfile("ex10.sam", "r") + self.check( samfile ) + + def testBAM( self ): + samfile = pysam.Samfile("ex10.bam", "rb") + self.check( samfile ) + +class TestSNPCalls( unittest.TestCase ): + '''test pysam SNP calling ability.''' + + def checkEqual( self, a, b ): + for x in ("reference_base", + "pos", + "genotype", + "consensus_quality", + "snp_quality", + "mapping_quality", + "coverage" ): + self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % + (x, getattr(a, x), getattr(b,x), str(a), str(b))) + + def testAllPositionsViaIterator( self ): + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + try: + refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"] + except pysam.SamtoolsError: + pass + + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + calls = list(pysam.IteratorSNPCalls(i)) + for x,y in zip( refs, calls ): + self.checkEqual( x, y ) + + def testPerPositionViaIterator( self ): + # test pileup for each position. This is a slow operation + # so this test is disabled + return + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): + if x.reference_base == "*": continue + i = samfile.pileup( x.chromosome, x.pos, x.pos+1, + fastafile = fastafile, + stepper = "samtools" ) + z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ] + self.assertEqual( len(z), 1 ) + self.checkEqual( x, z[0] ) + + def testPerPositionViaCaller( self ): + # test pileup for each position. This is a fast operation + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + caller = pysam.SNPCaller( i ) + + for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): + if x.reference_base == "*": continue + call = caller.call( x.chromosome, x.pos ) + self.checkEqual( x, call ) + +class TestIndelCalls( unittest.TestCase ): + '''test pysam indel calling.''' + + def checkEqual( self, a, b ): + + for x in ("pos", + "genotype", + "consensus_quality", + "snp_quality", + "mapping_quality", + "coverage", + "first_allele", + "second_allele", + "reads_first", + "reads_second", + "reads_diff"): + if b.genotype == "*/*" and x == "second_allele": + # ignore test for second allele (positions chr2:439 and chr2:1512) + continue + self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % + (x, getattr(a, x), getattr(b,x), str(a), str(b))) + + def testAllPositionsViaIterator( self ): + + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + try: + refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"] + except pysam.SamtoolsError: + pass + + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ] + for x,y in zip( refs, calls ): + self.checkEqual( x, y ) + + def testPerPositionViaCaller( self ): + # test pileup for each position. This is a fast operation + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + caller = pysam.IndelCaller( i ) + + for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): + if x.reference_base != "*": continue + call = caller.call( x.chromosome, x.pos ) + self.checkEqual( x, call ) + +class TestLogging( unittest.TestCase ): + '''test around bug issue 42, + + failed in versions < 0.4 + ''' + + def check( self, bamfile, log ): + + if log: + logger = logging.getLogger('franklin') + logger.setLevel(logging.INFO) + formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') + log_hand = logging.FileHandler('log.txt') + log_hand.setFormatter(formatter) + logger.addHandler(log_hand) + + bam = pysam.Samfile(bamfile, 'rb') + cols = bam.pileup() + self.assert_( True ) + + def testFail1( self ): + self.check( "ex9_fail.bam", False ) + self.check( "ex9_fail.bam", True ) + + def testNoFail1( self ): + self.check( "ex9_nofail.bam", False ) + self.check( "ex9_nofail.bam", True ) + + def testNoFail2( self ): + self.check( "ex9_nofail.bam", True ) + self.check( "ex9_nofail.bam", True ) + # TODOS # 1. finish testing all properties within pileup objects # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...) +# 3. check: presence of sequence + +class TestSamfileUtilityFunctions( unittest.TestCase ): + + def testCount( self ): + + samfile = pysam.Samfile( "ex1.bam", "rb" ) + + for contig in ("chr1", "chr2" ): + for start in xrange( 0, 2000, 100 ): + end = start + 1 + self.assertEqual( len( list( samfile.fetch( contig, start, end ) ) ), + samfile.count( contig, start, end ) ) + + # test empty intervals + self.assertEqual( len( list( samfile.fetch( contig, start, start ) ) ), + samfile.count( contig, start, start ) ) + + # test half empty intervals + self.assertEqual( len( list( samfile.fetch( contig, start ) ) ), + samfile.count( contig, start ) ) + + def testMate( self ): + '''test mate access.''' + + readnames = [ x.split("\t")[0] for x in open( "ex1.sam", "rb" ).readlines() ] + counts = collections.defaultdict( int ) + for x in readnames: counts[x] += 1 + + samfile = pysam.Samfile( "ex1.bam", "rb" ) + for read in samfile.fetch(): + if not read.is_paired: + self.assertRaises( ValueError, samfile.mate, read ) + elif read.mate_is_unmapped: + self.assertRaises( ValueError, samfile.mate, read ) + else: + if counts[read.qname] == 1: + self.assertRaises( ValueError, samfile.mate, read ) + else: + mate = samfile.mate( read ) + self.assertEqual( read.qname, mate.qname ) + self.assertEqual( read.is_read1, mate.is_read2 ) + self.assertEqual( read.is_read2, mate.is_read1 ) + self.assertEqual( read.pos, mate.mpos ) + self.assertEqual( read.mpos, mate.pos ) + +class TestSamtoolsProxy( unittest.TestCase ): + '''tests for sanity checking access to samtools functions.''' + + def testIndex( self ): + self.assertRaises( IOError, pysam.index, "missing_file" ) + + def testView( self ): + # note that view still echos "open: No such file or directory" + self.assertRaises( pysam.SamtoolsError, pysam.view, "missing_file" ) + + def testSort( self ): + self.assertRaises( pysam.SamtoolsError, pysam.sort, "missing_file" ) + +class TestSamfileIndex( unittest.TestCase): + + def testIndex( self ): + samfile = pysam.Samfile( "ex1.bam", "rb" ) + index = pysam.IndexedReads( samfile ) + index.build() + + reads = collections.defaultdict( int ) + + for read in samfile: reads[read.qname] += 1 + + for qname, counts in reads.iteritems(): + found = list(index.find( qname )) + self.assertEqual( len(found), counts ) + for x in found: self.assertEqual( x.qname, qname ) + if __name__ == "__main__": # build data files