X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=pysam.git;a=blobdiff_plain;f=tests%2Fpysam_test.py;h=d2bf4150de58551accb107f1822e85912c7c58d5;hp=6938a977130436534a5dcbd80383a29053c3b0c4;hb=e1756c41e7a1d7cc01fb95e42df9dd04da2d2991;hpb=ca46ef4ba4a883c57cea62d5bf1bc021f1185109 diff --git a/tests/pysam_test.py b/tests/pysam_test.py index 6938a97..d2bf415 100755 --- a/tests/pysam_test.py +++ b/tests/pysam_test.py @@ -12,9 +12,7 @@ import itertools, collections import subprocess import shutil import logging - SAMTOOLS="samtools" -WORKDIR="pysam_test_work" def checkBinaryEqual( filename1, filename2 ): '''return true if the two files are binary equal.''' @@ -67,131 +65,56 @@ class BinaryTest(unittest.TestCase): first_time = True # a list of commands to test - commands = \ - { - "view" : - ( - ("ex1.view", "view ex1.bam > ex1.view"), - ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ), - ), - "view2" : - ( - ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"), - # note that -o ex1.view2 throws exception. - ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ), - ), - "sort" : - ( - ( "ex1.sort.bam", "sort ex1.bam ex1.sort" ), - ( "pysam_ex1.sort.bam", (pysam.sort, "ex1.bam pysam_ex1.sort" ) ), - ), - "mpileup" : - ( - ("ex1.pileup", "mpileup ex1.bam > ex1.pileup" ), - ("pysam_ex1.mpileup", (pysam.mpileup, "ex1.bam" ) ), - ), - "depth" : + mCommands = \ + { "faidx" : \ + ( + ("ex1.fa.fai", "faidx ex1.fa"), + ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ), + ), + "import" : ( - ("ex1.depth", "depth ex1.bam > ex1.depth" ), - ("pysam_ex1.depth", (pysam.depth, "ex1.bam" ) ), - ), - "faidx" : - ( - ("ex1.fa.fai", "faidx ex1.fa"), - ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ), + ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ), + ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ), ), "index": ( ("ex1.bam.bai", "index ex1.bam" ), ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ), ), - "idxstats" : - ( - ("ex1.idxstats", "idxstats ex1.bam > ex1.idxstats" ), - ("pysam_ex1.idxstats", (pysam.idxstats, "pysam_ex1.bam" ) ), - ), - "fixmate" : - ( - ("ex1.fixmate", "fixmate ex1.bam ex1.fixmate" ), - ("pysam_ex1.fixmate", (pysam.fixmate, "pysam_ex1.bam pysam_ex1.fixmate") ), - ), - "flagstat" : - ( - ("ex1.flagstat", "flagstat ex1.bam > ex1.flagstat" ), - ("pysam_ex1.flagstat", (pysam.flagstat, "pysam_ex1.bam") ), - ), - "calmd" : - ( - ("ex1.calmd", "calmd ex1.bam ex1.fa > ex1.calmd" ), - ("pysam_ex1.calmd", (pysam.calmd, "pysam_ex1.bam ex1.fa") ), - ), - "merge" : - ( - ("ex1.merge", "merge -f ex1.merge ex1.bam ex1.bam" ), - # -f option does not work - following command will cause the subsequent - # command to fail - ("pysam_ex1.merge", (pysam.merge, "pysam_ex1.merge pysam_ex1.bam pysam_ex1.bam") ), - ), - "rmdup" : - ( - ("ex1.rmdup", "rmdup ex1.bam ex1.rmdup" ), - ("pysam_ex1.rmdup", (pysam.rmdup, "pysam_ex1.bam pysam_ex1.rmdup" )), - ), - "reheader" : - ( - ( "ex1.reheader", "reheader ex1.bam ex1.bam > ex1.reheader"), - ( "pysam_ex1.reheader", (pysam.reheader, "ex1.bam ex1.bam" ) ), + "pileup1" : + ( + ("ex1.pileup", "pileup -cf ex1.fa ex1.bam > ex1.pileup" ), + ("pysam_ex1.pileup", (pysam.pileup, "-c -f ex1.fa ex1.bam" ) ) ), - "cat": - ( - ( "ex1.cat", "cat ex1.bam ex1.bam > ex1.cat"), - ( "pysam_ex1.cat", (pysam.cat, "ex1.bam ex1.bam" ) ), - ), - "targetcut": - ( - ("ex1.targetcut", "targetcut ex1.bam > ex1.targetcut" ), - ("pysam_ex1.targetcut", (pysam.targetcut, "pysam_ex1.bam") ), + "pileup2" : + ( + ("ex1.glf", "pileup -gf ex1.fa ex1.bam > ex1.glf" ), + ("pysam_ex1.glf", (pysam.pileup, "-g -f ex1.fa ex1.bam" ) ) ), - "phase": - ( - ("ex1.phase", "phase ex1.bam > ex1.phase" ), - ("pysam_ex1.phase", (pysam.phase, "pysam_ex1.bam") ), + "glfview" : + ( + ("ex1.glfview", "glfview ex1.glf > ex1.glfview"), + ("pysam_ex1.glfview", (pysam.glfview, "ex1.glf" ) ), ), - "import" : - ( - ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ), - ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ), + "view" : + ( + ("ex1.view", "view ex1.bam > ex1.view"), + ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ), ), - "bam2fq": - ( - ("ex1.bam2fq", "bam2fq ex1.bam > ex1.bam2fq" ), - ("pysam_ex1.bam2fq", (pysam.bam2fq, "pysam_ex1.bam") ), + "view2" : + ( + ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"), + # note that -o ex1.view2 throws exception. + ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ), ), } # some tests depend on others. The order specifies in which order # the samtools commands are executed. - # The first three (faidx, import, index) need to be in that order, - # the rest is arbitrary. - order = ('faidx', 'import', 'index', - # 'pileup1', 'pileup2', deprecated + mOrder = ('faidx', 'import', 'index', + 'pileup1', 'pileup2', # 'glfview', deprecated - 'view', 'view2', - 'sort', - 'mpileup', - 'depth', - 'idxstats', - 'fixmate', - 'flagstat', - # 'calmd', - 'merge', - 'rmdup', - 'reheader', - 'cat', - 'targetcut', - 'phase', - 'bam2fq', - ) + 'view', 'view2' ) def setUp( self ): '''setup tests. @@ -201,51 +124,25 @@ class BinaryTest(unittest.TestCase): files. ''' if BinaryTest.first_time: + # copy the source + shutil.copy( "ex1.fa", "pysam_ex1.fa" ) - # remove previous files - if os.path.exists( WORKDIR ): - shutil.rmtree( WORKDIR ) - - # copy the source files to WORKDIR - os.makedirs( WORKDIR ) - - shutil.copy( "ex1.fa", os.path.join( WORKDIR, "pysam_ex1.fa" ) ) - shutil.copy( "ex1.fa", os.path.join( WORKDIR, "ex1.fa" ) ) - shutil.copy( "ex1.sam.gz", os.path.join( WORKDIR, "ex1.sam.gz" ) ) - shutil.copy( "ex1.sam", os.path.join( WORKDIR, "ex1.sam" ) ) - - # cd to workdir - savedir = os.getcwd() - os.chdir( WORKDIR ) - - for label in self.order: - command = self.commands[label] + for label in self.mOrder: + command = self.mCommands[label] samtools_target, samtools_command = command[0] - try: - pysam_target, pysam_command = command[1] - except ValueError, msg: - raise ValueError( "error while setting up %s=%s: %s" %\ - (label, command, msg) ) + pysam_target, pysam_command = command[1] runSamtools( " ".join( (SAMTOOLS, samtools_command ))) pysam_method, pysam_options = pysam_command - try: - output = pysam_method( *pysam_options.split(" "), raw=True) - except pysam.SamtoolsError, msg: - raise pysam.SamtoolsError( "error while executing %s: options=%s: msg=%s" %\ - (label, pysam_options, msg) ) + output = pysam_method( *pysam_options.split(" "), raw=True) if ">" in samtools_command: outfile = open( pysam_target, "wb" ) for line in output: outfile.write( line ) outfile.close() - - os.chdir( savedir ) - BinaryTest.first_time = False - + BinaryTest.first_time = False samtools_version = getSamtoolsVersion() - def _r( s ): # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> 0.1.12 if s.count('-') > 0: s = s[0:s.find('-')] @@ -258,9 +155,7 @@ class BinaryTest(unittest.TestCase): def checkCommand( self, command ): if command: - samtools_target, pysam_target = self.commands[command][0][0], self.commands[command][1][0] - samtools_target = os.path.join( WORKDIR, samtools_target ) - pysam_target = os.path.join( WORKDIR, pysam_target ) + samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0] self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ), "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) ) @@ -269,51 +164,12 @@ class BinaryTest(unittest.TestCase): def testIndex( self ): self.checkCommand( "index" ) - - def testSort( self ): - self.checkCommand( "sort" ) - - def testMpileup( self ): - self.checkCommand( "mpileup" ) - - def testDepth( self ): - self.checkCommand( "depth" ) - - def testIdxstats( self ): - self.checkCommand( "idxstats" ) - - def testFixmate( self ): - self.checkCommand( "fixmate" ) - - def testFlagstat( self ): - self.checkCommand( "flagstat" ) - def testMerge( self ): - self.checkCommand( "merge" ) - - def testRmdup( self ): - self.checkCommand( "rmdup" ) - - def testReheader( self ): - self.checkCommand( "reheader" ) - - def testCat( self ): - self.checkCommand( "cat" ) - - def testTargetcut( self ): - self.checkCommand( "targetcut" ) - - def testPhase( self ): - self.checkCommand( "phase" ) - - def testBam2fq( self ): - self.checkCommand( "bam2fq" ) - - # def testPileup1( self ): - # self.checkCommand( "pileup1" ) + def testPileup1( self ): + self.checkCommand( "pileup1" ) - # def testPileup2( self ): - # self.checkCommand( "pileup2" ) + def testPileup2( self ): + self.checkCommand( "pileup2" ) # deprecated # def testGLFView( self ): @@ -326,24 +182,27 @@ class BinaryTest(unittest.TestCase): self.assertRaises( IOError, pysam.index, "exdoesntexist.bam" ) def __del__(self): - if os.path.exists( WORKDIR ): - shutil.rmtree( WORKDIR ) + return + for label, command in self.mCommands.iteritems(): + samtools_target, samtools_command = command[0] + pysam_target, pysam_command = command[1] + if os.path.exists( samtools_target): os.remove( samtools_target ) + if os.path.exists( pysam_target): os.remove( pysam_target ) + if os.path.exists( "pysam_ex1.fa" ): os.remove( "pysam_ex1.fa" ) class IOTest(unittest.TestCase): '''check if reading samfile and writing a samfile are consistent.''' def checkEcho( self, input_filename, reference_filename, output_filename, - input_mode, output_mode, use_template = True ): + input_mode, output_mode, use_template = True): '''iterate through *input_filename* writing to *output_filename* and comparing the output to *reference_filename*. The files are opened according to the *input_mode* and *output_mode*. If *use_template* is set, the header is copied from infile using the - template mechanism, otherwise target names and lengths are passed - explicitely. - + template mechanism, otherwise target names and lengths are passed explicitely. ''' infile = pysam.Samfile( input_filename, input_mode ) @@ -352,8 +211,7 @@ class IOTest(unittest.TestCase): else: outfile = pysam.Samfile( output_filename, output_mode, referencenames = infile.references, - referencelengths = infile.lengths, - add_sq_text = False ) + referencelengths = infile.lengths ) iter = infile.fetch() for x in iter: outfile.write( x ) @@ -399,18 +257,6 @@ class IOTest(unittest.TestCase): self.checkEcho( input_filename, reference_filename, output_filename, "r", "w" ) - def testReadSamWithoutHeaderWriteSamWithoutHeader( self ): - - input_filename = "ex1.sam" - output_filename = "pysam_ex1.sam" - reference_filename = "ex1.sam" - - # disabled - reading from a samfile without header - # is not implemented. - - # self.checkEcho( input_filename, reference_filename, output_filename, - # "r", "w" ) - def testFetchFromClosedFile( self ): samfile = pysam.Samfile( "ex1.bam", "rb" ) @@ -437,21 +283,13 @@ class IOTest(unittest.TestCase): self.assertEqual( 0, samfile.write(None) ) def testAutoDetection( self ): - '''test if autodetection works.''' - - samfile = pysam.Samfile( "ex3.sam" ) - self.assertRaises( ValueError, samfile.fetch, 'chr1' ) - samfile.close() - samfile = pysam.Samfile( "ex3.bam" ) - samfile.fetch('chr1') + samfile.fetch() samfile.close() - def testReadingFromSamFileWithoutHeader( self ): - '''read from samfile without header. - ''' - samfile = pysam.Samfile( "ex7.sam" ) - self.assertRaises( NotImplementedError, samfile.__iter__ ) + samfile = pysam.Samfile( "ex3.sam") + samfile.fetch() + samfile.close() def testReadingFromFileWithoutIndex( self ): '''read from bam file without index.''' @@ -461,67 +299,6 @@ class IOTest(unittest.TestCase): self.assertRaises( ValueError, samfile.fetch ) self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 ) - def testReadingUniversalFileMode( self ): - '''read from samfile without header. - ''' - - input_filename = "ex2.sam" - output_filename = "pysam_ex2.sam" - reference_filename = "ex1.sam" - - self.checkEcho( input_filename, reference_filename, output_filename, - "rU", "w" ) - -class TestFloatTagBug( unittest.TestCase ): - '''see issue 71''' - - def testFloatTagBug( self ): - '''a float tag before another exposed a parsing bug in bam_aux_get - expected to fail - - This test is expected to fail until samtools is fixed. - ''' - samfile = pysam.Samfile("tag_bug.bam") - read = samfile.fetch(until_eof=True).next() - self.assertTrue( ('XC',1) in read.tags ) - self.assertEqual(read.opt('XC'), 1) - -class TestTagParsing( unittest.TestCase ): - '''tests checking the accuracy of tag setting and retrieval.''' - - def makeRead( self ): - a = pysam.AlignedRead() - a.qname = "read_12345" - a.tid = 0 - a.seq="ACGT" * 3 - a.flag = 0 - a.rname = 0 - a.pos = 1 - a.mapq = 20 - a.cigar = ( (0,10), (2,1), (0,25) ) - a.mrnm = 0 - a.mpos=200 - a.isize = 0 - a.qual ="1234" * 3 - # todo: create tags - return a - - def testNegativeIntegers( self ): - x = -2 - aligned_read = self.makeRead() - aligned_read.tags = [("XD", int(x) ) ] - print aligned_read.tags - - def testNegativeIntegers2( self ): - x = -2 - r = self.makeRead() - r.tags = [("XD", int(x) ) ] - outfile = pysam.Samfile( "test.bam", - "wb", - referencenames = ("chr1",), - referencelengths = (1000,) ) - outfile.write (r ) - outfile.close() - class TestIteratorRow(unittest.TestCase): def setUp(self): @@ -664,20 +441,14 @@ class TestAlignedReadFromBam(unittest.TestCase): def testARmrnm(self): self.assertEqual( self.reads[0].mrnm, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].mrnm, 0) ) self.assertEqual( self.reads[1].mrnm, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].mrnm, 1) ) - self.assertEqual( self.reads[0].rnext, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].rnext, 0) ) - self.assertEqual( self.reads[1].rnext, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].rnext, 1) ) def testARmpos(self): self.assertEqual( self.reads[0].mpos, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].mpos, 200-1) ) self.assertEqual( self.reads[1].mpos, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].mpos, 500-1) ) - self.assertEqual( self.reads[0].pnext, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].pnext, 200-1) ) - self.assertEqual( self.reads[1].pnext, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].pnext, 500-1) ) def testARisize(self): self.assertEqual( self.reads[0].isize, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].isize, 167) ) self.assertEqual( self.reads[1].isize, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].isize, 412) ) - self.assertEqual( self.reads[0].tlen, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].tlen, 167) ) - self.assertEqual( self.reads[1].tlen, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].tlen, 412) ) def testARseq(self): self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") ) @@ -900,16 +671,10 @@ class TestWrongFormat(unittest.TestCase): self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' ) def testOpenBamAsSam( self ): - # test fails, needs to be implemented. - # sam.fetch() fails on reading, not on opening - # self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' ) - pass + self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' ) def testOpenFastaAsSam( self ): - # test fails, needs to be implemented. - # sam.fetch() fails on reading, not on opening - # self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' ) - pass + self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' ) def testOpenFastaAsBam( self ): self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' ) @@ -988,7 +753,7 @@ class TestAlignedRead(unittest.TestCase): self.assertEqual( a.rname, 0 ) self.assertEqual( a.mapq, 0 ) self.assertEqual( a.cigar, None ) - self.assertEqual( a.tags, [] ) + self.assertEqual( a.tags, None ) self.assertEqual( a.mrnm, 0 ) self.assertEqual( a.mpos, 0 ) self.assertEqual( a.isize, 0 ) @@ -1323,107 +1088,107 @@ class TestLargeOptValues( unittest.TestCase ): samfile = pysam.Samfile("ex10.bam", "rb") self.check( samfile ) -# class TestSNPCalls( unittest.TestCase ): -# '''test pysam SNP calling ability.''' - -# def checkEqual( self, a, b ): -# for x in ("reference_base", -# "pos", -# "genotype", -# "consensus_quality", -# "snp_quality", -# "mapping_quality", -# "coverage" ): -# self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % -# (x, getattr(a, x), getattr(b,x), str(a), str(b))) - -# def testAllPositionsViaIterator( self ): -# samfile = pysam.Samfile( "ex1.bam", "rb") -# fastafile = pysam.Fastafile( "ex1.fa" ) -# try: -# refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"] -# except pysam.SamtoolsError: -# pass - -# i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) -# calls = list(pysam.IteratorSNPCalls(i)) -# for x,y in zip( refs, calls ): -# self.checkEqual( x, y ) - -# def testPerPositionViaIterator( self ): -# # test pileup for each position. This is a slow operation -# # so this test is disabled -# return -# samfile = pysam.Samfile( "ex1.bam", "rb") -# fastafile = pysam.Fastafile( "ex1.fa" ) -# for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): -# if x.reference_base == "*": continue -# i = samfile.pileup( x.chromosome, x.pos, x.pos+1, -# fastafile = fastafile, -# stepper = "samtools" ) -# z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ] -# self.assertEqual( len(z), 1 ) -# self.checkEqual( x, z[0] ) - -# def testPerPositionViaCaller( self ): -# # test pileup for each position. This is a fast operation -# samfile = pysam.Samfile( "ex1.bam", "rb") -# fastafile = pysam.Fastafile( "ex1.fa" ) -# i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) -# caller = pysam.SNPCaller( i ) - -# for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): -# if x.reference_base == "*": continue -# call = caller.call( x.chromosome, x.pos ) -# self.checkEqual( x, call ) - -# class TestIndelCalls( unittest.TestCase ): -# '''test pysam indel calling.''' - -# def checkEqual( self, a, b ): - -# for x in ("pos", -# "genotype", -# "consensus_quality", -# "snp_quality", -# "mapping_quality", -# "coverage", -# "first_allele", -# "second_allele", -# "reads_first", -# "reads_second", -# "reads_diff"): -# if b.genotype == "*/*" and x == "second_allele": -# # ignore test for second allele (positions chr2:439 and chr2:1512) -# continue -# self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % -# (x, getattr(a, x), getattr(b,x), str(a), str(b))) - -# def testAllPositionsViaIterator( self ): - -# samfile = pysam.Samfile( "ex1.bam", "rb") -# fastafile = pysam.Fastafile( "ex1.fa" ) -# try: -# refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"] -# except pysam.SamtoolsError: -# pass - -# i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) -# calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ] -# for x,y in zip( refs, calls ): -# self.checkEqual( x, y ) - -# def testPerPositionViaCaller( self ): -# # test pileup for each position. This is a fast operation -# samfile = pysam.Samfile( "ex1.bam", "rb") -# fastafile = pysam.Fastafile( "ex1.fa" ) -# i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) -# caller = pysam.IndelCaller( i ) - -# for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): -# if x.reference_base != "*": continue -# call = caller.call( x.chromosome, x.pos ) -# self.checkEqual( x, call ) +class TestSNPCalls( unittest.TestCase ): + '''test pysam SNP calling ability.''' + + def checkEqual( self, a, b ): + for x in ("reference_base", + "pos", + "genotype", + "consensus_quality", + "snp_quality", + "mapping_quality", + "coverage" ): + self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % + (x, getattr(a, x), getattr(b,x), str(a), str(b))) + + def testAllPositionsViaIterator( self ): + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + try: + refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"] + except pysam.SamtoolsError: + pass + + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + calls = list(pysam.IteratorSNPCalls(i)) + for x,y in zip( refs, calls ): + self.checkEqual( x, y ) + + def testPerPositionViaIterator( self ): + # test pileup for each position. This is a slow operation + # so this test is disabled + return + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): + if x.reference_base == "*": continue + i = samfile.pileup( x.chromosome, x.pos, x.pos+1, + fastafile = fastafile, + stepper = "samtools" ) + z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ] + self.assertEqual( len(z), 1 ) + self.checkEqual( x, z[0] ) + + def testPerPositionViaCaller( self ): + # test pileup for each position. This is a fast operation + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + caller = pysam.SNPCaller( i ) + + for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): + if x.reference_base == "*": continue + call = caller.call( x.chromosome, x.pos ) + self.checkEqual( x, call ) + +class TestIndelCalls( unittest.TestCase ): + '''test pysam indel calling.''' + + def checkEqual( self, a, b ): + + for x in ("pos", + "genotype", + "consensus_quality", + "snp_quality", + "mapping_quality", + "coverage", + "first_allele", + "second_allele", + "reads_first", + "reads_second", + "reads_diff"): + if b.genotype == "*/*" and x == "second_allele": + # ignore test for second allele (positions chr2:439 and chr2:1512) + continue + self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % + (x, getattr(a, x), getattr(b,x), str(a), str(b))) + + def testAllPositionsViaIterator( self ): + + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + try: + refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"] + except pysam.SamtoolsError: + pass + + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ] + for x,y in zip( refs, calls ): + self.checkEqual( x, y ) + + def testPerPositionViaCaller( self ): + # test pileup for each position. This is a fast operation + samfile = pysam.Samfile( "ex1.bam", "rb") + fastafile = pysam.Fastafile( "ex1.fa" ) + i = samfile.pileup( stepper = "samtools", fastafile = fastafile ) + caller = pysam.IndelCaller( i ) + + for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ): + if x.reference_base != "*": continue + call = caller.call( x.chromosome, x.pos ) + self.checkEqual( x, call ) class TestLogging( unittest.TestCase ): '''test around bug issue 42, @@ -1506,13 +1271,6 @@ class TestSamfileUtilityFunctions( unittest.TestCase ): self.assertEqual( read.pos, mate.mpos ) self.assertEqual( read.mpos, mate.pos ) - def testIndexStats( self ): - '''test if total number of mapped/unmapped reads is correct.''' - - samfile = pysam.Samfile( "ex1.bam", "rb" ) - self.assertEqual( samfile.mapped, 3235 ) - self.assertEqual( samfile.unmapped, 35 ) - class TestSamtoolsProxy( unittest.TestCase ): '''tests for sanity checking access to samtools functions.'''