1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
|
#! /usr/bin/env python3
# Author: Martin C. Frith 2021
# xxx This is inefficient: ok for smallish data
from __future__ import print_function
import collections
import gzip
import itertools
import optparse
import signal
import sys
def openFile(fileName):
if fileName == "-":
return sys.stdin
if fileName.endswith(".gz"):
return gzip.open(fileName, "rt")
return open(fileName)
def fastaInput(fileNames):
for f in fileNames:
head = None
for line in openFile(f):
if line[0] == ">":
if head:
yield head, "".join(body)
head = line
body = []
else:
body.append(line.rstrip().upper())
if head:
yield head, "".join(body)
def sortKey(seqRecord):
name, seq = seqRecord
return -len(seq), seq
def nonContainedSeqRecords(seqRecords):
maxPowerOfTwo = 3 # xxx ???
minSeqLen = min(len(seq) for name, seq in seqRecords if seq)
for minPowerOfTwo in range(maxPowerOfTwo + 1):
wordLen = 2 ** minPowerOfTwo
if wordLen * 2 > minSeqLen:
break
wordLengths = [2 ** i for i in range(minPowerOfTwo, maxPowerOfTwo + 1)]
index = collections.defaultdict(list)
for record in seqRecords:
name, seq = record
if not seq:
break
seqLen = len(seq)
wordLengths = [i for i in wordLengths if i <= seqLen]
wordLen = wordLengths[-1]
word = seq[:wordLen]
if all(seq not in oldSeq for oldSeq in index[word]):
for wordLen in wordLengths:
words = set(seq[i-wordLen:i] for i in range(wordLen, seqLen))
for word in words:
index[word].append(seq)
yield record
def main(opts, args):
if not args:
args = ["-"]
records = sorted(fastaInput(args), key=sortKey)
records = [next(v) for k, v in itertools.groupby(records, sortKey)]
if opts.s:
records = list(nonContainedSeqRecords(records))
for i, x in records:
print(i, x, sep="")
if __name__ == "__main__":
signal.signal(signal.SIGPIPE, signal.SIG_DFL) # avoid silly error message
usage = "%prog [options] seqs.fasta"
descr = "Keep the first of identical sequences (after converting them to uppercase)."
op = optparse.OptionParser(usage=usage, description=descr)
op.add_option("-s", action="store_true", help=
"omit any sequence that is a shorter substring of another")
opts, args = op.parse_args()
main(opts, args)
|