1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
#! /usr/bin/env python3
# Author: Martin C. Frith 2021
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import print_function
import argparse
import gzip
import signal
import sys
def openFile(fileName):
if fileName == "-":
return sys.stdin
if fileName.endswith(".gz"):
return gzip.open(fileName, "rt")
return open(fileName)
def alignmentInput(lines):
ranges = []
alnLines = []
for line in lines:
fields = line.split()
if line[0] == "#":
print(line, end="")
elif fields:
alnLines.append(line)
if line[0] == "s":
seqName = fields[1]
beg = int(fields[2])
span = int(fields[3])
seqLen = int(fields[5])
end = beg + span
if fields[4] == "-":
end = seqLen - beg
beg = end - span
r = seqName, seqLen, beg, end
ranges.append(r)
else:
if alnLines:
yield ranges, alnLines
ranges = []
alnLines = []
if alnLines:
yield ranges, alnLines
def isCoveringMuchSequence(alns, nums):
n = nums[0]
for s in range(2):
seqLen = alns[n][0][s][1]
coverage = sum(alns[i][0][s][3] - alns[i][0][s][2] for i in nums)
if coverage * 2 >= seqLen: return True
return False
def isNearInAllSeqs(iRanges, jRanges, maxDistance):
for i, j in zip(iRanges, jRanges):
iSeq, iLen, iBeg, iEnd = i
jSeq, jLen, jBeg, jEnd = j
if iSeq != jSeq:
return False
if jBeg > iEnd + maxDistance or iBeg > jEnd + maxDistance:
return False
return True
def linkOneAln(args, alns, i):
maxRankDifference = args.tween + 1
jIndexBeg = max(i - maxRankDifference, 0)
jIndexEnd = min(i + maxRankDifference + 1, len(alns))
iRanges, iLines, iRank, iLinks = alns[i]
iSeq, iLen, iBeg, iEnd = iRanges[0]
j = i
while j > jIndexBeg:
j -= 1
jRanges, jLines, jRank, jLinks = alns[j]
jSeq, jLen, jBeg, jEnd = jRanges[0]
if jSeq < iSeq or jEnd + args.distance < iBeg:
break
if isNearInAllSeqs(iRanges, jRanges, args.distance):
if abs(jRank - iRank) <= maxRankDifference:
iLinks.append(j)
j = i + 1
while j < jIndexEnd:
jRanges, jLines, jRank, jLinks = alns[j]
jSeq, jLen, jBeg, jEnd = jRanges[0]
if iSeq < jSeq or iEnd + args.distance < jBeg:
break
if isNearInAllSeqs(iRanges, jRanges, args.distance):
if abs(jRank - iRank) <= maxRankDifference:
iLinks.append(j)
j += 1
def connectedComponent(alns, isMarked, i):
stack = [i]
isMarked[i] = True
while stack:
j = stack.pop()
yield j
for k in alns[j][3]:
if not isMarked[k]:
stack.append(k)
isMarked[k] = True
def connectedComponents(alns):
isMarked = [False for i in alns]
for i in range(len(alns)):
if not isMarked[i]:
yield sorted(connectedComponent(alns, isMarked, i))
def sortKey1(aln):
ranges, lines = aln
return ranges[1]
def main(args):
alns = sorted(alignmentInput(openFile(args.infile)), key=sortKey1)
alns = sorted(aln + (rank, []) for rank, aln in enumerate(alns))
for i in range(len(alns)):
linkOneAln(args, alns, i)
for c in connectedComponents(alns):
if len(c) >= args.count or isCoveringMuchSequence(alns, c):
for i in c:
ranges, lines, rank, links = alns[i]
print(*lines, sep="")
if __name__ == "__main__":
signal.signal(signal.SIGPIPE, signal.SIG_DFL) # avoid silly error message
descr = "Omit any alignment not near (in both sequences) to other alignments."
ap = argparse.ArgumentParser(description=descr, formatter_class=
argparse.ArgumentDefaultsHelpFormatter)
ap.add_argument("-c", "--count", type=int, default=3, metavar="C",
help="minimum number of linked alignments")
ap.add_argument("-d", "--distance", type=float, default=1000000,
metavar="D", help="maximum distance")
ap.add_argument("-t", "--tween", type=float, default=5, metavar="T",
help="maximum number of alignments in between")
ap.add_argument("infile", help="file of sequence alignments in MAF format")
args = ap.parse_args()
main(args)
|