X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=pysam.git;a=blobdiff_plain;f=tests%2Fpysam_test.py;fp=tests%2Fpysam_test.py;h=5be5ca4c783f74cef3ebf775681ef1f95a619269;hp=c2ae6fa37b8d34008eb639adc608fc80de7411d2;hb=70e0c1963e5f83b0a6ab2aa139e1a8f30aa25c56;hpb=91f4887d1b19c44d68a2b19f0abee56de3dbb8ea diff --git a/tests/pysam_test.py b/tests/pysam_test.py index c2ae6fa..5be5ca4 100755 --- a/tests/pysam_test.py +++ b/tests/pysam_test.py @@ -7,7 +7,7 @@ and data files located there. import pysam import unittest -import os +import os, re import itertools import subprocess import shutil @@ -47,7 +47,13 @@ def runSamtools( cmd ): except OSError, e: print >>sys.stderr, "Execution failed:", e - +def getSamtoolsVersion(): + '''return samtools version''' + + pipe = subprocess.Popen("samtools", shell=True, stderr=subprocess.PIPE).stderr + lines = "".join(pipe.readlines()) + return re.search( "Version:\s+(\S+)", lines).groups()[0] + class BinaryTest(unittest.TestCase): '''test samtools command line commands and compare against pysam commands. @@ -125,12 +131,19 @@ class BinaryTest(unittest.TestCase): BinaryTest.first_time = False + samtools_version = getSamtoolsVersion() + if samtools_version != pysam.__samtools_version__: + raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \ + (pysam.__samtools_version__, + samtools_version )) + def checkCommand( self, command ): + if command: samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0] self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ), "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) ) - + def testImport( self ): self.checkCommand( "import" ) @@ -153,7 +166,7 @@ class BinaryTest(unittest.TestCase): self.assertRaises( pysam.SamtoolsError, pysam.index, "exdoesntexist.bam" ) def __del__(self): - + return for label, command in self.mCommands.iteritems(): samtools_target, samtools_command = command[0] pysam_target, pysam_command = command[1] @@ -254,6 +267,12 @@ class IOTest(unittest.TestCase): self.assertRaises( ValueError, samfile.fetch ) self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 ) + def testReadingFromFileWithWrongMode( self ): + + assert not os.path.exists( "ex2.bam.bai" ) + samfile = pysam.Samfile( "ex2.bam", "r" ) + self.assertRaises( ValueError, samfile.fetch ) + class TestIteratorRow(unittest.TestCase): def setUp(self): @@ -283,6 +302,7 @@ class TestIteratorRow(unittest.TestCase): def tearDown(self): self.samfile.close() + class TestIteratorRowAll(unittest.TestCase): def setUp(self): @@ -348,7 +368,8 @@ class TestIteratorColumn(unittest.TestCase): self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) ) elif refcov == 1: # one read, all columns of the read are returned - self.assertEqual( len(columns), refcolumns, "pileup incomplete - %i should be %i " % (len(columns), refcolumns)) + self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\ + (pos, len(columns), refcolumns)) def tearDown(self): self.samfile.close() @@ -398,10 +419,22 @@ class TestAlignedReadFromBam(unittest.TestCase): def testARseq(self): self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") ) self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") ) + self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") ) def testARqual(self): self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") ) self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") ) + self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") ) + + def testARquery(self): + self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") ) + self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") ) + self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") ) + + def testARqqual(self): + self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") ) + self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") ) + self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") ) def testPresentOptionalFields(self): self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) ) @@ -518,7 +551,14 @@ class TestPileupObjects(unittest.TestCase): def tearDown(self): self.samfile.close() - + +class TestContextManager(unittest.TestCase): + + def testManager( self ): + with pysam.Samfile('ex1.bam', 'rb') as samfile: + samfile.fetch() + self.assertEqual( samfile._isOpen(), False ) + class TestExceptions(unittest.TestCase): def setUp(self): @@ -581,20 +621,26 @@ class TestFastaFile(unittest.TestCase): self.assertEqual( seq, self.file.fetch( id ) ) for x in range( 0, len(seq), 10): self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) ) + # test x:end + self.assertEqual( seq[x:], self.file.fetch( id, x) ) + # test 0:x + self.assertEqual( seq[:x], self.file.fetch( id, None, x) ) + + + # unknown sequence returns "" + self.assertEqual( "", self.file.fetch("chr12") ) def testFetchErrors( self ): self.assertRaises( ValueError, self.file.fetch ) - self.assertRaises( ValueError, self.file.fetch, "chr1", 0 ) self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 ) self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 ) - # the following segfaults: - # self.assertRaises( IndexError, self.file.fetch, "chr12", ) - pass + def testLength( self ): + self.assertEqual( len(self.file), 2 ) + def tearDown(self): self.file.close() - class TestAlignedRead(unittest.TestCase): '''tests to check if aligned read can be constructed and manipulated. @@ -803,7 +849,7 @@ class TestDeNovoConstruction(unittest.TestCase): others = list(infile) for denovo, other in zip( others, self.reads): self.checkFieldEqual( other, denovo ) - self.assertEqual( other, denovo) + self.assertEqual( other.compare( denovo ), 0 ) def testSAMPerRead( self ): '''check if individual reads are binary equal.''' @@ -812,7 +858,7 @@ class TestDeNovoConstruction(unittest.TestCase): others = list(infile) for denovo, other in zip( others, self.reads): self.checkFieldEqual( other, denovo ) - self.assertEqual( other, denovo) + self.assertEqual( other.compare( denovo), 0 ) def testBAMWholeFile( self ): @@ -828,6 +874,82 @@ class TestDeNovoConstruction(unittest.TestCase): os.unlink( tmpfilename ) +class TestDoubleFetch(unittest.TestCase): + '''check if two iterators on the same bamfile are independent.''' + + def testDoubleFetch( self ): + + samfile1 = pysam.Samfile('ex1.bam', 'rb') + + for a,b in zip(samfile1.fetch(), samfile1.fetch()): + self.assertEqual( a.compare( b ), 0 ) + + def testDoubleFetchWithRegion( self ): + + samfile1 = pysam.Samfile('ex1.bam', 'rb') + chr, start, stop = 'chr1', 200, 3000000 + self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch + + for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)): + self.assertEqual( a.compare( b ), 0 ) + + def testDoubleFetchUntilEOF( self ): + + samfile1 = pysam.Samfile('ex1.bam', 'rb') + + for a,b in zip(samfile1.fetch( until_eof = True), + samfile1.fetch( until_eof = True )): + self.assertEqual( a.compare( b), 0 ) + +class TestRemoteFileFTP(unittest.TestCase): + '''test remote access. + + ''' + + # Need to find an ftp server without password on standard + # port. + + url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam" + region = "1:1-1000" + + def testFTPView( self ): + result = pysam.view( self.url, self.region ) + self.assertEqual( len(result), 36 ) + + def testFTPFetch( self ): + samfile = pysam.Samfile(self.url, "rb") + result = list(samfile.fetch( region = self.region )) + self.assertEqual( len(result), 36 ) + +class TestRemoteFileHTTP( unittest.TestCase): + + url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam" + region = "chr1:1-1000" + local = "ex1.bam" + + def testView( self ): + self.assertRaises( pysam.SamtoolsError, pysam.view, self.url, self.region ) + + def testFetch( self ): + samfile = pysam.Samfile(self.url, "rb") + result = list(samfile.fetch( region = self.region )) + samfile_local = pysam.Samfile(self.local, "rb") + ref = list(samfile_local.fetch( region = self.region )) + + self.assertEqual( len(ref), len(result) ) + for x, y in zip(result, ref): + self.assertEqual( x.compare( y ), 0 ) + + def testFetchAll( self ): + samfile = pysam.Samfile(self.url, "rb") + result = list(samfile.fetch()) + samfile_local = pysam.Samfile(self.local, "rb") + ref = list(samfile_local.fetch() ) + + self.assertEqual( len(ref), len(result) ) + for x, y in zip(result, ref): + self.assertEqual( x.compare( y ), 0 ) + # TODOS # 1. finish testing all properties within pileup objects