X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=pysam.git;a=blobdiff_plain;f=tests%2Ftabix_test.py;fp=tests%2Ftabix_test.py;h=5ff36badb0e41903e5dddf203105338a5057d260;hp=2caa628660f74a4389a23bf8b09c8c9e55266dee;hb=920cc77ed26707a0d92f9e436121b61d84bde627;hpb=8ab946e230447a6fe686edabc5e3cfa445da1721 diff --git a/tests/tabix_test.py b/tests/tabix_test.py index 2caa628..5ff36ba 100644 --- a/tests/tabix_test.py +++ b/tests/tabix_test.py @@ -10,6 +10,45 @@ import pysam import unittest import itertools import subprocess +import glob +import re + +IS_PYTHON3 = sys.version_info[0] >= 3 + +def myzip_open( infile, mode = "r" ): + '''open compressed file and decode.''' + + def _convert(f): + for l in f: + yield l.decode("ascii") + + if IS_PYTHON3: + if mode == "r": + return _convert(gzip.open(infile,"r")) + else: + return gzip.open( mode ) + +def loadAndConvert( infile ): + '''load and convert all fields to bytes''' + data = [] + if infile.endswith(".gz"): + for line in gzip.open( infile ): + line = line.decode("ascii") + if line.startswith("#"): continue + d = line.strip().split("\t") + data.append( [x.encode("ascii") for x in d ] ) + else: + with open(infile) as f: + for line in f: + if line.startswith("#"): continue + d = line.strip().split("\t") + data.append( [x.encode("ascii") for x in d ] ) + + return data + +def splitToBytes( s ): + '''split string and return list of bytes.''' + return [x.encode("ascii") for x in s.split("\t")] def checkBinaryEqual( filename1, filename2 ): '''return true if the two files are binary equal.''' @@ -19,14 +58,9 @@ def checkBinaryEqual( filename1, filename2 ): infile1 = open(filename1, "rb") infile2 = open(filename2, "rb") - def chariter( infile ): - while 1: - c = infile.read(1) - if c == "": break - yield c - + d1, d2 = infile1.read(), infile2.read() found = False - for c1,c2 in itertools.izip( chariter( infile1), chariter( infile2) ): + for c1,c2 in zip( d1, d2 ): if c1 != c2: break else: found = True @@ -61,22 +95,28 @@ class TestCompression(unittest.TestCase): def setUp( self ): self.tmpfilename = "tmp_%i.gtf" % id(self) - infile = gzip.open( self.filename, "r") - outfile = open( self.tmpfilename, "w" ) - outfile.write( "".join(infile.readlines()) ) + infile = gzip.open( self.filename, "rb") + outfile = open( self.tmpfilename, "wb" ) + outfile.write( infile.read() ) outfile.close() infile.close() def testIndexPreset( self ): '''test indexing via preset.''' - + pysam.tabix_index( self.tmpfilename, preset = "gff" ) checkBinaryEqual( self.tmpfilename + ".gz", self.filename ) checkBinaryEqual( self.tmpfilename + ".gz.tbi", self.filename_idx ) + def testCompression( self ): + '''see also issue 106''' + pysam.tabix_compress( self.tmpfilename, self.tmpfilename + ".gz" ) + checkBinaryEqual( self.tmpfilename, self.tmpfilename + ".gz" ) + def tearDown( self ): os.unlink( self.tmpfilename + ".gz" ) - os.unlink( self.tmpfilename + ".gz.tbi" ) + if os.path.exists( self.tmpfilename + ".gz.tbi" ): + os.unlink( self.tmpfilename + ".gz.tbi" ) class TestIteration( unittest.TestCase ): @@ -85,10 +125,16 @@ class TestIteration( unittest.TestCase ): def setUp( self ): self.tabix = pysam.Tabixfile( self.filename ) - lines = [ x for x in gzip.open(self.filename).readlines() if not x.startswith("#") ] + lines = [] + inf = gzip.open( self.filename, "rb") + for line in inf: + line = line.decode('ascii') + if line.startswith("#"): continue + lines.append( line ) + inf.close() # creates index of contig, start, end, adds content without newline. self.compare = [ - (x[0][0], int(x[0][3]), int(x[0][4]), x[1]) + (x[0][0], int(x[0][3]), int(x[0][4]), x[1]) for x in [ (y.split("\t"), y[:-1]) for y in lines ] ] def getSubset( self, contig = None, start = None, end = None): @@ -113,7 +159,8 @@ class TestIteration( unittest.TestCase ): return subset def checkPairwise( self, result, ref ): - + '''check pairwise results. + ''' result.sort() ref.sort() @@ -121,14 +168,14 @@ class TestIteration( unittest.TestCase ): b = set(ref) self.assertEqual( len(result), len(ref), - "unexpected number of results: %i, expected %i, differences are %s: %s" \ + "unexpected number of results: result=%i, expected ref=%i, differences are %s: %s" \ % (len(result), len(ref), a.difference(b), b.difference(a) )) - for x, d in enumerate( zip( result, ref )): + for x, d in enumerate( list(zip( result, ref ))): self.assertEqual( d[0], d[1], - "unexpected results in pair %i: '%s', expected '%s'" % \ + "unexpected results in pair %i:\n'%s', expected\n'%s'" % \ (x, d[0], d[1]) ) @@ -191,15 +238,19 @@ class TestIteration( unittest.TestCase ): self.assertRaises( ValueError, self.tabix.fetch, "chrUn" ) def testGetContigs( self ): - self.assertEqual( sorted(self.tabix.contigs), ["chr1", "chr2"] ) + self.assertEqual( sorted(self.tabix.contigs), [b"chr1", b"chr2"] ) # check that contigs is read-only self.assertRaises( AttributeError, setattr, self.tabix, "contigs", ["chr1", "chr2"] ) def testHeader( self ): ref = [] - for x in gzip.open( self.filename ): + inf = gzip.open( self.filename ) + for x in inf: + x = x.decode("ascii") if not x.startswith("#"): break - ref.append( x[:-1] ) + ref.append( x[:-1].encode('ascii') ) + inf.close() + header = list( self.tabix.header ) self.assertEqual( ref, header ) @@ -213,7 +264,6 @@ class TestIteration( unittest.TestCase ): for i in range(10000): func1() - class TestParser( unittest.TestCase ): filename = "example.gtf.gz" @@ -221,7 +271,7 @@ class TestParser( unittest.TestCase ): def setUp( self ): self.tabix = pysam.Tabixfile( self.filename ) - self.compare = [ x[:-1].split("\t") for x in gzip.open( self.filename, "r") if not x.startswith("#") ] + self.compare = loadAndConvert( self.filename ) def testRead( self ): @@ -247,36 +297,80 @@ class TestParser( unittest.TestCase ): for y in range(len(r)): r[y] = "test_%05i" % y c[y] = "test_%05i" % y - self.assertEqual( c, list(r) ) + self.assertEqual( [x.encode("ascii") for x in c], list(r) ) self.assertEqual( "\t".join( c ), str(r) ) # check second assignment for y in range(len(r)): r[y] = "test_%05i" % y - self.assertEqual( c, list(r) ) + self.assertEqual( [x.encode("ascii") for x in c], list(r) ) self.assertEqual( "\t".join( c ), str(r) ) def testUnset( self ): for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )): self.assertEqual( self.compare[x], list(r) ) c = list(r) - e = list(r) + e = [ x.decode('ascii') for x in r ] for y in range(len(r)): - r[y] = c[y] = None + r[y] = None + c[y] = None e[y] = "" self.assertEqual( c, list(r) ) self.assertEqual( "\t".join(e), str(r) ) + def testIteratorCompressed( self ): + '''test iteration from compressed file.''' + with gzip.open( self.filename ) as infile: + for x, r in enumerate(pysam.tabix_iterator( infile, pysam.asTuple() )): + self.assertEqual( self.compare[x], list(r) ) + self.assertEqual( len(self.compare[x]), len(r) ) + + # test indexing + for c in range(0,len(r)): + self.assertEqual( self.compare[x][c], r[c] ) + + # test slicing access + for c in range(0, len(r)-1): + for cc in range(c+1, len(r)): + self.assertEqual( self.compare[x][c:cc], + r[c:cc] ) + + def testIteratorUncompressed( self ): + '''test iteration from uncompressed file.''' + tmpfilename = 'tmp_testIteratorUncompressed' + infile = gzip.open( self.filename, "rb") + outfile = open( tmpfilename, "wb" ) + outfile.write( infile.read() ) + outfile.close() + infile.close() + + with open( tmpfilename ) as infile: + for x, r in enumerate(pysam.tabix_iterator( infile, pysam.asTuple() )): + self.assertEqual( self.compare[x], list(r) ) + self.assertEqual( len(self.compare[x]), len(r) ) + + # test indexing + for c in range(0,len(r)): + self.assertEqual( self.compare[x][c], r[c] ) + + # test slicing access + for c in range(0, len(r)-1): + for cc in range(c+1, len(r)): + self.assertEqual( self.compare[x][c:cc], + r[c:cc] ) + + os.unlink( tmpfilename ) + class TestGTF( TestParser ): def testRead( self ): for x, r in enumerate(self.tabix.fetch( parser = pysam.asGTF() )): - c = self.compare[x] - + c = self.compare[x] self.assertEqual( len(c), len(r) ) - self.assertEqual( "\t".join(c), str(r) ) + self.assertEqual( list(c), list(r) ) + self.assertEqual( c, splitToBytes( str(r) ) ) self.assertTrue( r.gene_id.startswith("ENSG") ) - if r.feature != "gene": + if r.feature != b'gene': self.assertTrue( r.transcript_id.startswith("ENST") ) self.assertEqual( c[0], r.contig ) @@ -285,14 +379,15 @@ class TestBed( unittest.TestCase ): def setUp( self ): - self.tabix = pysam.Tabixfile( self.filename ) - self.compare = [ x[:-1].split("\t") for x in gzip.open( self.filename, "r") if not x.startswith("#") ] + self.tabix = pysam.Tabixfile( self.filename) + self.compare = loadAndConvert( self.filename ) def testRead( self ): for x, r in enumerate(self.tabix.fetch( parser = pysam.asBed() )): c = self.compare[x] - self.assertEqual( "\t".join( c ), str(r) ) + self.assertEqual( len(c), len(r) ) + self.assertEqual( c, splitToBytes( str(r) ) ) self.assertEqual( list(c), list(r) ) self.assertEqual( c[0], r.contig) self.assertEqual( int(c[1]), r.start) @@ -302,35 +397,54 @@ class TestBed( unittest.TestCase ): for x, r in enumerate(self.tabix.fetch( parser = pysam.asBed() )): c = self.compare[x] - self.assertEqual( "\t".join( c ), str(r) ) + self.assertEqual( c, splitToBytes(str(r) )) self.assertEqual( list(c), list(r) ) r.contig = "test" - self.assertEqual( "test", r.contig) - self.assertEqual( "test", r[0]) + self.assertEqual( b"test", r.contig) + self.assertEqual( b"test", r[0]) r.start += 1 self.assertEqual( int(c[1]) + 1, r.start ) - self.assertEqual( str(int(c[1]) + 1), r[1] ) + self.assertEqual( str(int(c[1]) + 1), r[1].decode("ascii" )) r.end += 1 self.assertEqual( int(c[2]) + 1, r.end ) - self.assertEqual( str(int(c[2]) + 1), r[2] ) + self.assertEqual( str(int(c[2]) + 1), r[2].decode("ascii") ) -class TestVCF( TestParser ): +class TestVCFFromTabix( unittest.TestCase ): + + filename = "example.vcf40" - filename = "example.vcf40.gz" columns = ("contig", "pos", "id", "ref", "alt", "qual", "filter", "info", "format" ) + def setUp( self ): + + self.tmpfilename = "tmp_%s.vcf" % id(self) + shutil.copyfile( self.filename, self.tmpfilename ) + pysam.tabix_index( self.tmpfilename, preset = "vcf" ) + + self.tabix = pysam.Tabixfile( self.tmpfilename + ".gz" ) + self.compare = loadAndConvert( self.filename ) + + def tearDown( self ): + + os.unlink( self.tmpfilename + ".gz" ) + os.unlink( self.tmpfilename + ".gz.tbi" ) + def testRead( self ): ncolumns = len(self.columns) for x, r in enumerate(self.tabix.fetch( parser = pysam.asVCF() )): c = self.compare[x] + for y, field in enumerate( self.columns ): + # it is ok to have a missing format column + if y == 8 and y == len(c): continue + if field == "pos": self.assertEqual( int(c[y]) - 1, getattr( r, field ) ) self.assertEqual( int(c[y]) - 1, r.pos ) @@ -338,7 +452,10 @@ class TestVCF( TestParser ): self.assertEqual( c[y], getattr( r, field ), "mismatch in field %s: %s != %s" %\ ( field,c[y], getattr( r, field ) ) ) - self.assertEqual( len(c), len( r ) + ncolumns ) + if len(c) == 8: + self.assertEqual( 0, len(r) ) + else: + self.assertEqual( len(c), len( r ) + ncolumns ) for y in range(len(c) - ncolumns): self.assertEqual( c[ncolumns+y], r[y] ) @@ -348,16 +465,17 @@ class TestVCF( TestParser ): ncolumns = len(self.columns) for x, r in enumerate(self.tabix.fetch( parser = pysam.asVCF() )): - c = self.compare[x] - # check unmodified string - ref_string = "\t".join( c ) cmp_string = str(r) - self.assertEqual( ref_string, cmp_string ) + ref_string = "\t".join( [x.decode() for x in c] ) + self.assertEqual( ref_string, cmp_string ) + # set fields and compare field-wise for y, field in enumerate( self.columns ): + # it is ok to have a missing format column + if y == 8 and y == len(c): continue if field == "pos": rpos = getattr( r, field ) self.assertEqual( int(c[y]) - 1, rpos ) @@ -368,33 +486,98 @@ class TestVCF( TestParser ): c[y] = str(int(c[y]) + 1 ) else: setattr( r, field, "test_%i" % y) - c[y] = "test_%i" % y + c[y] = ("test_%i" % y).encode('ascii') self.assertEqual( c[y], getattr( r, field ), "mismatch in field %s: %s != %s" %\ ( field,c[y], getattr( r, field ) ) ) - self.assertEqual( len(c), len( r ) + ncolumns ) + if len(c) == 8: + self.assertEqual( 0, len(r) ) + else: + self.assertEqual( len(c), len( r ) + ncolumns ) for y in range(len(c) - ncolumns): - c[ncolumns+y] = "test_%i" % y - r[y] = "test_%i" % y + c[ncolumns+y] = ("test_%i" % y).encode('ascii') + r[y] = ("test_%i" % y).encode('ascii') self.assertEqual( c[ncolumns+y], r[y] ) -class TestVCF( TestParser ): - filename = "example.vcf40.gz" +class TestVCFFromVCF( unittest.TestCase ): - def testOpening( self ): - while 1: - infile = pysam.Tabixfile( self.filename ) - infile.close() + filename = "example.vcf40" + + columns = ("contig", "pos", "id", + "ref", "alt", "qual", + "filter", "info", "format" ) + + # tests failing while parsing + fail_on_parsing = ( (5, "Flag fields should not have a value"), + (9, "aouao" ), + (13, "aoeu" ), + (18, "Error BAD_NUMBER_OF_PARAMETERS" ), + (24, "Error HEADING_NOT_SEPARATED_BY_TABS" ) ) + + # tests failing on opening + fail_on_opening = ( (24, "Error HEADING_NOT_SEPARATED_BY_TABS" ), + ) + + def setUp( self ): + + self.vcf = pysam.VCF() + self.compare = loadAndConvert( self.filename ) + + def testParsing( self ): + + fn = os.path.basename( self.filename ) + with open(self.filename) as f: + + for x, msg in self.fail_on_opening: + if "%i.vcf" % x == fn: + self.assertRaises( ValueError, self.vcf.parse, f ) + return + else: + iter = self.vcf.parse(f) + + for x, msg in self.fail_on_parsing: + if "%i.vcf" % x == fn: + self.assertRaises( ValueError, list, iter ) + break + # python 2.7 + # self.assertRaisesRegexp( ValueError, re.compile(msg), self.vcf.parse, f ) + else: + for ln in iter: + pass + +############################################################################ +# create a test class for each example vcf file. +# Two samples are created - +# 1. Testing pysam/tabix access +# 2. Testing the VCF class +vcf_files = glob.glob( "vcf-examples/*.vcf" ) + +for vcf_file in vcf_files: + n = "VCFFromTabixTest_%s" % os.path.basename( vcf_file[:-4] ) + globals()[n] = type( n, (TestVCFFromTabix,), dict( filename=vcf_file,) ) + n = "VCFFromVCFTest_%s" % os.path.basename( vcf_file[:-4] ) + globals()[n] = type( n, (TestVCFFromVCF,), dict( filename=vcf_file,) ) + +############################################################################ +class TestRemoteFileHTTP( unittest.TestCase): + + url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/example.gtf.gz" + region = "chr1:1-1000" + local = "example.gtf.gz" + + def testFetchAll( self ): + remote_file = pysam.Tabixfile(self.url, "r") + remote_result = list(remote_file.fetch()) + local_file = pysam.Tabixfile(self.local, "r") + local_result = list(local_file.fetch()) + + self.assertEqual( len(remote_result), len(local_result) ) + for x, y in zip(remote_result, local_result): + self.assertEqual( x, y ) - - # check strings - ref_string = "\t".join( c ) - cmp_string = str(r) - - self.assertEqual( ref_string, cmp_string ) if __name__ == "__main__":