X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=erange.git;a=blobdiff_plain;f=cistematic%2Fcore%2Fmotif.py;fp=cistematic%2Fcore%2Fmotif.py;h=9a40e67ded5c9d78fe70524cfd7095d194bc8274;hp=0000000000000000000000000000000000000000;hb=bc30aca13e5ec397c92e67002fbf7a103130b828;hpb=0d3e3112fd04c2e6b44a25cacef1d591658ad181 diff --git a/cistematic/core/motif.py b/cistematic/core/motif.py new file mode 100644 index 0000000..9a40e67 --- /dev/null +++ b/cistematic/core/motif.py @@ -0,0 +1,1373 @@ +########################################################################### +# # +# C O P Y R I G H T N O T I C E # +# Copyright (c) 2003-10 by: # +# * California Institute of Technology # +# # +# All Rights Reserved. # +# # +# Permission is hereby granted, free of charge, to any person # +# obtaining a copy of this software and associated documentation files # +# (the "Software"), to deal in the Software without restriction, # +# including without limitation the rights to use, copy, modify, merge, # +# publish, distribute, sublicense, and/or sell copies of the Software, # +# and to permit persons to whom the Software is furnished to do so, # +# subject to the following conditions: # +# # +# The above copyright notice and this permission notice shall be # +# included in all copies or substantial portions of the Software. # +# # +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS # +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN # +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # +# SOFTWARE. # +########################################################################### +# +# motif.py - defines the motif object and its methods in cistematic +from string import upper, lower +from math import log, exp +from copy import deepcopy +from cistematic.core import complement +from cistematic.cisstat.score import pearsonCorrelation +import re, os, tempfile + +if os.environ.get("CISTEMATIC_ROOT"): + cisRoot = os.environ.get("CISTEMATIC_ROOT") +else: + cisRoot = "/proj/genome" + +if os.environ.get("CISTEMATIC_TEMP"): + cisTemp = os.environ.get("CISTEMATIC_TEMP") +else: + cisTemp = "/tmp" + +tempfile.tempdir = cisTemp + +try: + import cistematic.core._motif as _motif + hasMotifExtension = True +except: + hasMotifExtension = False + +matrixRow = {"A": 0, + "C": 1, + "G": 2, + "T": 3 +} + +symbolToMatrix = {"A": [1.0, 0.0, 0.0, 0.0], + "C": [0.0, 1.0, 0.0, 0.0], + "G": [0.0, 0.0, 1.0, 0.0], + "T": [0.0, 0.0, 0.0, 1.0], + "W": [0.5, 0.0, 0.0, 0.5], + "S": [0.0, 0.5, 0.5, 0.0], + "R": [0.5, 0.0, 0.5, 0.0], + "Y": [0.0, 0.5, 0.0, 0.5], + "M": [0.5, 0.5, 0.0, 0.0], + "K": [0.0, 0.0, 0.5, 0.5], + "H": [0.33, 0.33, 0.0, 0.34], + "B": [0.0, 0.33, 0.33, 0.34], + "V": [0.33, 0.33, 0.34, 0.0], + "D": [0.33, 0.0, 0.33, 0.34], + "N": [0.25, 0.25, 0.25, 0.25] +} + +reMAP = {"A": "A", + "C": "C", + "G": "G", + "T": "T", + "N": "[ACGT]", + "W": "[AT]", + "S": "[GC]", + "R": "[AG]", + "Y": "[CT]", + "M": "[AC]", + "K": "[GT]", + "H": "[ACT]", + "B": "[CGT]", + "V": "[ACG]", + "D": "[AGT]", + "|": "I" +} + +motifDict = {"A": ["W", "R", "M", "H", "V", "D"], + "T": ["W", "Y", "K", "H", "B", "D"], + "C": ["S", "Y", "M", "H", "B", "V"], + "G": ["S", "R", "K", "B", "V", "D"] +} + + +class Motif: + """ The Motif class is the heart of cistematic. It captures both the consensus, + PWM, and Markov(1) versions of the motif, as well as provides methods for scanning + sequences for matches. + """ + motifSeq = "" + motifPWM = [] + reversePWM = [] + motifMarkov1 = [] + revMarkov1 = [] + tagID = "" + sequences = [] + annotations = [] + strictConsensus = "" + threshold = 1.0 + info = "" + + + def __init__(self, tagID="", motif="", PWM=[], seqs=[], thresh=0.0, info="", motifFile="", seqfile=""): + """ initialize a Motif object either with (A) a tagID identifier and either of + (i) a IUPAC consensus or (ii) a PWM or (iii) example sequences, or (B) from + a motif file in Cistematic motif format. + """ + fileTagID = "" + if motifFile != "": + (fileTagID, motif, PWM, seqs, thresh, info) = self.readMotif(motifFile) + + if len(tagID) > 0: + self.setTagID(tagID) + elif len(fileTagID) > 0: + self.setTagID(fileTagID) + else: + self.setTagID("default") + + if motif <> "": + self.setMotif(motif) + + # PWM overrules the motif + if len(PWM) > 1: + self.setPWM(PWM) + + # a seqfile can be used to create a list of sequences + if len(seqfile) > 0: + seqs = self.readSeqFile(seqfile) + + # Sequences overrules the PWM and motif + if len(seqs) > 1: + self.setSequences(seqs) + self.setThreshold(len(seqs[0])) + + self.setThreshold(float(thresh)) + if len(info) > 0: + self.setInfo(info) + + + def __len__(self): + """ returns the length of the motif + """ + return len(self.motifPWM) + + + def readMotif(self, motifFile): + """ read motif in cistmeatic motif format to initialize motif instance. + """ + motif = "" + PWM = [] + seqs = [] + threshold = 0.0 + info = "" + infile = open(motifFile, "r") + for line in infile: + if len(line) < 4 or line[0] == "#": + continue + + fields = line.strip().split("\t") + fieldType = fields[0].lower() + if fieldType not in ["tagid", "motif", "acgt", "sequence", "threshold", "info"]: + print "could not process line %s" % line + continue + + if len(fields) < 2: + continue + + if fieldType == "motif": + motif = fields[1] + elif fieldType == "acgt": + PWM.append([float(fields[1]), float(fields[2]), float(fields[3]), float(fields[4])]) + elif fieldType == "sequence": + seqs.append(fields[1]) + elif fieldType == "threshold": + threshold = float(fields[1]) + elif fieldType == "info": + info = fields[1].strip() + elif fieldType == "tagid": + tagID = fields[1] + + infile.close() + + return (tagID, motif, PWM, seqs, threshold, info) + + + def readSeqFile(self, seqFile): + """ read sequences from a sequence file. + """ + seqs = [] + infile = open(seqFile, "r") + for line in infile: + seqs.append(line.strip().upper()) + + infile.close() + + return seqs + + + def saveMotif(self, motFile): + """ Save motif in cistematic motif format. + """ + outfile = open(motFile, "w") + outfile.write("tagid\t%s\n" % self.tagID) + outfile.write("info\t%s\n" % self.info.strip()) + outfile.write("threshold\t%s\n" % str(self.threshold)) + outfile.write("motif\t%s\n" % self.buildConsensus()) + for col in self.motifPWM: + outfile.write("acgt\t%f\t%f\t%f\t%f\n" % (col[0], col[1], col[2], col[3])) + + for seq in self.sequences: + outfile.write("sequence\t%s\n" % seq) + + outfile.close() + + + def setTagID(self, tag): + """ set motif identifier. + """ + self.tagID = tag + + + def setInfo(self, info): + """ set motif info string. + """ + self.info = info + + + def setThreshold(self, threshold): + """ set a pre-defined threshold or the highest-possible threshold, otherwise. + """ + (sForward, sBackward) = self.scoreMotif(self.buildStrictConsensus()) + if sForward > sBackward: + self.threshold = sForward + else: + self.threshold = sBackward + + if threshold < self.threshold and threshold > 0: + self.threshold = threshold + + + def setMotif(self, motif): + """ set the motif PWM using a IUPAC consensus. + """ + self.motifSeq = motif + self.motifPWM = self.buildPWM(motif) + self.buildReversePWM() + self.strictConsensus = self.buildStrictConsensus() + + + def setSequences(self, seqs): + """ set the founder sequences for the motif and recalculate PWM with them. + """ + self.sequences = seqs + self.calculatePWMfromSeqs() + self.calculateMarkov1() + + + def setPWM(self, PWM): + """ set the PWM for the motif and calculate consensus. + """ + self.motifPWM = PWM + self.buildReversePWM() + self.motifSeq = self.buildConsensus() + self.strictConsensus = self.buildStrictConsensus() + + + def appendToPWM(self, col): + """ add a column to the PWM for the motif + """ + self.motifPWM.append(col) + + + def buildPWM(self, motif): + """ returns the PWM for the provided consensus motif. + """ + PWM = [] + for letter in upper(motif): + PWM.append(symbolToMatrix[letter]) + + return PWM + + + def buildReversePWM(self): + """ returns the reverse PWM for the motif + """ + theRevPWM = [] + tempPWM = deepcopy(self.motifPWM) + tempPWM.reverse() + for col in tempPWM: + theRevPWM.append([col[matrixRow["T"]], col[matrixRow["G"]], col[matrixRow["C"]], col[matrixRow["A"]]]) + + self.reversePWM = deepcopy(theRevPWM) + + return self.reversePWM + + + def getPWM(self): + """ returns the PWM for the motif + """ + return self.motifPWM + + + def printPWM(self): + """ print the PWM and the consensus + """ + aRow = "" + cRow = "" + gRow = "" + tRow = "" + cons = self.buildConsensus() + consLine = "Cons:" + for NT in cons: + consLine += "\t" + consLine += NT + + for col in self.motifPWM: + aRow = "%s%s\t" % (aRow, str(round(col[matrixRow["A"]],4))) + cRow = "%s%s\t" % (cRow, str(round(col[matrixRow["C"]],4))) + gRow = "%s%s\t" % (gRow, str(round(col[matrixRow["G"]],4))) + tRow = "%s%s\t" % (tRow, str(round(col[matrixRow["T"]],4))) + + print "A:\t%s\nC:\t%s\nG:\t%s\nT:\t%s\n" % (aRow, cRow, gRow, tRow) + print "%s\n" % consLine + + + def saveLogo(self, outfilename, height=-1, width=-1): + """ saves a logo version of the motif as outfilename (assumes has .png) + if the motif is built from sequences. + will fail if weblogo 2.8.2 package is not installed in correct place. + """ + logoPath = "%s/programs/weblogo/seqlogo" % cisRoot + if outfilename[-4:] in [".png", ".PNG"]: + outfilename = outfilename[:-4] + + if len(self.sequences) < 1: + print "cannot run logo representation without founder sequences" + else: + if True: + seqfilename = "%s.tmp" % tempfile.mktemp() + seqfile = open(seqfilename, "w") + for sequence in self.sequences: + seqfile.write("%s\n" % sequence) + + seqfile.flush() + dimensions = "" + if height > 0: + dimensions += "-h %d " % height + if width > 0: + dimensions += "-w %d " % width + else: + dimensions += "-w %d " % len(self.motifPWM) + + cmd = logoPath + " -f " + seqfilename + " -F PNG -c " + dimensions + "-a -Y -n -M -k 1 -o " + outfilename + contents = os.system(cmd) + seqfile.close() + os.remove(seqfilename) + else: + print "failed to make logo: expecting weblogo 2.8.2 package in %s" % logoPath + print "also check if ghostscript package is correctly installed." + + + def getSymbol(self, col): + """ helper function for buildConsensus() + """ + for NT in ["A", "C", "G", "T"]: + row = matrixRow[NT] + if col[row] > 0.9: + return NT + + aColValue = col[matrixRow["A"]] + cColValue = col[matrixRow["C"]] + gColValue = col[matrixRow["G"]] + tColValue = col[matrixRow["T"]] + + dualsList = [("R", aColValue + gColValue), + ("Y", tColValue + cColValue), + ("W", aColValue + tColValue), + ("S", cColValue + gColValue), + ("M", aColValue + cColValue), + ("K", tColValue + gColValue) + ] + + bestDual = self.getBestSymbol(dualsList) + if bestDual[1] > 0.9: + return bestDual[0] + + trioList = [("B", cColValue + gColValue + tColValue), + ("D", aColValue + gColValue + tColValue), + ("H", aColValue + cColValue + tColValue), + ("V", aColValue + cColValue + gColValue) + ] + + bestTrio = self.getBestSymbol(trioList) + if bestTrio[1] > 0.9: + return bestTrio[0] + + return "N" + + + def getBestSymbol(self, symbolProbabilityList): + bestSymbol = symbolProbabilityList[0] + for symbol in symbolProbabilityList[1:]: + if symbol[1] > bestSymbol[1]: + bestSymbol = symbol + + return bestSymbol + + + def buildConsensus(self): + """ returns the best consensus using the IUPAC alphabet. + """ + consensus = "" + for col in self.motifPWM: + consensus += self.getSymbol(col) + + return consensus + + + def buildStrictConsensus(self): + """ returns the best consensus using solely nucleotides. + """ + consensus = "" + for col in self.motifPWM: + mRow = [] + for nt in ("A", "C", "G", "T"): + mRow.append((col[matrixRow[nt]], nt)) + + mRow.sort() + consensus += mRow[3][1] + + return consensus + + + def bestConsensusScore(self): + """ returns the best consensus score possible. + """ + score = 0.0 + for col in self.motifPWM: + mRow = [] + for nt in ["A", "C", "G", "T"]: + mRow.append((col[matrixRow[nt]], nt)) + + mRow.sort() + score += mRow[3][0] + + return score + + + def expected(self, length, background=[0.25, 0.25, 0.25, 0.25], numMismatches=0): + """ returns the expected number of matches to the consensus in a sequence of a given length and background. + """ + expectedNum = length * self.consensusProbability(background, numMismatches) + return expectedNum + + + def consensusProbability(self, background=[0.25, 0.25, 0.25, 0.25], numMismatches=0): + """ returns the probability of the consensus given the background. + """ + prob = 0 + motifs = [] + if numMismatches> 0: + seqs = self.seqMismatches(self.buildConsensus().upper(), numMismatches) + motifs = seqs.split("|") + else: + motifs.append(self.buildConsensus()) + + for theCons in motifs: + motProb = 0 + for NT in theCons: + currentProb = 0.0 + if NT in ("A", "W", "R", "M", "H", "V", "D", "N"): + currentProb += background[matrixRow["A"]] + + if NT in ("C", "S", "Y", "M", "H", "B", "V", "N"): + currentProb += background[matrixRow["C"]] + + if NT in ("G", "S", "R", "K", "B", "V", "D", "N"): + currentProb += background[matrixRow["G"]] + + if NT in ("T", "W", "Y", "K", "H", "B", "D", "N"): + currentProb += background[matrixRow["T"]] + + motProb = motProb + log(currentProb) + + prob += exp(motProb) + + return prob + + + def pwmProbability(self, background): + """ returns probability of the PWM. + """ + prob = 1.0 + for row in self.motifPWM: + currentProb = 0.0 + for NT in ["A", "C", "G", "T"]: + currentProb += row[matrixRow[NT]] * background[matrixRow[NT]] + + prob = prob * currentProb + + return prob + + + def revComp(self): + """ returns the reverse complement of the consensus of this motif. + """ + return complement(self.buildConsensus(), len(self.motifPWM)) + + + def numberOfN(self): + """ returns numbers of effective Ns in motif. + """ + index = 0 + for col in self.motifPWM: + if self.getSymbol(col) == "N": + index += 1 + + return index + + + def buildMismatchSeq(self, rootSeq, tailSeq, numMismatches): + """ helper function called from seqMismatches(). + """ + finalSeq = "" + tailLen = len(tailSeq) + if tailLen < 1 or numMismatches < 1: + return rootSeq + tailSeq + + for pos in range(tailLen - numMismatches + 1): + newRootSeq = rootSeq + newRootSeq += tailSeq[:pos] + newRootSeq += "N" + finalSeq += self.buildMismatchSeq(newRootSeq, tailSeq[pos + 1:], numMismatches - 1) + finalSeq += "|" + + return finalSeq[:-1] + + + def seqMismatches(self, seq, numMismatches): + """ Returns list of sequences that will be used by initializeMismatchRE(). + """ + return self.buildMismatchSeq("", seq, numMismatches) + + + def probMatchPWM(self, PWM, NT, colIndex): + """ returns the probability of seeing that particular nucleotide according to the PSFM. + """ + + if NT in ["A", "T", "C", "G"]: + row = matrixRow[NT] + return PWM[colIndex][row] + + if NT == "N": + return 1.0 + + currentProb = 0.0 + for motifNucleotide in ["A", "T", "C", "G"]: + if NT in motifDict[motifNucleotide]: + row = matrixRow[NT] + currentProb += PWM[colIndex][row] + + return currentProb + + + def psfmOdds(self, PWM, NT, colIndex, background=[0.25, 0.25, 0.25, 0.25]): + """ calculates the odds of nucleotide NT coming from position colIndex in thePWM + as opposed to coming from the background. + """ + + currentProb = self.probMatchPWM(PWM, NT, colIndex) + backgroundProb = self.getBackgroundProbability(self, NT, background) + + try: + odds = currentProb / backgroundProb + except ZeroDivisionError: + odds = 1.0 + + return odds + + + def getBackgroundProbability(self, NT, background=[0.25, 0.25, 0.25, 0.25]): + + if NT in ["A", "T", "C", "G"]: + row = matrixRow[NT] + return background[row] + + if NT == "N": + return 1.0 + + backgroundProb = 0.0 + for motifNucleotide in ["A", "T", "C", "G"]: + if NT in motifDict[motifNucleotide]: + row = matrixRow[NT] + backgroundProb += background[row] + + return backgroundProb + + + def ntMatch(self, motifNT, seqNT): + """ returns True if seqNT matches motifNT. + """ + if motifNT == seqNT: + return True + + if seqNT == "N" or motifNT == "N": + return True + + if motifNT in motifDict[seqNT]: + return True + + return False + + + def scoreMotif(self, aSeq, diff=1000): + """ calculates the consensus score using the PSFM + """ + motLength = len(self.motifPWM) + if len(aSeq) < motLength: + return (0.0, 0.0) + + matchPWM = self.probMatchPWM + motPWM = self.motifPWM + revPWM = self.reversePWM + theSeq = upper(aSeq) + forwardCons = 0.0 + reverseCons = 0.0 + bestCons = 0.0 + for index in range(motLength): + currentNT = theSeq[index] + forwardCons += matchPWM(motPWM, currentNT, index) + reverseCons += matchPWM(revPWM, currentNT, index) + bestCons += matchPWM(motPWM,self.strictConsensus[index], index) + + if (forwardCons + diff) < bestCons and (reverseCons + diff) < bestCons: + return (-1, -1) + + return (forwardCons, reverseCons) + + + def scoreMotifLogOdds(self, aSeq, background=[0.25, 0.25, 0.25, 0.25]): + """ calculates the log-odds score using the PSFM given the markov(0) background. + """ + motLength = len(self.motifPWM) + if len(aSeq) < motLength: + return (0.0, 0.0) + + odds = self.psfmOdds + motPWM = self.motifPWM + revPWM = self.reversePWM + theSeq = upper(aSeq) + forwardCons = 0.0 + reverseCons = 0.0 + bestCons = 0.0 + for index in range(motLength): + currentNT = theSeq[index] + try: + forwardCons += log(odds(motPWM, currentNT, index, background), 2) + except: + forwardCons += log(0.01, 2) + + try: + reverseCons += log(odds(revPWM, currentNT, index, background), 2) + except: + reverseCons += log(0.01, 2) + + bestCons += log(odds(motPWM,self.strictConsensus[index], index, background), 2) + + return (forwardCons, reverseCons) + + + def bestLogOddsScore(self, background=[0.25, 0.25, 0.25, 0.25]): + """ calculates the best possible log-odds score using the PSFM given the markov(0) background. + """ + motLength = len(self.motifPWM) + odds = self.psfmOdds + motPWM = self.motifPWM + bestLogOdds = 0.0 + for index in range(motLength): + bestLogOdds += log(odds(motPWM,self.strictConsensus[index], index, background), 2) + + return bestLogOdds + + + def locateConsensus(self, aSeq): + """ returns a list of positions on aSeq that match the consensus exactly. + """ + cons = self.buildConsensus() + revComp = self.revComp() + motLength = len(cons) + Position = [] + if len(aSeq) < motLength: + return [] + else: + theSeq = upper(aSeq) + + pos = 0 + seqLength = len(theSeq) + while pos <= (seqLength - motLength): + subSeq = theSeq[pos:pos + motLength].strip() + try: + forwardMot = 1 + for index in range(motLength): + if not self.ntMatch(cons[index], subSeq[index]): + forwardMot = 0 + break + + revCompMot = 1 + for index in range(motLength): + if not self.ntMatch(revComp[index], subSeq[index]): + revCompMot = 0 + break + except: + print "chocked at pos %d" % pos + forwardMot = 0 + + if forwardMot == 1: + Position.append((pos, "F")) + pos += motLength + elif revCompMot == 1: + Position.append((pos, "R")) + pos += motLength + else: + pos +=1 + + return Position + + + def compareConsensus(self, aSeq): + """ returns a sequence with nucleotide differences from consensus in lower case. + """ + cons = self.buildConsensus() + revComp = self.revComp() + motLength = len(cons) + if len(aSeq) < motLength: + raise NameError, "Sequence too short" + else: + theSeq = upper(aSeq) + + forwardMismatch = 0 + backwardMismatch = 0 + forwardSeq = "" + backwardSeq = "" + + for index in range(motLength): + if not self.ntMatch(cons[index], theSeq[index]): + forwardMismatch += 1 + forwardSeq += lower(theSeq[index]) + else: + forwardSeq += theSeq[index] + + if not self.ntMatch(revComp[index], theSeq[index]): + backwardMismatch += 1 + backwardSeq += lower(theSeq[index]) + else: + backwardSeq += theSeq[index] + + if forwardMismatch <= backwardMismatch: + return forwardSeq + else: + return backwardSeq + + + def scoreDiffPWM(self, compMotif): + """ returns a score scaled from 0 (no difference) to 2 (completely different) to + quantify the difference between the motif and another motif. + """ + score = 0.0 + diffPWM = self.getDiffPWM(compMotif) + for pos in range(len(diffPWM)): + (adiff, cdiff, gdiff, tdiff) = diffPWM[pos] + score += abs(adiff) + abs(cdiff) + abs(gdiff) + abs(tdiff) + + score /= float(len(diffPWM)) + + return score + + + def getDiffPWM(self, compMotif): + """ subtracts the PWM of compMotif from existing PWM to compare differences. + Note that the comparison is only done on the length of the shorter motif. + """ + diffPWM = [] + compPWM = compMotif.getPWM() + numBasesToCompare = min(len(compMotif), len(self.motifPWM)) + + for pos in range(numBasesToCompare): + pwmCol = self.motifPWM[pos] + pwmColComp = compPWM[pos] + pwmEntry = [] + for NT in range(4): + pwmEntry.append(pwmCol[NT] - pwmColComp[NT]) + + diffPWM.append(pwmEntry) + + return diffPWM + + + def initializeRE(self): + """ initializes Regular Expression motif engine. + """ + global forwardRE + global backwardRE + cons = self.buildConsensus().upper() + revComp = self.revComp().upper() + reCons = "" + reBackward = "" + for NT in cons: + reCons += reMAP[NT] + + if revComp != cons: + for NT in revComp: + reBackward += reMAP[NT] + else: + reBackward = "ZZZZZZZ" + + forwardRE = re.compile(reCons, re.IGNORECASE) + backwardRE = re.compile(reBackward, re.IGNORECASE) + + + def initializeMismatchRE(self, numMismatches): + """ initializes Regular Expression motif engine allowing for mismatches. + """ + global forwardRE + global backwardRE + cons = self.seqMismatches(self.buildConsensus().upper(), numMismatches) + revComp = self.seqMismatches(self.revComp().upper(), numMismatches) + reCons = "" + reBackward = "" + for NT in cons: + reCons += reMAP[NT] + + if self.revComp().upper() != self.buildConsensus().upper(): + for NT in revComp: + reBackward += reMAP[NT] + else: + reBackward = "ZZZZZZZ" + + forwardRE = re.compile(reCons, re.IGNORECASE) + backwardRE = re.compile(reBackward, re.IGNORECASE) + + + def locateConsensusRE(self, sequence): + """ Returns a list of positions on aSeq that match the consensus exactly. + Should be run after either initializeRE() or initializeMismatchRE(numMismatches) + """ + motLength = len(self.motifPWM) + position = [] + results = [] + if len(sequence) < motLength: + return [] + + forwardIter = forwardRE.finditer(sequence) + backwardIter = backwardRE.finditer(sequence) + for match in forwardIter: + position.append((match.start(), "F")) + + for match in backwardIter: + position.append((match.start(), "R")) + + positionLength = len(position) + if positionLength >= 1: + position.sort() + (prevPos, prevSense) = position[0] + results.append((prevPos, prevSense)) + + for index in range(1, positionLength): + (pos, sense) = position[index] + if pos >= prevPos + motLength: + results.append((pos, sense)) + (pos, sense) = (prevPos, prevSense) + + return results + + + def locateStrictConsensus(self, aSeq, mismatches=0): + """ returns a list of positions on aSeq that match the strict + consensus within some mismatches. + Only available as a C-extension for greater speed-up for now. + """ + forwardMer = self.buildStrictConsensus() + motLength = len(forwardMer) + revcompMer = complement(forwardMer, motLength) + if hasMotifExtension: + return _motif.locateMer(aSeq, forwardMer, revcompMer, mismatches) + else: + print "only supported as part of the C extension for now" + return [] + + + def locateMotif(self, sequence, threshold=90.0, numberN=0): + """ returns a list of positions on aSeq that match the PWM within a Threshold, + given as a percentage of the optimal consensus score. + Will call C-extension for greater speed-up if compiled. + """ + motifLength = len(self.motifPWM) + sequenceLength = len(sequence) + threshold /= 100.0 + if threshold < 0.5: + print "Threshold less than 50% - will abort locateMotif()" + return [] + + maxScore = self.bestConsensusScore() + maxDiff = maxScore * (1 - threshold) + if sequenceLength < motifLength: + return [] + else: + sequence.strip() + + if hasMotifExtension: + return _motif.locateMotif(sequence, self.motifPWM, self.reversePWM, maxScore, maxDiff) + + sequence = upper(sequence) + positionList = [] + position = 0 + while position <= (sequenceLength - motifLength): + subSequence = sequence[position: position + motifLength] + if subSequence.count("N") > numberN: + position += 1 + continue + + (seqScore, revSeqScore) = self.scoreMotif(subSequence, maxDiff) + if seqScore >= revSeqScore and seqScore > 1.0: + positionList.append((position, "F")) + elif revSeqScore > 1.0: + positionList.append((position, "R")) + + position += 1 + + return positionList + + + def locateMarkov1(self, sequence, maxFold=5.0): + """ returns a list of positions on sequence that match the Markov1 within maxFold. + """ + motifLength = len(self.motifPWM) + sequenceLength = len(sequence) + if maxFold < 1.0: + print "maxFold less than 1.0 - will abort locateMarkov1()" + return [] + + maxScore = self.bestMarkov1Score() * maxFold + if sequenceLength < motifLength: + return [] + else: + sequence.strip() + + if hasMotifExtension: + return _motif.locateMarkov1(sequence, self.motifMarkov1, self.revMarkov1, maxScore) + + sequence = upper(sequence) + positionList = [] + position = 0 + while position <= (sequenceLength - motifLength): + subSequence = sequence[position: position + motifLength] + if subSequence.count("N") > 0: + position += 1 + continue + + (seqScore, revSeqScore) = self.scoreMarkov1(subSequence, maxScore) + if seqScore <= revSeqScore and seqScore < maxScore: + positionList.append((position, "F")) + elif revSeqScore < maxScore: + positionList.append((position, "R")) + + position += 1 + + return positionList + + + def calculatePWMfromSeqs(self): + """ calculate the PWM using a set of non-degenerate instances of the motif. + """ + PWM = [] + numSeqs = len(self.sequences) + + if numSeqs < 1: + return + + # using length of first sequence as the length of the motif + length = len(self.sequences[0]) + for index in range(length): + PWM.append([0.0, 0.0, 0.0, 0.0]) + + for seq in self.sequences: + index = 0 + theseq = seq.upper() + for NT in theseq: + PWM[index][matrixRow[NT]] += 1.0 + index += 1 + + for index in range(length): + for NT in ["A", "C", "G", "T"]: + PWM[index][matrixRow[NT]] /= numSeqs + + self.motifPWM = PWM + self.buildReversePWM() + self.motifSeq = self.buildConsensus() + self.strictConsensus = self.buildStrictConsensus() + + + def printMarkov1(self): + """ print the Markov1 PSSM of the form previous NT -> current NT. + """ + row = [] + for prior in range(4): + row.append(["", "", "", ""]) + + for pos in self.motifMarkov1: + for prior in range(4): + for current in range(4): + row[prior][current] += str(round(pos[prior][current], 4)) + "\t" + + for prior in ["A", "C", "G", "T"]: + for current in ["A", "C", "G", "T"]: + try: + print "%s -> %s\t%s\n" % (prior, current, row[matrixRow[prior]][matrixRow[current]]) + except: + print "ERROR: %s %s" % (prior, current) + print "\n" + + + def bestMarkov1Score(self): + """ returns the best markov1 score possible. + """ + motLength = len(self.motifMarkov1) + matchMarkov1 = self.probMatchMarkov1 + score = 0.0 + for index in range(motLength): + col = self.motifMarkov1[index] + mRow = [] + for prior in ["A", "C", "G", "T"]: + for current in ["A", "C", "G", "T"]: + mRow.append((col[matrixRow[prior]][matrixRow[current]], prior, current)) + mRow.sort() + + if index == 0: + currentProb = matchMarkov1(self.motifMarkov1, "N", mRow[-1][2], index) + else: + currentProb = matchMarkov1(self.motifMarkov1, mRow[-1][1], mRow[-1][2], index) + + if currentProb < 0.0001: + currentProb = 0.0001 + + if currentProb > 0.0: + score -= log(currentProb,2.0) + + return score + + + def worstMarkov1Score(self): + """ returns the worst markov1 score possible. + """ + motLength = len(self.motifMarkov1) + currentProb = 0.0001 + score = -log(currentProb, 2.0) * (motLength - 1) + + return score + + + def calculateMarkov1(self, pseudoCount=1.0): + """ calculate the Markov1 PSSM using a set of non-degenerate instances of the motif. + adds a pseudoCount for unseen combinations. + """ + self.motifMarkov1 = [] + numSeqs = len(self.sequences) + pseudoCount + + if numSeqs < 2: + return [] + + # using length of first sequence as the length of the motif + length = len(self.sequences[0]) + + for index in range(length): + self.motifMarkov1.append([[pseudoCount, pseudoCount, pseudoCount, pseudoCount], + [pseudoCount, pseudoCount, pseudoCount, pseudoCount], + [pseudoCount, pseudoCount, pseudoCount, pseudoCount], + [pseudoCount, pseudoCount, pseudoCount, pseudoCount]]) + + for seq in self.sequences: + theseq = seq.upper() + index = 0 + prior = -1 + for pos in theseq: + if index == 0: + for priorNT in range(4): + self.motifMarkov1[index][priorNT][matrixRow[pos]] += 0.25 + else: + self.motifMarkov1[index][prior][matrixRow[pos]] += 1.0 + + prior = matrixRow[pos] + index += 1 + + for index in range(length): + for prior in range(4): + for current in range(4): + self.motifMarkov1[index][prior][current] /= numSeqs + + self.buildReverseMarkov1(pseudoCount) + + + def buildReverseMarkov1(self, pseudoCount=1.0): + """ calculate the Reverse Markov1 PSSM using a set of non-degenerate instances of the motif. + """ + self.revMarkov1 = [] + numSeqs = len(self.sequences) + pseudoCount + + if numSeqs < 2: + return [] + + # using length of first sequence as the length of the motif + length = len(self.sequences[0]) + for index in range(length): + self.revMarkov1.append([[pseudoCount, pseudoCount, pseudoCount, pseudoCount], + [pseudoCount, pseudoCount, pseudoCount, pseudoCount], + [pseudoCount, pseudoCount, pseudoCount, pseudoCount], + [pseudoCount, pseudoCount, pseudoCount, pseudoCount]]) + + for aSeq in self.sequences: + seq = complement(aSeq.upper(), length) + index = 0 + prior = -1 + for pos in seq: + if index == 0: + for priorNT in range(4): + self.revMarkov1[index][priorNT][matrixRow[pos]] += 0.25 + else: + self.revMarkov1[index][prior][matrixRow[pos]] += 1.0 + + prior = matrixRow[pos] + index += 1 + + for index in range(length): + for prior in range(4): + for current in range(4): + self.revMarkov1[index][prior][current] /= numSeqs + + + def scoreMarkov1(self, aSeq, maxScore=10000000.): + """ calculate the matching score using the Markov1. + limit search if given a low maxScore + """ + motLength = len(self.motifMarkov1) + matchMarkov1 = self.probMatchMarkov1 + Score = [] + if len(aSeq) < motLength: + return Score + else: + theSeq = upper(aSeq) + + pos = 0 + seqProb = 0.0 + revSeqProb = 0.0 + subSeq = theSeq[pos: pos + motLength] + previousNT = "N" + for index in range(motLength): + currentNT = subSeq[index] + currentProb = matchMarkov1(self.motifMarkov1, previousNT, currentNT, index) + + if currentProb < 0.0001: + currentProb = 0.0001 + + if currentProb > 0.0: + seqProb -= log(currentProb,2.0) + + revCurrentProb = matchMarkov1(self.revMarkov1, previousNT, currentNT, index) + if revCurrentProb < 0.002: + revCurrentProb = 0.002 + + if revCurrentProb > 0.0: + revSeqProb -= log(revCurrentProb, 2.0) + + if seqProb > maxScore and revSeqProb > maxScore: + return (seqProb, revSeqProb) + + previousNT = currentNT + + return (seqProb, revSeqProb) + + + def probMatchMarkov1(self, theMarkov1, previousNT, NT, colIndex): + """ returns the likelihood of seeing NT given previousNT at this position of the motif. + """ + currentProb = 0.0 + if NT in ["A", "C", "G", "T"]: + currentNT = matrixRow[NT] + else: + currentNT = 0 + + try: + prevNT = matrixRow[previousNT] + except KeyError: + for index in range(4): + currentProb += theMarkov1[colIndex][index][currentNT] + + return currentProb + + if NT in ["A", "C", "G", "T"]: + return theMarkov1[colIndex][prevNT][currentNT] + + if NT == "N": + return 1.0 + + for motifNucleotide in ["A", "T", "C", "G"]: + if NT in motifDict[motifNucleotide]: + row = matrixRow[NT] + currentProb += theMarkov1[colIndex][prevNT][row] + + return currentProb + + + def isSane(self, minLen=7, stretchLen=6): + """ check for motif sanity, which includes: minimum length, less than half N's in consensus, + motifs include more than two nucleotides, no nucleotide or dinucleotide is repeated more + than stretchlen. The appropriate exceptions are made for 'GC' dinucleotides. + """ + stretchLen = int(stretchLen) + minLen = int(minLen) + stretchLen = min(stretchLen, minLen - 1) + + cons = self.buildConsensus() + motifLen = float(len(cons)) + if motifLen < minLen: + return False + + nCount = cons.count("N") + if (nCount >= 0.5 * motifLen): + return False + + aCount = cons.count("A") + gCount = cons.count("G") + cCount = cons.count("C") + tCount = cons.count("T") + + atCount = aCount + tCount + agCount = aCount + gCount + acCount = aCount + cCount + gtCount = gCount + tCount + tcCount = tCount + cCount + + for pairedCount in [atCount, agCount, acCount, gtCount, tcCount]: + if pairedCount == motifLen: + return False + + cons = self.buildStrictConsensus() + repeatSequences = [] + for nucleotide in ["A", "G", "C", "T"]: + repeatSequences.append(nucleotide * stretchLen) + + if stretchLen % 2 != 0: + stretchLen += 1 + + repeatCount = stretchLen/2 + for dinucleotide in ["AG", "AC", "AT", "CT", "GT"]: + repeatSequences.append(dinucleotide * repeatCount) + + for testSequence in repeatSequences: + if cons.count(testSequence): + return False + + return True + + +def correlateMotifs(actualMotifA, actualMotifB, maxSlide=1): + """ Compares two motifs using the "pearson correlation coefficient-like" MSS. + Will slide a motif up to maxSlide bases compared to the other, + and reports the best score. + """ + bestScore = 0.0 + + if len(actualMotifA) < len(actualMotifB): + motA = actualMotifB + motB = actualMotifA + else: + motA = actualMotifA + motB = actualMotifB + + motApwm = motA.getPWM() + motBpwm = motB.getPWM() + motCpwm = motB.buildReversePWM() + if hasMotifExtension: + return _motif.correlateMotifs(motApwm, motBpwm, motCpwm, maxSlide) + else: + length = len(motA) + padLength = length - len(motB) + Ncol = [symbolToMatrix["N"]] + for slide in range(-1 * maxSlide, maxSlide + padLength + 1): + pwmA = deepcopy(motApwm) + pwmB = deepcopy(motBpwm) + pwmC = deepcopy(motCpwm) + if slide < 0: + pwmA = Ncol * abs(slide) + pwmA + pwmB = pwmB + Ncol * (abs(slide) + padLength) + pwmC = pwmC + Ncol * (abs(slide) + padLength) + elif slide > 0 and slide <= maxSlide: + if padLength > 0: + if padLength >= slide: + adjustedPadLength = padLength - slide + adjustedSlide = 0 + else: + adjustedPadLength = 0 + adjustedSlide = slide - padLength + + pwmA = pwmA + Ncol * adjustedSlide + pwmB = Ncol * slide + pwmB + Ncol * adjustedPadLength + pwmC = Ncol * slide + pwmC + Ncol * adjustedPadLength + else: + pwmA = pwmA + Ncol * slide + pwmB = Ncol * slide + pwmB + pwmC = Ncol * slide + pwmC + elif slide > maxSlide: + maxDiff = slide - maxSlide + pwmA = pwmA + Ncol * maxSlide + pwmB = Ncol * slide + pwmB + Ncol * (padLength - maxDiff) + pwmC = Ncol * slide + pwmC + Ncol * (padLength - maxDiff) + else: + pwmB = pwmB + Ncol * padLength + pwmC = pwmC + Ncol * padLength + + score1 = 0.0 + score2 = 0.0 + thisLength = len(pwmA) + for index in range(thisLength): + score1 += pearsonCorrelation(pwmA[index], pwmB[index]) + score2 += pearsonCorrelation(pwmA[index], pwmC[index]) + + score1 = score1 / float(thisLength) + score2 = score2 / float(thisLength) + if score1 < score2 and score2 > bestScore: + bestScore = score2 + elif score1 > bestScore: + bestScore = score1 + + return bestScore + + +def MSS(motifA, motifB, maxSlide=1): + """ Compares two motifs using the motif similarity score (MSS). + Will slide a motif up to maxSlide bases compared to the other, + and reports the best score. Wrapper around correlateMotifs() + """ + return correlateMotifs(motifA, motifB, maxSlide) + + +def printMSS(motifList, maxSlide=1): + """ Prints a matrix of MSS comparisons between different motifs + in motifList. + """ + for mot1 in motifList: + print mot1.tagID, + for mot2 in motifList: + val = "\t%1.2f" % correlateMotifs(mot1, mot2, maxSlide) + print val, + + print "" \ No newline at end of file