X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=pysam.git;a=blobdiff_plain;f=pysam%2FVCF.py;fp=pysam%2FVCF.py;h=94d63f8558f6e24750dfe71fc942e7676e84d46a;hp=0000000000000000000000000000000000000000;hb=68c074fc81858150ca7447a88e731eec96be6378;hpb=768881ffd9d33e3c5fa00dd9ea6f488f4f0700b3
diff --git a/pysam/VCF.py b/pysam/VCF.py
new file mode 100644
index 0000000..94d63f8
--- /dev/null
+++ b/pysam/VCF.py
@@ -0,0 +1,1087 @@
+#
+# Code to read, write and edit VCF files
+#
+# VCF lines are encoded as a dictionary with these keys (note: all lowercase):
+# 'chrom':  string
+# 'pos':    integer
+# 'id':     string
+# 'ref':    string
+# 'alt':    list of strings
+# 'qual':   float (-1 encodes a missing value)
+# 'filter': None (missing value), or list of filter keys (strings); PASS is represented as the empty list
+# 'info':   dictionary of values (see below)
+# 'format': list of keys (strings)
+# sample keys: dictionary of values (see below)
+#
+# The sample keys are accessible through vcf.getsamples()
+#
+# A dictionary of values contains value keys (defined in ##INFO or ##FORMAT lines) which map
+# to a list containing integers, floats, strings, or characters.  Missing values are replaced
+# by a type-specific placeholder, often -1 or '.'
+#
+# Genotypes are not stored as a string, but as a list of 1 or 3 elements (for haploid and diploid
+# samples): the first (and last) element is the integer encoding an allele, and the middle element
+# is the separator character.  Note that there is just one genotype per sample, but for consistency
+# the genotype is stored as the single element of a list.
+#
+# Header lines other than ##INFO, ##FORMAT and ##FILTER are stored as (key, value) pairs and are
+# accessible through getheader()
+#
+# The VCF class can be instantiated with a 'regions' variable consisting of tuples (chrom,start,end)
+# encoding 0-based half-open segments.  Only variants with a position inside a segment will be parsed.
+# A region-string parser is available as parse_regions.
+#
+# When instantiated, a reference can be passed to the VCF class.  This may be any object that supports
+# a fetch(chrom, start, end) method.
+#
+# (An illustrative usage sketch is appended at the end of this file.)
+#
+# NOTE: the position that is returned to Python is 0-based, NOT 1-based as in the VCF file.
+#
+# TODO:
+#  only v4.0 writing is complete; alleles are not converted to v3.3 format
+#
+
+from collections import namedtuple, defaultdict
+from operator import itemgetter
+import sys, re, copy, bisect
+
+import pysam
+
+gtsRegEx = re.compile("[|/\\\\]")
+alleleRegEx = re.compile('^[ACGTN]+$')
+
+# Utility function.  Uses 0-based coordinates
+def get_sequence(chrom, start, end, fa):
+    # obtain sequence from .fa file, without truncation
+    if end<=start: return ""
+    if not fa: return "N"*(end-start)
+    if start<0: return "N"*(-start) + get_sequence(chrom, 0, end, fa).upper()
+    sequence = fa.fetch(chrom, start, end).upper()
+    if len(sequence) < end-start: sequence += "N"*(end-start-len(sequence))
+    return sequence
+
+# Utility function.
Parses a region string +def parse_regions( string ): + result = [] + for r in string.split(','): + elts = r.split(':') + chrom, start, end = elts[0], 0, 3000000000 + if len(elts)==1: pass + elif len(elts)==2: + if len(elts[1])>0: + ielts = elts[1].split('-') + if len(ielts) != 2: ValueError("Don't understand region string '%s'" % r) + try: start, end = int(ielts[0])-1, int(ielts[1]) + except: raise ValueError("Don't understand region string '%s'" % r) + else: + raise ValueError("Don't understand region string '%s'" % r) + result.append( (chrom,start,end) ) + return result + + +FORMAT = namedtuple('FORMAT','id numbertype number type description missingvalue') + +########################################################################################################### +# +# New class +# +########################################################################################################### + +class VCFRecord: + '''vcf record. + + initialized from data and vcf meta + ''' + + data = None + vcf = None + + def __init__(self, data, vcf): + self.data, self.vcf = data, vcf + + if len(data) != len(self.vcf._samples): + self.error(str(data), + self.BAD_NUMBER_OF_COLUMNS, + "expected %s for %s samples (%s), got %s" % \ + (len(self.vcf._samples), + len(self.vcf._samples), + self.vcf._samples, + len(data))) + + property contig: + def contig( self ): return self.data[0] + + property pos: + def __get__( self ): + return self.data.pos + + property id: + def __get__( self ): return self.data[2] + + property ref: + def __get__(self ): + # note: gerton substitutes reference if it can be fixed. + return self.data[3].upper() + + property alt: + def __get__(self): + # convert v3.3 to v4.0 alleles below + alt = self.data[4] + if alt == ".": alt = [] + else: alt = alt.upper().split(',') + return alt + + property qual: + def __get__(self): + qual = self.data[5] + if qual == ".": qual = -1 + else: + try: qual = float(qual) + except: self.error(line,self.QUAL_NOT_NUMERICAL) + + property filter: + def __get__(self): + # postpone checking that filters exist. Encode missing filter or no filtering as empty list + if cols[6] == "." or cols[6] == "PASS" or cols[6] == "0": filter = [] + else: filter = cols[6].split(';') + + return filter + + property info: + def __get__(self): + col = self.data[7] + # dictionary of keys, and list of values + info = {} + if col != ".": + for blurp in col.split(';'): + elts = blurp.split('=') + if len(elts) == 1: v = None + elif len(elts) == 2: v = elts[1] + else: self.error(str(self.data),self.ERROR_INFO_STRING) + info[elts[0]] = self.parse_formatdata(elts[0], v, self.vcf._info, line) + return info + + property format: + def __get__(self): + return self.data[8].split(':') + + def __getitem__(self, key): + + # parse sample columns + values = self.data[self.vcf._sample2column[key]].split(':') + alt = self.alt + format = self.format + + if len(values) > len(format): + self.error(line,self.BAD_NUMBER_OF_VALUES,"(found %s values in element %s; expected %s)" % (len(values),sample,len(format))) + + result = {} + for idx in range(len(format)): + expected = self.vcf.get_expected(format[idx], self.vcf._format, alt) + if idx < len(values): value = values[idx] + else: + if expected == -1: value = "." 
+ else: value = ",".join(["."]*expected) + + result[format[idx]] = self.vcf.parse_formatdata(format[idx], value, self.vcf._format, line) + if expected != -1 and len(result[format[idx]]) != expected: + self.error(str(self.data),self.BAD_NUMBER_OF_PARAMETERS, + "id=%s, expected %s parameters, got %s" % (format[idx],expected,result[format[idx]])) + if len(result[format[idx]] ) < expected: result[format[idx]] += [result[format[idx]][-1]]*(expected-len(result[format[idx]])) + result[format[idx]] = result[format[idx]][:expected] + + return result + + def __str__(self): + return str(self.data) + +class VCF: + + # types + NT_UNKNOWN = 0 + NT_NUMBER = 1 + NT_ALLELES = 2 + NT_NR_ALLELES = 3 + NT_GENOTYPES = 4 + NT_PHASED_GENOTYPES = 5 + + _errors = { 0:"UNKNOWN_FORMAT_STRING:Unknown file format identifier", + 1:"BADLY_FORMATTED_FORMAT_STRING:Formatting error in the format string", + 2:"BADLY_FORMATTED_HEADING:Did not find 9 required headings (CHROM, POS, ..., FORMAT) %s", + 3:"BAD_NUMBER_OF_COLUMNS:Wrong number of columns found (%s)", + 4:"POS_NOT_NUMERICAL:Position column is not numerical", + 5:"UNKNOWN_CHAR_IN_REF:Unknown character in reference field", + 6:"V33_BAD_REF:Reference should be single-character in v3.3 VCF", + 7:"V33_BAD_ALLELE:Cannot interpret allele for v3.3 VCF", + 8:"POS_NOT_POSITIVE:Position field must be >0", + 9:"QUAL_NOT_NUMERICAL:Quality field must be numerical, or '.'", + 10:"ERROR_INFO_STRING:Error while parsing info field", + 11:"ERROR_UNKNOWN_KEY:Unknown key (%s) found in formatted field (info; format; or filter)", + 12:"ERROR_FORMAT_NOT_NUMERICAL:Expected integer or float in formatted field; got %s", + 13:"ERROR_FORMAT_NOT_CHAR:Eexpected character in formatted field; got string", + 14:"FILTER_NOT_DEFINED:Identifier (%s) in filter found which was not defined in header", + 15:"FORMAT_NOT_DEFINED:Identifier (%s) in format found which was not defined in header", + 16:"BAD_NUMBER_OF_VALUES:Found too many of values in sample column (%s)", + 17:"BAD_NUMBER_OF_PARAMETERS:Found unexpected number of parameters (%s)", + 18:"BAD_GENOTYPE:Cannot parse genotype (%s)", + 19:"V40_BAD_ALLELE:Bad allele found for v4.0 VCF (%s)", + 20:"MISSING_REF:Reference allele missing", + 21:"V33_UNMATCHED_DELETION:Deleted sequence does not match reference (%s)", + 22:"V40_MISSING_ANGLE_BRACKETS:Format definition is not deliminted by angular brackets", + 23:"FORMAT_MISSING_QUOTES:Description field in format definition is not surrounded by quotes", + 24:"V40_FORMAT_MUST_HAVE_NAMED_FIELDS:Fields in v4.0 VCF format definition must have named fields", + 25:"HEADING_NOT_SEPARATED_BY_TABS:Heading line appears separated by spaces, not tabs", + 26:"WRONG_REF:Wrong reference %s", + 27:"ERROR_TRAILING_DATA:Numerical field ('%s') has semicolon-separated trailing data", + 28:"BAD_CHR_TAG:Error calculating chr tag for %s", + 29:"ZERO_LENGTH_ALLELE:Found zero-length allele", + 30:"MISSING_INDEL_ALLELE_REF_BASE:Indel alleles must begin with single reference base" + } + + # tag-value pairs; tags are not unique; does not include fileformat, INFO, FILTER or FORMAT fields + _header = [] + + # version number; 33=v3.3; 40=v4.0 + _version = 40 + + # info, filter and format data + _info = {} + _filter = {} + _format = {} + + # header; and required columns + _required = ["CHROM","POS","ID","REF","ALT","QUAL","FILTER","INFO","FORMAT"] + _samples = [] + + # control behaviour + _ignored_errors = set([11]) # ERROR_UNKNOWN_KEY + _warn_errors = set([]) + _leftalign = False + + # reference sequence + _reference = None + + # regions to 
include; None includes everything + _regions = None + + # statefull stuff + _lineno = -1 + _line = None + _lines = None + + def __init__(self, _copy=None, reference=None, regions=None, lines=None, leftalign=False): + # make error identifiers accessible by name + for id in self._errors.keys(): self.__dict__[self._errors[id].split(':')[0]] = id + if _copy != None: + self._leftalign = _copy._leftalign + self._header = _copy._header[:] + self._version = _copy._version + self._info = copy.deepcopy(_copy._info) + self._filter = copy.deepcopy(_copy._filter) + self._format = copy.deepcopy(_copy._format) + self._samples = _copy._samples[:] + self._sample2column = copy.deepcopy(_copy._sample2column) + self._ignored_errors = copy.deepcopy(_copy._ignored_errors) + self._warn_errors = copy.deepcopy(_copy._warn_errors) + self._reference = _copy._reference + self._regions = _copy._regions + if reference: self._reference = reference + if regions: self._regions = regions + if leftalign: self._leftalign = leftalign + self._lines = lines + + def error(self,line,error,opt=None): + if error in self._ignored_errors: return + errorlabel, errorstring = self._errors[error].split(':') + if opt: errorstring = errorstring % opt + errwarn = ["Error","Warning"][error in self._warn_errors] + sys.stderr.write("Line %s: '%s'\n%s %s: %s\n" % (self._lineno,line,errwarn,errorlabel,errorstring)) + if error in self._warn_errors: return + raise ValueError(errorstring) + + def parse_format(self,line,format,filter=False): + if self._version >= 40: + if not format.startswith('<'): + self.error(line,self.V40_MISSING_ANGLE_BRACKETS) + format = "<"+format + if not format.endswith('>'): + self.error(line,self.V40_MISSING_ANGLE_BRACKETS) + format += ">" + format = format[1:-1] + data = {'id':None,'number':None,'type':None,'descr':None} + idx = 0 + while len(format.strip())>0: + elts = format.strip().split(',') + first, rest = elts[0], ','.join(elts[1:]) + if first.find('=') == -1 or (first.find('"')>=0 and first.find('=') > first.find('"')): + if self._version >= 40: self.error(line,self.V40_FORMAT_MUST_HAVE_NAMED_FIELDS) + if idx == 4: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) + first = ["ID=","Number=","Type=","Description="][idx] + first + if first.startswith('ID='): data['id'] = first.split('=')[1] + elif first.startswith('Number='): data['number'] = first.split('=')[1] + elif first.startswith('Type='): data['type'] = first.split('=')[1] + elif first.startswith('Description='): + elts = format.split('"') + if len(elts)<3: + self.error(line,self.FORMAT_MISSING_QUOTES) + elts = first.split('=') + [rest] + data['descr'] = elts[1] + rest = '"'.join(elts[2:]) + if rest.startswith(','): rest = rest[1:] + else: + self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) + format = rest + idx += 1 + if filter and idx==1: idx=3 # skip number and type fields for FILTER format strings + if not data['id']: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) + if not data['descr']: + self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) + data['descr'] = '' + if not data['type'] and not data['number']: + # fine, ##filter format + return FORMAT(data['id'],self.NT_NUMBER,0,"Flag",data['descr'],'.') + if not data['type'] in ["Integer","Float","Character","String","Flag"]: + self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) + # I would like a missing-value field, but it isn't there + if data['type'] in ['Integer','Float']: data['missing'] = None # Do NOT use arbitrary int/float as missing value + else: data['missing'] = '.' 
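+        # the Number entry may be a plain integer, '.' (count unknown), or one of the
+        # symbolic counts '#alleles', '#nonref_alleles', '#genotypes', '#phased_genotypes';
+        # these are mapped onto the NT_* numbertype constants below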
+ if not data['number']: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) + try: + n = int(data['number']) + t = self.NT_NUMBER + except ValueError: + n = -1 + if data['number'] == '.': t = self.NT_UNKNOWN + elif data['number'] == '#alleles': t = self.NT_ALLELES + elif data['number'] == '#nonref_alleles': t = self.NT_NR_ALLELES + elif data['number'] == '#genotypes': t = self.NT_GENOTYPES + elif data['number'] == '#phased_genotypes': t = self.NT_PHASED_GENOTYPES + else: + self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) + return FORMAT(data['id'],t,n,data['type'],data['descr'],data['missing']) + + + def format_format( self, fmt, filter=False ): + values = [('ID',fmt.id)] + if fmt.number != None and not filter: + if fmt.numbertype == self.NT_UNKNOWN: nmb = "." + elif fmt.numbertype == self.NT_NUMBER: nmb = str(fmt.number) + elif fmt.numbertype == self.NT_ALLELES: nmb = "#alleles" + elif fmt.numbertype == self.NT_NR_ALLELES: nmb = "#nonref_alleles" + elif fmt.numbertype == self.NT_GENOTYPES: nmb = "#genotypes" + elif fmt.numbertype == self.NT_PHASED_GENOTYPES: nmb = "#phased_genotypes" + else: + raise ValueError("Unknown number type encountered: %s" % fmt.numbertype) + values.append( ('Number',nmb) ) + values.append( ('Type', fmt.type) ) + values.append( ('Description', '"' + fmt.description + '"') ) + if self._version == 33: + format = ",".join(v for k,v in values) + else: + format = "<" + (",".join( "%s=%s" % (k,v) for (k,v) in values )) + ">" + return format + + def get_expected(self, format, formatdict, alt): + fmt = formatdict[format] + if fmt.numbertype == self.NT_UNKNOWN: return -1 + if fmt.numbertype == self.NT_NUMBER: return fmt.number + if fmt.numbertype == self.NT_ALLELES: return len(alt)+1 + if fmt.numbertype == self.NT_NR_ALLELES: return len(alt) + if fmt.numbertype == self.NT_GENOTYPES: return ((len(alt)+1)*(len(alt)+2)) // 2 + if fmt.numbertype == self.NT_PHASED_GENOTYPES: return (len(alt)+1)*(len(alt)+1) + return 0 + + + def _add_definition(self, formatdict, key, data, line ): + if key in formatdict: return + self.error(line,self.ERROR_UNKNOWN_KEY,key) + if data == None: + formatdict[key] = FORMAT(key,self.NT_NUMBER,0,"Flag","(Undefined tag)",".") + return + if data == []: data = [""] # unsure what type -- say string + if type(data[0]) == type(0.0): + formatdict[key] = FORMAT(key,self.NT_UNKNOWN,-1,"Float","(Undefined tag)",None) + return + if type(data[0]) == type(0): + formatdict[key] = FORMAT(key,self.NT_UNKNOWN,-1,"Integer","(Undefined tag)",None) + return + formatdict[key] = FORMAT(key,self.NT_UNKNOWN,-1,"String","(Undefined tag)",".") + + + # todo: trim trailing missing values + def format_formatdata( self, data, format, key=True, value=True, separator=":" ): + output, sdata = [], [] + if type(data) == type([]): # for FORMAT field, make data with dummy values + d = {} + for k in data: d[k] = [] + data = d + # convert missing values; and silently add definitions if required + for k in data: + self._add_definition( format, k, data[k], "(output)" ) + for idx,v in enumerate(data[k]): + if v == format[k].missingvalue: data[k][idx] = "." 
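+        # (the VCF specification requires GT, when present, to be the first entry
+        #  in the FORMAT column, hence the special-casing below)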
+ # make sure GT comes first; and ensure fixed ordering; also convert GT data back to string + for k in data: + if k != 'GT': sdata.append( (k,data[k]) ) + sdata.sort() + if 'GT' in data: + sdata = [('GT',map(self.convertGTback,data['GT']))] + sdata + for k,v in sdata: + if v == []: v = None + if key and value: + if v != None: output.append( k+"="+','.join(map(str,v)) ) + else: output.append( k ) + elif key: output.append(k) + elif value: + if v != None: output.append( ','.join(map(str,v)) ) + else: output.append( "." ) # should not happen + # snip off trailing missing data + while len(output) > 1: + last = output[-1].replace(',','').replace('.','') + if len(last)>0: break + output = output[:-1] + return separator.join(output) + + + def enter_default_format(self): + for f in [FORMAT('GT',self.NT_NUMBER,1,'String','Genotype','.'), + FORMAT('GQ',self.NT_NUMBER,1,'Integer','Genotype Quality',-1), + FORMAT('DP',self.NT_NUMBER,1,'Integer','Read depth at this position for this sample',-1), + FORMAT('HQ',self.NT_UNKNOWN,-1,'Integer','Haplotype Quality',-1), # unknown number, since may be haploid + FORMAT('FT',self.NT_NUMBER,1,'String','Sample Genotype Filter','.')]: + if f.id not in self._format: + self._format[f.id] = f + + def parse_header( self, line ): + assert line.startswith('##') + elts = line[2:].split('=') + key = elts[0].strip() + value = '='.join(elts[1:]).strip() + if key == "fileformat": + if value == "VCFv3.3": + self._version = 33 + elif value == "VCFv4.0": + self._version = 40 + elif value == "VCFv4.1": + self._version = 41 + else: + self.error(line,self.UNKNOWN_FORMAT_STRING) + elif key == "INFO": + f = self.parse_format(line, value) + self._info[ f.id ] = f + elif key == "FILTER": + f = self.parse_format(line, value, filter=True) + self._filter[ f.id ] = f + elif key == "FORMAT": + f = self.parse_format(line, value) + self._format[ f.id ] = f + else: + # keep other keys in the header field + self._header.append( (key,value) ) + + + def write_header( self, stream ): + stream.write("##fileformat=VCFv%s.%s\n" % (self._version // 10, self._version % 10)) + for key,value in self._header: stream.write("##%s=%s\n" % (key,value)) + for var,label in [(self._info,"INFO"),(self._filter,"FILTER"),(self._format,"FORMAT")]: + for f in var.itervalues(): stream.write("##%s=%s\n" % (label,self.format_format(f,filter=(label=="FILTER")))) + + + def parse_heading( self, line ): + assert line.startswith('#') + assert not line.startswith('##') + headings = line[1:].split('\t') + if len(headings)==1 and len(line[1:].split()) >= 9: + self.error(line,self.HEADING_NOT_SEPARATED_BY_TABS) + headings = line[1:].split() + + for i,s in enumerate(self._required): + + if len(headings)<=i or headings[i] != s: + + if len(headings) <= i: + err = "(%sth entry not found)" % (i+1) + else: + err = "(found %s, expected %s)" % (headings[i],s) + + #self.error(line,self.BADLY_FORMATTED_HEADING,err) + + # allow FORMAT column to be absent + if len(headings) == 8: + headings.append("FORMAT") + else: + self.error(line,self.BADLY_FORMATTED_HEADING,err) + + self._samples = headings[9:] + self._sample2column = dict( [(y,x) for x,y in enumerate( self._samples ) ] ) + + def write_heading( self, stream ): + stream.write("#" + "\t".join(self._required + self._samples) + "\n") + + def convertGT(self, GTstring): + if GTstring == ".": return ["."] + try: + gts = gtsRegEx.split(GTstring) + if len(gts) == 1: return [int(gts[0])] + if len(gts) != 2: raise ValueError() + if gts[0] == "." 
and gts[1] == ".": return [gts[0],GTstring[len(gts[0]):-len(gts[1])],gts[1]] + return [int(gts[0]),GTstring[len(gts[0]):-len(gts[1])],int(gts[1])] + except ValueError: + self.error(self._line,self.BAD_GENOTYPE,GTstring) + return [".","|","."] + + + def convertGTback(self, GTdata): + return ''.join(map(str,GTdata)) + + def parse_formatdata( self, key, value, formatdict, line ): + # To do: check that the right number of values is present + f = formatdict.get(key,None) + if f == None: + self._add_definition(formatdict, key, value, line ) + f = formatdict[key] + if f.type == "Flag": + if value is not None: self.error(line,self.ERROR_FLAG_HAS_VALUE) + return [] + values = value.split(',') + # deal with trailing data in some early VCF files + if f.type in ["Float","Integer"] and len(values)>0 and values[-1].find(';') > -1: + self.error(line,self.ERROR_TRAILING_DATA,values[-1]) + values[-1] = values[-1].split(';')[0] + if f.type == "Integer": + for idx,v in enumerate(values): + try: + if v == ".": values[idx] = f.missingvalue + else: values[idx] = int(v) + except: + self.error(line,self.ERROR_FORMAT_NOT_NUMERICAL,values) + return [0] * len(values) + return values + elif f.type == "String": + self._line = line + if f.id == "GT": values = map( self.convertGT, values ) + return values + elif f.type == "Character": + for v in values: + if len(v) != 1: self.error(line,self.ERROR_FORMAT_NOT_CHAR) + return values + elif f.type == "Float": + for idx,v in enumerate(values): + if v == ".": values[idx] = f.missingvalue + try: return map(float,values) + except: + self.error(line,self.ERROR_FORMAT_NOT_NUMERICAL,values) + return [0.0] * len(values) + else: + # can't happen + self.error(line,self.ERROR_INFO_STRING) + + + def inregion(self, chrom, pos): + if not self._regions: return True + for r in self._regions: + if r[0] == chrom and r[1] <= pos < r[2]: return True + return False + + + def parse_data( self, line, lineparse=False ): + cols = line.split('\t') + if len(cols) != len(self._samples)+9: + # gracefully deal with absent FORMAT column + if len(cols) == 8 and len(self._samples)==0: + cols.append("") + else: + self.error(line, + self.BAD_NUMBER_OF_COLUMNS, + "expected %s for %s samples (%s), got %s" % (len(self._samples)+9, len(self._samples), self._samples, len(cols))) + + chrom = cols[0] + + # get 0-based position + try: pos = int(cols[1])-1 + except: self.error(line,self.POS_NOT_NUMERICAL) + if pos < 0: self.error(line,self.POS_NOT_POSITIVE) + + # implement filtering + if not self.inregion(chrom,pos): return None + + # end of first-pass parse for sortedVCF + if lineparse: return chrom, pos, line + + id = cols[2] + + ref = cols[3].upper() + if ref == ".": + self.error(line,self.MISSING_REF) + if self._version == 33: ref = get_sequence(chrom,pos,pos+1,self._reference) + else: ref = "" + else: + for c in ref: + if c not in "ACGTN": self.error(line,self.UNKNOWN_CHAR_IN_REF) + if "N" in ref: ref = get_sequence(chrom,pos,pos+len(ref),self._reference) + + # make sure reference is sane + if self._reference: + left = max(0,pos-100) + faref_leftflank = get_sequence(chrom,left,pos+len(ref),self._reference) + faref = faref_leftflank[pos-left:] + if faref != ref: self.error(line,self.WRONG_REF,"(reference is %s, VCF says %s)" % (faref,ref)) + ref = faref + + # convert v3.3 to v4.0 alleles below + if cols[4] == ".": alt = [] + else: alt = cols[4].upper().split(',') + + if cols[5] == ".": qual = -1 + else: + try: qual = float(cols[5]) + except: self.error(line,self.QUAL_NOT_NUMERICAL) + + # postpone checking that 
filters exist. Encode missing filter or no filtering as empty list + if cols[6] == "." or cols[6] == "PASS" or cols[6] == "0": filter = [] + else: filter = cols[6].split(';') + + # dictionary of keys, and list of values + info = {} + if cols[7] != ".": + for blurp in cols[7].split(';'): + elts = blurp.split('=') + if len(elts) == 1: v = None + elif len(elts) == 2: v = elts[1] + else: self.error(line,self.ERROR_INFO_STRING) + info[elts[0]] = self.parse_formatdata(elts[0], v, self._info, line) + + # Gracefully deal with absent FORMAT column + if cols[8] == "": format = [] + else: format = cols[8].split(':') + + # check: all filters are defined + for f in filter: + if f not in self._filter: self.error(line,self.FILTER_NOT_DEFINED, f) + + # check: format fields are defined + for f in format: + if f not in self._format: self.error(line,self.FORMAT_NOT_DEFINED, f) + + # convert v3.3 alleles + if self._version == 33: + if len(ref) != 1: self.error(line,self.V33_BAD_REF) + newalts = [] + have_deletions = False + for a in alt: + if len(a) == 1: a = a + ref[1:] # SNP; add trailing reference + elif a.startswith('I'): a = ref[0] + a[1:] + ref[1:] # insertion just beyond pos; add first and trailing reference + elif a.startswith('D'): # allow D and D + have_deletions = True + try: + l = int(a[1:]) # throws ValueError if sequence + if len(ref) < l: # add to reference if necessary + addns = get_sequence(chrom,pos+len(ref),pos+l,self._reference) + ref += addns + for i,na in enumerate(newalts): newalts[i] = na+addns + a = ref[l:] # new deletion, deleting pos...pos+l + except ValueError: + s = a[1:] + if len(ref) < len(s): # add Ns to reference if necessary + addns = get_sequence(chrom,pos+len(ref),pos+len(s),self._reference) + if not s.endswith(addns) and addns != 'N'*len(addns): + self.error(line,self.V33_UNMATCHED_DELETION, + "(deletion is %s, reference is %s)" % (a,get_sequence(chrom,pos,pos+len(s),self._reference))) + ref += addns + for i,na in enumerate(newalts): newalts[i] = na+addns + a = ref[len(s):] # new deletion, deleting from pos + else: + self.error(line,self.V33_BAD_ALLELE) + newalts.append(a) + alt = newalts + # deletion alleles exist, add dummy 1st reference allele, and account for leading base + if have_deletions: + if pos == 0: + # Petr Danacek's: we can't have a leading nucleotide at (1-based) position 1 + addn = get_sequence(chrom,pos+len(ref),pos+len(ref)+1,self._reference) + ref += addn + alt = [allele+addn for allele in alt] + else: + addn = get_sequence(chrom,pos-1,pos,self._reference) + ref = addn + ref + alt = [addn + allele for allele in alt] + pos -= 1 + else: + # format v4.0 -- just check for nucleotides + for allele in alt: + if not alleleRegEx.match(allele): + self.error(line,self.V40_BAD_ALLELE,allele) + + # check for leading nucleotide in indel calls + for allele in alt: + if len(allele) != len(ref): + if len(allele) == 0: self.error(line,self.ZERO_LENGTH_ALLELE) + if ref[0].upper() != allele[0].upper() and "N" not in (ref[0]+allele[0]).upper(): + self.error(line,self.MISSING_INDEL_ALLELE_REF_BASE) + + # trim trailing bases in alleles + for i in range(1,min(len(ref),min(map(len,alt)))): + if len(set(allele[-1].upper() for allele in alt)) > 1 or ref[-1].upper() != alt[0][-1].upper(): + break + ref, alt = ref[:-1], [allele[:-1] for allele in alt] + + # left-align alleles, if a reference is available + if self._leftalign and self._reference: + while left < pos: + movable = True + for allele in alt: + if len(allele) > len(ref): + longest, shortest = allele, ref + else: + 
longest, shortest = ref, allele + if len(longest) == len(shortest) or longest[:len(shortest)].upper() != shortest.upper(): + movable = False + if longest[-1].upper() != longest[len(shortest)-1].upper(): + movable = False + if not movable: + break + ref = ref[:-1] + alt = [allele[:-1] for allele in alt] + if min(len(allele) for allele in alt) == 0 or len(ref) == 0: + ref = faref_leftflank[pos-left-1] + ref + alt = [faref_leftflank[pos-left-1] + allele for allele in alt] + pos -= 1 + + # parse sample columns + samples = [] + for sample in cols[9:]: + dict = {} + values = sample.split(':') + if len(values) > len(format): + self.error(line,self.BAD_NUMBER_OF_VALUES,"(found %s values in element %s; expected %s)" % (len(values),sample,len(format))) + for idx in range(len(format)): + expected = self.get_expected(format[idx], self._format, alt) + if idx < len(values): value = values[idx] + else: + if expected == -1: value = "." + else: value = ",".join(["."]*expected) + dict[format[idx]] = self.parse_formatdata(format[idx], value, self._format, line) + if expected != -1 and len(dict[format[idx]]) != expected: + self.error(line,self.BAD_NUMBER_OF_PARAMETERS, + "id=%s, expected %s parameters, got %s" % (format[idx],expected,dict[format[idx]])) + if len(dict[format[idx]] ) < expected: dict[format[idx]] += [dict[format[idx]][-1]]*(expected-len(dict[format[idx]])) + dict[format[idx]] = dict[format[idx]][:expected] + samples.append( dict ) + + # done + d = {'chrom':chrom, + 'pos':pos, # return 0-based position + 'id':id, + 'ref':ref, + 'alt':alt, + 'qual':qual, + 'filter':filter, + 'info':info, + 'format':format} + for key,value in zip(self._samples,samples): + d[key] = value + + return d + + + def write_data(self, stream, data): + required = ['chrom','pos','id','ref','alt','qual','filter','info','format'] + self._samples + for k in required: + if k not in data: raise ValueError("Required key %s not found in data" % str(k)) + if data['alt'] == []: alt = "." + else: alt = ",".join(data['alt']) + if data['filter'] == None: filter = "." + elif data['filter'] == []: + if self._version == 33: filter = "0" + else: filter = "PASS" + else: filter = ';'.join(data['filter']) + if data['qual'] == -1: qual = "." 
+ else: qual = str(data['qual']) + + output = [data['chrom'], + str(data['pos']+1), # change to 1-based position + data['id'], + data['ref'], + alt, + qual, + filter, + self.format_formatdata( data['info'], self._info, separator=";" ), + self.format_formatdata( data['format'], self._format, value=False ) ] + + for s in self._samples: + output.append( self.format_formatdata( data[s], self._format, key=False ) ) + + stream.write( "\t".join(output) + "\n" ) + + def _parse_header(self, stream): + self._lineno = 0 + for line in stream: + self._lineno += 1 + if line.startswith('##'): + self.parse_header( line.strip() ) + elif line.startswith('#'): + self.parse_heading( line.strip() ) + self.enter_default_format() + else: + break + return line + + def _parse(self, line, stream): + if len(line.strip()) > 0: + d = self.parse_data( line.strip() ) + if d: yield d + for line in stream: + self._lineno += 1 + if self._lines and self._lineno > self._lines: raise StopIteration + d = self.parse_data( line.strip() ) + if d: yield d + + ###################################################################################################### + # + # API follows + # + ###################################################################################################### + + def getsamples(self): + """ List of samples in VCF file """ + return self._samples + + def setsamples(self,samples): + """ List of samples in VCF file """ + self._samples = samples + + def getheader(self): + """ List of header key-value pairs (strings) """ + return self._header + + def setheader(self,header): + """ List of header key-value pairs (strings) """ + self._header = header + + def getinfo(self): + """ Dictionary of ##INFO tags, as VCF.FORMAT values """ + return self._info + + def setinfo(self,info): + """ Dictionary of ##INFO tags, as VCF.FORMAT values """ + self._info = info + + def getformat(self): + """ Dictionary of ##FORMAT tags, as VCF.FORMAT values """ + return self._format + + def setformat(self,format): + """ Dictionary of ##FORMAT tags, as VCF.FORMAT values """ + self._format = format + + def getfilter(self): + """ Dictionary of ##FILTER tags, as VCF.FORMAT values """ + return self._filter + + def setfilter(self,filter): + """ Dictionary of ##FILTER tags, as VCF.FORMAT values """ + self._filter = filter + + def setversion(self, version): + if version not in [33,40,41]: raise ValueError("Can only handle v3.3, v4.0 and v4.1 VCF files") + self._version = version + + def setregions(self, regions): + self._regions = regions + + def setreference(self, ref): + """ Provide a reference sequence; a Python class supporting a fetch(chromosome, start, end) method, e.g. PySam.FastaFile """ + self._reference = ref + + def ignoreerror(self, errorstring): + try: self._ignored_errors.add(self.__dict__[errorstring]) + except KeyError: raise ValueError("Invalid error string: %s" % errorstring) + + def warnerror(self, errorstring): + try: self._warn_errors.add(self.__dict__[errorstring]) + except KeyError: raise ValueError("Invalid error string: %s" % errorstring) + + def parse(self, stream): + """ Parse a stream of VCF-formatted lines. Initializes class instance and return generator """ + last_line = self._parse_header(stream) + # now return a generator that does the actual work. 
In this way the pre-processing is done + # before the first piece of data is yielded + return self._parse(last_line, stream) + + def write(self, stream, datagenerator): + """ Writes a VCF file to a stream, using a data generator (or list) """ + self.write_header(stream) + self.write_heading(stream) + for data in datagenerator: self.write_data(stream,data) + + def writeheader(self, stream): + """ Writes a VCF header """ + self.write_header(stream) + self.write_heading(stream) + + def compare_calls(self, pos1, ref1, alt1, pos2, ref2, alt2): + """ Utility function: compares two calls for equality """ + # a variant should always be assigned to a unique position, one base before + # the leftmost position of the alignment gap. If this rule is implemented + # correctly, the two positions must be equal for the calls to be identical. + if pos1 != pos2: return False + # from both calls, trim rightmost bases when identical. Do this safely, i.e. + # only when the reference bases are not Ns + while len(ref1)>0 and len(alt1)>0 and ref1[-1] == alt1[-1]: + ref1 = ref1[:-1] + alt1 = alt1[:-1] + while len(ref2)>0 and len(alt2)>0 and ref2[-1] == alt2[-1]: + ref2 = ref2[:-1] + alt2 = alt2[:-1] + # now, the alternative alleles must be identical + return alt1 == alt2 + +########################################################################################################### +########################################################################################################### +## API functions added by Andreas +########################################################################################################### + + def connect( self, filename ): + '''connect to tabix file.''' + self.tabixfile = pysam.Tabixfile( filename ) + self._parse_header(self.tabixfile.header) + + def fetch(self, + reference = None, + start = None, + end = None, + region = None ): + """ Parse a stream of VCF-formatted lines. Initializes class instance and return generator """ + + iter = self.tabixfile.fetch( reference, start, end, region, parser = pysam.asVCF() ) + for x in iter: + yield VCFRecord( x, self ) + + def validate( self, record ): + '''validate vcf record. + + returns a validated record. 
+ ''' + + chrom, pos = record.chrom, record.pos + + # check reference + ref = record.ref + if ref == ".": + self.error(str(record),self.MISSING_REF) + if self._version == 33: ref = get_sequence(chrom,pos,pos+1,self._reference) + else: ref = "" + else: + for c in ref: + if c not in "ACGTN": self.error(str(record),self.UNKNOWN_CHAR_IN_REF) + if "N" in ref: ref = get_sequence(chrom, + pos, + pos+len(ref), + self._reference) + + # make sure reference is sane + if self._reference: + left = max(0,self.pos-100) + faref_leftflank = get_sequence(chrom,left,self.pos+len(ref),self._reference) + faref = faref_leftflank[pos-left:] + if faref != ref: self.error(line,self.WRONG_REF,"(reference is %s, VCF says %s)" % (faref,ref)) + ref = faref + + # check: format fields are defined + for f in record.format: + if f not in self._format: self.error(str(record),self.FORMAT_NOT_DEFINED, f) + + # check: all filters are defined + for f in record.filter: + if f not in self._filter: self.error(str(record),self.FILTER_NOT_DEFINED, f) + + # convert v3.3 alleles + if self._version == 33: + if len(ref) != 1: self.error(line,self.V33_BAD_REF) + newalts = [] + have_deletions = False + for a in alt: + if len(a) == 1: a = a + ref[1:] # SNP; add trailing reference + elif a.startswith('I'): a = ref[0] + a[1:] + ref[1:] # insertion just beyond pos; add first and trailing reference + elif a.startswith('D'): # allow D and D + have_deletions = True + try: + l = int(a[1:]) # throws ValueError if sequence + if len(ref) < l: # add to reference if necessary + addns = get_sequence(chrom,pos+len(ref),pos+l,self._reference) + ref += addns + for i,na in enumerate(newalts): newalts[i] = na+addns + a = ref[l:] # new deletion, deleting pos...pos+l + except ValueError: + s = a[1:] + if len(ref) < len(s): # add Ns to reference if necessary + addns = get_sequence(chrom,pos+len(ref),pos+len(s),self._reference) + if not s.endswith(addns) and addns != 'N'*len(addns): + self.error(line,self.V33_UNMATCHED_DELETION, + "(deletion is %s, reference is %s)" % (a,get_sequence(chrom,pos,pos+len(s),self._reference))) + ref += addns + for i,na in enumerate(newalts): newalts[i] = na+addns + a = ref[len(s):] # new deletion, deleting from pos + else: + self.error(line,self.V33_BAD_ALLELE) + newalts.append(a) + alt = newalts + # deletion alleles exist, add dummy 1st reference allele, and account for leading base + if have_deletions: + if pos == 0: + # Petr Danacek's: we can't have a leading nucleotide at (1-based) position 1 + addn = get_sequence(chrom,pos+len(ref),pos+len(ref)+1,self._reference) + ref += addn + alt = [allele+addn for allele in alt] + else: + addn = get_sequence(chrom,pos-1,pos,self._reference) + ref = addn + ref + alt = [addn + allele for allele in alt] + pos -= 1 + else: + # format v4.0 -- just check for nucleotides + for allele in alt: + if not alleleRegEx.match(allele): + self.error(line,self.V40_BAD_ALLELE,allele) + + + # check for leading nucleotide in indel calls + for allele in alt: + if len(allele) != len(ref): + if len(allele) == 0: self.error(line,self.ZERO_LENGTH_ALLELE) + if ref[0].upper() != allele[0].upper() and "N" not in (ref[0]+allele[0]).upper(): + self.error(line,self.MISSING_INDEL_ALLELE_REF_BASE) + + # trim trailing bases in alleles + for i in range(1,min(len(ref),min(map(len,alt)))): + if len(set(allele[-1].upper() for allele in alt)) > 1 or ref[-1].upper() != alt[0][-1].upper(): + break + ref, alt = ref[:-1], [allele[:-1] for allele in alt] + + # left-align alleles, if a reference is available + if self._leftalign and 
self._reference: + while left < pos: + movable = True + for allele in alt: + if len(allele) > len(ref): + longest, shortest = allele, ref + else: + longest, shortest = ref, allele + if len(longest) == len(shortest) or longest[:len(shortest)].upper() != shortest.upper(): + movable = False + if longest[-1].upper() != longest[len(shortest)-1].upper(): + movable = False + if not movable: + break + ref = ref[:-1] + alt = [allele[:-1] for allele in alt] + if min(len(allele) for allele in alt) == 0 or len(ref) == 0: + ref = faref_leftflank[pos-left-1] + ref + alt = [faref_leftflank[pos-left-1] + allele for allele in alt] + pos -= 1 + + + +
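+
+# ---------------------------------------------------------------------------
+# Minimal usage sketch (illustrative only).  The file names "ref.fa",
+# "example.vcf" and "example.vcf.gz" are placeholders, not part of this
+# module; the reference is optional and may be any object providing a
+# fetch(chrom, start, end) method, e.g. pysam.Fastafile.
+# ---------------------------------------------------------------------------
+if __name__ == "__main__":
+    vcf = VCF()
+    vcf.setreference(pysam.Fastafile("ref.fa"))          # optional
+    vcf.setregions(parse_regions("chr1:10000-20000"))    # optional; converted to 0-based half-open tuples
+
+    # stream-parse a VCF file; positions are reported 0-based
+    for data in vcf.parse(open("example.vcf")):
+        sys.stdout.write("%s\t%s\t%s\t%s\n" %
+                         (data['chrom'], data['pos'], data['ref'], ",".join(data['alt'])))
+        for sample in vcf.getsamples():
+            genotype = data[sample].get('GT', [])        # e.g. [[0, '/', 1]]
+            sys.stdout.write("%s\t%s\n" % (sample, genotype))
+
+    # random access through a tabix-indexed, bgzip-compressed VCF file
+    vcf2 = VCF()
+    vcf2.connect("example.vcf.gz")
+    for record in vcf2.fetch("chr1", 10000, 20000):
+        sys.stdout.write("%s\t%s\t%s\n" % (record.contig, record.pos, record.ref))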