PileupSubstitution = collections.namedtuple( "PileupSubstitution",
" ".join( (\
"chromosome",
- "position",
+ "pos",
"reference_base",
- "consensus_base",
+ "genotype",
"consensus_quality",
"snp_quality",
- "rms_mapping_quality",
+ "mapping_quality",
"coverage",
"read_bases",
"base_qualities" ) ) )
PileupIndel = collections.namedtuple( "PileupIndel",
" ".join( (\
"chromosome",
- "position",
+ "pos",
"reference_base",
"genotype",
"consensus_quality",
"snp_quality",
- "rms_mapping_quality",
+ "mapping_quality",
"coverage",
- "first_allelle",
+ "first_allele",
"second_allele",
"reads_first",
"reads_second",
yield PileupSubstitution( *[x(y) for x,y in zip(conv_subst,d) ] )
except TypeError:
raise pysam.SamtoolsError( "parsing error in line: `%s`" % line)
+
+ENCODE_GENOTYPE = {
+ 'A': 'A', 'C': 'C', 'G': 'G', 'T': 'T',
+ 'AA': 'A', 'CC': 'C', 'GG': 'G', 'TT': 'T', 'UU': 'U',
+ 'AG': 'r', 'GA': 'R',
+ 'CT': 'y', 'TC': 'Y',
+ 'AC': 'm', 'CA': 'M',
+ 'GT': 'k', 'TG': 'K',
+ 'CG': 's', 'GC': 'S',
+ 'AT': 'w', 'TA': 'W',
+ }
+
+DECODE_GENOTYPE = {
+ 'A': 'AA',
+ 'C': 'CC',
+ 'G': 'GG',
+ 'T': 'TT',
+ 'r': 'AG', 'R': 'AG',
+ 'y': 'CT', 'Y': 'CT',
+ 'm': 'AC', 'M': 'AC',
+ 'k': 'GT', 'K': 'GT',
+ 's': 'CG', 'S': 'CG',
+ 'w': 'AT', 'W': 'AT',
+ }
+
+##------------------------------------------------------------
+def encodeGenotype( code ):
+ '''encode genotypes like GG, GA into a one-letter code.
+ The returned code is lower case if code[0] < code[1], otherwise
+ it is uppercase.
+ '''
+ return ENCODE_GENOTYPE[ code.upper() ]
+
+def decodeGenotype( code ):
+ '''decode single letter genotypes like m, M into two letters.
+ This is the reverse operation to :meth:`encodeGenotype`.
+ '''
+ return DECODE_GENOTYPE[ code ]
+
+def translateIndelGenotypeFromVCF( vcf_genotypes, ref ):
+ '''translate indel from vcf to pileup format.'''
+
+ # indels
+ def getPrefix( s1, s2 ):
+ '''get common prefix of strings s1 and s2.'''
+ n = min( len( s1), len( s2 ) )
+ for x in range( n ):
+ if s1[x] != s2[x]: return s1[:x]
+ return s1[:n]
+
+ def getSuffix( s1, s2 ):
+ '''get common sufix of strings s1 and s2.'''
+ n = min( len( s1), len( s2 ) )
+ if s1[-1] != s2[-1]: return ""
+ for x in range( -2, -n - 1, -1 ):
+ if s1[x] != s2[x]: return s1[x+1:]
+ return s1[-n:]
+
+ def getGenotype( variant, ref ):
+
+ if variant == ref: return "*", 0
+
+ if len(ref) > len(variant):
+ # is a deletion
+ if ref.startswith(variant):
+ return "-%s" % ref[len(variant):], len(variant) - 1
+ elif ref.endswith( variant ):
+ return "-%s" % ref[:-len(variant)], -1
+ else:
+ prefix = getPrefix( ref, variant )
+ suffix = getSuffix( ref, variant )
+ shared = len(prefix) + len(suffix) - len(variant)
+ # print "-", prefix, suffix, ref, variant, shared, len(prefix), len(suffix), len(ref)
+ if shared < 0:
+ raise ValueError()
+ return "-%s" % ref[len(prefix):-(len(suffix)-shared)], len(prefix) - 1
+
+ elif len(ref) < len(variant):
+ # is an insertion
+ if variant.startswith(ref):
+ return "+%s" % variant[len(ref):], len(ref) - 1
+ elif variant.endswith(ref):
+ return "+%s" % variant[:len(ref)], 0
+ else:
+ prefix = getPrefix( ref, variant )
+ suffix = getSuffix( ref, variant )
+ shared = len(prefix) + len(suffix) - len(ref)
+ if shared < 0:
+ raise ValueError()
+
+ return "+%s" % variant[len(prefix):-(len(suffix)-shared)], len(prefix)
+ else:
+ assert 0, "snp?"
+
+ # in pileup, the position refers to the base
+ # after the coordinate, hence subtract 1
+ #pos -= 1
+
+ genotypes, offsets = [], []
+ is_error = True
+
+ for variant in vcf_genotypes:
+ try:
+ g, offset = getGenotype( variant, ref )
+ except ValueError:
+ break
+
+ genotypes.append( g )
+ if g != "*": offsets.append( offset )
+
+ else:
+ is_error = False
+
+ if is_error:
+ raise ValueError()
+
+ assert len(set(offsets )) == 1, "multiple offsets for indel"
+ offset = offsets[0]
+
+ genotypes = "/".join( genotypes )
+ return genotypes, offset
+
+def vcf2pileup( vcf, sample ):
+ '''convert vcf record to pileup record.'''
+
+ chromosome = vcf.contig
+ pos = vcf.pos
+ reference = vcf.ref
+ allelles = [reference] + vcf.alt
+
+ data = vcf[sample]
+
+ # get genotype
+ genotypes = data["GT"]
+ if len(genotypes) > 1:
+ raise ValueError( "only single genotype per position, %s" % (str(vcf)))
+
+ genotypes = genotypes[0]
+
+ # not a variant
+ if genotypes[0] == ".": return None
+
+ genotypes = [ allelles[int(x)] for x in genotypes if x != "/" ]
+
+ # snp_quality is "genotype quality"
+ snp_quality = consensus_quality = data.get( "GQ", [0])[0]
+ mapping_quality = vcf.info.get( "MQ", [0])[0]
+ coverage = data.get( "DP", 0)
+
+ if len(reference) > 1 or max([len(x) for x in vcf.alt] ) > 1:
+ # indel
+ genotype, offset = translateIndelGenotypeFromVCF( genotypes, reference )
+
+ return PileupIndel( chromosome,
+ pos + offset,
+ "*",
+ genotype,
+ consensus_quality,
+ snp_quality,
+ mapping_quality,
+ coverage,
+ genotype,
+ "<" * len(genotype),
+ 0,
+ 0,
+ 0 )
+
+ else:
+
+ genotype = encodeGenotype( "".join(genotypes) )
+
+
+ read_bases = ""
+ base_qualities = ""
+
+ return PileupSubstitution( chromosome, pos, reference,
+ genotype,
+ consensus_quality,
+ snp_quality,
+ mapping_quality,
+ coverage, read_bases, base_qualities )
+
+
+def iterate_from_vcf( infile, sample ):
+ '''iterate over a vcf-formatted file.
+
+ *infile* can be any iterator over a lines.
+
+ The function yields named tuples of the type :class:`pysam.Pileup.PileupSubstitution`
+ or :class:`pysam.Pileup.PileupIndel`.
+
+ Positions without a snp will be skipped.
+
+ This method is wasteful and written to support same
+ legacy code that expects samtools pileup output.
+
+ Better use the vcf parser directly.
+
+ '''
+
+
+ vcf = pysam.VCF()
+ vcf.connect( infile )
+
+ if sample not in vcf.getsamples():
+ raise KeyErorr( "sample %s not vcf file" )
+
+ for row in vcf.fetch():
+ result = vcf2pileup( row, sample )
+ if result: yield result
+