Imported Upstream version 0.7
[pysam.git] / tests / pysam_test.py
1 #!/usr/bin/env python
2 '''unit testing code for pysam.
3
4 Execute in the :file:`tests` directory as it requires the Makefile
5 and data files located there.
6 '''
7
8 import pysam
9 import unittest
10 import os, re, sys
11 import itertools
12 import collections
13 import subprocess
14 import shutil
15 import logging
16
17 IS_PYTHON3 = sys.version_info[0] >= 3
18
19 if IS_PYTHON3:
20     from itertools import zip_longest
21 else:
22     from itertools import izip as zip_longest
23
24
25 SAMTOOLS="samtools"
26 WORKDIR="pysam_test_work"
27
28 def checkBinaryEqual( filename1, filename2 ):
29     '''return true if the two files are binary equal.'''
30     if os.path.getsize( filename1 ) !=  os.path.getsize( filename2 ):
31         return False
32
33     infile1 = open(filename1, "rb")
34     infile2 = open(filename2, "rb")
35
36     def chariter( infile ):
37         while 1:
38             c = infile.read(1)
39             if c == b"": break
40             yield c
41
42     found = False
43     for c1,c2 in zip_longest( chariter( infile1), chariter( infile2) ):
44         if c1 != c2: break
45     else:
46         found = True
47
48     infile1.close()
49     infile2.close()
50     return found
51
52 def runSamtools( cmd ):
53     '''run a samtools command'''
54
55     try:
56         retcode = subprocess.call(cmd, shell=True)
57         if retcode < 0:
58             print("Child was terminated by signal", -retcode)
59     except OSError as e:
60         print("Execution failed:", e)
61
62 def getSamtoolsVersion():
63     '''return samtools version'''
64
65     with subprocess.Popen(SAMTOOLS, shell=True, stderr=subprocess.PIPE).stderr as pipe:
66         lines = b"".join(pipe.readlines())
67
68     if IS_PYTHON3:
69         lines = lines.decode('ascii')
70     return re.search( "Version:\s+(\S+)", lines).groups()[0]
71
72 class BinaryTest(unittest.TestCase):
73     '''test samtools command line commands and compare
74     against pysam commands.
75
76     Tests fail, if the output is not binary identical.
77     '''
78
79     first_time = True
80
81     # a list of commands to test
82     commands = \
83         { 
84           "view" :
85               (
86                 ("ex1.view", "view ex1.bam > ex1.view"),
87                 ("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
88                 ),
89           "view2" :
90               (
91                 ("ex1.view", "view -bT ex1.fa -o ex1.view2 ex1.sam"),
92                 # note that -o ex1.view2 throws exception.
93                 ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ),
94                 ),
95           "sort" :
96               (
97                 ( "ex1.sort.bam", "sort ex1.bam ex1.sort" ),
98                 ( "pysam_ex1.sort.bam", (pysam.sort, "ex1.bam pysam_ex1.sort" ) ),
99                 ),
100           "mpileup" :
101               (
102                 ("ex1.pileup", "mpileup ex1.bam > ex1.pileup" ),
103                 ("pysam_ex1.mpileup", (pysam.mpileup, "ex1.bam" ) ),
104                 ),
105           "depth" :
106               (
107                 ("ex1.depth", "depth ex1.bam > ex1.depth" ),
108                 ("pysam_ex1.depth", (pysam.depth, "ex1.bam" ) ),
109                 ),
110           "faidx" : 
111               ( 
112                 ("ex1.fa.fai", "faidx ex1.fa"), 
113                 ("pysam_ex1.fa.fai", (pysam.faidx, "ex1.fa") ),
114                 ),
115           "index":
116               (
117                 ("ex1.bam.bai", "index ex1.bam" ),
118                 ("pysam_ex1.bam.bai", (pysam.index, "pysam_ex1.bam" ) ),
119                 ),
120           "idxstats" :
121               ( 
122                 ("ex1.idxstats", "idxstats ex1.bam > ex1.idxstats" ),
123                 ("pysam_ex1.idxstats", (pysam.idxstats, "pysam_ex1.bam" ) ),
124                 ),
125           "fixmate" :
126               (
127                 ("ex1.fixmate", "fixmate ex1.bam ex1.fixmate" ),
128                 ("pysam_ex1.fixmate", (pysam.fixmate, "pysam_ex1.bam pysam_ex1.fixmate") ),
129                 ),
130           "flagstat" :
131               (
132                 ("ex1.flagstat", "flagstat ex1.bam > ex1.flagstat" ),
133                 ("pysam_ex1.flagstat", (pysam.flagstat, "pysam_ex1.bam") ),
134                 ),
135           "calmd" :
136               (
137                 ("ex1.calmd", "calmd ex1.bam ex1.fa > ex1.calmd" ),
138                 ("pysam_ex1.calmd", (pysam.calmd, "pysam_ex1.bam ex1.fa") ),
139                 ),
140           "merge" :
141               (
142                 ("ex1.merge", "merge -f ex1.merge ex1.bam ex1.bam" ),
143                 # -f option does not work - following command will cause the subsequent
144                 # command to fail
145                 ("pysam_ex1.merge", (pysam.merge, "pysam_ex1.merge pysam_ex1.bam pysam_ex1.bam") ),
146                 ),
147           "rmdup" :
148               (
149                 ("ex1.rmdup", "rmdup ex1.bam ex1.rmdup" ),
150                 ("pysam_ex1.rmdup", (pysam.rmdup, "pysam_ex1.bam pysam_ex1.rmdup" )),
151                 ),
152           "reheader" :
153               (
154                 ( "ex1.reheader", "reheader ex1.bam ex1.bam > ex1.reheader"),
155                 ( "pysam_ex1.reheader", (pysam.reheader, "ex1.bam ex1.bam" ) ),
156                 ),
157           "cat":
158               (
159                 ( "ex1.cat", "cat ex1.bam ex1.bam > ex1.cat"),
160                 ( "pysam_ex1.cat", (pysam.cat, "ex1.bam ex1.bam" ) ),
161                 ),
162           "targetcut":
163               (
164                 ("ex1.targetcut", "targetcut ex1.bam > ex1.targetcut" ),
165                 ("pysam_ex1.targetcut", (pysam.targetcut, "pysam_ex1.bam") ),
166                 ),
167           "phase":
168               (
169                 ("ex1.phase", "phase ex1.bam > ex1.phase" ),
170                 ("pysam_ex1.phase", (pysam.phase, "pysam_ex1.bam") ),
171                 ),
172           "import" :
173               (
174                 ("ex1.bam", "import ex1.fa.fai ex1.sam.gz ex1.bam" ),
175                 ("pysam_ex1.bam", (pysam.samimport, "ex1.fa.fai ex1.sam.gz pysam_ex1.bam") ),
176                 ),
177           "bam2fq":
178               (
179                 ("ex1.bam2fq", "bam2fq ex1.bam > ex1.bam2fq" ),
180                 ("pysam_ex1.bam2fq", (pysam.bam2fq, "pysam_ex1.bam") ),
181                 ),
182         }
183
184     # some tests depend on others. The order specifies in which order
185     # the samtools commands are executed.
186     # The first three (faidx, import, index) need to be in that order, 
187     # the rest is arbitrary.
188     order = ('faidx', 'import', 'index', 
189               # 'pileup1', 'pileup2', deprecated
190               # 'glfview', deprecated
191               'view', 'view2',
192               'sort',
193               'mpileup',
194               'depth',
195               'idxstats',
196               'fixmate',
197               'flagstat',
198               # 'calmd',
199               'merge',
200               'rmdup',
201               'reheader',
202               'cat',
203               'targetcut',
204               'phase',
205               'bam2fq',
206               )
207
208     def setUp( self ):
209         '''setup tests. 
210
211         For setup, all commands will be run before the first test is
212         executed. Individual tests will then just compare the output
213         files.
214         '''
215         if BinaryTest.first_time:
216
217             # remove previous files
218             if os.path.exists( WORKDIR ):
219                 shutil.rmtree( WORKDIR )
220                 pass
221
222             # copy the source files to WORKDIR
223             os.makedirs( WORKDIR )
224
225             shutil.copy( "ex1.fa", os.path.join( WORKDIR, "pysam_ex1.fa" ) )
226             shutil.copy( "ex1.fa", os.path.join( WORKDIR, "ex1.fa" ) )
227             shutil.copy( "ex1.sam.gz", os.path.join( WORKDIR, "ex1.sam.gz" ) )
228             shutil.copy( "ex1.sam", os.path.join( WORKDIR, "ex1.sam" ) )
229
230             # cd to workdir
231             savedir = os.getcwd()
232             os.chdir( WORKDIR )
233             
234             for label in self.order:
235                 command = self.commands[label]
236                 # build samtools command and target and run
237                 samtools_target, samtools_command = command[0]
238                 runSamtools( " ".join( (SAMTOOLS, samtools_command )))
239
240                 # get pysam command and run
241                 try:
242                     pysam_target, pysam_command = command[1]
243                 except ValueError as msg:
244                     raise ValueError( "error while setting up %s=%s: %s" %\
245                                           (label, command, msg) )
246
247                 pysam_method, pysam_options = pysam_command
248                 try:
249                     output = pysam_method( *pysam_options.split(" "), raw=True)
250                 except pysam.SamtoolsError as msg:
251                     raise pysam.SamtoolsError( "error while executing %s: options=%s: msg=%s" %\
252                                                    (label, pysam_options, msg) )
253
254                 
255
256                 if ">" in samtools_command:
257                     with open( pysam_target, "wb" ) as outfile:
258                         if type(output) == list:
259                             if IS_PYTHON3:
260                                 for line in output: 
261                                     outfile.write( line.encode('ascii') )
262                             else:
263                                 for line in output: outfile.write( line )
264                         else:
265                             outfile.write(output)
266
267             os.chdir( savedir )
268             BinaryTest.first_time = False
269
270         samtools_version = getSamtoolsVersion()
271
272         
273         def _r( s ):
274             # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> 0.1.12
275             if s.count('-') > 0: s = s[0:s.find('-')]
276             return re.sub( "[^0-9.]", "", s )
277
278         if _r(samtools_version) != _r( pysam.__samtools_version__):
279             raise ValueError("versions of pysam/samtools and samtools differ: %s != %s" % \
280                                  (pysam.__samtools_version__,
281                                   samtools_version ))
282
283     def checkCommand( self, command ):
284         if command:
285             samtools_target, pysam_target = self.commands[command][0][0], self.commands[command][1][0]
286             samtools_target = os.path.join( WORKDIR, samtools_target )
287             pysam_target = os.path.join( WORKDIR, pysam_target )
288             self.assertTrue( checkBinaryEqual( samtools_target, pysam_target ), 
289                              "%s failed: files %s and %s are not the same" % (command, samtools_target, pysam_target) )
290             
291     def testImport( self ):
292         self.checkCommand( "import" )
293
294     def testIndex( self ):
295         self.checkCommand( "index" )
296
297     def testSort( self ):
298         self.checkCommand( "sort" )
299
300     def testMpileup( self ):
301         self.checkCommand( "mpileup" )
302
303     def testDepth( self ):
304         self.checkCommand( "depth" )
305
306     def testIdxstats( self ):
307         self.checkCommand( "idxstats" )
308
309     def testFixmate( self ):
310         self.checkCommand( "fixmate" )
311
312     def testFlagstat( self ):
313         self.checkCommand( "flagstat" )
314         
315     def testMerge( self ):
316         self.checkCommand( "merge" )
317
318     def testRmdup( self ):
319         self.checkCommand( "rmdup" )
320
321     def testReheader( self ):
322         self.checkCommand( "reheader" )
323
324     def testCat( self ):
325         self.checkCommand( "cat" )
326
327     def testTargetcut( self ):
328         self.checkCommand( "targetcut" )
329
330     def testPhase( self ):
331         self.checkCommand( "phase" )
332
333     def testBam2fq( self ):
334         self.checkCommand( "bam2fq" )
335
336     # def testPileup1( self ):
337     #     self.checkCommand( "pileup1" )
338     
339     # def testPileup2( self ):
340     #     self.checkCommand( "pileup2" )
341
342     # deprecated
343     # def testGLFView( self ):
344     #     self.checkCommand( "glfview" )
345
346     def testView( self ):
347         self.checkCommand( "view" )
348
349     def testEmptyIndex( self ):
350         self.assertRaises( IOError, pysam.index, "exdoesntexist.bam" )
351
352     def __del__(self):
353         if os.path.exists( WORKDIR ):
354             pass
355         # shutil.rmtree( WORKDIR )
356
357 class IOTest(unittest.TestCase):
358     '''check if reading samfile and writing a samfile are consistent.'''
359
360     def checkEcho( self, input_filename, 
361                    reference_filename, 
362                    output_filename, 
363                    input_mode, output_mode, use_template = True ):
364         '''iterate through *input_filename* writing to *output_filename* and
365         comparing the output to *reference_filename*. 
366         
367         The files are opened according to the *input_mode* and *output_mode*.
368
369         If *use_template* is set, the header is copied from infile using the
370         template mechanism, otherwise target names and lengths are passed 
371         explicitely. 
372
373         '''
374
375         infile = pysam.Samfile( input_filename, input_mode )
376         if use_template:
377             outfile = pysam.Samfile( output_filename, output_mode, template = infile )
378         else:
379             outfile = pysam.Samfile( output_filename, output_mode, 
380                                      referencenames = infile.references,
381                                      referencelengths = infile.lengths,
382                                      add_sq_text = False )
383             
384         iter = infile.fetch()
385
386         for x in iter: outfile.write( x )
387         infile.close()
388         outfile.close()
389
390         self.assertTrue( checkBinaryEqual( reference_filename, output_filename), 
391                          "files %s and %s are not the same" % (reference_filename, output_filename) )
392
393
394     def testReadWriteBam( self ):
395         
396         input_filename = "ex1.bam"
397         output_filename = "pysam_ex1.bam"
398         reference_filename = "ex1.bam"
399
400         self.checkEcho( input_filename, reference_filename, output_filename,
401                         "rb", "wb" )
402
403     def testReadWriteBamWithTargetNames( self ):
404         
405         input_filename = "ex1.bam"
406         output_filename = "pysam_ex1.bam"
407         reference_filename = "ex1.bam"
408
409         self.checkEcho( input_filename, reference_filename, output_filename,
410                         "rb", "wb", use_template = False )
411
412     def testReadWriteSamWithHeader( self ):
413         
414         input_filename = "ex2.sam"
415         output_filename = "pysam_ex2.sam"
416         reference_filename = "ex2.sam"
417
418         self.checkEcho( input_filename, reference_filename, output_filename,
419                         "r", "wh" )
420
421     def testReadWriteSamWithoutHeader( self ):
422         
423         input_filename = "ex2.sam"
424         output_filename = "pysam_ex2.sam"
425         reference_filename = "ex1.sam"
426
427         self.checkEcho( input_filename, reference_filename, output_filename,
428                         "r", "w" )
429
430     def testReadSamWithoutTargetNames( self ):
431         '''see issue 104.'''
432         input_filename = "example_unmapped_reads_no_sq.sam"
433
434         # raise exception in default mode
435         self.assertRaises( ValueError, pysam.Samfile, input_filename, "r" )
436
437         # raise exception if no SQ files
438         self.assertRaises( ValueError, pysam.Samfile, input_filename, "r",
439                            check_header = True)
440
441         infile = pysam.Samfile( input_filename, check_header = False, check_sq = False )
442         result = list(infile.fetch())
443
444     def testReadBamWithoutTargetNames( self ):
445         '''see issue 104.'''
446         input_filename = "example_unmapped_reads_no_sq.bam"
447
448         # raise exception in default mode
449         self.assertRaises( ValueError, pysam.Samfile, input_filename, "r" )
450
451         # raise exception if no SQ files
452         self.assertRaises( ValueError, pysam.Samfile, input_filename, "r",
453                            check_header = True)
454
455
456         infile = pysam.Samfile( input_filename, check_header = False, check_sq = False )
457         result = list(infile.fetch( until_eof = True))
458
459     def testReadSamWithoutHeader( self ):
460         input_filename = "ex1.sam"
461         output_filename = "pysam_ex1.sam"
462         reference_filename = "ex1.sam"
463
464         # reading from a samfile without header is not implemented.
465         self.assertRaises( ValueError, pysam.Samfile, input_filename, "r" )
466
467         self.assertRaises( ValueError, pysam.Samfile, input_filename, "r",
468                            check_header = False )
469
470     def testReadUnformattedFile( self ):
471         '''test reading from a file that is not bam/sam formatted'''
472         input_filename = "example.vcf40"
473
474         # bam - file raise error
475         self.assertRaises( ValueError, pysam.Samfile, input_filename, "rb" )
476
477         # sam - file error, but can't fetch
478         self.assertRaises( ValueError, pysam.Samfile, input_filename, "r" )
479         
480         self.assertRaises( ValueError, pysam.Samfile, input_filename, "r", 
481                            check_header = False)
482
483     def testFetchFromClosedFile( self ):
484
485         samfile = pysam.Samfile( "ex1.bam", "rb" )
486         samfile.close()
487         self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
488
489     def testClosedFile( self ):
490         '''test that access to a closed samfile raises ValueError.'''
491
492         samfile = pysam.Samfile( "ex1.bam", "rb" )
493         samfile.close()
494         self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
495         self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
496         self.assertRaises( ValueError, samfile.getrname, 0 )
497         self.assertRaises( ValueError, samfile.tell )
498         self.assertRaises( ValueError, samfile.seek, 0 )
499         self.assertRaises( ValueError, getattr, samfile, "nreferences" )
500         self.assertRaises( ValueError, getattr, samfile, "references" )
501         self.assertRaises( ValueError, getattr, samfile, "lengths" )
502         self.assertRaises( ValueError, getattr, samfile, "text" )
503         self.assertRaises( ValueError, getattr, samfile, "header" )
504
505         # write on closed file 
506         self.assertEqual( 0, samfile.write(None) )
507
508     def testAutoDetection( self ):
509         '''test if autodetection works.'''
510
511         samfile = pysam.Samfile( "ex3.sam" )
512         self.assertRaises( ValueError, samfile.fetch, 'chr1' )
513         samfile.close()
514
515         samfile = pysam.Samfile( "ex3.bam" )
516         samfile.fetch('chr1')
517         samfile.close()
518
519     def testReadingFromSamFileWithoutHeader( self ):
520         '''read from samfile without header.
521         '''
522         samfile = pysam.Samfile( "ex7.sam", check_header = False, check_sq = False )
523         self.assertRaises( NotImplementedError, samfile.__iter__ )
524
525     def testReadingFromFileWithoutIndex( self ):
526         '''read from bam file without index.'''
527
528         assert not os.path.exists( "ex2.bam.bai" )
529         samfile = pysam.Samfile( "ex2.bam", "rb" )
530         self.assertRaises( ValueError, samfile.fetch )
531         self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 )
532
533     def testReadingUniversalFileMode( self ):
534         '''read from samfile without header.
535         '''
536
537         input_filename = "ex2.sam"
538         output_filename = "pysam_ex2.sam"
539         reference_filename = "ex1.sam"
540
541         self.checkEcho( input_filename, reference_filename, output_filename,
542                         "rU", "w" )
543
544 class TestFloatTagBug( unittest.TestCase ):
545     '''see issue 71'''
546
547     def testFloatTagBug( self ): 
548         '''a float tag before another exposed a parsing bug in bam_aux_get - expected to fail
549
550         This test is expected to fail until samtools is fixed.
551         '''
552         samfile = pysam.Samfile("tag_bug.bam")
553         read = next(samfile.fetch(until_eof=True))
554         self.assertTrue( ('XC',1) in read.tags )
555         self.assertEqual(read.opt('XC'), 1)
556
557 class TestLargeFieldBug( unittest.TestCase ):
558     '''see issue 100'''
559
560     def testLargeFileBug( self ): 
561         '''when creating a read with a large entry in the tag field
562         causes an errror:
563             NotImplementedError: tags field too large
564         '''
565         samfile = pysam.Samfile("issue100.bam")
566         read = next(samfile.fetch(until_eof=True))
567         new_read = pysam.AlignedRead()
568         new_read.tags = read.tags
569         self.assertEqual( new_read.tags, read.tags )
570
571 class TestTagParsing( unittest.TestCase ):
572     '''tests checking the accuracy of tag setting and retrieval.'''
573
574     def makeRead( self ):
575         a = pysam.AlignedRead()
576         a.qname = "read_12345"
577         a.tid = 0
578         a.seq="ACGT" * 3
579         a.flag = 0
580         a.rname = 0
581         a.pos = 1
582         a.mapq = 20
583         a.cigar = ( (0,10), (2,1), (0,25) )
584         a.mrnm = 0
585         a.mpos=200
586         a.isize = 0
587         a.qual ="1234" * 3
588         # todo: create tags
589         return a
590
591     def testNegativeIntegers( self ):
592         x = -2
593         aligned_read = self.makeRead()
594         aligned_read.tags = [("XD", int(x) ) ]
595         # print (aligned_read.tags)
596
597     def testNegativeIntegers2( self ):
598         x = -2
599         r = self.makeRead()
600         r.tags = [("XD", int(x) ) ]
601         outfile = pysam.Samfile( "test.bam",
602                                  "wb",
603                                  referencenames = ("chr1",),
604                                  referencelengths = (1000,) )
605         outfile.write (r )
606         outfile.close()
607
608     def testCigarString( self ):
609         r = self.makeRead()
610         self.assertEqual( r.cigarstring, "M10D1M25" )
611         r.cigarstring = "M20D10M20"
612         self.assertEqual( r.cigar, [(0,20), (2,10), (0,20)])
613
614 class TestIteratorRow(unittest.TestCase):
615
616     def setUp(self):
617         self.samfile=pysam.Samfile( "ex1.bam","rb" )
618
619     def checkRange( self, rnge ):
620         '''compare results from iterator with those from samtools.'''
621         ps = list(self.samfile.fetch(region=rnge))
622         sa = list(pysam.view( "ex1.bam", rnge, raw = True) )
623         self.assertEqual( len(ps), len(sa), "unequal number of results for range %s: %i != %i" % (rnge, len(ps), len(sa) ))
624         # check if the same reads are returned and in the same order
625         for line, (a, b) in enumerate( list(zip( ps, sa )) ):
626             d = b.split("\t")
627             self.assertEqual( a.qname, d[0], "line %i: read id mismatch: %s != %s" % (line, a.rname, d[0]) )
628             self.assertEqual( a.pos, int(d[3])-1, "line %i: read position mismatch: %s != %s, \n%s\n%s\n" % \
629                                   (line, a.pos, int(d[3])-1,
630                                    str(a), str(d) ) )
631             if sys.version_info[0] < 3:
632                 qual = d[10]
633             else:
634                 qual = d[10].encode('ascii')
635             self.assertEqual( a.qual, qual, "line %i: quality mismatch: %s != %s, \n%s\n%s\n" % \
636                                   (line, a.qual, qual,
637                                    str(a), str(d) ) )
638
639     def testIteratePerContig(self):
640         '''check random access per contig'''
641         for contig in self.samfile.references:
642             self.checkRange( contig )
643
644     def testIterateRanges(self):
645         '''check random access per range'''
646         for contig, length in zip(self.samfile.references, self.samfile.lengths):
647             for start in range( 1, length, 90):
648                 self.checkRange( "%s:%i-%i" % (contig, start, start + 90) ) # this includes empty ranges
649
650     def tearDown(self):
651         self.samfile.close()
652
653
654 class TestIteratorRowAll(unittest.TestCase):
655
656     def setUp(self):
657         self.samfile=pysam.Samfile( "ex1.bam","rb" )
658
659     def testIterate(self):
660         '''compare results from iterator with those from samtools.'''
661         ps = list(self.samfile.fetch())
662         sa = list(pysam.view( "ex1.bam", raw = True) )
663         self.assertEqual( len(ps), len(sa), "unequal number of results: %i != %i" % (len(ps), len(sa) ))
664         # check if the same reads are returned
665         for line, pair in enumerate( list(zip( ps, sa )) ):
666             data = pair[1].split("\t")
667             self.assertEqual( pair[0].qname, data[0], "read id mismatch in line %i: %s != %s" % (line, pair[0].rname, data[0]) )
668
669     def tearDown(self):
670         self.samfile.close()
671
672 class TestIteratorColumn(unittest.TestCase):
673     '''test iterator column against contents of ex3.bam.'''
674     
675     # note that samfile contains 1-based coordinates
676     # 1D means deletion with respect to reference sequence
677     # 
678     mCoverages = { 'chr1' : [ 0 ] * 20 + [1] * 36 + [0] * (100 - 20 -35 ),
679                    'chr2' : [ 0 ] * 20 + [1] * 35 + [0] * (100 - 20 -35 ),
680                    }
681
682     def setUp(self):
683         self.samfile=pysam.Samfile( "ex4.bam","rb" )
684
685     def checkRange( self, contig, start = None, end = None, truncate = False ):
686         '''compare results from iterator with those from samtools.'''
687         # check if the same reads are returned and in the same order
688         for column in self.samfile.pileup(contig, start, end, truncate = truncate):
689             if truncate:
690                 self.assertGreaterEqual( column.pos, start )
691                 self.assertLess( column.pos, end )
692             thiscov = len(column.pileups)
693             refcov = self.mCoverages[self.samfile.getrname(column.tid)][column.pos]
694             self.assertEqual( thiscov, refcov, "wrong coverage at pos %s:%i %i should be %i" % (self.samfile.getrname(column.tid), column.pos, thiscov, refcov))
695
696     def testIterateAll(self):
697         '''check random access per contig'''
698         self.checkRange( None )
699
700     def testIteratePerContig(self):
701         '''check random access per contig'''
702         for contig in self.samfile.references:
703             self.checkRange( contig )
704
705     def testIterateRanges(self):
706         '''check random access per range'''
707         for contig, length in zip(self.samfile.references, self.samfile.lengths):
708             for start in range( 1, length, 90):
709                 self.checkRange( contig, start, start + 90 ) # this includes empty ranges
710
711     def testInverse( self ):
712         '''test the inverse, is point-wise pileup accurate.'''
713         for contig, refseq in list(self.mCoverages.items()):
714             refcolumns = sum(refseq)
715             for pos, refcov in enumerate( refseq ):
716                 columns = list(self.samfile.pileup( contig, pos, pos+1) )
717                 if refcov == 0:
718                     # if no read, no coverage
719                     self.assertEqual( len(columns), refcov, "wrong number of pileup columns returned for position %s:%i, %i should be %i" %(contig,pos,len(columns), refcov) )
720                 elif refcov == 1:
721                     # one read, all columns of the read are returned
722                     self.assertEqual( len(columns), refcolumns, "pileup incomplete at position %i: got %i, expected %i " %\
723                                           (pos, len(columns), refcolumns))
724
725     def testIterateTruncate( self ):
726         '''check random access per range'''
727         for contig, length in zip(self.samfile.references, self.samfile.lengths):
728             for start in range( 1, length, 90):
729                 self.checkRange( contig, start, start + 90, truncate = True ) # this includes empty ranges
730                 
731         
732         
733     def tearDown(self):
734         self.samfile.close()
735
736     
737 class TestAlignedReadFromBam(unittest.TestCase):
738
739     def setUp(self):
740         self.samfile=pysam.Samfile( "ex3.bam","rb" )
741         self.reads=list(self.samfile.fetch())
742
743     def testARqname(self):
744         self.assertEqual( self.reads[0].qname, "read_28833_29006_6945", "read name mismatch in read 1: %s != %s" % (self.reads[0].qname, "read_28833_29006_6945") )
745         self.assertEqual( self.reads[1].qname, "read_28701_28881_323b", "read name mismatch in read 2: %s != %s" % (self.reads[1].qname, "read_28701_28881_323b") )
746
747     def testARflag(self):
748         self.assertEqual( self.reads[0].flag, 99, "flag mismatch in read 1: %s != %s" % (self.reads[0].flag, 99) )
749         self.assertEqual( self.reads[1].flag, 147, "flag mismatch in read 2: %s != %s" % (self.reads[1].flag, 147) )
750
751     def testARrname(self):
752         self.assertEqual( self.reads[0].rname, 0, "chromosome/target id mismatch in read 1: %s != %s" % (self.reads[0].rname, 0) )
753         self.assertEqual( self.reads[1].rname, 1, "chromosome/target id mismatch in read 2: %s != %s" % (self.reads[1].rname, 1) )
754
755     def testARpos(self):
756         self.assertEqual( self.reads[0].pos, 33-1, "mapping position mismatch in read 1: %s != %s" % (self.reads[0].pos, 33-1) )
757         self.assertEqual( self.reads[1].pos, 88-1, "mapping position mismatch in read 2: %s != %s" % (self.reads[1].pos, 88-1) )
758
759     def testARmapq(self):
760         self.assertEqual( self.reads[0].mapq, 20, "mapping quality mismatch in read 1: %s != %s" % (self.reads[0].mapq, 20) )
761         self.assertEqual( self.reads[1].mapq, 30, "mapping quality mismatch in read 2: %s != %s" % (self.reads[1].mapq, 30) )
762
763     def testARcigar(self):
764         self.assertEqual( self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)], "read name length mismatch in read 1: %s != %s" % (self.reads[0].cigar, [(0, 10), (2, 1), (0, 25)]) )
765         self.assertEqual( self.reads[1].cigar, [(0, 35)], "read name length mismatch in read 2: %s != %s" % (self.reads[1].cigar, [(0, 35)]) )
766
767     def testARmrnm(self):
768         self.assertEqual( self.reads[0].mrnm, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].mrnm, 0) )
769         self.assertEqual( self.reads[1].mrnm, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].mrnm, 1) )
770         self.assertEqual( self.reads[0].rnext, 0, "mate reference sequence name mismatch in read 1: %s != %s" % (self.reads[0].rnext, 0) )
771         self.assertEqual( self.reads[1].rnext, 1, "mate reference sequence name mismatch in read 2: %s != %s" % (self.reads[1].rnext, 1) )
772
773     def testARmpos(self):
774         self.assertEqual( self.reads[0].mpos, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].mpos, 200-1) )
775         self.assertEqual( self.reads[1].mpos, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].mpos, 500-1) )
776         self.assertEqual( self.reads[0].pnext, 200-1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].pnext, 200-1) )
777         self.assertEqual( self.reads[1].pnext, 500-1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].pnext, 500-1) )
778
779     def testARisize(self):
780         self.assertEqual( self.reads[0].isize, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].isize, 167) )
781         self.assertEqual( self.reads[1].isize, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].isize, 412) )
782         self.assertEqual( self.reads[0].tlen, 167, "insert size mismatch in read 1: %s != %s" % (self.reads[0].tlen, 167) )
783         self.assertEqual( self.reads[1].tlen, 412, "insert size mismatch in read 2: %s != %s" % (self.reads[1].tlen, 412) )
784
785     def testARseq(self):
786         self.assertEqual( self.reads[0].seq, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % (self.reads[0].seq, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
787         self.assertEqual( self.reads[1].seq, b"ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % (self.reads[1].seq, b"ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
788         self.assertEqual( self.reads[3].seq, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % (self.reads[3].seq, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
789
790     def testARqual(self):
791         self.assertEqual( self.reads[0].qual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 1: %s != %s" % (self.reads[0].qual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
792         self.assertEqual( self.reads[1].qual, b"<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "quality string mismatch in read 2: %s != %s" % (self.reads[1].qual, b"<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
793         self.assertEqual( self.reads[3].qual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "quality string mismatch in read 3: %s != %s" % (self.reads[3].qual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
794
795     def testARquery(self):
796         self.assertEqual( self.reads[0].query, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "query mismatch in read 1: %s != %s" % (self.reads[0].query, b"AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") )
797         self.assertEqual( self.reads[1].query, b"ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "query size mismatch in read 2: %s != %s" % (self.reads[1].query, b"ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") )
798         self.assertEqual( self.reads[3].query, b"TAGCTAGCTACCTATATCTTGGTCTT", "query mismatch in read 4: %s != %s" % (self.reads[3].query, b"TAGCTAGCTACCTATATCTTGGTCTT") )
799
800     def testARqqual(self):
801         self.assertEqual( self.reads[0].qqual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (self.reads[0].qqual, b"<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<") )
802         self.assertEqual( self.reads[1].qqual, b"<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (self.reads[1].qqual, b"<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<") )
803         self.assertEqual( self.reads[3].qqual, b"<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (self.reads[3].qqual, b"<<<<<<<<<<<<<<<<<:<9/,&,22") )
804
805     def testPresentOptionalFields(self):
806         self.assertEqual( self.reads[0].opt('NM'), 1, "optional field mismatch in read 1, NM: %s != %s" % (self.reads[0].opt('NM'), 1) )
807         self.assertEqual( self.reads[0].opt('RG'), 'L1', "optional field mismatch in read 1, RG: %s != %s" % (self.reads[0].opt('RG'), 'L1') )
808         self.assertEqual( self.reads[1].opt('RG'), 'L2', "optional field mismatch in read 2, RG: %s != %s" % (self.reads[1].opt('RG'), 'L2') )
809         self.assertEqual( self.reads[1].opt('MF'), 18, "optional field mismatch in read 2, MF: %s != %s" % (self.reads[1].opt('MF'), 18) )
810
811     def testPairedBools(self):
812         self.assertEqual( self.reads[0].is_paired, True, "is paired mismatch in read 1: %s != %s" % (self.reads[0].is_paired, True) )
813         self.assertEqual( self.reads[1].is_paired, True, "is paired mismatch in read 2: %s != %s" % (self.reads[1].is_paired, True) )
814         self.assertEqual( self.reads[0].is_proper_pair, True, "is proper pair mismatch in read 1: %s != %s" % (self.reads[0].is_proper_pair, True) )
815         self.assertEqual( self.reads[1].is_proper_pair, True, "is proper pair mismatch in read 2: %s != %s" % (self.reads[1].is_proper_pair, True) )
816
817     def testTags( self ):
818         self.assertEqual( self.reads[0].tags, 
819                           [('NM', 1), ('RG', 'L1'), 
820                            ('PG', 'P1'), ('XT', 'U')] )
821         self.assertEqual( self.reads[1].tags, 
822                           [('MF', 18), ('RG', 'L2'), 
823                            ('PG', 'P2'),('XT', 'R') ] )
824
825     def testOpt( self ):
826         self.assertEqual( self.reads[0].opt("XT"), "U" )
827         self.assertEqual( self.reads[1].opt("XT"), "R" )
828
829     def testMissingOpt( self ):
830         self.assertRaises( KeyError, self.reads[0].opt, "XP" )
831
832     def testEmptyOpt( self ):
833         self.assertRaises( KeyError, self.reads[2].opt, "XT" )
834
835     def tearDown(self):
836         self.samfile.close()
837
838 class TestAlignedReadFromSam(TestAlignedReadFromBam):
839
840     def setUp(self):
841         self.samfile=pysam.Samfile( "ex3.sam","r" )
842         self.reads=list(self.samfile.fetch())
843
844 # needs to be implemented 
845 # class TestAlignedReadFromSamWithoutHeader(TestAlignedReadFromBam):
846 #
847 #     def setUp(self):
848 #         self.samfile=pysam.Samfile( "ex7.sam","r" )
849 #         self.reads=list(self.samfile.fetch())
850
851 class TestHeaderSam(unittest.TestCase):
852
853     header = {'SQ': [{'LN': 1575, 'SN': 'chr1'}, 
854                      {'LN': 1584, 'SN': 'chr2'}], 
855               'RG': [{'LB': 'SC_1', 'ID': 'L1', 'SM': 'NA12891', 'PU': 'SC_1_10', "CN":"name:with:colon"}, 
856                      {'LB': 'SC_2', 'ID': 'L2', 'SM': 'NA12891', 'PU': 'SC_2_12', "CN":"name:with:colon"}],
857               'PG': [{'ID': 'P1', 'VN': '1.0'}, {'ID': 'P2', 'VN': '1.1'}], 
858               'HD': {'VN': '1.0'},
859               'CO' : [ 'this is a comment', 'this is another comment'],
860               }
861
862     def compareHeaders( self, a, b ):
863         '''compare two headers a and b.'''
864         for ak,av in a.items():
865             self.assertTrue( ak in b, "key '%s' not in '%s' " % (ak,b) )
866             self.assertEqual( av, b[ak] )
867
868     def setUp(self):
869         self.samfile=pysam.Samfile( "ex3.sam","r" )
870
871     def testHeaders(self):
872         self.compareHeaders( self.header, self.samfile.header )
873         self.compareHeaders( self.samfile.header, self.header )
874
875     def testNameMapping( self ):
876         for x, y in enumerate( ("chr1", "chr2")):
877             tid = self.samfile.gettid( y )
878             ref = self.samfile.getrname( x )
879             self.assertEqual( tid, x )
880             self.assertEqual( ref, y )
881
882         self.assertEqual( self.samfile.gettid("chr?"), -1 )
883         self.assertRaises( ValueError, self.samfile.getrname, 2 )
884
885     def tearDown(self):
886         self.samfile.close()
887
888 class TestHeaderBam(TestHeaderSam):
889
890     def setUp(self):
891         self.samfile=pysam.Samfile( "ex3.bam","rb" )
892
893
894 class TestUnmappedReads(unittest.TestCase):
895
896     def testSAM(self):
897         samfile=pysam.Samfile( "ex5.sam","r" )
898         self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 ) 
899         samfile.close()
900
901     def testBAM(self):
902         samfile=pysam.Samfile( "ex5.bam","rb" )
903         self.assertEqual( len(list(samfile.fetch( until_eof = True))), 2 ) 
904         samfile.close()
905
906 class TestPileupObjects(unittest.TestCase):
907
908     def setUp(self):
909         self.samfile=pysam.Samfile( "ex1.bam","rb" )
910
911     def testPileupColumn(self):
912         for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
913             if pcolumn1.pos == 104:
914                 self.assertEqual( pcolumn1.tid, 0, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn1.tid, 0) )
915                 self.assertEqual( pcolumn1.pos, 105-1, "position mismatch in position 1: %s != %s" % (pcolumn1.pos, 105-1) )
916                 self.assertEqual( pcolumn1.n, 2, "# reads mismatch in position 1: %s != %s" % (pcolumn1.n, 2) )
917         for pcolumn2 in self.samfile.pileup( region="chr2:1480" ):
918             if pcolumn2.pos == 1479:
919                 self.assertEqual( pcolumn2.tid, 1, "chromosome/target id mismatch in position 1: %s != %s" % (pcolumn2.tid, 1) )
920                 self.assertEqual( pcolumn2.pos, 1480-1, "position mismatch in position 1: %s != %s" % (pcolumn2.pos, 1480-1) )
921                 self.assertEqual( pcolumn2.n, 12, "# reads mismatch in position 1: %s != %s" % (pcolumn2.n, 12) )
922
923     def testPileupRead(self):
924         for pcolumn1 in self.samfile.pileup( region="chr1:105" ):
925             if pcolumn1.pos == 104:
926                 self.assertEqual( len(pcolumn1.pileups), 2, "# reads aligned to column mismatch in position 1: %s != %s" % (len(pcolumn1.pileups), 2) )
927 #                self.assertEqual( pcolumn1.pileups[0]  # need to test additional properties here
928
929     def tearDown(self):
930         self.samfile.close()
931
932     def testIteratorOutOfScope( self ):
933         '''test if exception is raised if pileup col is accessed after iterator is exhausted.'''
934
935         for pileupcol in self.samfile.pileup():
936             pass
937         
938         self.assertRaises( ValueError, getattr, pileupcol, "pileups" )
939
940 class TestContextManager(unittest.TestCase):
941
942     def testManager( self ):
943         with pysam.Samfile('ex1.bam', 'rb') as samfile:
944             samfile.fetch()
945         self.assertEqual( samfile._isOpen(), False )
946
947 class TestExceptions(unittest.TestCase):
948
949     def setUp(self):
950         self.samfile=pysam.Samfile( "ex1.bam","rb" )
951
952     def testMissingFile(self):
953
954         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "rb" )
955         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "r" )
956         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.bam", "r" )
957         self.assertRaises( IOError, pysam.Samfile, "exdoesntexist.sam", "rb" )
958
959     def testBadContig(self):
960         self.assertRaises( ValueError, self.samfile.fetch, "chr88" )
961
962     def testMeaninglessCrap(self):
963         self.assertRaises( ValueError, self.samfile.fetch, "skljf" )
964
965     def testBackwardsOrderNewFormat(self):
966         self.assertRaises( ValueError, self.samfile.fetch, 'chr1', 100, 10 )
967
968     def testBackwardsOrderOldFormat(self):
969         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:100-10")
970         
971     def testOutOfRangeNegativeNewFormat(self):
972         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, -10 )
973         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 5, 0 )
974         self.assertRaises( ValueError, self.samfile.fetch, "chr1", -5, -10 )
975
976         self.assertRaises( ValueError, self.samfile.count, "chr1", 5, -10 )
977         self.assertRaises( ValueError, self.samfile.count, "chr1", 5, 0 )        
978         self.assertRaises( ValueError, self.samfile.count, "chr1", -5, -10 )
979
980     def testOutOfRangeNegativeOldFormat(self):
981         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-10" )
982         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5-0" )
983         self.assertRaises( ValueError, self.samfile.fetch, region="chr1:-5--10" )
984
985         self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-10" )
986         self.assertRaises( ValueError, self.samfile.count, region="chr1:-5-0" )
987         self.assertRaises( ValueError, self.samfile.count, region="chr1:-5--10" )
988
989     def testOutOfRangNewFormat(self):
990         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999, 99999999999 )
991         self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999, 99999999999 )
992
993     def testOutOfRangeLargeNewFormat(self):
994         self.assertRaises( ValueError, self.samfile.fetch, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
995         self.assertRaises( ValueError, self.samfile.count, "chr1", 9999999999999999999999999999999, 9999999999999999999999999999999999999999 )
996
997     def testOutOfRangeLargeOldFormat(self):
998         self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
999         self.assertRaises( ValueError, self.samfile.count, "chr1:99999999999999999-999999999999999999" )
1000
1001     def testZeroToZero(self):        
1002         '''see issue 44'''
1003         self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0)
1004
1005     def tearDown(self):
1006         self.samfile.close()
1007
1008 class TestWrongFormat(unittest.TestCase):
1009     '''test cases for opening files not in bam/sam format.'''
1010
1011     def testOpenSamAsBam( self ):
1012         self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' )
1013
1014     def testOpenBamAsSam( self ):
1015         # test fails, needs to be implemented.
1016         # sam.fetch() fails on reading, not on opening
1017         # self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' )
1018         pass
1019
1020     def testOpenFastaAsSam( self ):
1021         # test fails, needs to be implemented.
1022         # sam.fetch() fails on reading, not on opening
1023         # self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' )
1024         pass
1025
1026     def testOpenFastaAsBam( self ):
1027         self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' )
1028
1029 class TestFastaFile(unittest.TestCase):
1030
1031     mSequences = { 'chr1' :
1032                        b"CACTAGTGGCTCATTGTAAATGTGTGGTTTAACTCGTCCATGGCCCAGCATTAGGGAGCTGTGGACCCTGCAGCCTGGCTGTGGGGGCCGCAGTGGCTGAGGGGTGCAGAGCCGAGTCACGGGGTTGCCAGCACAGGGGCTTAACCTCTGGTGACTGCCAGAGCTGCTGGCAAGCTAGAGTCCCATTTGGAGCCCCTCTAAGCCGTTCTATTTGTAATGAAAACTATATTTATGCTATTCAGTTCTAAATATAGAAATTGAAACAGCTGTGTTTAGTGCCTTTGTTCAACCCCCTTGCAACAACCTTGAGAACCCCAGGGAATTTGTCAATGTCAGGGAAGGAGCATTTTGTCAGTTACCAAATGTGTTTATTACCAGAGGGATGGAGGGAAGAGGGACGCTGAAGAACTTTGATGCCCTCTTCTTCCAAAGATGAAACGCGTAACTGCGCTCTCATTCACTCCAGCTCCCTGTCACCCAATGGACCTGTGATATCTGGATTCTGGGAAATTCTTCATCCTGGACCCTGAGAGATTCTGCAGCCCAGCTCCAGATTGCTTGTGGTCTGACAGGCTGCAACTGTGAGCCATCACAATGAACAACAGGAAGAAAAGGTCTTTCAAAAGGTGATGTGTGTTCTCATCAACCTCATACACACACATGGTTTAGGGGTATAATACCTCTACATGGCTGATTATGAAAACAATGTTCCCCAGATACCATCCCTGTCTTACTTCCAGCTCCCCAGAGGGAAAGCTTTCAACGCTTCTAGCCATTTCTTTTGGCATTTGCCTTCAGACCCTACACGAATGCGTCTCTACCACAGGGGGCTGCGCGGTTTCCCATCATGAAGCACTGAACTTCCACGTCTCATCTAGGGGAACAGGGAGGTGCACTAATGCGCTCCACGCCCAAGCCCTTCTCACAGTTTCTGCCCCCAGCATGGTTGTACTGGGCAATACATGAGATTATTAGGAAATGCTTTACTGTCATAACTATGAAGAGACTATTGCCAGATGAACCACACATTAATACTATGTTTCTTATCTGCACATTACTACCCTGCAATTAATATAATTGTGTCCATGTACACACGCTGTCCTATGTACTTATCATGACTCTATCCCAAATTCCCAATTACGTCCTATCTTCTTCTTAGGGAAGAACAGCTTAGGTATCAATTTGGTGTTCTGTGTAAAGTCTCAGGGAGCCGTCCGTGTCCTCCCATCTGGCCTCGTCCACACTGGTTCTCTTGAAAGCTTGGGCTGTAATGATGCCCCTTGGCCATCACCCAGTCCCTGCCCCATCTCTTGTAATCTCTCTCCTTTTTGCTGCATCCCTGTCTTCCTCTGTCTTGATTTACTTGTTGTTGGTTTTCTGTTTCTTTGTTTGATTTGGTGGAAGACATAATCCCACGCTTCCTATGGAAAGGTTGTTGGGAGATTTTTAATGATTCCTCAATGTTAAAATGTCTATTTTTGTCTTGACACCCAACTAATATTTGTCTGAGCAAAACAGTCTAGATGAGAGAGAACTTCCCTGGAGGTCTGATGGCGTTTCTCCCTCGTCTTCTTA",
1033                    'chr2' :
1034                        b"TTCAAATGAACTTCTGTAATTGAAAAATTCATTTAAGAAATTACAAAATATAGTTGAAAGCTCTAACAATAGACTAAACCAAGCAGAAGAAAGAGGTTCAGAACTTGAAGACAAGTCTCTTATGAATTAACCCAGTCAGACAAAAATAAAGAAAAAAATTTTAAAAATGAACAGAGCTTTCAAGAAGTATGAGATTATGTAAAGTAACTGAACCTATGAGTCACAGGTATTCCTGAGGAAAAAGAAAAAGTGAGAAGTTTGGAAAAACTATTTGAGGAAGTAATTGGGGAAAACCTCTTTAGTCTTGCTAGAGATTTAGACATCTAAATGAAAGAGGCTCAAAGAATGCCAGGAAGATACATTGCAAGACAGACTTCATCAAGATATGTAGTCATCAGACTATCTAAAGTCAACATGAAGGAAAAAAATTCTAAAATCAGCAAGAGAAAAGCATACAGTCATCTATAAAGGAAATCCCATCAGAATAACAATGGGCTTCTCAGCAGAAACCTTACAAGCCAGAAGAGATTGGATCTAATTTTTGGACTTCTTAAAGAAAAAAAAACCTGTCAAACACGAATGTTATGCCCTGCTAAACTAAGCATCATAAATGAAGGGGAAATAAAGTCAAGTCTTTCCTGACAAGCAAATGCTAAGATAATTCATCATCACTAAACCAGTCCTATAAGAAATGCTCAAAAGAATTGTAAAAGTCAAAATTAAAGTTCAATACTCACCATCATAAATACACACAAAAGTACAAAACTCACAGGTTTTATAAAACAATTGAGACTACAGAGCAACTAGGTAAAAAATTAACATTACAACAGGAACAAAACCTCATATATCAATATTAACTTTGAATAAAAAGGGATTAAATTCCCCCACTTAAGAGATATAGATTGGCAGAACAGATTTAAAAACATGAACTAACTATATGCTGTTTACAAGAAACTCATTAATAAAGACATGAGTTCAGGTAAAGGGGTGGAAAAAGATGTTCTACGCAAACAGAAACCAAATGAGAGAAGGAGTAGCTATACTTATATCAGATAAAGCACACTTTAAATCAACAACAGTAAAATAAAACAAAGGAGGTCATCATACAATGATAAAAAGATCAATTCAGCAAGAAGATATAACCATCCTACTAAATACATATGCACCTAACACAAGACTACCCAGATTCATAAAACAAATACTACTAGACCTAAGAGGGATGAGAAATTACCTAATTGGTACAATGTACAATATTCTGATGATGGTTACACTAAAAGCCCATACTTTACTGCTACTCAATATATCCATGTAACAAATCTGCGCTTGTACTTCTAAATCTATAAAAAAATTAAAATTTAACAAAAGTAAATAAAACACATAGCTAAAACTAAAAAAGCAAAAACAAAAACTATGCTAAGTATTGGTAAAGATGTGGGGAAAAAAGTAAACTCTCAAATATTGCTAGTGGGAGTATAAATTGTTTTCCACTTTGGAAAACAATTTGGTAATTTCGTTTTTTTTTTTTTCTTTTCTCTTTTTTTTTTTTTTTTTTTTGCATGCCAGAAAAAAATATTTACAGTAACT",
1035                    }
1036
1037     def setUp(self):
1038         self.file=pysam.Fastafile( "ex1.fa" )
1039
1040     def testFetch(self):
1041         for id, seq in list(self.mSequences.items()):
1042             self.assertEqual( seq, self.file.fetch( id ) )
1043             for x in range( 0, len(seq), 10):
1044                 self.assertEqual( seq[x:x+10], self.file.fetch( id, x, x+10) )
1045                 # test x:end
1046                 self.assertEqual( seq[x:], self.file.fetch( id, x) )
1047                 # test 0:x
1048                 self.assertEqual( seq[:x], self.file.fetch( id, None, x) )
1049
1050         
1051         # unknown sequence returns ""
1052         self.assertEqual( b"", self.file.fetch("chr12") )
1053
1054     def testOutOfRangeAccess( self ):
1055         '''test out of range access.'''
1056         # out of range access returns an empty string
1057         for contig, s in self.mSequences.items():
1058             self.assertEqual( self.file.fetch( contig, len(s), len(s)+1), b"" )
1059
1060         self.assertEqual( self.file.fetch( "chr3", 0 , 100), b"" ) 
1061
1062     def testFetchErrors( self ):
1063         self.assertRaises( ValueError, self.file.fetch )
1064         self.assertRaises( ValueError, self.file.fetch, "chr1", -1, 10 )
1065         self.assertRaises( ValueError, self.file.fetch, "chr1", 20, 10 )
1066
1067     def testLength( self ):
1068         self.assertEqual( len(self.file), 2 )
1069         
1070     def tearDown(self):
1071         self.file.close()
1072
1073 class TestAlignedRead(unittest.TestCase):
1074     '''tests to check if aligned read can be constructed
1075     and manipulated.
1076     '''
1077
1078     def checkFieldEqual( self, read1, read2, exclude = []):
1079         '''check if two reads are equal by comparing each field.'''
1080
1081         for x in ("qname", "seq", "flag",
1082                   "rname", "pos", "mapq", "cigar",
1083                   "mrnm", "mpos", "isize", "qual",
1084                   "is_paired", "is_proper_pair",
1085                   "is_unmapped", "mate_is_unmapped",
1086                   "is_reverse", "mate_is_reverse",
1087                   "is_read1", "is_read2",
1088                   "is_secondary", "is_qcfail",
1089                   "is_duplicate", "bin"):
1090             if x in exclude: continue
1091             self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" % 
1092                               (x, getattr(read1, x), getattr(read2,x)))
1093     
1094     def testEmpty( self ):
1095         a = pysam.AlignedRead()
1096         self.assertEqual( a.qname, None )
1097         self.assertEqual( a.seq, None )
1098         self.assertEqual( a.qual, None )
1099         self.assertEqual( a.flag, 0 )
1100         self.assertEqual( a.rname, 0 )
1101         self.assertEqual( a.mapq, 0 )
1102         self.assertEqual( a.cigar, None )
1103         self.assertEqual( a.tags, [] )
1104         self.assertEqual( a.mrnm, 0 )
1105         self.assertEqual( a.mpos, 0 )
1106         self.assertEqual( a.isize, 0 )
1107
1108     def buildRead( self ):
1109         '''build an example read.'''
1110         
1111         a = pysam.AlignedRead()
1112         a.qname = "read_12345"
1113         a.seq="ACGT" * 10
1114         a.flag = 0
1115         a.rname = 0
1116         a.pos = 20
1117         a.mapq = 20
1118         a.cigar = ( (0,10), (2,1), (0,9), (1,1), (0,20) )
1119         a.mrnm = 0
1120         a.mpos=200
1121         a.isize=167
1122         a.qual="1234" * 10
1123         # todo: create tags
1124         return a
1125
1126     def testUpdate( self ):
1127         '''check if updating fields affects other variable length data
1128         '''
1129         a = self.buildRead()
1130         b = self.buildRead()
1131
1132         # check qname
1133         b.qname = "read_123"
1134         self.checkFieldEqual( a, b, "qname" )
1135         b.qname = "read_12345678"
1136         self.checkFieldEqual( a, b, "qname" )
1137         b.qname = "read_12345"
1138         self.checkFieldEqual( a, b)
1139
1140         # check cigar
1141         b.cigar = ( (0,10), )
1142         self.checkFieldEqual( a, b, "cigar" )
1143         b.cigar = ( (0,10), (2,1), (0,10) )
1144         self.checkFieldEqual( a, b, "cigar" )
1145         b.cigar = ( (0,10), (2,1), (0,9), (1,1), (0,20) )
1146         self.checkFieldEqual( a, b)
1147
1148         # check seq 
1149         b.seq = "ACGT"
1150         self.checkFieldEqual( a, b, ("seq", "qual") )
1151         b.seq = "ACGT" * 3
1152         self.checkFieldEqual( a, b, ("seq", "qual") )
1153         b.seq = "ACGT" * 10
1154         self.checkFieldEqual( a, b, ("qual",))
1155
1156         # reset qual
1157         b = self.buildRead()
1158
1159         # check flags:
1160         for x in (
1161             "is_paired", "is_proper_pair",
1162             "is_unmapped", "mate_is_unmapped",
1163             "is_reverse", "mate_is_reverse",
1164             "is_read1", "is_read2",
1165             "is_secondary", "is_qcfail",
1166             "is_duplicate"):
1167             setattr( b, x, True )
1168             self.assertEqual( getattr(b, x), True )
1169             self.checkFieldEqual( a, b, ("flag", x,) )
1170             setattr( b, x, False )
1171             self.assertEqual( getattr(b, x), False )
1172             self.checkFieldEqual( a, b )
1173
1174     def testLargeRead( self ):
1175         '''build an example read.'''
1176         
1177         a = pysam.AlignedRead()
1178         a.qname = "read_12345"
1179         a.seq="ACGT" * 200
1180         a.flag = 0
1181         a.rname = 0
1182         a.pos = 20
1183         a.mapq = 20
1184         a.cigar = ( (0, 4 * 200), )
1185         a.mrnm = 0
1186         a.mpos=200
1187         a.isize=167
1188         a.qual="1234" * 200
1189
1190         return a
1191
1192     def testTagParsing( self ):
1193         '''test for tag parsing
1194
1195         see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a
1196         '''
1197         samfile=pysam.Samfile( "ex8.bam","rb" )
1198
1199         for entry in samfile:
1200             before = entry.tags
1201             entry.tags = entry.tags
1202             after = entry.tags
1203             self.assertEqual( after, before )
1204
1205     def testUpdateTlen( self ):
1206         '''check if updating tlen works'''
1207         a = self.buildRead()
1208         oldlen = a.tlen
1209         oldlen *= 2
1210         a.tlen = oldlen
1211         self.assertEqual( a.tlen, oldlen )
1212
1213     def testPositions( self ):
1214         a = self.buildRead()
1215         self.assertEqual( a.positions,
1216                           [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 
1217                                 31, 32, 33, 34, 35, 36, 37, 38, 39, 
1218                            40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 
1219                            50, 51, 52, 53, 54, 55, 56, 57, 58, 59] )
1220
1221         self.assertEqual( a.aligned_pairs,
1222                           [(0, 20), (1, 21), (2, 22), (3, 23), (4, 24), 
1223                            (5, 25), (6, 26), (7, 27), (8, 28), (9, 29), 
1224                            (None, 30), 
1225                            (10, 31), (11, 32), (12, 33), (13, 34), (14, 35), 
1226                            (15, 36), (16, 37), (17, 38), (18, 39), (19, None), 
1227                            (20, 40), (21, 41), (22, 42), (23, 43), (24, 44), 
1228                            (25, 45), (26, 46), (27, 47), (28, 48), (29, 49), 
1229                            (30, 50), (31, 51), (32, 52), (33, 53), (34, 54), 
1230                            (35, 55), (36, 56), (37, 57), (38, 58), (39, 59)] )
1231
1232         self.assertEqual( a.positions, [x[1] for x in a.aligned_pairs if x[0] != None and x[1] != None] )
1233         # alen is the length of the aligned read in genome
1234         self.assertEqual( a.alen, a.aligned_pairs[-1][0] + 1 )
1235         # aend points to one beyond last aligned base in ref
1236         self.assertEqual( a.positions[-1], a.aend - 1 )
1237
1238 class TestDeNovoConstruction(unittest.TestCase):
1239     '''check BAM/SAM file construction using ex6.sam
1240     
1241     (note these are +1 coordinates):
1242     
1243     read_28833_29006_6945       99      chr1    33      20      10M1D25M        =       200     167     AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG     <<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<     NM:i:1  RG:Z:L1
1244     read_28701_28881_323b       147     chr2    88      30      35M     =       500     412     ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA     <<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<     MF:i:18 RG:Z:L2
1245     '''
1246
1247     header = { 'HD': {'VN': '1.0'},
1248                'SQ': [{'LN': 1575, 'SN': 'chr1'}, 
1249                       {'LN': 1584, 'SN': 'chr2'}], }
1250
1251     bamfile = "ex6.bam"
1252     samfile = "ex6.sam"
1253
1254     def checkFieldEqual( self, read1, read2, exclude = []):
1255         '''check if two reads are equal by comparing each field.'''
1256
1257         for x in ("qname", "seq", "flag",
1258                   "rname", "pos", "mapq", "cigar",
1259                   "mrnm", "mpos", "isize", "qual",
1260                   "bin",
1261                   "is_paired", "is_proper_pair",
1262                   "is_unmapped", "mate_is_unmapped",
1263                   "is_reverse", "mate_is_reverse",
1264                   "is_read1", "is_read2",
1265                   "is_secondary", "is_qcfail",
1266                   "is_duplicate"):
1267             if x in exclude: continue
1268             self.assertEqual( getattr(read1, x), getattr(read2,x), "attribute mismatch for %s: %s != %s" % 
1269                               (x, getattr(read1, x), getattr(read2,x)))
1270
1271     def setUp( self ):
1272
1273         a = pysam.AlignedRead()
1274         a.qname = "read_28833_29006_6945"
1275         a.seq="AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG"
1276         a.flag = 99
1277         a.rname = 0
1278         a.pos = 32
1279         a.mapq = 20
1280         a.cigar = ( (0,10), (2,1), (0,25) )
1281         a.mrnm = 0
1282         a.mpos=199
1283         a.isize=167
1284         a.qual="<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<"
1285         a.tags = ( ("NM", 1),
1286                    ("RG", "L1") )
1287
1288         b = pysam.AlignedRead()
1289         b.qname = "read_28701_28881_323b"
1290         b.seq="ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA"
1291         b.flag = 147
1292         b.rname = 1
1293         b.pos = 87
1294         b.mapq = 30
1295         b.cigar = ( (0,35), )
1296         b.mrnm = 1
1297         b.mpos=499
1298         b.isize=412
1299         b.qual="<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<"
1300         b.tags = ( ("MF", 18),
1301                    ("RG", "L2") )
1302
1303         self.reads = (a,b)
1304
1305     def testSAMWholeFile( self ):
1306         
1307         tmpfilename = "tmp_%i.sam" % id(self)
1308
1309         outfile = pysam.Samfile( tmpfilename, "wh", header = self.header )
1310
1311         for x in self.reads: outfile.write( x )
1312         outfile.close()
1313         self.assertTrue( checkBinaryEqual( tmpfilename, self.samfile ),
1314                          "mismatch when construction SAM file, see %s %s" % (tmpfilename, self.samfile))
1315         
1316         os.unlink( tmpfilename )
1317
1318     def testBAMPerRead( self ):
1319         '''check if individual reads are binary equal.'''
1320         infile = pysam.Samfile( self.bamfile, "rb")
1321
1322         others = list(infile)
1323         for denovo, other in zip( others, self.reads):
1324             self.checkFieldEqual( other, denovo )
1325             self.assertEqual( other.compare( denovo ), 0 )
1326
1327     def testSAMPerRead( self ):
1328         '''check if individual reads are binary equal.'''
1329         infile = pysam.Samfile( self.samfile, "r")
1330
1331         others = list(infile)
1332         for denovo, other in zip( others, self.reads):
1333             self.checkFieldEqual( other, denovo )
1334             self.assertEqual( other.compare( denovo), 0 )
1335             
1336     def testBAMWholeFile( self ):
1337         
1338         tmpfilename = "tmp_%i.bam" % id(self)
1339
1340         outfile = pysam.Samfile( tmpfilename, "wb", header = self.header )
1341
1342         for x in self.reads: outfile.write( x )
1343         outfile.close()
1344         
1345         self.assertTrue( checkBinaryEqual( tmpfilename, self.bamfile ),
1346                          "mismatch when construction BAM file, see %s %s" % (tmpfilename, self.bamfile))
1347         
1348         os.unlink( tmpfilename )
1349
1350 class TestDeNovoConstructionUserTags(TestDeNovoConstruction):
1351     '''test de novo construction with a header that contains lower-case tags.'''
1352
1353     header = { 'HD': {'VN': '1.0'},
1354                'SQ': [{'LN': 1575, 'SN': 'chr1'}, 
1355                       {'LN': 1584, 'SN': 'chr2'}],
1356                'x1': {'A': 2, 'B': 5 },
1357                'x3': {'A': 6, 'B': 5 },
1358                'x2': {'A': 4, 'B': 5 } }
1359
1360     bamfile = "example_user_header.bam"
1361     samfile = "example_user_header.sam"
1362
1363 class TestEmptyHeader( unittest.TestCase ):
1364     '''see issue 84.'''
1365     
1366     def testEmptyHeader( self ):
1367
1368         s = pysam.Samfile('example_empty_header.bam')
1369         self.assertEqual( s.header, {'SQ': [{'LN': 1000, 'SN': 'chr1'}]} )
1370
1371 class TestBTagSam( unittest.TestCase ):
1372     '''see issue 81.'''
1373     
1374     compare = [ [100, 1, 91, 0, 7, 101, 0, 201, 96, 204, 0, 0, 87, 109, 0, 7, 97, 112, 1, 12, 78, 197, 0, 7, 100, 95, 101, 202, 0, 6, 0, 1, 186, 0, 84, 0, 244, 0, 0, 324, 0, 107, 195, 101, 113, 0, 102, 0, 104, 3, 0, 101, 1, 0, 212, 6, 0, 0, 1, 0, 74, 1, 11, 0, 196, 2, 197, 103, 0, 108, 98, 2, 7, 0, 1, 2, 194, 0, 180, 0, 108, 0, 203, 104, 16, 5, 205, 0, 0, 0, 1, 1, 100, 98, 0, 0, 204, 6, 0, 79, 0, 0, 101, 7, 109, 90, 265, 1, 27, 10, 109, 102, 9, 0, 292, 0, 110, 0, 0, 102, 112, 0, 0, 84, 100, 103, 2, 81, 126, 0, 2, 90, 0, 15, 96, 15, 1, 0, 2, 0, 107, 92, 0, 0, 101, 3, 98, 15, 102, 13, 116, 116, 90, 93, 198, 0, 0, 0, 199, 92, 26, 495, 100, 5, 0, 100, 5, 209, 0, 92, 107, 90, 0, 0, 0, 0, 109, 194, 7, 94, 200, 0, 40, 197, 0, 11, 0, 0, 112, 110, 6, 4, 200, 28, 0, 196, 0, 203, 1, 129, 0, 0, 1, 0, 94, 0, 1, 0, 107, 5, 201, 3, 3, 100, 0, 121, 0, 7, 0, 1, 105, 306, 3, 86, 8, 183, 0, 12, 163, 17, 83, 22, 0, 0, 1, 8, 109, 103, 0, 0, 295, 0, 200, 16, 172, 3, 16, 182, 3, 11, 0, 0, 223, 111, 103, 0, 5, 225, 0, 95],
1375                 [-100,200,-300,-400],
1376                 [-100,12],
1377                 [12,15],
1378                 [-1.0,5.0,2.5] ]
1379             
1380     filename = 'example_btag.sam'
1381
1382     def testRead( self ):
1383
1384         s = pysam.Samfile(self.filename)
1385         for x, read in enumerate(s):
1386             if x == 0:
1387                 self.assertEqual( read.tags, [('RG', 'QW85I'), ('PG', 'tmap'), ('MD', '140'), ('NM', 0), ('AS', 140), ('FZ', [100, 1, 91, 0, 7, 101, 0, 201, 96, 204, 0, 0, 87, 109, 0, 7, 97, 112, 1, 12, 78, 197, 0, 7, 100, 95, 101, 202, 0, 6, 0, 1, 186, 0, 84, 0, 244, 0, 0, 324, 0, 107, 195, 101, 113, 0, 102, 0, 104, 3, 0, 101, 1, 0, 212, 6, 0, 0, 1, 0, 74, 1, 11, 0, 196, 2, 197, 103, 0, 108, 98, 2, 7, 0, 1, 2, 194, 0, 180, 0, 108, 0, 203, 104, 16, 5, 205, 0, 0, 0, 1, 1, 100, 98, 0, 0, 204, 6, 0, 79, 0, 0, 101, 7, 109, 90, 265, 1, 27, 10, 109, 102, 9, 0, 292, 0, 110, 0, 0, 102, 112, 0, 0, 84, 100, 103, 2, 81, 126, 0, 2, 90, 0, 15, 96, 15, 1, 0, 2, 0, 107, 92, 0, 0, 101, 3, 98, 15, 102, 13, 116, 116, 90, 93, 198, 0, 0, 0, 199, 92, 26, 495, 100, 5, 0, 100, 5, 209, 0, 92, 107, 90, 0, 0, 0, 0, 109, 194, 7, 94, 200, 0, 40, 197, 0, 11, 0, 0, 112, 110, 6, 4, 200, 28, 0, 196, 0, 203, 1, 129, 0, 0, 1, 0, 94, 0, 1, 0, 107, 5, 201, 3, 3, 100, 0, 121, 0, 7, 0, 1, 105, 306, 3, 86, 8, 183, 0, 12, 163, 17, 83, 22, 0, 0, 1, 8, 109, 103, 0, 0, 295, 0, 200, 16, 172, 3, 16, 182, 3, 11, 0, 0, 223, 111, 103, 0, 5, 225, 0, 95]), ('XA', 'map2-1'), ('XS', 53), ('XT', 38), ('XF', 1), ('XE', 0)] 
1388                                   )
1389                          
1390             fz = dict(read.tags)["FZ"]
1391             self.assertEqual( fz, self.compare[x] )
1392             self.assertEqual( read.opt("FZ"), self.compare[x])
1393
1394     def testWrite( self ):
1395         
1396         s = pysam.Samfile(self.filename)
1397         for read in s:
1398             before = read.tags
1399             read.tags = read.tags
1400             after = read.tags
1401             self.assertEqual( after, before )
1402
1403 class TestBTagBam( TestBTagSam ):
1404     filename = 'example_btag.bam'
1405
1406 class TestDoubleFetch(unittest.TestCase):
1407     '''check if two iterators on the same bamfile are independent.'''
1408     
1409     def testDoubleFetch( self ):
1410
1411         samfile1 = pysam.Samfile('ex1.bam', 'rb')
1412
1413         for a,b in zip(samfile1.fetch(), samfile1.fetch()):
1414             self.assertEqual( a.compare( b ), 0 )
1415
1416     def testDoubleFetchWithRegion( self ):
1417
1418         samfile1 = pysam.Samfile('ex1.bam', 'rb')
1419         chr, start, stop = 'chr1', 200, 3000000
1420         self.assertTrue(len(list(samfile1.fetch ( chr, start, stop))) > 0) #just making sure the test has something to catch
1421
1422         for a,b in zip(samfile1.fetch( chr, start, stop), samfile1.fetch( chr, start, stop)):
1423             self.assertEqual( a.compare( b ), 0 ) 
1424
1425     def testDoubleFetchUntilEOF( self ):
1426
1427         samfile1 = pysam.Samfile('ex1.bam', 'rb')
1428
1429         for a,b in zip(samfile1.fetch( until_eof = True), 
1430                        samfile1.fetch( until_eof = True )):
1431             self.assertEqual( a.compare( b), 0 )
1432
1433 class TestRemoteFileFTP(unittest.TestCase):
1434     '''test remote access.
1435
1436     '''
1437
1438     # Need to find an ftp server without password on standard
1439     # port.
1440
1441     url = "ftp://ftp.sanger.ac.uk/pub/rd/humanSequences/CV.bam"
1442     region = "1:1-1000"
1443
1444     def testFTPView( self ):
1445         return
1446         result = pysam.view( self.url, self.region )
1447         self.assertEqual( len(result), 36 )
1448         
1449     def testFTPFetch( self ):
1450         return
1451         samfile = pysam.Samfile(self.url, "rb")  
1452         result = list(samfile.fetch( region = self.region ))
1453         self.assertEqual( len(result), 36 )
1454
1455 class TestRemoteFileHTTP( unittest.TestCase):
1456
1457     url = "http://genserv.anat.ox.ac.uk/downloads/pysam/test/ex1.bam"
1458     region = "chr1:1-1000"
1459     local = "ex1.bam"
1460
1461     def testView( self ):
1462         samfile_local = pysam.Samfile(self.local, "rb")  
1463         ref = list(samfile_local.fetch( region = self.region ))
1464         
1465         result = pysam.view( self.url, self.region )
1466         self.assertEqual( len(result), len(ref) )
1467         
1468     def testFetch( self ):
1469         samfile = pysam.Samfile(self.url, "rb")  
1470         result = list(samfile.fetch( region = self.region ))
1471         samfile_local = pysam.Samfile(self.local, "rb")  
1472         ref = list(samfile_local.fetch( region = self.region ))
1473
1474         self.assertEqual( len(ref), len(result) )
1475         for x, y in zip(result, ref):
1476             self.assertEqual( x.compare( y ), 0 )
1477
1478     def testFetchAll( self ):
1479         samfile = pysam.Samfile(self.url, "rb")  
1480         result = list(samfile.fetch())
1481         samfile_local = pysam.Samfile(self.local, "rb")  
1482         ref = list(samfile_local.fetch() )
1483
1484         self.assertEqual( len(ref), len(result) )
1485         for x, y in zip(result, ref):
1486             self.assertEqual( x.compare( y ), 0 )
1487
1488 class TestLargeOptValues( unittest.TestCase ):
1489
1490     ints = ( 65536, 214748, 2147484, 2147483647 )
1491     floats = ( 65536.0, 214748.0, 2147484.0 )
1492
1493     def check( self, samfile ):
1494         
1495         i = samfile.fetch()
1496         for exp in self.ints:
1497             rr = next(i)
1498             obs = rr.opt("ZP")
1499             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1500
1501         for exp in [ -x for x in self.ints ]:
1502             rr = next(i)
1503             obs = rr.opt("ZP")
1504             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1505
1506         for exp in self.floats:
1507             rr = next(i)
1508             obs = rr.opt("ZP")
1509             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1510
1511         for exp in [ -x for x in self.floats ]:
1512             rr = next(i)
1513             obs = rr.opt("ZP")
1514             self.assertEqual( exp, obs, "expected %s, got %s\n%s" % (str(exp), str(obs), str(rr)))
1515
1516     def testSAM( self ):
1517         samfile = pysam.Samfile("ex10.sam", "r")
1518         self.check( samfile )
1519
1520     def testBAM( self ):
1521         samfile = pysam.Samfile("ex10.bam", "rb")
1522         self.check( samfile )
1523
1524 # class TestSNPCalls( unittest.TestCase ):
1525 #     '''test pysam SNP calling ability.'''
1526
1527 #     def checkEqual( self, a, b ):
1528 #         for x in ("reference_base", 
1529 #                   "pos",
1530 #                   "genotype",
1531 #                   "consensus_quality",
1532 #                   "snp_quality",
1533 #                   "mapping_quality",
1534 #                   "coverage" ):
1535 #             self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % 
1536 #                               (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1537
1538 #     def testAllPositionsViaIterator( self ):
1539 #         samfile = pysam.Samfile( "ex1.bam", "rb")  
1540 #         fastafile = pysam.Fastafile( "ex1.fa" )
1541 #         try: 
1542 #             refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base != "*"]
1543 #         except pysam.SamtoolsError:
1544 #             pass
1545
1546 #         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1547 #         calls = list(pysam.IteratorSNPCalls(i))
1548 #         for x,y in zip( refs, calls ):
1549 #             self.checkEqual( x, y )
1550
1551 #     def testPerPositionViaIterator( self ):
1552 #         # test pileup for each position. This is a slow operation
1553 #         # so this test is disabled 
1554 #         return
1555 #         samfile = pysam.Samfile( "ex1.bam", "rb")  
1556 #         fastafile = pysam.Fastafile( "ex1.fa" )
1557 #         for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1558 #             if x.reference_base == "*": continue
1559 #             i = samfile.pileup( x.chromosome, x.pos, x.pos+1,
1560 #                                 fastafile = fastafile,
1561 #                                 stepper = "samtools" )
1562 #             z = [ zz for zz in pysam.IteratorSamtools(i) if zz.pos == x.pos ]
1563 #             self.assertEqual( len(z), 1 )
1564 #             self.checkEqual( x, z[0] )
1565
1566 #     def testPerPositionViaCaller( self ):
1567 #         # test pileup for each position. This is a fast operation
1568 #         samfile = pysam.Samfile( "ex1.bam", "rb")  
1569 #         fastafile = pysam.Fastafile( "ex1.fa" )
1570 #         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1571 #         caller = pysam.SNPCaller( i )
1572
1573 #         for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1574 #             if x.reference_base == "*": continue
1575 #             call = caller.call( x.chromosome, x.pos )
1576 #             self.checkEqual( x, call )
1577
1578 # class TestIndelCalls( unittest.TestCase ):
1579 #     '''test pysam indel calling.'''
1580
1581 #     def checkEqual( self, a, b ):
1582
1583 #         for x in ("pos",
1584 #                   "genotype",
1585 #                   "consensus_quality",
1586 #                   "snp_quality",
1587 #                   "mapping_quality",
1588 #                   "coverage",
1589 #                   "first_allele",
1590 #                   "second_allele",
1591 #                   "reads_first",
1592 #                   "reads_second",
1593 #                   "reads_diff"):
1594 #             if b.genotype == "*/*" and x == "second_allele":
1595 #                 # ignore test for second allele (positions chr2:439 and chr2:1512)
1596 #                 continue
1597 #             self.assertEqual( getattr(a, x), getattr(b,x), "%s mismatch: %s != %s\n%s\n%s" % 
1598 #                               (x, getattr(a, x), getattr(b,x), str(a), str(b)))
1599
1600 #     def testAllPositionsViaIterator( self ):
1601
1602 #         samfile = pysam.Samfile( "ex1.bam", "rb")  
1603 #         fastafile = pysam.Fastafile( "ex1.fa" )
1604 #         try: 
1605 #             refs = [ x for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ) if x.reference_base == "*"]
1606 #         except pysam.SamtoolsError:
1607 #             pass
1608
1609 #         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1610 #         calls = [ x for x in list(pysam.IteratorIndelCalls(i)) if x != None ]
1611 #         for x,y in zip( refs, calls ):
1612 #             self.checkEqual( x, y )
1613
1614 #     def testPerPositionViaCaller( self ):
1615 #         # test pileup for each position. This is a fast operation
1616 #         samfile = pysam.Samfile( "ex1.bam", "rb")  
1617 #         fastafile = pysam.Fastafile( "ex1.fa" )
1618 #         i = samfile.pileup( stepper = "samtools", fastafile = fastafile )
1619 #         caller = pysam.IndelCaller( i )
1620
1621 #         for x in pysam.pileup( "-c", "-f", "ex1.fa", "ex1.bam" ):
1622 #             if x.reference_base != "*": continue
1623 #             call = caller.call( x.chromosome, x.pos )
1624 #             self.checkEqual( x, call )
1625
1626 class TestLogging( unittest.TestCase ):
1627     '''test around bug issue 42,
1628
1629     failed in versions < 0.4
1630     '''
1631
1632     def check( self, bamfile, log ):
1633
1634         if log:
1635             logger = logging.getLogger('franklin')
1636             logger.setLevel(logging.INFO)
1637             formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
1638             log_hand  = logging.FileHandler('log.txt')
1639             log_hand.setFormatter(formatter)
1640             logger.addHandler(log_hand)
1641
1642         bam  = pysam.Samfile(bamfile, 'rb')
1643         cols = bam.pileup()
1644         self.assertTrue( True )
1645
1646     def testFail1( self ):
1647         self.check( "ex9_fail.bam", False )
1648         self.check( "ex9_fail.bam", True )
1649
1650     def testNoFail1( self ):
1651         self.check( "ex9_nofail.bam", False )
1652         self.check( "ex9_nofail.bam", True )
1653
1654     def testNoFail2( self ):
1655         self.check( "ex9_nofail.bam", True )
1656         self.check( "ex9_nofail.bam", True )
1657         
1658 # TODOS
1659 # 1. finish testing all properties within pileup objects
1660 # 2. check exceptions and bad input problems (missing files, optional fields that aren't present, etc...)
1661 # 3. check: presence of sequence
1662
1663 class TestSamfileUtilityFunctions( unittest.TestCase ):
1664
1665     def testCount( self ):
1666
1667         samfile = pysam.Samfile( "ex1.bam", "rb" )
1668
1669         for contig in ("chr1", "chr2" ):
1670             for start in range( 0, 2000, 100 ):
1671                 end = start + 1
1672                 self.assertEqual( len( list( samfile.fetch( contig, start, end ) ) ),
1673                                   samfile.count( contig, start, end ) )
1674
1675                 # test empty intervals
1676                 self.assertEqual( len( list( samfile.fetch( contig, start, start ) ) ),
1677                                   samfile.count( contig, start, start ) )
1678
1679                 # test half empty intervals
1680                 self.assertEqual( len( list( samfile.fetch( contig, start ) ) ),
1681                                   samfile.count( contig, start ) )
1682
1683     def testMate( self ):
1684         '''test mate access.'''
1685
1686         with open( "ex1.sam", "rb" ) as inf:
1687             readnames = [ x.split(b"\t")[0] for x in inf.readlines() ]
1688         if sys.version_info[0] >= 3:
1689             readnames = [ name.decode('ascii') for name in readnames ]
1690             
1691         counts = collections.defaultdict( int )
1692         for x in readnames: counts[x] += 1
1693
1694         samfile = pysam.Samfile( "ex1.bam", "rb" )
1695         for read in samfile.fetch():
1696             if not read.is_paired:
1697                 self.assertRaises( ValueError, samfile.mate, read )
1698             elif read.mate_is_unmapped:
1699                 self.assertRaises( ValueError, samfile.mate, read )
1700             else:
1701                 if counts[read.qname] == 1:
1702                     self.assertRaises( ValueError, samfile.mate, read )
1703                 else:
1704                     mate = samfile.mate( read )
1705                     self.assertEqual( read.qname, mate.qname )
1706                     self.assertEqual( read.is_read1, mate.is_read2 )
1707                     self.assertEqual( read.is_read2, mate.is_read1 )
1708                     self.assertEqual( read.pos, mate.mpos )
1709                     self.assertEqual( read.mpos, mate.pos )
1710
1711     def testIndexStats( self ):
1712         '''test if total number of mapped/unmapped reads is correct.'''
1713
1714         samfile = pysam.Samfile( "ex1.bam", "rb" )
1715         self.assertEqual( samfile.mapped, 3235 )
1716         self.assertEqual( samfile.unmapped, 35 )
1717
1718 class TestSamtoolsProxy( unittest.TestCase ):
1719     '''tests for sanity checking access to samtools functions.'''
1720
1721     def testIndex( self ):
1722         self.assertRaises( IOError, pysam.index, "missing_file" )
1723
1724     def testView( self ):
1725         # note that view still echos "open: No such file or directory"
1726         self.assertRaises( pysam.SamtoolsError, pysam.view, "missing_file" )
1727
1728     def testSort( self ):
1729         self.assertRaises( pysam.SamtoolsError, pysam.sort, "missing_file" )
1730
1731 class TestSamfileIndex( unittest.TestCase):
1732     
1733     def testIndex( self ):
1734         samfile = pysam.Samfile( "ex1.bam", "rb" )
1735         index = pysam.IndexedReads( samfile )
1736         index.build()
1737
1738         reads = collections.defaultdict( int )
1739
1740         for read in samfile: reads[read.qname] += 1
1741             
1742         for qname, counts in reads.items():
1743             found = list(index.find( qname ))
1744             self.assertEqual( len(found), counts )
1745             for x in found: self.assertEqual( x.qname, qname )
1746             
1747
1748 if __name__ == "__main__":
1749     # build data files
1750     print ("building data files")
1751     subprocess.call( "make", shell=True)
1752     print ("starting tests")
1753     unittest.main()
1754     print ("completed tests")