self.vcf = vcf
+ def error(self,line,error,opt=None):
+ '''raise error.'''
+ # pass to vcf file for error handling
+ return self.vcf.error( line, error, opt )
+
cdef update( self, char * buffer, size_t nbytes ):
'''update internal data.
27:"ERROR_TRAILING_DATA:Numerical field ('%s') has semicolon-separated trailing data",
28:"BAD_CHR_TAG:Error calculating chr tag for %s",
29:"ZERO_LENGTH_ALLELE:Found zero-length allele",
- 30:"MISSING_INDEL_ALLELE_REF_BASE:Indel alleles must begin with single reference base"
+ 30:"MISSING_INDEL_ALLELE_REF_BASE:Indel alleles must begin with single reference base",
+ 31:"ZERO_FOR_NON_FLAG_FIELD: number set to 0, but type is not 'FLAG'",
+ 32:"ERROR_FORMAT_NOT_INTEGER:Expected integer in formatted field; got %s",
+ 33:"ERROR_FLAG_HAS_VALUE:Flag fields should not have a value",
}
# tag-value pairs; tags are not unique; does not include fileformat, INFO, FILTER or FORMAT fields
_samples = []
# control behaviour
- _ignored_errors = set([11]) # ERROR_UNKNOWN_KEY
+ _ignored_errors = set([11,31]) # ERROR_UNKNOWN_KEY, ERROR_ZERO_FOR_NON_FLAG_FIELD
_warn_errors = set([])
_leftalign = False
errorlabel, errorstring = self._errors[error].split(':')
if opt: errorstring = errorstring % opt
errwarn = ["Error","Warning"][error in self._warn_errors]
- sys.stderr.write("Line %s: '%s'\n%s %s: %s\n" % (self._lineno,line,errwarn,errorlabel,errorstring))
+ errorstring += " in line %s: '%s'\n%s %s: %s\n" % (self._lineno,line,errwarn,errorlabel,errorstring)
if error in self._warn_errors: return
raise ValueError(errorstring)
idx += 1
if filter and idx==1: idx=3 # skip number and type fields for FILTER format strings
if not data['id']: self.error(line,self.BADLY_FORMATTED_FORMAT_STRING)
- if not data['descr']:
+ if 'descr' not in data:
+ # missing description
self.error(line,self.BADLY_FORMATTED_FORMAT_STRING)
- data['descr'] = '<none>'
+ data['descr'] = ""
if not data['type'] and not data['number']:
# fine, ##filter format
return FORMAT(data['id'],self.NT_NUMBER,0,"Flag",data['descr'],'.')
elif data['number'] == '#nonref_alleles': t = self.NT_NR_ALLELES
elif data['number'] == '#genotypes': t = self.NT_GENOTYPES
elif data['number'] == '#phased_genotypes': t = self.NT_PHASED_GENOTYPES
+ elif data['number'] == '#phased_genotypes': t = self.NT_PHASED_GENOTYPES
+ # abbreviations added in VCF version v4.1
+ elif data['number'] == 'A': t = self.NT_ALLELES
+ elif data['number'] == 'G': t = self.NT_GENOTYPES
else:
self.error(line,self.BADLY_FORMATTED_FORMAT_STRING)
+ # if number is 0 - type must be Flag
+ if n == 0 and data['type'] != 'Flag':
+ self.error( line, self.ZERO_FOR_NON_FLAG_FIELD)
+ # force type 'Flag' if no number
+ data['type'] = 'Flag'
+
return FORMAT(data['id'],t,n,data['type'],data['descr'],data['missing'])
-
def format_format( self, fmt, filter=False ):
values = [('ID',fmt.id)]
if fmt.number != None and not filter:
def enter_default_format(self):
for f in [FORMAT('GT',self.NT_NUMBER,1,'String','Genotype','.'),
- FORMAT('GQ',self.NT_NUMBER,1,'Integer','Genotype Quality',-1),
FORMAT('DP',self.NT_NUMBER,1,'Integer','Read depth at this position for this sample',-1),
+ FORMAT('FT',self.NT_NUMBER,1,'String','Sample Genotype Filter','.'),
+ FORMAT('GL',self.NT_UNKNOWN,-1,'Float','Genotype likelihoods','.'),
+ FORMAT('GLE',self.NT_UNKNOWN,-1,'Float','Genotype likelihoods','.'),
+ FORMAT('GQ',self.NT_NUMBER,1,'Integer','Genotype Quality',-1),
+ FORMAT('PL',self.NT_GENOTYPES,-1,'Integer','Phred-scaled genotype likelihoods', '.'),
+ FORMAT('GP',self.NT_GENOTYPES,-1,'Float','Genotype posterior probabilities','.'),
+ FORMAT('GQ',self.NT_GENOTYPES,-1,'Integer','Conditional genotype quality','.'),
FORMAT('HQ',self.NT_UNKNOWN,-1,'Integer','Haplotype Quality',-1), # unknown number, since may be haploid
- FORMAT('FT',self.NT_NUMBER,1,'String','Sample Genotype Filter','.')]:
+ FORMAT('PS',self.NT_UNKNOWN,-1,'Integer','Phase set','.'),
+ FORMAT('PQ',self.NT_NUMBER,1,'Integer','Phasing quality',-1),
+ FORMAT('EC',self.NT_ALLELES,1,'Integer','Expected alternate allel counts',-1),
+ FORMAT('MQ',self.NT_NUMBER,1,'Integer','RMS mapping quality',-1),
+ ]:
if f.id not in self._format:
self._format[f.id] = f
assert line.startswith('#')
assert not line.startswith('##')
headings = line[1:].split('\t')
- if len(headings)==1 and len(line[1:].split()) >= 9:
+ # test for 8, as FORMAT field might be missing
+ if len(headings)==1 and len(line[1:].split()) >= 8:
self.error(line,self.HEADING_NOT_SEPARATED_BY_TABS)
headings = line[1:].split()
err = "(found %s, expected %s)" % (headings[i],s)
#self.error(line,self.BADLY_FORMATTED_HEADING,err)
-
# allow FORMAT column to be absent
if len(headings) == 8:
headings.append("FORMAT")
self.error(self._line,self.BAD_GENOTYPE,GTstring)
return [".","|","."]
-
def convertGTback(self, GTdata):
return ''.join(map(str,GTdata))
if v == ".": values[idx] = f.missingvalue
else: values[idx] = int(v)
except:
- self.error(line,self.ERROR_FORMAT_NOT_NUMERICAL,values)
+ self.error(line,self.ERROR_FORMAT_NOT_INTEGER,"%s=%s" % (key, str(values)))
return [0] * len(values)
return values
elif f.type == "String":
self._line = line
- if f.id == "GT": values = map( self.convertGT, values )
+ if f.id == "GT": values = list(map( self.convertGT, values ))
return values
elif f.type == "Character":
for v in values:
elif f.type == "Float":
for idx,v in enumerate(values):
if v == ".": values[idx] = f.missingvalue
- try: return map(float,values)
+ try: return list(map(float,values))
except:
- self.error(line,self.ERROR_FORMAT_NOT_NUMERICAL,values)
+ self.error(line,self.ERROR_FORMAT_NOT_NUMERICAL,"%s=%s" % (key, str(values)))
return [0.0] * len(values)
else:
# can't happen
self.error(line,self.ERROR_INFO_STRING)
-
def inregion(self, chrom, pos):
if not self._regions: return True
for r in self._regions:
if r[0] == chrom and r[1] <= pos < r[2]: return True
return False
-
def parse_data( self, line, lineparse=False ):
cols = line.split('\t')
if len(cols) != len(self._samples)+9:
# gracefully deal with absent FORMAT column
- if len(cols) == 8 and len(self._samples)==0:
+ # and those missing samples
+ if len(cols) == 8:
cols.append("")
else:
self.error(line,
if len(elts) == 1: v = None
elif len(elts) == 2: v = elts[1]
else: self.error(line,self.ERROR_INFO_STRING)
- info[elts[0]] = self.parse_formatdata(elts[0], v, self._info, line)
+ info[elts[0]] = self.parse_formatdata(elts[0],
+ v,
+ self._info,
+ line)
# Gracefully deal with absent FORMAT column
if cols[8] == "": format = []
if f not in self._filter: self.error(line,self.FILTER_NOT_DEFINED, f)
# check: format fields are defined
- for f in format:
- if f not in self._format: self.error(line,self.FORMAT_NOT_DEFINED, f)
+ if self._format:
+ for f in format:
+ if f not in self._format: self.error(line,self.FORMAT_NOT_DEFINED, f)
# convert v3.3 alleles
if self._version == 33:
self.error(line,self.MISSING_INDEL_ALLELE_REF_BASE)
# trim trailing bases in alleles
- for i in range(1,min(len(ref),min(map(len,alt)))):
- if len(set(allele[-1].upper() for allele in alt)) > 1 or ref[-1].upper() != alt[0][-1].upper():
- break
- ref, alt = ref[:-1], [allele[:-1] for allele in alt]
+ if alt:
+ for i in range(1,min(len(ref),min(map(len,alt)))):
+ if len(set(allele[-1].upper() for allele in alt)) > 1 or ref[-1].upper() != alt[0][-1].upper():
+ break
+ ref, alt = ref[:-1], [allele[:-1] for allele in alt]
# left-align alleles, if a reference is available
if self._leftalign and self._reference:
else:
if expected == -1: value = "."
else: value = ",".join(["."]*expected)
- dict[format[idx]] = self.parse_formatdata(format[idx], value, self._format, line)
+
+ dict[format[idx]] = self.parse_formatdata(format[idx],
+ value,
+ self._format,
+ line)
if expected != -1 and len(dict[format[idx]]) != expected:
self.error(line,self.BAD_NUMBER_OF_PARAMETERS,
"id=%s, expected %s parameters, got %s" % (format[idx],expected,dict[format[idx]]))
return line
def _parse(self, line, stream):
+ # deal with files with header only
+ if line.startswith("##"): return
if len(line.strip()) > 0:
d = self.parse_data( line.strip() )
- #if d: yield d
+ if d: yield d
for line in stream:
self._lineno += 1
if self._lines and self._lineno > self._lines: raise StopIteration
d = self.parse_data( line.strip() )
- #if d: yield d
+ if d: yield d
######################################################################################################
#