Imported Upstream version 0.2
[pysam.git] / tests / pysam_test.py
1 #!/usr/bin/env python
2 '''unit testing code for pysam.
3
4 Execute in the :file:`tests` directory as it requires the Makefile
5 and data files located there.
6 '''
7
8 import pysam
9 import unittest
10 import os
11 import itertools
12 import subprocess
13 import shutil
14
15
16 def checkBinaryEqual( filename1, filename2 ):
17     '''return true if the two files are binary equal.'''
18     if os.path.getsize( filename1 ) !=  os.path.getsize( filename2 ):
19         return False
20
21     infile1 = open(filename1, "rb")
22     infile2 = open(filename2, "rb")
23
24     def chariter( infile ):
25         while 1:
26             c = infile.read(1)
27             if c == "": break
28             yield c
29
30     found = False
31     for c1,c2 in itertools.izip( chariter( infile1), chariter( infile2) ):
32         if c1 != c2: break
33     else:
34         found = True
35
36     infile1.close()
37     infile2.close()
38     return found
39
40 def runSamtools( cmd ):
41     '''run a samtools command'''
42
43     try:
44         retcode = subprocess.call(cmd, shell=True)
45         if retcode < 0:
46             print >>sys.stderr, "Child was terminated by signal", -retcode
47     except OSError, e:
48         print >>sys.stderr, "Execution failed:", e
49
50         
51 class BinaryTest(unittest.TestCase):
52     '''test samtools command line commands and compare
53     against pysam commands.
54
55     Tests fail, if the output is not binary identical.
56     '''
57
58     first_time = True
59
60     # a list of commands to test
61     mCommands = \
62         { "faidx" : \
63         ( 
64             ("ex1.fa.fai", "samtools faidx ex1.fa"), 
65             ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ),
66             ),
67           "import" :
68               (
69                 ("ex1.bam", "samtools import ex1.fa.fai ex1.sam.gz ex1.bam" ),
70                 ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ),
71                 ),
72           "index":
73               (
74                 ("ex1.bam.bai", "samtools index ex1.bam" ),
75                 ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ),
76                 ),
77           "pileup1" :
78           (
79                 ("ex1.pileup", "samtools pileup -cf ex1.fa ex1.bam > ex1.pileup" ),
80                 ("pysam_ex1.pileup", (pysam.pileup, "-c -f ex1.fa ex1.bam" ) )
81                 ),
82           "pileup2" :
83           (
84                 ("ex1.glf", "samtools pileup -gf ex1.fa ex1.bam > ex1.glf" ),
85                 ("pysam_ex1.glf", (pysam.pileup, "-g -f ex1.fa ex1.bam" ) )
86                 ),
87           "glfview" :
88         (
89                 ("ex1.glfview", "samtools glfview ex1.glf > ex1.glfview"),
90                 ("pysam_ex1.glfview", (pysam.glfview, "ex1.glf" ) ),
91                 ),
92           "view" :
93         (
94                 ("ex1.view", "samtools view ex1.bam > ex1.view"),
95                 ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
96                 ),
97         }
98
99     # some tests depend on others. The order specifies in which order
100     # the samtools commands are executed.
101     mOrder = ('faidx', 'import', 'index', 'pileup1', 'pileup2', 'glfview', 'view' )
102
103     def setUp( self ):
104         '''setup tests. 
105
106         For setup, all commands will be run before the first test is
107         executed. Individual tests will then just compare the output
108         files.
109         '''
110         if BinaryTest.first_time:
111             # copy the source 
112             shutil.copy( "ex1.fa", "pysam_ex1.fa" )
113
114             for label in self.mOrder:
115                 command = self.mCommands[label]
116                 samtools_target, samtools_command = command[0]
117                 pysam_target, pysam_command = command[1]
118                 runSamtools( samtools_command )
119                 pysam_method, pysam_options = pysam_command
120                 output = pysam_method( *pysam_options.split(" "), raw=True)
121                 if ">" in samtools_command:
122                     outfile = open( pysam_target, "w" )
123                     for line in output: outfile.write( line )
124                     outfile.close()
125
126             BinaryTest.first_time = False
127
128     def checkCommand( self, command ):
129         if command:
130             samtools_target, pysam_target = self.mCommands[command][0][0], self.mCommands[command][1][0]
131             self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ), 
132                              "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
133
134     def testImport( self ):
135         self.checkCommand( "import" )
136
137     def testIndex( self ):
138         self.checkCommand( "index" )
139         
140     def testPileup1( self ):
141         self.checkCommand( "pileup1" )
142     
143     def testPileup2( self ):
144         self.checkCommand( "pileup2" )
145
146     def testGLFView( self ):
147         self.checkCommand( "glfview" )
148
149     def testView( self ):
150         self.checkCommand( "view" )
151
152     def testEmptyIndex( self ):
153         self.assertRaises( pysam.SamtoolsError, pysam.index, "exdoesntexist.bam" )
154
155     def __del__(self):
156
157         for label, command in self.mCommands.iteritems():
158             samtools_target, samtools_command = command[0]
159             pysam_target, pysam_command = command[1]
160             if os.path.exists( samtools_target): os.remove( samtools_target )
161             if os.path.exists( pysam_target): os.remove( pysam_target )
162         if os.path.exists( "pysam_ex1.fa" ): os.remove( "pysam_ex1.fa" )
163
164 class IOTest(unittest.TestCase):
165     '''check if reading samfile and writing a samfile are consistent.'''
166
167     def checkEcho( self, input_filename, reference_filename, 
168                    output_filename, 
169                    input_mode, output_mode, use_template = True):
170         '''iterate through *input_filename* writing to *output_filename* and
171         comparing the output to *reference_filename*. 
172         
173         The files are opened according to the *input_mode* and *output_mode*.
174
175         If *use_template* is set, the header is copied from infile using the
176         template mechanism, otherwise target names and lengths are passed explicitely. 
177         '''
178
179         infile = pysam.Samfile( input_filename, input_mode )
180         if use_template:
181             outfile = pysam.Samfile( output_filename, output_mode, template = infile )
182         else:
183             outfile = pysam.Samfile( output_filename, output_mode, 
184                                      referencenames = infile.references,
185                                      referencelengths = infile.lengths )
186
187         iter = infile.fetch()
188         for x in iter: outfile.write( x )
189         infile.close()
190         outfile.close()
191
192         self.assertTrue( checkBinaryEqual( reference_filename, output_filename), 
193                          "files %s and %s are not the same" % (reference_filename, output_filename) )
194
195     def testReadWriteBam( self ):
196         
197         input_filename = "ex1.bam"
198         output_filename = "pysam_ex1.bam"
199         reference_filename = "ex1.bam"
200
201         self.checkEcho( input_filename, reference_filename, output_filename,
202                         "rb", "wb" )
203
204     def testReadWriteBamWithTargetNames( self ):
205         
206         input_filename = "ex1.bam"
207         output_filename = "pysam_ex1.bam"
208         reference_filename = "ex1.bam"
209
210         self.checkEcho( input_filename, reference_filename, output_filename,
211                         "rb", "wb", use_template = False )
212
213     def testReadWriteSamWithHeader( self ):
214         
215         input_filename = "ex2.sam"
216         output_filename = "pysam_ex2.sam"
217         reference_filename = "ex2.sam"
218
219         self.checkEcho( input_filename, reference_filename, output_filename,
220                         "r", "wh" )
221
222     def testReadWriteSamWithoutHeader( self ):
223         
224         input_filename = "ex2.sam"
225         output_filename = "pysam_ex2.sam"
226         reference_filename = "ex1.sam"
227
228         self.checkEcho( input_filename, reference_filename, output_filename,
229                         "r", "w" )
230
231     def testFetchFromClosedFile( self ):
232
233         samfile = pysam.Samfile( "ex1.bam", "rb" )
234         samfile.close()
235         self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
236
237     def testPileupFromClosedFile( self ):
238
239         samfile = pysam.Samfile( "ex1.bam", "rb" )
240         samfile.close()
241         self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
242
243     def testBinaryReadFromSamfile( self ):
244         pass
245         # needs to re-activated, see issue 19
246         #samfile = pysam.Samfile( "ex1.bam", "r" )
247         #samfile.fetch().next()
248
249     def testReadingFromFileWithoutIndex( self ):
250         '''read from bam file without index.'''
251
252         assert not os.path.exists( "ex2.bam.bai" )
253         samfile = pysam.Samfile( "ex2.bam", "rb" )
254         self.assertRaises( ValueError, samfile.fetch )
255         self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 )
256
257 class TestIteratorRow(unittest.TestCase):
258
259     def setUp(self):
260         self.samfile=pysam.Samfile( "ex1.bam","rb" )
261
262     def checkRange( self, rnge ):
263         '''compare results from iterator with those from samtools.'''
264         ps = list(self.samfile.fetch(region=rnge))
265         sa = list(pysam.view( "ex1.bam", rnge , raw = True) )
266         self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) ))
267         # check if the same reads are returned and in the same order
268         for line, pair in enumerate( zip( ps, sa ) ):
269             data = pair[1].split("\t")
270             self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
271
272     def testIteratePerContig(self):
273         '''check random access per contig'''
274         for contig in self.samfile.references:
275             self.checkRange( contig )
276
277     def testIterateRanges(self):
278         '''check random access per range'''
279         for contig, length in zip(self.samfile.references, self.samfile.lengths):
280             for start in range( 1, length, 90):
281                 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
282
283     def tearDown(self):
284         self.samfile.close()
285
286 class TestIteratorRowAll(unittest.TestCase):
287
288     def setUp(self):
289         self.samfile=pysam.Samfile( "ex1.bam","rb" )
290
291     def testIterate(self):
292         '''compare results from iterator with those from samtools.'''
293         ps = list(self.samfile.fetch())
294         sa = list(pysam.view( "ex1.bam", raw = True) )
295         self.assertEqual( len(ps), len(sa), "unequal number of results: %i != %i" % (len(ps), len(sa) ))
296         # check if the same reads are returned
297         for line, pair in enumerate( zip( ps, sa ) ):
298             data = pair[1].split("\t")
299             self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
300
301     def tearDown(self):
302         self.samfile.close()
303
304 class TestIteratorColumn(unittest.TestCase):
305     '''test iterator column against contents of ex3.bam.'''
306     
307     # note that samfile contains 1-based coordinates
308     # 1D means deletion with respect to reference sequence
309     # 
310     mCoverages = { 'chr1' : [ 0 ] * 20 + [1] * 36 + [0] * (100 - 20 -35 ),
311                    'chr2' : [ 0 ] * 20 + [1] * 35 + [0] * (100 - 20 -35 ),
312                    }
313
314     def setUp(self):
315         self.samfile=pysam.Samfile( "ex4.bam","rb" )
316
317     def checkRange( self, rnge ):
318         '''compare results from iterator with those from samtools.'''
319         # check if the same reads are returned and in the same order
320         for column in self.samfile.pileup(region=rnge):
321             thiscov = len(column.pileups)
322             refcov = self.mCoverages[self.samfile.getrname(column.tid)][column.pos]
323             self.assertEqual( thiscov, refcov, "wrong coverage at pos %s:%i %i should be %i" % (self.samfile.getrname(column.tid), column.pos, thiscov, refcov))
324
325     def testIterateAll(self):
326         '''check random access per contig'''
327         self.checkRange( None )
328
329     def testIteratePerContig(self):
330         '''check random access per contig'''
331         for contig in self.samfile.references:
332             self.checkRange( contig )
333
334     def testIterateRanges(self):
335         '''check random access per range'''
336         for contig, length in zip(self.samfile.references, self.samfile.lengths):
337             for start in range( 1, length, 90):
338                 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
339
340     def testInverse( self ):
341         '''test the inverse, is point-wise pileup accurate.'''
342         for contig, refseq in self.mCoverages.items():
343             refcolumns = sum(refseq)
344             for pos, refcov in enumerate( refseq ):
345                 columns = list(self.samfile.pileup( contig, pos, pos+1) )
346                 if refcov == 0:
347                     # if no read, no coverage
348                     self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
349                 elif refcov == 1:
350                     # one read, all columns of the read are returned
351                     self.assertEqual( len(columns), refcolumns, "pileup incomplete - %i should be %i " % (len(columns), refcolumns))
352                     
353     def tearDown(self):
354         self.samfile.close()
355     
356 class TestAlignedReadFromBam(unittest.TestCase):
357
358     def setUp(self):
359         self.samfile=pysam.Samfile( "ex3.bam","rb" )
360         self.reads=list(self.samfile.fetch())
361
362     def testARqname(self):
363         self.assertEqual( self.reads[0].qname, "read_28833_29006_6945", "read name mismatch in read 1: %s != %s" % (self.reads[0].qname, "read_28833_29006_6945") )
364         self.assertEqual( self.reads[1].qname, "read_28701_28881_323b", "read name mismatch in read 2: %s != %s" % (self.reads[1].qname, "read_28701_28881_323b") )
365
366     def testARflag(self):
367         self.assertEqual( self.reads[0].flag, 99, "flag mismatch in read 1: %s != %s" % (self.reads[0].flag, 99) )
368         self.assertEqual( self.reads[1].flag, 147, "flag mismatch in read 2: %s != %s" % (self.reads[1].flag, 147) )
369
370     def testARrname(self):
371         self.assertEqual( self.reads[0].rname, 0, "chromosome/target id mismatch in read 1: %s != %s" % (self.reads[0].rname, 0) )
372         self.assertEqual( self.reads[1].rname, 1, "chromosome/target id mismatch in read 2: %s != %s" % (self.reads[1].rname, 1) )
373
374     def testARpos(self):
375         self.assertEqual( self.reads[0].pos, 33-1, "mapping position mismatch in read 1: %s != %s" % (self.reads[0].pos, 33-1) )
376         self.assertEqual( self.reads[1].pos, 88-1, "mapping position mismatch in read 2: %s != %s" % (self.reads[1].pos, 88-1) )
377
378     def testARmapq(self):
379         self.assertEqual( self.reads[0].mapq, 20, "mapping quality mismatch in read 1: %s != %s" % (self.reads[0].mapq, 20) )
380         self.assertEqual( self.reads[1].mapq, 30, "mapping quality mismatch in read 2: %s != %s" % (self.reads[1].mapq, 30) )
381
382     def testARcigar(self):
383         self.assertEqual( self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)], "read name length mismatch in read 1: %s != %s" % (self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)]) )
384         self.assertEqual( self.reads[1].cigar, [(0, 35)], "read name length mismatch in read 2: %s != %s" % (self.reads[1].cigar, [(0, 35)]) )
385
386     def testARmrnm(self):
387         self.assertEqual( self.reads[0].mrnm, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].mrnm, 0) )
388         self.assertEqual( self.reads[1].mrnm, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].mrnm, 1) )
389
390     def testARmpos(self):
391         self.assertEqual( self.reads[0].mpos, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].mpos, 200-1) )
392         self.assertEqual( self.reads[1].mpos, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].mpos, 500-1) )
393
394     def testARisize(self):
395         self.assertEqual( self.reads[0].isize, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].isize, 167) )
396         self.assertEqual( self.reads[1].isize, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].isize, 412) )
397
398     def testARseq(self):
399         self.assertEqual( self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
400         self.assertEqual( self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
401
402     def testARqual(self):
403         self.assertEqual( self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
404         self.assertEqual( self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
405
406     def testPresentOptionalFields(self):
407         self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
408         self.assertEqual( self.reads[0].opt('RG'), 'L1', "optional field mismatch in read 1, RG: %s != %s" % (self.reads[0].opt('RG'), 'L1') )
409         self.assertEqual( self.reads[1].opt('RG'), 'L2', "optional field mismatch in read 2, RG: %s != %s" % (self.reads[1].opt('RG'), 'L2') )
410         self.assertEqual( self.reads[1].opt('MF'), 18, "optional field mismatch in read 2, MF: %s != %s" % (self.reads[1].opt('MF'), 18) )
411
412     def testPairedBools(self):
413         self.assertEqual( self.reads[0].is_paired, True, "is paired mismatch in read 1: %s != %s" % (self.reads[0].is_paired, True) )
414         self.assertEqual( self.reads[1].is_paired, True, "is paired mismatch in read 2: %s != %s" % (self.reads[1].is_paired, True) )
415         self.assertEqual( self.reads[0].is_proper_pair, True, "is proper pair mismatch in read 1: %s != %s" % (self.reads[0].is_proper_pair, True) )
416         self.assertEqual( self.reads[1].is_proper_pair, True, "is proper pair mismatch in read 2: %s != %s" % (self.reads[1].is_proper_pair, True) )
417
418     def testTags( self ):
419         self.assertEqual( self.reads[0].tags, 
420                           [('NM', 1), ('RG', 'L1'), 
421                            ('PG', 'P1'), ('XT', 'U')] )
422         self.assertEqual( self.reads[1].tags, 
423                           [('MF', 18), ('RG', 'L2'), 
424                            ('PG', 'P2'),('XT', 'R') ] )
425
426     def testOpt( self ):
427         self.assertEqual( self.reads[0].opt("XT"), "U" )
428         self.assertEqual( self.reads[1].opt("XT"), "R" )
429
430     def testMissingOpt( self ):
431         self.assertRaises( KeyError, self.reads[0].opt, "XP" )
432
433     def testEmptyOpt( self ):
434         self.assertRaises( KeyError, self.reads[2].opt, "XT" )
435
436     def tearDown(self):
437         self.samfile.close()
438
439 class TestAlignedReadFromSam(TestAlignedReadFromBam):
440
441     def setUp(self):
442         self.samfile=pysam.Samfile( "ex3.sam","r" )
443         self.reads=list(self.samfile.fetch())
444
445 # needs to be implemented 
446 # class TestAlignedReadFromSamWithoutHeader(TestAlignedReadFromBam):
447 #
448 #     def setUp(self):
449 #         self.samfile=pysam.Samfile( "ex7.sam","r" )
450 #         self.reads=list(self.samfile.fetch())
451
452 class TestHeaderSam(unittest.TestCase):
453
454     header = {'SQ': [{'LN': 1575, 'SN': 'chr1'}, 
455                      {'LN': 1584, 'SN': 'chr2'}], 
456               'RG': [{'LB': 'SC_1', 'ID': 'L1', 'SM': 'NA12891', 'PU': 'SC_1_10', "CN":"name:with:colon"}, 
457                      {'LB': 'SC_2', 'ID': 'L2', 'SM': 'NA12891', 'PU': 'SC_2_12', "CN":"name:with:colon"}],
458               'PG': [{'ID': 'P1', 'VN': '1.0'}, {'ID': 'P2', 'VN': '1.1'}], 
459               'HD': {'VN': '1.0'},
460               'CO' : [ 'this is a comment', 'this is another comment'],
461               }
462
463     def compareHeaders( self, a, b ):
464         '''compare two headers a and b.'''
465         for ak,av in a.iteritems():
466             self.assertTrue( ak in b, "key '%s' not in '%s' " % (ak,b) )
467             self.assertEqual( av, b[ak] )
468
469     def setUp(self):
470         self.samfile=pysam.Samfile( "ex3.sam","r" )
471
472     def testHeaders(self):
473         self.compareHeaders( self.header, self.samfile.header )
474         self.compareHeaders( self.samfile.header, self.header )
475         
476     def tearDown(self):
477         self.samfile.close()
478
479 class TestHeaderBam(TestHeaderSam):
480
481     def setUp(self):
482         self.samfile=pysam.Samfile( "ex3.bam","rb" )
483
484 class TestUnmappedReads(unittest.TestCase):
485
486     def testSAM(self):
487         samfile=pysam.Samfile( "ex5.sam","r" )
488         self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 ) 
489         samfile.close()
490
491     def testBAM(self):
492         samfile=pysam.Samfile( "ex5.bam","rb" )
493         self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 ) 
494         samfile.close()
495
496 class TestPileupObjects(unittest.TestCase):
497
498     def setUp(self):
499         self.samfile=pysam.Samfile( "ex1.bam","rb" )
500
501     def testPileupColumn(self):
502         for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
503             if pcolumn1.pos == 104:
504                 self.assertEqual( pcolumn1.tid, 0, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn1.tid, 0) )
505                 self.assertEqual( pcolumn1.pos, 105-1, "position mismatch in position 1: %s != %s" % (pcolumn1.pos, 105-1) )
506                 self.assertEqual( pcolumn1.n, 2, "# reads mismatch in position 1: %s != %s" % (pcolumn1.n, 2) )
507         for pcolumn2 in self.samfile.pileup( region="chr2:1480" ):
508             if pcolumn2.pos == 1479:
509                 self.assertEqual( pcolumn2.tid, 1, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn2.tid, 1) )
510                 self.assertEqual( pcolumn2.pos, 1480-1, "position mismatch in position 1: %s != %s" % (pcolumn2.pos, 1480-1) )
511                 self.assertEqual( pcolumn2.n, 12, "# reads mismatch in position 1: %s != %s" % (pcolumn2.n, 12) )
512
513     def testPileupRead(self):
514         for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
515             if pcolumn1.pos == 104:
516                 self.assertEqual( len(pcolumn1.pileups), 2, "# reads aligned to column mismatch in position 1: %s != %s" % (len(pcolumn1.pileups), 2) )
517 #                self.assertEqual( pcolumn1.pileups[0]  # need to test additional properties here
518
519     def tearDown(self):
520         self.samfile.close()
521         
522 class TestExceptions(unittest.TestCase):
523
524     def setUp(self):
525         self.samfile=pysam.Samfile( "ex1.bam","rb" )
526
527     def testMissingFile(self):
528
529         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "rb" )
530         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "r" )
531         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "r" )
532         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "rb" )
533
534     def testBadContig(self):
535         self.assertRaises( ValueError, self.samfile.fetch, "chr88" )
536
537     def testMeaninglessCrap(self):
538         self.assertRaises( ValueError, self.samfile.fetch, "skljf" )
539
540     def testBackwardsOrderNewFormat(self):
541         self.assertRaises( ValueError, self.samfile.fetch, 'chr1', 100, 10 )
542
543     def testBackwardsOrderOldFormat(self):
544         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:100-10")
545         
546     def testOutOfRangeNegativeNewFormat(self):
547         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, -10 )
548         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 )
549         self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 )
550
551     def testOutOfRangeNegativeOldFormat(self):
552         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" )
553         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" )
554         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" )
555
556     def testOutOfRangNewFormat(self):
557         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 )
558
559     def testOutOfRangeLargeNewFormat(self):
560         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
561
562     def testOutOfRangeLargeOldFormat(self):
563         self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
564
565     def tearDown(self):
566         self.samfile.close()
567
568 class TestFastaFile(unittest.TestCase):
569
570     mSequences = { 'chr1' :
571                        "CACTAGTGGCTCATTGTAAATGTGTGGTTTAACTCGTCCATGGCCCAGCATTAGGGAGCTGTGGACCCTGCAGCCTGGCTGTGGGGGCCGCAGTGGCTGAGGGGTGCAGAGCCGAGTCACGGGGTTGCCAGCACAGGGGCTTAACCTCTGGTGACTGCCAGAGCTGCTGGCAAGCTAGAGTCCCATTTGGAGCCCCTCTAAGCCGTTCTATTTGTAATGAAAACTATATTTATGCTATTCAGTTCTAAATATAGAAATTGAAACAGCTGTGTTTAGTGCCTTTGTTCAACCCCCTTGCAACAACCTTGAGAACCCCAGGGAATTTGTCAATGTCAGGGAAGGAGCATTTTGTCAGTTACCAAATGTGTTTATTACCAGAGGGATGGAGGGAAGAGGGACGCTGAAGAACTTTGATGCCCTCTTCTTCCAAAGATGAAACGCGTAACTGCGCTCTCATTCACTCCAGCTCCCTGTCACCCAATGGACCTGTGATATCTGGATTCTGGGAAATTCTTCATCCTGGACCCTGAGAGATTCTGCAGCCCAGCTCCAGATTGCTTGTGGTCTGACAGGCTGCAACTGTGAGCCATCACAATGAACAACAGGAAGAAAAGGTCTTTCAAAAGGTGATGTGTGTTCTCATCAACCTCATACACACACATGGTTTAGGGGTATAATACCTCTACATGGCTGATTATGAAAACAATGTTCCCCAGATACCATCCCTGTCTTACTTCCAGCTCCCCAGAGGGAAAGCTTTCAACGCTTCTAGCCATTTCTTTTGGCATTTGCCTTCAGACCCTACACGAATGCGTCTCTACCACAGGGGGCTGCGCGGTTTCCCATCATGAAGCACTGAACTTCCACGTCTCATCTAGGGGAACAGGGAGGTGCACTAATGCGCTCCACGCCCAAGCCCTTCTCACAGTTTCTGCCCCCAGCATGGTTGTACTGGGCAATACATGAGATTATTAGGAAATGCTTTACTGTCATAACTATGAAGAGACTATTGCCAGATGAACCACACATTAATACTATGTTTCTTATCTGCACATTACTACCCTGCAATTAATATAATTGTGTCCATGTACACACGCTGTCCTATGTACTTATCATGACTCTATCCCAAATTCCCAATTACGTCCTATCTTCTTCTTAGGGAAGAACAGCTTAGGTATCAATTTGGTGTTCTGTGTAAAGTCTCAGGGAGCCGTCCGTGTCCTCCCATCTGGCCTCGTCCACACTGGTTCTCTTGAAAGCTTGGGCTGTAATGATGCCCCTTGGCCATCACCCAGTCCCTGCCCCATCTCTTGTAATCTCTCTCCTTTTTGCTGCATCCCTGTCTTCCTCTGTCTTGATTTACTTGTTGTTGGTTTTCTGTTTCTTTGTTTGATTTGGTGGAAGACATAATCCCACGCTTCCTATGGAAAGGTTGTTGGGAGATTTTTAATGATTCCTCAATGTTAAAATGTCTATTTTTGTCTTGACACCCAACTAATATTTGTCTGAGCAAAACAGTCTAGATGAGAGAGAACTTCCCTGGAGGTCTGATGGCGTTTCTCCCTCGTCTTCTTA",
572                    'chr2' :
573                        "TTCAAATGAACTTCTGTAATTGAAAAATTCATTTAAGAAATTACAAAATATAGTTGAAAGCTCTAACAATAGACTAAACCAAGCAGAAGAAAGAGGTTCAGAACTTGAAGACAAGTCTCTTATGAATTAACCCAGTCAGACAAAAATAAAGAAAAAAATTTTAAAAATGAACAGAGCTTTCAAGAAGTATGAGATTATGTAAAGTAACTGAACCTATGAGTCACAGGTATTCCTGAGGAAAAAGAAAAAGTGAGAAGTTTGGAAAAACTATTTGAGGAAGTAATTGGGGAAAACCTCTTTAGTCTTGCTAGAGATTTAGACATCTAAATGAAAGAGGCTCAAAGAATGCCAGGAAGATACATTGCAAGACAGACTTCATCAAGATATGTAGTCATCAGACTATCTAAAGTCAACATGAAGGAAAAAAATTCTAAAATCAGCAAGAGAAAAGCATACAGTCATCTATAAAGGAAATCCCATCAGAATAACAATGGGCTTCTCAGCAGAAACCTTACAAGCCAGAAGAGATTGGATCTAATTTTTGGACTTCTTAAAGAAAAAAAAACCTGTCAAACACGAATGTTATGCCCTGCTAAACTAAGCATCATAAATGAAGGGGAAATAAAGTCAAGTCTTTCCTGACAAGCAAATGCTAAGATAATTCATCATCACTAAACCAGTCCTATAAGAAATGCTCAAAAGAATTGTAAAAGTCAAAATTAAAGTTCAATACTCACCATCATAAATACACACAAAAGTACAAAACTCACAGGTTTTATAAAACAATTGAGACTACAGAGCAACTAGGTAAAAAATTAACATTACAACAGGAACAAAACCTCATATATCAATATTAACTTTGAATAAAAAGGGATTAAATTCCCCCACTTAAGAGATATAGATTGGCAGAACAGATTTAAAAACATGAACTAACTATATGCTGTTTACAAGAAACTCATTAATAAAGACATGAGTTCAGGTAAAGGGGTGGAAAAAGATGTTCTACGCAAACAGAAACCAAATGAGAGAAGGAGTAGCTATACTTATATCAGATAAAGCACACTTTAAATCAACAACAGTAAAATAAAACAAAGGAGGTCATCATACAATGATAAAAAGATCAATTCAGCAAGAAGATATAACCATCCTACTAAATACATATGCACCTAACACAAGACTACCCAGATTCATAAAACAAATACTACTAGACCTAAGAGGGATGAGAAATTACCTAATTGGTACAATGTACAATATTCTGATGATGGTTACACTAAAAGCCCATACTTTACTGCTACTCAATATATCCATGTAACAAATCTGCGCTTGTACTTCTAAATCTATAAAAAAATTAAAATTTAACAAAAGTAAATAAAACACATAGCTAAAACTAAAAAAGCAAAAACAAAAACTATGCTAAGTATTGGTAAAGATGTGGGGAAAAAAGTAAACTCTCAAATATTGCTAGTGGGAGTATAAATTGTTTTCCACTTTGGAAAACAATTTGGTAATTTCGTTTTTTTTTTTTTCTTTTCTCTTTTTTTTTTTTTTTTTTTTGCATGCCAGAAAAAAATATTTACAGTAACT",
574                    }
575
576     def setUp(self):
577         self.file=pysam.Fastafile( "ex1.fa" )
578
579     def testFetch(self):
580         for id, seq in self.mSequences.items():
581             self.assertEqual( seq, self.file.fetch( id ) )
582             for x in range( 0, len(seq), 10):
583                 self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
584
585     def testFetchErrors( self ):
586         self.assertRaises( ValueError, self.file.fetch )
587         self.assertRaises( ValueError, self.file.fetch, "chr1", 0 )
588         self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
589         self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
590         # the following segfaults:
591         # self.assertRaises( IndexError, self.file.fetch, "chr12", )
592         pass
593
594     def tearDown(self):
595         self.file.close()
596
597
598 class TestAlignedRead(unittest.TestCase):
599     '''tests to check if aligned read can be constructed
600     and manipulated.
601     '''
602
603     def checkFieldEqual( self, read1, read2, exclude = []):
604         '''check if two reads are equal by comparing each field.'''
605
606         for x in ("qname", "seq", "flag",
607                   "rname", "pos", "mapq", "cigar",
608                   "mrnm", "mpos", "isize", "qual",
609                   "is_paired", "is_proper_pair",
610                   "is_unmapped", "mate_is_unmapped",
611                   "is_reverse", "mate_is_reverse",
612                   "is_read1", "is_read2",
613                   "is_secondary", "is_qcfail",
614                   "is_duplicate", "bin"):
615             if x in exclude: continue
616             self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" % 
617                               (x, getattr(read1, x), getattr(read2,x)))
618     
619     def testEmpty( self ):
620         a = pysam.AlignedRead()
621         self.assertEqual( a.qname, None )
622         self.assertEqual( a.seq, None )
623         self.assertEqual( a.qual, None )
624         self.assertEqual( a.flag, 0 )
625         self.assertEqual( a.rname, 0 )
626         self.assertEqual( a.mapq, 0 )
627         self.assertEqual( a.cigar, None )
628         self.assertEqual( a.tags, None )
629         self.assertEqual( a.mrnm, 0 )
630         self.assertEqual( a.mpos, 0 )
631         self.assertEqual( a.isize, 0 )
632
633     def buildRead( self ):
634         '''build an example read.'''
635         
636         a = pysam.AlignedRead()
637         a.qname = "read_12345"
638         a.seq="ACGT" * 3
639         a.flag = 0
640         a.rname = 0
641         a.pos = 33
642         a.mapq = 20
643         a.cigar = ( (0,10), (2,1), (0,25) )
644         a.mrnm = 0
645         a.mpos=200
646         a.isize=167
647         a.qual="1234" * 3
648
649         return a
650
651     def testUpdate( self ):
652         '''check if updating fields affects other variable length data
653         '''
654         a = self.buildRead()
655         b = self.buildRead()
656
657         # check qname
658         b.qname = "read_123"
659         self.checkFieldEqual( a, b, "qname" )
660         b.qname = "read_12345678"
661         self.checkFieldEqual( a, b, "qname" )
662         b.qname = "read_12345"
663         self.checkFieldEqual( a, b)
664
665         # check cigar
666         b.cigar = ( (0,10), )
667         self.checkFieldEqual( a, b, "cigar" )
668         b.cigar = ( (0,10), (2,1), (0,25), (2,1), (0,25) )
669         self.checkFieldEqual( a, b, "cigar" )
670         b.cigar = ( (0,10), (2,1), (0,25) )
671         self.checkFieldEqual( a, b)
672
673         # check seq 
674         b.seq = "ACGT"
675         self.checkFieldEqual( a, b, ("seq", "qual") )
676         b.seq = "ACGT" * 10
677         self.checkFieldEqual( a, b, ("seq", "qual") )
678         b.seq = "ACGT" * 3
679         self.checkFieldEqual( a, b, ("qual",))
680
681         # reset qual
682         b = self.buildRead()
683
684         # check flags:
685         for x in (
686             "is_paired", "is_proper_pair",
687             "is_unmapped", "mate_is_unmapped",
688             "is_reverse", "mate_is_reverse",
689             "is_read1", "is_read2",
690             "is_secondary", "is_qcfail",
691             "is_duplicate"):
692             setattr( b, x, True )
693             self.assertEqual( getattr(b, x), True )
694             self.checkFieldEqual( a, b, ("flag", x,) )
695             setattr( b, x, False )
696             self.assertEqual( getattr(b, x), False )
697             self.checkFieldEqual( a, b )
698
699     def testLargeRead( self ):
700         '''build an example read.'''
701         
702         a = pysam.AlignedRead()
703         a.qname = "read_12345"
704         a.seq="ACGT" * 200
705         a.flag = 0
706         a.rname = 0
707         a.pos = 33
708         a.mapq = 20
709         a.cigar = ( (0,10), (2,1), (0,25) )
710         a.mrnm = 0
711         a.mpos=200
712         a.isize=167
713         a.qual="1234" * 200
714
715         return a
716
717 class TestDeNovoConstruction(unittest.TestCase):
718     '''check BAM/SAM file construction using ex3.sam
719     
720     (note these are +1 coordinates):
721     
722     read_28833_29006_6945       99      chr1    33      20      10M1D25M        =       200     167     AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG     <<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<     NM:i:1  RG:Z:L1
723     read_28701_28881_323b       147     chr2    88      30      35M     =       500     412     ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA     <<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<     MF:i:18 RG:Z:L2
724     '''
725
726     header = { 'HD': {'VN': '1.0'},
727                'SQ': [{'LN': 1575, 'SN': 'chr1'}, 
728                       {'LN': 1584, 'SN': 'chr2'}], }
729
730     bamfile = "ex6.bam"
731     samfile = "ex6.sam"
732
733     def checkFieldEqual( self, read1, read2, exclude = []):
734         '''check if two reads are equal by comparing each field.'''
735
736         for x in ("qname", "seq", "flag",
737                   "rname", "pos", "mapq", "cigar",
738                   "mrnm", "mpos", "isize", "qual",
739                   "bin",
740                   "is_paired", "is_proper_pair",
741                   "is_unmapped", "mate_is_unmapped",
742                   "is_reverse", "mate_is_reverse",
743                   "is_read1", "is_read2",
744                   "is_secondary", "is_qcfail",
745                   "is_duplicate"):
746             if x in exclude: continue
747             self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" % 
748                               (x, getattr(read1, x), getattr(read2,x)))
749
750     def setUp( self ):
751
752         
753         a = pysam.AlignedRead()
754         a.qname = "read_28833_29006_6945"
755         a.seq="AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG"
756         a.flag = 99
757         a.rname = 0
758         a.pos = 32
759         a.mapq = 20
760         a.cigar = ( (0,10), (2,1), (0,25) )
761         a.mrnm = 0
762         a.mpos=199
763         a.isize=167
764         a.qual="<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<"
765         a.tags = ( ("NM", 1),
766                    ("RG", "L1") )
767
768         b = pysam.AlignedRead()
769         b.qname = "read_28701_28881_323b"
770         b.seq="ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA"
771         b.flag = 147
772         b.rname = 1
773         b.pos = 87
774         b.mapq = 30
775         b.cigar = ( (0,35), )
776         b.mrnm = 1
777         b.mpos=499
778         b.isize=412
779         b.qual="<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<"
780         b.tags = ( ("MF", 18),
781                    ("RG", "L2") )
782
783         self.reads = (a,b)
784
785     def testSAMWholeFile( self ):
786         
787         tmpfilename = "tmp_%i.sam" % id(self)
788
789         outfile = pysam.Samfile( tmpfilename, "wh", header = self.header )
790
791         for x in self.reads: outfile.write( x )
792         outfile.close()
793         
794         self.assertTrue( checkBinaryEqual( tmpfilename, self.samfile ),
795                          "mismatch when construction SAM file, see %s %s" % (tmpfilename, self.samfile))
796         
797         os.unlink( tmpfilename )
798
799     def testBAMPerRead( self ):
800         '''check if individual reads are binary equal.'''
801         infile = pysam.Samfile( self.bamfile, "rb")
802
803         others = list(infile)
804         for denovo, other in zip( others, self.reads):
805             self.checkFieldEqual( other, denovo )
806             self.assertEqual( other, denovo)
807
808     def testSAMPerRead( self ):
809         '''check if individual reads are binary equal.'''
810         infile = pysam.Samfile( self.samfile, "r")
811
812         others = list(infile)
813         for denovo, other in zip( others, self.reads):
814             self.checkFieldEqual( other, denovo )
815             self.assertEqual( other, denovo)
816             
817     def testBAMWholeFile( self ):
818         
819         tmpfilename = "tmp_%i.bam" % id(self)
820
821         outfile = pysam.Samfile( tmpfilename, "wb", header = self.header )
822
823         for x in self.reads: outfile.write( x )
824         outfile.close()
825         
826         self.assertTrue( checkBinaryEqual( tmpfilename, self.bamfile ),
827                          "mismatch when construction BAM file, see %s %s" % (tmpfilename, self.bamfile))
828         
829         os.unlink( tmpfilename )
830
831
832 # TODOS
833 # 1. finish testing all properties within pileup objects
834 # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)
835
836 if __name__ == "__main__":
837     # build data files
838     print "building data files"
839     subprocess.call( "make", shell=True)
840     print "starting tests"
841     unittest.main()