2 '''unit testing code for pysam.
4 Execute in the :file:`tests` directory as it requires the Makefile
5 and data files located there.
17 IS_PYTHON3 = sys.version_info[0] >= 3
20 from itertools import zip_longest
22 from itertools import izip as zip_longest
26 WORKDIR="pysam_test_work"
28 def checkBinaryEqual( filename1, filename2 ):
29 '''return true if the two files are binary equal.'''
30 if os.path.getsize( filename1 ) != os.path.getsize( filename2 ):
33 infile1 = open(filename1, "rb")
34 infile2 = open(filename2, "rb")
36 def chariter( infile ):
43 for c1,c2 in zip_longest( chariter( infile1), chariter( infile2) ):
52 def runSamtools( cmd ):
53 '''run a samtools command'''
56 retcode = subprocess.call(cmd, shell=True)
58 print("Child was terminated by signal", -retcode)
60 print("Execution failed:", e)
62 def getSamtoolsVersion():
63 '''return samtools version'''
65 with subprocess.Popen(SAMTOOLS, shell=True, stderr=subprocess.PIPE).stderr as pipe:
66 lines = b"".join(pipe.readlines())
69 lines = lines.decode('ascii')
70 return re.search( "Version:\s+(\S+)", lines).groups()[0]
72 class BinaryTest(unittest.TestCase):
73 '''test samtools command line commands and compare
74 against pysam commands.
76 Tests fail, if the output is not binary identical.
81 # a list of commands to test
86 ("ex1.view", "view ex1.bam > ex1.view"),
87 ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
91 ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"),
92 # note that -o ex1.view2 throws exception.
93 ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ),
97 ( "ex1.sort.bam", "sort ex1.bam ex1.sort" ),
98 ( "pysam_ex1.sort.bam", (pysam.sort, "ex1.bam pysam_ex1.sort" ) ),
102 ("ex1.pileup", "mpileup ex1.bam > ex1.pileup" ),
103 ("pysam_ex1.mpileup", (pysam.mpileup, "ex1.bam" ) ),
107 ("ex1.depth", "depth ex1.bam > ex1.depth" ),
108 ("pysam_ex1.depth", (pysam.depth, "ex1.bam" ) ),
112 ("ex1.fa.fai", "faidx ex1.fa"),
113 ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ),
117 ("ex1.bam.bai", "index ex1.bam" ),
118 ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ),
122 ("ex1.idxstats", "idxstats ex1.bam > ex1.idxstats" ),
123 ("pysam_ex1.idxstats", (pysam.idxstats, "pysam_ex1.bam" ) ),
127 ("ex1.fixmate", "fixmate ex1.bam ex1.fixmate" ),
128 ("pysam_ex1.fixmate", (pysam.fixmate, "pysam_ex1.bam pysam_ex1.fixmate") ),
132 ("ex1.flagstat", "flagstat ex1.bam > ex1.flagstat" ),
133 ("pysam_ex1.flagstat", (pysam.flagstat, "pysam_ex1.bam") ),
137 ("ex1.calmd", "calmd ex1.bam ex1.fa > ex1.calmd" ),
138 ("pysam_ex1.calmd", (pysam.calmd, "pysam_ex1.bam ex1.fa") ),
142 ("ex1.merge", "merge -f ex1.merge ex1.bam ex1.bam" ),
143 # -f option does not work - following command will cause the subsequent
145 ("pysam_ex1.merge", (pysam.merge, "pysam_ex1.merge pysam_ex1.bam pysam_ex1.bam") ),
149 ("ex1.rmdup", "rmdup ex1.bam ex1.rmdup" ),
150 ("pysam_ex1.rmdup", (pysam.rmdup, "pysam_ex1.bam pysam_ex1.rmdup" )),
154 ( "ex1.reheader", "reheader ex1.bam ex1.bam > ex1.reheader"),
155 ( "pysam_ex1.reheader", (pysam.reheader, "ex1.bam ex1.bam" ) ),
159 ( "ex1.cat", "cat ex1.bam ex1.bam > ex1.cat"),
160 ( "pysam_ex1.cat", (pysam.cat, "ex1.bam ex1.bam" ) ),
164 ("ex1.targetcut", "targetcut ex1.bam > ex1.targetcut" ),
165 ("pysam_ex1.targetcut", (pysam.targetcut, "pysam_ex1.bam") ),
169 ("ex1.phase", "phase ex1.bam > ex1.phase" ),
170 ("pysam_ex1.phase", (pysam.phase, "pysam_ex1.bam") ),
174 ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ),
175 ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ),
179 ("ex1.bam2fq", "bam2fq ex1.bam > ex1.bam2fq" ),
180 ("pysam_ex1.bam2fq", (pysam.bam2fq, "pysam_ex1.bam") ),
184 # some tests depend on others. The order specifies in which order
185 # the samtools commands are executed.
186 # The first three (faidx, import, index) need to be in that order,
187 # the rest is arbitrary.
188 order = ('faidx', 'import', 'index',
189 # 'pileup1', 'pileup2', deprecated
190 # 'glfview', deprecated
211 For setup, all commands will be run before the first test is
212 executed. Individual tests will then just compare the output
215 if BinaryTest.first_time:
217 # remove previous files
218 if os.path.exists( WORKDIR ):
219 shutil.rmtree( WORKDIR )
222 # copy the source files to WORKDIR
223 os.makedirs( WORKDIR )
225 shutil.copy( "ex1.fa", os.path.join( WORKDIR, "pysam_ex1.fa" ) )
226 shutil.copy( "ex1.fa", os.path.join( WORKDIR, "ex1.fa" ) )
227 shutil.copy( "ex1.sam.gz", os.path.join( WORKDIR, "ex1.sam.gz" ) )
228 shutil.copy( "ex1.sam", os.path.join( WORKDIR, "ex1.sam" ) )
231 savedir = os.getcwd()
234 for label in self.order:
235 command = self.commands[label]
236 # build samtools command and target and run
237 samtools_target, samtools_command = command[0]
238 runSamtools( " ".join( (SAMTOOLS, samtools_command )))
240 # get pysam command and run
242 pysam_target, pysam_command = command[1]
243 except ValueError as msg:
244 raise ValueError( "error while setting up %s=%s: %s" %\
245 (label, command, msg) )
247 pysam_method, pysam_options = pysam_command
249 output = pysam_method( *pysam_options.split(" "), raw=True)
250 except pysam.SamtoolsError as msg:
251 raise pysam.SamtoolsError( "error while executing %s: options=%s: msg=%s" %\
252 (label, pysam_options, msg) )
256 if ">" in samtools_command:
257 with open( pysam_target, "wb" ) as outfile:
258 if type(output) == list:
261 outfile.write( line.encode('ascii') )
263 for line in output: outfile.write( line )
265 outfile.write(output)
268 BinaryTest.first_time = False
270 samtools_version = getSamtoolsVersion()
274 # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> 0.1.12
275 if s.count('-') > 0: s = s[0:s.find('-')]
276 return re.sub( "[^0-9.]", "", s )
278 if _r(samtools_version) != _r( pysam.__samtools_version__):
279 raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
280 (pysam.__samtools_version__,
283 def checkCommand( self, command ):
285 samtools_target, pysam_target = self.commands[command][0][0], self.commands[command][1][0]
286 samtools_target = os.path.join( WORKDIR, samtools_target )
287 pysam_target = os.path.join( WORKDIR, pysam_target )
288 self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ),
289 "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
291 def testImport( self ):
292 self.checkCommand( "import" )
294 def testIndex( self ):
295 self.checkCommand( "index" )
297 def testSort( self ):
298 self.checkCommand( "sort" )
300 def testMpileup( self ):
301 self.checkCommand( "mpileup" )
303 def testDepth( self ):
304 self.checkCommand( "depth" )
306 def testIdxstats( self ):
307 self.checkCommand( "idxstats" )
309 def testFixmate( self ):
310 self.checkCommand( "fixmate" )
312 def testFlagstat( self ):
313 self.checkCommand( "flagstat" )
315 def testMerge( self ):
316 self.checkCommand( "merge" )
318 def testRmdup( self ):
319 self.checkCommand( "rmdup" )
321 def testReheader( self ):
322 self.checkCommand( "reheader" )
325 self.checkCommand( "cat" )
327 def testTargetcut( self ):
328 self.checkCommand( "targetcut" )
330 def testPhase( self ):
331 self.checkCommand( "phase" )
333 def testBam2fq( self ):
334 self.checkCommand( "bam2fq" )
336 # def testPileup1( self ):
337 # self.checkCommand( "pileup1" )
339 # def testPileup2( self ):
340 # self.checkCommand( "pileup2" )
343 # def testGLFView( self ):
344 # self.checkCommand( "glfview" )
346 def testView( self ):
347 self.checkCommand( "view" )
349 def testEmptyIndex( self ):
350 self.assertRaises( IOError, pysam.index, "exdoesntexist.bam" )
353 if os.path.exists( WORKDIR ):
355 # shutil.rmtree( WORKDIR )
357 class IOTest(unittest.TestCase):
358 '''check if reading samfile and writing a samfile are consistent.'''
360 def checkEcho( self, input_filename,
363 input_mode, output_mode, use_template = True ):
364 '''iterate through *input_filename* writing to *output_filename* and
365 comparing the output to *reference_filename*.
367 The files are opened according to the *input_mode* and *output_mode*.
369 If *use_template* is set, the header is copied from infile using the
370 template mechanism, otherwise target names and lengths are passed
375 infile = pysam.Samfile( input_filename, input_mode )
377 outfile = pysam.Samfile( output_filename, output_mode, template = infile )
379 outfile = pysam.Samfile( output_filename, output_mode,
380 referencenames = infile.references,
381 referencelengths = infile.lengths,
382 add_sq_text = False )
384 iter = infile.fetch()
386 for x in iter: outfile.write( x )
390 self.assertTrue( checkBinaryEqual( reference_filename, output_filename),
391 "files %s and %s are not the same" % (reference_filename, output_filename) )
394 def testReadWriteBam( self ):
396 input_filename = "ex1.bam"
397 output_filename = "pysam_ex1.bam"
398 reference_filename = "ex1.bam"
400 self.checkEcho( input_filename, reference_filename, output_filename,
403 def testReadWriteBamWithTargetNames( self ):
405 input_filename = "ex1.bam"
406 output_filename = "pysam_ex1.bam"
407 reference_filename = "ex1.bam"
409 self.checkEcho( input_filename, reference_filename, output_filename,
410 "rb", "wb", use_template = False )
412 def testReadWriteSamWithHeader( self ):
414 input_filename = "ex2.sam"
415 output_filename = "pysam_ex2.sam"
416 reference_filename = "ex2.sam"
418 self.checkEcho( input_filename, reference_filename, output_filename,
421 def testReadWriteSamWithoutHeader( self ):
423 input_filename = "ex2.sam"
424 output_filename = "pysam_ex2.sam"
425 reference_filename = "ex1.sam"
427 self.checkEcho( input_filename, reference_filename, output_filename,
430 def testReadSamWithoutTargetNames( self ):
432 input_filename = "example_unmapped_reads_no_sq.sam"
434 # raise exception in default mode
435 self.assertRaises( ValueError, pysam.Samfile, input_filename, "r" )
437 # raise exception if no SQ files
438 self.assertRaises( ValueError, pysam.Samfile, input_filename, "r",
441 infile = pysam.Samfile( input_filename, check_header = False, check_sq = False )
442 result = list(infile.fetch())
444 def testReadBamWithoutTargetNames( self ):
446 input_filename = "example_unmapped_reads_no_sq.bam"
448 # raise exception in default mode
449 self.assertRaises( ValueError, pysam.Samfile, input_filename, "r" )
451 # raise exception if no SQ files
452 self.assertRaises( ValueError, pysam.Samfile, input_filename, "r",
456 infile = pysam.Samfile( input_filename, check_header = False, check_sq = False )
457 result = list(infile.fetch( until_eof = True))
459 def testReadSamWithoutHeader( self ):
460 input_filename = "ex1.sam"
461 output_filename = "pysam_ex1.sam"
462 reference_filename = "ex1.sam"
464 # reading from a samfile without header is not implemented.
465 self.assertRaises( ValueError, pysam.Samfile, input_filename, "r" )
467 self.assertRaises( ValueError, pysam.Samfile, input_filename, "r",
468 check_header = False )
470 def testReadUnformattedFile( self ):
471 '''test reading from a file that is not bam/sam formatted'''
472 input_filename = "example.vcf40"
474 # bam - file raise error
475 self.assertRaises( ValueError, pysam.Samfile, input_filename, "rb" )
477 # sam - file error, but can't fetch
478 self.assertRaises( ValueError, pysam.Samfile, input_filename, "r" )
480 self.assertRaises( ValueError, pysam.Samfile, input_filename, "r",
481 check_header = False)
483 def testFetchFromClosedFile( self ):
485 samfile = pysam.Samfile( "ex1.bam", "rb" )
487 self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
489 def testClosedFile( self ):
490 '''test that access to a closed samfile raises ValueError.'''
492 samfile = pysam.Samfile( "ex1.bam", "rb" )
494 self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
495 self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
496 self.assertRaises( ValueError, samfile.getrname, 0 )
497 self.assertRaises( ValueError, samfile.tell )
498 self.assertRaises( ValueError, samfile.seek, 0 )
499 self.assertRaises( ValueError, getattr, samfile, "nreferences" )
500 self.assertRaises( ValueError, getattr, samfile, "references" )
501 self.assertRaises( ValueError, getattr, samfile, "lengths" )
502 self.assertRaises( ValueError, getattr, samfile, "text" )
503 self.assertRaises( ValueError, getattr, samfile, "header" )
505 # write on closed file
506 self.assertEqual( 0, samfile.write(None) )
508 def testAutoDetection( self ):
509 '''test if autodetection works.'''
511 samfile = pysam.Samfile( "ex3.sam" )
512 self.assertRaises( ValueError, samfile.fetch, 'chr1' )
515 samfile = pysam.Samfile( "ex3.bam" )
516 samfile.fetch('chr1')
519 def testReadingFromSamFileWithoutHeader( self ):
520 '''read from samfile without header.
522 samfile = pysam.Samfile( "ex7.sam", check_header = False, check_sq = False )
523 self.assertRaises( NotImplementedError, samfile.__iter__ )
525 def testReadingFromFileWithoutIndex( self ):
526 '''read from bam file without index.'''
528 assert not os.path.exists( "ex2.bam.bai" )
529 samfile = pysam.Samfile( "ex2.bam", "rb" )
530 self.assertRaises( ValueError, samfile.fetch )
531 self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 )
533 def testReadingUniversalFileMode( self ):
534 '''read from samfile without header.
537 input_filename = "ex2.sam"
538 output_filename = "pysam_ex2.sam"
539 reference_filename = "ex1.sam"
541 self.checkEcho( input_filename, reference_filename, output_filename,
544 class TestFloatTagBug( unittest.TestCase ):
547 def testFloatTagBug( self ):
548 '''a float tag before another exposed a parsing bug in bam_aux_get - expected to fail
550 This test is expected to fail until samtools is fixed.
552 samfile = pysam.Samfile("tag_bug.bam")
553 read = next(samfile.fetch(until_eof=True))
554 self.assertTrue( ('XC',1) in read.tags )
555 self.assertEqual(read.opt('XC'), 1)
557 class TestLargeFieldBug( unittest.TestCase ):
560 def testLargeFileBug( self ):
561 '''when creating a read with a large entry in the tag field
563 NotImplementedError: tags field too large
565 samfile = pysam.Samfile("issue100.bam")
566 read = next(samfile.fetch(until_eof=True))
567 new_read = pysam.AlignedRead()
568 new_read.tags = read.tags
569 self.assertEqual( new_read.tags, read.tags )
571 class TestTagParsing( unittest.TestCase ):
572 '''tests checking the accuracy of tag setting and retrieval.'''
574 def makeRead( self ):
575 a = pysam.AlignedRead()
576 a.qname = "read_12345"
583 a.cigar = ( (0,10), (2,1), (0,25) )
591 def testNegativeIntegers( self ):
593 aligned_read = self.makeRead()
594 aligned_read.tags = [("XD", int(x) ) ]
595 # print (aligned_read.tags)
597 def testNegativeIntegers2( self ):
600 r.tags = [("XD", int(x) ) ]
601 outfile = pysam.Samfile( "test.bam",
603 referencenames = ("chr1",),
604 referencelengths = (1000,) )
608 def testCigarString( self ):
610 self.assertEqual( r.cigarstring, "M10D1M25" )
611 r.cigarstring = "M20D10M20"
612 self.assertEqual( r.cigar, [(0,20), (2,10), (0,20)])
614 class TestIteratorRow(unittest.TestCase):
617 self.samfile=pysam.Samfile( "ex1.bam","rb" )
619 def checkRange( self, rnge ):
620 '''compare results from iterator with those from samtools.'''
621 ps = list(self.samfile.fetch(region=rnge))
622 sa = list(pysam.view( "ex1.bam", rnge, raw = True) )
623 self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) ))
624 # check if the same reads are returned and in the same order
625 for line, (a, b) in enumerate( list(zip( ps, sa )) ):
627 self.assertEqual( a.qname, d[0], "line %i: read id mismatch: %s != %s" % (line, a.rname, d[0]) )
628 self.assertEqual( a.pos, int(d[3])-1, "line %i: read position mismatch: %s != %s, \n%s\n%s\n" % \
629 (line, a.pos, int(d[3])-1,
631 if sys.version_info[0] < 3:
634 qual = d[10].encode('ascii')
635 self.assertEqual( a.qual, qual, "line %i: quality mismatch: %s != %s, \n%s\n%s\n" % \
639 def testIteratePerContig(self):
640 '''check random access per contig'''
641 for contig in self.samfile.references:
642 self.checkRange( contig )
644 def testIterateRanges(self):
645 '''check random access per range'''
646 for contig, length in zip(self.samfile.references, self.samfile.lengths):
647 for start in range( 1, length, 90):
648 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
654 class TestIteratorRowAll(unittest.TestCase):
657 self.samfile=pysam.Samfile( "ex1.bam","rb" )
659 def testIterate(self):
660 '''compare results from iterator with those from samtools.'''
661 ps = list(self.samfile.fetch())
662 sa = list(pysam.view( "ex1.bam", raw = True) )
663 self.assertEqual( len(ps), len(sa), "unequal number of results: %i != %i" % (len(ps), len(sa) ))
664 # check if the same reads are returned
665 for line, pair in enumerate( list(zip( ps, sa )) ):
666 data = pair[1].split("\t")
667 self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
672 class TestIteratorColumn(unittest.TestCase):
673 '''test iterator column against contents of ex3.bam.'''
675 # note that samfile contains 1-based coordinates
676 # 1D means deletion with respect to reference sequence
678 mCoverages = { 'chr1' : [ 0 ] * 20 + [1] * 36 + [0] * (100 - 20 -35 ),
679 'chr2' : [ 0 ] * 20 + [1] * 35 + [0] * (100 - 20 -35 ),
683 self.samfile=pysam.Samfile( "ex4.bam","rb" )
685 def checkRange( self, contig, start = None, end = None, truncate = False ):
686 '''compare results from iterator with those from samtools.'''
687 # check if the same reads are returned and in the same order
688 for column in self.samfile.pileup(contig, start, end, truncate = truncate):
690 self.assertGreaterEqual( column.pos, start )
691 self.assertLess( column.pos, end )
692 thiscov = len(column.pileups)
693 refcov = self.mCoverages[self.samfile.getrname(column.tid)][column.pos]
694 self.assertEqual( thiscov, refcov, "wrong coverage at pos %s:%i %i should be %i" % (self.samfile.getrname(column.tid), column.pos, thiscov, refcov))
696 def testIterateAll(self):
697 '''check random access per contig'''
698 self.checkRange( None )
700 def testIteratePerContig(self):
701 '''check random access per contig'''
702 for contig in self.samfile.references:
703 self.checkRange( contig )
705 def testIterateRanges(self):
706 '''check random access per range'''
707 for contig, length in zip(self.samfile.references, self.samfile.lengths):
708 for start in range( 1, length, 90):
709 self.checkRange( contig, start, start + 90 ) # this includes empty ranges
711 def testInverse( self ):
712 '''test the inverse, is point-wise pileup accurate.'''
713 for contig, refseq in list(self.mCoverages.items()):
714 refcolumns = sum(refseq)
715 for pos, refcov in enumerate( refseq ):
716 columns = list(self.samfile.pileup( contig, pos, pos+1) )
718 # if no read, no coverage
719 self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
721 # one read, all columns of the read are returned
722 self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
723 (pos, len(columns), refcolumns))
725 def testIterateTruncate( self ):
726 '''check random access per range'''
727 for contig, length in zip(self.samfile.references, self.samfile.lengths):
728 for start in range( 1, length, 90):
729 self.checkRange( contig, start, start + 90, truncate = True ) # this includes empty ranges
737 class TestAlignedReadFromBam(unittest.TestCase):
740 self.samfile=pysam.Samfile( "ex3.bam","rb" )
741 self.reads=list(self.samfile.fetch())
743 def testARqname(self):
744 self.assertEqual( self.reads[0].qname, "read_28833_29006_6945", "read name mismatch in read 1: %s != %s" % (self.reads[0].qname, "read_28833_29006_6945") )
745 self.assertEqual( self.reads[1].qname, "read_28701_28881_323b", "read name mismatch in read 2: %s != %s" % (self.reads[1].qname, "read_28701_28881_323b") )
747 def testARflag(self):
748 self.assertEqual( self.reads[0].flag, 99, "flag mismatch in read 1: %s != %s" % (self.reads[0].flag, 99) )
749 self.assertEqual( self.reads[1].flag, 147, "flag mismatch in read 2: %s != %s" % (self.reads[1].flag, 147) )
751 def testARrname(self):
752 self.assertEqual( self.reads[0].rname, 0, "chromosome/target id mismatch in read 1: %s != %s" % (self.reads[0].rname, 0) )
753 self.assertEqual( self.reads[1].rname, 1, "chromosome/target id mismatch in read 2: %s != %s" % (self.reads[1].rname, 1) )
756 self.assertEqual( self.reads[0].pos, 33-1, "mapping position mismatch in read 1: %s != %s" % (self.reads[0].pos, 33-1) )
757 self.assertEqual( self.reads[1].pos, 88-1, "mapping position mismatch in read 2: %s != %s" % (self.reads[1].pos, 88-1) )
759 def testARmapq(self):
760 self.assertEqual( self.reads[0].mapq, 20, "mapping quality mismatch in read 1: %s != %s" % (self.reads[0].mapq, 20) )
761 self.assertEqual( self.reads[1].mapq, 30, "mapping quality mismatch in read 2: %s != %s" % (self.reads[1].mapq, 30) )
763 def testARcigar(self):
764 self.assertEqual( self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)], "read name length mismatch in read 1: %s != %s" % (self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)]) )
765 self.assertEqual( self.reads[1].cigar, [(0, 35)], "read name length mismatch in read 2: %s != %s" % (self.reads[1].cigar, [(0, 35)]) )
767 def testARmrnm(self):
768 self.assertEqual( self.reads[0].mrnm, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].mrnm, 0) )
769 self.assertEqual( self.reads[1].mrnm, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].mrnm, 1) )
770 self.assertEqual( self.reads[0].rnext, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].rnext, 0) )
771 self.assertEqual( self.reads[1].rnext, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].rnext, 1) )
773 def testARmpos(self):
774 self.assertEqual( self.reads[0].mpos, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].mpos, 200-1) )
775 self.assertEqual( self.reads[1].mpos, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].mpos, 500-1) )
776 self.assertEqual( self.reads[0].pnext, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].pnext, 200-1) )
777 self.assertEqual( self.reads[1].pnext, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].pnext, 500-1) )
779 def testARisize(self):
780 self.assertEqual( self.reads[0].isize, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].isize, 167) )
781 self.assertEqual( self.reads[1].isize, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].isize, 412) )
782 self.assertEqual( self.reads[0].tlen, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].tlen, 167) )
783 self.assertEqual( self.reads[1].tlen, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].tlen, 412) )
786 self.assertEqual( self.reads[0].seq, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
787 self.assertEqual( self.reads[1].seq, b"ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, b"ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
788 self.assertEqual( self.reads[3].seq, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
790 def testARqual(self):
791 self.assertEqual( self.reads[0].qual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
792 self.assertEqual( self.reads[1].qual, b"<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, b"<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
793 self.assertEqual( self.reads[3].qual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
795 def testARquery(self):
796 self.assertEqual( self.reads[0].query, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
797 self.assertEqual( self.reads[1].query, b"ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, b"ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
798 self.assertEqual( self.reads[3].query, b"TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, b"TAGCTAGCTACCTATATCTTGGTCTT") )
800 def testARqqual(self):
801 self.assertEqual( self.reads[0].qqual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
802 self.assertEqual( self.reads[1].qqual, b"<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, b"<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
803 self.assertEqual( self.reads[3].qqual, b"<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, b"<<<<<<<<<<<<<<<<<:<9/,&,22") )
805 def testPresentOptionalFields(self):
806 self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
807 self.assertEqual( self.reads[0].opt('RG'), 'L1', "optional field mismatch in read 1, RG: %s != %s" % (self.reads[0].opt('RG'), 'L1') )
808 self.assertEqual( self.reads[1].opt('RG'), 'L2', "optional field mismatch in read 2, RG: %s != %s" % (self.reads[1].opt('RG'), 'L2') )
809 self.assertEqual( self.reads[1].opt('MF'), 18, "optional field mismatch in read 2, MF: %s != %s" % (self.reads[1].opt('MF'), 18) )
811 def testPairedBools(self):
812 self.assertEqual( self.reads[0].is_paired, True, "is paired mismatch in read 1: %s != %s" % (self.reads[0].is_paired, True) )
813 self.assertEqual( self.reads[1].is_paired, True, "is paired mismatch in read 2: %s != %s" % (self.reads[1].is_paired, True) )
814 self.assertEqual( self.reads[0].is_proper_pair, True, "is proper pair mismatch in read 1: %s != %s" % (self.reads[0].is_proper_pair, True) )
815 self.assertEqual( self.reads[1].is_proper_pair, True, "is proper pair mismatch in read 2: %s != %s" % (self.reads[1].is_proper_pair, True) )
817 def testTags( self ):
818 self.assertEqual( self.reads[0].tags,
819 [('NM', 1), ('RG', 'L1'),
820 ('PG', 'P1'), ('XT', 'U')] )
821 self.assertEqual( self.reads[1].tags,
822 [('MF', 18), ('RG', 'L2'),
823 ('PG', 'P2'),('XT', 'R') ] )
826 self.assertEqual( self.reads[0].opt("XT"), "U" )
827 self.assertEqual( self.reads[1].opt("XT"), "R" )
829 def testMissingOpt( self ):
830 self.assertRaises( KeyError, self.reads[0].opt, "XP" )
832 def testEmptyOpt( self ):
833 self.assertRaises( KeyError, self.reads[2].opt, "XT" )
838 class TestAlignedReadFromSam(TestAlignedReadFromBam):
841 self.samfile=pysam.Samfile( "ex3.sam","r" )
842 self.reads=list(self.samfile.fetch())
844 # needs to be implemented
845 # class TestAlignedReadFromSamWithoutHeader(TestAlignedReadFromBam):
848 # self.samfile=pysam.Samfile( "ex7.sam","r" )
849 # self.reads=list(self.samfile.fetch())
851 class TestHeaderSam(unittest.TestCase):
853 header = {'SQ': [{'LN': 1575, 'SN': 'chr1'},
854 {'LN': 1584, 'SN': 'chr2'}],
855 'RG': [{'LB': 'SC_1', 'ID': 'L1', 'SM': 'NA12891', 'PU': 'SC_1_10', "CN":"name:with:colon"},
856 {'LB': 'SC_2', 'ID': 'L2', 'SM': 'NA12891', 'PU': 'SC_2_12', "CN":"name:with:colon"}],
857 'PG': [{'ID': 'P1', 'VN': '1.0'}, {'ID': 'P2', 'VN': '1.1'}],
859 'CO' : [ 'this is a comment', 'this is another comment'],
862 def compareHeaders( self, a, b ):
863 '''compare two headers a and b.'''
864 for ak,av in a.items():
865 self.assertTrue( ak in b, "key '%s' not in '%s' " % (ak,b) )
866 self.assertEqual( av, b[ak] )
869 self.samfile=pysam.Samfile( "ex3.sam","r" )
871 def testHeaders(self):
872 self.compareHeaders( self.header, self.samfile.header )
873 self.compareHeaders( self.samfile.header, self.header )
875 def testNameMapping( self ):
876 for x, y in enumerate( ("chr1", "chr2")):
877 tid = self.samfile.gettid( y )
878 ref = self.samfile.getrname( x )
879 self.assertEqual( tid, x )
880 self.assertEqual( ref, y )
882 self.assertEqual( self.samfile.gettid("chr?"), -1 )
883 self.assertRaises( ValueError, self.samfile.getrname, 2 )
888 class TestHeaderBam(TestHeaderSam):
891 self.samfile=pysam.Samfile( "ex3.bam","rb" )
894 class TestUnmappedReads(unittest.TestCase):
897 samfile=pysam.Samfile( "ex5.sam","r" )
898 self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 )
902 samfile=pysam.Samfile( "ex5.bam","rb" )
903 self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 )
906 class TestPileupObjects(unittest.TestCase):
909 self.samfile=pysam.Samfile( "ex1.bam","rb" )
911 def testPileupColumn(self):
912 for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
913 if pcolumn1.pos == 104:
914 self.assertEqual( pcolumn1.tid, 0, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn1.tid, 0) )
915 self.assertEqual( pcolumn1.pos, 105-1, "position mismatch in position 1: %s != %s" % (pcolumn1.pos, 105-1) )
916 self.assertEqual( pcolumn1.n, 2, "# reads mismatch in position 1: %s != %s" % (pcolumn1.n, 2) )
917 for pcolumn2 in self.samfile.pileup( region="chr2:1480" ):
918 if pcolumn2.pos == 1479:
919 self.assertEqual( pcolumn2.tid, 1, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn2.tid, 1) )
920 self.assertEqual( pcolumn2.pos, 1480-1, "position mismatch in position 1: %s != %s" % (pcolumn2.pos, 1480-1) )
921 self.assertEqual( pcolumn2.n, 12, "# reads mismatch in position 1: %s != %s" % (pcolumn2.n, 12) )
923 def testPileupRead(self):
924 for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
925 if pcolumn1.pos == 104:
926 self.assertEqual( len(pcolumn1.pileups), 2, "# reads aligned to column mismatch in position 1: %s != %s" % (len(pcolumn1.pileups), 2) )
927 # self.assertEqual( pcolumn1.pileups[0] # need to test additional properties here
932 def testIteratorOutOfScope( self ):
933 '''test if exception is raised if pileup col is accessed after iterator is exhausted.'''
935 for pileupcol in self.samfile.pileup():
938 self.assertRaises( ValueError, getattr, pileupcol, "pileups" )
940 class TestContextManager(unittest.TestCase):
942 def testManager( self ):
943 with pysam.Samfile('ex1.bam', 'rb') as samfile:
945 self.assertEqual( samfile._isOpen(), False )
947 class TestExceptions(unittest.TestCase):
950 self.samfile=pysam.Samfile( "ex1.bam","rb" )
952 def testMissingFile(self):
954 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "rb" )
955 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "r" )
956 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "r" )
957 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "rb" )
959 def testBadContig(self):
960 self.assertRaises( ValueError, self.samfile.fetch, "chr88" )
962 def testMeaninglessCrap(self):
963 self.assertRaises( ValueError, self.samfile.fetch, "skljf" )
965 def testBackwardsOrderNewFormat(self):
966 self.assertRaises( ValueError, self.samfile.fetch, 'chr1', 100, 10 )
968 def testBackwardsOrderOldFormat(self):
969 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:100-10")
971 def testOutOfRangeNegativeNewFormat(self):
972 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, -10 )
973 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 )
974 self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 )
976 self.assertRaises( ValueError, self.samfile.count, "chr1", 5, -10 )
977 self.assertRaises( ValueError, self.samfile.count, "chr1", 5, 0 )
978 self.assertRaises( ValueError, self.samfile.count, "chr1", -5, -10 )
980 def testOutOfRangeNegativeOldFormat(self):
981 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" )
982 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" )
983 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" )
985 self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-10" )
986 self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-0" )
987 self.assertRaises( ValueError, self.samfile.count, region="chr1:-5--10" )
989 def testOutOfRangNewFormat(self):
990 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 )
991 self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999, 99999999999 )
993 def testOutOfRangeLargeNewFormat(self):
994 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
995 self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
997 def testOutOfRangeLargeOldFormat(self):
998 self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
999 self.assertRaises( ValueError, self.samfile.count, "chr1:99999999999999999-999999999999999999" )
1001 def testZeroToZero(self):
1003 self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0)
1006 self.samfile.close()
1008 class TestWrongFormat(unittest.TestCase):
1009 '''test cases for opening files not in bam/sam format.'''
1011 def testOpenSamAsBam( self ):
1012 self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' )
1014 def testOpenBamAsSam( self ):
1015 # test fails, needs to be implemented.
1016 # sam.fetch() fails on reading, not on opening
1017 # self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' )
1020 def testOpenFastaAsSam( self ):
1021 # test fails, needs to be implemented.
1022 # sam.fetch() fails on reading, not on opening
1023 # self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' )
1026 def testOpenFastaAsBam( self ):
1027 self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' )
1029 class TestFastaFile(unittest.TestCase):
1031 mSequences = { 'chr1' :
1032 b"CACTAGTGGCTCATTGTAAATGTGTGGTTTAACTCGTCCATGGCCCAGCATTAGGGAGCTGTGGACCCTGCAGCCTGGCTGTGGGGGCCGCAGTGGCTGAGGGGTGCAGAGCCGAGTCACGGGGTTGCCAGCACAGGGGCTTAACCTCTGGTGACTGCCAGAGCTGCTGGCAAGCTAGAGTCCCATTTGGAGCCCCTCTAAGCCGTTCTATTTGTAATGAAAACTATATTTATGCTATTCAGTTCTAAATATAGAAATTGAAACAGCTGTGTTTAGTGCCTTTGTTCAACCCCCTTGCAACAACCTTGAGAACCCCAGGGAATTTGTCAATGTCAGGGAAGGAGCATTTTGTCAGTTACCAAATGTGTTTATTACCAGAGGGATGGAGGGAAGAGGGACGCTGAAGAACTTTGATGCCCTCTTCTTCCAAAGATGAAACGCGTAACTGCGCTCTCATTCACTCCAGCTCCCTGTCACCCAATGGACCTGTGATATCTGGATTCTGGGAAATTCTTCATCCTGGACCCTGAGAGATTCTGCAGCCCAGCTCCAGATTGCTTGTGGTCTGACAGGCTGCAACTGTGAGCCATCACAATGAACAACAGGAAGAAAAGGTCTTTCAAAAGGTGATGTGTGTTCTCATCAACCTCATACACACACATGGTTTAGGGGTATAATACCTCTACATGGCTGATTATGAAAACAATGTTCCCCAGATACCATCCCTGTCTTACTTCCAGCTCCCCAGAGGGAAAGCTTTCAACGCTTCTAGCCATTTCTTTTGGCATTTGCCTTCAGACCCTACACGAATGCGTCTCTACCACAGGGGGCTGCGCGGTTTCCCATCATGAAGCACTGAACTTCCACGTCTCATCTAGGGGAACAGGGAGGTGCACTAATGCGCTCCACGCCCAAGCCCTTCTCACAGTTTCTGCCCCCAGCATGGTTGTACTGGGCAATACATGAGATTATTAGGAAATGCTTTACTGTCATAACTATGAAGAGACTATTGCCAGATGAACCACACATTAATACTATGTTTCTTATCTGCACATTACTACCCTGCAATTAATATAATTGTGTCCATGTACACACGCTGTCCTATGTACTTATCATGACTCTATCCCAAATTCCCAATTACGTCCTATCTTCTTCTTAGGGAAGAACAGCTTAGGTATCAATTTGGTGTTCTGTGTAAAGTCTCAGGGAGCCGTCCGTGTCCTCCCATCTGGCCTCGTCCACACTGGTTCTCTTGAAAGCTTGGGCTGTAATGATGCCCCTTGGCCATCACCCAGTCCCTGCCCCATCTCTTGTAATCTCTCTCCTTTTTGCTGCATCCCTGTCTTCCTCTGTCTTGATTTACTTGTTGTTGGTTTTCTGTTTCTTTGTTTGATTTGGTGGAAGACATAATCCCACGCTTCCTATGGAAAGGTTGTTGGGAGATTTTTAATGATTCCTCAATGTTAAAATGTCTATTTTTGTCTTGACACCCAACTAATATTTGTCTGAGCAAAACAGTCTAGATGAGAGAGAACTTCCCTGGAGGTCTGATGGCGTTTCTCCCTCGTCTTCTTA",
1034 b"TTCAAATGAACTTCTGTAATTGAAAAATTCATTTAAGAAATTACAAAATATAGTTGAAAGCTCTAACAATAGACTAAACCAAGCAGAAGAAAGAGGTTCAGAACTTGAAGACAAGTCTCTTATGAATTAACCCAGTCAGACAAAAATAAAGAAAAAAATTTTAAAAATGAACAGAGCTTTCAAGAAGTATGAGATTATGTAAAGTAACTGAACCTATGAGTCACAGGTATTCCTGAGGAAAAAGAAAAAGTGAGAAGTTTGGAAAAACTATTTGAGGAAGTAATTGGGGAAAACCTCTTTAGTCTTGCTAGAGATTTAGACATCTAAATGAAAGAGGCTCAAAGAATGCCAGGAAGATACATTGCAAGACAGACTTCATCAAGATATGTAGTCATCAGACTATCTAAAGTCAACATGAAGGAAAAAAATTCTAAAATCAGCAAGAGAAAAGCATACAGTCATCTATAAAGGAAATCCCATCAGAATAACAATGGGCTTCTCAGCAGAAACCTTACAAGCCAGAAGAGATTGGATCTAATTTTTGGACTTCTTAAAGAAAAAAAAACCTGTCAAACACGAATGTTATGCCCTGCTAAACTAAGCATCATAAATGAAGGGGAAATAAAGTCAAGTCTTTCCTGACAAGCAAATGCTAAGATAATTCATCATCACTAAACCAGTCCTATAAGAAATGCTCAAAAGAATTGTAAAAGTCAAAATTAAAGTTCAATACTCACCATCATAAATACACACAAAAGTACAAAACTCACAGGTTTTATAAAACAATTGAGACTACAGAGCAACTAGGTAAAAAATTAACATTACAACAGGAACAAAACCTCATATATCAATATTAACTTTGAATAAAAAGGGATTAAATTCCCCCACTTAAGAGATATAGATTGGCAGAACAGATTTAAAAACATGAACTAACTATATGCTGTTTACAAGAAACTCATTAATAAAGACATGAGTTCAGGTAAAGGGGTGGAAAAAGATGTTCTACGCAAACAGAAACCAAATGAGAGAAGGAGTAGCTATACTTATATCAGATAAAGCACACTTTAAATCAACAACAGTAAAATAAAACAAAGGAGGTCATCATACAATGATAAAAAGATCAATTCAGCAAGAAGATATAACCATCCTACTAAATACATATGCACCTAACACAAGACTACCCAGATTCATAAAACAAATACTACTAGACCTAAGAGGGATGAGAAATTACCTAATTGGTACAATGTACAATATTCTGATGATGGTTACACTAAAAGCCCATACTTTACTGCTACTCAATATATCCATGTAACAAATCTGCGCTTGTACTTCTAAATCTATAAAAAAATTAAAATTTAACAAAAGTAAATAAAACACATAGCTAAAACTAAAAAAGCAAAAACAAAAACTATGCTAAGTATTGGTAAAGATGTGGGGAAAAAAGTAAACTCTCAAATATTGCTAGTGGGAGTATAAATTGTTTTCCACTTTGGAAAACAATTTGGTAATTTCGTTTTTTTTTTTTTCTTTTCTCTTTTTTTTTTTTTTTTTTTTGCATGCCAGAAAAAAATATTTACAGTAACT",
1038 self.file=pysam.Fastafile( "ex1.fa" )
1040 def testFetch(self):
1041 for id, seq in list(self.mSequences.items()):
1042 self.assertEqual( seq, self.file.fetch( id ) )
1043 for x in range( 0, len(seq), 10):
1044 self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
1046 self.assertEqual( seq[x:], self.file.fetch( id, x) )
1048 self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
1051 # unknown sequence returns ""
1052 self.assertEqual( b"", self.file.fetch("chr12") )
1054 def testOutOfRangeAccess( self ):
1055 '''test out of range access.'''
1056 # out of range access returns an empty string
1057 for contig, s in self.mSequences.items():
1058 self.assertEqual( self.file.fetch( contig, len(s), len(s)+1), b"" )
1060 self.assertEqual( self.file.fetch( "chr3", 0 , 100), b"" )
1062 def testFetchErrors( self ):
1063 self.assertRaises( ValueError, self.file.fetch )
1064 self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
1065 self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
1067 def testLength( self ):
1068 self.assertEqual( len(self.file), 2 )
1073 class TestAlignedRead(unittest.TestCase):
1074 '''tests to check if aligned read can be constructed
1078 def checkFieldEqual( self, read1, read2, exclude = []):
1079 '''check if two reads are equal by comparing each field.'''
1081 for x in ("qname", "seq", "flag",
1082 "rname", "pos", "mapq", "cigar",
1083 "mrnm", "mpos", "isize", "qual",
1084 "is_paired", "is_proper_pair",
1085 "is_unmapped", "mate_is_unmapped",
1086 "is_reverse", "mate_is_reverse",
1087 "is_read1", "is_read2",
1088 "is_secondary", "is_qcfail",
1089 "is_duplicate", "bin"):
1090 if x in exclude: continue
1091 self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" %
1092 (x, getattr(read1, x), getattr(read2,x)))
1094 def testEmpty( self ):
1095 a = pysam.AlignedRead()
1096 self.assertEqual( a.qname, None )
1097 self.assertEqual( a.seq, None )
1098 self.assertEqual( a.qual, None )
1099 self.assertEqual( a.flag, 0 )
1100 self.assertEqual( a.rname, 0 )
1101 self.assertEqual( a.mapq, 0 )
1102 self.assertEqual( a.cigar, None )
1103 self.assertEqual( a.tags, [] )
1104 self.assertEqual( a.mrnm, 0 )
1105 self.assertEqual( a.mpos, 0 )
1106 self.assertEqual( a.isize, 0 )
1108 def buildRead( self ):
1109 '''build an example read.'''
1111 a = pysam.AlignedRead()
1112 a.qname = "read_12345"
1118 a.cigar = ( (0,10), (2,1), (0,9), (1,1), (0,20) )
1126 def testUpdate( self ):
1127 '''check if updating fields affects other variable length data
1129 a = self.buildRead()
1130 b = self.buildRead()
1133 b.qname = "read_123"
1134 self.checkFieldEqual( a, b, "qname" )
1135 b.qname = "read_12345678"
1136 self.checkFieldEqual( a, b, "qname" )
1137 b.qname = "read_12345"
1138 self.checkFieldEqual( a, b)
1141 b.cigar = ( (0,10), )
1142 self.checkFieldEqual( a, b, "cigar" )
1143 b.cigar = ( (0,10), (2,1), (0,10) )
1144 self.checkFieldEqual( a, b, "cigar" )
1145 b.cigar = ( (0,10), (2,1), (0,9), (1,1), (0,20) )
1146 self.checkFieldEqual( a, b)
1150 self.checkFieldEqual( a, b, ("seq", "qual") )
1152 self.checkFieldEqual( a, b, ("seq", "qual") )
1154 self.checkFieldEqual( a, b, ("qual",))
1157 b = self.buildRead()
1161 "is_paired", "is_proper_pair",
1162 "is_unmapped", "mate_is_unmapped",
1163 "is_reverse", "mate_is_reverse",
1164 "is_read1", "is_read2",
1165 "is_secondary", "is_qcfail",
1167 setattr( b, x, True )
1168 self.assertEqual( getattr(b, x), True )
1169 self.checkFieldEqual( a, b, ("flag", x,) )
1170 setattr( b, x, False )
1171 self.assertEqual( getattr(b, x), False )
1172 self.checkFieldEqual( a, b )
1174 def testLargeRead( self ):
1175 '''build an example read.'''
1177 a = pysam.AlignedRead()
1178 a.qname = "read_12345"
1184 a.cigar = ( (0, 4 * 200), )
1192 def testTagParsing( self ):
1193 '''test for tag parsing
1195 see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a
1197 samfile=pysam.Samfile( "ex8.bam","rb" )
1199 for entry in samfile:
1201 entry.tags = entry.tags
1203 self.assertEqual( after, before )
1205 def testUpdateTlen( self ):
1206 '''check if updating tlen works'''
1207 a = self.buildRead()
1211 self.assertEqual( a.tlen, oldlen )
1213 def testPositions( self ):
1214 a = self.buildRead()
1215 self.assertEqual( a.positions,
1216 [20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
1217 31, 32, 33, 34, 35, 36, 37, 38, 39,
1218 40, 41, 42, 43, 44, 45, 46, 47, 48, 49,
1219 50, 51, 52, 53, 54, 55, 56, 57, 58, 59] )
1221 self.assertEqual( a.aligned_pairs,
1222 [(0, 20), (1, 21), (2, 22), (3, 23), (4, 24),
1223 (5, 25), (6, 26), (7, 27), (8, 28), (9, 29),
1225 (10, 31), (11, 32), (12, 33), (13, 34), (14, 35),
1226 (15, 36), (16, 37), (17, 38), (18, 39), (19, None),
1227 (20, 40), (21, 41), (22, 42), (23, 43), (24, 44),
1228 (25, 45), (26, 46), (27, 47), (28, 48), (29, 49),
1229 (30, 50), (31, 51), (32, 52), (33, 53), (34, 54),
1230 (35, 55), (36, 56), (37, 57), (38, 58), (39, 59)] )
1232 self.assertEqual( a.positions, [x[1] for x in a.aligned_pairs if x[0] != None and x[1] != None] )
1233 # alen is the length of the aligned read in genome
1234 self.assertEqual( a.alen, a.aligned_pairs[-1][0] + 1 )
1235 # aend points to one beyond last aligned base in ref
1236 self.assertEqual( a.positions[-1], a.aend - 1 )
1238 class TestDeNovoConstruction(unittest.TestCase):
1239 '''check BAM/SAM file construction using ex6.sam
1241 (note these are +1 coordinates):
1243 read_28833_29006_6945 99 chr1 33 20 10M1D25M = 200 167 AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG <<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<< NM:i:1 RG:Z:L1
1244 read_28701_28881_323b 147 chr2 88 30 35M = 500 412 ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA <<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<< MF:i:18 RG:Z:L2
1247 header = { 'HD': {'VN': '1.0'},
1248 'SQ': [{'LN': 1575, 'SN': 'chr1'},
1249 {'LN': 1584, 'SN': 'chr2'}], }
1254 def checkFieldEqual( self, read1, read2, exclude = []):
1255 '''check if two reads are equal by comparing each field.'''
1257 for x in ("qname", "seq", "flag",
1258 "rname", "pos", "mapq", "cigar",
1259 "mrnm", "mpos", "isize", "qual",
1261 "is_paired", "is_proper_pair",
1262 "is_unmapped", "mate_is_unmapped",
1263 "is_reverse", "mate_is_reverse",
1264 "is_read1", "is_read2",
1265 "is_secondary", "is_qcfail",
1267 if x in exclude: continue
1268 self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" %
1269 (x, getattr(read1, x), getattr(read2,x)))
1273 a = pysam.AlignedRead()
1274 a.qname = "read_28833_29006_6945"
1275 a.seq="AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG"
1280 a.cigar = ( (0,10), (2,1), (0,25) )
1284 a.qual="<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<"
1285 a.tags = ( ("NM", 1),
1288 b = pysam.AlignedRead()
1289 b.qname = "read_28701_28881_323b"
1290 b.seq="ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA"
1295 b.cigar = ( (0,35), )
1299 b.qual="<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<"
1300 b.tags = ( ("MF", 18),
1305 def testSAMWholeFile( self ):
1307 tmpfilename = "tmp_%i.sam" % id(self)
1309 outfile = pysam.Samfile( tmpfilename, "wh", header = self.header )
1311 for x in self.reads: outfile.write( x )
1313 self.assertTrue( checkBinaryEqual( tmpfilename, self.samfile ),
1314 "mismatch when construction SAM file, see %s %s" % (tmpfilename, self.samfile))
1316 os.unlink( tmpfilename )
1318 def testBAMPerRead( self ):
1319 '''check if individual reads are binary equal.'''
1320 infile = pysam.Samfile( self.bamfile, "rb")
1322 others = list(infile)
1323 for denovo, other in zip( others, self.reads):
1324 self.checkFieldEqual( other, denovo )
1325 self.assertEqual( other.compare( denovo ), 0 )
1327 def testSAMPerRead( self ):
1328 '''check if individual reads are binary equal.'''
1329 infile = pysam.Samfile( self.samfile, "r")
1331 others = list(infile)
1332 for denovo, other in zip( others, self.reads):
1333 self.checkFieldEqual( other, denovo )
1334 self.assertEqual( other.compare( denovo), 0 )
1336 def testBAMWholeFile( self ):
1338 tmpfilename = "tmp_%i.bam" % id(self)
1340 outfile = pysam.Samfile( tmpfilename, "wb", header = self.header )
1342 for x in self.reads: outfile.write( x )
1345 self.assertTrue( checkBinaryEqual( tmpfilename, self.bamfile ),
1346 "mismatch when construction BAM file, see %s %s" % (tmpfilename, self.bamfile))
1348 os.unlink( tmpfilename )
1350 class TestDeNovoConstructionUserTags(TestDeNovoConstruction):
1351 '''test de novo construction with a header that contains lower-case tags.'''
1353 header = { 'HD': {'VN': '1.0'},
1354 'SQ': [{'LN': 1575, 'SN': 'chr1'},
1355 {'LN': 1584, 'SN': 'chr2'}],
1356 'x1': {'A': 2, 'B': 5 },
1357 'x3': {'A': 6, 'B': 5 },
1358 'x2': {'A': 4, 'B': 5 } }
1360 bamfile = "example_user_header.bam"
1361 samfile = "example_user_header.sam"
1363 class TestEmptyHeader( unittest.TestCase ):
1366 def testEmptyHeader( self ):
1368 s = pysam.Samfile('example_empty_header.bam')
1369 self.assertEqual( s.header, {'SQ': [{'LN': 1000, 'SN': 'chr1'}]} )
1371 class TestBTagSam( unittest.TestCase ):
1374 compare = [ [100, 1, 91, 0, 7, 101, 0, 201, 96, 204, 0, 0, 87, 109, 0, 7, 97, 112, 1, 12, 78, 197, 0, 7, 100, 95, 101, 202, 0, 6, 0, 1, 186, 0, 84, 0, 244, 0, 0, 324, 0, 107, 195, 101, 113, 0, 102, 0, 104, 3, 0, 101, 1, 0, 212, 6, 0, 0, 1, 0, 74, 1, 11, 0, 196, 2, 197, 103, 0, 108, 98, 2, 7, 0, 1, 2, 194, 0, 180, 0, 108, 0, 203, 104, 16, 5, 205, 0, 0, 0, 1, 1, 100, 98, 0, 0, 204, 6, 0, 79, 0, 0, 101, 7, 109, 90, 265, 1, 27, 10, 109, 102, 9, 0, 292, 0, 110, 0, 0, 102, 112, 0, 0, 84, 100, 103, 2, 81, 126, 0, 2, 90, 0, 15, 96, 15, 1, 0, 2, 0, 107, 92, 0, 0, 101, 3, 98, 15, 102, 13, 116, 116, 90, 93, 198, 0, 0, 0, 199, 92, 26, 495, 100, 5, 0, 100, 5, 209, 0, 92, 107, 90, 0, 0, 0, 0, 109, 194, 7, 94, 200, 0, 40, 197, 0, 11, 0, 0, 112, 110, 6, 4, 200, 28, 0, 196, 0, 203, 1, 129, 0, 0, 1, 0, 94, 0, 1, 0, 107, 5, 201, 3, 3, 100, 0, 121, 0, 7, 0, 1, 105, 306, 3, 86, 8, 183, 0, 12, 163, 17, 83, 22, 0, 0, 1, 8, 109, 103, 0, 0, 295, 0, 200, 16, 172, 3, 16, 182, 3, 11, 0, 0, 223, 111, 103, 0, 5, 225, 0, 95],
1375 [-100,200,-300,-400],
1380 filename = 'example_btag.sam'
1382 def testRead( self ):
1384 s = pysam.Samfile(self.filename)
1385 for x, read in enumerate(s):
1387 self.assertEqual( read.tags, [('RG', 'QW85I'), ('PG', 'tmap'), ('MD', '140'), ('NM', 0), ('AS', 140), ('FZ', [100, 1, 91, 0, 7, 101, 0, 201, 96, 204, 0, 0, 87, 109, 0, 7, 97, 112, 1, 12, 78, 197, 0, 7, 100, 95, 101, 202, 0, 6, 0, 1, 186, 0, 84, 0, 244, 0, 0, 324, 0, 107, 195, 101, 113, 0, 102, 0, 104, 3, 0, 101, 1, 0, 212, 6, 0, 0, 1, 0, 74, 1, 11, 0, 196, 2, 197, 103, 0, 108, 98, 2, 7, 0, 1, 2, 194, 0, 180, 0, 108, 0, 203, 104, 16, 5, 205, 0, 0, 0, 1, 1, 100, 98, 0, 0, 204, 6, 0, 79, 0, 0, 101, 7, 109, 90, 265, 1, 27, 10, 109, 102, 9, 0, 292, 0, 110, 0, 0, 102, 112, 0, 0, 84, 100, 103, 2, 81, 126, 0, 2, 90, 0, 15, 96, 15, 1, 0, 2, 0, 107, 92, 0, 0, 101, 3, 98, 15, 102, 13, 116, 116, 90, 93, 198, 0, 0, 0, 199, 92, 26, 495, 100, 5, 0, 100, 5, 209, 0, 92, 107, 90, 0, 0, 0, 0, 109, 194, 7, 94, 200, 0, 40, 197, 0, 11, 0, 0, 112, 110, 6, 4, 200, 28, 0, 196, 0, 203, 1, 129, 0, 0, 1, 0, 94, 0, 1, 0, 107, 5, 201, 3, 3, 100, 0, 121, 0, 7, 0, 1, 105, 306, 3, 86, 8, 183, 0, 12, 163, 17, 83, 22, 0, 0, 1, 8, 109, 103, 0, 0, 295, 0, 200, 16, 172, 3, 16, 182, 3, 11, 0, 0, 223, 111, 103, 0, 5, 225, 0, 95]), ('XA', 'map2-1'), ('XS', 53), ('XT', 38), ('XF', 1), ('XE', 0)]
1390 fz = dict(read.tags)["FZ"]
1391 self.assertEqual( fz, self.compare[x] )
1392 self.assertEqual( read.opt("FZ"), self.compare[x])
1394 def testWrite( self ):
1396 s = pysam.Samfile(self.filename)
1399 read.tags = read.tags
1401 self.assertEqual( after, before )
1403 class TestBTagBam( TestBTagSam ):
1404 filename = 'example_btag.bam'
1406 class TestDoubleFetch(unittest.TestCase):
1407 '''check if two iterators on the same bamfile are independent.'''
1409 def testDoubleFetch( self ):
1411 samfile1 = pysam.Samfile('ex1.bam', 'rb')
1413 for a,b in zip(samfile1.fetch(), samfile1.fetch()):
1414 self.assertEqual( a.compare( b ), 0 )
1416 def testDoubleFetchWithRegion( self ):
1418 samfile1 = pysam.Samfile('ex1.bam', 'rb')
1419 chr, start, stop = 'chr1', 200, 3000000
1420 self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
1422 for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
1423 self.assertEqual( a.compare( b ), 0 )
1425 def testDoubleFetchUntilEOF( self ):
1427 samfile1 = pysam.Samfile('ex1.bam', 'rb')
1429 for a,b in zip(samfile1.fetch( until_eof = True),
1430 samfile1.fetch( until_eof = True )):
1431 self.assertEqual( a.compare( b), 0 )
1433 class TestRemoteFileFTP(unittest.TestCase):
1434 '''test remote access.
1438 # Need to find an ftp server without password on standard
1441 url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
1444 def testFTPView( self ):
1446 result = pysam.view( self.url, self.region )
1447 self.assertEqual( len(result), 36 )
1449 def testFTPFetch( self ):
1451 samfile = pysam.Samfile(self.url, "rb")
1452 result = list(samfile.fetch( region = self.region ))
1453 self.assertEqual( len(result), 36 )
1455 class TestRemoteFileHTTP( unittest.TestCase):
1457 url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
1458 region = "chr1:1-1000"
1461 def testView( self ):
1462 samfile_local = pysam.Samfile(self.local, "rb")
1463 ref = list(samfile_local.fetch( region = self.region ))
1465 result = pysam.view( self.url, self.region )
1466 self.assertEqual( len(result), len(ref) )
1468 def testFetch( self ):
1469 samfile = pysam.Samfile(self.url, "rb")
1470 result = list(samfile.fetch( region = self.region ))
1471 samfile_local = pysam.Samfile(self.local, "rb")
1472 ref = list(samfile_local.fetch( region = self.region ))
1474 self.assertEqual( len(ref), len(result) )
1475 for x, y in zip(result, ref):
1476 self.assertEqual( x.compare( y ), 0 )
1478 def testFetchAll( self ):
1479 samfile = pysam.Samfile(self.url, "rb")
1480 result = list(samfile.fetch())
1481 samfile_local = pysam.Samfile(self.local, "rb")
1482 ref = list(samfile_local.fetch() )
1484 self.assertEqual( len(ref), len(result) )
1485 for x, y in zip(result, ref):
1486 self.assertEqual( x.compare( y ), 0 )
1488 class TestLargeOptValues( unittest.TestCase ):
1490 ints = ( 65536, 214748, 2147484, 2147483647 )
1491 floats = ( 65536.0, 214748.0, 2147484.0 )
1493 def check( self, samfile ):
1496 for exp in self.ints:
1499 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1501 for exp in [ -x for x in self.ints ]:
1504 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1506 for exp in self.floats:
1509 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1511 for exp in [ -x for x in self.floats ]:
1514 self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1516 def testSAM( self ):
1517 samfile = pysam.Samfile("ex10.sam", "r")
1518 self.check( samfile )
1520 def testBAM( self ):
1521 samfile = pysam.Samfile("ex10.bam", "rb")
1522 self.check( samfile )
1524 # class TestSNPCalls( unittest.TestCase ):
1525 # '''test pysam SNP calling ability.'''
1527 # def checkEqual( self, a, b ):
1528 # for x in ("reference_base",
1531 # "consensus_quality",
1533 # "mapping_quality",
1535 # self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" %
1536 # (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1538 # def testAllPositionsViaIterator( self ):
1539 # samfile = pysam.Samfile( "ex1.bam", "rb")
1540 # fastafile = pysam.Fastafile( "ex1.fa" )
1542 # refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"]
1543 # except pysam.SamtoolsError:
1546 # i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1547 # calls = list(pysam.IteratorSNPCalls(i))
1548 # for x,y in zip( refs, calls ):
1549 # self.checkEqual( x, y )
1551 # def testPerPositionViaIterator( self ):
1552 # # test pileup for each position. This is a slow operation
1553 # # so this test is disabled
1555 # samfile = pysam.Samfile( "ex1.bam", "rb")
1556 # fastafile = pysam.Fastafile( "ex1.fa" )
1557 # for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1558 # if x.reference_base == "*": continue
1559 # i = samfile.pileup( x.chromosome, x.pos, x.pos+1,
1560 # fastafile = fastafile,
1561 # stepper = "samtools" )
1562 # z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ]
1563 # self.assertEqual( len(z), 1 )
1564 # self.checkEqual( x, z[0] )
1566 # def testPerPositionViaCaller( self ):
1567 # # test pileup for each position. This is a fast operation
1568 # samfile = pysam.Samfile( "ex1.bam", "rb")
1569 # fastafile = pysam.Fastafile( "ex1.fa" )
1570 # i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1571 # caller = pysam.SNPCaller( i )
1573 # for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1574 # if x.reference_base == "*": continue
1575 # call = caller.call( x.chromosome, x.pos )
1576 # self.checkEqual( x, call )
1578 # class TestIndelCalls( unittest.TestCase ):
1579 # '''test pysam indel calling.'''
1581 # def checkEqual( self, a, b ):
1585 # "consensus_quality",
1587 # "mapping_quality",
1594 # if b.genotype == "*/*" and x == "second_allele":
1595 # # ignore test for second allele (positions chr2:439 and chr2:1512)
1597 # self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" %
1598 # (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1600 # def testAllPositionsViaIterator( self ):
1602 # samfile = pysam.Samfile( "ex1.bam", "rb")
1603 # fastafile = pysam.Fastafile( "ex1.fa" )
1605 # refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"]
1606 # except pysam.SamtoolsError:
1609 # i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1610 # calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ]
1611 # for x,y in zip( refs, calls ):
1612 # self.checkEqual( x, y )
1614 # def testPerPositionViaCaller( self ):
1615 # # test pileup for each position. This is a fast operation
1616 # samfile = pysam.Samfile( "ex1.bam", "rb")
1617 # fastafile = pysam.Fastafile( "ex1.fa" )
1618 # i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1619 # caller = pysam.IndelCaller( i )
1621 # for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1622 # if x.reference_base != "*": continue
1623 # call = caller.call( x.chromosome, x.pos )
1624 # self.checkEqual( x, call )
1626 class TestLogging( unittest.TestCase ):
1627 '''test around bug issue 42,
1629 failed in versions < 0.4
1632 def check( self, bamfile, log ):
1635 logger = logging.getLogger('franklin')
1636 logger.setLevel(logging.INFO)
1637 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
1638 log_hand = logging.FileHandler('log.txt')
1639 log_hand.setFormatter(formatter)
1640 logger.addHandler(log_hand)
1642 bam = pysam.Samfile(bamfile, 'rb')
1644 self.assertTrue( True )
1646 def testFail1( self ):
1647 self.check( "ex9_fail.bam", False )
1648 self.check( "ex9_fail.bam", True )
1650 def testNoFail1( self ):
1651 self.check( "ex9_nofail.bam", False )
1652 self.check( "ex9_nofail.bam", True )
1654 def testNoFail2( self ):
1655 self.check( "ex9_nofail.bam", True )
1656 self.check( "ex9_nofail.bam", True )
1659 # 1. finish testing all properties within pileup objects
1660 # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)
1661 # 3. check: presence of sequence
1663 class TestSamfileUtilityFunctions( unittest.TestCase ):
1665 def testCount( self ):
1667 samfile = pysam.Samfile( "ex1.bam", "rb" )
1669 for contig in ("chr1", "chr2" ):
1670 for start in range( 0, 2000, 100 ):
1672 self.assertEqual( len( list( samfile.fetch( contig, start, end ) ) ),
1673 samfile.count( contig, start, end ) )
1675 # test empty intervals
1676 self.assertEqual( len( list( samfile.fetch( contig, start, start ) ) ),
1677 samfile.count( contig, start, start ) )
1679 # test half empty intervals
1680 self.assertEqual( len( list( samfile.fetch( contig, start ) ) ),
1681 samfile.count( contig, start ) )
1683 def testMate( self ):
1684 '''test mate access.'''
1686 with open( "ex1.sam", "rb" ) as inf:
1687 readnames = [ x.split(b"\t")[0] for x in inf.readlines() ]
1688 if sys.version_info[0] >= 3:
1689 readnames = [ name.decode('ascii') for name in readnames ]
1691 counts = collections.defaultdict( int )
1692 for x in readnames: counts[x] += 1
1694 samfile = pysam.Samfile( "ex1.bam", "rb" )
1695 for read in samfile.fetch():
1696 if not read.is_paired:
1697 self.assertRaises( ValueError, samfile.mate, read )
1698 elif read.mate_is_unmapped:
1699 self.assertRaises( ValueError, samfile.mate, read )
1701 if counts[read.qname] == 1:
1702 self.assertRaises( ValueError, samfile.mate, read )
1704 mate = samfile.mate( read )
1705 self.assertEqual( read.qname, mate.qname )
1706 self.assertEqual( read.is_read1, mate.is_read2 )
1707 self.assertEqual( read.is_read2, mate.is_read1 )
1708 self.assertEqual( read.pos, mate.mpos )
1709 self.assertEqual( read.mpos, mate.pos )
1711 def testIndexStats( self ):
1712 '''test if total number of mapped/unmapped reads is correct.'''
1714 samfile = pysam.Samfile( "ex1.bam", "rb" )
1715 self.assertEqual( samfile.mapped, 3235 )
1716 self.assertEqual( samfile.unmapped, 35 )
1718 class TestSamtoolsProxy( unittest.TestCase ):
1719 '''tests for sanity checking access to samtools functions.'''
1721 def testIndex( self ):
1722 self.assertRaises( IOError, pysam.index, "missing_file" )
1724 def testView( self ):
1725 # note that view still echos "open: No such file or directory"
1726 self.assertRaises( pysam.SamtoolsError, pysam.view, "missing_file" )
1728 def testSort( self ):
1729 self.assertRaises( pysam.SamtoolsError, pysam.sort, "missing_file" )
1731 class TestSamfileIndex( unittest.TestCase):
1733 def testIndex( self ):
1734 samfile = pysam.Samfile( "ex1.bam", "rb" )
1735 index = pysam.IndexedReads( samfile )
1738 reads = collections.defaultdict( int )
1740 for read in samfile: reads[read.qname] += 1
1742 for qname, counts in reads.items():
1743 found = list(index.find( qname ))
1744 self.assertEqual( len(found), counts )
1745 for x in found: self.assertEqual( x.qname, qname )
1748 if __name__ == "__main__":
1750 print ("building data files")
1751 subprocess.call( "make", shell=True)
1752 print ("starting tests")
1754 print ("completed tests")