File: last-remove-dominated.py

package info (click to toggle)
last-align 199-1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 1,840 kB
  • sloc: cpp: 19,270; python: 1,561; ansic: 639; makefile: 132; sh: 79
file content (167 lines) | stat: -rwxr-xr-x 5,992 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#! /usr/bin/env python

# Read MAF-format alignments, and write those are not "dominated" by
# any other one.  X dominates Y if they overlap on the top sequence,
# and the score of X in the overlapping region exceeds the total score
# of Y.  This script assumes the alignments have been sorted by
# maf-sort.sh.

import fileinput, string, re, itertools, optparse, signal

signal.signal(signal.SIGPIPE, signal.SIG_DFL)  # stop spurious error message

op = optparse.OptionParser(usage="%prog sorted-last-output.maf")
(opts, args) = op.parse_args()

def finalize_score_matrix(score_matrix, mask_lowercase):
    '''Add lowercase and non-standard letters to the score matrix.'''
    if ('a', 'a') in score_matrix: return  # it's already finalized
    min_score = min(score_matrix.values())
    if not mask_lowercase:
        for k, score in score_matrix.items():
            i, j = k
            score_matrix[i, j.lower()]         = score
            score_matrix[i.lower(), j]         = score
            score_matrix[i.lower(), j.lower()] = score
    for i in string.printable:
        for j in string.printable:
            k = i, j
            score_matrix.setdefault(k, min_score)

score_matrix = {}
gap_open = None
gap_extend = None
gap_pair = None  # extra parameter for "generalized affine gap costs"

def gap_cost(x, y):
    '''Get the cost for x and y unaligned letters in 2 sequences.'''
    affine      = gap_open * 2 + gap_extend * (x + y)
    generalized = gap_open + gap_extend * abs(x - y) + gap_pair * min(x, y)
    return min(affine, generalized)

def aln_score(seq1, seq2):
    '''Calculate the score of the alignment represented by seq1 and seq2.'''
    score = 0
    gap1 = gap2 = 0  # gap sizes
    for pair in itertools.izip(seq1, seq2):  # faster than zip
        a, b = pair
        if   a == '-': gap1 += 1
        elif b == '-': gap2 += 1
        else:
            if gap1 + gap2 > 0:
                score -= gap_cost(gap1, gap2)
                gap1 = gap2 = 0
            score += score_matrix[pair]
    if gap1 + gap2 > 0: score -= gap_cost(gap1, gap2)
    return score

def nthNonGap(s, n):
    '''Get the start position of the n-th non-gap.'''
    for i, x in enumerate(s):
        if x != '-':
            if n == 0: return i
            n -= 1
    raise ValueError('non-gap not found')

def nthLastNonGap(s, n):
    '''Get the end position of the n-th last non-gap.'''
    return len(s) - nthNonGap(s[::-1], n)

def aln_dominates(x, y):
    '''Does alignment x dominate alignment y?'''
    yscore = y[0][4]
    ybeg = y[1][4]
    yend = y[1][-1]
    xseq1 = x[1][12]
    xseq2 = x[2][12]
    xbeg = x[1][4]
    xend = x[1][-1]
    if len(xseq1) != len(xseq2): raise Exception("bad MAF data")
    lettersFromBeg = max(ybeg-xbeg, 0)
    lettersFromEnd = max(xend-yend, 0)
    alnBeg = nthNonGap(xseq1, lettersFromBeg)
    alnEnd = nthLastNonGap(xseq1, lettersFromEnd)
    xscore = aln_score(xseq1[alnBeg:alnEnd], xseq2[alnBeg:alnEnd])
    return xscore > yscore

def parse_maf_a_line(line):
    '''Parse a MAF line starting with "a".'''
    words = re.split(r'([\s=]+)', line)  # keep the delimiters
    words[4] = int(words[4])             # alignment score
    return words

def parse_maf_s_line(line):
    '''Parse a MAF line starting with "s".'''
    words = re.split(r'(\s+)', line)   # keep the spaces
    words[4] = int(words[4])           # aln start
    words[6] = int(words[6])           # aln size
    words.append(words[4] + words[6])  # aln end
    return words

def write_aln(aln):
    if aln.pop() == True: return  # the alignment is dominated
    aln[0] = ''.join(map(str, aln[0]))
    aln[1] = ''.join(map(str, aln[1][:-1]))  # remove the end that we appended
    aln[2] = ''.join(map(str, aln[2][:-1]))  # remove the end that we appended
    print ''.join(aln)  # this prints a blank line at the end

def process_aln(alns, newaln):
    if not newaln: return
    newaln[0] = parse_maf_a_line(newaln[0])
    newaln[1] = parse_maf_s_line(newaln[1])
    newaln[2] = parse_maf_s_line(newaln[2])
    newaln.append(False)  # means: this alignment is not (yet) dominated
    newchr = newaln[1][2]
    newbeg = newaln[1][4]
    kept_alns = []
    for oldaln in alns:
        oldchr = oldaln[1][2]
        oldbeg = oldaln[1][4]
        oldend = oldaln[1][-1]
        if oldchr == newchr and oldend > newbeg:
            if oldbeg > newbeg: raise Exception("MAF data isn't sorted")
            if not oldaln[-1]:  # speed-up
                if aln_dominates(newaln, oldaln): oldaln[-1] = True
            if not newaln[-1]:  # speed-up
                if aln_dominates(oldaln, newaln): newaln[-1] = True
            kept_alns.append(oldaln)
        else:
            write_aln(oldaln)
    alns[:] = kept_alns + [ newaln ]

alns = []  # alignments that might overlap the next one
aln = []  # the current alignment
columns = []  # column headings for the score matrix
mask_lowercase = False

for line in fileinput.input(args):
    if line.startswith('#'):
        body = line[1:].strip()
        words = body.split()
        m1 = re.search(r'a=(\d+) b=(\d+) c=(\d+)', body)
        m2 = re.search(r'u=(\d+)', body)
        m3 = re.match(r'\S(\s+\S)*$', body)
        m4 = re.match(r'\S(\s+-?\d+)+$', body)
        if m1:
            gap_open, gap_extend, gap_pair = map(int, m1.groups())
        elif m2:
            mask_lowercase = (m2.group(1) == "3")
        elif m3 and not columns:
            columns = words
        elif m4 and len(words) == len(columns) + 1:
            row = words[0]  # row heading for the score matrix
            scores = map(int, words[1:])
            for col, s in zip(columns, scores):
                score_matrix[row, col] = s
        elif columns:
            finalize_score_matrix(score_matrix, mask_lowercase)
    elif line.isspace():
        process_aln(alns, aln)
        aln = []
    else:
        aln.append(line)

process_aln(alns, aln)  # don't forget the last alignment

for oldaln in alns:
    write_aln(oldaln)