X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=pysam.git;a=blobdiff_plain;f=pysam%2FVCF.py;fp=pysam%2FVCF.py;h=0000000000000000000000000000000000000000;hp=115aa2ba06c67089467963ba47af61d7698d5cd4;hb=ca46ef4ba4a883c57cea62d5bf1bc021f1185109;hpb=d02fe5283ed7a93a2f76a5d6dc6e37b40c11b9b1

diff --git a/pysam/VCF.py b/pysam/VCF.py
deleted file mode 100644
index 115aa2b..0000000
--- a/pysam/VCF.py
+++ /dev/null
@@ -1,1088 +0,0 @@
-#
-# Code to read, write and edit VCF files
-#
-# VCF lines are encoded as a dictionary with these keys (note: all lowercase):
-# 'chrom':  string
-# 'pos':    integer
-# 'id':     string
-# 'ref':    string
-# 'alt':    list of strings
-# 'qual':   integer
-# 'filter': None (missing value), or list of keys (strings); an empty list is parsed as ["PASS"]
-# 'info':   dictionary of values (see below)
-# 'format': list of keys (strings)
-# sample keys: dictionary of values (see below)
-#
-# The sample keys are accessible through vcf.getsamples()
-#
-# A dictionary of values contains value keys (defined in ##INFO or ##FORMAT lines) which map
-# to a list containing integers, floats, strings, or characters.  Missing values are replaced
-# by a particular value, often -1 or .
-#
-# Genotypes are not stored as a string, but as a list of 1 or 3 elements (for haploid and diploid samples):
-# the first (and last) element is the integer representing an allele, and the second is the separator character.
-# Note that there is just one genotype per sample, but for consistency the single element is stored in a list.
-#
-# Header lines other than ##INFO, ##FORMAT and ##FILTER are stored as (key, value) pairs and are accessible
-# through getheader()
-#
-# The VCF class can be instantiated with a 'regions' variable consisting of tuples (chrom,start,end) encoding
-# 0-based half-open segments.  Only variants with a position inside a segment will be parsed.  A regions
-# parser is available under parse_regions.
-#
-# When instantiated, a reference can be passed to the VCF class.  This may be any class that supports a
-# fetch(chrom, start, end) method.
-#
-# NOTE: the position that is returned to Python is 0-based, NOT 1-based as in the VCF file.
-#
-# TODO:
-#    only v4.0 writing is complete; alleles are not converted to v3.3 format
-#
-
-from collections import namedtuple, defaultdict
-from operator import itemgetter
-import sys, re, copy, bisect
-
-import pysam
-
-gtsRegEx = re.compile("[|/\\\\]")
-alleleRegEx = re.compile('^[ACGTN]+$')
-
-# Utility function.  Uses 0-based coordinates
-def get_sequence(chrom, start, end, fa):
-    # obtain sequence from .fa file, without truncation
-    if end <= start: return ""
-    if not fa: return "N"*(end-start)
-    if start < 0: return "N"*(-start) + get_sequence(chrom, 0, end, fa).upper()
-    sequence = fa.fetch(chrom, start, end).upper()
-    if len(sequence) < end-start: sequence += "N"*(end-start-len(sequence))
-    return sequence
-
-# Utility function.  Parses a region string
-def parse_regions( string ):
-    result = []
-    for r in string.split(','):
-        elts = r.split(':')
-        chrom, start, end = elts[0], 0, 3000000000
-        if len(elts)==1: pass
-        elif len(elts)==2:
-            if len(elts[1])>0:
-                ielts = elts[1].split('-')
-                if len(ielts) != 2: raise ValueError("Don't understand region string '%s'" % r)
-                try:    start, end = int(ielts[0])-1, int(ielts[1])
-                except: raise ValueError("Don't understand region string '%s'" % r)
-        else:
-            raise ValueError("Don't understand region string '%s'" % r)
-        result.append( (chrom,start,end) )
-    return result
-
-
-FORMAT = namedtuple('FORMAT','id numbertype number type description missingvalue')
-
-###########################################################################################################
-#
-# New class
-#
-###########################################################################################################
-
-class VCFRecord:
-    '''vcf record.
-
-    initialized from data and vcf meta
-    '''
-
-    data = None
-    vcf = None
-
-    def __init__(self, data, vcf):
-        self.data, self.vcf = data, vcf
-
-        if len(data) != len(self.vcf._samples):
-            self.error(str(data),
-                       self.BAD_NUMBER_OF_COLUMNS,
-                       "expected %s for %s samples (%s), got %s" % \
-                           (len(self.vcf._samples),
-                            len(self.vcf._samples),
-                            self.vcf._samples,
-                            len(data)))
-
-    property contig:
-        def __get__( self ): return self.data[0]
-
-    property pos:
-        def __get__( self ):
-            return self.data.pos
-
-    property id:
-        def __get__( self ): return self.data[2]
-
-    property ref:
-        def __get__( self ):
-            # note: gerton substitutes the reference if it can be fixed.
-            return self.data[3].upper()
-
-    property alt:
-        def __get__( self ):
-            # convert v3.3 to v4.0 alleles below
-            alt = self.data[4]
-            if alt == ".": alt = []
-            else: alt = alt.upper().split(',')
-            return alt
-
-    property qual:
-        def __get__( self ):
-            qual = self.data[5]
-            if qual == ".": qual = -1
-            else:
-                try: qual = float(qual)
-                except: self.error(str(self.data),self.QUAL_NOT_NUMERICAL)
-            return qual
-
-    property filter:
-        def __get__( self ):
-            # postpone checking that filters exist.  Encode missing filter or no filtering as empty list
-            if self.data[6] == "." or self.data[6] == "PASS" or self.data[6] == "0": filter = []
-            else: filter = self.data[6].split(';')
-            return filter
-
-    property info:
-        def __get__( self ):
-            col = self.data[7]
-            # dictionary of keys, and list of values
-            info = {}
-            if col != ".":
-                for blurp in col.split(';'):
-                    elts = blurp.split('=')
-                    if len(elts) == 1: v = None
-                    elif len(elts) == 2: v = elts[1]
-                    else: self.error(str(self.data),self.ERROR_INFO_STRING)
-                    info[elts[0]] = self.vcf.parse_formatdata(elts[0], v, self.vcf._info, str(self.data))
-            return info
-
-    property format:
-        def __get__( self ):
-            return self.data[8].split(':')
-
-    def __getitem__(self, key):
-
-        # parse sample columns
-        values = self.data[self.vcf._sample2column[key]].split(':')
-        alt = self.alt
-        format = self.format
-
-        if len(values) > len(format):
-            self.error(str(self.data),self.BAD_NUMBER_OF_VALUES,
-                       "(found %s values in element %s; expected %s)" % (len(values),key,len(format)))
-
-        result = {}
-        for idx in range(len(format)):
-            expected = self.vcf.get_expected(format[idx], self.vcf._format, alt)
-            if idx < len(values): value = values[idx]
-            else:
-                if expected == -1: value = "."
-                else: value = ",".join(["."]*expected)
-
-            result[format[idx]] = self.vcf.parse_formatdata(format[idx], value, self.vcf._format, str(self.data))
-            if expected != -1 and len(result[format[idx]]) != expected:
-                self.error(str(self.data),self.BAD_NUMBER_OF_PARAMETERS,
-                           "id=%s, expected %s parameters, got %s" % (format[idx],expected,result[format[idx]]))
-                if len(result[format[idx]]) < expected: result[format[idx]] += [result[format[idx]][-1]]*(expected-len(result[format[idx]]))
-                result[format[idx]] = result[format[idx]][:expected]
-
-        return result
-
-    def __str__(self):
-        return str(self.data)
-
-class VCF:
-
-    # types
-    NT_UNKNOWN = 0
-    NT_NUMBER = 1
-    NT_ALLELES = 2
-    NT_NR_ALLELES = 3
-    NT_GENOTYPES = 4
-    NT_PHASED_GENOTYPES = 5
-
-    _errors = { 0:"UNKNOWN_FORMAT_STRING:Unknown file format identifier",
-                1:"BADLY_FORMATTED_FORMAT_STRING:Formatting error in the format string",
-                2:"BADLY_FORMATTED_HEADING:Did not find 9 required headings (CHROM, POS, ..., FORMAT) %s",
-                3:"BAD_NUMBER_OF_COLUMNS:Wrong number of columns found (%s)",
-                4:"POS_NOT_NUMERICAL:Position column is not numerical",
-                5:"UNKNOWN_CHAR_IN_REF:Unknown character in reference field",
-                6:"V33_BAD_REF:Reference should be single-character in v3.3 VCF",
-                7:"V33_BAD_ALLELE:Cannot interpret allele for v3.3 VCF",
-                8:"POS_NOT_POSITIVE:Position field must be >0",
-                9:"QUAL_NOT_NUMERICAL:Quality field must be numerical, or '.'",
-                10:"ERROR_INFO_STRING:Error while parsing info field",
-                11:"ERROR_UNKNOWN_KEY:Unknown key (%s) found in formatted field (info; format; or filter)",
-                12:"ERROR_FORMAT_NOT_NUMERICAL:Expected integer or float in formatted field; got %s",
-                13:"ERROR_FORMAT_NOT_CHAR:Expected character in formatted field; got string",
-                14:"FILTER_NOT_DEFINED:Identifier (%s) in filter found which was not defined in header",
-                15:"FORMAT_NOT_DEFINED:Identifier (%s) in format found which was not defined in header",
-                16:"BAD_NUMBER_OF_VALUES:Found too many values in sample column (%s)",
-                17:"BAD_NUMBER_OF_PARAMETERS:Found unexpected number of parameters (%s)",
-                18:"BAD_GENOTYPE:Cannot parse genotype (%s)",
-                19:"V40_BAD_ALLELE:Bad allele found for v4.0 VCF (%s)",
-                20:"MISSING_REF:Reference allele missing",
-                21:"V33_UNMATCHED_DELETION:Deleted sequence does not match reference (%s)",
-                22:"V40_MISSING_ANGLE_BRACKETS:Format definition is not delimited by angle brackets",
-                23:"FORMAT_MISSING_QUOTES:Description field in format definition is not surrounded by quotes",
-                24:"V40_FORMAT_MUST_HAVE_NAMED_FIELDS:Fields in v4.0 VCF format definition must have named fields",
-                25:"HEADING_NOT_SEPARATED_BY_TABS:Heading line appears separated by spaces, not tabs",
-                26:"WRONG_REF:Wrong reference %s",
-                27:"ERROR_TRAILING_DATA:Numerical field ('%s') has semicolon-separated trailing data",
-                28:"BAD_CHR_TAG:Error calculating chr tag for %s",
-                29:"ZERO_LENGTH_ALLELE:Found zero-length allele",
-                30:"MISSING_INDEL_ALLELE_REF_BASE:Indel alleles must begin with single reference base",
-                31:"ERROR_FLAG_HAS_VALUE:Flag fields should not have a value"
-                }
-
-    # tag-value pairs; tags are not unique; does not include fileformat, INFO, FILTER or FORMAT fields
-    _header = []
-
-    # version number; 33=v3.3; 40=v4.0
-    _version = 40
-
-    # info, filter and format data
-    _info = {}
-    _filter = {}
-    _format = {}
-
-    # header; and required columns
-    _required = ["CHROM","POS","ID","REF","ALT","QUAL","FILTER","INFO","FORMAT"]
-    _samples = []
-
-    # control behaviour
-    _ignored_errors = set([11])   # ERROR_UNKNOWN_KEY
-    _warn_errors = set([])
-    _leftalign = False
-
-    # reference sequence
-    _reference = None
-
-    # regions to
include; None includes everything - _regions = None - - # statefull stuff - _lineno = -1 - _line = None - _lines = None - - def __init__(self, _copy=None, reference=None, regions=None, lines=None, leftalign=False): - # make error identifiers accessible by name - for id in self._errors.keys(): self.__dict__[self._errors[id].split(':')[0]] = id - if _copy != None: - self._leftalign = _copy._leftalign - self._header = _copy._header[:] - self._version = _copy._version - self._info = copy.deepcopy(_copy._info) - self._filter = copy.deepcopy(_copy._filter) - self._format = copy.deepcopy(_copy._format) - self._samples = _copy._samples[:] - self._sample2column = copy.deepcopy(_copy._sample2column) - self._ignored_errors = copy.deepcopy(_copy._ignored_errors) - self._warn_errors = copy.deepcopy(_copy._warn_errors) - self._reference = _copy._reference - self._regions = _copy._regions - if reference: self._reference = reference - if regions: self._regions = regions - if leftalign: self._leftalign = leftalign - self._lines = lines - - def error(self,line,error,opt=None): - if error in self._ignored_errors: return - errorlabel, errorstring = self._errors[error].split(':') - if opt: errorstring = errorstring % opt - errwarn = ["Error","Warning"][error in self._warn_errors] - sys.stderr.write("Line %s: '%s'\n%s %s: %s\n" % (self._lineno,line,errwarn,errorlabel,errorstring)) - if error in self._warn_errors: return - raise ValueError(errorstring) - - def parse_format(self,line,format,filter=False): - if self._version == 40: - if not format.startswith('<'): - self.error(line,self.V40_MISSING_ANGLE_BRACKETS) - format = "<"+format - if not format.endswith('>'): - self.error(line,self.V40_MISSING_ANGLE_BRACKETS) - format += ">" - format = format[1:-1] - data = {'id':None,'number':None,'type':None,'descr':None} - idx = 0 - while len(format.strip())>0: - elts = format.strip().split(',') - first, rest = elts[0], ','.join(elts[1:]) - if first.find('=') == -1 or (first.find('"')>=0 and first.find('=') > first.find('"')): - if self._version == 40: self.error(line,self.V40_FORMAT_MUST_HAVE_NAMED_FIELDS) - if idx == 4: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) - first = ["ID=","Number=","Type=","Description="][idx] + first - if first.startswith('ID='): data['id'] = first.split('=')[1] - elif first.startswith('Number='): data['number'] = first.split('=')[1] - elif first.startswith('Type='): data['type'] = first.split('=')[1] - elif first.startswith('Description='): - elts = format.split('"') - if len(elts)<3: - self.error(line,self.FORMAT_MISSING_QUOTES) - elts = first.split('=') + [rest] - data['descr'] = elts[1] - rest = '"'.join(elts[2:]) - if rest.startswith(','): rest = rest[1:] - else: - self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) - format = rest - idx += 1 - if filter and idx==1: idx=3 # skip number and type fields for FILTER format strings - if not data['id']: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) - if not data['descr']: - self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) - data['descr'] = '' - if not data['type'] and not data['number']: - # fine, ##filter format - return FORMAT(data['id'],self.NT_NUMBER,0,"Flag",data['descr'],'.') - if not data['type'] in ["Integer","Float","Character","String","Flag"]: - self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) - # I would like a missing-value field, but it isn't there - if data['type'] in ['Integer','Float']: data['missing'] = None # Do NOT use arbitrary int/float as missing value - else: data['missing'] = '.' 
- if not data['number']: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) - try: - n = int(data['number']) - t = self.NT_NUMBER - except ValueError: - n = -1 - if data['number'] == '.': t = self.NT_UNKNOWN - elif data['number'] == '#alleles': t = self.NT_ALLELES - elif data['number'] == '#nonref_alleles': t = self.NT_NR_ALLELES - elif data['number'] == '#genotypes': t = self.NT_GENOTYPES - elif data['number'] == '#phased_genotypes': t = self.NT_PHASED_GENOTYPES - else: - self.error(line,self.BADLY_FORMATTED_FORMAT_STRING) - return FORMAT(data['id'],t,n,data['type'],data['descr'],data['missing']) - - - def format_format( self, fmt, filter=False ): - values = [('ID',fmt.id)] - if fmt.number != None and not filter: - if fmt.numbertype == self.NT_UNKNOWN: nmb = "." - elif fmt.numbertype == self.NT_NUMBER: nmb = str(fmt.number) - elif fmt.numbertype == self.NT_ALLELES: nmb = "#alleles" - elif fmt.numbertype == self.NT_NR_ALLELES: nmb = "#nonref_alleles" - elif fmt.numbertype == self.NT_GENOTYPES: nmb = "#genotypes" - elif fmt.numbertype == self.NT_PHASED_GENOTYPES: nmb = "#phased_genotypes" - else: - raise ValueError("Unknown number type encountered: %s" % fmt.numbertype) - values.append( ('Number',nmb) ) - values.append( ('Type', fmt.type) ) - values.append( ('Description', '"' + fmt.description + '"') ) - if self._version == 33: - format = ",".join(v for k,v in values) - else: - format = "<" + (",".join( "%s=%s" % (k,v) for (k,v) in values )) + ">" - return format - - def get_expected(self, format, formatdict, alt): - fmt = formatdict[format] - if fmt.numbertype == self.NT_UNKNOWN: return -1 - if fmt.numbertype == self.NT_NUMBER: return fmt.number - if fmt.numbertype == self.NT_ALLELES: return len(alt)+1 - if fmt.numbertype == self.NT_NR_ALLELES: return len(alt) - if fmt.numbertype == self.NT_GENOTYPES: return ((len(alt)+1)*(len(alt)+2)) // 2 - if fmt.numbertype == self.NT_PHASED_GENOTYPES: return (len(alt)+1)*(len(alt)+1) - return 0 - - - def _add_definition(self, formatdict, key, data, line ): - if key in formatdict: return - self.error(line,self.ERROR_UNKNOWN_KEY,key) - if data == None: - formatdict[key] = FORMAT(key,self.NT_NUMBER,0,"Flag","(Undefined tag)",".") - return - if data == []: data = [""] # unsure what type -- say string - if type(data[0]) == type(0.0): - formatdict[key] = FORMAT(key,self.NT_UNKNOWN,-1,"Float","(Undefined tag)",None) - return - if type(data[0]) == type(0): - formatdict[key] = FORMAT(key,self.NT_UNKNOWN,-1,"Integer","(Undefined tag)",None) - return - formatdict[key] = FORMAT(key,self.NT_UNKNOWN,-1,"String","(Undefined tag)",".") - - - # todo: trim trailing missing values - def format_formatdata( self, data, format, key=True, value=True, separator=":" ): - output, sdata = [], [] - if type(data) == type([]): # for FORMAT field, make data with dummy values - d = {} - for k in data: d[k] = [] - data = d - # convert missing values; and silently add definitions if required - for k in data: - self._add_definition( format, k, data[k], "(output)" ) - for idx,v in enumerate(data[k]): - if v == format[k].missingvalue: data[k][idx] = "." 
- # make sure GT comes first; and ensure fixed ordering; also convert GT data back to string - for k in data: - if k != 'GT': sdata.append( (k,data[k]) ) - sdata.sort() - if 'GT' in data: - sdata = [('GT',map(self.convertGTback,data['GT']))] + sdata - for k,v in sdata: - if v == []: v = None - if key and value: - if v != None: output.append( k+"="+','.join(map(str,v)) ) - else: output.append( k ) - elif key: output.append(k) - elif value: - if v != None: output.append( ','.join(map(str,v)) ) - else: output.append( "." ) # should not happen - # snip off trailing missing data - while len(output) > 1: - last = output[-1].replace(',','').replace('.','') - if len(last)>0: break - output = output[:-1] - return separator.join(output) - - - def enter_default_format(self): - for f in [FORMAT('GT',self.NT_NUMBER,1,'String','Genotype','.'), - FORMAT('GQ',self.NT_NUMBER,1,'Integer','Genotype Quality',-1), - FORMAT('DP',self.NT_NUMBER,1,'Integer','Read depth at this position for this sample',-1), - FORMAT('HQ',self.NT_UNKNOWN,-1,'Integer','Haplotype Quality',-1), # unknown number, since may be haploid - FORMAT('FT',self.NT_NUMBER,1,'String','Sample Genotype Filter','.')]: - if f.id not in self._format: - self._format[f.id] = f - - def parse_header( self, line ): - assert line.startswith('##') - elts = line[2:].split('=') - key = elts[0].strip() - value = '='.join(elts[1:]).strip() - if key == "fileformat": - if value == "VCFv3.3": - self._version = 33 - elif value == "VCFv4.0": - self._version = 40 - elif value == "VCFv4.1": - # AH - for testing - self._version = 40 - else: - self.error(line,self.UNKNOWN_FORMAT_STRING) - elif key == "INFO": - f = self.parse_format(line, value) - self._info[ f.id ] = f - elif key == "FILTER": - f = self.parse_format(line, value, filter=True) - self._filter[ f.id ] = f - elif key == "FORMAT": - f = self.parse_format(line, value) - self._format[ f.id ] = f - else: - # keep other keys in the header field - self._header.append( (key,value) ) - - - def write_header( self, stream ): - stream.write("##fileformat=VCFv%s.%s\n" % (self._version // 10, self._version % 10)) - for key,value in self._header: stream.write("##%s=%s\n" % (key,value)) - for var,label in [(self._info,"INFO"),(self._filter,"FILTER"),(self._format,"FORMAT")]: - for f in var.itervalues(): stream.write("##%s=%s\n" % (label,self.format_format(f,filter=(label=="FILTER")))) - - - def parse_heading( self, line ): - assert line.startswith('#') - assert not line.startswith('##') - headings = line[1:].split('\t') - if len(headings)==1 and len(line[1:].split()) >= 9: - self.error(line,self.HEADING_NOT_SEPARATED_BY_TABS) - headings = line[1:].split() - - for i,s in enumerate(self._required): - - if len(headings)<=i or headings[i] != s: - - if len(headings) <= i: - err = "(%sth entry not found)" % (i+1) - else: - err = "(found %s, expected %s)" % (headings[i],s) - - #self.error(line,self.BADLY_FORMATTED_HEADING,err) - - # allow FORMAT column to be absent - if len(headings) == 8: - headings.append("FORMAT") - else: - self.error(line,self.BADLY_FORMATTED_HEADING,err) - - self._samples = headings[9:] - self._sample2column = dict( [(y,x) for x,y in enumerate( self._samples ) ] ) - - def write_heading( self, stream ): - stream.write("#" + "\t".join(self._required + self._samples) + "\n") - - def convertGT(self, GTstring): - if GTstring == ".": return ["."] - try: - gts = gtsRegEx.split(GTstring) - if len(gts) == 1: return [int(gts[0])] - if len(gts) != 2: raise ValueError() - if gts[0] == "." 
and gts[1] == ".": return [gts[0],GTstring[len(gts[0]):-len(gts[1])],gts[1]] - return [int(gts[0]),GTstring[len(gts[0]):-len(gts[1])],int(gts[1])] - except ValueError: - self.error(self._line,self.BAD_GENOTYPE,GTstring) - return [".","|","."] - - - def convertGTback(self, GTdata): - return ''.join(map(str,GTdata)) - - def parse_formatdata( self, key, value, formatdict, line ): - # To do: check that the right number of values is present - f = formatdict.get(key,None) - if f == None: - self._add_definition(formatdict, key, value, line ) - f = formatdict[key] - if f.type == "Flag": - if value is not None: self.error(line,self.ERROR_FLAG_HAS_VALUE) - return [] - values = value.split(',') - # deal with trailing data in some early VCF files - if f.type in ["Float","Integer"] and len(values)>0 and values[-1].find(';') > -1: - self.error(line,self.ERROR_TRAILING_DATA,values[-1]) - values[-1] = values[-1].split(';')[0] - if f.type == "Integer": - for idx,v in enumerate(values): - try: - if v == ".": values[idx] = f.missingvalue - else: values[idx] = int(v) - except: - self.error(line,self.ERROR_FORMAT_NOT_NUMERICAL,values) - return [0] * len(values) - return values - elif f.type == "String": - self._line = line - if f.id == "GT": values = map( self.convertGT, values ) - return values - elif f.type == "Character": - for v in values: - if len(v) != 1: self.error(line,self.ERROR_FORMAT_NOT_CHAR) - return values - elif f.type == "Float": - for idx,v in enumerate(values): - if v == ".": values[idx] = f.missingvalue - try: return map(float,values) - except: - self.error(line,self.ERROR_FORMAT_NOT_NUMERICAL,values) - return [0.0] * len(values) - else: - # can't happen - self.error(line,self.ERROR_INFO_STRING) - - - def inregion(self, chrom, pos): - if not self._regions: return True - for r in self._regions: - if r[0] == chrom and r[1] <= pos < r[2]: return True - return False - - - def parse_data( self, line, lineparse=False ): - cols = line.split('\t') - if len(cols) != len(self._samples)+9: - # gracefully deal with absent FORMAT column - if len(cols) == 8 and len(self._samples)==0: - cols.append("") - else: - self.error(line, - self.BAD_NUMBER_OF_COLUMNS, - "expected %s for %s samples (%s), got %s" % (len(self._samples)+9, len(self._samples), self._samples, len(cols))) - - chrom = cols[0] - - # get 0-based position - try: pos = int(cols[1])-1 - except: self.error(line,self.POS_NOT_NUMERICAL) - if pos < 0: self.error(line,self.POS_NOT_POSITIVE) - - # implement filtering - if not self.inregion(chrom,pos): return None - - # end of first-pass parse for sortedVCF - if lineparse: return chrom, pos, line - - id = cols[2] - - ref = cols[3].upper() - if ref == ".": - self.error(line,self.MISSING_REF) - if self._version == 33: ref = get_sequence(chrom,pos,pos+1,self._reference) - else: ref = "" - else: - for c in ref: - if c not in "ACGTN": self.error(line,self.UNKNOWN_CHAR_IN_REF) - if "N" in ref: ref = get_sequence(chrom,pos,pos+len(ref),self._reference) - - # make sure reference is sane - if self._reference: - left = max(0,pos-100) - faref_leftflank = get_sequence(chrom,left,pos+len(ref),self._reference) - faref = faref_leftflank[pos-left:] - if faref != ref: self.error(line,self.WRONG_REF,"(reference is %s, VCF says %s)" % (faref,ref)) - ref = faref - - # convert v3.3 to v4.0 alleles below - if cols[4] == ".": alt = [] - else: alt = cols[4].upper().split(',') - - if cols[5] == ".": qual = -1 - else: - try: qual = float(cols[5]) - except: self.error(line,self.QUAL_NOT_NUMERICAL) - - # postpone checking that 
filters exist. Encode missing filter or no filtering as empty list - if cols[6] == "." or cols[6] == "PASS" or cols[6] == "0": filter = [] - else: filter = cols[6].split(';') - - # dictionary of keys, and list of values - info = {} - if cols[7] != ".": - for blurp in cols[7].split(';'): - elts = blurp.split('=') - if len(elts) == 1: v = None - elif len(elts) == 2: v = elts[1] - else: self.error(line,self.ERROR_INFO_STRING) - info[elts[0]] = self.parse_formatdata(elts[0], v, self._info, line) - - # Gracefully deal with absent FORMAT column - if cols[8] == "": format = [] - else: format = cols[8].split(':') - - # check: all filters are defined - for f in filter: - if f not in self._filter: self.error(line,self.FILTER_NOT_DEFINED, f) - - # check: format fields are defined - for f in format: - if f not in self._format: self.error(line,self.FORMAT_NOT_DEFINED, f) - - # convert v3.3 alleles - if self._version == 33: - if len(ref) != 1: self.error(line,self.V33_BAD_REF) - newalts = [] - have_deletions = False - for a in alt: - if len(a) == 1: a = a + ref[1:] # SNP; add trailing reference - elif a.startswith('I'): a = ref[0] + a[1:] + ref[1:] # insertion just beyond pos; add first and trailing reference - elif a.startswith('D'): # allow D and D - have_deletions = True - try: - l = int(a[1:]) # throws ValueError if sequence - if len(ref) < l: # add to reference if necessary - addns = get_sequence(chrom,pos+len(ref),pos+l,self._reference) - ref += addns - for i,na in enumerate(newalts): newalts[i] = na+addns - a = ref[l:] # new deletion, deleting pos...pos+l - except ValueError: - s = a[1:] - if len(ref) < len(s): # add Ns to reference if necessary - addns = get_sequence(chrom,pos+len(ref),pos+len(s),self._reference) - if not s.endswith(addns) and addns != 'N'*len(addns): - self.error(line,self.V33_UNMATCHED_DELETION, - "(deletion is %s, reference is %s)" % (a,get_sequence(chrom,pos,pos+len(s),self._reference))) - ref += addns - for i,na in enumerate(newalts): newalts[i] = na+addns - a = ref[len(s):] # new deletion, deleting from pos - else: - self.error(line,self.V33_BAD_ALLELE) - newalts.append(a) - alt = newalts - # deletion alleles exist, add dummy 1st reference allele, and account for leading base - if have_deletions: - if pos == 0: - # Petr Danacek's: we can't have a leading nucleotide at (1-based) position 1 - addn = get_sequence(chrom,pos+len(ref),pos+len(ref)+1,self._reference) - ref += addn - alt = [allele+addn for allele in alt] - else: - addn = get_sequence(chrom,pos-1,pos,self._reference) - ref = addn + ref - alt = [addn + allele for allele in alt] - pos -= 1 - else: - # format v4.0 -- just check for nucleotides - for allele in alt: - if not alleleRegEx.match(allele): - self.error(line,self.V40_BAD_ALLELE,allele) - - # check for leading nucleotide in indel calls - for allele in alt: - if len(allele) != len(ref): - if len(allele) == 0: self.error(line,self.ZERO_LENGTH_ALLELE) - if ref[0].upper() != allele[0].upper() and "N" not in (ref[0]+allele[0]).upper(): - self.error(line,self.MISSING_INDEL_ALLELE_REF_BASE) - - # trim trailing bases in alleles - for i in range(1,min(len(ref),min(map(len,alt)))): - if len(set(allele[-1].upper() for allele in alt)) > 1 or ref[-1].upper() != alt[0][-1].upper(): - break - ref, alt = ref[:-1], [allele[:-1] for allele in alt] - - # left-align alleles, if a reference is available - if self._leftalign and self._reference: - while left < pos: - movable = True - for allele in alt: - if len(allele) > len(ref): - longest, shortest = allele, ref - else: - 
longest, shortest = ref, allele - if len(longest) == len(shortest) or longest[:len(shortest)].upper() != shortest.upper(): - movable = False - if longest[-1].upper() != longest[len(shortest)-1].upper(): - movable = False - if not movable: - break - ref = ref[:-1] - alt = [allele[:-1] for allele in alt] - if min(len(allele) for allele in alt) == 0 or len(ref) == 0: - ref = faref_leftflank[pos-left-1] + ref - alt = [faref_leftflank[pos-left-1] + allele for allele in alt] - pos -= 1 - - # parse sample columns - samples = [] - for sample in cols[9:]: - dict = {} - values = sample.split(':') - if len(values) > len(format): - self.error(line,self.BAD_NUMBER_OF_VALUES,"(found %s values in element %s; expected %s)" % (len(values),sample,len(format))) - for idx in range(len(format)): - expected = self.get_expected(format[idx], self._format, alt) - if idx < len(values): value = values[idx] - else: - if expected == -1: value = "." - else: value = ",".join(["."]*expected) - dict[format[idx]] = self.parse_formatdata(format[idx], value, self._format, line) - if expected != -1 and len(dict[format[idx]]) != expected: - self.error(line,self.BAD_NUMBER_OF_PARAMETERS, - "id=%s, expected %s parameters, got %s" % (format[idx],expected,dict[format[idx]])) - if len(dict[format[idx]] ) < expected: dict[format[idx]] += [dict[format[idx]][-1]]*(expected-len(dict[format[idx]])) - dict[format[idx]] = dict[format[idx]][:expected] - samples.append( dict ) - - # done - d = {'chrom':chrom, - 'pos':pos, # return 0-based position - 'id':id, - 'ref':ref, - 'alt':alt, - 'qual':qual, - 'filter':filter, - 'info':info, - 'format':format} - for key,value in zip(self._samples,samples): - d[key] = value - - return d - - - def write_data(self, stream, data): - required = ['chrom','pos','id','ref','alt','qual','filter','info','format'] + self._samples - for k in required: - if k not in data: raise ValueError("Required key %s not found in data" % str(k)) - if data['alt'] == []: alt = "." - else: alt = ",".join(data['alt']) - if data['filter'] == None: filter = "." - elif data['filter'] == []: - if self._version == 33: filter = "0" - else: filter = "PASS" - else: filter = ';'.join(data['filter']) - if data['qual'] == -1: qual = "." 
- else: qual = str(data['qual']) - - output = [data['chrom'], - str(data['pos']+1), # change to 1-based position - data['id'], - data['ref'], - alt, - qual, - filter, - self.format_formatdata( data['info'], self._info, separator=";" ), - self.format_formatdata( data['format'], self._format, value=False ) ] - - for s in self._samples: - output.append( self.format_formatdata( data[s], self._format, key=False ) ) - - stream.write( "\t".join(output) + "\n" ) - - def _parse_header(self, stream): - self._lineno = 0 - for line in stream: - self._lineno += 1 - if line.startswith('##'): - self.parse_header( line.strip() ) - elif line.startswith('#'): - self.parse_heading( line.strip() ) - self.enter_default_format() - else: - break - return line - - def _parse(self, line, stream): - if len(line.strip()) > 0: - d = self.parse_data( line.strip() ) - if d: yield d - for line in stream: - self._lineno += 1 - if self._lines and self._lineno > self._lines: raise StopIteration - d = self.parse_data( line.strip() ) - if d: yield d - - ###################################################################################################### - # - # API follows - # - ###################################################################################################### - - def getsamples(self): - """ List of samples in VCF file """ - return self._samples - - def setsamples(self,samples): - """ List of samples in VCF file """ - self._samples = samples - - def getheader(self): - """ List of header key-value pairs (strings) """ - return self._header - - def setheader(self,header): - """ List of header key-value pairs (strings) """ - self._header = header - - def getinfo(self): - """ Dictionary of ##INFO tags, as VCF.FORMAT values """ - return self._info - - def setinfo(self,info): - """ Dictionary of ##INFO tags, as VCF.FORMAT values """ - self._info = info - - def getformat(self): - """ Dictionary of ##FORMAT tags, as VCF.FORMAT values """ - return self._format - - def setformat(self,format): - """ Dictionary of ##FORMAT tags, as VCF.FORMAT values """ - self._format = format - - def getfilter(self): - """ Dictionary of ##FILTER tags, as VCF.FORMAT values """ - return self._filter - - def setfilter(self,filter): - """ Dictionary of ##FILTER tags, as VCF.FORMAT values """ - self._filter = filter - - def setversion(self, version): - if version != 33 and version != 40: raise ValueError("Can only handle v3.3 and v4.0 VCF files") - self._version = version - - def setregions(self, regions): - self._regions = regions - - def setreference(self, ref): - """ Provide a reference sequence; a Python class supporting a fetch(chromosome, start, end) method, e.g. PySam.FastaFile """ - self._reference = ref - - def ignoreerror(self, errorstring): - try: self._ignored_errors.add(self.__dict__[errorstring]) - except KeyError: raise ValueError("Invalid error string: %s" % errorstring) - - def warnerror(self, errorstring): - try: self._warn_errors.add(self.__dict__[errorstring]) - except KeyError: raise ValueError("Invalid error string: %s" % errorstring) - - def parse(self, stream): - """ Parse a stream of VCF-formatted lines. Initializes class instance and return generator """ - last_line = self._parse_header(stream) - # now return a generator that does the actual work. 
In this way the pre-processing is done - # before the first piece of data is yielded - return self._parse(last_line, stream) - - def write(self, stream, datagenerator): - """ Writes a VCF file to a stream, using a data generator (or list) """ - self.write_header(stream) - self.write_heading(stream) - for data in datagenerator: self.write_data(stream,data) - - def writeheader(self, stream): - """ Writes a VCF header """ - self.write_header(stream) - self.write_heading(stream) - - def compare_calls(self, pos1, ref1, alt1, pos2, ref2, alt2): - """ Utility function: compares two calls for equality """ - # a variant should always be assigned to a unique position, one base before - # the leftmost position of the alignment gap. If this rule is implemented - # correctly, the two positions must be equal for the calls to be identical. - if pos1 != pos2: return False - # from both calls, trim rightmost bases when identical. Do this safely, i.e. - # only when the reference bases are not Ns - while len(ref1)>0 and len(alt1)>0 and ref1[-1] == alt1[-1]: - ref1 = ref1[:-1] - alt1 = alt1[:-1] - while len(ref2)>0 and len(alt2)>0 and ref2[-1] == alt2[-1]: - ref2 = ref2[:-1] - alt2 = alt2[:-1] - # now, the alternative alleles must be identical - return alt1 == alt2 - -########################################################################################################### -########################################################################################################### -## API functions added by Andreas -########################################################################################################### - - def connect( self, filename ): - '''connect to tabix file.''' - self.tabixfile = pysam.Tabixfile( filename ) - self._parse_header(self.tabixfile.header) - - def fetch(self, - reference = None, - start = None, - end = None, - region = None ): - """ Parse a stream of VCF-formatted lines. Initializes class instance and return generator """ - - iter = self.tabixfile.fetch( reference, start, end, region, parser = pysam.asVCF() ) - for x in iter: - yield VCFRecord( x, self ) - - def validate( self, record ): - '''validate vcf record. - - returns a validated record. 
- ''' - - chrom, pos = record.chrom, record.pos - - # check reference - ref = record.ref - if ref == ".": - self.error(str(record),self.MISSING_REF) - if self._version == 33: ref = get_sequence(chrom,pos,pos+1,self._reference) - else: ref = "" - else: - for c in ref: - if c not in "ACGTN": self.error(str(record),self.UNKNOWN_CHAR_IN_REF) - if "N" in ref: ref = get_sequence(chrom, - pos, - pos+len(ref), - self._reference) - - # make sure reference is sane - if self._reference: - left = max(0,self.pos-100) - faref_leftflank = get_sequence(chrom,left,self.pos+len(ref),self._reference) - faref = faref_leftflank[pos-left:] - if faref != ref: self.error(line,self.WRONG_REF,"(reference is %s, VCF says %s)" % (faref,ref)) - ref = faref - - # check: format fields are defined - for f in record.format: - if f not in self._format: self.error(str(record),self.FORMAT_NOT_DEFINED, f) - - # check: all filters are defined - for f in record.filter: - if f not in self._filter: self.error(str(record),self.FILTER_NOT_DEFINED, f) - - # convert v3.3 alleles - if self._version == 33: - if len(ref) != 1: self.error(line,self.V33_BAD_REF) - newalts = [] - have_deletions = False - for a in alt: - if len(a) == 1: a = a + ref[1:] # SNP; add trailing reference - elif a.startswith('I'): a = ref[0] + a[1:] + ref[1:] # insertion just beyond pos; add first and trailing reference - elif a.startswith('D'): # allow D and D - have_deletions = True - try: - l = int(a[1:]) # throws ValueError if sequence - if len(ref) < l: # add to reference if necessary - addns = get_sequence(chrom,pos+len(ref),pos+l,self._reference) - ref += addns - for i,na in enumerate(newalts): newalts[i] = na+addns - a = ref[l:] # new deletion, deleting pos...pos+l - except ValueError: - s = a[1:] - if len(ref) < len(s): # add Ns to reference if necessary - addns = get_sequence(chrom,pos+len(ref),pos+len(s),self._reference) - if not s.endswith(addns) and addns != 'N'*len(addns): - self.error(line,self.V33_UNMATCHED_DELETION, - "(deletion is %s, reference is %s)" % (a,get_sequence(chrom,pos,pos+len(s),self._reference))) - ref += addns - for i,na in enumerate(newalts): newalts[i] = na+addns - a = ref[len(s):] # new deletion, deleting from pos - else: - self.error(line,self.V33_BAD_ALLELE) - newalts.append(a) - alt = newalts - # deletion alleles exist, add dummy 1st reference allele, and account for leading base - if have_deletions: - if pos == 0: - # Petr Danacek's: we can't have a leading nucleotide at (1-based) position 1 - addn = get_sequence(chrom,pos+len(ref),pos+len(ref)+1,self._reference) - ref += addn - alt = [allele+addn for allele in alt] - else: - addn = get_sequence(chrom,pos-1,pos,self._reference) - ref = addn + ref - alt = [addn + allele for allele in alt] - pos -= 1 - else: - # format v4.0 -- just check for nucleotides - for allele in alt: - if not alleleRegEx.match(allele): - self.error(line,self.V40_BAD_ALLELE,allele) - - - # check for leading nucleotide in indel calls - for allele in alt: - if len(allele) != len(ref): - if len(allele) == 0: self.error(line,self.ZERO_LENGTH_ALLELE) - if ref[0].upper() != allele[0].upper() and "N" not in (ref[0]+allele[0]).upper(): - self.error(line,self.MISSING_INDEL_ALLELE_REF_BASE) - - # trim trailing bases in alleles - for i in range(1,min(len(ref),min(map(len,alt)))): - if len(set(allele[-1].upper() for allele in alt)) > 1 or ref[-1].upper() != alt[0][-1].upper(): - break - ref, alt = ref[:-1], [allele[:-1] for allele in alt] - - # left-align alleles, if a reference is available - if self._leftalign and 
self._reference:
-            while left < pos:
-                movable = True
-                for allele in alt:
-                    if len(allele) > len(ref):
-                        longest, shortest = allele, ref
-                    else:
-                        longest, shortest = ref, allele
-                    if len(longest) == len(shortest) or longest[:len(shortest)].upper() != shortest.upper():
-                        movable = False
-                    if longest[-1].upper() != longest[len(shortest)-1].upper():
-                        movable = False
-                if not movable:
-                    break
-                ref = ref[:-1]
-                alt = [allele[:-1] for allele in alt]
-                if min(len(allele) for allele in alt) == 0 or len(ref) == 0:
-                    ref = faref_leftflank[pos-left-1] + ref
-                    alt = [faref_leftflank[pos-left-1] + allele for allele in alt]
-                    pos -= 1
-
-
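
For reference, a minimal usage sketch of the stream-parsing API that this deleted module exposed, following the header comment and the API section above.  The file names ("hg19.fa", "calls.vcf"), the region string, and the import path are illustrative assumptions, not part of the original code.

    import sys
    import pysam
    from pysam.VCF import VCF, parse_regions   # module path as it existed before this deletion

    vcf = VCF()
    vcf.setreference(pysam.Fastafile("hg19.fa"))             # optional; any object with fetch(chrom, start, end)
    vcf.setregions(parse_regions("chr20:1000000-2000000"))   # 0-based, half-open segments

    with open("calls.vcf") as infile:
        for record in vcf.parse(infile):
            # 'pos' is 0-based here, unlike the 1-based POS column in the file
            sys.stdout.write("%s\t%i\t%s\t%s\n" % (record['chrom'], record['pos'] + 1,
                                                   record['ref'], ",".join(record['alt']) or "."))
            for sample in vcf.getsamples():
                gt = record[sample]['GT'][0]                 # e.g. [0, '/', 1] for a diploid call
                sys.stdout.write("  %s: %s\n" % (sample, gt))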
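
Likewise, a sketch of the tabix-backed random access added in the "API functions added by Andreas" section (connect/fetch).  It assumes a bgzip-compressed, tabix-indexed file (here "calls.vcf.gz" with "calls.vcf.gz.tbi", both hypothetical) and that the VCFRecord properties behave as defined above.

    import sys
    from pysam.VCF import VCF        # hypothetical import path, as above

    vcf = VCF()
    vcf.connect("calls.vcf.gz")                        # opens a pysam.Tabixfile and parses the header
    for rec in vcf.fetch("chr20", 1000000, 2000000):   # yields VCFRecord objects wrapping pysam.asVCF() rows
        sys.stdout.write("%s\t%i\t%s\n" % (rec.contig, rec.pos, rec.ref))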