2 '''unit testing code for pysam.
4 Execute in the :file:`tests` directory as it requires the Makefile
5 and data files located there.
11 import itertools, collections
17 def checkBinaryEqual( filename1, filename2 ):
18 '''return true if the two files are binary equal.'''
19 if os.path.getsize( filename1 ) != os.path.getsize( filename2 ):
22 infile1 = open(filename1, "rb")
23 infile2 = open(filename2, "rb")
25 def chariter( infile ):
32 for c1,c2 in itertools.izip( chariter( infile1), chariter( infile2) ):
41 def runSamtools( cmd ):
42 '''run a samtools command'''
45 retcode = subprocess.call(cmd, shell=True)
47 print >>sys.stderr, "Child was terminated by signal", -retcode
49 print >>sys.stderr, "Execution failed:", e
51 def getSamtoolsVersion():
52 '''return samtools version'''
54 pipe = subprocess.Popen(SAMTOOLS, shell=True, stderr=subprocess.PIPE).stderr
55 lines = "".join(pipe.readlines())
56 return re.search( "Version:\s+(\S+)", lines).groups()[0]
58 class BinaryTest(unittest.TestCase):
59 '''test samtools command line commands and compare
60 against pysam commands.
62 Tests fail, if the output is not binary identical.
67 # a list of commands to test
71 ("ex1.fa.fai", "faidx ex1.fa"),
72 ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ),
76 ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ),
77 ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ),
81 ("ex1.bam.bai", "index ex1.bam" ),
82 ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ),
86 ("ex1.pileup", "pileup -cf ex1.fa ex1.bam > ex1.pileup" ),
87 ("pysam_ex1.pileup", (pysam.pileup, "-c -f ex1.fa ex1.bam" ) )
91 ("ex1.glf", "pileup -gf ex1.fa ex1.bam > ex1.glf" ),
92 ("pysam_ex1.glf", (pysam.pileup, "-g -f ex1.fa ex1.bam" ) )
96 ("ex1.glfview", "glfview ex1.glf > ex1.glfview"),
97 ("pysam_ex1.glfview", (pysam.glfview, "ex1.glf" ) ),
101 ("ex1.view", "view ex1.bam > ex1.view"),
102 ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
106 ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"),
107 # note that -o ex1.view2 throws exception.
108 ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ),
112 # some tests depend on others. The order specifies in which order
113 # the samtools commands are executed.
114 mOrder = ('faidx', 'import', 'index',
115 'pileup1', 'pileup2',
116 # 'glfview', deprecated
122 For setup, all commands will be run before the first test is
123 executed. Individual tests will then just compare the output
126 if BinaryTest.first_time:
128 shutil.copy( "ex1.fa", "pysam_ex1.fa" )
130 for label in self.mOrder:
131 command = self.mCommands[label]
132 samtools_target, samtools_command = command[0]
133 pysam_target, pysam_command = command[1]
134 runSamtools( " ".join( (SAMTOOLS, samtools_command )))
135 pysam_method, pysam_options = pysam_command
136 output = pysam_method( *pysam_options.split(" "), raw=True)
137 if ">" in samtools_command:
138 outfile = open( pysam_target, "wb" )
139 for line in output: outfile.write( line )
142 BinaryTest.first_time = False
144 samtools_version = getSamtoolsVersion()
147 # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> 0.1.12
148 if s.count('-') > 0: s = s[0:s.find('-')]
149 return re.sub( "[^0-9.]", "", s )
151 if _r(samtools_version) != _r( pysam.__samtools_version__):
152 raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
153 (pysam.__samtools_version__,
156 def checkCommand( self, command ):
158 samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0]
159 self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ),
160 "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
162 def testImport( self ):
163 self.checkCommand( "import" )
165 def testIndex( self ):
166 self.checkCommand( "index" )
168 def testPileup1( self ):
169 self.checkCommand( "pileup1" )
171 def testPileup2( self ):
172 self.checkCommand( "pileup2" )
175 # def testGLFView( self ):
176 # self.checkCommand( "glfview" )
178 def testView( self ):
179 self.checkCommand( "view" )
181 def testEmptyIndex( self ):
182 self.assertRaises( IOError, pysam.index, "exdoesntexist.bam" )
186 for label, command in self.mCommands.iteritems():
187 samtools_target, samtools_command = command[0]
188 pysam_target, pysam_command = command[1]
189 if os.path.exists( samtools_target): os.remove( samtools_target )
190 if os.path.exists( pysam_target): os.remove( pysam_target )
191 if os.path.exists( "pysam_ex1.fa" ): os.remove( "pysam_ex1.fa" )
193 class IOTest(unittest.TestCase):
194 '''check if reading samfile and writing a samfile are consistent.'''
196 def checkEcho( self, input_filename, reference_filename,
198 input_mode, output_mode, use_template = True):
199 '''iterate through *input_filename* writing to *output_filename* and
200 comparing the output to *reference_filename*.
202 The files are opened according to the *input_mode* and *output_mode*.
204 If *use_template* is set, the header is copied from infile using the
205 template mechanism, otherwise target names and lengths are passed explicitely.
208 infile = pysam.Samfile( input_filename, input_mode )
210 outfile = pysam.Samfile( output_filename, output_mode, template = infile )
212 outfile = pysam.Samfile( output_filename, output_mode,
213 referencenames = infile.references,
214 referencelengths = infile.lengths )
216 iter = infile.fetch()
217 for x in iter: outfile.write( x )
221 self.assertTrue( checkBinaryEqual( reference_filename, output_filename),
222 "files %s and %s are not the same" % (reference_filename, output_filename) )
224 def testReadWriteBam( self ):
226 input_filename = "ex1.bam"
227 output_filename = "pysam_ex1.bam"
228 reference_filename = "ex1.bam"
230 self.checkEcho( input_filename, reference_filename, output_filename,
233 def testReadWriteBamWithTargetNames( self ):
235 input_filename = "ex1.bam"
236 output_filename = "pysam_ex1.bam"
237 reference_filename = "ex1.bam"
239 self.checkEcho( input_filename, reference_filename, output_filename,
240 "rb", "wb", use_template = False )
242 def testReadWriteSamWithHeader( self ):
244 input_filename = "ex2.sam"
245 output_filename = "pysam_ex2.sam"
246 reference_filename = "ex2.sam"
248 self.checkEcho( input_filename, reference_filename, output_filename,
251 def testReadWriteSamWithoutHeader( self ):
253 input_filename = "ex2.sam"
254 output_filename = "pysam_ex2.sam"
255 reference_filename = "ex1.sam"
257 self.checkEcho( input_filename, reference_filename, output_filename,
260 def testFetchFromClosedFile( self ):
262 samfile = pysam.Samfile( "ex1.bam", "rb" )
264 self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
266 def testClosedFile( self ):
267 '''test that access to a closed samfile raises ValueError.'''
269 samfile = pysam.Samfile( "ex1.bam", "rb" )
271 self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
272 self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
273 self.assertRaises( ValueError, samfile.getrname, 0 )
274 self.assertRaises( ValueError, samfile.tell )
275 self.assertRaises( ValueError, samfile.seek, 0 )
276 self.assertRaises( ValueError, getattr, samfile, "nreferences" )
277 self.assertRaises( ValueError, getattr, samfile, "references" )
278 self.assertRaises( ValueError, getattr, samfile, "lengths" )
279 self.assertRaises( ValueError, getattr, samfile, "text" )
280 self.assertRaises( ValueError, getattr, samfile, "header" )
282 # write on closed file
283 self.assertEqual( 0, samfile.write(None) )
285 def testAutoDetection( self ):
286 samfile = pysam.Samfile( "ex3.bam" )
290 samfile = pysam.Samfile( "ex3.sam")
294 def testReadingFromFileWithoutIndex( self ):
295 '''read from bam file without index.'''
297 assert not os.path.exists( "ex2.bam.bai" )
298 samfile = pysam.Samfile( "ex2.bam", "rb" )
299 self.assertRaises( ValueError, samfile.fetch )
300 self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 )
302 class TestIteratorRow(unittest.TestCase):
305 self.samfile=pysam.Samfile( "ex1.bam","rb" )
307 def checkRange( self, rnge ):
308 '''compare results from iterator with those from samtools.'''
309 ps = list(self.samfile.fetch(region=rnge))
310 sa = list(pysam.view( "ex1.bam", rnge, raw = True) )
311 self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) ))
312 # check if the same reads are returned and in the same order
313 for line, pair in enumerate( zip( ps, sa ) ):
316 self.assertEqual( a.qname, d[0], "line %i: read id mismatch: %s != %s" % (line, a.rname, d[0]) )
317 self.assertEqual( a.pos, int(d[3])-1, "line %i: read position mismatch: %s != %s, \n%s\n%s\n" % \
318 (line, a.pos, int(d[3])-1,
320 self.assertEqual( a.qual, d[10], "line %i: quality mismatch: %s != %s, \n%s\n%s\n" % \
321 (line, a.qual, d[10],
324 def testIteratePerContig(self):
325 '''check random access per contig'''
326 for contig in self.samfile.references:
327 self.checkRange( contig )
329 def testIterateRanges(self):
330 '''check random access per range'''
331 for contig, length in zip(self.samfile.references, self.samfile.lengths):
332 for start in range( 1, length, 90):
333 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
338 class TestIteratorRowAll(unittest.TestCase):
341 self.samfile=pysam.Samfile( "ex1.bam","rb" )
343 def testIterate(self):
344 '''compare results from iterator with those from samtools.'''
345 ps = list(self.samfile.fetch())
346 sa = list(pysam.view( "ex1.bam", raw = True) )
347 self.assertEqual( len(ps), len(sa), "unequal number of results: %i != %i" % (len(ps), len(sa) ))
348 # check if the same reads are returned
349 for line, pair in enumerate( zip( ps, sa ) ):
350 data = pair[1].split("\t")
351 self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
356 class TestIteratorColumn(unittest.TestCase):
357 '''test iterator column against contents of ex3.bam.'''
359 # note that samfile contains 1-based coordinates
360 # 1D means deletion with respect to reference sequence
362 mCoverages = { 'chr1' : [ 0 ] * 20 + [1] * 36 + [0] * (100 - 20 -35 ),
363 'chr2' : [ 0 ] * 20 + [1] * 35 + [0] * (100 - 20 -35 ),
367 self.samfile=pysam.Samfile( "ex4.bam","rb" )
369 def checkRange( self, rnge ):
370 '''compare results from iterator with those from samtools.'''
371 # check if the same reads are returned and in the same order
372 for column in self.samfile.pileup(region=rnge):
373 thiscov = len(column.pileups)
374 refcov = self.mCoverages[self.samfile.getrname(column.tid)][column.pos]
375 self.assertEqual( thiscov, refcov, "wrong coverage at pos %s:%i %i should be %i" % (self.samfile.getrname(column.tid), column.pos, thiscov, refcov))
377 def testIterateAll(self):
378 '''check random access per contig'''
379 self.checkRange( None )
381 def testIteratePerContig(self):
382 '''check random access per contig'''
383 for contig in self.samfile.references:
384 self.checkRange( contig )
386 def testIterateRanges(self):
387 '''check random access per range'''
388 for contig, length in zip(self.samfile.references, self.samfile.lengths):
389 for start in range( 1, length, 90):
390 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
392 def testInverse( self ):
393 '''test the inverse, is point-wise pileup accurate.'''
394 for contig, refseq in self.mCoverages.items():
395 refcolumns = sum(refseq)
396 for pos, refcov in enumerate( refseq ):
397 columns = list(self.samfile.pileup( contig, pos, pos+1) )
399 # if no read, no coverage
400 self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
402 # one read, all columns of the read are returned
403 self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
404 (pos, len(columns), refcolumns))
411 class TestAlignedReadFromBam(unittest.TestCase):
414 self.samfile=pysam.Samfile( "ex3.bam","rb" )
415 self.reads=list(self.samfile.fetch())
417 def testARqname(self):
418 self.assertEqual( self.reads[0].qname, "read_28833_29006_6945", "read name mismatch in read 1: %s != %s" % (self.reads[0].qname, "read_28833_29006_6945") )
419 self.assertEqual( self.reads[1].qname, "read_28701_28881_323b", "read name mismatch in read 2: %s != %s" % (self.reads[1].qname, "read_28701_28881_323b") )
421 def testARflag(self):
422 self.assertEqual( self.reads[0].flag, 99, "flag mismatch in read 1: %s != %s" % (self.reads[0].flag, 99) )
423 self.assertEqual( self.reads[1].flag, 147, "flag mismatch in read 2: %s != %s" % (self.reads[1].flag, 147) )
425 def testARrname(self):
426 self.assertEqual( self.reads[0].rname, 0, "chromosome/target id mismatch in read 1: %s != %s" % (self.reads[0].rname, 0) )
427 self.assertEqual( self.reads[1].rname, 1, "chromosome/target id mismatch in read 2: %s != %s" % (self.reads[1].rname, 1) )
430 self.assertEqual( self.reads[0].pos, 33-1, "mapping position mismatch in read 1: %s != %s" % (self.reads[0].pos, 33-1) )
431 self.assertEqual( self.reads[1].pos, 88-1, "mapping position mismatch in read 2: %s != %s" % (self.reads[1].pos, 88-1) )
433 def testARmapq(self):
434 self.assertEqual( self.reads[0].mapq, 20, "mapping quality mismatch in read 1: %s != %s" % (self.reads[0].mapq, 20) )
435 self.assertEqual( self.reads[1].mapq, 30, "mapping quality mismatch in read 2: %s != %s" % (self.reads[1].mapq, 30) )
437 def testARcigar(self):
438 self.assertEqual( self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)], "read name length mismatch in read 1: %s != %s" % (self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)]) )
439 self.assertEqual( self.reads[1].cigar, [(0, 35)], "read name length mismatch in read 2: %s != %s" % (self.reads[1].cigar, [(0, 35)]) )
441 def testARmrnm(self):
442 self.assertEqual( self.reads[0].mrnm, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].mrnm, 0) )
443 self.assertEqual( self.reads[1].mrnm, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].mrnm, 1) )
445 def testARmpos(self):
446 self.assertEqual( self.reads[0].mpos, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].mpos, 200-1) )
447 self.assertEqual( self.reads[1].mpos, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].mpos, 500-1) )
449 def testARisize(self):
450 self.assertEqual( self.reads[0].isize, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].isize, 167) )
451 self.assertEqual( self.reads[1].isize, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].isize, 412) )
454 self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
455 self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
456 self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
458 def testARqual(self):
459 self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
460 self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
461 self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
463 def testARquery(self):
464 self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
465 self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
466 self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") )
468 def testARqqual(self):
469 self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
470 self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
471 self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") )
473 def testPresentOptionalFields(self):
474 self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
475 self.assertEqual( self.reads[0].opt('RG'), 'L1', "optional field mismatch in read 1, RG: %s != %s" % (self.reads[0].opt('RG'), 'L1') )
476 self.assertEqual( self.reads[1].opt('RG'), 'L2', "optional field mismatch in read 2, RG: %s != %s" % (self.reads[1].opt('RG'), 'L2') )
477 self.assertEqual( self.reads[1].opt('MF'), 18, "optional field mismatch in read 2, MF: %s != %s" % (self.reads[1].opt('MF'), 18) )
479 def testPairedBools(self):
480 self.assertEqual( self.reads[0].is_paired, True, "is paired mismatch in read 1: %s != %s" % (self.reads[0].is_paired, True) )
481 self.assertEqual( self.reads[1].is_paired, True, "is paired mismatch in read 2: %s != %s" % (self.reads[1].is_paired, True) )
482 self.assertEqual( self.reads[0].is_proper_pair, True, "is proper pair mismatch in read 1: %s != %s" % (self.reads[0].is_proper_pair, True) )
483 self.assertEqual( self.reads[1].is_proper_pair, True, "is proper pair mismatch in read 2: %s != %s" % (self.reads[1].is_proper_pair, True) )
485 def testTags( self ):
486 self.assertEqual( self.reads[0].tags,
487 [('NM', 1), ('RG', 'L1'),
488 ('PG', 'P1'), ('XT', 'U')] )
489 self.assertEqual( self.reads[1].tags,
490 [('MF', 18), ('RG', 'L2'),
491 ('PG', 'P2'),('XT', 'R') ] )
494 self.assertEqual( self.reads[0].opt("XT"), "U" )
495 self.assertEqual( self.reads[1].opt("XT"), "R" )
497 def testMissingOpt( self ):
498 self.assertRaises( KeyError, self.reads[0].opt, "XP" )
500 def testEmptyOpt( self ):
501 self.assertRaises( KeyError, self.reads[2].opt, "XT" )
506 class TestAlignedReadFromSam(TestAlignedReadFromBam):
509 self.samfile=pysam.Samfile( "ex3.sam","r" )
510 self.reads=list(self.samfile.fetch())
512 # needs to be implemented
513 # class TestAlignedReadFromSamWithoutHeader(TestAlignedReadFromBam):
516 # self.samfile=pysam.Samfile( "ex7.sam","r" )
517 # self.reads=list(self.samfile.fetch())
519 class TestHeaderSam(unittest.TestCase):
521 header = {'SQ': [{'LN': 1575, 'SN': 'chr1'},
522 {'LN': 1584, 'SN': 'chr2'}],
523 'RG': [{'LB': 'SC_1', 'ID': 'L1', 'SM': 'NA12891', 'PU': 'SC_1_10', "CN":"name:with:colon"},
524 {'LB': 'SC_2', 'ID': 'L2', 'SM': 'NA12891', 'PU': 'SC_2_12', "CN":"name:with:colon"}],
525 'PG': [{'ID': 'P1', 'VN': '1.0'}, {'ID': 'P2', 'VN': '1.1'}],
527 'CO' : [ 'this is a comment', 'this is another comment'],
530 def compareHeaders( self, a, b ):
531 '''compare two headers a and b.'''
532 for ak,av in a.iteritems():
533 self.assertTrue( ak in b, "key '%s' not in '%s' " % (ak,b) )
534 self.assertEqual( av, b[ak] )
537 self.samfile=pysam.Samfile( "ex3.sam","r" )
539 def testHeaders(self):
540 self.compareHeaders( self.header, self.samfile.header )
541 self.compareHeaders( self.samfile.header, self.header )
543 def testNameMapping( self ):
544 for x, y in enumerate( ("chr1", "chr2")):
545 tid = self.samfile.gettid( y )
546 ref = self.samfile.getrname( x )
547 self.assertEqual( tid, x )
548 self.assertEqual( ref, y )
550 self.assertEqual( self.samfile.gettid("chr?"), -1 )
551 self.assertRaises( ValueError, self.samfile.getrname, 2 )
556 class TestHeaderBam(TestHeaderSam):
559 self.samfile=pysam.Samfile( "ex3.bam","rb" )
561 class TestUnmappedReads(unittest.TestCase):
564 samfile=pysam.Samfile( "ex5.sam","r" )
565 self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 )
569 samfile=pysam.Samfile( "ex5.bam","rb" )
570 self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 )
573 class TestPileupObjects(unittest.TestCase):
576 self.samfile=pysam.Samfile( "ex1.bam","rb" )
578 def testPileupColumn(self):
579 for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
580 if pcolumn1.pos == 104:
581 self.assertEqual( pcolumn1.tid, 0, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn1.tid, 0) )
582 self.assertEqual( pcolumn1.pos, 105-1, "position mismatch in position 1: %s != %s" % (pcolumn1.pos, 105-1) )
583 self.assertEqual( pcolumn1.n, 2, "# reads mismatch in position 1: %s != %s" % (pcolumn1.n, 2) )
584 for pcolumn2 in self.samfile.pileup( region="chr2:1480" ):
585 if pcolumn2.pos == 1479:
586 self.assertEqual( pcolumn2.tid, 1, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn2.tid, 1) )
587 self.assertEqual( pcolumn2.pos, 1480-1, "position mismatch in position 1: %s != %s" % (pcolumn2.pos, 1480-1) )
588 self.assertEqual( pcolumn2.n, 12, "# reads mismatch in position 1: %s != %s" % (pcolumn2.n, 12) )
590 def testPileupRead(self):
591 for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
592 if pcolumn1.pos == 104:
593 self.assertEqual( len(pcolumn1.pileups), 2, "# reads aligned to column mismatch in position 1: %s != %s" % (len(pcolumn1.pileups), 2) )
594 # self.assertEqual( pcolumn1.pileups[0] # need to test additional properties here
599 class TestContextManager(unittest.TestCase):
601 def testManager( self ):
602 with pysam.Samfile('ex1.bam', 'rb') as samfile:
604 self.assertEqual( samfile._isOpen(), False )
606 class TestExceptions(unittest.TestCase):
609 self.samfile=pysam.Samfile( "ex1.bam","rb" )
611 def testMissingFile(self):
613 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "rb" )
614 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "r" )
615 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "r" )
616 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "rb" )
618 def testBadContig(self):
619 self.assertRaises( ValueError, self.samfile.fetch, "chr88" )
621 def testMeaninglessCrap(self):
622 self.assertRaises( ValueError, self.samfile.fetch, "skljf" )
624 def testBackwardsOrderNewFormat(self):
625 self.assertRaises( ValueError, self.samfile.fetch, 'chr1', 100, 10 )
627 def testBackwardsOrderOldFormat(self):
628 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:100-10")
630 def testOutOfRangeNegativeNewFormat(self):
631 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, -10 )
632 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 )
633 self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 )
635 self.assertRaises( ValueError, self.samfile.count, "chr1", 5, -10 )
636 self.assertRaises( ValueError, self.samfile.count, "chr1", 5, 0 )
637 self.assertRaises( ValueError, self.samfile.count, "chr1", -5, -10 )
639 def testOutOfRangeNegativeOldFormat(self):
640 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" )
641 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" )
642 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" )
644 self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-10" )
645 self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-0" )
646 self.assertRaises( ValueError, self.samfile.count, region="chr1:-5--10" )
648 def testOutOfRangNewFormat(self):
649 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 )
650 self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999, 99999999999 )
652 def testOutOfRangeLargeNewFormat(self):
653 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
654 self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
656 def testOutOfRangeLargeOldFormat(self):
657 self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
658 self.assertRaises( ValueError, self.samfile.count, "chr1:99999999999999999-999999999999999999" )
660 def testZeroToZero(self):
662 self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0)
667 class TestWrongFormat(unittest.TestCase):
668 '''test cases for opening files not in bam/sam format.'''
670 def testOpenSamAsBam( self ):
671 self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' )
673 def testOpenBamAsSam( self ):
674 self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' )
676 def testOpenFastaAsSam( self ):
677 self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' )
679 def testOpenFastaAsBam( self ):
680 self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' )
682 class TestFastaFile(unittest.TestCase):
684 mSequences = { 'chr1' :
685 "CACTAGTGGCTCATTGTAAATGTGTGGTTTAACTCGTCCATGGCCCAGCATTAGGGAGCTGTGGACCCTGCAGCCTGGCTGTGGGGGCCGCAGTGGCTGAGGGGTGCAGAGCCGAGTCACGGGGTTGCCAGCACAGGGGCTTAACCTCTGGTGACTGCCAGAGCTGCTGGCAAGCTAGAGTCCCATTTGGAGCCCCTCTAAGCCGTTCTATTTGTAATGAAAACTATATTTATGCTATTCAGTTCTAAATATAGAAATTGAAACAGCTGTGTTTAGTGCCTTTGTTCAACCCCCTTGCAACAACCTTGAGAACCCCAGGGAATTTGTCAATGTCAGGGAAGGAGCATTTTGTCAGTTACCAAATGTGTTTATTACCAGAGGGATGGAGGGAAGAGGGACGCTGAAGAACTTTGATGCCCTCTTCTTCCAAAGATGAAACGCGTAACTGCGCTCTCATTCACTCCAGCTCCCTGTCACCCAATGGACCTGTGATATCTGGATTCTGGGAAATTCTTCATCCTGGACCCTGAGAGATTCTGCAGCCCAGCTCCAGATTGCTTGTGGTCTGACAGGCTGCAACTGTGAGCCATCACAATGAACAACAGGAAGAAAAGGTCTTTCAAAAGGTGATGTGTGTTCTCATCAACCTCATACACACACATGGTTTAGGGGTATAATACCTCTACATGGCTGATTATGAAAACAATGTTCCCCAGATACCATCCCTGTCTTACTTCCAGCTCCCCAGAGGGAAAGCTTTCAACGCTTCTAGCCATTTCTTTTGGCATTTGCCTTCAGACCCTACACGAATGCGTCTCTACCACAGGGGGCTGCGCGGTTTCCCATCATGAAGCACTGAACTTCCACGTCTCATCTAGGGGAACAGGGAGGTGCACTAATGCGCTCCACGCCCAAGCCCTTCTCACAGTTTCTGCCCCCAGCATGGTTGTACTGGGCAATACATGAGATTATTAGGAAATGCTTTACTGTCATAACTATGAAGAGACTATTGCCAGATGAACCACACATTAATACTATGTTTCTTATCTGCACATTACTACCCTGCAATTAATATAATTGTGTCCATGTACACACGCTGTCCTATGTACTTATCATGACTCTATCCCAAATTCCCAATTACGTCCTATCTTCTTCTTAGGGAAGAACAGCTTAGGTATCAATTTGGTGTTCTGTGTAAAGTCTCAGGGAGCCGTCCGTGTCCTCCCATCTGGCCTCGTCCACACTGGTTCTCTTGAAAGCTTGGGCTGTAATGATGCCCCTTGGCCATCACCCAGTCCCTGCCCCATCTCTTGTAATCTCTCTCCTTTTTGCTGCATCCCTGTCTTCCTCTGTCTTGATTTACTTGTTGTTGGTTTTCTGTTTCTTTGTTTGATTTGGTGGAAGACATAATCCCACGCTTCCTATGGAAAGGTTGTTGGGAGATTTTTAATGATTCCTCAATGTTAAAATGTCTATTTTTGTCTTGACACCCAACTAATATTTGTCTGAGCAAAACAGTCTAGATGAGAGAGAACTTCCCTGGAGGTCTGATGGCGTTTCTCCCTCGTCTTCTTA",
687 "TTCAAATGAACTTCTGTAATTGAAAAATTCATTTAAGAAATTACAAAATATAGTTGAAAGCTCTAACAATAGACTAAACCAAGCAGAAGAAAGAGGTTCAGAACTTGAAGACAAGTCTCTTATGAATTAACCCAGTCAGACAAAAATAAAGAAAAAAATTTTAAAAATGAACAGAGCTTTCAAGAAGTATGAGATTATGTAAAGTAACTGAACCTATGAGTCACAGGTATTCCTGAGGAAAAAGAAAAAGTGAGAAGTTTGGAAAAACTATTTGAGGAAGTAATTGGGGAAAACCTCTTTAGTCTTGCTAGAGATTTAGACATCTAAATGAAAGAGGCTCAAAGAATGCCAGGAAGATACATTGCAAGACAGACTTCATCAAGATATGTAGTCATCAGACTATCTAAAGTCAACATGAAGGAAAAAAATTCTAAAATCAGCAAGAGAAAAGCATACAGTCATCTATAAAGGAAATCCCATCAGAATAACAATGGGCTTCTCAGCAGAAACCTTACAAGCCAGAAGAGATTGGATCTAATTTTTGGACTTCTTAAAGAAAAAAAAACCTGTCAAACACGAATGTTATGCCCTGCTAAACTAAGCATCATAAATGAAGGGGAAATAAAGTCAAGTCTTTCCTGACAAGCAAATGCTAAGATAATTCATCATCACTAAACCAGTCCTATAAGAAATGCTCAAAAGAATTGTAAAAGTCAAAATTAAAGTTCAATACTCACCATCATAAATACACACAAAAGTACAAAACTCACAGGTTTTATAAAACAATTGAGACTACAGAGCAACTAGGTAAAAAATTAACATTACAACAGGAACAAAACCTCATATATCAATATTAACTTTGAATAAAAAGGGATTAAATTCCCCCACTTAAGAGATATAGATTGGCAGAACAGATTTAAAAACATGAACTAACTATATGCTGTTTACAAGAAACTCATTAATAAAGACATGAGTTCAGGTAAAGGGGTGGAAAAAGATGTTCTACGCAAACAGAAACCAAATGAGAGAAGGAGTAGCTATACTTATATCAGATAAAGCACACTTTAAATCAACAACAGTAAAATAAAACAAAGGAGGTCATCATACAATGATAAAAAGATCAATTCAGCAAGAAGATATAACCATCCTACTAAATACATATGCACCTAACACAAGACTACCCAGATTCATAAAACAAATACTACTAGACCTAAGAGGGATGAGAAATTACCTAATTGGTACAATGTACAATATTCTGATGATGGTTACACTAAAAGCCCATACTTTACTGCTACTCAATATATCCATGTAACAAATCTGCGCTTGTACTTCTAAATCTATAAAAAAATTAAAATTTAACAAAAGTAAATAAAACACATAGCTAAAACTAAAAAAGCAAAAACAAAAACTATGCTAAGTATTGGTAAAGATGTGGGGAAAAAAGTAAACTCTCAAATATTGCTAGTGGGAGTATAAATTGTTTTCCACTTTGGAAAACAATTTGGTAATTTCGTTTTTTTTTTTTTCTTTTCTCTTTTTTTTTTTTTTTTTTTTGCATGCCAGAAAAAAATATTTACAGTAACT",
691 self.file=pysam.Fastafile( "ex1.fa" )
694 for id, seq in self.mSequences.items():
695 self.assertEqual( seq, self.file.fetch( id ) )
696 for x in range( 0, len(seq), 10):
697 self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
699 self.assertEqual( seq[x:], self.file.fetch( id, x) )
701 self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
704 # unknown sequence returns ""
705 self.assertEqual( "", self.file.fetch("chr12") )
707 def testOutOfRangeAccess( self ):
708 '''test out of range access.'''
709 # out of range access returns an empty string
710 for contig, s in self.mSequences.iteritems():
711 self.assertEqual( self.file.fetch( contig, len(s), len(s)+1), "" )
713 self.assertEqual( self.file.fetch( "chr3", 0 , 100), "" )
715 def testFetchErrors( self ):
716 self.assertRaises( ValueError, self.file.fetch )
717 self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
718 self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
720 def testLength( self ):
721 self.assertEqual( len(self.file), 2 )
726 class TestAlignedRead(unittest.TestCase):
727 '''tests to check if aligned read can be constructed
731 def checkFieldEqual( self, read1, read2, exclude = []):
732 '''check if two reads are equal by comparing each field.'''
734 for x in ("qname", "seq", "flag",
735 "rname", "pos", "mapq", "cigar",
736 "mrnm", "mpos", "isize", "qual",
737 "is_paired", "is_proper_pair",
738 "is_unmapped", "mate_is_unmapped",
739 "is_reverse", "mate_is_reverse",
740 "is_read1", "is_read2",
741 "is_secondary", "is_qcfail",
742 "is_duplicate", "bin"):
743 if x in exclude: continue
744 self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" %
745 (x, getattr(read1, x), getattr(read2,x)))
747 def testEmpty( self ):
748 a = pysam.AlignedRead()
749 self.assertEqual( a.qname, None )
750 self.assertEqual( a.seq, None )
751 self.assertEqual( a.qual, None )
752 self.assertEqual( a.flag, 0 )
753 self.assertEqual( a.rname, 0 )
754 self.assertEqual( a.mapq, 0 )
755 self.assertEqual( a.cigar, None )
756 self.assertEqual( a.tags, None )
757 self.assertEqual( a.mrnm, 0 )
758 self.assertEqual( a.mpos, 0 )
759 self.assertEqual( a.isize, 0 )
761 def buildRead( self ):
762 '''build an example read.'''
764 a = pysam.AlignedRead()
765 a.qname = "read_12345"
771 a.cigar = ( (0,10), (2,1), (0,25) )
779 def testUpdate( self ):
780 '''check if updating fields affects other variable length data
787 self.checkFieldEqual( a, b, "qname" )
788 b.qname = "read_12345678"
789 self.checkFieldEqual( a, b, "qname" )
790 b.qname = "read_12345"
791 self.checkFieldEqual( a, b)
794 b.cigar = ( (0,10), )
795 self.checkFieldEqual( a, b, "cigar" )
796 b.cigar = ( (0,10), (2,1), (0,25), (2,1), (0,25) )
797 self.checkFieldEqual( a, b, "cigar" )
798 b.cigar = ( (0,10), (2,1), (0,25) )
799 self.checkFieldEqual( a, b)
803 self.checkFieldEqual( a, b, ("seq", "qual") )
805 self.checkFieldEqual( a, b, ("seq", "qual") )
807 self.checkFieldEqual( a, b, ("qual",))
814 "is_paired", "is_proper_pair",
815 "is_unmapped", "mate_is_unmapped",
816 "is_reverse", "mate_is_reverse",
817 "is_read1", "is_read2",
818 "is_secondary", "is_qcfail",
820 setattr( b, x, True )
821 self.assertEqual( getattr(b, x), True )
822 self.checkFieldEqual( a, b, ("flag", x,) )
823 setattr( b, x, False )
824 self.assertEqual( getattr(b, x), False )
825 self.checkFieldEqual( a, b )
827 def testLargeRead( self ):
828 '''build an example read.'''
830 a = pysam.AlignedRead()
831 a.qname = "read_12345"
837 a.cigar = ( (0,10), (2,1), (0,25) )
845 def testTagParsing( self ):
846 '''test for tag parsing
848 see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a
850 samfile=pysam.Samfile( "ex8.bam","rb" )
852 for entry in samfile:
854 entry.tags = entry.tags
856 self.assertEqual( after, before )
858 class TestDeNovoConstruction(unittest.TestCase):
859 '''check BAM/SAM file construction using ex3.sam
861 (note these are +1 coordinates):
863 read_28833_29006_6945 99 chr1 33 20 10M1D25M = 200 167 AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG <<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<< NM:i:1 RG:Z:L1
864 read_28701_28881_323b 147 chr2 88 30 35M = 500 412 ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA <<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<< MF:i:18 RG:Z:L2
867 header = { 'HD': {'VN': '1.0'},
868 'SQ': [{'LN': 1575, 'SN': 'chr1'},
869 {'LN': 1584, 'SN': 'chr2'}], }
874 def checkFieldEqual( self, read1, read2, exclude = []):
875 '''check if two reads are equal by comparing each field.'''
877 for x in ("qname", "seq", "flag",
878 "rname", "pos", "mapq", "cigar",
879 "mrnm", "mpos", "isize", "qual",
881 "is_paired", "is_proper_pair",
882 "is_unmapped", "mate_is_unmapped",
883 "is_reverse", "mate_is_reverse",
884 "is_read1", "is_read2",
885 "is_secondary", "is_qcfail",
887 if x in exclude: continue
888 self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" %
889 (x, getattr(read1, x), getattr(read2,x)))
894 a = pysam.AlignedRead()
895 a.qname = "read_28833_29006_6945"
896 a.seq="AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG"
901 a.cigar = ( (0,10), (2,1), (0,25) )
905 a.qual="<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<"
906 a.tags = ( ("NM", 1),
909 b = pysam.AlignedRead()
910 b.qname = "read_28701_28881_323b"
911 b.seq="ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA"
916 b.cigar = ( (0,35), )
920 b.qual="<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<"
921 b.tags = ( ("MF", 18),
926 def testSAMWholeFile( self ):
928 tmpfilename = "tmp_%i.sam" % id(self)
930 outfile = pysam.Samfile( tmpfilename, "wh", header = self.header )
932 for x in self.reads: outfile.write( x )
935 self.assertTrue( checkBinaryEqual( tmpfilename, self.samfile ),
936 "mismatch when construction SAM file, see %s %s" % (tmpfilename, self.samfile))
938 os.unlink( tmpfilename )
940 def testBAMPerRead( self ):
941 '''check if individual reads are binary equal.'''
942 infile = pysam.Samfile( self.bamfile, "rb")
944 others = list(infile)
945 for denovo, other in zip( others, self.reads):
946 self.checkFieldEqual( other, denovo )
947 self.assertEqual( other.compare( denovo ), 0 )
949 def testSAMPerRead( self ):
950 '''check if individual reads are binary equal.'''
951 infile = pysam.Samfile( self.samfile, "r")
953 others = list(infile)
954 for denovo, other in zip( others, self.reads):
955 self.checkFieldEqual( other, denovo )
956 self.assertEqual( other.compare( denovo), 0 )
958 def testBAMWholeFile( self ):
960 tmpfilename = "tmp_%i.bam" % id(self)
962 outfile = pysam.Samfile( tmpfilename, "wb", header = self.header )
964 for x in self.reads: outfile.write( x )
967 self.assertTrue( checkBinaryEqual( tmpfilename, self.bamfile ),
968 "mismatch when construction BAM file, see %s %s" % (tmpfilename, self.bamfile))
970 os.unlink( tmpfilename )
973 class TestDoubleFetch(unittest.TestCase):
974 '''check if two iterators on the same bamfile are independent.'''
976 def testDoubleFetch( self ):
978 samfile1 = pysam.Samfile('ex1.bam', 'rb')
980 for a,b in zip(samfile1.fetch(), samfile1.fetch()):
981 self.assertEqual( a.compare( b ), 0 )
983 def testDoubleFetchWithRegion( self ):
985 samfile1 = pysam.Samfile('ex1.bam', 'rb')
986 chr, start, stop = 'chr1', 200, 3000000
987 self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
989 for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
990 self.assertEqual( a.compare( b ), 0 )
992 def testDoubleFetchUntilEOF( self ):
994 samfile1 = pysam.Samfile('ex1.bam', 'rb')
996 for a,b in zip(samfile1.fetch( until_eof = True),
997 samfile1.fetch( until_eof = True )):
998 self.assertEqual( a.compare( b), 0 )
1000 class TestRemoteFileFTP(unittest.TestCase):
1001 '''test remote access.
1005 # Need to find an ftp server without password on standard
1008 url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
1011 def testFTPView( self ):
1013 result = pysam.view( self.url, self.region )
1014 self.assertEqual( len(result), 36 )
1016 def testFTPFetch( self ):
1018 samfile = pysam.Samfile(self.url, "rb")
1019 result = list(samfile.fetch( region = self.region ))
1020 self.assertEqual( len(result), 36 )
1022 class TestRemoteFileHTTP( unittest.TestCase):
1024 url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
1025 region = "chr1:1-1000"
1028 def testView( self ):
1029 samfile_local = pysam.Samfile(self.local, "rb")
1030 ref = list(samfile_local.fetch( region = self.region ))
1032 result = pysam.view( self.url, self.region )
1033 self.assertEqual( len(result), len(ref) )
1035 def testFetch( self ):
1036 samfile = pysam.Samfile(self.url, "rb")
1037 result = list(samfile.fetch( region = self.region ))
1038 samfile_local = pysam.Samfile(self.local, "rb")
1039 ref = list(samfile_local.fetch( region = self.region ))
1041 self.assertEqual( len(ref), len(result) )
1042 for x, y in zip(result, ref):
1043 self.assertEqual( x.compare( y ), 0 )
1045 def testFetchAll( self ):
1046 samfile = pysam.Samfile(self.url, "rb")
1047 result = list(samfile.fetch())
1048 samfile_local = pysam.Samfile(self.local, "rb")
1049 ref = list(samfile_local.fetch() )
1051 self.assertEqual( len(ref), len(result) )
1052 for x, y in zip(result, ref):
1053 self.assertEqual( x.compare( y ), 0 )
1055 class TestLargeOptValues( unittest.TestCase ):
1057 ints = ( 65536, 214748, 2147484, 2147483647 )
1058 floats = ( 65536.0, 214748.0, 2147484.0 )
1060 def check( self, samfile ):
1063 for exp in self.ints:
1066 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1068 for exp in [ -x for x in self.ints ]:
1071 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1073 for exp in self.floats:
1076 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1078 for exp in [ -x for x in self.floats ]:
1081 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1083 def testSAM( self ):
1084 samfile = pysam.Samfile("ex10.sam", "r")
1085 self.check( samfile )
1087 def testBAM( self ):
1088 samfile = pysam.Samfile("ex10.bam", "rb")
1089 self.check( samfile )
1091 class TestSNPCalls( unittest.TestCase ):
1092 '''test pysam SNP calling ability.'''
1094 def checkEqual( self, a, b ):
1095 for x in ("reference_base",
1098 "consensus_quality",
1102 self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" %
1103 (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1105 def testAllPositionsViaIterator( self ):
1106 samfile = pysam.Samfile( "ex1.bam", "rb")
1107 fastafile = pysam.Fastafile( "ex1.fa" )
1109 refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"]
1110 except pysam.SamtoolsError:
1113 i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1114 calls = list(pysam.IteratorSNPCalls(i))
1115 for x,y in zip( refs, calls ):
1116 self.checkEqual( x, y )
1118 def testPerPositionViaIterator( self ):
1119 # test pileup for each position. This is a slow operation
1120 # so this test is disabled
1122 samfile = pysam.Samfile( "ex1.bam", "rb")
1123 fastafile = pysam.Fastafile( "ex1.fa" )
1124 for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1125 if x.reference_base == "*": continue
1126 i = samfile.pileup( x.chromosome, x.pos, x.pos+1,
1127 fastafile = fastafile,
1128 stepper = "samtools" )
1129 z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ]
1130 self.assertEqual( len(z), 1 )
1131 self.checkEqual( x, z[0] )
1133 def testPerPositionViaCaller( self ):
1134 # test pileup for each position. This is a fast operation
1135 samfile = pysam.Samfile( "ex1.bam", "rb")
1136 fastafile = pysam.Fastafile( "ex1.fa" )
1137 i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1138 caller = pysam.SNPCaller( i )
1140 for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1141 if x.reference_base == "*": continue
1142 call = caller.call( x.chromosome, x.pos )
1143 self.checkEqual( x, call )
1145 class TestIndelCalls( unittest.TestCase ):
1146 '''test pysam indel calling.'''
1148 def checkEqual( self, a, b ):
1152 "consensus_quality",
1161 if b.genotype == "*/*" and x == "second_allele":
1162 # ignore test for second allele (positions chr2:439 and chr2:1512)
1164 self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" %
1165 (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1167 def testAllPositionsViaIterator( self ):
1169 samfile = pysam.Samfile( "ex1.bam", "rb")
1170 fastafile = pysam.Fastafile( "ex1.fa" )
1172 refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"]
1173 except pysam.SamtoolsError:
1176 i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1177 calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ]
1178 for x,y in zip( refs, calls ):
1179 self.checkEqual( x, y )
1181 def testPerPositionViaCaller( self ):
1182 # test pileup for each position. This is a fast operation
1183 samfile = pysam.Samfile( "ex1.bam", "rb")
1184 fastafile = pysam.Fastafile( "ex1.fa" )
1185 i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1186 caller = pysam.IndelCaller( i )
1188 for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1189 if x.reference_base != "*": continue
1190 call = caller.call( x.chromosome, x.pos )
1191 self.checkEqual( x, call )
1193 class TestLogging( unittest.TestCase ):
1194 '''test around bug issue 42,
1196 failed in versions < 0.4
1199 def check( self, bamfile, log ):
1202 logger = logging.getLogger('franklin')
1203 logger.setLevel(logging.INFO)
1204 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
1205 log_hand = logging.FileHandler('log.txt')
1206 log_hand.setFormatter(formatter)
1207 logger.addHandler(log_hand)
1209 bam = pysam.Samfile(bamfile, 'rb')
1211 self.assert_( True )
1213 def testFail1( self ):
1214 self.check( "ex9_fail.bam", False )
1215 self.check( "ex9_fail.bam", True )
1217 def testNoFail1( self ):
1218 self.check( "ex9_nofail.bam", False )
1219 self.check( "ex9_nofail.bam", True )
1221 def testNoFail2( self ):
1222 self.check( "ex9_nofail.bam", True )
1223 self.check( "ex9_nofail.bam", True )
1226 # 1. finish testing all properties within pileup objects
1227 # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)
1228 # 3. check: presence of sequence
1230 class TestSamfileUtilityFunctions( unittest.TestCase ):
1232 def testCount( self ):
1234 samfile = pysam.Samfile( "ex1.bam", "rb" )
1236 for contig in ("chr1", "chr2" ):
1237 for start in xrange( 0, 2000, 100 ):
1239 self.assertEqual( len( list( samfile.fetch( contig, start, end ) ) ),
1240 samfile.count( contig, start, end ) )
1242 # test empty intervals
1243 self.assertEqual( len( list( samfile.fetch( contig, start, start ) ) ),
1244 samfile.count( contig, start, start ) )
1246 # test half empty intervals
1247 self.assertEqual( len( list( samfile.fetch( contig, start ) ) ),
1248 samfile.count( contig, start ) )
1250 def testMate( self ):
1251 '''test mate access.'''
1253 readnames = [ x.split("\t")[0] for x in open( "ex1.sam", "rb" ).readlines() ]
1254 counts = collections.defaultdict( int )
1255 for x in readnames: counts[x] += 1
1257 samfile = pysam.Samfile( "ex1.bam", "rb" )
1258 for read in samfile.fetch():
1259 if not read.is_paired:
1260 self.assertRaises( ValueError, samfile.mate, read )
1261 elif read.mate_is_unmapped:
1262 self.assertRaises( ValueError, samfile.mate, read )
1264 if counts[read.qname] == 1:
1265 self.assertRaises( ValueError, samfile.mate, read )
1267 mate = samfile.mate( read )
1268 self.assertEqual( read.qname, mate.qname )
1269 self.assertEqual( read.is_read1, mate.is_read2 )
1270 self.assertEqual( read.is_read2, mate.is_read1 )
1271 self.assertEqual( read.pos, mate.mpos )
1272 self.assertEqual( read.mpos, mate.pos )
1274 class TestSamtoolsProxy( unittest.TestCase ):
1275 '''tests for sanity checking access to samtools functions.'''
1277 def testIndex( self ):
1278 self.assertRaises( IOError, pysam.index, "missing_file" )
1280 def testView( self ):
1281 # note that view still echos "open: No such file or directory"
1282 self.assertRaises( pysam.SamtoolsError, pysam.view, "missing_file" )
1284 def testSort( self ):
1285 self.assertRaises( pysam.SamtoolsError, pysam.sort, "missing_file" )
1287 class TestSamfileIndex( unittest.TestCase):
1289 def testIndex( self ):
1290 samfile = pysam.Samfile( "ex1.bam", "rb" )
1291 index = pysam.IndexedReads( samfile )
1294 reads = collections.defaultdict( int )
1296 for read in samfile: reads[read.qname] += 1
1298 for qname, counts in reads.iteritems():
1299 found = list(index.find( qname ))
1300 self.assertEqual( len(found), counts )
1301 for x in found: self.assertEqual( x.qname, qname )
1304 if __name__ == "__main__":
1306 print "building data files"
1307 subprocess.call( "make", shell=True)
1308 print "starting tests"