File: last-map-probs.py

package info (click to toggle)
last-align 199-1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 1,840 kB
  • sloc: cpp: 19,270; python: 1,561; ansic: 639; makefile: 132; sh: 79
file content (127 lines) | stat: -rwxr-xr-x 4,594 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#! /usr/bin/env python

# Copyright 2010, 2011 Martin C. Frith

# Read query-genome alignments: write them along with the probability
# that each alignment is not the true mapping of its query.  These
# probabilities make the risky assumption that one of the alignments
# reported for each query is correct.

import sys, os, fileinput, math, optparse, signal

def logsum(x, y):
    """log(exp(x) + exp(y))."""
    a = max(x, y)
    b = min(x, y)
    return a + math.log(1 + math.exp(b-a))

def mismapProb(score, temperature, queryName, denominators):
    x = score / temperature
    y = denominators[queryName]
    assert x <= y
    prob = 1 - math.exp(x - y)
    assert prob >= 0
    return prob

def mafScore(words):
    for word in words:
        if word.startswith("score="):
            return float(word[6:])
    raise Exception("found an alignment without a score")

def namesAndScores(lines):
    queryNames = []
    scores = []
    for line in lines:
        if line.startswith("a"):
            s = mafScore(line.split())
            scores.append(s)
            sLineCount = 0
        elif line.startswith("s"):
            sLineCount += 1
            if sLineCount == 2: queryNames.append(line.split()[1])
            # maxsplit doesn't seem to make it faster
        elif line[0].isdigit():  # we have an alignment in tabular format
            w = line.split()
            scores.append(float(w[0]))
            queryNames.append(w[6])
    return queryNames, scores

def scoreTotals(queryNames, scores, temperature):
    denominators = {}
    for n, s in zip(queryNames, scores):
        r = s / temperature
        d = denominators.get(n, -1e9)
        denominators[n] = logsum(d, r)
    return denominators

def writeOneBatch(lines, queryNames, scores, denominators, opts, temperature):
    isWanted = True
    i = 0
    for line in lines:
        if line.startswith("a"):
            s = scores[i]
            p = mismapProb(s, temperature, queryNames[i], denominators)
            i += 1
            if s < opts.score or p > opts.mismap:
                isWanted = False
            else:
                newLineEnd = " mismap=%g\n" % p
                line = line.rstrip() + newLineEnd
        elif line[0].isdigit():  # we have an alignment in tabular format
            s = scores[i]
            p = mismapProb(s, temperature, queryNames[i], denominators)
            i += 1
            if s < opts.score or p > opts.mismap: continue
            newLineEnd = "\t%g\n" % p
            line = line.rstrip() + newLineEnd
        if isWanted: print line,
        if line.isspace(): isWanted = True  # reset at end of maf paragraph

def processOneBatch(lines, opts, temperature):
    if not lines: return
    if temperature < 0:
        raise Exception("I need a header line with: t=(a positive value)")

    queryNames, scores = namesAndScores(lines)
    denominators = scoreTotals(queryNames, scores, temperature)
    writeOneBatch(lines, queryNames, scores, denominators, opts, temperature)

def lastMapProbs(opts, args):
    temperature = -1
    lines = []

    for line in fileinput.input(args):
        if line.startswith("#"):
            for i in line.split():
                if i.startswith("t="): temperature = float(i[2:])
        if line.startswith("# batch"):
            processOneBatch(lines, opts, temperature)
            lines = []
        lines.append(line)
    processOneBatch(lines, opts, temperature)

if __name__ == "__main__":
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)  # avoid silly error message

    usage = """
  %prog --help
  %prog [options] lastal-alignments"""

    description = "Calculate a mismap probability for each alignment.  This is the probability that the alignment does not reflect the origin of the query sequence, assuming that one reported alignment does reflect the origin of each query."

    op = optparse.OptionParser(usage=usage, description=description)
    op.add_option("-m", "--mismap", type="float", default=0.01, metavar="M",
                  help="don't write alignments with mismap probability > M (default: %default)")
    op.add_option("-s", "--score", type="float", default=0, metavar="S",
                  help="don't write alignments with score < S (default: %default)")
    (opts, args) = op.parse_args()
    if not args and sys.stdin.isatty():
        op.print_help()
        op.exit()

    try: lastMapProbs(opts, args)
    except KeyboardInterrupt: pass  # avoid silly error message
    except Exception, e:
        prog = os.path.basename(sys.argv[0])
        sys.exit(prog + ": error: " + str(e))