File: kmer.py

package info (click to toggle)
dnapi 1.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 42,284 kB
  • sloc: python: 831; sh: 62; makefile: 13
file content (99 lines) | stat: -rw-r--r-- 2,778 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""Functions to handle k-mer related proceses.

"""

import sys
import re
from operator import itemgetter


def _calc_overlap(x, y, seed):
    """Return an overlapping position between a pair of k-mers.

    """
    if not x or not y:
        return 0
    for m in re.finditer(y[:seed], x):
        overlap = seed
        p = m.end()
        if len(x) == p:
            return overlap
        tail = re.search(x[p:], y)
        if not tail:
            continue
        if tail.start() == seed:
            return tail.end()
    return 0


def filter_kmers(kmers, kmer_len, rate):
    """Return a clean set of k-mers in tuple.

       Filter low-complexity and low-frequency kmers.
    """
    low_comp = [re.compile(base * (kmer_len//2)) for base in "ACGTN"]
    i, x = -1, -1
    while x != len(low_comp):
        i += 1
        x = sum([not p.findall(kmers[i][0]) for p in low_comp])
    max_hits = kmers[i][1]

    clean = []
    total = 0
    for s, n in kmers[i:]:
        if sum([not p.findall(s) for p in low_comp]) != len(low_comp):
            continue
        if float(max_hits)/n > rate:
            break
        clean.append((s, n))
        total += n
    return [(s, round(float(n)/total*100, 4)) for s, n in clean]


def assemble_kmers(kmers, seed):
    """Return assembled k-mers and the frequency in tuple.

       Assemble given k-mers by checking suffix-prefix matches.
    """
    pre_l, new_l = 0, len(kmers)
    while pre_l != new_l:
        pre_l = len(kmers)
        for i in range(pre_l):
            kmer, hits = kmers[i]
            if not hits:
                continue
            max_o, max_j = 0, 0
            for j in range(pre_l):
                if i == j:
                    continue
                if kmers[j][0] in kmer:
                    hits += kmers[j][1]
                    kmers[i] = (kmer, hits)
                    kmers[j] = ('', 0)
                    continue
                overlap = _calc_overlap(kmer, kmers[j][0], seed)
                if overlap > max_o:
                    max_o, max_j = overlap, j
            if max_o > 0:
                kmer += kmers[max_j][0][max_o:]
                hits += kmers[max_j][1]
                kmers[i] = (kmer, hits)
                kmers[max_j] = ('', 0)
        kmers = [k for k in kmers if k != ('', 0)]
        new_l = len(kmers)
    return kmers


def count_kmers(seq_list, kmer_len, sample_num):
    """Return sorted k-mer frequency.

    """
    freq = {}
    for cnt, seq in enumerate(seq_list):
        if cnt == sample_num:
            break
        interval = len(seq) - kmer_len + 1
        for i in range(interval):
            kmer = seq[i : i+kmer_len]
            freq[kmer] = freq.get(kmer, 0) + 1
    return sorted(freq.items(), key=itemgetter(1), reverse=True)