2 '''unit testing code for pysam.
4 Execute in the :file:`tests` directory as it requires the Makefile
5 and data files located there.
11 import itertools, collections
17 WORKDIR="pysam_test_work"
19 def checkBinaryEqual( filename1, filename2 ):
20 '''return true if the two files are binary equal.'''
21 if os.path.getsize( filename1 ) != os.path.getsize( filename2 ):
24 infile1 = open(filename1, "rb")
25 infile2 = open(filename2, "rb")
27 def chariter( infile ):
34 for c1,c2 in itertools.izip( chariter( infile1), chariter( infile2) ):
43 def runSamtools( cmd ):
44 '''run a samtools command'''
47 retcode = subprocess.call(cmd, shell=True)
49 print >>sys.stderr, "Child was terminated by signal", -retcode
51 print >>sys.stderr, "Execution failed:", e
53 def getSamtoolsVersion():
54 '''return samtools version'''
56 pipe = subprocess.Popen(SAMTOOLS, shell=True, stderr=subprocess.PIPE).stderr
57 lines = "".join(pipe.readlines())
58 return re.search( "Version:\s+(\S+)", lines).groups()[0]
60 class BinaryTest(unittest.TestCase):
61 '''test samtools command line commands and compare
62 against pysam commands.
64 Tests fail, if the output is not binary identical.
69 # a list of commands to test
74 ("ex1.view", "view ex1.bam > ex1.view"),
75 ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
79 ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"),
80 # note that -o ex1.view2 throws exception.
81 ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ),
85 ( "ex1.sort.bam", "sort ex1.bam ex1.sort" ),
86 ( "pysam_ex1.sort.bam", (pysam.sort, "ex1.bam pysam_ex1.sort" ) ),
90 ("ex1.pileup", "mpileup ex1.bam > ex1.pileup" ),
91 ("pysam_ex1.mpileup", (pysam.mpileup, "ex1.bam" ) ),
95 ("ex1.depth", "depth ex1.bam > ex1.depth" ),
96 ("pysam_ex1.depth", (pysam.depth, "ex1.bam" ) ),
100 ("ex1.fa.fai", "faidx ex1.fa"),
101 ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ),
105 ("ex1.bam.bai", "index ex1.bam" ),
106 ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ),
110 ("ex1.idxstats", "idxstats ex1.bam > ex1.idxstats" ),
111 ("pysam_ex1.idxstats", (pysam.idxstats, "pysam_ex1.bam" ) ),
115 ("ex1.fixmate", "fixmate ex1.bam ex1.fixmate" ),
116 ("pysam_ex1.fixmate", (pysam.fixmate, "pysam_ex1.bam pysam_ex1.fixmate") ),
120 ("ex1.flagstat", "flagstat ex1.bam > ex1.flagstat" ),
121 ("pysam_ex1.flagstat", (pysam.flagstat, "pysam_ex1.bam") ),
125 ("ex1.calmd", "calmd ex1.bam ex1.fa > ex1.calmd" ),
126 ("pysam_ex1.calmd", (pysam.calmd, "pysam_ex1.bam ex1.fa") ),
130 ("ex1.merge", "merge -f ex1.merge ex1.bam ex1.bam" ),
131 # -f option does not work - following command will cause the subsequent
133 ("pysam_ex1.merge", (pysam.merge, "pysam_ex1.merge pysam_ex1.bam pysam_ex1.bam") ),
137 ("ex1.rmdup", "rmdup ex1.bam ex1.rmdup" ),
138 ("pysam_ex1.rmdup", (pysam.rmdup, "pysam_ex1.bam pysam_ex1.rmdup" )),
142 ( "ex1.reheader", "reheader ex1.bam ex1.bam > ex1.reheader"),
143 ( "pysam_ex1.reheader", (pysam.reheader, "ex1.bam ex1.bam" ) ),
147 ( "ex1.cat", "cat ex1.bam ex1.bam > ex1.cat"),
148 ( "pysam_ex1.cat", (pysam.cat, "ex1.bam ex1.bam" ) ),
152 ("ex1.targetcut", "targetcut ex1.bam > ex1.targetcut" ),
153 ("pysam_ex1.targetcut", (pysam.targetcut, "pysam_ex1.bam") ),
157 ("ex1.phase", "phase ex1.bam > ex1.phase" ),
158 ("pysam_ex1.phase", (pysam.phase, "pysam_ex1.bam") ),
162 ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ),
163 ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ),
167 ("ex1.bam2fq", "bam2fq ex1.bam > ex1.bam2fq" ),
168 ("pysam_ex1.bam2fq", (pysam.bam2fq, "pysam_ex1.bam") ),
172 # some tests depend on others. The order specifies in which order
173 # the samtools commands are executed.
174 # The first three (faidx, import, index) need to be in that order,
175 # the rest is arbitrary.
176 order = ('faidx', 'import', 'index',
177 # 'pileup1', 'pileup2', deprecated
178 # 'glfview', deprecated
199 For setup, all commands will be run before the first test is
200 executed. Individual tests will then just compare the output
203 if BinaryTest.first_time:
205 # remove previous files
206 if os.path.exists( WORKDIR ):
207 shutil.rmtree( WORKDIR )
209 # copy the source files to WORKDIR
210 os.makedirs( WORKDIR )
212 shutil.copy( "ex1.fa", os.path.join( WORKDIR, "pysam_ex1.fa" ) )
213 shutil.copy( "ex1.fa", os.path.join( WORKDIR, "ex1.fa" ) )
214 shutil.copy( "ex1.sam.gz", os.path.join( WORKDIR, "ex1.sam.gz" ) )
215 shutil.copy( "ex1.sam", os.path.join( WORKDIR, "ex1.sam" ) )
218 savedir = os.getcwd()
221 for label in self.order:
222 command = self.commands[label]
223 samtools_target, samtools_command = command[0]
225 pysam_target, pysam_command = command[1]
226 except ValueError, msg:
227 raise ValueError( "error while setting up %s=%s: %s" %\
228 (label, command, msg) )
229 runSamtools( " ".join( (SAMTOOLS, samtools_command )))
230 pysam_method, pysam_options = pysam_command
232 output = pysam_method( *pysam_options.split(" "), raw=True)
233 except pysam.SamtoolsError, msg:
234 raise pysam.SamtoolsError( "error while executing %s: options=%s: msg=%s" %\
235 (label, pysam_options, msg) )
236 if ">" in samtools_command:
237 outfile = open( pysam_target, "wb" )
238 for line in output: outfile.write( line )
242 BinaryTest.first_time = False
246 samtools_version = getSamtoolsVersion()
250 # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> 0.1.12
251 if s.count('-') > 0: s = s[0:s.find('-')]
252 return re.sub( "[^0-9.]", "", s )
254 if _r(samtools_version) != _r( pysam.__samtools_version__):
255 raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
256 (pysam.__samtools_version__,
259 def checkCommand( self, command ):
261 samtools_target, pysam_target = self.commands[command][0][0], self.commands[command][1][0]
262 samtools_target = os.path.join( WORKDIR, samtools_target )
263 pysam_target = os.path.join( WORKDIR, pysam_target )
264 self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ),
265 "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
267 def testImport( self ):
268 self.checkCommand( "import" )
270 def testIndex( self ):
271 self.checkCommand( "index" )
273 def testSort( self ):
274 self.checkCommand( "sort" )
276 def testMpileup( self ):
277 self.checkCommand( "mpileup" )
279 def testDepth( self ):
280 self.checkCommand( "depth" )
282 def testIdxstats( self ):
283 self.checkCommand( "idxstats" )
285 def testFixmate( self ):
286 self.checkCommand( "fixmate" )
288 def testFlagstat( self ):
289 self.checkCommand( "flagstat" )
291 def testMerge( self ):
292 self.checkCommand( "merge" )
294 def testRmdup( self ):
295 self.checkCommand( "rmdup" )
297 def testReheader( self ):
298 self.checkCommand( "reheader" )
301 self.checkCommand( "cat" )
303 def testTargetcut( self ):
304 self.checkCommand( "targetcut" )
306 def testPhase( self ):
307 self.checkCommand( "phase" )
309 def testBam2fq( self ):
310 self.checkCommand( "bam2fq" )
312 # def testPileup1( self ):
313 # self.checkCommand( "pileup1" )
315 # def testPileup2( self ):
316 # self.checkCommand( "pileup2" )
319 # def testGLFView( self ):
320 # self.checkCommand( "glfview" )
322 def testView( self ):
323 self.checkCommand( "view" )
325 def testEmptyIndex( self ):
326 self.assertRaises( IOError, pysam.index, "exdoesntexist.bam" )
329 if os.path.exists( WORKDIR ):
330 shutil.rmtree( WORKDIR )
332 class IOTest(unittest.TestCase):
333 '''check if reading samfile and writing a samfile are consistent.'''
335 def checkEcho( self, input_filename, reference_filename,
337 input_mode, output_mode, use_template = True ):
338 '''iterate through *input_filename* writing to *output_filename* and
339 comparing the output to *reference_filename*.
341 The files are opened according to the *input_mode* and *output_mode*.
343 If *use_template* is set, the header is copied from infile using the
344 template mechanism, otherwise target names and lengths are passed
349 infile = pysam.Samfile( input_filename, input_mode )
351 outfile = pysam.Samfile( output_filename, output_mode, template = infile )
353 outfile = pysam.Samfile( output_filename, output_mode,
354 referencenames = infile.references,
355 referencelengths = infile.lengths,
356 add_sq_text = False )
358 iter = infile.fetch()
359 for x in iter: outfile.write( x )
363 self.assertTrue( checkBinaryEqual( reference_filename, output_filename),
364 "files %s and %s are not the same" % (reference_filename, output_filename) )
366 def testReadWriteBam( self ):
368 input_filename = "ex1.bam"
369 output_filename = "pysam_ex1.bam"
370 reference_filename = "ex1.bam"
372 self.checkEcho( input_filename, reference_filename, output_filename,
375 def testReadWriteBamWithTargetNames( self ):
377 input_filename = "ex1.bam"
378 output_filename = "pysam_ex1.bam"
379 reference_filename = "ex1.bam"
381 self.checkEcho( input_filename, reference_filename, output_filename,
382 "rb", "wb", use_template = False )
384 def testReadWriteSamWithHeader( self ):
386 input_filename = "ex2.sam"
387 output_filename = "pysam_ex2.sam"
388 reference_filename = "ex2.sam"
390 self.checkEcho( input_filename, reference_filename, output_filename,
393 def testReadWriteSamWithoutHeader( self ):
395 input_filename = "ex2.sam"
396 output_filename = "pysam_ex2.sam"
397 reference_filename = "ex1.sam"
399 self.checkEcho( input_filename, reference_filename, output_filename,
402 def testReadSamWithoutHeaderWriteSamWithoutHeader( self ):
404 input_filename = "ex1.sam"
405 output_filename = "pysam_ex1.sam"
406 reference_filename = "ex1.sam"
408 # disabled - reading from a samfile without header
409 # is not implemented.
411 # self.checkEcho( input_filename, reference_filename, output_filename,
414 def testFetchFromClosedFile( self ):
416 samfile = pysam.Samfile( "ex1.bam", "rb" )
418 self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
420 def testClosedFile( self ):
421 '''test that access to a closed samfile raises ValueError.'''
423 samfile = pysam.Samfile( "ex1.bam", "rb" )
425 self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
426 self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
427 self.assertRaises( ValueError, samfile.getrname, 0 )
428 self.assertRaises( ValueError, samfile.tell )
429 self.assertRaises( ValueError, samfile.seek, 0 )
430 self.assertRaises( ValueError, getattr, samfile, "nreferences" )
431 self.assertRaises( ValueError, getattr, samfile, "references" )
432 self.assertRaises( ValueError, getattr, samfile, "lengths" )
433 self.assertRaises( ValueError, getattr, samfile, "text" )
434 self.assertRaises( ValueError, getattr, samfile, "header" )
436 # write on closed file
437 self.assertEqual( 0, samfile.write(None) )
439 def testAutoDetection( self ):
440 '''test if autodetection works.'''
442 samfile = pysam.Samfile( "ex3.sam" )
443 self.assertRaises( ValueError, samfile.fetch, 'chr1' )
446 samfile = pysam.Samfile( "ex3.bam" )
447 samfile.fetch('chr1')
450 def testReadingFromSamFileWithoutHeader( self ):
451 '''read from samfile without header.
453 samfile = pysam.Samfile( "ex7.sam" )
454 self.assertRaises( NotImplementedError, samfile.__iter__ )
456 def testReadingFromFileWithoutIndex( self ):
457 '''read from bam file without index.'''
459 assert not os.path.exists( "ex2.bam.bai" )
460 samfile = pysam.Samfile( "ex2.bam", "rb" )
461 self.assertRaises( ValueError, samfile.fetch )
462 self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 )
464 def testReadingUniversalFileMode( self ):
465 '''read from samfile without header.
468 input_filename = "ex2.sam"
469 output_filename = "pysam_ex2.sam"
470 reference_filename = "ex1.sam"
472 self.checkEcho( input_filename, reference_filename, output_filename,
475 class TestFloatTagBug( unittest.TestCase ):
478 def testFloatTagBug( self ):
479 '''a float tag before another exposed a parsing bug in bam_aux_get - expected to fail
481 This test is expected to fail until samtools is fixed.
483 samfile = pysam.Samfile("tag_bug.bam")
484 read = samfile.fetch(until_eof=True).next()
485 self.assertTrue( ('XC',1) in read.tags )
486 self.assertEqual(read.opt('XC'), 1)
488 class TestTagParsing( unittest.TestCase ):
489 '''tests checking the accuracy of tag setting and retrieval.'''
491 def makeRead( self ):
492 a = pysam.AlignedRead()
493 a.qname = "read_12345"
500 a.cigar = ( (0,10), (2,1), (0,25) )
508 def testNegativeIntegers( self ):
510 aligned_read = self.makeRead()
511 aligned_read.tags = [("XD", int(x) ) ]
512 print aligned_read.tags
514 def testNegativeIntegers2( self ):
517 r.tags = [("XD", int(x) ) ]
518 outfile = pysam.Samfile( "test.bam",
520 referencenames = ("chr1",),
521 referencelengths = (1000,) )
525 class TestIteratorRow(unittest.TestCase):
528 self.samfile=pysam.Samfile( "ex1.bam","rb" )
530 def checkRange( self, rnge ):
531 '''compare results from iterator with those from samtools.'''
532 ps = list(self.samfile.fetch(region=rnge))
533 sa = list(pysam.view( "ex1.bam", rnge, raw = True) )
534 self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) ))
535 # check if the same reads are returned and in the same order
536 for line, pair in enumerate( zip( ps, sa ) ):
539 self.assertEqual( a.qname, d[0], "line %i: read id mismatch: %s != %s" % (line, a.rname, d[0]) )
540 self.assertEqual( a.pos, int(d[3])-1, "line %i: read position mismatch: %s != %s, \n%s\n%s\n" % \
541 (line, a.pos, int(d[3])-1,
543 self.assertEqual( a.qual, d[10], "line %i: quality mismatch: %s != %s, \n%s\n%s\n" % \
544 (line, a.qual, d[10],
547 def testIteratePerContig(self):
548 '''check random access per contig'''
549 for contig in self.samfile.references:
550 self.checkRange( contig )
552 def testIterateRanges(self):
553 '''check random access per range'''
554 for contig, length in zip(self.samfile.references, self.samfile.lengths):
555 for start in range( 1, length, 90):
556 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
561 class TestIteratorRowAll(unittest.TestCase):
564 self.samfile=pysam.Samfile( "ex1.bam","rb" )
566 def testIterate(self):
567 '''compare results from iterator with those from samtools.'''
568 ps = list(self.samfile.fetch())
569 sa = list(pysam.view( "ex1.bam", raw = True) )
570 self.assertEqual( len(ps), len(sa), "unequal number of results: %i != %i" % (len(ps), len(sa) ))
571 # check if the same reads are returned
572 for line, pair in enumerate( zip( ps, sa ) ):
573 data = pair[1].split("\t")
574 self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
579 class TestIteratorColumn(unittest.TestCase):
580 '''test iterator column against contents of ex3.bam.'''
582 # note that samfile contains 1-based coordinates
583 # 1D means deletion with respect to reference sequence
585 mCoverages = { 'chr1' : [ 0 ] * 20 + [1] * 36 + [0] * (100 - 20 -35 ),
586 'chr2' : [ 0 ] * 20 + [1] * 35 + [0] * (100 - 20 -35 ),
590 self.samfile=pysam.Samfile( "ex4.bam","rb" )
592 def checkRange( self, rnge ):
593 '''compare results from iterator with those from samtools.'''
594 # check if the same reads are returned and in the same order
595 for column in self.samfile.pileup(region=rnge):
596 thiscov = len(column.pileups)
597 refcov = self.mCoverages[self.samfile.getrname(column.tid)][column.pos]
598 self.assertEqual( thiscov, refcov, "wrong coverage at pos %s:%i %i should be %i" % (self.samfile.getrname(column.tid), column.pos, thiscov, refcov))
600 def testIterateAll(self):
601 '''check random access per contig'''
602 self.checkRange( None )
604 def testIteratePerContig(self):
605 '''check random access per contig'''
606 for contig in self.samfile.references:
607 self.checkRange( contig )
609 def testIterateRanges(self):
610 '''check random access per range'''
611 for contig, length in zip(self.samfile.references, self.samfile.lengths):
612 for start in range( 1, length, 90):
613 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
615 def testInverse( self ):
616 '''test the inverse, is point-wise pileup accurate.'''
617 for contig, refseq in self.mCoverages.items():
618 refcolumns = sum(refseq)
619 for pos, refcov in enumerate( refseq ):
620 columns = list(self.samfile.pileup( contig, pos, pos+1) )
622 # if no read, no coverage
623 self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
625 # one read, all columns of the read are returned
626 self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
627 (pos, len(columns), refcolumns))
634 class TestAlignedReadFromBam(unittest.TestCase):
637 self.samfile=pysam.Samfile( "ex3.bam","rb" )
638 self.reads=list(self.samfile.fetch())
640 def testARqname(self):
641 self.assertEqual( self.reads[0].qname, "read_28833_29006_6945", "read name mismatch in read 1: %s != %s" % (self.reads[0].qname, "read_28833_29006_6945") )
642 self.assertEqual( self.reads[1].qname, "read_28701_28881_323b", "read name mismatch in read 2: %s != %s" % (self.reads[1].qname, "read_28701_28881_323b") )
644 def testARflag(self):
645 self.assertEqual( self.reads[0].flag, 99, "flag mismatch in read 1: %s != %s" % (self.reads[0].flag, 99) )
646 self.assertEqual( self.reads[1].flag, 147, "flag mismatch in read 2: %s != %s" % (self.reads[1].flag, 147) )
648 def testARrname(self):
649 self.assertEqual( self.reads[0].rname, 0, "chromosome/target id mismatch in read 1: %s != %s" % (self.reads[0].rname, 0) )
650 self.assertEqual( self.reads[1].rname, 1, "chromosome/target id mismatch in read 2: %s != %s" % (self.reads[1].rname, 1) )
653 self.assertEqual( self.reads[0].pos, 33-1, "mapping position mismatch in read 1: %s != %s" % (self.reads[0].pos, 33-1) )
654 self.assertEqual( self.reads[1].pos, 88-1, "mapping position mismatch in read 2: %s != %s" % (self.reads[1].pos, 88-1) )
656 def testARmapq(self):
657 self.assertEqual( self.reads[0].mapq, 20, "mapping quality mismatch in read 1: %s != %s" % (self.reads[0].mapq, 20) )
658 self.assertEqual( self.reads[1].mapq, 30, "mapping quality mismatch in read 2: %s != %s" % (self.reads[1].mapq, 30) )
660 def testARcigar(self):
661 self.assertEqual( self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)], "read name length mismatch in read 1: %s != %s" % (self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)]) )
662 self.assertEqual( self.reads[1].cigar, [(0, 35)], "read name length mismatch in read 2: %s != %s" % (self.reads[1].cigar, [(0, 35)]) )
664 def testARmrnm(self):
665 self.assertEqual( self.reads[0].mrnm, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].mrnm, 0) )
666 self.assertEqual( self.reads[1].mrnm, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].mrnm, 1) )
667 self.assertEqual( self.reads[0].rnext, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].rnext, 0) )
668 self.assertEqual( self.reads[1].rnext, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].rnext, 1) )
670 def testARmpos(self):
671 self.assertEqual( self.reads[0].mpos, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].mpos, 200-1) )
672 self.assertEqual( self.reads[1].mpos, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].mpos, 500-1) )
673 self.assertEqual( self.reads[0].pnext, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].pnext, 200-1) )
674 self.assertEqual( self.reads[1].pnext, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].pnext, 500-1) )
676 def testARisize(self):
677 self.assertEqual( self.reads[0].isize, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].isize, 167) )
678 self.assertEqual( self.reads[1].isize, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].isize, 412) )
679 self.assertEqual( self.reads[0].tlen, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].tlen, 167) )
680 self.assertEqual( self.reads[1].tlen, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].tlen, 412) )
683 self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
684 self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
685 self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
687 def testARqual(self):
688 self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
689 self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
690 self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
692 def testARquery(self):
693 self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
694 self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
695 self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") )
697 def testARqqual(self):
698 self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
699 self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
700 self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") )
702 def testPresentOptionalFields(self):
703 self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
704 self.assertEqual( self.reads[0].opt('RG'), 'L1', "optional field mismatch in read 1, RG: %s != %s" % (self.reads[0].opt('RG'), 'L1') )
705 self.assertEqual( self.reads[1].opt('RG'), 'L2', "optional field mismatch in read 2, RG: %s != %s" % (self.reads[1].opt('RG'), 'L2') )
706 self.assertEqual( self.reads[1].opt('MF'), 18, "optional field mismatch in read 2, MF: %s != %s" % (self.reads[1].opt('MF'), 18) )
708 def testPairedBools(self):
709 self.assertEqual( self.reads[0].is_paired, True, "is paired mismatch in read 1: %s != %s" % (self.reads[0].is_paired, True) )
710 self.assertEqual( self.reads[1].is_paired, True, "is paired mismatch in read 2: %s != %s" % (self.reads[1].is_paired, True) )
711 self.assertEqual( self.reads[0].is_proper_pair, True, "is proper pair mismatch in read 1: %s != %s" % (self.reads[0].is_proper_pair, True) )
712 self.assertEqual( self.reads[1].is_proper_pair, True, "is proper pair mismatch in read 2: %s != %s" % (self.reads[1].is_proper_pair, True) )
714 def testTags( self ):
715 self.assertEqual( self.reads[0].tags,
716 [('NM', 1), ('RG', 'L1'),
717 ('PG', 'P1'), ('XT', 'U')] )
718 self.assertEqual( self.reads[1].tags,
719 [('MF', 18), ('RG', 'L2'),
720 ('PG', 'P2'),('XT', 'R') ] )
723 self.assertEqual( self.reads[0].opt("XT"), "U" )
724 self.assertEqual( self.reads[1].opt("XT"), "R" )
726 def testMissingOpt( self ):
727 self.assertRaises( KeyError, self.reads[0].opt, "XP" )
729 def testEmptyOpt( self ):
730 self.assertRaises( KeyError, self.reads[2].opt, "XT" )
735 class TestAlignedReadFromSam(TestAlignedReadFromBam):
738 self.samfile=pysam.Samfile( "ex3.sam","r" )
739 self.reads=list(self.samfile.fetch())
741 # needs to be implemented
742 # class TestAlignedReadFromSamWithoutHeader(TestAlignedReadFromBam):
745 # self.samfile=pysam.Samfile( "ex7.sam","r" )
746 # self.reads=list(self.samfile.fetch())
748 class TestHeaderSam(unittest.TestCase):
750 header = {'SQ': [{'LN': 1575, 'SN': 'chr1'},
751 {'LN': 1584, 'SN': 'chr2'}],
752 'RG': [{'LB': 'SC_1', 'ID': 'L1', 'SM': 'NA12891', 'PU': 'SC_1_10', "CN":"name:with:colon"},
753 {'LB': 'SC_2', 'ID': 'L2', 'SM': 'NA12891', 'PU': 'SC_2_12', "CN":"name:with:colon"}],
754 'PG': [{'ID': 'P1', 'VN': '1.0'}, {'ID': 'P2', 'VN': '1.1'}],
756 'CO' : [ 'this is a comment', 'this is another comment'],
759 def compareHeaders( self, a, b ):
760 '''compare two headers a and b.'''
761 for ak,av in a.iteritems():
762 self.assertTrue( ak in b, "key '%s' not in '%s' " % (ak,b) )
763 self.assertEqual( av, b[ak] )
766 self.samfile=pysam.Samfile( "ex3.sam","r" )
768 def testHeaders(self):
769 self.compareHeaders( self.header, self.samfile.header )
770 self.compareHeaders( self.samfile.header, self.header )
772 def testNameMapping( self ):
773 for x, y in enumerate( ("chr1", "chr2")):
774 tid = self.samfile.gettid( y )
775 ref = self.samfile.getrname( x )
776 self.assertEqual( tid, x )
777 self.assertEqual( ref, y )
779 self.assertEqual( self.samfile.gettid("chr?"), -1 )
780 self.assertRaises( ValueError, self.samfile.getrname, 2 )
785 class TestHeaderBam(TestHeaderSam):
788 self.samfile=pysam.Samfile( "ex3.bam","rb" )
790 class TestUnmappedReads(unittest.TestCase):
793 samfile=pysam.Samfile( "ex5.sam","r" )
794 self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 )
798 samfile=pysam.Samfile( "ex5.bam","rb" )
799 self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 )
802 class TestPileupObjects(unittest.TestCase):
805 self.samfile=pysam.Samfile( "ex1.bam","rb" )
807 def testPileupColumn(self):
808 for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
809 if pcolumn1.pos == 104:
810 self.assertEqual( pcolumn1.tid, 0, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn1.tid, 0) )
811 self.assertEqual( pcolumn1.pos, 105-1, "position mismatch in position 1: %s != %s" % (pcolumn1.pos, 105-1) )
812 self.assertEqual( pcolumn1.n, 2, "# reads mismatch in position 1: %s != %s" % (pcolumn1.n, 2) )
813 for pcolumn2 in self.samfile.pileup( region="chr2:1480" ):
814 if pcolumn2.pos == 1479:
815 self.assertEqual( pcolumn2.tid, 1, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn2.tid, 1) )
816 self.assertEqual( pcolumn2.pos, 1480-1, "position mismatch in position 1: %s != %s" % (pcolumn2.pos, 1480-1) )
817 self.assertEqual( pcolumn2.n, 12, "# reads mismatch in position 1: %s != %s" % (pcolumn2.n, 12) )
819 def testPileupRead(self):
820 for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
821 if pcolumn1.pos == 104:
822 self.assertEqual( len(pcolumn1.pileups), 2, "# reads aligned to column mismatch in position 1: %s != %s" % (len(pcolumn1.pileups), 2) )
823 # self.assertEqual( pcolumn1.pileups[0] # need to test additional properties here
828 class TestContextManager(unittest.TestCase):
830 def testManager( self ):
831 with pysam.Samfile('ex1.bam', 'rb') as samfile:
833 self.assertEqual( samfile._isOpen(), False )
835 class TestExceptions(unittest.TestCase):
838 self.samfile=pysam.Samfile( "ex1.bam","rb" )
840 def testMissingFile(self):
842 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "rb" )
843 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "r" )
844 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "r" )
845 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "rb" )
847 def testBadContig(self):
848 self.assertRaises( ValueError, self.samfile.fetch, "chr88" )
850 def testMeaninglessCrap(self):
851 self.assertRaises( ValueError, self.samfile.fetch, "skljf" )
853 def testBackwardsOrderNewFormat(self):
854 self.assertRaises( ValueError, self.samfile.fetch, 'chr1', 100, 10 )
856 def testBackwardsOrderOldFormat(self):
857 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:100-10")
859 def testOutOfRangeNegativeNewFormat(self):
860 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, -10 )
861 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 )
862 self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 )
864 self.assertRaises( ValueError, self.samfile.count, "chr1", 5, -10 )
865 self.assertRaises( ValueError, self.samfile.count, "chr1", 5, 0 )
866 self.assertRaises( ValueError, self.samfile.count, "chr1", -5, -10 )
868 def testOutOfRangeNegativeOldFormat(self):
869 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" )
870 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" )
871 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" )
873 self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-10" )
874 self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-0" )
875 self.assertRaises( ValueError, self.samfile.count, region="chr1:-5--10" )
877 def testOutOfRangNewFormat(self):
878 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 )
879 self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999, 99999999999 )
881 def testOutOfRangeLargeNewFormat(self):
882 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
883 self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
885 def testOutOfRangeLargeOldFormat(self):
886 self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
887 self.assertRaises( ValueError, self.samfile.count, "chr1:99999999999999999-999999999999999999" )
889 def testZeroToZero(self):
891 self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0)
896 class TestWrongFormat(unittest.TestCase):
897 '''test cases for opening files not in bam/sam format.'''
899 def testOpenSamAsBam( self ):
900 self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' )
902 def testOpenBamAsSam( self ):
903 # test fails, needs to be implemented.
904 # sam.fetch() fails on reading, not on opening
905 # self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' )
908 def testOpenFastaAsSam( self ):
909 # test fails, needs to be implemented.
910 # sam.fetch() fails on reading, not on opening
911 # self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' )
914 def testOpenFastaAsBam( self ):
915 self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' )
917 class TestFastaFile(unittest.TestCase):
919 mSequences = { 'chr1' :
920 "CACTAGTGGCTCATTGTAAATGTGTGGTTTAACTCGTCCATGGCCCAGCATTAGGGAGCTGTGGACCCTGCAGCCTGGCTGTGGGGGCCGCAGTGGCTGAGGGGTGCAGAGCCGAGTCACGGGGTTGCCAGCACAGGGGCTTAACCTCTGGTGACTGCCAGAGCTGCTGGCAAGCTAGAGTCCCATTTGGAGCCCCTCTAAGCCGTTCTATTTGTAATGAAAACTATATTTATGCTATTCAGTTCTAAATATAGAAATTGAAACAGCTGTGTTTAGTGCCTTTGTTCAACCCCCTTGCAACAACCTTGAGAACCCCAGGGAATTTGTCAATGTCAGGGAAGGAGCATTTTGTCAGTTACCAAATGTGTTTATTACCAGAGGGATGGAGGGAAGAGGGACGCTGAAGAACTTTGATGCCCTCTTCTTCCAAAGATGAAACGCGTAACTGCGCTCTCATTCACTCCAGCTCCCTGTCACCCAATGGACCTGTGATATCTGGATTCTGGGAAATTCTTCATCCTGGACCCTGAGAGATTCTGCAGCCCAGCTCCAGATTGCTTGTGGTCTGACAGGCTGCAACTGTGAGCCATCACAATGAACAACAGGAAGAAAAGGTCTTTCAAAAGGTGATGTGTGTTCTCATCAACCTCATACACACACATGGTTTAGGGGTATAATACCTCTACATGGCTGATTATGAAAACAATGTTCCCCAGATACCATCCCTGTCTTACTTCCAGCTCCCCAGAGGGAAAGCTTTCAACGCTTCTAGCCATTTCTTTTGGCATTTGCCTTCAGACCCTACACGAATGCGTCTCTACCACAGGGGGCTGCGCGGTTTCCCATCATGAAGCACTGAACTTCCACGTCTCATCTAGGGGAACAGGGAGGTGCACTAATGCGCTCCACGCCCAAGCCCTTCTCACAGTTTCTGCCCCCAGCATGGTTGTACTGGGCAATACATGAGATTATTAGGAAATGCTTTACTGTCATAACTATGAAGAGACTATTGCCAGATGAACCACACATTAATACTATGTTTCTTATCTGCACATTACTACCCTGCAATTAATATAATTGTGTCCATGTACACACGCTGTCCTATGTACTTATCATGACTCTATCCCAAATTCCCAATTACGTCCTATCTTCTTCTTAGGGAAGAACAGCTTAGGTATCAATTTGGTGTTCTGTGTAAAGTCTCAGGGAGCCGTCCGTGTCCTCCCATCTGGCCTCGTCCACACTGGTTCTCTTGAAAGCTTGGGCTGTAATGATGCCCCTTGGCCATCACCCAGTCCCTGCCCCATCTCTTGTAATCTCTCTCCTTTTTGCTGCATCCCTGTCTTCCTCTGTCTTGATTTACTTGTTGTTGGTTTTCTGTTTCTTTGTTTGATTTGGTGGAAGACATAATCCCACGCTTCCTATGGAAAGGTTGTTGGGAGATTTTTAATGATTCCTCAATGTTAAAATGTCTATTTTTGTCTTGACACCCAACTAATATTTGTCTGAGCAAAACAGTCTAGATGAGAGAGAACTTCCCTGGAGGTCTGATGGCGTTTCTCCCTCGTCTTCTTA",
922 "TTCAAATGAACTTCTGTAATTGAAAAATTCATTTAAGAAATTACAAAATATAGTTGAAAGCTCTAACAATAGACTAAACCAAGCAGAAGAAAGAGGTTCAGAACTTGAAGACAAGTCTCTTATGAATTAACCCAGTCAGACAAAAATAAAGAAAAAAATTTTAAAAATGAACAGAGCTTTCAAGAAGTATGAGATTATGTAAAGTAACTGAACCTATGAGTCACAGGTATTCCTGAGGAAAAAGAAAAAGTGAGAAGTTTGGAAAAACTATTTGAGGAAGTAATTGGGGAAAACCTCTTTAGTCTTGCTAGAGATTTAGACATCTAAATGAAAGAGGCTCAAAGAATGCCAGGAAGATACATTGCAAGACAGACTTCATCAAGATATGTAGTCATCAGACTATCTAAAGTCAACATGAAGGAAAAAAATTCTAAAATCAGCAAGAGAAAAGCATACAGTCATCTATAAAGGAAATCCCATCAGAATAACAATGGGCTTCTCAGCAGAAACCTTACAAGCCAGAAGAGATTGGATCTAATTTTTGGACTTCTTAAAGAAAAAAAAACCTGTCAAACACGAATGTTATGCCCTGCTAAACTAAGCATCATAAATGAAGGGGAAATAAAGTCAAGTCTTTCCTGACAAGCAAATGCTAAGATAATTCATCATCACTAAACCAGTCCTATAAGAAATGCTCAAAAGAATTGTAAAAGTCAAAATTAAAGTTCAATACTCACCATCATAAATACACACAAAAGTACAAAACTCACAGGTTTTATAAAACAATTGAGACTACAGAGCAACTAGGTAAAAAATTAACATTACAACAGGAACAAAACCTCATATATCAATATTAACTTTGAATAAAAAGGGATTAAATTCCCCCACTTAAGAGATATAGATTGGCAGAACAGATTTAAAAACATGAACTAACTATATGCTGTTTACAAGAAACTCATTAATAAAGACATGAGTTCAGGTAAAGGGGTGGAAAAAGATGTTCTACGCAAACAGAAACCAAATGAGAGAAGGAGTAGCTATACTTATATCAGATAAAGCACACTTTAAATCAACAACAGTAAAATAAAACAAAGGAGGTCATCATACAATGATAAAAAGATCAATTCAGCAAGAAGATATAACCATCCTACTAAATACATATGCACCTAACACAAGACTACCCAGATTCATAAAACAAATACTACTAGACCTAAGAGGGATGAGAAATTACCTAATTGGTACAATGTACAATATTCTGATGATGGTTACACTAAAAGCCCATACTTTACTGCTACTCAATATATCCATGTAACAAATCTGCGCTTGTACTTCTAAATCTATAAAAAAATTAAAATTTAACAAAAGTAAATAAAACACATAGCTAAAACTAAAAAAGCAAAAACAAAAACTATGCTAAGTATTGGTAAAGATGTGGGGAAAAAAGTAAACTCTCAAATATTGCTAGTGGGAGTATAAATTGTTTTCCACTTTGGAAAACAATTTGGTAATTTCGTTTTTTTTTTTTTCTTTTCTCTTTTTTTTTTTTTTTTTTTTGCATGCCAGAAAAAAATATTTACAGTAACT",
926 self.file=pysam.Fastafile( "ex1.fa" )
929 for id, seq in self.mSequences.items():
930 self.assertEqual( seq, self.file.fetch( id ) )
931 for x in range( 0, len(seq), 10):
932 self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
934 self.assertEqual( seq[x:], self.file.fetch( id, x) )
936 self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
939 # unknown sequence returns ""
940 self.assertEqual( "", self.file.fetch("chr12") )
942 def testOutOfRangeAccess( self ):
943 '''test out of range access.'''
944 # out of range access returns an empty string
945 for contig, s in self.mSequences.iteritems():
946 self.assertEqual( self.file.fetch( contig, len(s), len(s)+1), "" )
948 self.assertEqual( self.file.fetch( "chr3", 0 , 100), "" )
950 def testFetchErrors( self ):
951 self.assertRaises( ValueError, self.file.fetch )
952 self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
953 self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
955 def testLength( self ):
956 self.assertEqual( len(self.file), 2 )
961 class TestAlignedRead(unittest.TestCase):
962 '''tests to check if aligned read can be constructed
966 def checkFieldEqual( self, read1, read2, exclude = []):
967 '''check if two reads are equal by comparing each field.'''
969 for x in ("qname", "seq", "flag",
970 "rname", "pos", "mapq", "cigar",
971 "mrnm", "mpos", "isize", "qual",
972 "is_paired", "is_proper_pair",
973 "is_unmapped", "mate_is_unmapped",
974 "is_reverse", "mate_is_reverse",
975 "is_read1", "is_read2",
976 "is_secondary", "is_qcfail",
977 "is_duplicate", "bin"):
978 if x in exclude: continue
979 self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" %
980 (x, getattr(read1, x), getattr(read2,x)))
982 def testEmpty( self ):
983 a = pysam.AlignedRead()
984 self.assertEqual( a.qname, None )
985 self.assertEqual( a.seq, None )
986 self.assertEqual( a.qual, None )
987 self.assertEqual( a.flag, 0 )
988 self.assertEqual( a.rname, 0 )
989 self.assertEqual( a.mapq, 0 )
990 self.assertEqual( a.cigar, None )
991 self.assertEqual( a.tags, [] )
992 self.assertEqual( a.mrnm, 0 )
993 self.assertEqual( a.mpos, 0 )
994 self.assertEqual( a.isize, 0 )
996 def buildRead( self ):
997 '''build an example read.'''
999 a = pysam.AlignedRead()
1000 a.qname = "read_12345"
1006 a.cigar = ( (0,10), (2,1), (0,25) )
1014 def testUpdate( self ):
1015 '''check if updating fields affects other variable length data
1017 a = self.buildRead()
1018 b = self.buildRead()
1021 b.qname = "read_123"
1022 self.checkFieldEqual( a, b, "qname" )
1023 b.qname = "read_12345678"
1024 self.checkFieldEqual( a, b, "qname" )
1025 b.qname = "read_12345"
1026 self.checkFieldEqual( a, b)
1029 b.cigar = ( (0,10), )
1030 self.checkFieldEqual( a, b, "cigar" )
1031 b.cigar = ( (0,10), (2,1), (0,25), (2,1), (0,25) )
1032 self.checkFieldEqual( a, b, "cigar" )
1033 b.cigar = ( (0,10), (2,1), (0,25) )
1034 self.checkFieldEqual( a, b)
1038 self.checkFieldEqual( a, b, ("seq", "qual") )
1040 self.checkFieldEqual( a, b, ("seq", "qual") )
1042 self.checkFieldEqual( a, b, ("qual",))
1045 b = self.buildRead()
1049 "is_paired", "is_proper_pair",
1050 "is_unmapped", "mate_is_unmapped",
1051 "is_reverse", "mate_is_reverse",
1052 "is_read1", "is_read2",
1053 "is_secondary", "is_qcfail",
1055 setattr( b, x, True )
1056 self.assertEqual( getattr(b, x), True )
1057 self.checkFieldEqual( a, b, ("flag", x,) )
1058 setattr( b, x, False )
1059 self.assertEqual( getattr(b, x), False )
1060 self.checkFieldEqual( a, b )
1062 def testLargeRead( self ):
1063 '''build an example read.'''
1065 a = pysam.AlignedRead()
1066 a.qname = "read_12345"
1072 a.cigar = ( (0,10), (2,1), (0,25) )
1080 def testTagParsing( self ):
1081 '''test for tag parsing
1083 see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a
1085 samfile=pysam.Samfile( "ex8.bam","rb" )
1087 for entry in samfile:
1089 entry.tags = entry.tags
1091 self.assertEqual( after, before )
1093 class TestDeNovoConstruction(unittest.TestCase):
1094 '''check BAM/SAM file construction using ex3.sam
1096 (note these are +1 coordinates):
1098 read_28833_29006_6945 99 chr1 33 20 10M1D25M = 200 167 AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG <<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<< NM:i:1 RG:Z:L1
1099 read_28701_28881_323b 147 chr2 88 30 35M = 500 412 ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA <<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<< MF:i:18 RG:Z:L2
1102 header = { 'HD': {'VN': '1.0'},
1103 'SQ': [{'LN': 1575, 'SN': 'chr1'},
1104 {'LN': 1584, 'SN': 'chr2'}], }
1109 def checkFieldEqual( self, read1, read2, exclude = []):
1110 '''check if two reads are equal by comparing each field.'''
1112 for x in ("qname", "seq", "flag",
1113 "rname", "pos", "mapq", "cigar",
1114 "mrnm", "mpos", "isize", "qual",
1116 "is_paired", "is_proper_pair",
1117 "is_unmapped", "mate_is_unmapped",
1118 "is_reverse", "mate_is_reverse",
1119 "is_read1", "is_read2",
1120 "is_secondary", "is_qcfail",
1122 if x in exclude: continue
1123 self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" %
1124 (x, getattr(read1, x), getattr(read2,x)))
1129 a = pysam.AlignedRead()
1130 a.qname = "read_28833_29006_6945"
1131 a.seq="AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG"
1136 a.cigar = ( (0,10), (2,1), (0,25) )
1140 a.qual="<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<"
1141 a.tags = ( ("NM", 1),
1144 b = pysam.AlignedRead()
1145 b.qname = "read_28701_28881_323b"
1146 b.seq="ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA"
1151 b.cigar = ( (0,35), )
1155 b.qual="<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<"
1156 b.tags = ( ("MF", 18),
1161 def testSAMWholeFile( self ):
1163 tmpfilename = "tmp_%i.sam" % id(self)
1165 outfile = pysam.Samfile( tmpfilename, "wh", header = self.header )
1167 for x in self.reads: outfile.write( x )
1170 self.assertTrue( checkBinaryEqual( tmpfilename, self.samfile ),
1171 "mismatch when construction SAM file, see %s %s" % (tmpfilename, self.samfile))
1173 os.unlink( tmpfilename )
1175 def testBAMPerRead( self ):
1176 '''check if individual reads are binary equal.'''
1177 infile = pysam.Samfile( self.bamfile, "rb")
1179 others = list(infile)
1180 for denovo, other in zip( others, self.reads):
1181 self.checkFieldEqual( other, denovo )
1182 self.assertEqual( other.compare( denovo ), 0 )
1184 def testSAMPerRead( self ):
1185 '''check if individual reads are binary equal.'''
1186 infile = pysam.Samfile( self.samfile, "r")
1188 others = list(infile)
1189 for denovo, other in zip( others, self.reads):
1190 self.checkFieldEqual( other, denovo )
1191 self.assertEqual( other.compare( denovo), 0 )
1193 def testBAMWholeFile( self ):
1195 tmpfilename = "tmp_%i.bam" % id(self)
1197 outfile = pysam.Samfile( tmpfilename, "wb", header = self.header )
1199 for x in self.reads: outfile.write( x )
1202 self.assertTrue( checkBinaryEqual( tmpfilename, self.bamfile ),
1203 "mismatch when construction BAM file, see %s %s" % (tmpfilename, self.bamfile))
1205 os.unlink( tmpfilename )
1208 class TestDoubleFetch(unittest.TestCase):
1209 '''check if two iterators on the same bamfile are independent.'''
1211 def testDoubleFetch( self ):
1213 samfile1 = pysam.Samfile('ex1.bam', 'rb')
1215 for a,b in zip(samfile1.fetch(), samfile1.fetch()):
1216 self.assertEqual( a.compare( b ), 0 )
1218 def testDoubleFetchWithRegion( self ):
1220 samfile1 = pysam.Samfile('ex1.bam', 'rb')
1221 chr, start, stop = 'chr1', 200, 3000000
1222 self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
1224 for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
1225 self.assertEqual( a.compare( b ), 0 )
1227 def testDoubleFetchUntilEOF( self ):
1229 samfile1 = pysam.Samfile('ex1.bam', 'rb')
1231 for a,b in zip(samfile1.fetch( until_eof = True),
1232 samfile1.fetch( until_eof = True )):
1233 self.assertEqual( a.compare( b), 0 )
1235 class TestRemoteFileFTP(unittest.TestCase):
1236 '''test remote access.
1240 # Need to find an ftp server without password on standard
1243 url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
1246 def testFTPView( self ):
1248 result = pysam.view( self.url, self.region )
1249 self.assertEqual( len(result), 36 )
1251 def testFTPFetch( self ):
1253 samfile = pysam.Samfile(self.url, "rb")
1254 result = list(samfile.fetch( region = self.region ))
1255 self.assertEqual( len(result), 36 )
1257 class TestRemoteFileHTTP( unittest.TestCase):
1259 url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
1260 region = "chr1:1-1000"
1263 def testView( self ):
1264 samfile_local = pysam.Samfile(self.local, "rb")
1265 ref = list(samfile_local.fetch( region = self.region ))
1267 result = pysam.view( self.url, self.region )
1268 self.assertEqual( len(result), len(ref) )
1270 def testFetch( self ):
1271 samfile = pysam.Samfile(self.url, "rb")
1272 result = list(samfile.fetch( region = self.region ))
1273 samfile_local = pysam.Samfile(self.local, "rb")
1274 ref = list(samfile_local.fetch( region = self.region ))
1276 self.assertEqual( len(ref), len(result) )
1277 for x, y in zip(result, ref):
1278 self.assertEqual( x.compare( y ), 0 )
1280 def testFetchAll( self ):
1281 samfile = pysam.Samfile(self.url, "rb")
1282 result = list(samfile.fetch())
1283 samfile_local = pysam.Samfile(self.local, "rb")
1284 ref = list(samfile_local.fetch() )
1286 self.assertEqual( len(ref), len(result) )
1287 for x, y in zip(result, ref):
1288 self.assertEqual( x.compare( y ), 0 )
1290 class TestLargeOptValues( unittest.TestCase ):
1292 ints = ( 65536, 214748, 2147484, 2147483647 )
1293 floats = ( 65536.0, 214748.0, 2147484.0 )
1295 def check( self, samfile ):
1298 for exp in self.ints:
1301 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1303 for exp in [ -x for x in self.ints ]:
1306 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1308 for exp in self.floats:
1311 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1313 for exp in [ -x for x in self.floats ]:
1316 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1318 def testSAM( self ):
1319 samfile = pysam.Samfile("ex10.sam", "r")
1320 self.check( samfile )
1322 def testBAM( self ):
1323 samfile = pysam.Samfile("ex10.bam", "rb")
1324 self.check( samfile )
1326 # class TestSNPCalls( unittest.TestCase ):
1327 # '''test pysam SNP calling ability.'''
1329 # def checkEqual( self, a, b ):
1330 # for x in ("reference_base",
1333 # "consensus_quality",
1335 # "mapping_quality",
1337 # self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" %
1338 # (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1340 # def testAllPositionsViaIterator( self ):
1341 # samfile = pysam.Samfile( "ex1.bam", "rb")
1342 # fastafile = pysam.Fastafile( "ex1.fa" )
1344 # refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"]
1345 # except pysam.SamtoolsError:
1348 # i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1349 # calls = list(pysam.IteratorSNPCalls(i))
1350 # for x,y in zip( refs, calls ):
1351 # self.checkEqual( x, y )
1353 # def testPerPositionViaIterator( self ):
1354 # # test pileup for each position. This is a slow operation
1355 # # so this test is disabled
1357 # samfile = pysam.Samfile( "ex1.bam", "rb")
1358 # fastafile = pysam.Fastafile( "ex1.fa" )
1359 # for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1360 # if x.reference_base == "*": continue
1361 # i = samfile.pileup( x.chromosome, x.pos, x.pos+1,
1362 # fastafile = fastafile,
1363 # stepper = "samtools" )
1364 # z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ]
1365 # self.assertEqual( len(z), 1 )
1366 # self.checkEqual( x, z[0] )
1368 # def testPerPositionViaCaller( self ):
1369 # # test pileup for each position. This is a fast operation
1370 # samfile = pysam.Samfile( "ex1.bam", "rb")
1371 # fastafile = pysam.Fastafile( "ex1.fa" )
1372 # i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1373 # caller = pysam.SNPCaller( i )
1375 # for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1376 # if x.reference_base == "*": continue
1377 # call = caller.call( x.chromosome, x.pos )
1378 # self.checkEqual( x, call )
1380 # class TestIndelCalls( unittest.TestCase ):
1381 # '''test pysam indel calling.'''
1383 # def checkEqual( self, a, b ):
1387 # "consensus_quality",
1389 # "mapping_quality",
1396 # if b.genotype == "*/*" and x == "second_allele":
1397 # # ignore test for second allele (positions chr2:439 and chr2:1512)
1399 # self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" %
1400 # (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1402 # def testAllPositionsViaIterator( self ):
1404 # samfile = pysam.Samfile( "ex1.bam", "rb")
1405 # fastafile = pysam.Fastafile( "ex1.fa" )
1407 # refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"]
1408 # except pysam.SamtoolsError:
1411 # i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1412 # calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ]
1413 # for x,y in zip( refs, calls ):
1414 # self.checkEqual( x, y )
1416 # def testPerPositionViaCaller( self ):
1417 # # test pileup for each position. This is a fast operation
1418 # samfile = pysam.Samfile( "ex1.bam", "rb")
1419 # fastafile = pysam.Fastafile( "ex1.fa" )
1420 # i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1421 # caller = pysam.IndelCaller( i )
1423 # for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1424 # if x.reference_base != "*": continue
1425 # call = caller.call( x.chromosome, x.pos )
1426 # self.checkEqual( x, call )
1428 class TestLogging( unittest.TestCase ):
1429 '''test around bug issue 42,
1431 failed in versions < 0.4
1434 def check( self, bamfile, log ):
1437 logger = logging.getLogger('franklin')
1438 logger.setLevel(logging.INFO)
1439 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
1440 log_hand = logging.FileHandler('log.txt')
1441 log_hand.setFormatter(formatter)
1442 logger.addHandler(log_hand)
1444 bam = pysam.Samfile(bamfile, 'rb')
1446 self.assert_( True )
1448 def testFail1( self ):
1449 self.check( "ex9_fail.bam", False )
1450 self.check( "ex9_fail.bam", True )
1452 def testNoFail1( self ):
1453 self.check( "ex9_nofail.bam", False )
1454 self.check( "ex9_nofail.bam", True )
1456 def testNoFail2( self ):
1457 self.check( "ex9_nofail.bam", True )
1458 self.check( "ex9_nofail.bam", True )
1461 # 1. finish testing all properties within pileup objects
1462 # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)
1463 # 3. check: presence of sequence
1465 class TestSamfileUtilityFunctions( unittest.TestCase ):
1467 def testCount( self ):
1469 samfile = pysam.Samfile( "ex1.bam", "rb" )
1471 for contig in ("chr1", "chr2" ):
1472 for start in xrange( 0, 2000, 100 ):
1474 self.assertEqual( len( list( samfile.fetch( contig, start, end ) ) ),
1475 samfile.count( contig, start, end ) )
1477 # test empty intervals
1478 self.assertEqual( len( list( samfile.fetch( contig, start, start ) ) ),
1479 samfile.count( contig, start, start ) )
1481 # test half empty intervals
1482 self.assertEqual( len( list( samfile.fetch( contig, start ) ) ),
1483 samfile.count( contig, start ) )
1485 def testMate( self ):
1486 '''test mate access.'''
1488 readnames = [ x.split("\t")[0] for x in open( "ex1.sam", "rb" ).readlines() ]
1489 counts = collections.defaultdict( int )
1490 for x in readnames: counts[x] += 1
1492 samfile = pysam.Samfile( "ex1.bam", "rb" )
1493 for read in samfile.fetch():
1494 if not read.is_paired:
1495 self.assertRaises( ValueError, samfile.mate, read )
1496 elif read.mate_is_unmapped:
1497 self.assertRaises( ValueError, samfile.mate, read )
1499 if counts[read.qname] == 1:
1500 self.assertRaises( ValueError, samfile.mate, read )
1502 mate = samfile.mate( read )
1503 self.assertEqual( read.qname, mate.qname )
1504 self.assertEqual( read.is_read1, mate.is_read2 )
1505 self.assertEqual( read.is_read2, mate.is_read1 )
1506 self.assertEqual( read.pos, mate.mpos )
1507 self.assertEqual( read.mpos, mate.pos )
1509 def testIndexStats( self ):
1510 '''test if total number of mapped/unmapped reads is correct.'''
1512 samfile = pysam.Samfile( "ex1.bam", "rb" )
1513 self.assertEqual( samfile.mapped, 3235 )
1514 self.assertEqual( samfile.unmapped, 35 )
1516 class TestSamtoolsProxy( unittest.TestCase ):
1517 '''tests for sanity checking access to samtools functions.'''
1519 def testIndex( self ):
1520 self.assertRaises( IOError, pysam.index, "missing_file" )
1522 def testView( self ):
1523 # note that view still echos "open: No such file or directory"
1524 self.assertRaises( pysam.SamtoolsError, pysam.view, "missing_file" )
1526 def testSort( self ):
1527 self.assertRaises( pysam.SamtoolsError, pysam.sort, "missing_file" )
1529 class TestSamfileIndex( unittest.TestCase):
1531 def testIndex( self ):
1532 samfile = pysam.Samfile( "ex1.bam", "rb" )
1533 index = pysam.IndexedReads( samfile )
1536 reads = collections.defaultdict( int )
1538 for read in samfile: reads[read.qname] += 1
1540 for qname, counts in reads.iteritems():
1541 found = list(index.find( qname ))
1542 self.assertEqual( len(found), counts )
1543 for x in found: self.assertEqual( x.qname, qname )
1546 if __name__ == "__main__":
1548 print "building data files"
1549 subprocess.call( "make", shell=True)
1550 print "starting tests"