File: last-map-probs

package info (click to toggle)
last-align 963-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 3,380 kB
  • sloc: cpp: 41,136; python: 2,744; ansic: 1,240; makefile: 383; sh: 255
file content (141 lines) | stat: -rwxr-xr-x 5,072 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#! /usr/bin/env python

# Copyright 2010, 2011, 2012, 2014 Martin C. Frith

# Read query-genome alignments: write them along with the probability
# that each alignment is not the true mapping of its query.  These
# probabilities make the risky assumption that one of the alignments
# reported for each query is correct.

from __future__ import print_function

import gzip
import sys, os, math, optparse, signal

def myOpen(fileName):
    if fileName == "-":
        return sys.stdin
    if fileName.endswith(".gz"):
        return gzip.open(fileName, "rt")  # xxx dubious for Python2
    return open(fileName)

def logsum(numbers):
    """Adds numbers, in log space, to avoid overflow."""
    m = max(numbers)
    s = sum(math.exp(i - m) for i in numbers)
    return m + math.log(s)

def mafScore(aLine):
    for word in aLine.split():
        if word.startswith("score="):
            return float(word[6:])
    raise Exception("found an alignment without a score")

def namesAndScores(lines):
    queryNames = []
    scores = []
    for line in lines:
        if line[0] == "a":  # faster than line.startswith("a")
            s = mafScore(line)
            scores.append(s)
            sLineCount = 0
        elif line[0] == "s":
            sLineCount += 1
            if sLineCount == 2: queryNames.append(line.split(None, 2)[1])
        elif line[0].isdigit():  # we have an alignment in tabular format
            w = line.split(None, 7)
            scores.append(float(w[0]))
            queryNames.append(w[6])
    return queryNames, scores

def scoreTotals(queryNames, scores, temperature):
    queryLists = {}
    for n, s in zip(queryNames, scores):
        queryLists.setdefault(n, []).append(s / temperature)
    return dict((k, logsum(queryLists[k])) for k in queryLists)

def writeOneBatch(lines, queryNames, scores, denominators, opts, temperature):
    isWanted = True
    i = 0
    for line in lines:
        if line[0] == "a":
            s = scores[i]
            p = 1.0 - math.exp(s / temperature - denominators[queryNames[i]])
            i += 1
            if s < opts.score or p > opts.mismap:
                isWanted = False
            else:
                newLineEnd = " mismap=%.3g\n" % p
                line = line.rstrip() + newLineEnd
        elif line[0].isdigit():  # we have an alignment in tabular format
            s = scores[i]
            p = 1.0 - math.exp(s / temperature - denominators[queryNames[i]])
            i += 1
            if s < opts.score or p > opts.mismap: continue
            newLineEnd = "\tmismap=%.3g\n" % p
            line = line.rstrip() + newLineEnd
        if isWanted: print(line, end="")
        if line.isspace(): isWanted = True  # reset at end of maf paragraph

def doOneBatch(lines, opts, temperature):
    queryNames, scores = namesAndScores(lines)
    denominators = scoreTotals(queryNames, scores, temperature)
    writeOneBatch(lines, queryNames, scores, denominators, opts, temperature)

def readHeaderOrDie(lines):
    t = 0.0
    e = -1
    for line in lines:
        if line.startswith("#") or line.isspace():
            print(line, end="")
            for i in line.split():
                if i.startswith("t="): t = float(i[2:])
                elif i.startswith("e="): e = int(i[2:])
            if t > 0 and e >= 0: break
        else:
            raise Exception("I need a header with t= and e=")
    return t, e

def doOneFile(opts, f):
    temperature, e = readHeaderOrDie(f)
    if opts.score < 0: opts.score = e + round(temperature * math.log(1000))
    lines = []

    for line in f:
        if line.startswith("# batch"):
            doOneBatch(lines, opts, temperature)
            lines = []
        lines.append(line)
    doOneBatch(lines, opts, temperature)

def lastMapProbs(opts, args):
    if args:
        for i in args:
            doOneFile(opts, myOpen(i))
    else:
        doOneFile(opts, sys.stdin)

if __name__ == "__main__":
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)  # avoid silly error message

    usage = """
  %prog --help
  %prog [options] lastal-alignments"""

    description = "Calculate a mismap probability for each alignment.  This is the probability that the alignment does not reflect the origin of the query sequence, assuming that one reported alignment does reflect the origin of each query."

    op = optparse.OptionParser(usage=usage, description=description)
    op.add_option("-m", "--mismap", type="float", default=0.01, metavar="M",
                  help="don't write alignments with mismap probability > M (default: %default)")
    op.add_option("-s", "--score", type="float", default=-1, metavar="S",
                  help="don't write alignments with score < S (default: e+t*ln[1000])")
    (opts, args) = op.parse_args()
    if not args and sys.stdin.isatty():
        op.print_help()
        op.exit()

    try: lastMapProbs(opts, args)
    except KeyboardInterrupt: pass  # avoid silly error message
    except Exception as e:
        prog = os.path.basename(sys.argv[0])
        sys.exit(prog + ": error: " + str(e))