File: fasta-nr

package info (click to toggle)
last-align 1609-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 14,656 kB
  • sloc: cpp: 44,297; python: 5,192; ansic: 1,937; sh: 651; makefile: 457
file content (82 lines) | stat: -rwxr-xr-x 2,590 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#! /usr/bin/env python3
# Author: Martin C. Frith 2021

# xxx This is inefficient: ok for smallish data

from __future__ import print_function

import collections
import gzip
import itertools
import optparse
import signal
import sys

def openFile(fileName):
    if fileName == "-":
        return sys.stdin
    if fileName.endswith(".gz"):
        return gzip.open(fileName, "rt")
    return open(fileName)

def fastaInput(fileNames):
    for f in fileNames:
        head = None
        for line in openFile(f):
            if line[0] == ">":
                if head:
                    yield head, "".join(body)
                head = line
                body = []
            else:
                body.append(line.rstrip().upper())
        if head:
            yield head, "".join(body)

def sortKey(seqRecord):
    name, seq = seqRecord
    return -len(seq), seq

def nonContainedSeqRecords(seqRecords):
    maxPowerOfTwo = 3  # xxx ???
    minSeqLen = min(len(seq) for name, seq in seqRecords if seq)
    for minPowerOfTwo in range(maxPowerOfTwo + 1):
        wordLen = 2 ** minPowerOfTwo
        if wordLen * 2 > minSeqLen:
            break
    wordLengths = [2 ** i for i in range(minPowerOfTwo, maxPowerOfTwo + 1)]
    index = collections.defaultdict(list)
    for record in seqRecords:
        name, seq = record
        if not seq:
            break
        seqLen = len(seq)
        wordLengths = [i for i in wordLengths if i <= seqLen]
        wordLen = wordLengths[-1]
        word = seq[:wordLen]
        if all(seq not in oldSeq for oldSeq in index[word]):
            for wordLen in wordLengths:
                words = set(seq[i-wordLen:i] for i in range(wordLen, seqLen))
                for word in words:
                    index[word].append(seq)
            yield record

def main(opts, args):
    if not args:
        args = ["-"]
    records = sorted(fastaInput(args), key=sortKey)
    records = [next(v) for k, v in itertools.groupby(records, sortKey)]
    if opts.s:
        records = list(nonContainedSeqRecords(records))
    for i, x in records:
        print(i, x, sep="")

if __name__ == "__main__":
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)  # avoid silly error message
    usage = "%prog [options] seqs.fasta"
    descr = "Keep the first of identical sequences (after converting them to uppercase)."
    op = optparse.OptionParser(usage=usage, description=descr)
    op.add_option("-s", action="store_true", help=
                  "omit any sequence that is a shorter substring of another")
    opts, args = op.parse_args()
    main(opts, args)