Imported Upstream version 0.6
[pysam.git] / tests / pysam_test.py
1 #!/usr/bin/env python
2 '''unit testing code for pysam.
3
4 Execute in the :file:`tests` directory as it requires the Makefile
5 and data files located there.
6 '''
7
8 import pysam
9 import unittest
10 import os, re, sys
11 import itertools, collections
12 import subprocess
13 import shutil
14 import logging
15
16 SAMTOOLS="samtools"
17 WORKDIR="pysam_test_work"
18
19 def checkBinaryEqual( filename1, filename2 ):
20     '''return true if the two files are binary equal.'''
21     if os.path.getsize( filename1 ) !=  os.path.getsize( filename2 ):
22         return False
23
24     infile1 = open(filename1, "rb")
25     infile2 = open(filename2, "rb")
26
27     def chariter( infile ):
28         while 1:
29             c = infile.read(1)
30             if c == "": break
31             yield c
32
33     found = False
34     for c1,c2 in itertools.izip( chariter( infile1), chariter( infile2) ):
35         if c1 != c2: break
36     else:
37         found = True
38
39     infile1.close()
40     infile2.close()
41     return found
42
43 def runSamtools( cmd ):
44     '''run a samtools command'''
45
46     try:
47         retcode = subprocess.call(cmd, shell=True)
48         if retcode < 0:
49             print >>sys.stderr, "Child was terminated by signal", -retcode
50     except OSError, e:
51         print >>sys.stderr, "Execution failed:", e
52
53 def getSamtoolsVersion():
54     '''return samtools version'''
55
56     pipe = subprocess.Popen(SAMTOOLS, shell=True, stderr=subprocess.PIPE).stderr
57     lines = "".join(pipe.readlines())
58     return re.search( "Version:\s+(\S+)", lines).groups()[0]
59
60 class BinaryTest(unittest.TestCase):
61     '''test samtools command line commands and compare
62     against pysam commands.
63
64     Tests fail, if the output is not binary identical.
65     '''
66
67     first_time = True
68
69     # a list of commands to test
70     commands = \
71         { 
72           "view" :
73               (
74                 ("ex1.view", "view ex1.bam > ex1.view"),
75                 ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
76                 ),
77           "view2" :
78               (
79                 ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"),
80                 # note that -o ex1.view2 throws exception.
81                 ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ),
82                 ),
83           "sort" :
84               (
85                 ( "ex1.sort.bam", "sort ex1.bam ex1.sort" ),
86                 ( "pysam_ex1.sort.bam", (pysam.sort, "ex1.bam pysam_ex1.sort" ) ),
87                 ),
88           "mpileup" :
89               (
90                 ("ex1.pileup", "mpileup ex1.bam > ex1.pileup" ),
91                 ("pysam_ex1.mpileup", (pysam.mpileup, "ex1.bam" ) ),
92                 ),
93           "depth" :
94               (
95                 ("ex1.depth", "depth ex1.bam > ex1.depth" ),
96                 ("pysam_ex1.depth", (pysam.depth, "ex1.bam" ) ),
97                 ),
98           "faidx" : 
99               ( 
100                 ("ex1.fa.fai", "faidx ex1.fa"), 
101                 ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ),
102                 ),
103           "index":
104               (
105                 ("ex1.bam.bai", "index ex1.bam" ),
106                 ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ),
107                 ),
108           "idxstats" :
109               ( 
110                 ("ex1.idxstats", "idxstats ex1.bam > ex1.idxstats" ),
111                 ("pysam_ex1.idxstats", (pysam.idxstats, "pysam_ex1.bam" ) ),
112                 ),
113           "fixmate" :
114               (
115                 ("ex1.fixmate", "fixmate ex1.bam ex1.fixmate" ),
116                 ("pysam_ex1.fixmate", (pysam.fixmate, "pysam_ex1.bam pysam_ex1.fixmate") ),
117                 ),
118           "flagstat" :
119               (
120                 ("ex1.flagstat", "flagstat ex1.bam > ex1.flagstat" ),
121                 ("pysam_ex1.flagstat", (pysam.flagstat, "pysam_ex1.bam") ),
122                 ),
123           "calmd" :
124               (
125                 ("ex1.calmd", "calmd ex1.bam ex1.fa > ex1.calmd" ),
126                 ("pysam_ex1.calmd", (pysam.calmd, "pysam_ex1.bam ex1.fa") ),
127                 ),
128           "merge" :
129               (
130                 ("ex1.merge", "merge -f ex1.merge ex1.bam ex1.bam" ),
131                 # -f option does not work - following command will cause the subsequent
132                 # command to fail
133                 ("pysam_ex1.merge", (pysam.merge, "pysam_ex1.merge pysam_ex1.bam pysam_ex1.bam") ),
134                 ),
135           "rmdup" :
136               (
137                 ("ex1.rmdup", "rmdup ex1.bam ex1.rmdup" ),
138                 ("pysam_ex1.rmdup", (pysam.rmdup, "pysam_ex1.bam pysam_ex1.rmdup" )),
139                 ),
140           "reheader" :
141               (
142                 ( "ex1.reheader", "reheader ex1.bam ex1.bam > ex1.reheader"),
143                 ( "pysam_ex1.reheader", (pysam.reheader, "ex1.bam ex1.bam" ) ),
144                 ),
145           "cat":
146               (
147                 ( "ex1.cat", "cat ex1.bam ex1.bam > ex1.cat"),
148                 ( "pysam_ex1.cat", (pysam.cat, "ex1.bam ex1.bam" ) ),
149                 ),
150           "targetcut":
151               (
152                 ("ex1.targetcut", "targetcut ex1.bam > ex1.targetcut" ),
153                 ("pysam_ex1.targetcut", (pysam.targetcut, "pysam_ex1.bam") ),
154                 ),
155           "phase":
156               (
157                 ("ex1.phase", "phase ex1.bam > ex1.phase" ),
158                 ("pysam_ex1.phase", (pysam.phase, "pysam_ex1.bam") ),
159                 ),
160           "import" :
161               (
162                 ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ),
163                 ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ),
164                 ),
165           "bam2fq":
166               (
167                 ("ex1.bam2fq", "bam2fq ex1.bam > ex1.bam2fq" ),
168                 ("pysam_ex1.bam2fq", (pysam.bam2fq, "pysam_ex1.bam") ),
169                 ),
170         }
171
172     # some tests depend on others. The order specifies in which order
173     # the samtools commands are executed.
174     # The first three (faidx, import, index) need to be in that order, 
175     # the rest is arbitrary.
176     order = ('faidx', 'import', 'index', 
177               # 'pileup1', 'pileup2', deprecated
178               # 'glfview', deprecated
179               'view', 'view2',
180               'sort',
181               'mpileup',
182               'depth',
183               'idxstats',
184               'fixmate',
185               'flagstat',
186               # 'calmd',
187               'merge',
188               'rmdup',
189               'reheader',
190               'cat',
191               'targetcut',
192               'phase',
193               'bam2fq',
194               )
195
196     def setUp( self ):
197         '''setup tests. 
198
199         For setup, all commands will be run before the first test is
200         executed. Individual tests will then just compare the output
201         files.
202         '''
203         if BinaryTest.first_time:
204
205             # remove previous files
206             if os.path.exists( WORKDIR ):
207                 shutil.rmtree( WORKDIR )
208                 
209             # copy the source files to WORKDIR
210             os.makedirs( WORKDIR )
211
212             shutil.copy( "ex1.fa", os.path.join( WORKDIR, "pysam_ex1.fa" ) )
213             shutil.copy( "ex1.fa", os.path.join( WORKDIR, "ex1.fa" ) )
214             shutil.copy( "ex1.sam.gz", os.path.join( WORKDIR, "ex1.sam.gz" ) )
215             shutil.copy( "ex1.sam", os.path.join( WORKDIR, "ex1.sam" ) )
216
217             # cd to workdir
218             savedir = os.getcwd()
219             os.chdir( WORKDIR )
220             
221             for label in self.order:
222                 command = self.commands[label]
223                 samtools_target, samtools_command = command[0]
224                 try:
225                     pysam_target, pysam_command = command[1]
226                 except ValueError, msg:
227                     raise ValueError( "error while setting up %s=%s: %s" %\
228                                           (label, command, msg) )
229                 runSamtools( " ".join( (SAMTOOLS, samtools_command )))
230                 pysam_method, pysam_options = pysam_command
231                 try:
232                     output = pysam_method( *pysam_options.split(" "), raw=True)
233                 except pysam.SamtoolsError, msg:
234                     raise pysam.SamtoolsError( "error while executing %s: options=%s: msg=%s" %\
235                                                    (label, pysam_options, msg) )
236                 if ">" in samtools_command:
237                     outfile = open( pysam_target, "wb" )
238                     for line in output: outfile.write( line )
239                     outfile.close()
240                     
241             os.chdir( savedir )
242             BinaryTest.first_time = False
243
244             
245
246         samtools_version = getSamtoolsVersion()
247
248         
249         def _r( s ):
250             # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> 0.1.12
251             if s.count('-') > 0: s = s[0:s.find('-')]
252             return re.sub( "[^0-9.]", "", s )
253
254         if _r(samtools_version) != _r( pysam.__samtools_version__):
255             raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
256                                  (pysam.__samtools_version__,
257                                   samtools_version ))
258
259     def checkCommand( self, command ):
260         if command:
261             samtools_target, pysam_target = self.commands[command][0][0], self.commands[command][1][0]
262             samtools_target = os.path.join( WORKDIR, samtools_target )
263             pysam_target = os.path.join( WORKDIR, pysam_target )
264             self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ), 
265                              "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
266             
267     def testImport( self ):
268         self.checkCommand( "import" )
269
270     def testIndex( self ):
271         self.checkCommand( "index" )
272
273     def testSort( self ):
274         self.checkCommand( "sort" )
275
276     def testMpileup( self ):
277         self.checkCommand( "mpileup" )
278
279     def testDepth( self ):
280         self.checkCommand( "depth" )
281
282     def testIdxstats( self ):
283         self.checkCommand( "idxstats" )
284
285     def testFixmate( self ):
286         self.checkCommand( "fixmate" )
287
288     def testFlagstat( self ):
289         self.checkCommand( "flagstat" )
290         
291     def testMerge( self ):
292         self.checkCommand( "merge" )
293
294     def testRmdup( self ):
295         self.checkCommand( "rmdup" )
296
297     def testReheader( self ):
298         self.checkCommand( "reheader" )
299
300     def testCat( self ):
301         self.checkCommand( "cat" )
302
303     def testTargetcut( self ):
304         self.checkCommand( "targetcut" )
305
306     def testPhase( self ):
307         self.checkCommand( "phase" )
308
309     def testBam2fq( self ):
310         self.checkCommand( "bam2fq" )
311
312     # def testPileup1( self ):
313     #     self.checkCommand( "pileup1" )
314     
315     # def testPileup2( self ):
316     #     self.checkCommand( "pileup2" )
317
318     # deprecated
319     # def testGLFView( self ):
320     #     self.checkCommand( "glfview" )
321
322     def testView( self ):
323         self.checkCommand( "view" )
324
325     def testEmptyIndex( self ):
326         self.assertRaises( IOError, pysam.index, "exdoesntexist.bam" )
327
328     def __del__(self):
329         if os.path.exists( WORKDIR ):
330             shutil.rmtree( WORKDIR )
331
332 class IOTest(unittest.TestCase):
333     '''check if reading samfile and writing a samfile are consistent.'''
334
335     def checkEcho( self, input_filename, reference_filename, 
336                    output_filename, 
337                    input_mode, output_mode, use_template = True ):
338         '''iterate through *input_filename* writing to *output_filename* and
339         comparing the output to *reference_filename*. 
340         
341         The files are opened according to the *input_mode* and *output_mode*.
342
343         If *use_template* is set, the header is copied from infile using the
344         template mechanism, otherwise target names and lengths are passed 
345         explicitely. 
346
347         '''
348
349         infile = pysam.Samfile( input_filename, input_mode )
350         if use_template:
351             outfile = pysam.Samfile( output_filename, output_mode, template = infile )
352         else:
353             outfile = pysam.Samfile( output_filename, output_mode, 
354                                      referencenames = infile.references,
355                                      referencelengths = infile.lengths,
356                                      add_sq_text = False )
357
358         iter = infile.fetch()
359         for x in iter: outfile.write( x )
360         infile.close()
361         outfile.close()
362
363         self.assertTrue( checkBinaryEqual( reference_filename, output_filename), 
364                          "files %s and %s are not the same" % (reference_filename, output_filename) )
365
366     def testReadWriteBam( self ):
367         
368         input_filename = "ex1.bam"
369         output_filename = "pysam_ex1.bam"
370         reference_filename = "ex1.bam"
371
372         self.checkEcho( input_filename, reference_filename, output_filename,
373                         "rb", "wb" )
374
375     def testReadWriteBamWithTargetNames( self ):
376         
377         input_filename = "ex1.bam"
378         output_filename = "pysam_ex1.bam"
379         reference_filename = "ex1.bam"
380
381         self.checkEcho( input_filename, reference_filename, output_filename,
382                         "rb", "wb", use_template = False )
383
384     def testReadWriteSamWithHeader( self ):
385         
386         input_filename = "ex2.sam"
387         output_filename = "pysam_ex2.sam"
388         reference_filename = "ex2.sam"
389
390         self.checkEcho( input_filename, reference_filename, output_filename,
391                         "r", "wh" )
392
393     def testReadWriteSamWithoutHeader( self ):
394         
395         input_filename = "ex2.sam"
396         output_filename = "pysam_ex2.sam"
397         reference_filename = "ex1.sam"
398
399         self.checkEcho( input_filename, reference_filename, output_filename,
400                         "r", "w" )
401
402     def testReadSamWithoutHeaderWriteSamWithoutHeader( self ):
403         
404         input_filename = "ex1.sam"
405         output_filename = "pysam_ex1.sam"
406         reference_filename = "ex1.sam"
407
408         # disabled - reading from a samfile without header
409         # is not implemented.
410         
411         # self.checkEcho( input_filename, reference_filename, output_filename,
412         #                 "r", "w" )
413
414     def testFetchFromClosedFile( self ):
415
416         samfile = pysam.Samfile( "ex1.bam", "rb" )
417         samfile.close()
418         self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
419
420     def testClosedFile( self ):
421         '''test that access to a closed samfile raises ValueError.'''
422
423         samfile = pysam.Samfile( "ex1.bam", "rb" )
424         samfile.close()
425         self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
426         self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
427         self.assertRaises( ValueError, samfile.getrname, 0 )
428         self.assertRaises( ValueError, samfile.tell )
429         self.assertRaises( ValueError, samfile.seek, 0 )
430         self.assertRaises( ValueError, getattr, samfile, "nreferences" )
431         self.assertRaises( ValueError, getattr, samfile, "references" )
432         self.assertRaises( ValueError, getattr, samfile, "lengths" )
433         self.assertRaises( ValueError, getattr, samfile, "text" )
434         self.assertRaises( ValueError, getattr, samfile, "header" )
435
436         # write on closed file 
437         self.assertEqual( 0, samfile.write(None) )
438
439     def testAutoDetection( self ):
440         '''test if autodetection works.'''
441
442         samfile = pysam.Samfile( "ex3.sam" )
443         self.assertRaises( ValueError, samfile.fetch, 'chr1' )
444         samfile.close()
445
446         samfile = pysam.Samfile( "ex3.bam" )
447         samfile.fetch('chr1')
448         samfile.close()
449
450     def testReadingFromSamFileWithoutHeader( self ):
451         '''read from samfile without header.
452         '''
453         samfile = pysam.Samfile( "ex7.sam" )
454         self.assertRaises( NotImplementedError, samfile.__iter__ )
455
456     def testReadingFromFileWithoutIndex( self ):
457         '''read from bam file without index.'''
458
459         assert not os.path.exists( "ex2.bam.bai" )
460         samfile = pysam.Samfile( "ex2.bam", "rb" )
461         self.assertRaises( ValueError, samfile.fetch )
462         self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 )
463
464     def testReadingUniversalFileMode( self ):
465         '''read from samfile without header.
466         '''
467
468         input_filename = "ex2.sam"
469         output_filename = "pysam_ex2.sam"
470         reference_filename = "ex1.sam"
471
472         self.checkEcho( input_filename, reference_filename, output_filename,
473                         "rU", "w" )
474
475 class TestFloatTagBug( unittest.TestCase ):
476     '''see issue 71'''
477
478     def testFloatTagBug( self ): 
479         '''a float tag before another exposed a parsing bug in bam_aux_get - expected to fail
480
481         This test is expected to fail until samtools is fixed.
482         '''
483         samfile = pysam.Samfile("tag_bug.bam")
484         read = samfile.fetch(until_eof=True).next()
485         self.assertTrue( ('XC',1) in read.tags )
486         self.assertEqual(read.opt('XC'), 1)
487
488 class TestTagParsing( unittest.TestCase ):
489     '''tests checking the accuracy of tag setting and retrieval.'''
490
491     def makeRead( self ):
492         a = pysam.AlignedRead()
493         a.qname = "read_12345"
494         a.tid = 0
495         a.seq="ACGT" * 3
496         a.flag = 0
497         a.rname = 0
498         a.pos = 1
499         a.mapq = 20
500         a.cigar = ( (0,10), (2,1), (0,25) )
501         a.mrnm = 0
502         a.mpos=200
503         a.isize = 0
504         a.qual ="1234" * 3
505         # todo: create tags
506         return a
507
508     def testNegativeIntegers( self ):
509         x = -2
510         aligned_read = self.makeRead()
511         aligned_read.tags = [("XD", int(x) ) ]
512         print aligned_read.tags
513
514     def testNegativeIntegers2( self ):
515         x = -2
516         r = self.makeRead()
517         r.tags = [("XD", int(x) ) ]
518         outfile = pysam.Samfile( "test.bam",
519                                  "wb",
520                                  referencenames = ("chr1",),
521                                  referencelengths = (1000,) )
522         outfile.write (r )
523         outfile.close()
524
525 class TestIteratorRow(unittest.TestCase):
526
527     def setUp(self):
528         self.samfile=pysam.Samfile( "ex1.bam","rb" )
529
530     def checkRange( self, rnge ):
531         '''compare results from iterator with those from samtools.'''
532         ps = list(self.samfile.fetch(region=rnge))
533         sa = list(pysam.view( "ex1.bam", rnge, raw = True) )
534         self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) ))
535         # check if the same reads are returned and in the same order
536         for line, pair in enumerate( zip( ps, sa ) ):
537             a,b = pair
538             d = b.split("\t")
539             self.assertEqual( a.qname, d[0], "line %i: read id mismatch: %s != %s" % (line, a.rname, d[0]) )
540             self.assertEqual( a.pos, int(d[3])-1, "line %i: read position mismatch: %s != %s, \n%s\n%s\n" % \
541                                   (line, a.pos, int(d[3])-1,
542                                    str(a), str(d) ) )
543             self.assertEqual( a.qual, d[10], "line %i: quality mismatch: %s != %s, \n%s\n%s\n" % \
544                                   (line, a.qual, d[10],
545                                    str(a), str(d) ) )
546
547     def testIteratePerContig(self):
548         '''check random access per contig'''
549         for contig in self.samfile.references:
550             self.checkRange( contig )
551
552     def testIterateRanges(self):
553         '''check random access per range'''
554         for contig, length in zip(self.samfile.references, self.samfile.lengths):
555             for start in range( 1, length, 90):
556                 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
557
558     def tearDown(self):
559         self.samfile.close()
560
561 class TestIteratorRowAll(unittest.TestCase):
562
563     def setUp(self):
564         self.samfile=pysam.Samfile( "ex1.bam","rb" )
565
566     def testIterate(self):
567         '''compare results from iterator with those from samtools.'''
568         ps = list(self.samfile.fetch())
569         sa = list(pysam.view( "ex1.bam", raw = True) )
570         self.assertEqual( len(ps), len(sa), "unequal number of results: %i != %i" % (len(ps), len(sa) ))
571         # check if the same reads are returned
572         for line, pair in enumerate( zip( ps, sa ) ):
573             data = pair[1].split("\t")
574             self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
575
576     def tearDown(self):
577         self.samfile.close()
578
579 class TestIteratorColumn(unittest.TestCase):
580     '''test iterator column against contents of ex3.bam.'''
581     
582     # note that samfile contains 1-based coordinates
583     # 1D means deletion with respect to reference sequence
584     # 
585     mCoverages = { 'chr1' : [ 0 ] * 20 + [1] * 36 + [0] * (100 - 20 -35 ),
586                    'chr2' : [ 0 ] * 20 + [1] * 35 + [0] * (100 - 20 -35 ),
587                    }
588
589     def setUp(self):
590         self.samfile=pysam.Samfile( "ex4.bam","rb" )
591
592     def checkRange( self, rnge ):
593         '''compare results from iterator with those from samtools.'''
594         # check if the same reads are returned and in the same order
595         for column in self.samfile.pileup(region=rnge):
596             thiscov = len(column.pileups)
597             refcov = self.mCoverages[self.samfile.getrname(column.tid)][column.pos]
598             self.assertEqual( thiscov, refcov, "wrong coverage at pos %s:%i %i should be %i" % (self.samfile.getrname(column.tid), column.pos, thiscov, refcov))
599
600     def testIterateAll(self):
601         '''check random access per contig'''
602         self.checkRange( None )
603
604     def testIteratePerContig(self):
605         '''check random access per contig'''
606         for contig in self.samfile.references:
607             self.checkRange( contig )
608
609     def testIterateRanges(self):
610         '''check random access per range'''
611         for contig, length in zip(self.samfile.references, self.samfile.lengths):
612             for start in range( 1, length, 90):
613                 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
614
615     def testInverse( self ):
616         '''test the inverse, is point-wise pileup accurate.'''
617         for contig, refseq in self.mCoverages.items():
618             refcolumns = sum(refseq)
619             for pos, refcov in enumerate( refseq ):
620                 columns = list(self.samfile.pileup( contig, pos, pos+1) )
621                 if refcov == 0:
622                     # if no read, no coverage
623                     self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
624                 elif refcov == 1:
625                     # one read, all columns of the read are returned
626                     self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
627                                           (pos, len(columns), refcolumns))
628
629                     
630     
631     def tearDown(self):
632         self.samfile.close()
633     
634 class TestAlignedReadFromBam(unittest.TestCase):
635
636     def setUp(self):
637         self.samfile=pysam.Samfile( "ex3.bam","rb" )
638         self.reads=list(self.samfile.fetch())
639
640     def testARqname(self):
641         self.assertEqual( self.reads[0].qname, "read_28833_29006_6945", "read name mismatch in read 1: %s != %s" % (self.reads[0].qname, "read_28833_29006_6945") )
642         self.assertEqual( self.reads[1].qname, "read_28701_28881_323b", "read name mismatch in read 2: %s != %s" % (self.reads[1].qname, "read_28701_28881_323b") )
643
644     def testARflag(self):
645         self.assertEqual( self.reads[0].flag, 99, "flag mismatch in read 1: %s != %s" % (self.reads[0].flag, 99) )
646         self.assertEqual( self.reads[1].flag, 147, "flag mismatch in read 2: %s != %s" % (self.reads[1].flag, 147) )
647
648     def testARrname(self):
649         self.assertEqual( self.reads[0].rname, 0, "chromosome/target id mismatch in read 1: %s != %s" % (self.reads[0].rname, 0) )
650         self.assertEqual( self.reads[1].rname, 1, "chromosome/target id mismatch in read 2: %s != %s" % (self.reads[1].rname, 1) )
651
652     def testARpos(self):
653         self.assertEqual( self.reads[0].pos, 33-1, "mapping position mismatch in read 1: %s != %s" % (self.reads[0].pos, 33-1) )
654         self.assertEqual( self.reads[1].pos, 88-1, "mapping position mismatch in read 2: %s != %s" % (self.reads[1].pos, 88-1) )
655
656     def testARmapq(self):
657         self.assertEqual( self.reads[0].mapq, 20, "mapping quality mismatch in read 1: %s != %s" % (self.reads[0].mapq, 20) )
658         self.assertEqual( self.reads[1].mapq, 30, "mapping quality mismatch in read 2: %s != %s" % (self.reads[1].mapq, 30) )
659
660     def testARcigar(self):
661         self.assertEqual( self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)], "read name length mismatch in read 1: %s != %s" % (self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)]) )
662         self.assertEqual( self.reads[1].cigar, [(0, 35)], "read name length mismatch in read 2: %s != %s" % (self.reads[1].cigar, [(0, 35)]) )
663
664     def testARmrnm(self):
665         self.assertEqual( self.reads[0].mrnm, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].mrnm, 0) )
666         self.assertEqual( self.reads[1].mrnm, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].mrnm, 1) )
667         self.assertEqual( self.reads[0].rnext, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].rnext, 0) )
668         self.assertEqual( self.reads[1].rnext, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].rnext, 1) )
669
670     def testARmpos(self):
671         self.assertEqual( self.reads[0].mpos, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].mpos, 200-1) )
672         self.assertEqual( self.reads[1].mpos, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].mpos, 500-1) )
673         self.assertEqual( self.reads[0].pnext, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].pnext, 200-1) )
674         self.assertEqual( self.reads[1].pnext, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].pnext, 500-1) )
675
676     def testARisize(self):
677         self.assertEqual( self.reads[0].isize, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].isize, 167) )
678         self.assertEqual( self.reads[1].isize, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].isize, 412) )
679         self.assertEqual( self.reads[0].tlen, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].tlen, 167) )
680         self.assertEqual( self.reads[1].tlen, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].tlen, 412) )
681
682     def testARseq(self):
683         self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
684         self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
685         self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
686
687     def testARqual(self):
688         self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
689         self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
690         self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
691
692     def testARquery(self):
693         self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
694         self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
695         self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") )
696
697     def testARqqual(self):
698         self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
699         self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
700         self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") )
701
702     def testPresentOptionalFields(self):
703         self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
704         self.assertEqual( self.reads[0].opt('RG'), 'L1', "optional field mismatch in read 1, RG: %s != %s" % (self.reads[0].opt('RG'), 'L1') )
705         self.assertEqual( self.reads[1].opt('RG'), 'L2', "optional field mismatch in read 2, RG: %s != %s" % (self.reads[1].opt('RG'), 'L2') )
706         self.assertEqual( self.reads[1].opt('MF'), 18, "optional field mismatch in read 2, MF: %s != %s" % (self.reads[1].opt('MF'), 18) )
707
708     def testPairedBools(self):
709         self.assertEqual( self.reads[0].is_paired, True, "is paired mismatch in read 1: %s != %s" % (self.reads[0].is_paired, True) )
710         self.assertEqual( self.reads[1].is_paired, True, "is paired mismatch in read 2: %s != %s" % (self.reads[1].is_paired, True) )
711         self.assertEqual( self.reads[0].is_proper_pair, True, "is proper pair mismatch in read 1: %s != %s" % (self.reads[0].is_proper_pair, True) )
712         self.assertEqual( self.reads[1].is_proper_pair, True, "is proper pair mismatch in read 2: %s != %s" % (self.reads[1].is_proper_pair, True) )
713
714     def testTags( self ):
715         self.assertEqual( self.reads[0].tags, 
716                           [('NM', 1), ('RG', 'L1'), 
717                            ('PG', 'P1'), ('XT', 'U')] )
718         self.assertEqual( self.reads[1].tags, 
719                           [('MF', 18), ('RG', 'L2'), 
720                            ('PG', 'P2'),('XT', 'R') ] )
721
722     def testOpt( self ):
723         self.assertEqual( self.reads[0].opt("XT"), "U" )
724         self.assertEqual( self.reads[1].opt("XT"), "R" )
725
726     def testMissingOpt( self ):
727         self.assertRaises( KeyError, self.reads[0].opt, "XP" )
728
729     def testEmptyOpt( self ):
730         self.assertRaises( KeyError, self.reads[2].opt, "XT" )
731
732     def tearDown(self):
733         self.samfile.close()
734
735 class TestAlignedReadFromSam(TestAlignedReadFromBam):
736
737     def setUp(self):
738         self.samfile=pysam.Samfile( "ex3.sam","r" )
739         self.reads=list(self.samfile.fetch())
740
741 # needs to be implemented 
742 # class TestAlignedReadFromSamWithoutHeader(TestAlignedReadFromBam):
743 #
744 #     def setUp(self):
745 #         self.samfile=pysam.Samfile( "ex7.sam","r" )
746 #         self.reads=list(self.samfile.fetch())
747
748 class TestHeaderSam(unittest.TestCase):
749
750     header = {'SQ': [{'LN': 1575, 'SN': 'chr1'}, 
751                      {'LN': 1584, 'SN': 'chr2'}], 
752               'RG': [{'LB': 'SC_1', 'ID': 'L1', 'SM': 'NA12891', 'PU': 'SC_1_10', "CN":"name:with:colon"}, 
753                      {'LB': 'SC_2', 'ID': 'L2', 'SM': 'NA12891', 'PU': 'SC_2_12', "CN":"name:with:colon"}],
754               'PG': [{'ID': 'P1', 'VN': '1.0'}, {'ID': 'P2', 'VN': '1.1'}], 
755               'HD': {'VN': '1.0'},
756               'CO' : [ 'this is a comment', 'this is another comment'],
757               }
758
759     def compareHeaders( self, a, b ):
760         '''compare two headers a and b.'''
761         for ak,av in a.iteritems():
762             self.assertTrue( ak in b, "key '%s' not in '%s' " % (ak,b) )
763             self.assertEqual( av, b[ak] )
764
765     def setUp(self):
766         self.samfile=pysam.Samfile( "ex3.sam","r" )
767
768     def testHeaders(self):
769         self.compareHeaders( self.header, self.samfile.header )
770         self.compareHeaders( self.samfile.header, self.header )
771
772     def testNameMapping( self ):
773         for x, y in enumerate( ("chr1", "chr2")):
774             tid = self.samfile.gettid( y )
775             ref = self.samfile.getrname( x )
776             self.assertEqual( tid, x )
777             self.assertEqual( ref, y )
778
779         self.assertEqual( self.samfile.gettid("chr?"), -1 )
780         self.assertRaises( ValueError, self.samfile.getrname, 2 )
781
782     def tearDown(self):
783         self.samfile.close()
784
785 class TestHeaderBam(TestHeaderSam):
786
787     def setUp(self):
788         self.samfile=pysam.Samfile( "ex3.bam","rb" )
789
790 class TestUnmappedReads(unittest.TestCase):
791
792     def testSAM(self):
793         samfile=pysam.Samfile( "ex5.sam","r" )
794         self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 ) 
795         samfile.close()
796
797     def testBAM(self):
798         samfile=pysam.Samfile( "ex5.bam","rb" )
799         self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 ) 
800         samfile.close()
801
802 class TestPileupObjects(unittest.TestCase):
803
804     def setUp(self):
805         self.samfile=pysam.Samfile( "ex1.bam","rb" )
806
807     def testPileupColumn(self):
808         for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
809             if pcolumn1.pos == 104:
810                 self.assertEqual( pcolumn1.tid, 0, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn1.tid, 0) )
811                 self.assertEqual( pcolumn1.pos, 105-1, "position mismatch in position 1: %s != %s" % (pcolumn1.pos, 105-1) )
812                 self.assertEqual( pcolumn1.n, 2, "# reads mismatch in position 1: %s != %s" % (pcolumn1.n, 2) )
813         for pcolumn2 in self.samfile.pileup( region="chr2:1480" ):
814             if pcolumn2.pos == 1479:
815                 self.assertEqual( pcolumn2.tid, 1, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn2.tid, 1) )
816                 self.assertEqual( pcolumn2.pos, 1480-1, "position mismatch in position 1: %s != %s" % (pcolumn2.pos, 1480-1) )
817                 self.assertEqual( pcolumn2.n, 12, "# reads mismatch in position 1: %s != %s" % (pcolumn2.n, 12) )
818
819     def testPileupRead(self):
820         for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
821             if pcolumn1.pos == 104:
822                 self.assertEqual( len(pcolumn1.pileups), 2, "# reads aligned to column mismatch in position 1: %s != %s" % (len(pcolumn1.pileups), 2) )
823 #                self.assertEqual( pcolumn1.pileups[0]  # need to test additional properties here
824
825     def tearDown(self):
826         self.samfile.close()
827
828 class TestContextManager(unittest.TestCase):
829
830     def testManager( self ):
831         with pysam.Samfile('ex1.bam', 'rb') as samfile:
832             samfile.fetch()
833         self.assertEqual( samfile._isOpen(), False )
834
835 class TestExceptions(unittest.TestCase):
836
837     def setUp(self):
838         self.samfile=pysam.Samfile( "ex1.bam","rb" )
839
840     def testMissingFile(self):
841
842         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "rb" )
843         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "r" )
844         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "r" )
845         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "rb" )
846
847     def testBadContig(self):
848         self.assertRaises( ValueError, self.samfile.fetch, "chr88" )
849
850     def testMeaninglessCrap(self):
851         self.assertRaises( ValueError, self.samfile.fetch, "skljf" )
852
853     def testBackwardsOrderNewFormat(self):
854         self.assertRaises( ValueError, self.samfile.fetch, 'chr1', 100, 10 )
855
856     def testBackwardsOrderOldFormat(self):
857         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:100-10")
858         
859     def testOutOfRangeNegativeNewFormat(self):
860         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, -10 )
861         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 )
862         self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 )
863
864         self.assertRaises( ValueError, self.samfile.count, "chr1", 5, -10 )
865         self.assertRaises( ValueError, self.samfile.count, "chr1", 5, 0 )        
866         self.assertRaises( ValueError, self.samfile.count, "chr1", -5, -10 )
867
868     def testOutOfRangeNegativeOldFormat(self):
869         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" )
870         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" )
871         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" )
872
873         self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-10" )
874         self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-0" )
875         self.assertRaises( ValueError, self.samfile.count, region="chr1:-5--10" )
876
877     def testOutOfRangNewFormat(self):
878         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 )
879         self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999, 99999999999 )
880
881     def testOutOfRangeLargeNewFormat(self):
882         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
883         self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
884
885     def testOutOfRangeLargeOldFormat(self):
886         self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
887         self.assertRaises( ValueError, self.samfile.count, "chr1:99999999999999999-999999999999999999" )
888
889     def testZeroToZero(self):        
890         '''see issue 44'''
891         self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0)
892
893     def tearDown(self):
894         self.samfile.close()
895
896 class TestWrongFormat(unittest.TestCase):
897     '''test cases for opening files not in bam/sam format.'''
898
899     def testOpenSamAsBam( self ):
900         self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' )
901
902     def testOpenBamAsSam( self ):
903         # test fails, needs to be implemented.
904         # sam.fetch() fails on reading, not on opening
905         # self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' )
906         pass
907
908     def testOpenFastaAsSam( self ):
909         # test fails, needs to be implemented.
910         # sam.fetch() fails on reading, not on opening
911         # self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' )
912         pass
913
914     def testOpenFastaAsBam( self ):
915         self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' )
916
917 class TestFastaFile(unittest.TestCase):
918
919     mSequences = { 'chr1' :
920                        "CACTAGTGGCTCATTGTAAATGTGTGGTTTAACTCGTCCATGGCCCAGCATTAGGGAGCTGTGGACCCTGCAGCCTGGCTGTGGGGGCCGCAGTGGCTGAGGGGTGCAGAGCCGAGTCACGGGGTTGCCAGCACAGGGGCTTAACCTCTGGTGACTGCCAGAGCTGCTGGCAAGCTAGAGTCCCATTTGGAGCCCCTCTAAGCCGTTCTATTTGTAATGAAAACTATATTTATGCTATTCAGTTCTAAATATAGAAATTGAAACAGCTGTGTTTAGTGCCTTTGTTCAACCCCCTTGCAACAACCTTGAGAACCCCAGGGAATTTGTCAATGTCAGGGAAGGAGCATTTTGTCAGTTACCAAATGTGTTTATTACCAGAGGGATGGAGGGAAGAGGGACGCTGAAGAACTTTGATGCCCTCTTCTTCCAAAGATGAAACGCGTAACTGCGCTCTCATTCACTCCAGCTCCCTGTCACCCAATGGACCTGTGATATCTGGATTCTGGGAAATTCTTCATCCTGGACCCTGAGAGATTCTGCAGCCCAGCTCCAGATTGCTTGTGGTCTGACAGGCTGCAACTGTGAGCCATCACAATGAACAACAGGAAGAAAAGGTCTTTCAAAAGGTGATGTGTGTTCTCATCAACCTCATACACACACATGGTTTAGGGGTATAATACCTCTACATGGCTGATTATGAAAACAATGTTCCCCAGATACCATCCCTGTCTTACTTCCAGCTCCCCAGAGGGAAAGCTTTCAACGCTTCTAGCCATTTCTTTTGGCATTTGCCTTCAGACCCTACACGAATGCGTCTCTACCACAGGGGGCTGCGCGGTTTCCCATCATGAAGCACTGAACTTCCACGTCTCATCTAGGGGAACAGGGAGGTGCACTAATGCGCTCCACGCCCAAGCCCTTCTCACAGTTTCTGCCCCCAGCATGGTTGTACTGGGCAATACATGAGATTATTAGGAAATGCTTTACTGTCATAACTATGAAGAGACTATTGCCAGATGAACCACACATTAATACTATGTTTCTTATCTGCACATTACTACCCTGCAATTAATATAATTGTGTCCATGTACACACGCTGTCCTATGTACTTATCATGACTCTATCCCAAATTCCCAATTACGTCCTATCTTCTTCTTAGGGAAGAACAGCTTAGGTATCAATTTGGTGTTCTGTGTAAAGTCTCAGGGAGCCGTCCGTGTCCTCCCATCTGGCCTCGTCCACACTGGTTCTCTTGAAAGCTTGGGCTGTAATGATGCCCCTTGGCCATCACCCAGTCCCTGCCCCATCTCTTGTAATCTCTCTCCTTTTTGCTGCATCCCTGTCTTCCTCTGTCTTGATTTACTTGTTGTTGGTTTTCTGTTTCTTTGTTTGATTTGGTGGAAGACATAATCCCACGCTTCCTATGGAAAGGTTGTTGGGAGATTTTTAATGATTCCTCAATGTTAAAATGTCTATTTTTGTCTTGACACCCAACTAATATTTGTCTGAGCAAAACAGTCTAGATGAGAGAGAACTTCCCTGGAGGTCTGATGGCGTTTCTCCCTCGTCTTCTTA",
921                    'chr2' :
922                        "TTCAAATGAACTTCTGTAATTGAAAAATTCATTTAAGAAATTACAAAATATAGTTGAAAGCTCTAACAATAGACTAAACCAAGCAGAAGAAAGAGGTTCAGAACTTGAAGACAAGTCTCTTATGAATTAACCCAGTCAGACAAAAATAAAGAAAAAAATTTTAAAAATGAACAGAGCTTTCAAGAAGTATGAGATTATGTAAAGTAACTGAACCTATGAGTCACAGGTATTCCTGAGGAAAAAGAAAAAGTGAGAAGTTTGGAAAAACTATTTGAGGAAGTAATTGGGGAAAACCTCTTTAGTCTTGCTAGAGATTTAGACATCTAAATGAAAGAGGCTCAAAGAATGCCAGGAAGATACATTGCAAGACAGACTTCATCAAGATATGTAGTCATCAGACTATCTAAAGTCAACATGAAGGAAAAAAATTCTAAAATCAGCAAGAGAAAAGCATACAGTCATCTATAAAGGAAATCCCATCAGAATAACAATGGGCTTCTCAGCAGAAACCTTACAAGCCAGAAGAGATTGGATCTAATTTTTGGACTTCTTAAAGAAAAAAAAACCTGTCAAACACGAATGTTATGCCCTGCTAAACTAAGCATCATAAATGAAGGGGAAATAAAGTCAAGTCTTTCCTGACAAGCAAATGCTAAGATAATTCATCATCACTAAACCAGTCCTATAAGAAATGCTCAAAAGAATTGTAAAAGTCAAAATTAAAGTTCAATACTCACCATCATAAATACACACAAAAGTACAAAACTCACAGGTTTTATAAAACAATTGAGACTACAGAGCAACTAGGTAAAAAATTAACATTACAACAGGAACAAAACCTCATATATCAATATTAACTTTGAATAAAAAGGGATTAAATTCCCCCACTTAAGAGATATAGATTGGCAGAACAGATTTAAAAACATGAACTAACTATATGCTGTTTACAAGAAACTCATTAATAAAGACATGAGTTCAGGTAAAGGGGTGGAAAAAGATGTTCTACGCAAACAGAAACCAAATGAGAGAAGGAGTAGCTATACTTATATCAGATAAAGCACACTTTAAATCAACAACAGTAAAATAAAACAAAGGAGGTCATCATACAATGATAAAAAGATCAATTCAGCAAGAAGATATAACCATCCTACTAAATACATATGCACCTAACACAAGACTACCCAGATTCATAAAACAAATACTACTAGACCTAAGAGGGATGAGAAATTACCTAATTGGTACAATGTACAATATTCTGATGATGGTTACACTAAAAGCCCATACTTTACTGCTACTCAATATATCCATGTAACAAATCTGCGCTTGTACTTCTAAATCTATAAAAAAATTAAAATTTAACAAAAGTAAATAAAACACATAGCTAAAACTAAAAAAGCAAAAACAAAAACTATGCTAAGTATTGGTAAAGATGTGGGGAAAAAAGTAAACTCTCAAATATTGCTAGTGGGAGTATAAATTGTTTTCCACTTTGGAAAACAATTTGGTAATTTCGTTTTTTTTTTTTTCTTTTCTCTTTTTTTTTTTTTTTTTTTTGCATGCCAGAAAAAAATATTTACAGTAACT",
923                    }
924
925     def setUp(self):
926         self.file=pysam.Fastafile( "ex1.fa" )
927
928     def testFetch(self):
929         for id, seq in self.mSequences.items():
930             self.assertEqual( seq, self.file.fetch( id ) )
931             for x in range( 0, len(seq), 10):
932                 self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
933                 # test x:end
934                 self.assertEqual( seq[x:], self.file.fetch( id, x) )
935                 # test 0:x
936                 self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
937
938         
939         # unknown sequence returns ""
940         self.assertEqual( "", self.file.fetch("chr12") )
941
942     def testOutOfRangeAccess( self ):
943         '''test out of range access.'''
944         # out of range access returns an empty string
945         for contig, s in self.mSequences.iteritems():
946             self.assertEqual( self.file.fetch( contig, len(s), len(s)+1), "" )
947
948         self.assertEqual( self.file.fetch( "chr3", 0 , 100), "" ) 
949
950     def testFetchErrors( self ):
951         self.assertRaises( ValueError, self.file.fetch )
952         self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
953         self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
954
955     def testLength( self ):
956         self.assertEqual( len(self.file), 2 )
957         
958     def tearDown(self):
959         self.file.close()
960
961 class TestAlignedRead(unittest.TestCase):
962     '''tests to check if aligned read can be constructed
963     and manipulated.
964     '''
965
966     def checkFieldEqual( self, read1, read2, exclude = []):
967         '''check if two reads are equal by comparing each field.'''
968
969         for x in ("qname", "seq", "flag",
970                   "rname", "pos", "mapq", "cigar",
971                   "mrnm", "mpos", "isize", "qual",
972                   "is_paired", "is_proper_pair",
973                   "is_unmapped", "mate_is_unmapped",
974                   "is_reverse", "mate_is_reverse",
975                   "is_read1", "is_read2",
976                   "is_secondary", "is_qcfail",
977                   "is_duplicate", "bin"):
978             if x in exclude: continue
979             self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" % 
980                               (x, getattr(read1, x), getattr(read2,x)))
981     
982     def testEmpty( self ):
983         a = pysam.AlignedRead()
984         self.assertEqual( a.qname, None )
985         self.assertEqual( a.seq, None )
986         self.assertEqual( a.qual, None )
987         self.assertEqual( a.flag, 0 )
988         self.assertEqual( a.rname, 0 )
989         self.assertEqual( a.mapq, 0 )
990         self.assertEqual( a.cigar, None )
991         self.assertEqual( a.tags, [] )
992         self.assertEqual( a.mrnm, 0 )
993         self.assertEqual( a.mpos, 0 )
994         self.assertEqual( a.isize, 0 )
995
996     def buildRead( self ):
997         '''build an example read.'''
998         
999         a = pysam.AlignedRead()
1000         a.qname = "read_12345"
1001         a.seq="ACGT" * 3
1002         a.flag = 0
1003         a.rname = 0
1004         a.pos = 33
1005         a.mapq = 20
1006         a.cigar = ( (0,10), (2,1), (0,25) )
1007         a.mrnm = 0
1008         a.mpos=200
1009         a.isize=167
1010         a.qual="1234" * 3
1011         # todo: create tags
1012         return a
1013
1014     def testUpdate( self ):
1015         '''check if updating fields affects other variable length data
1016         '''
1017         a = self.buildRead()
1018         b = self.buildRead()
1019
1020         # check qname
1021         b.qname = "read_123"
1022         self.checkFieldEqual( a, b, "qname" )
1023         b.qname = "read_12345678"
1024         self.checkFieldEqual( a, b, "qname" )
1025         b.qname = "read_12345"
1026         self.checkFieldEqual( a, b)
1027
1028         # check cigar
1029         b.cigar = ( (0,10), )
1030         self.checkFieldEqual( a, b, "cigar" )
1031         b.cigar = ( (0,10), (2,1), (0,25), (2,1), (0,25) )
1032         self.checkFieldEqual( a, b, "cigar" )
1033         b.cigar = ( (0,10), (2,1), (0,25) )
1034         self.checkFieldEqual( a, b)
1035
1036         # check seq 
1037         b.seq = "ACGT"
1038         self.checkFieldEqual( a, b, ("seq", "qual") )
1039         b.seq = "ACGT" * 10
1040         self.checkFieldEqual( a, b, ("seq", "qual") )
1041         b.seq = "ACGT" * 3
1042         self.checkFieldEqual( a, b, ("qual",))
1043
1044         # reset qual
1045         b = self.buildRead()
1046
1047         # check flags:
1048         for x in (
1049             "is_paired", "is_proper_pair",
1050             "is_unmapped", "mate_is_unmapped",
1051             "is_reverse", "mate_is_reverse",
1052             "is_read1", "is_read2",
1053             "is_secondary", "is_qcfail",
1054             "is_duplicate"):
1055             setattr( b, x, True )
1056             self.assertEqual( getattr(b, x), True )
1057             self.checkFieldEqual( a, b, ("flag", x,) )
1058             setattr( b, x, False )
1059             self.assertEqual( getattr(b, x), False )
1060             self.checkFieldEqual( a, b )
1061
1062     def testLargeRead( self ):
1063         '''build an example read.'''
1064         
1065         a = pysam.AlignedRead()
1066         a.qname = "read_12345"
1067         a.seq="ACGT" * 200
1068         a.flag = 0
1069         a.rname = 0
1070         a.pos = 33
1071         a.mapq = 20
1072         a.cigar = ( (0,10), (2,1), (0,25) )
1073         a.mrnm = 0
1074         a.mpos=200
1075         a.isize=167
1076         a.qual="1234" * 200
1077
1078         return a
1079
1080     def testTagParsing( self ):
1081         '''test for tag parsing
1082
1083         see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a
1084         '''
1085         samfile=pysam.Samfile( "ex8.bam","rb" )
1086
1087         for entry in samfile:
1088             before = entry.tags
1089             entry.tags = entry.tags
1090             after = entry.tags
1091             self.assertEqual( after, before )
1092
1093 class TestDeNovoConstruction(unittest.TestCase):
1094     '''check BAM/SAM file construction using ex3.sam
1095     
1096     (note these are +1 coordinates):
1097     
1098     read_28833_29006_6945       99      chr1    33      20      10M1D25M        =       200     167     AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG     <<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<     NM:i:1  RG:Z:L1
1099     read_28701_28881_323b       147     chr2    88      30      35M     =       500     412     ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA     <<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<     MF:i:18 RG:Z:L2
1100     '''
1101
1102     header = { 'HD': {'VN': '1.0'},
1103                'SQ': [{'LN': 1575, 'SN': 'chr1'}, 
1104                       {'LN': 1584, 'SN': 'chr2'}], }
1105
1106     bamfile = "ex6.bam"
1107     samfile = "ex6.sam"
1108
1109     def checkFieldEqual( self, read1, read2, exclude = []):
1110         '''check if two reads are equal by comparing each field.'''
1111
1112         for x in ("qname", "seq", "flag",
1113                   "rname", "pos", "mapq", "cigar",
1114                   "mrnm", "mpos", "isize", "qual",
1115                   "bin",
1116                   "is_paired", "is_proper_pair",
1117                   "is_unmapped", "mate_is_unmapped",
1118                   "is_reverse", "mate_is_reverse",
1119                   "is_read1", "is_read2",
1120                   "is_secondary", "is_qcfail",
1121                   "is_duplicate"):
1122             if x in exclude: continue
1123             self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" % 
1124                               (x, getattr(read1, x), getattr(read2,x)))
1125
1126     def setUp( self ):
1127
1128         
1129         a = pysam.AlignedRead()
1130         a.qname = "read_28833_29006_6945"
1131         a.seq="AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG"
1132         a.flag = 99
1133         a.rname = 0
1134         a.pos = 32
1135         a.mapq = 20
1136         a.cigar = ( (0,10), (2,1), (0,25) )
1137         a.mrnm = 0
1138         a.mpos=199
1139         a.isize=167
1140         a.qual="<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<"
1141         a.tags = ( ("NM", 1),
1142                    ("RG", "L1") )
1143
1144         b = pysam.AlignedRead()
1145         b.qname = "read_28701_28881_323b"
1146         b.seq="ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA"
1147         b.flag = 147
1148         b.rname = 1
1149         b.pos = 87
1150         b.mapq = 30
1151         b.cigar = ( (0,35), )
1152         b.mrnm = 1
1153         b.mpos=499
1154         b.isize=412
1155         b.qual="<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<"
1156         b.tags = ( ("MF", 18),
1157                    ("RG", "L2") )
1158
1159         self.reads = (a,b)
1160
1161     def testSAMWholeFile( self ):
1162         
1163         tmpfilename = "tmp_%i.sam" % id(self)
1164
1165         outfile = pysam.Samfile( tmpfilename, "wh", header = self.header )
1166
1167         for x in self.reads: outfile.write( x )
1168         outfile.close()
1169         
1170         self.assertTrue( checkBinaryEqual( tmpfilename, self.samfile ),
1171                          "mismatch when construction SAM file, see %s %s" % (tmpfilename, self.samfile))
1172         
1173         os.unlink( tmpfilename )
1174
1175     def testBAMPerRead( self ):
1176         '''check if individual reads are binary equal.'''
1177         infile = pysam.Samfile( self.bamfile, "rb")
1178
1179         others = list(infile)
1180         for denovo, other in zip( others, self.reads):
1181             self.checkFieldEqual( other, denovo )
1182             self.assertEqual( other.compare( denovo ), 0 )
1183
1184     def testSAMPerRead( self ):
1185         '''check if individual reads are binary equal.'''
1186         infile = pysam.Samfile( self.samfile, "r")
1187
1188         others = list(infile)
1189         for denovo, other in zip( others, self.reads):
1190             self.checkFieldEqual( other, denovo )
1191             self.assertEqual( other.compare( denovo), 0 )
1192             
1193     def testBAMWholeFile( self ):
1194         
1195         tmpfilename = "tmp_%i.bam" % id(self)
1196
1197         outfile = pysam.Samfile( tmpfilename, "wb", header = self.header )
1198
1199         for x in self.reads: outfile.write( x )
1200         outfile.close()
1201         
1202         self.assertTrue( checkBinaryEqual( tmpfilename, self.bamfile ),
1203                          "mismatch when construction BAM file, see %s %s" % (tmpfilename, self.bamfile))
1204         
1205         os.unlink( tmpfilename )
1206
1207
1208 class TestDoubleFetch(unittest.TestCase):
1209     '''check if two iterators on the same bamfile are independent.'''
1210     
1211     def testDoubleFetch( self ):
1212
1213         samfile1 = pysam.Samfile('ex1.bam', 'rb')
1214
1215         for a,b in zip(samfile1.fetch(), samfile1.fetch()):
1216             self.assertEqual( a.compare( b ), 0 )
1217
1218     def testDoubleFetchWithRegion( self ):
1219
1220         samfile1 = pysam.Samfile('ex1.bam', 'rb')
1221         chr, start, stop = 'chr1', 200, 3000000
1222         self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
1223
1224         for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
1225             self.assertEqual( a.compare( b ), 0 ) 
1226
1227     def testDoubleFetchUntilEOF( self ):
1228
1229         samfile1 = pysam.Samfile('ex1.bam', 'rb')
1230
1231         for a,b in zip(samfile1.fetch( until_eof = True), 
1232                        samfile1.fetch( until_eof = True )):
1233             self.assertEqual( a.compare( b), 0 )
1234
1235 class TestRemoteFileFTP(unittest.TestCase):
1236     '''test remote access.
1237
1238     '''
1239
1240     # Need to find an ftp server without password on standard
1241     # port.
1242
1243     url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
1244     region = "1:1-1000"
1245
1246     def testFTPView( self ):
1247         return
1248         result = pysam.view( self.url, self.region )
1249         self.assertEqual( len(result), 36 )
1250         
1251     def testFTPFetch( self ):
1252         return
1253         samfile = pysam.Samfile(self.url, "rb")  
1254         result = list(samfile.fetch( region = self.region ))
1255         self.assertEqual( len(result), 36 )
1256
1257 class TestRemoteFileHTTP( unittest.TestCase):
1258
1259     url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
1260     region = "chr1:1-1000"
1261     local = "ex1.bam"
1262
1263     def testView( self ):
1264         samfile_local = pysam.Samfile(self.local, "rb")  
1265         ref = list(samfile_local.fetch( region = self.region ))
1266         
1267         result = pysam.view( self.url, self.region )
1268         self.assertEqual( len(result), len(ref) )
1269         
1270     def testFetch( self ):
1271         samfile = pysam.Samfile(self.url, "rb")  
1272         result = list(samfile.fetch( region = self.region ))
1273         samfile_local = pysam.Samfile(self.local, "rb")  
1274         ref = list(samfile_local.fetch( region = self.region ))
1275
1276         self.assertEqual( len(ref), len(result) )
1277         for x, y in zip(result, ref):
1278             self.assertEqual( x.compare( y ), 0 )
1279
1280     def testFetchAll( self ):
1281         samfile = pysam.Samfile(self.url, "rb")  
1282         result = list(samfile.fetch())
1283         samfile_local = pysam.Samfile(self.local, "rb")  
1284         ref = list(samfile_local.fetch() )
1285
1286         self.assertEqual( len(ref), len(result) )
1287         for x, y in zip(result, ref):
1288             self.assertEqual( x.compare( y ), 0 )
1289
1290 class TestLargeOptValues( unittest.TestCase ):
1291
1292     ints = ( 65536, 214748, 2147484, 2147483647 )
1293     floats = ( 65536.0, 214748.0, 2147484.0 )
1294
1295     def check( self, samfile ):
1296         
1297         i = samfile.fetch()
1298         for exp in self.ints:
1299             rr = i.next()
1300             obs = rr.opt("ZP")
1301             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1302
1303         for exp in [ -x for x in self.ints ]:
1304             rr = i.next()
1305             obs = rr.opt("ZP")
1306             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1307
1308         for exp in self.floats:
1309             rr = i.next()
1310             obs = rr.opt("ZP")
1311             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1312
1313         for exp in [ -x for x in self.floats ]:
1314             rr = i.next()
1315             obs = rr.opt("ZP")
1316             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1317
1318     def testSAM( self ):
1319         samfile = pysam.Samfile("ex10.sam", "r")
1320         self.check( samfile )
1321
1322     def testBAM( self ):
1323         samfile = pysam.Samfile("ex10.bam", "rb")
1324         self.check( samfile )
1325
1326 # class TestSNPCalls( unittest.TestCase ):
1327 #     '''test pysam SNP calling ability.'''
1328
1329 #     def checkEqual( self, a, b ):
1330 #         for x in ("reference_base", 
1331 #                   "pos",
1332 #                   "genotype",
1333 #                   "consensus_quality",
1334 #                   "snp_quality",
1335 #                   "mapping_quality",
1336 #                   "coverage" ):
1337 #             self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % 
1338 #                               (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1339
1340 #     def testAllPositionsViaIterator( self ):
1341 #         samfile = pysam.Samfile( "ex1.bam", "rb")  
1342 #         fastafile = pysam.Fastafile( "ex1.fa" )
1343 #         try: 
1344 #             refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"]
1345 #         except pysam.SamtoolsError:
1346 #             pass
1347
1348 #         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1349 #         calls = list(pysam.IteratorSNPCalls(i))
1350 #         for x,y in zip( refs, calls ):
1351 #             self.checkEqual( x, y )
1352
1353 #     def testPerPositionViaIterator( self ):
1354 #         # test pileup for each position. This is a slow operation
1355 #         # so this test is disabled 
1356 #         return
1357 #         samfile = pysam.Samfile( "ex1.bam", "rb")  
1358 #         fastafile = pysam.Fastafile( "ex1.fa" )
1359 #         for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1360 #             if x.reference_base == "*": continue
1361 #             i = samfile.pileup( x.chromosome, x.pos, x.pos+1,
1362 #                                 fastafile = fastafile,
1363 #                                 stepper = "samtools" )
1364 #             z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ]
1365 #             self.assertEqual( len(z), 1 )
1366 #             self.checkEqual( x, z[0] )
1367
1368 #     def testPerPositionViaCaller( self ):
1369 #         # test pileup for each position. This is a fast operation
1370 #         samfile = pysam.Samfile( "ex1.bam", "rb")  
1371 #         fastafile = pysam.Fastafile( "ex1.fa" )
1372 #         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1373 #         caller = pysam.SNPCaller( i )
1374
1375 #         for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1376 #             if x.reference_base == "*": continue
1377 #             call = caller.call( x.chromosome, x.pos )
1378 #             self.checkEqual( x, call )
1379
1380 # class TestIndelCalls( unittest.TestCase ):
1381 #     '''test pysam indel calling.'''
1382
1383 #     def checkEqual( self, a, b ):
1384
1385 #         for x in ("pos",
1386 #                   "genotype",
1387 #                   "consensus_quality",
1388 #                   "snp_quality",
1389 #                   "mapping_quality",
1390 #                   "coverage",
1391 #                   "first_allele",
1392 #                   "second_allele",
1393 #                   "reads_first",
1394 #                   "reads_second",
1395 #                   "reads_diff"):
1396 #             if b.genotype == "*/*" and x == "second_allele":
1397 #                 # ignore test for second allele (positions chr2:439 and chr2:1512)
1398 #                 continue
1399 #             self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % 
1400 #                               (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1401
1402 #     def testAllPositionsViaIterator( self ):
1403
1404 #         samfile = pysam.Samfile( "ex1.bam", "rb")  
1405 #         fastafile = pysam.Fastafile( "ex1.fa" )
1406 #         try: 
1407 #             refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"]
1408 #         except pysam.SamtoolsError:
1409 #             pass
1410
1411 #         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1412 #         calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ]
1413 #         for x,y in zip( refs, calls ):
1414 #             self.checkEqual( x, y )
1415
1416 #     def testPerPositionViaCaller( self ):
1417 #         # test pileup for each position. This is a fast operation
1418 #         samfile = pysam.Samfile( "ex1.bam", "rb")  
1419 #         fastafile = pysam.Fastafile( "ex1.fa" )
1420 #         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1421 #         caller = pysam.IndelCaller( i )
1422
1423 #         for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1424 #             if x.reference_base != "*": continue
1425 #             call = caller.call( x.chromosome, x.pos )
1426 #             self.checkEqual( x, call )
1427
1428 class TestLogging( unittest.TestCase ):
1429     '''test around bug issue 42,
1430
1431     failed in versions < 0.4
1432     '''
1433
1434     def check( self, bamfile, log ):
1435
1436         if log:
1437             logger = logging.getLogger('franklin')
1438             logger.setLevel(logging.INFO)
1439             formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
1440             log_hand  = logging.FileHandler('log.txt')
1441             log_hand.setFormatter(formatter)
1442             logger.addHandler(log_hand)
1443
1444         bam  = pysam.Samfile(bamfile, 'rb')
1445         cols = bam.pileup()
1446         self.assert_( True )
1447
1448     def testFail1( self ):
1449         self.check( "ex9_fail.bam", False )
1450         self.check( "ex9_fail.bam", True )
1451
1452     def testNoFail1( self ):
1453         self.check( "ex9_nofail.bam", False )
1454         self.check( "ex9_nofail.bam", True )
1455
1456     def testNoFail2( self ):
1457         self.check( "ex9_nofail.bam", True )
1458         self.check( "ex9_nofail.bam", True )
1459         
1460 # TODOS
1461 # 1. finish testing all properties within pileup objects
1462 # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)
1463 # 3. check: presence of sequence
1464
1465 class TestSamfileUtilityFunctions( unittest.TestCase ):
1466
1467     def testCount( self ):
1468
1469         samfile = pysam.Samfile( "ex1.bam", "rb" )
1470
1471         for contig in ("chr1", "chr2" ):
1472             for start in xrange( 0, 2000, 100 ):
1473                 end = start + 1
1474                 self.assertEqual( len( list( samfile.fetch( contig, start, end ) ) ),
1475                                   samfile.count( contig, start, end ) )
1476
1477                 # test empty intervals
1478                 self.assertEqual( len( list( samfile.fetch( contig, start, start ) ) ),
1479                                   samfile.count( contig, start, start ) )
1480
1481                 # test half empty intervals
1482                 self.assertEqual( len( list( samfile.fetch( contig, start ) ) ),
1483                                   samfile.count( contig, start ) )
1484
1485     def testMate( self ):
1486         '''test mate access.'''
1487
1488         readnames = [ x.split("\t")[0] for x in open( "ex1.sam", "rb" ).readlines() ]
1489         counts = collections.defaultdict( int )
1490         for x in readnames: counts[x] += 1
1491
1492         samfile = pysam.Samfile( "ex1.bam", "rb" )
1493         for read in samfile.fetch():
1494             if not read.is_paired:
1495                 self.assertRaises( ValueError, samfile.mate, read )
1496             elif read.mate_is_unmapped:
1497                 self.assertRaises( ValueError, samfile.mate, read )
1498             else:
1499                 if counts[read.qname] == 1:
1500                     self.assertRaises( ValueError, samfile.mate, read )
1501                 else:
1502                     mate = samfile.mate( read )
1503                     self.assertEqual( read.qname, mate.qname )
1504                     self.assertEqual( read.is_read1, mate.is_read2 )
1505                     self.assertEqual( read.is_read2, mate.is_read1 )
1506                     self.assertEqual( read.pos, mate.mpos )
1507                     self.assertEqual( read.mpos, mate.pos )
1508
1509     def testIndexStats( self ):
1510         '''test if total number of mapped/unmapped reads is correct.'''
1511
1512         samfile = pysam.Samfile( "ex1.bam", "rb" )
1513         self.assertEqual( samfile.mapped, 3235 )
1514         self.assertEqual( samfile.unmapped, 35 )
1515
1516 class TestSamtoolsProxy( unittest.TestCase ):
1517     '''tests for sanity checking access to samtools functions.'''
1518
1519     def testIndex( self ):
1520         self.assertRaises( IOError, pysam.index, "missing_file" )
1521
1522     def testView( self ):
1523         # note that view still echos "open: No such file or directory"
1524         self.assertRaises( pysam.SamtoolsError, pysam.view, "missing_file" )
1525
1526     def testSort( self ):
1527         self.assertRaises( pysam.SamtoolsError, pysam.sort, "missing_file" )
1528
1529 class TestSamfileIndex( unittest.TestCase):
1530     
1531     def testIndex( self ):
1532         samfile = pysam.Samfile( "ex1.bam", "rb" )
1533         index = pysam.IndexedReads( samfile )
1534         index.build()
1535
1536         reads = collections.defaultdict( int )
1537
1538         for read in samfile: reads[read.qname] += 1
1539             
1540         for qname, counts in reads.iteritems():
1541             found = list(index.find( qname ))
1542             self.assertEqual( len(found), counts )
1543             for x in found: self.assertEqual( x.qname, qname )
1544             
1545
1546 if __name__ == "__main__":
1547     # build data files
1548     print "building data files"
1549     subprocess.call( "make", shell=True)
1550     print "starting tests"
1551     unittest.main()