File: maf-cut

package info (click to toggle)
last-align 1179-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 4,004 kB
  • sloc: cpp: 43,317; python: 3,352; ansic: 1,874; makefile: 495; sh: 305
file content (136 lines) | stat: -rwxr-xr-x 4,405 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#! /usr/bin/python3
# Author: Martin C. Frith 2018

from __future__ import print_function

import gzip
import itertools
import optparse
import signal
import sys

def myOpen(fileName):  # faster than fileinput
    if fileName == "-":
        return sys.stdin
    if fileName.endswith(".gz"):
        return gzip.open(fileName, "rt")  # xxx dubious for Python2
    return open(fileName)

def alnBegFromSeqBeg(gappedSequence, seqBeg):
    for i, x in enumerate(gappedSequence):
        if x != "-":
            if seqBeg == 0:
                return i
            seqBeg -= 1

def alnEndFromSeqEnd(gappedSequence, seqEnd):
    for i, x in enumerate(gappedSequence):
        if x != "-":
            seqEnd -= 1
            if seqEnd == 0:
                return i + 1

def alignmentRange(cutBeg, cutEnd, sLineFields):
    beg = int(sLineFields[2])
    if beg >= cutEnd:
        return 0, 0
    sequenceWithGaps = sLineFields[6]
    span = len(sequenceWithGaps) - sequenceWithGaps.count("-")
    end = beg + span
    if end <= cutBeg:
        return 0, 0
    seqBeg = max(cutBeg - beg, 0)
    alnBeg = alnBegFromSeqBeg(sequenceWithGaps, seqBeg)
    seqEnd = min(cutEnd - beg, span)
    alnEnd = alnEndFromSeqEnd(sequenceWithGaps, seqEnd)
    return alnBeg, alnEnd

def findTheSpecifiedSequence(seqName, mafLines):
    for line in mafLines:
        if line[0] == "s":
            fields = line.split()
            if seqName is None or fields[1] == seqName:
                return fields
    return None

def cutMafRecords(mafLines, alnBeg, alnEnd):
    for line in mafLines:
        fields = line.split()
        if line[0] == "s":
            oldSeq = fields[6]
            newSeq = oldSeq[alnBeg:alnEnd]
            beg = int(fields[2]) + alnBeg - oldSeq[:alnBeg].count("-")
            span = len(newSeq) - newSeq.count("-")
            yield fields[:2] + [str(beg), str(span)] + fields[4:6] + [newSeq]
        elif line[0] == "q":
            yield fields[:2] + [fields[2][alnBeg:alnEnd]]
        elif line[0] == "p":
            yield fields[:1] + [fields[1][alnBeg:alnEnd]]
        else:
            yield fields

def mafFieldWidths(mafRecords):
    sRecords = (i for i in mafRecords if i[0] == "s")
    sColumns = list(zip(*sRecords))
    for i in sColumns:
        yield max(list(map(len, i)))

def printMafLine(fieldWidths, fields):
    formatParams = itertools.chain.from_iterable(list(zip(fieldWidths, fields)))
    print("%*s %-*s %*s %*s %*s %*s %*s" % tuple(formatParams))

def cutOneMaf(cutRange, mafLines):
    seqName, cutBeg, cutEnd = cutRange
    sLineFields = findTheSpecifiedSequence(seqName, mafLines)
    if not sLineFields:
        return
    alnBeg, alnEnd = alignmentRange(cutBeg, cutEnd, sLineFields)
    if alnBeg >= alnEnd:
        return
    mafRecords = list(cutMafRecords(mafLines, alnBeg, alnEnd))
    fieldWidths = list(mafFieldWidths(mafRecords))
    for fields in mafRecords:
        if fields[0] == "s":
            printMafLine(fieldWidths, fields)
        elif fields[0] == "q":
            printMafLine(fieldWidths, fields[:2] + [""] * 4 + fields[2:])
        elif fields[0] == "p":
            printMafLine(fieldWidths, fields[:1] + [""] * 5 + fields[1:])
        else:
            print(" ".join(fields))
    print()

def mafCutOneFile(cutRange, lines):
    mafLines = []
    for line in lines:
        if line.isspace():
            cutOneMaf(cutRange, mafLines)
            mafLines = []
        elif line[0] != "#":
            mafLines.append(line)
    cutOneMaf(cutRange, mafLines)

def wantedRange(cutSpecification):
    seqName = None
    if ":" in cutSpecification:
        seqName, cutSpecification = cutSpecification.rsplit(":", 1)
    beg, end = cutSpecification.rsplit("-", 1)
    return seqName, int(beg), int(end)

def mafCut(opts, args):
    cutRange = wantedRange(args[0])
    if len(args) > 1:
        for i in args[1:]:
            mafCutOneFile(cutRange, myOpen(i))
    else:
        mafCutOneFile(cutRange, sys.stdin)

if __name__ == "__main__":
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)  # avoid silly error message
    usage = "%prog chrN:start-end alignments.maf"
    description = "Get parts of MAF-format alignments."
    op = optparse.OptionParser(usage=usage, description=description)
    opts, args = op.parse_args()
    if not args:
        op.error("please give me a cut specification and MAF alignments")
    mafCut(opts, args)