Imported Upstream version 0.5
[pysam.git] / tests / pysam_test.py
1 #!/usr/bin/env python
2 '''unit testing code for pysam.
3
4 Execute in the :file:`tests` directory as it requires the Makefile
5 and data files located there.
6 '''
7
8 import pysam
9 import unittest
10 import os, re, sys
11 import itertools, collections
12 import subprocess
13 import shutil
14 import logging
15 SAMTOOLS="samtools"
16
17 def checkBinaryEqual( filename1, filename2 ):
18     '''return true if the two files are binary equal.'''
19     if os.path.getsize( filename1 ) !=  os.path.getsize( filename2 ):
20         return False
21
22     infile1 = open(filename1, "rb")
23     infile2 = open(filename2, "rb")
24
25     def chariter( infile ):
26         while 1:
27             c = infile.read(1)
28             if c == "": break
29             yield c
30
31     found = False
32     for c1,c2 in itertools.izip( chariter( infile1), chariter( infile2) ):
33         if c1 != c2: break
34     else:
35         found = True
36
37     infile1.close()
38     infile2.close()
39     return found
40
41 def runSamtools( cmd ):
42     '''run a samtools command'''
43
44     try:
45         retcode = subprocess.call(cmd, shell=True)
46         if retcode < 0:
47             print >>sys.stderr, "Child was terminated by signal", -retcode
48     except OSError, e:
49         print >>sys.stderr, "Execution failed:", e
50
51 def getSamtoolsVersion():
52     '''return samtools version'''
53
54     pipe = subprocess.Popen(SAMTOOLS, shell=True, stderr=subprocess.PIPE).stderr
55     lines = "".join(pipe.readlines())
56     return re.search( "Version:\s+(\S+)", lines).groups()[0]
57
58 class BinaryTest(unittest.TestCase):
59     '''test samtools command line commands and compare
60     against pysam commands.
61
62     Tests fail, if the output is not binary identical.
63     '''
64
65     first_time = True
66
67     # a list of commands to test
68     mCommands = \
69         { "faidx" : \
70         ( 
71             ("ex1.fa.fai", "faidx ex1.fa"), 
72             ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ),
73             ),
74           "import" :
75               (
76                 ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ),
77                 ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ),
78                 ),
79           "index":
80               (
81                 ("ex1.bam.bai", "index ex1.bam" ),
82                 ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ),
83                 ),
84           "pileup1" :
85           (
86                 ("ex1.pileup", "pileup -cf ex1.fa ex1.bam > ex1.pileup" ),
87                 ("pysam_ex1.pileup", (pysam.pileup, "-c -f ex1.fa ex1.bam" ) )
88                 ),
89           "pileup2" :
90           (
91                 ("ex1.glf", "pileup -gf ex1.fa ex1.bam > ex1.glf" ),
92                 ("pysam_ex1.glf", (pysam.pileup, "-g -f ex1.fa ex1.bam" ) )
93                 ),
94           "glfview" :
95         (
96                 ("ex1.glfview", "glfview ex1.glf > ex1.glfview"),
97                 ("pysam_ex1.glfview", (pysam.glfview, "ex1.glf" ) ),
98                 ),
99           "view" :
100         (
101                 ("ex1.view", "view ex1.bam > ex1.view"),
102                 ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
103                 ),
104           "view2" :
105         (
106                 ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"),
107                 # note that -o ex1.view2 throws exception.
108                 ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ),
109                 ),
110         }
111
112     # some tests depend on others. The order specifies in which order
113     # the samtools commands are executed.
114     mOrder = ('faidx', 'import', 'index', 
115               'pileup1', 'pileup2', 
116               # 'glfview', deprecated
117               'view', 'view2' )
118
119     def setUp( self ):
120         '''setup tests. 
121
122         For setup, all commands will be run before the first test is
123         executed. Individual tests will then just compare the output
124         files.
125         '''
126         if BinaryTest.first_time:
127             # copy the source 
128             shutil.copy( "ex1.fa", "pysam_ex1.fa" )
129
130             for label in self.mOrder:
131                 command = self.mCommands[label]
132                 samtools_target, samtools_command = command[0]
133                 pysam_target, pysam_command = command[1]
134                 runSamtools( " ".join( (SAMTOOLS, samtools_command )))
135                 pysam_method, pysam_options = pysam_command
136                 output = pysam_method( *pysam_options.split(" "), raw=True)
137                 if ">" in samtools_command:
138                     outfile = open( pysam_target, "wb" )
139                     for line in output: outfile.write( line )
140                     outfile.close()
141
142             BinaryTest.first_time = False
143
144         samtools_version = getSamtoolsVersion()
145
146         def _r( s ):
147             # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> 0.1.12
148             if s.count('-') > 0: s = s[0:s.find('-')]
149             return re.sub( "[^0-9.]", "", s )
150
151         if _r(samtools_version) != _r( pysam.__samtools_version__):
152             raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
153                                  (pysam.__samtools_version__,
154                                   samtools_version ))
155
156     def checkCommand( self, command ):
157         if command:
158             samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0]
159             self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ), 
160                              "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
161             
162     def testImport( self ):
163         self.checkCommand( "import" )
164
165     def testIndex( self ):
166         self.checkCommand( "index" )
167         
168     def testPileup1( self ):
169         self.checkCommand( "pileup1" )
170     
171     def testPileup2( self ):
172         self.checkCommand( "pileup2" )
173
174     # deprecated
175     # def testGLFView( self ):
176     #     self.checkCommand( "glfview" )
177
178     def testView( self ):
179         self.checkCommand( "view" )
180
181     def testEmptyIndex( self ):
182         self.assertRaises( IOError, pysam.index, "exdoesntexist.bam" )
183
184     def __del__(self):
185         return
186         for label, command in self.mCommands.iteritems():
187             samtools_target, samtools_command = command[0]
188             pysam_target, pysam_command = command[1]
189             if os.path.exists( samtools_target): os.remove( samtools_target )
190             if os.path.exists( pysam_target): os.remove( pysam_target )
191         if os.path.exists( "pysam_ex1.fa" ): os.remove( "pysam_ex1.fa" )
192
193 class IOTest(unittest.TestCase):
194     '''check if reading samfile and writing a samfile are consistent.'''
195
196     def checkEcho( self, input_filename, reference_filename, 
197                    output_filename, 
198                    input_mode, output_mode, use_template = True):
199         '''iterate through *input_filename* writing to *output_filename* and
200         comparing the output to *reference_filename*. 
201         
202         The files are opened according to the *input_mode* and *output_mode*.
203
204         If *use_template* is set, the header is copied from infile using the
205         template mechanism, otherwise target names and lengths are passed explicitely. 
206         '''
207
208         infile = pysam.Samfile( input_filename, input_mode )
209         if use_template:
210             outfile = pysam.Samfile( output_filename, output_mode, template = infile )
211         else:
212             outfile = pysam.Samfile( output_filename, output_mode, 
213                                      referencenames = infile.references,
214                                      referencelengths = infile.lengths )
215
216         iter = infile.fetch()
217         for x in iter: outfile.write( x )
218         infile.close()
219         outfile.close()
220
221         self.assertTrue( checkBinaryEqual( reference_filename, output_filename), 
222                          "files %s and %s are not the same" % (reference_filename, output_filename) )
223
224     def testReadWriteBam( self ):
225         
226         input_filename = "ex1.bam"
227         output_filename = "pysam_ex1.bam"
228         reference_filename = "ex1.bam"
229
230         self.checkEcho( input_filename, reference_filename, output_filename,
231                         "rb", "wb" )
232
233     def testReadWriteBamWithTargetNames( self ):
234         
235         input_filename = "ex1.bam"
236         output_filename = "pysam_ex1.bam"
237         reference_filename = "ex1.bam"
238
239         self.checkEcho( input_filename, reference_filename, output_filename,
240                         "rb", "wb", use_template = False )
241
242     def testReadWriteSamWithHeader( self ):
243         
244         input_filename = "ex2.sam"
245         output_filename = "pysam_ex2.sam"
246         reference_filename = "ex2.sam"
247
248         self.checkEcho( input_filename, reference_filename, output_filename,
249                         "r", "wh" )
250
251     def testReadWriteSamWithoutHeader( self ):
252         
253         input_filename = "ex2.sam"
254         output_filename = "pysam_ex2.sam"
255         reference_filename = "ex1.sam"
256
257         self.checkEcho( input_filename, reference_filename, output_filename,
258                         "r", "w" )
259
260     def testFetchFromClosedFile( self ):
261
262         samfile = pysam.Samfile( "ex1.bam", "rb" )
263         samfile.close()
264         self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
265
266     def testClosedFile( self ):
267         '''test that access to a closed samfile raises ValueError.'''
268
269         samfile = pysam.Samfile( "ex1.bam", "rb" )
270         samfile.close()
271         self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
272         self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
273         self.assertRaises( ValueError, samfile.getrname, 0 )
274         self.assertRaises( ValueError, samfile.tell )
275         self.assertRaises( ValueError, samfile.seek, 0 )
276         self.assertRaises( ValueError, getattr, samfile, "nreferences" )
277         self.assertRaises( ValueError, getattr, samfile, "references" )
278         self.assertRaises( ValueError, getattr, samfile, "lengths" )
279         self.assertRaises( ValueError, getattr, samfile, "text" )
280         self.assertRaises( ValueError, getattr, samfile, "header" )
281
282         # write on closed file 
283         self.assertEqual( 0, samfile.write(None) )
284
285     def testAutoDetection( self ):
286         samfile = pysam.Samfile( "ex3.bam" )
287         samfile.fetch()
288         samfile.close()
289
290         samfile = pysam.Samfile( "ex3.sam")
291         samfile.fetch()
292         samfile.close()
293
294     def testReadingFromFileWithoutIndex( self ):
295         '''read from bam file without index.'''
296
297         assert not os.path.exists( "ex2.bam.bai" )
298         samfile = pysam.Samfile( "ex2.bam", "rb" )
299         self.assertRaises( ValueError, samfile.fetch )
300         self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 )
301
302 class TestIteratorRow(unittest.TestCase):
303
304     def setUp(self):
305         self.samfile=pysam.Samfile( "ex1.bam","rb" )
306
307     def checkRange( self, rnge ):
308         '''compare results from iterator with those from samtools.'''
309         ps = list(self.samfile.fetch(region=rnge))
310         sa = list(pysam.view( "ex1.bam", rnge, raw = True) )
311         self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) ))
312         # check if the same reads are returned and in the same order
313         for line, pair in enumerate( zip( ps, sa ) ):
314             a,b = pair
315             d = b.split("\t")
316             self.assertEqual( a.qname, d[0], "line %i: read id mismatch: %s != %s" % (line, a.rname, d[0]) )
317             self.assertEqual( a.pos, int(d[3])-1, "line %i: read position mismatch: %s != %s, \n%s\n%s\n" % \
318                                   (line, a.pos, int(d[3])-1,
319                                    str(a), str(d) ) )
320             self.assertEqual( a.qual, d[10], "line %i: quality mismatch: %s != %s, \n%s\n%s\n" % \
321                                   (line, a.qual, d[10],
322                                    str(a), str(d) ) )
323
324     def testIteratePerContig(self):
325         '''check random access per contig'''
326         for contig in self.samfile.references:
327             self.checkRange( contig )
328
329     def testIterateRanges(self):
330         '''check random access per range'''
331         for contig, length in zip(self.samfile.references, self.samfile.lengths):
332             for start in range( 1, length, 90):
333                 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
334
335     def tearDown(self):
336         self.samfile.close()
337
338 class TestIteratorRowAll(unittest.TestCase):
339
340     def setUp(self):
341         self.samfile=pysam.Samfile( "ex1.bam","rb" )
342
343     def testIterate(self):
344         '''compare results from iterator with those from samtools.'''
345         ps = list(self.samfile.fetch())
346         sa = list(pysam.view( "ex1.bam", raw = True) )
347         self.assertEqual( len(ps), len(sa), "unequal number of results: %i != %i" % (len(ps), len(sa) ))
348         # check if the same reads are returned
349         for line, pair in enumerate( zip( ps, sa ) ):
350             data = pair[1].split("\t")
351             self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
352
353     def tearDown(self):
354         self.samfile.close()
355
356 class TestIteratorColumn(unittest.TestCase):
357     '''test iterator column against contents of ex3.bam.'''
358     
359     # note that samfile contains 1-based coordinates
360     # 1D means deletion with respect to reference sequence
361     # 
362     mCoverages = { 'chr1' : [ 0 ] * 20 + [1] * 36 + [0] * (100 - 20 -35 ),
363                    'chr2' : [ 0 ] * 20 + [1] * 35 + [0] * (100 - 20 -35 ),
364                    }
365
366     def setUp(self):
367         self.samfile=pysam.Samfile( "ex4.bam","rb" )
368
369     def checkRange( self, rnge ):
370         '''compare results from iterator with those from samtools.'''
371         # check if the same reads are returned and in the same order
372         for column in self.samfile.pileup(region=rnge):
373             thiscov = len(column.pileups)
374             refcov = self.mCoverages[self.samfile.getrname(column.tid)][column.pos]
375             self.assertEqual( thiscov, refcov, "wrong coverage at pos %s:%i %i should be %i" % (self.samfile.getrname(column.tid), column.pos, thiscov, refcov))
376
377     def testIterateAll(self):
378         '''check random access per contig'''
379         self.checkRange( None )
380
381     def testIteratePerContig(self):
382         '''check random access per contig'''
383         for contig in self.samfile.references:
384             self.checkRange( contig )
385
386     def testIterateRanges(self):
387         '''check random access per range'''
388         for contig, length in zip(self.samfile.references, self.samfile.lengths):
389             for start in range( 1, length, 90):
390                 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
391
392     def testInverse( self ):
393         '''test the inverse, is point-wise pileup accurate.'''
394         for contig, refseq in self.mCoverages.items():
395             refcolumns = sum(refseq)
396             for pos, refcov in enumerate( refseq ):
397                 columns = list(self.samfile.pileup( contig, pos, pos+1) )
398                 if refcov == 0:
399                     # if no read, no coverage
400                     self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
401                 elif refcov == 1:
402                     # one read, all columns of the read are returned
403                     self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
404                                           (pos, len(columns), refcolumns))
405
406                     
407     
408     def tearDown(self):
409         self.samfile.close()
410     
411 class TestAlignedReadFromBam(unittest.TestCase):
412
413     def setUp(self):
414         self.samfile=pysam.Samfile( "ex3.bam","rb" )
415         self.reads=list(self.samfile.fetch())
416
417     def testARqname(self):
418         self.assertEqual( self.reads[0].qname, "read_28833_29006_6945", "read name mismatch in read 1: %s != %s" % (self.reads[0].qname, "read_28833_29006_6945") )
419         self.assertEqual( self.reads[1].qname, "read_28701_28881_323b", "read name mismatch in read 2: %s != %s" % (self.reads[1].qname, "read_28701_28881_323b") )
420
421     def testARflag(self):
422         self.assertEqual( self.reads[0].flag, 99, "flag mismatch in read 1: %s != %s" % (self.reads[0].flag, 99) )
423         self.assertEqual( self.reads[1].flag, 147, "flag mismatch in read 2: %s != %s" % (self.reads[1].flag, 147) )
424
425     def testARrname(self):
426         self.assertEqual( self.reads[0].rname, 0, "chromosome/target id mismatch in read 1: %s != %s" % (self.reads[0].rname, 0) )
427         self.assertEqual( self.reads[1].rname, 1, "chromosome/target id mismatch in read 2: %s != %s" % (self.reads[1].rname, 1) )
428
429     def testARpos(self):
430         self.assertEqual( self.reads[0].pos, 33-1, "mapping position mismatch in read 1: %s != %s" % (self.reads[0].pos, 33-1) )
431         self.assertEqual( self.reads[1].pos, 88-1, "mapping position mismatch in read 2: %s != %s" % (self.reads[1].pos, 88-1) )
432
433     def testARmapq(self):
434         self.assertEqual( self.reads[0].mapq, 20, "mapping quality mismatch in read 1: %s != %s" % (self.reads[0].mapq, 20) )
435         self.assertEqual( self.reads[1].mapq, 30, "mapping quality mismatch in read 2: %s != %s" % (self.reads[1].mapq, 30) )
436
437     def testARcigar(self):
438         self.assertEqual( self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)], "read name length mismatch in read 1: %s != %s" % (self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)]) )
439         self.assertEqual( self.reads[1].cigar, [(0, 35)], "read name length mismatch in read 2: %s != %s" % (self.reads[1].cigar, [(0, 35)]) )
440
441     def testARmrnm(self):
442         self.assertEqual( self.reads[0].mrnm, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].mrnm, 0) )
443         self.assertEqual( self.reads[1].mrnm, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].mrnm, 1) )
444
445     def testARmpos(self):
446         self.assertEqual( self.reads[0].mpos, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].mpos, 200-1) )
447         self.assertEqual( self.reads[1].mpos, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].mpos, 500-1) )
448
449     def testARisize(self):
450         self.assertEqual( self.reads[0].isize, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].isize, 167) )
451         self.assertEqual( self.reads[1].isize, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].isize, 412) )
452
453     def testARseq(self):
454         self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
455         self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
456         self.assertEqual( self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
457
458     def testARqual(self):
459         self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
460         self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
461         self.assertEqual( self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
462
463     def testARquery(self):
464         self.assertEqual( self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
465         self.assertEqual( self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
466         self.assertEqual( self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, "TAGCTAGCTACCTATATCTTGGTCTT") )
467
468     def testARqqual(self):
469         self.assertEqual( self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
470         self.assertEqual( self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
471         self.assertEqual( self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, "<<<<<<<<<<<<<<<<<:<9/,&,22") )
472
473     def testPresentOptionalFields(self):
474         self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
475         self.assertEqual( self.reads[0].opt('RG'), 'L1', "optional field mismatch in read 1, RG: %s != %s" % (self.reads[0].opt('RG'), 'L1') )
476         self.assertEqual( self.reads[1].opt('RG'), 'L2', "optional field mismatch in read 2, RG: %s != %s" % (self.reads[1].opt('RG'), 'L2') )
477         self.assertEqual( self.reads[1].opt('MF'), 18, "optional field mismatch in read 2, MF: %s != %s" % (self.reads[1].opt('MF'), 18) )
478
479     def testPairedBools(self):
480         self.assertEqual( self.reads[0].is_paired, True, "is paired mismatch in read 1: %s != %s" % (self.reads[0].is_paired, True) )
481         self.assertEqual( self.reads[1].is_paired, True, "is paired mismatch in read 2: %s != %s" % (self.reads[1].is_paired, True) )
482         self.assertEqual( self.reads[0].is_proper_pair, True, "is proper pair mismatch in read 1: %s != %s" % (self.reads[0].is_proper_pair, True) )
483         self.assertEqual( self.reads[1].is_proper_pair, True, "is proper pair mismatch in read 2: %s != %s" % (self.reads[1].is_proper_pair, True) )
484
485     def testTags( self ):
486         self.assertEqual( self.reads[0].tags, 
487                           [('NM', 1), ('RG', 'L1'), 
488                            ('PG', 'P1'), ('XT', 'U')] )
489         self.assertEqual( self.reads[1].tags, 
490                           [('MF', 18), ('RG', 'L2'), 
491                            ('PG', 'P2'),('XT', 'R') ] )
492
493     def testOpt( self ):
494         self.assertEqual( self.reads[0].opt("XT"), "U" )
495         self.assertEqual( self.reads[1].opt("XT"), "R" )
496
497     def testMissingOpt( self ):
498         self.assertRaises( KeyError, self.reads[0].opt, "XP" )
499
500     def testEmptyOpt( self ):
501         self.assertRaises( KeyError, self.reads[2].opt, "XT" )
502
503     def tearDown(self):
504         self.samfile.close()
505
506 class TestAlignedReadFromSam(TestAlignedReadFromBam):
507
508     def setUp(self):
509         self.samfile=pysam.Samfile( "ex3.sam","r" )
510         self.reads=list(self.samfile.fetch())
511
512 # needs to be implemented 
513 # class TestAlignedReadFromSamWithoutHeader(TestAlignedReadFromBam):
514 #
515 #     def setUp(self):
516 #         self.samfile=pysam.Samfile( "ex7.sam","r" )
517 #         self.reads=list(self.samfile.fetch())
518
519 class TestHeaderSam(unittest.TestCase):
520
521     header = {'SQ': [{'LN': 1575, 'SN': 'chr1'}, 
522                      {'LN': 1584, 'SN': 'chr2'}], 
523               'RG': [{'LB': 'SC_1', 'ID': 'L1', 'SM': 'NA12891', 'PU': 'SC_1_10', "CN":"name:with:colon"}, 
524                      {'LB': 'SC_2', 'ID': 'L2', 'SM': 'NA12891', 'PU': 'SC_2_12', "CN":"name:with:colon"}],
525               'PG': [{'ID': 'P1', 'VN': '1.0'}, {'ID': 'P2', 'VN': '1.1'}], 
526               'HD': {'VN': '1.0'},
527               'CO' : [ 'this is a comment', 'this is another comment'],
528               }
529
530     def compareHeaders( self, a, b ):
531         '''compare two headers a and b.'''
532         for ak,av in a.iteritems():
533             self.assertTrue( ak in b, "key '%s' not in '%s' " % (ak,b) )
534             self.assertEqual( av, b[ak] )
535
536     def setUp(self):
537         self.samfile=pysam.Samfile( "ex3.sam","r" )
538
539     def testHeaders(self):
540         self.compareHeaders( self.header, self.samfile.header )
541         self.compareHeaders( self.samfile.header, self.header )
542
543     def testNameMapping( self ):
544         for x, y in enumerate( ("chr1", "chr2")):
545             tid = self.samfile.gettid( y )
546             ref = self.samfile.getrname( x )
547             self.assertEqual( tid, x )
548             self.assertEqual( ref, y )
549
550         self.assertEqual( self.samfile.gettid("chr?"), -1 )
551         self.assertRaises( ValueError, self.samfile.getrname, 2 )
552
553     def tearDown(self):
554         self.samfile.close()
555
556 class TestHeaderBam(TestHeaderSam):
557
558     def setUp(self):
559         self.samfile=pysam.Samfile( "ex3.bam","rb" )
560
561 class TestUnmappedReads(unittest.TestCase):
562
563     def testSAM(self):
564         samfile=pysam.Samfile( "ex5.sam","r" )
565         self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 ) 
566         samfile.close()
567
568     def testBAM(self):
569         samfile=pysam.Samfile( "ex5.bam","rb" )
570         self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 ) 
571         samfile.close()
572
573 class TestPileupObjects(unittest.TestCase):
574
575     def setUp(self):
576         self.samfile=pysam.Samfile( "ex1.bam","rb" )
577
578     def testPileupColumn(self):
579         for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
580             if pcolumn1.pos == 104:
581                 self.assertEqual( pcolumn1.tid, 0, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn1.tid, 0) )
582                 self.assertEqual( pcolumn1.pos, 105-1, "position mismatch in position 1: %s != %s" % (pcolumn1.pos, 105-1) )
583                 self.assertEqual( pcolumn1.n, 2, "# reads mismatch in position 1: %s != %s" % (pcolumn1.n, 2) )
584         for pcolumn2 in self.samfile.pileup( region="chr2:1480" ):
585             if pcolumn2.pos == 1479:
586                 self.assertEqual( pcolumn2.tid, 1, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn2.tid, 1) )
587                 self.assertEqual( pcolumn2.pos, 1480-1, "position mismatch in position 1: %s != %s" % (pcolumn2.pos, 1480-1) )
588                 self.assertEqual( pcolumn2.n, 12, "# reads mismatch in position 1: %s != %s" % (pcolumn2.n, 12) )
589
590     def testPileupRead(self):
591         for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
592             if pcolumn1.pos == 104:
593                 self.assertEqual( len(pcolumn1.pileups), 2, "# reads aligned to column mismatch in position 1: %s != %s" % (len(pcolumn1.pileups), 2) )
594 #                self.assertEqual( pcolumn1.pileups[0]  # need to test additional properties here
595
596     def tearDown(self):
597         self.samfile.close()
598
599 class TestContextManager(unittest.TestCase):
600
601     def testManager( self ):
602         with pysam.Samfile('ex1.bam', 'rb') as samfile:
603             samfile.fetch()
604         self.assertEqual( samfile._isOpen(), False )
605
606 class TestExceptions(unittest.TestCase):
607
608     def setUp(self):
609         self.samfile=pysam.Samfile( "ex1.bam","rb" )
610
611     def testMissingFile(self):
612
613         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "rb" )
614         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "r" )
615         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "r" )
616         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "rb" )
617
618     def testBadContig(self):
619         self.assertRaises( ValueError, self.samfile.fetch, "chr88" )
620
621     def testMeaninglessCrap(self):
622         self.assertRaises( ValueError, self.samfile.fetch, "skljf" )
623
624     def testBackwardsOrderNewFormat(self):
625         self.assertRaises( ValueError, self.samfile.fetch, 'chr1', 100, 10 )
626
627     def testBackwardsOrderOldFormat(self):
628         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:100-10")
629         
630     def testOutOfRangeNegativeNewFormat(self):
631         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, -10 )
632         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 )
633         self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 )
634
635         self.assertRaises( ValueError, self.samfile.count, "chr1", 5, -10 )
636         self.assertRaises( ValueError, self.samfile.count, "chr1", 5, 0 )        
637         self.assertRaises( ValueError, self.samfile.count, "chr1", -5, -10 )
638
639     def testOutOfRangeNegativeOldFormat(self):
640         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" )
641         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" )
642         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" )
643
644         self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-10" )
645         self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-0" )
646         self.assertRaises( ValueError, self.samfile.count, region="chr1:-5--10" )
647
648     def testOutOfRangNewFormat(self):
649         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 )
650         self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999, 99999999999 )
651
652     def testOutOfRangeLargeNewFormat(self):
653         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
654         self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
655
656     def testOutOfRangeLargeOldFormat(self):
657         self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
658         self.assertRaises( ValueError, self.samfile.count, "chr1:99999999999999999-999999999999999999" )
659
660     def testZeroToZero(self):        
661         '''see issue 44'''
662         self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0)
663
664     def tearDown(self):
665         self.samfile.close()
666
667 class TestWrongFormat(unittest.TestCase):
668     '''test cases for opening files not in bam/sam format.'''
669
670     def testOpenSamAsBam( self ):
671         self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' )
672
673     def testOpenBamAsSam( self ):
674         self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' )
675
676     def testOpenFastaAsSam( self ):
677         self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' )
678
679     def testOpenFastaAsBam( self ):
680         self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' )
681
682 class TestFastaFile(unittest.TestCase):
683
684     mSequences = { 'chr1' :
685                        "CACTAGTGGCTCATTGTAAATGTGTGGTTTAACTCGTCCATGGCCCAGCATTAGGGAGCTGTGGACCCTGCAGCCTGGCTGTGGGGGCCGCAGTGGCTGAGGGGTGCAGAGCCGAGTCACGGGGTTGCCAGCACAGGGGCTTAACCTCTGGTGACTGCCAGAGCTGCTGGCAAGCTAGAGTCCCATTTGGAGCCCCTCTAAGCCGTTCTATTTGTAATGAAAACTATATTTATGCTATTCAGTTCTAAATATAGAAATTGAAACAGCTGTGTTTAGTGCCTTTGTTCAACCCCCTTGCAACAACCTTGAGAACCCCAGGGAATTTGTCAATGTCAGGGAAGGAGCATTTTGTCAGTTACCAAATGTGTTTATTACCAGAGGGATGGAGGGAAGAGGGACGCTGAAGAACTTTGATGCCCTCTTCTTCCAAAGATGAAACGCGTAACTGCGCTCTCATTCACTCCAGCTCCCTGTCACCCAATGGACCTGTGATATCTGGATTCTGGGAAATTCTTCATCCTGGACCCTGAGAGATTCTGCAGCCCAGCTCCAGATTGCTTGTGGTCTGACAGGCTGCAACTGTGAGCCATCACAATGAACAACAGGAAGAAAAGGTCTTTCAAAAGGTGATGTGTGTTCTCATCAACCTCATACACACACATGGTTTAGGGGTATAATACCTCTACATGGCTGATTATGAAAACAATGTTCCCCAGATACCATCCCTGTCTTACTTCCAGCTCCCCAGAGGGAAAGCTTTCAACGCTTCTAGCCATTTCTTTTGGCATTTGCCTTCAGACCCTACACGAATGCGTCTCTACCACAGGGGGCTGCGCGGTTTCCCATCATGAAGCACTGAACTTCCACGTCTCATCTAGGGGAACAGGGAGGTGCACTAATGCGCTCCACGCCCAAGCCCTTCTCACAGTTTCTGCCCCCAGCATGGTTGTACTGGGCAATACATGAGATTATTAGGAAATGCTTTACTGTCATAACTATGAAGAGACTATTGCCAGATGAACCACACATTAATACTATGTTTCTTATCTGCACATTACTACCCTGCAATTAATATAATTGTGTCCATGTACACACGCTGTCCTATGTACTTATCATGACTCTATCCCAAATTCCCAATTACGTCCTATCTTCTTCTTAGGGAAGAACAGCTTAGGTATCAATTTGGTGTTCTGTGTAAAGTCTCAGGGAGCCGTCCGTGTCCTCCCATCTGGCCTCGTCCACACTGGTTCTCTTGAAAGCTTGGGCTGTAATGATGCCCCTTGGCCATCACCCAGTCCCTGCCCCATCTCTTGTAATCTCTCTCCTTTTTGCTGCATCCCTGTCTTCCTCTGTCTTGATTTACTTGTTGTTGGTTTTCTGTTTCTTTGTTTGATTTGGTGGAAGACATAATCCCACGCTTCCTATGGAAAGGTTGTTGGGAGATTTTTAATGATTCCTCAATGTTAAAATGTCTATTTTTGTCTTGACACCCAACTAATATTTGTCTGAGCAAAACAGTCTAGATGAGAGAGAACTTCCCTGGAGGTCTGATGGCGTTTCTCCCTCGTCTTCTTA",
686                    'chr2' :
687                        "TTCAAATGAACTTCTGTAATTGAAAAATTCATTTAAGAAATTACAAAATATAGTTGAAAGCTCTAACAATAGACTAAACCAAGCAGAAGAAAGAGGTTCAGAACTTGAAGACAAGTCTCTTATGAATTAACCCAGTCAGACAAAAATAAAGAAAAAAATTTTAAAAATGAACAGAGCTTTCAAGAAGTATGAGATTATGTAAAGTAACTGAACCTATGAGTCACAGGTATTCCTGAGGAAAAAGAAAAAGTGAGAAGTTTGGAAAAACTATTTGAGGAAGTAATTGGGGAAAACCTCTTTAGTCTTGCTAGAGATTTAGACATCTAAATGAAAGAGGCTCAAAGAATGCCAGGAAGATACATTGCAAGACAGACTTCATCAAGATATGTAGTCATCAGACTATCTAAAGTCAACATGAAGGAAAAAAATTCTAAAATCAGCAAGAGAAAAGCATACAGTCATCTATAAAGGAAATCCCATCAGAATAACAATGGGCTTCTCAGCAGAAACCTTACAAGCCAGAAGAGATTGGATCTAATTTTTGGACTTCTTAAAGAAAAAAAAACCTGTCAAACACGAATGTTATGCCCTGCTAAACTAAGCATCATAAATGAAGGGGAAATAAAGTCAAGTCTTTCCTGACAAGCAAATGCTAAGATAATTCATCATCACTAAACCAGTCCTATAAGAAATGCTCAAAAGAATTGTAAAAGTCAAAATTAAAGTTCAATACTCACCATCATAAATACACACAAAAGTACAAAACTCACAGGTTTTATAAAACAATTGAGACTACAGAGCAACTAGGTAAAAAATTAACATTACAACAGGAACAAAACCTCATATATCAATATTAACTTTGAATAAAAAGGGATTAAATTCCCCCACTTAAGAGATATAGATTGGCAGAACAGATTTAAAAACATGAACTAACTATATGCTGTTTACAAGAAACTCATTAATAAAGACATGAGTTCAGGTAAAGGGGTGGAAAAAGATGTTCTACGCAAACAGAAACCAAATGAGAGAAGGAGTAGCTATACTTATATCAGATAAAGCACACTTTAAATCAACAACAGTAAAATAAAACAAAGGAGGTCATCATACAATGATAAAAAGATCAATTCAGCAAGAAGATATAACCATCCTACTAAATACATATGCACCTAACACAAGACTACCCAGATTCATAAAACAAATACTACTAGACCTAAGAGGGATGAGAAATTACCTAATTGGTACAATGTACAATATTCTGATGATGGTTACACTAAAAGCCCATACTTTACTGCTACTCAATATATCCATGTAACAAATCTGCGCTTGTACTTCTAAATCTATAAAAAAATTAAAATTTAACAAAAGTAAATAAAACACATAGCTAAAACTAAAAAAGCAAAAACAAAAACTATGCTAAGTATTGGTAAAGATGTGGGGAAAAAAGTAAACTCTCAAATATTGCTAGTGGGAGTATAAATTGTTTTCCACTTTGGAAAACAATTTGGTAATTTCGTTTTTTTTTTTTTCTTTTCTCTTTTTTTTTTTTTTTTTTTTGCATGCCAGAAAAAAATATTTACAGTAACT",
688                    }
689
690     def setUp(self):
691         self.file=pysam.Fastafile( "ex1.fa" )
692
693     def testFetch(self):
694         for id, seq in self.mSequences.items():
695             self.assertEqual( seq, self.file.fetch( id ) )
696             for x in range( 0, len(seq), 10):
697                 self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
698                 # test x:end
699                 self.assertEqual( seq[x:], self.file.fetch( id, x) )
700                 # test 0:x
701                 self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
702
703         
704         # unknown sequence returns ""
705         self.assertEqual( "", self.file.fetch("chr12") )
706
707     def testOutOfRangeAccess( self ):
708         '''test out of range access.'''
709         # out of range access returns an empty string
710         for contig, s in self.mSequences.iteritems():
711             self.assertEqual( self.file.fetch( contig, len(s), len(s)+1), "" )
712
713         self.assertEqual( self.file.fetch( "chr3", 0 , 100), "" ) 
714
715     def testFetchErrors( self ):
716         self.assertRaises( ValueError, self.file.fetch )
717         self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
718         self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
719
720     def testLength( self ):
721         self.assertEqual( len(self.file), 2 )
722         
723     def tearDown(self):
724         self.file.close()
725
726 class TestAlignedRead(unittest.TestCase):
727     '''tests to check if aligned read can be constructed
728     and manipulated.
729     '''
730
731     def checkFieldEqual( self, read1, read2, exclude = []):
732         '''check if two reads are equal by comparing each field.'''
733
734         for x in ("qname", "seq", "flag",
735                   "rname", "pos", "mapq", "cigar",
736                   "mrnm", "mpos", "isize", "qual",
737                   "is_paired", "is_proper_pair",
738                   "is_unmapped", "mate_is_unmapped",
739                   "is_reverse", "mate_is_reverse",
740                   "is_read1", "is_read2",
741                   "is_secondary", "is_qcfail",
742                   "is_duplicate", "bin"):
743             if x in exclude: continue
744             self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" % 
745                               (x, getattr(read1, x), getattr(read2,x)))
746     
747     def testEmpty( self ):
748         a = pysam.AlignedRead()
749         self.assertEqual( a.qname, None )
750         self.assertEqual( a.seq, None )
751         self.assertEqual( a.qual, None )
752         self.assertEqual( a.flag, 0 )
753         self.assertEqual( a.rname, 0 )
754         self.assertEqual( a.mapq, 0 )
755         self.assertEqual( a.cigar, None )
756         self.assertEqual( a.tags, None )
757         self.assertEqual( a.mrnm, 0 )
758         self.assertEqual( a.mpos, 0 )
759         self.assertEqual( a.isize, 0 )
760
761     def buildRead( self ):
762         '''build an example read.'''
763         
764         a = pysam.AlignedRead()
765         a.qname = "read_12345"
766         a.seq="ACGT" * 3
767         a.flag = 0
768         a.rname = 0
769         a.pos = 33
770         a.mapq = 20
771         a.cigar = ( (0,10), (2,1), (0,25) )
772         a.mrnm = 0
773         a.mpos=200
774         a.isize=167
775         a.qual="1234" * 3
776         # todo: create tags
777         return a
778
779     def testUpdate( self ):
780         '''check if updating fields affects other variable length data
781         '''
782         a = self.buildRead()
783         b = self.buildRead()
784
785         # check qname
786         b.qname = "read_123"
787         self.checkFieldEqual( a, b, "qname" )
788         b.qname = "read_12345678"
789         self.checkFieldEqual( a, b, "qname" )
790         b.qname = "read_12345"
791         self.checkFieldEqual( a, b)
792
793         # check cigar
794         b.cigar = ( (0,10), )
795         self.checkFieldEqual( a, b, "cigar" )
796         b.cigar = ( (0,10), (2,1), (0,25), (2,1), (0,25) )
797         self.checkFieldEqual( a, b, "cigar" )
798         b.cigar = ( (0,10), (2,1), (0,25) )
799         self.checkFieldEqual( a, b)
800
801         # check seq 
802         b.seq = "ACGT"
803         self.checkFieldEqual( a, b, ("seq", "qual") )
804         b.seq = "ACGT" * 10
805         self.checkFieldEqual( a, b, ("seq", "qual") )
806         b.seq = "ACGT" * 3
807         self.checkFieldEqual( a, b, ("qual",))
808
809         # reset qual
810         b = self.buildRead()
811
812         # check flags:
813         for x in (
814             "is_paired", "is_proper_pair",
815             "is_unmapped", "mate_is_unmapped",
816             "is_reverse", "mate_is_reverse",
817             "is_read1", "is_read2",
818             "is_secondary", "is_qcfail",
819             "is_duplicate"):
820             setattr( b, x, True )
821             self.assertEqual( getattr(b, x), True )
822             self.checkFieldEqual( a, b, ("flag", x,) )
823             setattr( b, x, False )
824             self.assertEqual( getattr(b, x), False )
825             self.checkFieldEqual( a, b )
826
827     def testLargeRead( self ):
828         '''build an example read.'''
829         
830         a = pysam.AlignedRead()
831         a.qname = "read_12345"
832         a.seq="ACGT" * 200
833         a.flag = 0
834         a.rname = 0
835         a.pos = 33
836         a.mapq = 20
837         a.cigar = ( (0,10), (2,1), (0,25) )
838         a.mrnm = 0
839         a.mpos=200
840         a.isize=167
841         a.qual="1234" * 200
842
843         return a
844
845     def testTagParsing( self ):
846         '''test for tag parsing
847
848         see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a
849         '''
850         samfile=pysam.Samfile( "ex8.bam","rb" )
851
852         for entry in samfile:
853             before = entry.tags
854             entry.tags = entry.tags
855             after = entry.tags
856             self.assertEqual( after, before )
857
858 class TestDeNovoConstruction(unittest.TestCase):
859     '''check BAM/SAM file construction using ex3.sam
860     
861     (note these are +1 coordinates):
862     
863     read_28833_29006_6945       99      chr1    33      20      10M1D25M        =       200     167     AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG     <<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<     NM:i:1  RG:Z:L1
864     read_28701_28881_323b       147     chr2    88      30      35M     =       500     412     ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA     <<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<     MF:i:18 RG:Z:L2
865     '''
866
867     header = { 'HD': {'VN': '1.0'},
868                'SQ': [{'LN': 1575, 'SN': 'chr1'}, 
869                       {'LN': 1584, 'SN': 'chr2'}], }
870
871     bamfile = "ex6.bam"
872     samfile = "ex6.sam"
873
874     def checkFieldEqual( self, read1, read2, exclude = []):
875         '''check if two reads are equal by comparing each field.'''
876
877         for x in ("qname", "seq", "flag",
878                   "rname", "pos", "mapq", "cigar",
879                   "mrnm", "mpos", "isize", "qual",
880                   "bin",
881                   "is_paired", "is_proper_pair",
882                   "is_unmapped", "mate_is_unmapped",
883                   "is_reverse", "mate_is_reverse",
884                   "is_read1", "is_read2",
885                   "is_secondary", "is_qcfail",
886                   "is_duplicate"):
887             if x in exclude: continue
888             self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" % 
889                               (x, getattr(read1, x), getattr(read2,x)))
890
891     def setUp( self ):
892
893         
894         a = pysam.AlignedRead()
895         a.qname = "read_28833_29006_6945"
896         a.seq="AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG"
897         a.flag = 99
898         a.rname = 0
899         a.pos = 32
900         a.mapq = 20
901         a.cigar = ( (0,10), (2,1), (0,25) )
902         a.mrnm = 0
903         a.mpos=199
904         a.isize=167
905         a.qual="<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<"
906         a.tags = ( ("NM", 1),
907                    ("RG", "L1") )
908
909         b = pysam.AlignedRead()
910         b.qname = "read_28701_28881_323b"
911         b.seq="ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA"
912         b.flag = 147
913         b.rname = 1
914         b.pos = 87
915         b.mapq = 30
916         b.cigar = ( (0,35), )
917         b.mrnm = 1
918         b.mpos=499
919         b.isize=412
920         b.qual="<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<"
921         b.tags = ( ("MF", 18),
922                    ("RG", "L2") )
923
924         self.reads = (a,b)
925
926     def testSAMWholeFile( self ):
927         
928         tmpfilename = "tmp_%i.sam" % id(self)
929
930         outfile = pysam.Samfile( tmpfilename, "wh", header = self.header )
931
932         for x in self.reads: outfile.write( x )
933         outfile.close()
934         
935         self.assertTrue( checkBinaryEqual( tmpfilename, self.samfile ),
936                          "mismatch when construction SAM file, see %s %s" % (tmpfilename, self.samfile))
937         
938         os.unlink( tmpfilename )
939
940     def testBAMPerRead( self ):
941         '''check if individual reads are binary equal.'''
942         infile = pysam.Samfile( self.bamfile, "rb")
943
944         others = list(infile)
945         for denovo, other in zip( others, self.reads):
946             self.checkFieldEqual( other, denovo )
947             self.assertEqual( other.compare( denovo ), 0 )
948
949     def testSAMPerRead( self ):
950         '''check if individual reads are binary equal.'''
951         infile = pysam.Samfile( self.samfile, "r")
952
953         others = list(infile)
954         for denovo, other in zip( others, self.reads):
955             self.checkFieldEqual( other, denovo )
956             self.assertEqual( other.compare( denovo), 0 )
957             
958     def testBAMWholeFile( self ):
959         
960         tmpfilename = "tmp_%i.bam" % id(self)
961
962         outfile = pysam.Samfile( tmpfilename, "wb", header = self.header )
963
964         for x in self.reads: outfile.write( x )
965         outfile.close()
966         
967         self.assertTrue( checkBinaryEqual( tmpfilename, self.bamfile ),
968                          "mismatch when construction BAM file, see %s %s" % (tmpfilename, self.bamfile))
969         
970         os.unlink( tmpfilename )
971
972
973 class TestDoubleFetch(unittest.TestCase):
974     '''check if two iterators on the same bamfile are independent.'''
975     
976     def testDoubleFetch( self ):
977
978         samfile1 = pysam.Samfile('ex1.bam', 'rb')
979
980         for a,b in zip(samfile1.fetch(), samfile1.fetch()):
981             self.assertEqual( a.compare( b ), 0 )
982
983     def testDoubleFetchWithRegion( self ):
984
985         samfile1 = pysam.Samfile('ex1.bam', 'rb')
986         chr, start, stop = 'chr1', 200, 3000000
987         self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
988
989         for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
990             self.assertEqual( a.compare( b ), 0 ) 
991
992     def testDoubleFetchUntilEOF( self ):
993
994         samfile1 = pysam.Samfile('ex1.bam', 'rb')
995
996         for a,b in zip(samfile1.fetch( until_eof = True), 
997                        samfile1.fetch( until_eof = True )):
998             self.assertEqual( a.compare( b), 0 )
999
1000 class TestRemoteFileFTP(unittest.TestCase):
1001     '''test remote access.
1002
1003     '''
1004
1005     # Need to find an ftp server without password on standard
1006     # port.
1007
1008     url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
1009     region = "1:1-1000"
1010
1011     def testFTPView( self ):
1012         return
1013         result = pysam.view( self.url, self.region )
1014         self.assertEqual( len(result), 36 )
1015         
1016     def testFTPFetch( self ):
1017         return
1018         samfile = pysam.Samfile(self.url, "rb")  
1019         result = list(samfile.fetch( region = self.region ))
1020         self.assertEqual( len(result), 36 )
1021
1022 class TestRemoteFileHTTP( unittest.TestCase):
1023
1024     url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
1025     region = "chr1:1-1000"
1026     local = "ex1.bam"
1027
1028     def testView( self ):
1029         samfile_local = pysam.Samfile(self.local, "rb")  
1030         ref = list(samfile_local.fetch( region = self.region ))
1031         
1032         result = pysam.view( self.url, self.region )
1033         self.assertEqual( len(result), len(ref) )
1034         
1035     def testFetch( self ):
1036         samfile = pysam.Samfile(self.url, "rb")  
1037         result = list(samfile.fetch( region = self.region ))
1038         samfile_local = pysam.Samfile(self.local, "rb")  
1039         ref = list(samfile_local.fetch( region = self.region ))
1040
1041         self.assertEqual( len(ref), len(result) )
1042         for x, y in zip(result, ref):
1043             self.assertEqual( x.compare( y ), 0 )
1044
1045     def testFetchAll( self ):
1046         samfile = pysam.Samfile(self.url, "rb")  
1047         result = list(samfile.fetch())
1048         samfile_local = pysam.Samfile(self.local, "rb")  
1049         ref = list(samfile_local.fetch() )
1050
1051         self.assertEqual( len(ref), len(result) )
1052         for x, y in zip(result, ref):
1053             self.assertEqual( x.compare( y ), 0 )
1054
1055 class TestLargeOptValues( unittest.TestCase ):
1056
1057     ints = ( 65536, 214748, 2147484, 2147483647 )
1058     floats = ( 65536.0, 214748.0, 2147484.0 )
1059
1060     def check( self, samfile ):
1061         
1062         i = samfile.fetch()
1063         for exp in self.ints:
1064             rr = i.next()
1065             obs = rr.opt("ZP")
1066             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1067
1068         for exp in [ -x for x in self.ints ]:
1069             rr = i.next()
1070             obs = rr.opt("ZP")
1071             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1072
1073         for exp in self.floats:
1074             rr = i.next()
1075             obs = rr.opt("ZP")
1076             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1077
1078         for exp in [ -x for x in self.floats ]:
1079             rr = i.next()
1080             obs = rr.opt("ZP")
1081             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1082
1083     def testSAM( self ):
1084         samfile = pysam.Samfile("ex10.sam", "r")
1085         self.check( samfile )
1086
1087     def testBAM( self ):
1088         samfile = pysam.Samfile("ex10.bam", "rb")
1089         self.check( samfile )
1090
1091 class TestSNPCalls( unittest.TestCase ):
1092     '''test pysam SNP calling ability.'''
1093
1094     def checkEqual( self, a, b ):
1095         for x in ("reference_base", 
1096                   "pos",
1097                   "genotype",
1098                   "consensus_quality",
1099                   "snp_quality",
1100                   "mapping_quality",
1101                   "coverage" ):
1102             self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % 
1103                               (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1104
1105     def testAllPositionsViaIterator( self ):
1106         samfile = pysam.Samfile( "ex1.bam", "rb")  
1107         fastafile = pysam.Fastafile( "ex1.fa" )
1108         try: 
1109             refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"]
1110         except pysam.SamtoolsError:
1111             pass
1112
1113         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1114         calls = list(pysam.IteratorSNPCalls(i))
1115         for x,y in zip( refs, calls ):
1116             self.checkEqual( x, y )
1117
1118     def testPerPositionViaIterator( self ):
1119         # test pileup for each position. This is a slow operation
1120         # so this test is disabled 
1121         return
1122         samfile = pysam.Samfile( "ex1.bam", "rb")  
1123         fastafile = pysam.Fastafile( "ex1.fa" )
1124         for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1125             if x.reference_base == "*": continue
1126             i = samfile.pileup( x.chromosome, x.pos, x.pos+1,
1127                                 fastafile = fastafile,
1128                                 stepper = "samtools" )
1129             z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ]
1130             self.assertEqual( len(z), 1 )
1131             self.checkEqual( x, z[0] )
1132
1133     def testPerPositionViaCaller( self ):
1134         # test pileup for each position. This is a fast operation
1135         samfile = pysam.Samfile( "ex1.bam", "rb")  
1136         fastafile = pysam.Fastafile( "ex1.fa" )
1137         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1138         caller = pysam.SNPCaller( i )
1139
1140         for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1141             if x.reference_base == "*": continue
1142             call = caller.call( x.chromosome, x.pos )
1143             self.checkEqual( x, call )
1144
1145 class TestIndelCalls( unittest.TestCase ):
1146     '''test pysam indel calling.'''
1147
1148     def checkEqual( self, a, b ):
1149
1150         for x in ("pos",
1151                   "genotype",
1152                   "consensus_quality",
1153                   "snp_quality",
1154                   "mapping_quality",
1155                   "coverage",
1156                   "first_allele",
1157                   "second_allele",
1158                   "reads_first",
1159                   "reads_second",
1160                   "reads_diff"):
1161             if b.genotype == "*/*" and x == "second_allele":
1162                 # ignore test for second allele (positions chr2:439 and chr2:1512)
1163                 continue
1164             self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % 
1165                               (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1166
1167     def testAllPositionsViaIterator( self ):
1168
1169         samfile = pysam.Samfile( "ex1.bam", "rb")  
1170         fastafile = pysam.Fastafile( "ex1.fa" )
1171         try: 
1172             refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"]
1173         except pysam.SamtoolsError:
1174             pass
1175
1176         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1177         calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ]
1178         for x,y in zip( refs, calls ):
1179             self.checkEqual( x, y )
1180
1181     def testPerPositionViaCaller( self ):
1182         # test pileup for each position. This is a fast operation
1183         samfile = pysam.Samfile( "ex1.bam", "rb")  
1184         fastafile = pysam.Fastafile( "ex1.fa" )
1185         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1186         caller = pysam.IndelCaller( i )
1187
1188         for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1189             if x.reference_base != "*": continue
1190             call = caller.call( x.chromosome, x.pos )
1191             self.checkEqual( x, call )
1192
1193 class TestLogging( unittest.TestCase ):
1194     '''test around bug issue 42,
1195
1196     failed in versions < 0.4
1197     '''
1198
1199     def check( self, bamfile, log ):
1200
1201         if log:
1202             logger = logging.getLogger('franklin')
1203             logger.setLevel(logging.INFO)
1204             formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
1205             log_hand  = logging.FileHandler('log.txt')
1206             log_hand.setFormatter(formatter)
1207             logger.addHandler(log_hand)
1208
1209         bam  = pysam.Samfile(bamfile, 'rb')
1210         cols = bam.pileup()
1211         self.assert_( True )
1212
1213     def testFail1( self ):
1214         self.check( "ex9_fail.bam", False )
1215         self.check( "ex9_fail.bam", True )
1216
1217     def testNoFail1( self ):
1218         self.check( "ex9_nofail.bam", False )
1219         self.check( "ex9_nofail.bam", True )
1220
1221     def testNoFail2( self ):
1222         self.check( "ex9_nofail.bam", True )
1223         self.check( "ex9_nofail.bam", True )
1224         
1225 # TODOS
1226 # 1. finish testing all properties within pileup objects
1227 # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)
1228 # 3. check: presence of sequence
1229
1230 class TestSamfileUtilityFunctions( unittest.TestCase ):
1231
1232     def testCount( self ):
1233
1234         samfile = pysam.Samfile( "ex1.bam", "rb" )
1235
1236         for contig in ("chr1", "chr2" ):
1237             for start in xrange( 0, 2000, 100 ):
1238                 end = start + 1
1239                 self.assertEqual( len( list( samfile.fetch( contig, start, end ) ) ),
1240                                   samfile.count( contig, start, end ) )
1241
1242                 # test empty intervals
1243                 self.assertEqual( len( list( samfile.fetch( contig, start, start ) ) ),
1244                                   samfile.count( contig, start, start ) )
1245
1246                 # test half empty intervals
1247                 self.assertEqual( len( list( samfile.fetch( contig, start ) ) ),
1248                                   samfile.count( contig, start ) )
1249
1250     def testMate( self ):
1251         '''test mate access.'''
1252
1253         readnames = [ x.split("\t")[0] for x in open( "ex1.sam", "rb" ).readlines() ]
1254         counts = collections.defaultdict( int )
1255         for x in readnames: counts[x] += 1
1256
1257         samfile = pysam.Samfile( "ex1.bam", "rb" )
1258         for read in samfile.fetch():
1259             if not read.is_paired:
1260                 self.assertRaises( ValueError, samfile.mate, read )
1261             elif read.mate_is_unmapped:
1262                 self.assertRaises( ValueError, samfile.mate, read )
1263             else:
1264                 if counts[read.qname] == 1:
1265                     self.assertRaises( ValueError, samfile.mate, read )
1266                 else:
1267                     mate = samfile.mate( read )
1268                     self.assertEqual( read.qname, mate.qname )
1269                     self.assertEqual( read.is_read1, mate.is_read2 )
1270                     self.assertEqual( read.is_read2, mate.is_read1 )
1271                     self.assertEqual( read.pos, mate.mpos )
1272                     self.assertEqual( read.mpos, mate.pos )
1273
1274 class TestSamtoolsProxy( unittest.TestCase ):
1275     '''tests for sanity checking access to samtools functions.'''
1276
1277     def testIndex( self ):
1278         self.assertRaises( IOError, pysam.index, "missing_file" )
1279
1280     def testView( self ):
1281         # note that view still echos "open: No such file or directory"
1282         self.assertRaises( pysam.SamtoolsError, pysam.view, "missing_file" )
1283
1284     def testSort( self ):
1285         self.assertRaises( pysam.SamtoolsError, pysam.sort, "missing_file" )
1286
1287 class TestSamfileIndex( unittest.TestCase):
1288     
1289     def testIndex( self ):
1290         samfile = pysam.Samfile( "ex1.bam", "rb" )
1291         index = pysam.IndexedReads( samfile )
1292         index.build()
1293
1294         reads = collections.defaultdict( int )
1295
1296         for read in samfile: reads[read.qname] += 1
1297             
1298         for qname, counts in reads.iteritems():
1299             found = list(index.find( qname ))
1300             self.assertEqual( len(found), counts )
1301             for x in found: self.assertEqual( x.qname, qname )
1302             
1303
1304 if __name__ == "__main__":
1305     # build data files
1306     print "building data files"
1307     subprocess.call( "make", shell=True)
1308     print "starting tests"
1309     unittest.main()