2 '''unit testing code for pysam.
4 Execute in the :file:`tests` directory as it requires the Makefile
5 and data files located there.
16 def checkBinaryEqual( filename1, filename2 ):
17 '''return true if the two files are binary equal.'''
18 if os.path.getsize( filename1 ) != os.path.getsize( filename2 ):
21 infile1 = open(filename1, "rb")
22 infile2 = open(filename2, "rb")
24 def chariter( infile ):
31 for c1,c2 in itertools.izip( chariter( infile1), chariter( infile2) ):
40 def runSamtools( cmd ):
41 '''run a samtools command'''
44 retcode = subprocess.call(cmd, shell=True)
46 print >>sys.stderr, "Child was terminated by signal", -retcode
48 print >>sys.stderr, "Execution failed:", e
50 def getSamtoolsVersion():
51 '''return samtools version'''
53 pipe = subprocess.Popen("samtools", shell=True, stderr=subprocess.PIPE).stderr
54 lines = "".join(pipe.readlines())
55 return re.search( "Version:\s+(\S+)", lines).groups()[0]
57 class BinaryTest(unittest.TestCase):
58 '''test samtools command line commands and compare
59 against pysam commands.
61 Tests fail, if the output is not binary identical.
66 # a list of commands to test
70 ("ex1.fa.fai", "samtools faidx ex1.fa"),
71 ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ),
75 ("ex1.bam", "samtools import ex1.fa.fai ex1.sam.gz ex1.bam" ),
76 ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ),
80 ("ex1.bam.bai", "samtools index ex1.bam" ),
81 ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ),
85 ("ex1.pileup", "samtools pileup -cf ex1.fa ex1.bam > ex1.pileup" ),
86 ("pysam_ex1.pileup", (pysam.pileup, "-c -f ex1.fa ex1.bam" ) )
90 ("ex1.glf", "samtools pileup -gf ex1.fa ex1.bam > ex1.glf" ),
91 ("pysam_ex1.glf", (pysam.pileup, "-g -f ex1.fa ex1.bam" ) )
95 ("ex1.glfview", "samtools glfview ex1.glf > ex1.glfview"),
96 ("pysam_ex1.glfview", (pysam.glfview, "ex1.glf" ) ),
100 ("ex1.view", "samtools view ex1.bam > ex1.view"),
101 ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
105 ("ex1.view", "samtools view -bT ex1.fa -o ex1.view2 ex1.sam"),
106 # note that -o ex1.view2 throws exception.
107 ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ),
111 # some tests depend on others. The order specifies in which order
112 # the samtools commands are executed.
113 mOrder = ('faidx', 'import', 'index', 'pileup1', 'pileup2', 'glfview', 'view', 'view2' )
118 For setup, all commands will be run before the first test is
119 executed. Individual tests will then just compare the output
122 if BinaryTest.first_time:
124 shutil.copy( "ex1.fa", "pysam_ex1.fa" )
126 for label in self.mOrder:
127 command = self.mCommands[label]
128 samtools_target, samtools_command = command[0]
129 pysam_target, pysam_command = command[1]
130 runSamtools( samtools_command )
131 pysam_method, pysam_options = pysam_command
132 output = pysam_method( *pysam_options.split(" "), raw=True)
133 if ">" in samtools_command:
134 outfile = open( pysam_target, "w" )
135 for line in output: outfile.write( line )
138 BinaryTest.first_time = False
140 samtools_version = getSamtoolsVersion()
141 if samtools_version != pysam.__samtools_version__:
142 raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
143 (pysam.__samtools_version__,
146 def checkCommand( self, command ):
149 samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0]
150 self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ),
151 "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
153 def testImport( self ):
154 self.checkCommand( "import" )
156 def testIndex( self ):
157 self.checkCommand( "index" )
159 def testPileup1( self ):
160 self.checkCommand( "pileup1" )
162 def testPileup2( self ):
163 self.checkCommand( "pileup2" )
165 def testGLFView( self ):
166 self.checkCommand( "glfview" )
168 def testView( self ):
169 self.checkCommand( "view" )
171 def testEmptyIndex( self ):
172 self.assertRaises( pysam.SamtoolsError, pysam.index, "exdoesntexist.bam" )
176 for label, command in self.mCommands.iteritems():
177 samtools_target, samtools_command = command[0]
178 pysam_target, pysam_command = command[1]
179 if os.path.exists( samtools_target): os.remove( samtools_target )
180 if os.path.exists( pysam_target): os.remove( pysam_target )
181 if os.path.exists( "pysam_ex1.fa" ): os.remove( "pysam_ex1.fa" )
183 class IOTest(unittest.TestCase):
184 '''check if reading samfile and writing a samfile are consistent.'''
186 def checkEcho( self, input_filename, reference_filename,
188 input_mode, output_mode, use_template = True):
189 '''iterate through *input_filename* writing to *output_filename* and
190 comparing the output to *reference_filename*.
192 The files are opened according to the *input_mode* and *output_mode*.
194 If *use_template* is set, the header is copied from infile using the
195 template mechanism, otherwise target names and lengths are passed explicitely.
198 infile = pysam.Samfile( input_filename, input_mode )
200 outfile = pysam.Samfile( output_filename, output_mode, template = infile )
202 outfile = pysam.Samfile( output_filename, output_mode,
203 referencenames = infile.references,
204 referencelengths = infile.lengths )
206 iter = infile.fetch()
207 for x in iter: outfile.write( x )
211 self.assertTrue( checkBinaryEqual( reference_filename, output_filename),
212 "files %s and %s are not the same" % (reference_filename, output_filename) )
214 def testReadWriteBam( self ):
216 input_filename = "ex1.bam"
217 output_filename = "pysam_ex1.bam"
218 reference_filename = "ex1.bam"
220 self.checkEcho( input_filename, reference_filename, output_filename,
223 def testReadWriteBamWithTargetNames( self ):
225 input_filename = "ex1.bam"
226 output_filename = "pysam_ex1.bam"
227 reference_filename = "ex1.bam"
229 self.checkEcho( input_filename, reference_filename, output_filename,
230 "rb", "wb", use_template = False )
232 def testReadWriteSamWithHeader( self ):
234 input_filename = "ex2.sam"
235 output_filename = "pysam_ex2.sam"
236 reference_filename = "ex2.sam"
238 self.checkEcho( input_filename, reference_filename, output_filename,
241 def testReadWriteSamWithoutHeader( self ):
243 input_filename = "ex2.sam"
244 output_filename = "pysam_ex2.sam"
245 reference_filename = "ex1.sam"
247 self.checkEcho( input_filename, reference_filename, output_filename,
250 def testFetchFromClosedFile( self ):
252 samfile = pysam.Samfile( "ex1.bam", "rb" )
254 self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
256 def testClosedFile( self ):
257 '''test that access to a closed samfile raises ValueError.'''
259 samfile = pysam.Samfile( "ex1.bam", "rb" )
261 self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
262 self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
263 self.assertRaises( ValueError, samfile.getrname, 0 )
264 self.assertRaises( ValueError, samfile.tell )
265 self.assertRaises( ValueError, samfile.write, None )
266 self.assertRaises( ValueError, samfile.seek, 0 )
267 self.assertRaises( ValueError, getattr, samfile, "nreferences" )
268 self.assertRaises( ValueError, getattr, samfile, "references" )
269 self.assertRaises( ValueError, getattr, samfile, "lengths" )
270 self.assertRaises( ValueError, getattr, samfile, "text" )
271 self.assertRaises( ValueError, getattr, samfile, "header" )
273 def testBinaryReadFromSamfile( self ):
275 # needs to re-activated, see issue 19
276 #samfile = pysam.Samfile( "ex1.bam", "r" )
277 #samfile.fetch().next()
279 def testReadingFromFileWithoutIndex( self ):
280 '''read from bam file without index.'''
282 assert not os.path.exists( "ex2.bam.bai" )
283 samfile = pysam.Samfile( "ex2.bam", "rb" )
284 self.assertRaises( ValueError, samfile.fetch )
285 self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 )
287 class TestIteratorRow(unittest.TestCase):
290 self.samfile=pysam.Samfile( "ex1.bam","rb" )
292 def checkRange( self, rnge ):
293 '''compare results from iterator with those from samtools.'''
294 ps = list(self.samfile.fetch(region=rnge))
295 sa = list(pysam.view( "ex1.bam", rnge , raw = True) )
296 self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) ))
297 # check if the same reads are returned and in the same order
298 for line, pair in enumerate( zip( ps, sa ) ):
299 data = pair[1].split("\t")
300 self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
302 def testIteratePerContig(self):
303 '''check random access per contig'''
304 for contig in self.samfile.references:
305 self.checkRange( contig )
307 def testIterateRanges(self):
308 '''check random access per range'''
309 for contig, length in zip(self.samfile.references, self.samfile.lengths):
310 for start in range( 1, length, 90):
311 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
317 class TestIteratorRowAll(unittest.TestCase):
320 self.samfile=pysam.Samfile( "ex1.bam","rb" )
322 def testIterate(self):
323 '''compare results from iterator with those from samtools.'''
324 ps = list(self.samfile.fetch())
325 sa = list(pysam.view( "ex1.bam", raw = True) )
326 self.assertEqual( len(ps), len(sa), "unequal number of results: %i != %i" % (len(ps), len(sa) ))
327 # check if the same reads are returned
328 for line, pair in enumerate( zip( ps, sa ) ):
329 data = pair[1].split("\t")
330 self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
335 class TestIteratorColumn(unittest.TestCase):
336 '''test iterator column against contents of ex3.bam.'''
338 # note that samfile contains 1-based coordinates
339 # 1D means deletion with respect to reference sequence
341 mCoverages = { 'chr1' : [ 0 ] * 20 + [1] * 36 + [0] * (100 - 20 -35 ),
342 'chr2' : [ 0 ] * 20 + [1] * 35 + [0] * (100 - 20 -35 ),
346 self.samfile=pysam.Samfile( "ex4.bam","rb" )
348 def checkRange( self, rnge ):
349 '''compare results from iterator with those from samtools.'''
350 # check if the same reads are returned and in the same order
351 for column in self.samfile.pileup(region=rnge):
352 thiscov = len(column.pileups)
353 refcov = self.mCoverages[self.samfile.getrname(column.tid)][column.pos]
354 self.assertEqual( thiscov, refcov, "wrong coverage at pos %s:%i %i should be %i" % (self.samfile.getrname(column.tid), column.pos, thiscov, refcov))
356 def testIterateAll(self):
357 '''check random access per contig'''
358 self.checkRange( None )
360 def testIteratePerContig(self):
361 '''check random access per contig'''
362 for contig in self.samfile.references:
363 self.checkRange( contig )
365 def testIterateRanges(self):
366 '''check random access per range'''
367 for contig, length in zip(self.samfile.references, self.samfile.lengths):
368 for start in range( 1, length, 90):
369 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
371 def testInverse( self ):
372 '''test the inverse, is point-wise pileup accurate.'''
373 for contig, refseq in self.mCoverages.items():
374 refcolumns = sum(refseq)
375 for pos, refcov in enumerate( refseq ):
376 columns = list(self.samfile.pileup( contig, pos, pos+1) )
378 # if no read, no coverage
379 self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
381 # one read, all columns of the read are returned
382 self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
383 (pos, len(columns), refcolumns))
388 class TestAlignedReadFromBam(unittest.TestCase):
391 self.samfile=pysam.Samfile( "ex3.bam","rb" )
392 self.reads=list(self.samfile.fetch())
394 def testARqname(self):
395 self.assertEqual( self.reads[0].qname, "read_28833_29006_6945", "read name mismatch in read 1: %s != %s" % (self.reads[0].qname, "read_28833_29006_6945") )
396 self.assertEqual( self.reads[1].qname, "read_28701_28881_323b", "read name mismatch in read 2: %s != %s" % (self.reads[1].qname, "read_28701_28881_323b") )
398 def testARflag(self):
399 self.assertEqual( self.reads[0].flag, 99, "flag mismatch in read 1: %s != %s" % (self.reads[0].flag, 99) )
400 self.assertEqual( self.reads[1].flag, 147, "flag mismatch in read 2: %s != %s" % (self.reads[1].flag, 147) )
402 def testARrname(self):
403 self.assertEqual( self.reads[0].rname, 0, "chromosome/target id mismatch in read 1: %s != %s" % (self.reads[0].rname, 0) )
404 self.assertEqual( self.reads[1].rname, 1, "chromosome/target id mismatch in read 2: %s != %s" % (self.reads[1].rname, 1) )
407 self.assertEqual( self.reads[0].pos, 33-1, "mapping position mismatch in read 1: %s != %s" % (self.reads[0].pos, 33-1) )
408 self.assertEqual( self.reads[1].pos, 88-1, "mapping position mismatch in read 2: %s != %s" % (self.reads[1].pos, 88-1) )
410 def testARmapq(self):
411 self.assertEqual( self.reads[0].mapq, 20, "mapping quality mismatch in read 1: %s != %s" % (self.reads[0].mapq, 20) )
412 self.assertEqual( self.reads[1].mapq, 30, "mapping quality mismatch in read 2: %s != %s" % (self.reads[1].mapq, 30) )
414 def testARcigar(self):
415 self.assertEqual( self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)], "read name length mismatch in read 1: %s != %s" % (self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)]) )
416 self.assertEqual( self.reads[1].cigar, [(0, 35)], "read name length mismatch in read 2: %s != %s" % (self.reads[1].cigar, [(0, 35)]) )
418 def testARmrnm(self):
419 self.assertEqual( self.reads[0].mrnm, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].mrnm, 0) )
420 self.assertEqual( self.reads[1].mrnm, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].mrnm, 1) )
422 def testARmpos(self):
423 self.assertEqual( self.reads[0].mpos, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].mpos, 200-1) )
424 self.assertEqual( self.reads[1].mpos, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].mpos, 500-1) )
426 def testARisize(self):
427 self.assertEqual( self.reads[0].isize, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].isize, 167) )
428 self.assertEqual( self.reads[1].isize, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].isize, 412) )
431 self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
432 self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
433 self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
435 def testARqual(self):
436 self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
437 self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
438 self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
440 def testARquery(self):
441 self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
442 self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
443 self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") )
445 def testARqqual(self):
446 self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
447 self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
448 self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") )
450 def testPresentOptionalFields(self):
451 self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
452 self.assertEqual( self.reads[0].opt('RG'), 'L1', "optional field mismatch in read 1, RG: %s != %s" % (self.reads[0].opt('RG'), 'L1') )
453 self.assertEqual( self.reads[1].opt('RG'), 'L2', "optional field mismatch in read 2, RG: %s != %s" % (self.reads[1].opt('RG'), 'L2') )
454 self.assertEqual( self.reads[1].opt('MF'), 18, "optional field mismatch in read 2, MF: %s != %s" % (self.reads[1].opt('MF'), 18) )
456 def testPairedBools(self):
457 self.assertEqual( self.reads[0].is_paired, True, "is paired mismatch in read 1: %s != %s" % (self.reads[0].is_paired, True) )
458 self.assertEqual( self.reads[1].is_paired, True, "is paired mismatch in read 2: %s != %s" % (self.reads[1].is_paired, True) )
459 self.assertEqual( self.reads[0].is_proper_pair, True, "is proper pair mismatch in read 1: %s != %s" % (self.reads[0].is_proper_pair, True) )
460 self.assertEqual( self.reads[1].is_proper_pair, True, "is proper pair mismatch in read 2: %s != %s" % (self.reads[1].is_proper_pair, True) )
462 def testTags( self ):
463 self.assertEqual( self.reads[0].tags,
464 [('NM', 1), ('RG', 'L1'),
465 ('PG', 'P1'), ('XT', 'U')] )
466 self.assertEqual( self.reads[1].tags,
467 [('MF', 18), ('RG', 'L2'),
468 ('PG', 'P2'),('XT', 'R') ] )
471 self.assertEqual( self.reads[0].opt("XT"), "U" )
472 self.assertEqual( self.reads[1].opt("XT"), "R" )
474 def testMissingOpt( self ):
475 self.assertRaises( KeyError, self.reads[0].opt, "XP" )
477 def testEmptyOpt( self ):
478 self.assertRaises( KeyError, self.reads[2].opt, "XT" )
483 class TestAlignedReadFromSam(TestAlignedReadFromBam):
486 self.samfile=pysam.Samfile( "ex3.sam","r" )
487 self.reads=list(self.samfile.fetch())
489 # needs to be implemented
490 # class TestAlignedReadFromSamWithoutHeader(TestAlignedReadFromBam):
493 # self.samfile=pysam.Samfile( "ex7.sam","r" )
494 # self.reads=list(self.samfile.fetch())
496 class TestHeaderSam(unittest.TestCase):
498 header = {'SQ': [{'LN': 1575, 'SN': 'chr1'},
499 {'LN': 1584, 'SN': 'chr2'}],
500 'RG': [{'LB': 'SC_1', 'ID': 'L1', 'SM': 'NA12891', 'PU': 'SC_1_10', "CN":"name:with:colon"},
501 {'LB': 'SC_2', 'ID': 'L2', 'SM': 'NA12891', 'PU': 'SC_2_12', "CN":"name:with:colon"}],
502 'PG': [{'ID': 'P1', 'VN': '1.0'}, {'ID': 'P2', 'VN': '1.1'}],
504 'CO' : [ 'this is a comment', 'this is another comment'],
507 def compareHeaders( self, a, b ):
508 '''compare two headers a and b.'''
509 for ak,av in a.iteritems():
510 self.assertTrue( ak in b, "key '%s' not in '%s' " % (ak,b) )
511 self.assertEqual( av, b[ak] )
514 self.samfile=pysam.Samfile( "ex3.sam","r" )
516 def testHeaders(self):
517 self.compareHeaders( self.header, self.samfile.header )
518 self.compareHeaders( self.samfile.header, self.header )
523 class TestHeaderBam(TestHeaderSam):
526 self.samfile=pysam.Samfile( "ex3.bam","rb" )
528 class TestUnmappedReads(unittest.TestCase):
531 samfile=pysam.Samfile( "ex5.sam","r" )
532 self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 )
536 samfile=pysam.Samfile( "ex5.bam","rb" )
537 self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 )
540 class TestPileupObjects(unittest.TestCase):
543 self.samfile=pysam.Samfile( "ex1.bam","rb" )
545 def testPileupColumn(self):
546 for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
547 if pcolumn1.pos == 104:
548 self.assertEqual( pcolumn1.tid, 0, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn1.tid, 0) )
549 self.assertEqual( pcolumn1.pos, 105-1, "position mismatch in position 1: %s != %s" % (pcolumn1.pos, 105-1) )
550 self.assertEqual( pcolumn1.n, 2, "# reads mismatch in position 1: %s != %s" % (pcolumn1.n, 2) )
551 for pcolumn2 in self.samfile.pileup( region="chr2:1480" ):
552 if pcolumn2.pos == 1479:
553 self.assertEqual( pcolumn2.tid, 1, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn2.tid, 1) )
554 self.assertEqual( pcolumn2.pos, 1480-1, "position mismatch in position 1: %s != %s" % (pcolumn2.pos, 1480-1) )
555 self.assertEqual( pcolumn2.n, 12, "# reads mismatch in position 1: %s != %s" % (pcolumn2.n, 12) )
557 def testPileupRead(self):
558 for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
559 if pcolumn1.pos == 104:
560 self.assertEqual( len(pcolumn1.pileups), 2, "# reads aligned to column mismatch in position 1: %s != %s" % (len(pcolumn1.pileups), 2) )
561 # self.assertEqual( pcolumn1.pileups[0] # need to test additional properties here
566 class TestContextManager(unittest.TestCase):
568 def testManager( self ):
569 with pysam.Samfile('ex1.bam', 'rb') as samfile:
571 self.assertEqual( samfile._isOpen(), False )
573 class TestExceptions(unittest.TestCase):
576 self.samfile=pysam.Samfile( "ex1.bam","rb" )
578 def testMissingFile(self):
580 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "rb" )
581 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "r" )
582 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "r" )
583 self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "rb" )
585 def testBadContig(self):
586 self.assertRaises( ValueError, self.samfile.fetch, "chr88" )
588 def testMeaninglessCrap(self):
589 self.assertRaises( ValueError, self.samfile.fetch, "skljf" )
591 def testBackwardsOrderNewFormat(self):
592 self.assertRaises( ValueError, self.samfile.fetch, 'chr1', 100, 10 )
594 def testBackwardsOrderOldFormat(self):
595 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:100-10")
597 def testOutOfRangeNegativeNewFormat(self):
598 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, -10 )
599 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 )
600 self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 )
602 def testOutOfRangeNegativeOldFormat(self):
603 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" )
604 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" )
605 self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" )
607 def testOutOfRangNewFormat(self):
608 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 )
610 def testOutOfRangeLargeNewFormat(self):
611 self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
613 def testOutOfRangeLargeOldFormat(self):
614 self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
616 def testZeroToZero(self):
618 self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0)
624 class TestWrongFormat(unittest.TestCase):
625 '''test cases for opening files not in bam/sam format.'''
627 def testOpenSamAsBam( self ):
628 self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' )
630 def testOpenBamAsSam( self ):
631 self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' )
633 def testOpenFastaAsSam( self ):
634 self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' )
636 def testOpenFastaAsBam( self ):
637 self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' )
639 class TestFastaFile(unittest.TestCase):
641 mSequences = { 'chr1' :
642 "CACTAGTGGCTCATTGTAAATGTGTGGTTTAACTCGTCCATGGCCCAGCATTAGGGAGCTGTGGACCCTGCAGCCTGGCTGTGGGGGCCGCAGTGGCTGAGGGGTGCAGAGCCGAGTCACGGGGTTGCCAGCACAGGGGCTTAACCTCTGGTGACTGCCAGAGCTGCTGGCAAGCTAGAGTCCCATTTGGAGCCCCTCTAAGCCGTTCTATTTGTAATGAAAACTATATTTATGCTATTCAGTTCTAAATATAGAAATTGAAACAGCTGTGTTTAGTGCCTTTGTTCAACCCCCTTGCAACAACCTTGAGAACCCCAGGGAATTTGTCAATGTCAGGGAAGGAGCATTTTGTCAGTTACCAAATGTGTTTATTACCAGAGGGATGGAGGGAAGAGGGACGCTGAAGAACTTTGATGCCCTCTTCTTCCAAAGATGAAACGCGTAACTGCGCTCTCATTCACTCCAGCTCCCTGTCACCCAATGGACCTGTGATATCTGGATTCTGGGAAATTCTTCATCCTGGACCCTGAGAGATTCTGCAGCCCAGCTCCAGATTGCTTGTGGTCTGACAGGCTGCAACTGTGAGCCATCACAATGAACAACAGGAAGAAAAGGTCTTTCAAAAGGTGATGTGTGTTCTCATCAACCTCATACACACACATGGTTTAGGGGTATAATACCTCTACATGGCTGATTATGAAAACAATGTTCCCCAGATACCATCCCTGTCTTACTTCCAGCTCCCCAGAGGGAAAGCTTTCAACGCTTCTAGCCATTTCTTTTGGCATTTGCCTTCAGACCCTACACGAATGCGTCTCTACCACAGGGGGCTGCGCGGTTTCCCATCATGAAGCACTGAACTTCCACGTCTCATCTAGGGGAACAGGGAGGTGCACTAATGCGCTCCACGCCCAAGCCCTTCTCACAGTTTCTGCCCCCAGCATGGTTGTACTGGGCAATACATGAGATTATTAGGAAATGCTTTACTGTCATAACTATGAAGAGACTATTGCCAGATGAACCACACATTAATACTATGTTTCTTATCTGCACATTACTACCCTGCAATTAATATAATTGTGTCCATGTACACACGCTGTCCTATGTACTTATCATGACTCTATCCCAAATTCCCAATTACGTCCTATCTTCTTCTTAGGGAAGAACAGCTTAGGTATCAATTTGGTGTTCTGTGTAAAGTCTCAGGGAGCCGTCCGTGTCCTCCCATCTGGCCTCGTCCACACTGGTTCTCTTGAAAGCTTGGGCTGTAATGATGCCCCTTGGCCATCACCCAGTCCCTGCCCCATCTCTTGTAATCTCTCTCCTTTTTGCTGCATCCCTGTCTTCCTCTGTCTTGATTTACTTGTTGTTGGTTTTCTGTTTCTTTGTTTGATTTGGTGGAAGACATAATCCCACGCTTCCTATGGAAAGGTTGTTGGGAGATTTTTAATGATTCCTCAATGTTAAAATGTCTATTTTTGTCTTGACACCCAACTAATATTTGTCTGAGCAAAACAGTCTAGATGAGAGAGAACTTCCCTGGAGGTCTGATGGCGTTTCTCCCTCGTCTTCTTA",
644 "TTCAAATGAACTTCTGTAATTGAAAAATTCATTTAAGAAATTACAAAATATAGTTGAAAGCTCTAACAATAGACTAAACCAAGCAGAAGAAAGAGGTTCAGAACTTGAAGACAAGTCTCTTATGAATTAACCCAGTCAGACAAAAATAAAGAAAAAAATTTTAAAAATGAACAGAGCTTTCAAGAAGTATGAGATTATGTAAAGTAACTGAACCTATGAGTCACAGGTATTCCTGAGGAAAAAGAAAAAGTGAGAAGTTTGGAAAAACTATTTGAGGAAGTAATTGGGGAAAACCTCTTTAGTCTTGCTAGAGATTTAGACATCTAAATGAAAGAGGCTCAAAGAATGCCAGGAAGATACATTGCAAGACAGACTTCATCAAGATATGTAGTCATCAGACTATCTAAAGTCAACATGAAGGAAAAAAATTCTAAAATCAGCAAGAGAAAAGCATACAGTCATCTATAAAGGAAATCCCATCAGAATAACAATGGGCTTCTCAGCAGAAACCTTACAAGCCAGAAGAGATTGGATCTAATTTTTGGACTTCTTAAAGAAAAAAAAACCTGTCAAACACGAATGTTATGCCCTGCTAAACTAAGCATCATAAATGAAGGGGAAATAAAGTCAAGTCTTTCCTGACAAGCAAATGCTAAGATAATTCATCATCACTAAACCAGTCCTATAAGAAATGCTCAAAAGAATTGTAAAAGTCAAAATTAAAGTTCAATACTCACCATCATAAATACACACAAAAGTACAAAACTCACAGGTTTTATAAAACAATTGAGACTACAGAGCAACTAGGTAAAAAATTAACATTACAACAGGAACAAAACCTCATATATCAATATTAACTTTGAATAAAAAGGGATTAAATTCCCCCACTTAAGAGATATAGATTGGCAGAACAGATTTAAAAACATGAACTAACTATATGCTGTTTACAAGAAACTCATTAATAAAGACATGAGTTCAGGTAAAGGGGTGGAAAAAGATGTTCTACGCAAACAGAAACCAAATGAGAGAAGGAGTAGCTATACTTATATCAGATAAAGCACACTTTAAATCAACAACAGTAAAATAAAACAAAGGAGGTCATCATACAATGATAAAAAGATCAATTCAGCAAGAAGATATAACCATCCTACTAAATACATATGCACCTAACACAAGACTACCCAGATTCATAAAACAAATACTACTAGACCTAAGAGGGATGAGAAATTACCTAATTGGTACAATGTACAATATTCTGATGATGGTTACACTAAAAGCCCATACTTTACTGCTACTCAATATATCCATGTAACAAATCTGCGCTTGTACTTCTAAATCTATAAAAAAATTAAAATTTAACAAAAGTAAATAAAACACATAGCTAAAACTAAAAAAGCAAAAACAAAAACTATGCTAAGTATTGGTAAAGATGTGGGGAAAAAAGTAAACTCTCAAATATTGCTAGTGGGAGTATAAATTGTTTTCCACTTTGGAAAACAATTTGGTAATTTCGTTTTTTTTTTTTTCTTTTCTCTTTTTTTTTTTTTTTTTTTTGCATGCCAGAAAAAAATATTTACAGTAACT",
648 self.file=pysam.Fastafile( "ex1.fa" )
651 for id, seq in self.mSequences.items():
652 self.assertEqual( seq, self.file.fetch( id ) )
653 for x in range( 0, len(seq), 10):
654 self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
656 self.assertEqual( seq[x:], self.file.fetch( id, x) )
658 self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
661 # unknown sequence returns ""
662 self.assertEqual( "", self.file.fetch("chr12") )
664 def testFetchErrors( self ):
665 self.assertRaises( ValueError, self.file.fetch )
666 self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
667 self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
669 def testLength( self ):
670 self.assertEqual( len(self.file), 2 )
675 class TestAlignedRead(unittest.TestCase):
676 '''tests to check if aligned read can be constructed
680 def checkFieldEqual( self, read1, read2, exclude = []):
681 '''check if two reads are equal by comparing each field.'''
683 for x in ("qname", "seq", "flag",
684 "rname", "pos", "mapq", "cigar",
685 "mrnm", "mpos", "isize", "qual",
686 "is_paired", "is_proper_pair",
687 "is_unmapped", "mate_is_unmapped",
688 "is_reverse", "mate_is_reverse",
689 "is_read1", "is_read2",
690 "is_secondary", "is_qcfail",
691 "is_duplicate", "bin"):
692 if x in exclude: continue
693 self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" %
694 (x, getattr(read1, x), getattr(read2,x)))
696 def testEmpty( self ):
697 a = pysam.AlignedRead()
698 self.assertEqual( a.qname, None )
699 self.assertEqual( a.seq, None )
700 self.assertEqual( a.qual, None )
701 self.assertEqual( a.flag, 0 )
702 self.assertEqual( a.rname, 0 )
703 self.assertEqual( a.mapq, 0 )
704 self.assertEqual( a.cigar, None )
705 self.assertEqual( a.tags, None )
706 self.assertEqual( a.mrnm, 0 )
707 self.assertEqual( a.mpos, 0 )
708 self.assertEqual( a.isize, 0 )
710 def buildRead( self ):
711 '''build an example read.'''
713 a = pysam.AlignedRead()
714 a.qname = "read_12345"
720 a.cigar = ( (0,10), (2,1), (0,25) )
728 def testUpdate( self ):
729 '''check if updating fields affects other variable length data
736 self.checkFieldEqual( a, b, "qname" )
737 b.qname = "read_12345678"
738 self.checkFieldEqual( a, b, "qname" )
739 b.qname = "read_12345"
740 self.checkFieldEqual( a, b)
743 b.cigar = ( (0,10), )
744 self.checkFieldEqual( a, b, "cigar" )
745 b.cigar = ( (0,10), (2,1), (0,25), (2,1), (0,25) )
746 self.checkFieldEqual( a, b, "cigar" )
747 b.cigar = ( (0,10), (2,1), (0,25) )
748 self.checkFieldEqual( a, b)
752 self.checkFieldEqual( a, b, ("seq", "qual") )
754 self.checkFieldEqual( a, b, ("seq", "qual") )
756 self.checkFieldEqual( a, b, ("qual",))
763 "is_paired", "is_proper_pair",
764 "is_unmapped", "mate_is_unmapped",
765 "is_reverse", "mate_is_reverse",
766 "is_read1", "is_read2",
767 "is_secondary", "is_qcfail",
769 setattr( b, x, True )
770 self.assertEqual( getattr(b, x), True )
771 self.checkFieldEqual( a, b, ("flag", x,) )
772 setattr( b, x, False )
773 self.assertEqual( getattr(b, x), False )
774 self.checkFieldEqual( a, b )
776 def testLargeRead( self ):
777 '''build an example read.'''
779 a = pysam.AlignedRead()
780 a.qname = "read_12345"
786 a.cigar = ( (0,10), (2,1), (0,25) )
794 def testTagParsing( self ):
795 '''test for tag parsing
797 see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a
799 samfile=pysam.Samfile( "ex8.bam","rb" )
801 for entry in samfile:
803 entry.tags = entry.tags
805 self.assertEqual( after, before )
807 class TestDeNovoConstruction(unittest.TestCase):
808 '''check BAM/SAM file construction using ex3.sam
810 (note these are +1 coordinates):
812 read_28833_29006_6945 99 chr1 33 20 10M1D25M = 200 167 AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG <<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<< NM:i:1 RG:Z:L1
813 read_28701_28881_323b 147 chr2 88 30 35M = 500 412 ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA <<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<< MF:i:18 RG:Z:L2
816 header = { 'HD': {'VN': '1.0'},
817 'SQ': [{'LN': 1575, 'SN': 'chr1'},
818 {'LN': 1584, 'SN': 'chr2'}], }
823 def checkFieldEqual( self, read1, read2, exclude = []):
824 '''check if two reads are equal by comparing each field.'''
826 for x in ("qname", "seq", "flag",
827 "rname", "pos", "mapq", "cigar",
828 "mrnm", "mpos", "isize", "qual",
830 "is_paired", "is_proper_pair",
831 "is_unmapped", "mate_is_unmapped",
832 "is_reverse", "mate_is_reverse",
833 "is_read1", "is_read2",
834 "is_secondary", "is_qcfail",
836 if x in exclude: continue
837 self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" %
838 (x, getattr(read1, x), getattr(read2,x)))
843 a = pysam.AlignedRead()
844 a.qname = "read_28833_29006_6945"
845 a.seq="AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG"
850 a.cigar = ( (0,10), (2,1), (0,25) )
854 a.qual="<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<"
855 a.tags = ( ("NM", 1),
858 b = pysam.AlignedRead()
859 b.qname = "read_28701_28881_323b"
860 b.seq="ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA"
865 b.cigar = ( (0,35), )
869 b.qual="<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<"
870 b.tags = ( ("MF", 18),
875 def testSAMWholeFile( self ):
877 tmpfilename = "tmp_%i.sam" % id(self)
879 outfile = pysam.Samfile( tmpfilename, "wh", header = self.header )
881 for x in self.reads: outfile.write( x )
884 self.assertTrue( checkBinaryEqual( tmpfilename, self.samfile ),
885 "mismatch when construction SAM file, see %s %s" % (tmpfilename, self.samfile))
887 os.unlink( tmpfilename )
889 def testBAMPerRead( self ):
890 '''check if individual reads are binary equal.'''
891 infile = pysam.Samfile( self.bamfile, "rb")
893 others = list(infile)
894 for denovo, other in zip( others, self.reads):
895 self.checkFieldEqual( other, denovo )
896 self.assertEqual( other.compare( denovo ), 0 )
898 def testSAMPerRead( self ):
899 '''check if individual reads are binary equal.'''
900 infile = pysam.Samfile( self.samfile, "r")
902 others = list(infile)
903 for denovo, other in zip( others, self.reads):
904 self.checkFieldEqual( other, denovo )
905 self.assertEqual( other.compare( denovo), 0 )
907 def testBAMWholeFile( self ):
909 tmpfilename = "tmp_%i.bam" % id(self)
911 outfile = pysam.Samfile( tmpfilename, "wb", header = self.header )
913 for x in self.reads: outfile.write( x )
916 self.assertTrue( checkBinaryEqual( tmpfilename, self.bamfile ),
917 "mismatch when construction BAM file, see %s %s" % (tmpfilename, self.bamfile))
919 os.unlink( tmpfilename )
922 class TestDoubleFetch(unittest.TestCase):
923 '''check if two iterators on the same bamfile are independent.'''
925 def testDoubleFetch( self ):
927 samfile1 = pysam.Samfile('ex1.bam', 'rb')
929 for a,b in zip(samfile1.fetch(), samfile1.fetch()):
930 self.assertEqual( a.compare( b ), 0 )
932 def testDoubleFetchWithRegion( self ):
934 samfile1 = pysam.Samfile('ex1.bam', 'rb')
935 chr, start, stop = 'chr1', 200, 3000000
936 self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
938 for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
939 self.assertEqual( a.compare( b ), 0 )
941 def testDoubleFetchUntilEOF( self ):
943 samfile1 = pysam.Samfile('ex1.bam', 'rb')
945 for a,b in zip(samfile1.fetch( until_eof = True),
946 samfile1.fetch( until_eof = True )):
947 self.assertEqual( a.compare( b), 0 )
949 class TestRemoteFileFTP(unittest.TestCase):
950 '''test remote access.
954 # Need to find an ftp server without password on standard
957 url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
960 def testFTPView( self ):
961 result = pysam.view( self.url, self.region )
962 self.assertEqual( len(result), 36 )
964 def testFTPFetch( self ):
965 samfile = pysam.Samfile(self.url, "rb")
966 result = list(samfile.fetch( region = self.region ))
967 self.assertEqual( len(result), 36 )
969 class TestRemoteFileHTTP( unittest.TestCase):
971 url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
972 region = "chr1:1-1000"
975 def testView( self ):
976 self.assertRaises( pysam.SamtoolsError, pysam.view, self.url, self.region )
978 def testFetch( self ):
979 samfile = pysam.Samfile(self.url, "rb")
980 result = list(samfile.fetch( region = self.region ))
981 samfile_local = pysam.Samfile(self.local, "rb")
982 ref = list(samfile_local.fetch( region = self.region ))
984 self.assertEqual( len(ref), len(result) )
985 for x, y in zip(result, ref):
986 self.assertEqual( x.compare( y ), 0 )
988 def testFetchAll( self ):
989 samfile = pysam.Samfile(self.url, "rb")
990 result = list(samfile.fetch())
991 samfile_local = pysam.Samfile(self.local, "rb")
992 ref = list(samfile_local.fetch() )
994 self.assertEqual( len(ref), len(result) )
995 for x, y in zip(result, ref):
996 self.assertEqual( x.compare( y ), 0 )
1000 # 1. finish testing all properties within pileup objects
1001 # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)
1003 if __name__ == "__main__":
1005 print "building data files"
1006 subprocess.call( "make", shell=True)
1007 print "starting tests"