self.stderr_save = Outs( sys.stderr.fileno() )
self.stderr_save.setfd( self.stderr_h )
+ def readAndRelease( self ):
+ self.stderr_save.restore()
+ lines = []
+ if os.path.exists(self.stderr_f):
+ lines = open( self.stderr_f, "r" ).readlines()
+ os.remove( self.stderr_f )
+ return lines
+
def release(self):
self.stderr_save.restore()
if os.path.exists(self.stderr_f):
else:
# build header from a target names and lengths
- assert referencenames and referencelengths, "either supply options `template`, `header` or both `refernencenames` and `referencelengths` for writing"
+ assert referencenames and referencelengths, "either supply options `template`, `header` or both `referencenames` and `referencelengths` for writing"
assert len(referencenames) == len(referencelengths), "unequal names and lengths of reference sequences"
# allocate and fill header
store = StderrStore()
self.samfile = samopen( filename, mode, NULL )
- store.release()
+ result = store.readAndRelease()
+ # test for specific messages as open also outputs status messages
+ # that can be ignored.
+ if "[bam_header_read] invalid BAM binary header (this is not a BAM file).\n" in result:
+ raise ValueError( "invalid BAM binary header (is this a BAM file?)" )
+ elif '[samopen] no @SQ lines in the header.\n' in result:
+ raise ValueError( "no @SQ lines in the header (is this a SAM file?)")
if self.samfile == NULL:
raise IOError("could not open file `%s`" % filename )
def getrname( self, tid ):
'''(tid )
convert numerical :term:`tid` into :ref:`reference` name.'''
+ if not self._isOpen(): raise ValueError( "I/O operation on closed file" )
if not 0 <= tid < self.samfile.header.n_targets:
raise ValueError( "tid out of range 0<=tid<%i" % self.samfile.header.n_targets )
return self.samfile.header.target_name[tid]
Note that regions are 1-based, while start,end are python coordinates.
'''
+ # This method's main objective is to translate from a reference to a tid.
+ # For now, it calls bam_parse_region, which is clumsy. Might be worth
+ # implementing it all in pysam (makes use of khash).
cdef int rtid
cdef int rstart
# translate to a region
if reference:
if start != None and end != None:
+ if start > end: raise ValueError( 'invalid region: start (%i) > end (%i)' % (start, end) )
region = "%s:%i-%i" % (reference, start+1, end)
else:
region = reference
if region:
- store = StderrStore()
+ # this function might be called often (multiprocessing)
+ # thus avoid using StderrStore, see issue 46.
bam_parse_region( self.samfile.header, region, &rtid, &rstart, &rend)
- store.release()
if rtid < 0: raise ValueError( "invalid region `%s`" % region )
if rstart > rend: raise ValueError( 'invalid region: start (%i) > end (%i)' % (rstart, rend) )
if not 0 <= rstart < max_pos: raise ValueError( 'start out of range (%i)' % rstart )
def tell( self ):
'''return current file position'''
+ if not self._isOpen():
+ raise ValueError( "I/O operation on closed file" )
if not self.isbam:
raise NotImplementedError("seek only available in bam files")
return the number of bytes written.
'''
+ if not self._isOpen():
+ raise ValueError( "I/O operation on closed file" )
+
return samwrite( self.samfile, read._delegate )
def __enter__(self):
property nreferences:
'''number of :term:`reference` sequences in the file.'''
def __get__(self):
+ if not self._isOpen(): raise ValueError( "I/O operation on closed file" )
return self.samfile.header.n_targets
property references:
"""tuple with the names of :term:`reference` sequences."""
def __get__(self):
+ if not self._isOpen(): raise ValueError( "I/O operation on closed file" )
t = []
for x from 0 <= x < self.samfile.header.n_targets:
t.append( self.samfile.header.target_name[x] )
"""tuple of the lengths of the :term:`reference` sequences. The lengths are in the same order as :attr:`pysam.Samfile.reference`
"""
def __get__(self):
+ if not self._isOpen(): raise ValueError( "I/O operation on closed file" )
t = []
for x from 0 <= x < self.samfile.header.n_targets:
t.append( self.samfile.header.target_len[x] )
property text:
'''full contents of the :term:`sam file` header as a string.'''
def __get__(self):
+ if not self._isOpen(): raise ValueError( "I/O operation on closed file" )
return PyString_FromStringAndSize(self.samfile.header.text, self.samfile.header.l_text)
property header:
a two-level dictionary.
'''
def __get__(self):
+ if not self._isOpen(): raise ValueError( "I/O operation on closed file" )
+
result = {}
if self.samfile.header.text != NULL:
return dest
def __iter__(self):
+ if not self._isOpen(): raise ValueError( "I/O operation on closed file" )
return self
cdef bam1_t * getCurrent( self ):
return self.fastafile != NULL
def __len__(self):
- assert self.fastafile != NULL
+ if self.fastafile == NULL:
+ raise ValueError( "calling len() on closed file" )
+
return faidx_fetch_nseq(self.fastafile)
def _open( self,
if not self._isOpen():
raise ValueError( "I/O operation on closed file" )
- cdef int len, max_pos
+ cdef int length, max_pos
cdef char * seq
max_pos = 2 << 29
# valid ranges are from 0 to 2^29-1
if not 0 <= start < max_pos: raise ValueError( 'start out of range (%i)' % start )
if not 0 <= end < max_pos: raise ValueError( 'end out of range (%i)' % end )
-
- seq = faidx_fetch_seq(self.fastafile, reference,
+ seq = faidx_fetch_seq(self.fastafile,
+ reference,
start,
- end-1, &len)
+ end-1,
+ &length)
else:
# samtools adds a '\0' at the end
- seq = fai_fetch( self.fastafile, region, &len )
+ seq = fai_fetch( self.fastafile, region, &length )
# copy to python
if seq == NULL:
return ""
else:
- result = seq
- # clean up
- free(seq)
-
- return result
+ try:
+ py_seq = PyString_FromStringAndSize(seq, length)
+ finally:
+ free(seq)
+
+ return py_seq
###########################################################################
###########################################################################
def __cinit__(self, Samfile samfile, int tid, int beg, int end ):
- assert samfile._isOpen()
- assert samfile._hasIndex()
+ if not samfile._isOpen():
+ raise ValueError( "I/O operation on closed file" )
+
+ if not samfile._hasIndex():
+ raise ValueError( "no index available for pileup" )
# makes sure that samfile stays alive as long as the
# iterator is alive
def __cinit__(self, Samfile samfile):
- assert samfile._isOpen()
+ if not samfile._isOpen():
+ raise ValueError( "I/O operation on closed file" )
if samfile.isbam: mode = "rb"
else: mode = "r"
property tags:
"""the tags in the AUX field.
+
This property permits convenience access to
the tags. Changes it the returned list will
not update the tags automatically. Instead,
cdef char * ctag
cdef bam1_t * src
cdef uint8_t * s
- cdef char tpe
+ cdef char auxtag[3]
+ cdef char auxtype
src = self._delegate
if src.l_aux == 0: return None
s = bam1_aux( src )
result = []
- ctag = <char*>calloc( 3, sizeof(char) )
- cdef int x
+ auxtag[2] = 0
while s < (src.data + src.data_len):
# get tag
- ctag[0] = s[0]
- ctag[1] = s[1]
- pytag = ctag
-
+ auxtag[0] = s[0]
+ auxtag[1] = s[1]
s += 2
+ auxtype = s[0]
- # convert type - is there a better way?
- ctag[0] = s[0]
- ctag[1] = 0
- pytype = ctag
- # get type and value
- # how do I do char literal comparison in cython?
- # the code below works (i.e, is C comparison)
- tpe = toupper(s[0])
- if tpe == 'S':
+ if auxtype in ('c', 'C'):
value = <int>bam_aux2i(s)
- s += 2
- elif tpe == 'I':
+ s += 1
+ elif auxtype in ('s', 'S'):
value = <int>bam_aux2i(s)
+ s += 2
+ elif auxtype in ('i', 'I'):
+ value = <float>bam_aux2i(s)
s += 4
- elif tpe == 'F':
+ elif auxtype == 'f':
value = <float>bam_aux2f(s)
s += 4
- elif tpe == 'D':
+ elif auxtype == 'd':
value = <double>bam_aux2d(s)
s += 8
- elif tpe == 'C':
- value = <int>bam_aux2i(s)
- s += 1
- elif tpe == 'A':
- # there might a more efficient way
- # to convert a char into a string
+ elif auxtype == 'A':
value = "%c" % <char>bam_aux2A(s)
s += 1
- elif tpe == 'Z':
+ elif auxtype in ('Z', 'H'):
value = <char*>bam_aux2Z(s)
# +1 for NULL terminated string
s += len(value) + 1
-
- # skip over type
+ #
s += 1
+
+ result.append( (auxtag, value) )
- # ignore pytype
- result.append( (pytag, value) )
-
- free( ctag )
return result
def __set__(self, tags):
cdef bam1_t * src
cdef uint8_t * s
cdef uint8_t * new_data
+ cdef char * temp
cdef int guessed_size, control_size
+ cdef int max_size, size, offset
+
src = self._delegate
- cdef int max_size, size
max_size = 4000
-
- # map samtools code to python.struct code and byte size
- buffer = ctypes.create_string_buffer(max_size)
-
offset = 0
- for pytag, value in tags:
- t = type(value)
- if t == types.FloatType:
- fmt = "<cccf"
- elif t == types.IntType:
- if value < 0:
- if value >= -127: fmt, pytype = "<cccb", 'c'
- elif value >= -32767: fmt, pytype = "<ccch", 's'
- elif value < -2147483648: raise ValueError( "integer %i out of range of BAM/SAM specification" % value )
- else: fmt, ctype = "<ccci", 'i'[0]
- else:
- if value <= 255: fmt, pytype = "<cccB", 'C'
- elif value <= 65535: fmt, pytype = "<cccH", 'S'
- elif value > 4294967295: raise ValueError( "integer %i out of range of BAM/SAM specification" % value )
- else: fmt, pytype = "<cccI", 'I'
- else:
- # Note: hex strings (H) are not supported yet
- if len(value) == 1:
- fmt, pytype = "<cccc", 'A'
+
+ if tags != None:
+
+ # map samtools code to python.struct code and byte size
+ buffer = ctypes.create_string_buffer(max_size)
+
+ for pytag, value in tags:
+ t = type(value)
+ if t == types.FloatType:
+ fmt = "<cccf"
+ elif t == types.IntType:
+ if value < 0:
+ if value >= -127: fmt, pytype = "<cccb", 'c'
+ elif value >= -32767: fmt, pytype = "<ccch", 's'
+ elif value < -2147483648: raise ValueError( "integer %i out of range of BAM/SAM specification" % value )
+ else: fmt, ctype = "<ccci", 'i'[0]
+ else:
+ if value <= 255: fmt, pytype = "<cccB", 'C'
+ elif value <= 65535: fmt, pytype = "<cccH", 'S'
+ elif value > 4294967295: raise ValueError( "integer %i out of range of BAM/SAM specification" % value )
+ else: fmt, pytype = "<cccI", 'I'
else:
- fmt, pytype = "<ccc%is" % (len(value)+1), 'Z'
-
- size = struct.calcsize(fmt)
- if offset + size > max_size:
- raise NotImplementedError("tags field too large")
-
- struct.pack_into( fmt,
- buffer,
- offset,
- pytag[0],
- pytag[1],
- pytype,
- value )
- offset += size
+ # Note: hex strings (H) are not supported yet
+ if len(value) == 1:
+ fmt, pytype = "<cccc", 'A'
+ else:
+ fmt, pytype = "<ccc%is" % (len(value)+1), 'Z'
+
+ size = struct.calcsize(fmt)
+ if offset + size > max_size:
+ raise NotImplementedError("tags field too large")
+
+ struct.pack_into( fmt,
+ buffer,
+ offset,
+ pytag[0],
+ pytag[1],
+ pytype,
+ value )
+ offset += size
# delete the old data and allocate new
+ # if offset == 0, the aux field will be
+ # empty
pysam_bam_update( src,
src.l_aux,
offset,
src.l_aux = offset
- if offset == 0: return
+ # copy data only if there is any
+ if offset != 0:
- # get location of new data
- s = bam1_aux( src )
+ # get location of new data
+ s = bam1_aux( src )
- # check if there is direct path from buffer.raw to tmp
- cdef char * temp
- temp = buffer.raw
- memcpy( s, temp, offset )
+ # check if there is direct path from buffer.raw to tmp
+ temp = buffer.raw
+ memcpy( s, temp, offset )
property flag:
"""properties flag"""
("ex1.view", "samtools view ex1.bam > ex1.view"),
("pysam_ex1.view", (pysam.view, "ex1.bam" ) ),
),
+ "view2" :
+ (
+ ("ex1.view", "samtools view -bT ex1.fa -o ex1.view2 ex1.sam"),
+ # note that -o ex1.view2 throws exception.
+ ("pysam_ex1.view", (pysam.view, "-bT ex1.fa -oex1.view2 ex1.sam" ) ),
+ ),
}
# some tests depend on others. The order specifies in which order
# the samtools commands are executed.
- mOrder = ('faidx', 'import', 'index', 'pileup1', 'pileup2', 'glfview', 'view' )
+ mOrder = ('faidx', 'import', 'index', 'pileup1', 'pileup2', 'glfview', 'view', 'view2' )
def setUp( self ):
'''setup tests.
samfile.close()
self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
- def testPileupFromClosedFile( self ):
+ def testClosedFile( self ):
+ '''test that access to a closed samfile raises ValueError.'''
samfile = pysam.Samfile( "ex1.bam", "rb" )
samfile.close()
+ self.assertRaises( ValueError, samfile.fetch, 'chr1', 100, 120)
self.assertRaises( ValueError, samfile.pileup, 'chr1', 100, 120)
+ self.assertRaises( ValueError, samfile.getrname, 0 )
+ self.assertRaises( ValueError, samfile.tell )
+ self.assertRaises( ValueError, samfile.write, None )
+ self.assertRaises( ValueError, samfile.seek, 0 )
+ self.assertRaises( ValueError, getattr, samfile, "nreferences" )
+ self.assertRaises( ValueError, getattr, samfile, "references" )
+ self.assertRaises( ValueError, getattr, samfile, "lengths" )
+ self.assertRaises( ValueError, getattr, samfile, "text" )
+ self.assertRaises( ValueError, getattr, samfile, "header" )
def testBinaryReadFromSamfile( self ):
pass
self.assertRaises( ValueError, samfile.fetch )
self.assertEqual( len(list( samfile.fetch(until_eof = True) )), 3270 )
- def testReadingFromFileWithWrongMode( self ):
-
- assert not os.path.exists( "ex2.bam.bai" )
- samfile = pysam.Samfile( "ex2.bam", "r" )
- self.assertRaises( ValueError, samfile.fetch )
-
class TestIteratorRow(unittest.TestCase):
def setUp(self):
def testOutOfRangeLargeOldFormat(self):
self.assertRaises( ValueError, self.samfile.fetch, "chr1:99999999999999999-999999999999999999" )
+ def testZeroToZero(self):
+ '''see issue 44'''
+ self.assertEqual( len(list(self.samfile.fetch('chr1', 0, 0))), 0)
+
def tearDown(self):
self.samfile.close()
+
+class TestWrongFormat(unittest.TestCase):
+ '''test cases for opening files not in bam/sam format.'''
+
+ def testOpenSamAsBam( self ):
+ self.assertRaises( ValueError, pysam.Samfile, 'ex1.sam', 'rb' )
+
+ def testOpenBamAsSam( self ):
+ self.assertRaises( ValueError, pysam.Samfile, 'ex1.bam', 'r' )
+
+ def testOpenFastaAsSam( self ):
+ self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'r' )
+
+ def testOpenFastaAsBam( self ):
+ self.assertRaises( ValueError, pysam.Samfile, 'ex1.fa', 'rb' )
+
class TestFastaFile(unittest.TestCase):
mSequences = { 'chr1' :
a.mpos=200
a.isize=167
a.qual="1234" * 3
-
+ # todo: create tags
return a
def testUpdate( self ):
return a
+ def testTagParsing( self ):
+ '''test for tag parsing
+
+ see http://groups.google.com/group/pysam-user-group/browse_thread/thread/67ca204059ea465a
+ '''
+ samfile=pysam.Samfile( "ex8.bam","rb" )
+
+ for entry in samfile:
+ before = entry.tags
+ entry.tags = entry.tags
+ after = entry.tags
+ self.assertEqual( after, before )
+
class TestDeNovoConstruction(unittest.TestCase):
'''check BAM/SAM file construction using ex3.sam
os.unlink( tmpfilename )
+
class TestDoubleFetch(unittest.TestCase):
'''check if two iterators on the same bamfile are independent.'''