File: maf-swap

package info (click to toggle)
last-align 1179-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 4,004 kB
  • sloc: cpp: 43,317; python: 3,352; ansic: 1,874; makefile: 495; sh: 305
file content (150 lines) | stat: -rwxr-xr-x 4,947 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/python3

# Read MAF-format alignments, and write them, after moving the Nth
# sequence to the top in each alignment.

# Before writing, if the top sequence would be on the - strand, then
# flip all the strands.  But don't do this if the top sequence is
# translated DNA.

from __future__ import print_function

import gzip
import itertools
import optparse
import os
import signal
import sys

try:
    maketrans = str.maketrans
except AttributeError:
    from string import maketrans

def myOpen(fileName):
    if fileName == "-":
        return sys.stdin
    if fileName.endswith(".gz"):
        return gzip.open(fileName, "rt")  # xxx dubious for Python2
    return open(fileName)

def indexOfNthSequence(mafLines, n):
    for i, line in enumerate(mafLines):
        if line[0] == "s":
            if n == 1: return i
            n -= 1
    raise Exception("encountered an alignment with too few sequences")

def rangeOfNthSequence(mafLines, n):
    """Get the range of lines associated with the Nth sequence."""
    start = indexOfNthSequence(mafLines, n)
    stop = start + 1
    while stop < len(mafLines):
        line = mafLines[stop]
        if line[0] not in "qi": break
        stop += 1
    return start, stop

complement = maketrans('ACGTNSWRYKMBDHVacgtnswrykmbdhv',
                       'TGCANSWYRMKVHDBtgcanswyrmkvhdb')
# doesn't handle "U" in RNA sequences
def revcomp(seq):
    return seq[::-1].translate(complement)

def flippedMafRecords(mafLines):
    for line in mafLines:
        words = line.split()
        if words[0] == "s":
            s, name, start, span, strand, seqlen, alnString = words
            newStart = str(int(seqlen) - int(start) - int(span))
            newStrand = "+-"[strand == "+"]
            newString = revcomp(alnString)
            yield [s, name, newStart, span, newStrand, seqlen, newString]
        elif words[0] == "p":
            yield words[:1] + [words[1][::-1]]
        elif words[0] == "q":
            yield words[:2] + [words[2][::-1]]
        else:
            yield words

def sLineFieldWidths(mafLines):
    sLines = (i for i in mafLines if i[0] == "s")
    sColumns = list(zip(*sLines))
    for i in sColumns:
        yield max(list(map(len, i)))

def joinedMafS(fieldWidths, words):
    formatParams = itertools.chain.from_iterable(list(zip(fieldWidths, words)))
    return "%*s %-*s %*s %*s %*s %*s %*s\n" % tuple(formatParams)

def joinedMafLine(words, fieldWidths):
    if words[0] == "s":
        return joinedMafS(fieldWidths, words)
    elif words[0] == "q":
        words = words[:2] + [""] * 4 + words[2:]
        return joinedMafS(fieldWidths, words)
    elif words[0] == "p":
        words = words[:1] + [""] * 5 + words[1:]
        return joinedMafS(fieldWidths, words)
    else:
        return " ".join(words) + "\n"

def flippedMaf(mafLines):
    flippedLines = list(flippedMafRecords(mafLines))
    fieldWidths = list(sLineFieldWidths(flippedLines))
    return (joinedMafLine(i, fieldWidths) for i in flippedLines)

def isCanonicalStrand(mafLine):
    words = mafLine.split()
    strand = words[4]
    if strand == "+": return True
    alnString = words[6]
    if "/" in alnString or "\\" in alnString: return True  # frameshifts
    alnSize = int(words[3])
    gapCount = alnString.count("-")
    if len(alnString) - gapCount < alnSize: return True  # translated DNA
    return False

def swapOneMaf(opts, mafLines):
    start, stop = rangeOfNthSequence(mafLines, opts.n)
    mafLines[1:stop] = mafLines[start:stop] + mafLines[1:start]
    if not isCanonicalStrand(mafLines[1]):
        mafLines = flippedMaf(mafLines)
    for i in mafLines:
        print(i, end="")
    print()  # blank line after each alignment

def mafSwap(opts, args):
    if not args:
        args = ["-"]
    for fileName in args:
        inputLines = myOpen(fileName)
        mafLines = []
        for line in inputLines:
            if line[0] == "#":
                print(line, end="")
            elif line.isspace():
                if mafLines:
                    swapOneMaf(opts, mafLines)
                    mafLines = []
            else:
                mafLines.append(line)
        if mafLines:
            swapOneMaf(opts, mafLines)

if __name__ == "__main__":
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)  # avoid silly error message

    usage = "%prog [options] my-alignments.maf"
    description = "Change the order of sequences in MAF-format alignments."
    op = optparse.OptionParser(usage=usage, description=description)
    op.add_option("-n", type="int", default=2,
                  help="move the Nth sequence to the top (default: %default)")
    (opts, args) = op.parse_args()
    if opts.n < 1: op.error("option -n: should be >= 1")

    try: mafSwap(opts, args)
    except KeyboardInterrupt: pass  # avoid silly error message
    except Exception as e:
        prog = os.path.basename(sys.argv[0])
        sys.exit(prog + ": error: " + str(e))