1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
#! /usr/bin/env python
from __future__ import print_function
import gzip
import itertools
import optparse
import signal
import sys
def myOpen(fileName): # faster than fileinput
if fileName == "-":
return sys.stdin
if fileName.endswith(".gz"):
return gzip.open(fileName, "rt") # xxx dubious for Python2
return open(fileName)
def alnBegFromSeqBeg(gappedSequence, seqBeg):
for i, x in enumerate(gappedSequence):
if x != "-":
if seqBeg == 0:
return i
seqBeg -= 1
def alnEndFromSeqEnd(gappedSequence, seqEnd):
for i, x in enumerate(gappedSequence):
if x != "-":
seqEnd -= 1
if seqEnd == 0:
return i + 1
def alignmentRange(cutBeg, cutEnd, sLineFields):
beg = int(sLineFields[2])
if beg >= cutEnd:
return 0, 0
sequenceWithGaps = sLineFields[6]
span = len(sequenceWithGaps) - sequenceWithGaps.count("-")
end = beg + span
if end <= cutBeg:
return 0, 0
seqBeg = max(cutBeg - beg, 0)
alnBeg = alnBegFromSeqBeg(sequenceWithGaps, seqBeg)
seqEnd = min(cutEnd - beg, span)
alnEnd = alnEndFromSeqEnd(sequenceWithGaps, seqEnd)
return alnBeg, alnEnd
def findTheSpecifiedSequence(seqName, mafLines):
for line in mafLines:
if line[0] == "s":
fields = line.split()
if seqName is None or fields[1] == seqName:
return fields
return None
def cutMafRecords(mafLines, alnBeg, alnEnd):
for line in mafLines:
fields = line.split()
if line[0] == "s":
oldSeq = fields[6]
newSeq = oldSeq[alnBeg:alnEnd]
beg = int(fields[2]) + alnBeg - oldSeq[:alnBeg].count("-")
span = len(newSeq) - newSeq.count("-")
yield fields[:2] + [str(beg), str(span)] + fields[4:6] + [newSeq]
elif line[0] == "q":
yield fields[:2] + [fields[2][alnBeg:alnEnd]]
elif line[0] == "p":
yield fields[:1] + [fields[1][alnBeg:alnEnd]]
else:
yield fields
def mafFieldWidths(mafRecords):
sRecords = (i for i in mafRecords if i[0] == "s")
sColumns = zip(*sRecords)
for i in sColumns:
yield max(map(len, i))
def printMafLine(fieldWidths, fields):
formatParams = itertools.chain.from_iterable(zip(fieldWidths, fields))
print("%*s %-*s %*s %*s %*s %*s %*s" % tuple(formatParams))
def cutOneMaf(cutRange, mafLines):
seqName, cutBeg, cutEnd = cutRange
sLineFields = findTheSpecifiedSequence(seqName, mafLines)
if not sLineFields:
return
alnBeg, alnEnd = alignmentRange(cutBeg, cutEnd, sLineFields)
if alnBeg >= alnEnd:
return
mafRecords = list(cutMafRecords(mafLines, alnBeg, alnEnd))
fieldWidths = list(mafFieldWidths(mafRecords))
for fields in mafRecords:
if fields[0] == "s":
printMafLine(fieldWidths, fields)
elif fields[0] == "q":
printMafLine(fieldWidths, fields[:2] + [""] * 4 + fields[2:])
elif fields[0] == "p":
printMafLine(fieldWidths, fields[:1] + [""] * 5 + fields[1:])
else:
print(" ".join(fields))
print()
def mafCutOneFile(cutRange, lines):
mafLines = []
for line in lines:
if line.isspace():
cutOneMaf(cutRange, mafLines)
mafLines = []
elif line[0] != "#":
mafLines.append(line)
cutOneMaf(cutRange, mafLines)
def wantedRange(cutSpecification):
seqName = None
if ":" in cutSpecification:
seqName, cutSpecification = cutSpecification.rsplit(":", 1)
beg, end = cutSpecification.rsplit("-", 1)
return seqName, int(beg), int(end)
def mafCut(opts, args):
cutRange = wantedRange(args[0])
if len(args) > 1:
for i in args[1:]:
mafCutOneFile(cutRange, myOpen(i))
else:
mafCutOneFile(cutRange, sys.stdin)
if __name__ == "__main__":
signal.signal(signal.SIGPIPE, signal.SIG_DFL) # avoid silly error message
usage = "%prog chrN:start-end alignments.maf"
description = "Get parts of MAF-format alignments."
op = optparse.OptionParser(usage=usage, description=description)
opts, args = op.parse_args()
if not args:
op.error("please give me a cut specification and MAF alignments")
mafCut(opts, args)
|