2 '''unit testing code for pysam.
4 Execute in the :file:`tests` directory as it requires the Makefile
5 and data files located there.
8 import sys, os, shutil, gzip
def checkBinaryEqual( filename1, filename2 ):
    '''return true if the two files are binary equal.

    The file sizes are compared first so that files of different
    length are rejected without reading them. Otherwise both files
    are read in fixed-size chunks and compared chunk by chunk.

    :param filename1: path of the first file.
    :param filename2: path of the second file.
    :returns: True if both files hold identical bytes, False otherwise.
    :raises OSError: if a file is missing or can not be read
        (IOError from ``open`` on Python 2).
    '''
    if os.path.getsize( filename1 ) != os.path.getsize( filename2 ):
        return False

    chunksize = 65536
    # the with-blocks close both files on every exit path, including
    # the early return on the first mismatch.
    with open(filename1, "rb") as infile1:
        with open(filename2, "rb") as infile2:
            while True:
                chunk1 = infile1.read( chunksize )
                chunk2 = infile2.read( chunksize )
                if chunk1 != chunk2:
                    return False
                # both chunks are equal here, so an empty chunk means
                # that both files reached their end together.
                if not chunk1:
                    return True
38 class TestIndexing(unittest.TestCase):
39 filename = "example.gtf.gz"
40 filename_idx = "example.gtf.gz.tbi"
44 self.tmpfilename = "tmp_%i.gtf.gz" % id(self)
45 shutil.copyfile( self.filename, self.tmpfilename )
47 def testIndexPreset( self ):
48 '''test indexing via preset.'''
50 pysam.tabix_index( self.tmpfilename, preset = "gff" )
51 checkBinaryEqual( self.tmpfilename + ".tbi", self.filename_idx )
54 os.unlink( self.tmpfilename )
55 os.unlink( self.tmpfilename + ".tbi" )
57 class TestCompression(unittest.TestCase):
58 filename = "example.gtf.gz"
59 filename_idx = "example.gtf.gz.tbi"
63 self.tmpfilename = "tmp_%i.gtf" % id(self)
64 infile = gzip.open( self.filename, "r")
65 outfile = open( self.tmpfilename, "w" )
66 outfile.write( "".join(infile.readlines()) )
70 def testIndexPreset( self ):
71 '''test indexing via preset.'''
73 pysam.tabix_index( self.tmpfilename, preset = "gff" )
74 checkBinaryEqual( self.tmpfilename + ".gz", self.filename )
75 checkBinaryEqual( self.tmpfilename + ".gz.tbi", self.filename_idx )
78 os.unlink( self.tmpfilename + ".gz" )
79 os.unlink( self.tmpfilename + ".gz.tbi" )
81 class TestIteration( unittest.TestCase ):
83 filename = "example.gtf.gz"
87 self.tabix = pysam.Tabixfile( self.filename )
88 lines = [ x for x in gzip.open(self.filename).readlines() if not x.startswith("#") ]
89 # creates index of contig, start, end, adds content without newline.
91 (x[0][0], int(x[0][3]), int(x[0][4]), x[1])
92 for x in [ (y.split("\t"), y[:-1]) for y in lines ] ]
94 def getSubset( self, contig = None, start = None, end = None):
98 subset = [ x[3] for x in self.compare ]
100 if start != None and end == None:
101 # until end of contig
102 subset = [ x[3] for x in self.compare if x[0] == contig and x[2] > start ]
103 elif start == None and end != None:
104 # from start of contig
105 subset = [ x[3] for x in self.compare if x[0] == contig and x[1] <= end ]
106 elif start == None and end == None:
107 subset = [ x[3] for x in self.compare if x[0] == contig ]
109 # all within interval
110 subset = [ x[3] for x in self.compare if x[0] == contig and \
111 min( x[2], end) - max(x[1], start) > 0 ]
115 def checkPairwise( self, result, ref ):
123 self.assertEqual( len(result), len(ref),
124 "unexpected number of results: %i, expected %i, differences are %s: %s" \
125 % (len(result), len(ref),
129 for x, d in enumerate( zip( result, ref )):
130 self.assertEqual( d[0], d[1],
131 "unexpected results in pair %i: '%s', expected '%s'" % \
138 result = list(self.tabix.fetch())
139 ref = self.getSubset( )
140 self.checkPairwise( result, ref )
142 def testPerContig( self ):
143 for contig in ("chr1", "chr2", "chr1", "chr2" ):
144 result = list(self.tabix.fetch( contig ))
145 ref = self.getSubset( contig )
146 self.checkPairwise( result, ref )
148 def testPerContigToEnd( self ):
151 for contig in ("chr1", "chr2", "chr1", "chr2" ):
152 for start in range( 0, 200000, 1000):
153 result = list(self.tabix.fetch( contig, start, end ))
154 ref = self.getSubset( contig, start, end )
155 self.checkPairwise( result, ref )
157 def testPerContigFromStart( self ):
160 for contig in ("chr1", "chr2", "chr1", "chr2" ):
161 for end in range( 0, 200000, 1000):
162 result = list(self.tabix.fetch( contig, start, end ))
163 ref = self.getSubset( contig, start, end )
164 self.checkPairwise( result, ref )
166 def testPerContig( self ):
168 start, end = None, None
169 for contig in ("chr1", "chr2", "chr1", "chr2" ):
170 result = list(self.tabix.fetch( contig, start, end ))
171 ref = self.getSubset( contig, start, end )
172 self.checkPairwise( result, ref )
174 def testPerInterval( self ):
176 start, end = None, None
177 for contig in ("chr1", "chr2", "chr1", "chr2" ):
178 for start in range( 0, 200000, 2000):
179 for end in range( start, start + 2000, 500):
180 result = list(self.tabix.fetch( contig, start, end ))
181 ref = self.getSubset( contig, start, end )
182 self.checkPairwise( result, ref )
185 def testInvalidIntervals( self ):
187 self.assertRaises( ValueError, self.tabix.fetch, "chr1", 0, -10)
188 self.assertRaises( ValueError, self.tabix.fetch, "chr1", -10, 200)
189 self.assertRaises( ValueError, self.tabix.fetch, "chr1", 200, 0)
190 self.assertRaises( ValueError, self.tabix.fetch, "chr1", -10, -20)
191 self.assertRaises( ValueError, self.tabix.fetch, "chrUn" )
193 def testGetContigs( self ):
194 self.assertEqual( sorted(self.tabix.contigs), ["chr1", "chr2"] )
195 # check that contigs is read-only
196 self.assertRaises( AttributeError, setattr, self.tabix, "contigs", ["chr1", "chr2"] )
198 def testHeader( self ):
200 for x in gzip.open( self.filename ):
201 if not x.startswith("#"): break
203 header = list( self.tabix.header )
204 self.assertEqual( ref, header )
206 def testReopening( self ):
207 '''test repeated opening of the same file.'''
209 # opens any tabix file
210 inf = pysam.Tabixfile(self.filename)
213 for i in range(10000):
class TestParser( unittest.TestCase ):
    '''test access to records returned by the tuple parser.

    Records fetched with ``pysam.asTuple`` are compared against the
    tab-separated fields read directly from the compressed file.
    '''

    filename = "example.gtf.gz"

    def setUp( self ):
        '''open the tabix file and load the reference fields.'''
        self.tabix = pysam.Tabixfile( self.filename )
        infile = gzip.open( self.filename, "r")
        try:
            # one list of fields per line, comment lines are skipped
            self.compare = [ x[:-1].split("\t") for x in infile if not x.startswith("#") ]
        finally:
            infile.close()

    def testRead( self ):
        '''records support conversion to list, indexing and slicing.'''

        for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
            self.assertEqual( self.compare[x], list(r) )
            self.assertEqual( len(self.compare[x]), len(r) )

            # test indexing
            for c in range(0,len(r)):
                self.assertEqual( self.compare[x][c], r[c] )

            # test slicing access
            for c in range(0, len(r)-1):
                for cc in range(c+1, len(r)):
                    self.assertEqual( self.compare[x][c:cc],
                                      r[c:cc] )

    def testWrite( self ):
        '''fields can be assigned by index, also repeatedly.'''

        for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
            self.assertEqual( self.compare[x], list(r) )
            c = list(r)
            for y in range(len(r)):
                r[y] = "test_%05i" % y
                c[y] = "test_%05i" % y
            self.assertEqual( c, list(r) )
            self.assertEqual( "\t".join( c ), str(r) )
            # check second assignment
            for y in range(len(r)):
                r[y] = "test_%05i" % y
            self.assertEqual( c, list(r) )
            self.assertEqual( "\t".join( c ), str(r) )

    def testUnset( self ):
        '''fields set to None are returned as None and printed empty.'''
        for x, r in enumerate(self.tabix.fetch( parser = pysam.asTuple() )):
            self.assertEqual( self.compare[x], list(r) )
            c = list(r)
            e = list(r)
            for y in range(len(r)):
                # c tracks the expected values, e the expected text
                r[y] = c[y] = None
                e[y] = ""
            self.assertEqual( c, list(r) )
            self.assertEqual( "\t".join(e), str(r) )
class TestGTF( TestParser ):
    '''repeat the parser tests with records parsed as GTF.'''

    def testRead( self ):
        '''GTF records match the reference fields and expose attributes.'''

        records = self.tabix.fetch( parser = pysam.asGTF() )
        for index, record in enumerate( records ):
            fields = self.compare[index]

            self.assertEqual( len(fields), len(record) )
            self.assertEqual( "\t".join(fields), str(record) )
            self.assertTrue( record.gene_id.startswith("ENSG") )
            # transcript identifiers are only checked for non-gene features
            if not record.feature == "gene":
                self.assertTrue( record.transcript_id.startswith("ENST") )
            self.assertEqual( fields[0], record.contig )
class TestBed( unittest.TestCase ):
    '''test access to records returned by the bed parser.

    Records fetched with ``pysam.asBed`` are compared against the
    tab-separated fields read directly from the compressed file.
    '''
    filename = "example.bed.gz"

    def setUp( self ):
        '''open the tabix file and load the reference fields.'''
        self.tabix = pysam.Tabixfile( self.filename )
        infile = gzip.open( self.filename, "r")
        try:
            # one list of fields per line, comment lines are skipped
            self.compare = [ x[:-1].split("\t") for x in infile if not x.startswith("#") ]
        finally:
            infile.close()

    def testRead( self ):
        '''records expose contig, start and end.'''

        for x, r in enumerate(self.tabix.fetch( parser = pysam.asBed() )):
            c = self.compare[x]
            self.assertEqual( "\t".join( c ), str(r) )
            self.assertEqual( list(c), list(r) )
            self.assertEqual( c[0], r.contig)
            self.assertEqual( int(c[1]), r.start)
            self.assertEqual( int(c[2]), r.end)

    def testWrite( self ):
        '''attribute assignment is visible through index access.'''

        for x, r in enumerate(self.tabix.fetch( parser = pysam.asBed() )):
            c = self.compare[x]
            self.assertEqual( "\t".join( c ), str(r) )
            self.assertEqual( list(c), list(r) )

            r.contig = "test"
            self.assertEqual( "test", r.contig)
            self.assertEqual( "test", r[0])

            r.start += 1
            self.assertEqual( int(c[1]) + 1, r.start )
            self.assertEqual( str(int(c[1]) + 1), r[1] )

            r.end += 1
            self.assertEqual( int(c[2]) + 1, r.end )
            self.assertEqual( str(int(c[2]) + 1), r[2] )
class TestVCF( TestParser ):
    '''repeat the parser tests with records parsed as VCF.'''

    filename = "example.vcf40.gz"
    # names of the fixed columns, in file order
    columns = ("contig", "pos", "id",
               "ref", "alt", "qual",
               "filter", "info", "format" )

    def testRead( self ):
        '''fixed columns are attributes, the remaining ones are indexed.'''

        ncolumns = len(self.columns)

        records = self.tabix.fetch( parser = pysam.asVCF() )
        for index, record in enumerate( records ):
            fields = self.compare[index]
            for col, field in enumerate( self.columns ):
                if field == "pos":
                    # the record position is one less than the text value
                    expected = int(fields[col]) - 1
                    self.assertEqual( expected, getattr( record, field ) )
                    self.assertEqual( expected, record.pos )
                else:
                    self.assertEqual( fields[col], getattr( record, field ),
                                      "mismatch in field %s: %s != %s" %\
                                          ( field,fields[col], getattr( record, field ) ) )
            self.assertEqual( len(fields), len( record ) + ncolumns )

            # index access on the record starts after the fixed columns
            for offset in range(len(fields) - ncolumns):
                self.assertEqual( fields[ncolumns+offset], record[offset] )

    def testWrite( self ):
        '''assigned attributes and indexed fields are read back.'''

        ncolumns = len(self.columns)

        records = self.tabix.fetch( parser = pysam.asVCF() )
        for index, record in enumerate( records ):

            fields = self.compare[index]

            # check unmodified string
            ref_string = "\t".join( fields )
            cmp_string = str(record)
            self.assertEqual( ref_string, cmp_string )

            # set fields and compare field-wise
            for col, field in enumerate( self.columns ):
                if field == "pos":
                    rpos = getattr( record, field )
                    self.assertEqual( int(fields[col]) - 1, rpos )
                    self.assertEqual( int(fields[col]) - 1, record.pos )
                    # increment pos by 1
                    setattr( record, field, rpos + 1 )
                    self.assertEqual( getattr( record, field ), rpos + 1 )
                    fields[col] = str(int(fields[col]) + 1 )
                else:
                    value = "test_%i" % col
                    setattr( record, field, value)
                    fields[col] = value
                    self.assertEqual( fields[col], getattr( record, field ),
                                      "mismatch in field %s: %s != %s" %\
                                          ( field,fields[col], getattr( record, field ) ) )

            self.assertEqual( len(fields), len( record ) + ncolumns )

            for offset in range(len(fields) - ncolumns):
                fields[ncolumns+offset] = "test_%i" % offset
                record[offset] = "test_%i" % offset
                self.assertEqual( fields[ncolumns+offset], record[offset] )
# NOTE(review): a class named TestVCF is already defined earlier in this
# file. This second definition rebinds the module-level name, so the
# test methods of the earlier class are presumably no longer found by
# the test runner - confirm and consider renaming one of the two classes.
383 class TestVCF( TestParser ):
385 filename = "example.vcf40.gz"
387 def testOpening( self ):
# opens the tabix file named by the class attribute filename
389 infile = pysam.Tabixfile( self.filename )
# expected text of a record: the fields in c joined by tabs
394 ref_string = "\t".join( c )
# NOTE(review): c and cmp_string are assigned on lines that are not
# visible here - verify against the complete file.
397 self.assertEqual( ref_string, cmp_string )
399 if __name__ == "__main__":