+ ints = ( 65536, 214748, 2147484, 2147483647 )
+ floats = ( 65536.0, 214748.0, 2147484.0 )
+
+ def check( self, samfile ):
+
+ i = samfile.fetch()
+ for exp in self.ints:
+ rr = i.next()
+ obs = rr.opt("ZP")
+ self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+ for exp in [ -x for x in self.ints ]:
+ rr = i.next()
+ obs = rr.opt("ZP")
+ self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+ for exp in self.floats:
+ rr = i.next()
+ obs = rr.opt("ZP")
+ self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+ for exp in [ -x for x in self.floats ]:
+ rr = i.next()
+ obs = rr.opt("ZP")
+ self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
+
+ def testSAM( self ):
+ samfile = pysam.Samfile("ex10.sam", "r")
+ self.check( samfile )
+
+ def testBAM( self ):
+ samfile = pysam.Samfile("ex10.bam", "rb")
+ self.check( samfile )
+
+class TestSNPCalls( unittest.TestCase ):
+ '''test pysam SNP calling ability.'''
+
+ def checkEqual( self, a, b ):
+ for x in ("reference_base",
+ "pos",
+ "genotype",
+ "consensus_quality",
+ "snp_quality",
+ "mapping_quality",
+ "coverage" ):
+ self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" %
+ (x, getattr(a, x), getattr(b,x), str(a), str(b)))
+
+ def testAllPositionsViaIterator( self ):
+ samfile = pysam.Samfile( "ex1.bam", "rb")
+ fastafile = pysam.Fastafile( "ex1.fa" )
+ try:
+ refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"]
+ except pysam.SamtoolsError:
+ pass
+
+ i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+ calls = list(pysam.IteratorSNPCalls(i))
+ for x,y in zip( refs, calls ):
+ self.checkEqual( x, y )
+
+ def testPerPositionViaIterator( self ):
+ # test pileup for each position. This is a slow operation
+ # so this test is disabled
+ return
+ samfile = pysam.Samfile( "ex1.bam", "rb")
+ fastafile = pysam.Fastafile( "ex1.fa" )
+ for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
+ if x.reference_base == "*": continue
+ i = samfile.pileup( x.chromosome, x.pos, x.pos+1,
+ fastafile = fastafile,
+ stepper = "samtools" )
+ z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ]
+ self.assertEqual( len(z), 1 )
+ self.checkEqual( x, z[0] )
+
+ def testPerPositionViaCaller( self ):
+ # test pileup for each position. This is a fast operation
+ samfile = pysam.Samfile( "ex1.bam", "rb")
+ fastafile = pysam.Fastafile( "ex1.fa" )
+ i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+ caller = pysam.SNPCaller( i )
+
+ for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
+ if x.reference_base == "*": continue
+ call = caller.call( x.chromosome, x.pos )
+ self.checkEqual( x, call )
+
+class TestIndelCalls( unittest.TestCase ):
+ '''test pysam indel calling.'''
+
+ def checkEqual( self, a, b ):
+
+ for x in ("pos",
+ "genotype",
+ "consensus_quality",
+ "snp_quality",
+ "mapping_quality",
+ "coverage",
+ "first_allele",
+ "second_allele",
+ "reads_first",
+ "reads_second",
+ "reads_diff"):
+ if b.genotype == "*/*" and x == "second_allele":
+ # ignore test for second allele (positions chr2:439 and chr2:1512)
+ continue
+ self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" %
+ (x, getattr(a, x), getattr(b,x), str(a), str(b)))
+
+ def testAllPositionsViaIterator( self ):
+
+ samfile = pysam.Samfile( "ex1.bam", "rb")
+ fastafile = pysam.Fastafile( "ex1.fa" )
+ try:
+ refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"]
+ except pysam.SamtoolsError:
+ pass
+
+ i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+ calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ]
+ for x,y in zip( refs, calls ):
+ self.checkEqual( x, y )
+
+ def testPerPositionViaCaller( self ):
+ # test pileup for each position. This is a fast operation
+ samfile = pysam.Samfile( "ex1.bam", "rb")
+ fastafile = pysam.Fastafile( "ex1.fa" )
+ i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
+ caller = pysam.IndelCaller( i )
+
+ for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
+ if x.reference_base != "*": continue
+ call = caller.call( x.chromosome, x.pos )
+ self.checkEqual( x, call )
+
+class TestLogging( unittest.TestCase ):
+ '''test around bug issue 42,
+
+ failed in versions < 0.4
+ '''
+
+ def check( self, bamfile, log ):
+
+ if log:
+ logger = logging.getLogger('franklin')
+ logger.setLevel(logging.INFO)
+ formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
+ log_hand = logging.FileHandler('log.txt')
+ log_hand.setFormatter(formatter)
+ logger.addHandler(log_hand)
+
+ bam = pysam.Samfile(bamfile, 'rb')
+ cols = bam.pileup()
+ self.assert_( True )
+
+ def testFail1( self ):
+ self.check( "ex9_fail.bam", False )
+ self.check( "ex9_fail.bam", True )
+
+ def testNoFail1( self ):
+ self.check( "ex9_nofail.bam", False )
+ self.check( "ex9_nofail.bam", True )
+
+ def testNoFail2( self ):
+ self.check( "ex9_nofail.bam", True )
+ self.check( "ex9_nofail.bam", True )
+