File: test_py_kmc_file.py

package info (click to toggle)
kmc 3.2.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 4,132 kB
  • sloc: cpp: 39,219; python: 372; perl: 179; makefile: 157; sh: 34
file content (218 lines) | stat: -rwxr-xr-x 6,881 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/env python3
'''
A series of test for PyKMCFile class.
'''

import sys
import os
import subprocess
import kmer_utils
import init_sys_path
import py_kmc_api as pka
import pytest
if not init_sys_path.is_windows():
    import resource



@pytest.fixture(scope="module", autouse=True)
def create_kmc_db():
    '''
    Set up tests and clean up after.
    '''
    kmer_len = 17
    memory = 2 #GB
    cutoff_min = 1
    sig_len = 9
    reads_src = 'input.fastq'
    reads = (
        'GGCATTGCATGCAGTNNCAGTCATGCAGTCAGGCAGTCATGGCATGCAACGACGATCAGTCATGGTCGAG',
        'GGCATTGCATGCAGTNNCAGTCATGCAGTCAGGCAGTCATGGCATGCAACGACGATCAGTCATGGTCGAG',
        'GTCGATGCATCGATGCTGATGCTGCTGTGCTAGTAGCGTCTGAGGGCTA'
    )
    _save_reads_as_fastq(reads, reads_src)

    kmers = _cout_kmers(reads, kmer_len)
    absent_kmers = _generate_not_existing_kmers(kmers, kmer_len)
    _run_kmc(cutoff_min, kmer_len, memory, sig_len, reads_src)

    result = {
        'kmers': kmers,
        'kmer_len': kmer_len,
        'sig_len': sig_len,
        'absent_kmers': absent_kmers
    }
    yield result

    os.remove(reads_src)
    os.remove('kmc_db.kmc_pre')
    os.remove('kmc_db.kmc_suf')

def _cout_kmers(reads, kmer_len):
    ''' Simple k-mer counting routine. '''
    kmers = {}
    for read in reads:
        for start in range(0, len(read) - kmer_len + 1):
            kmer = read[start:start+kmer_len]
            if 'N' in kmer:
                continue
            rev = kmer_utils.rev_comp(kmer)
            if rev < kmer:
                kmer = rev
            if kmer in kmers.keys():
                kmers[kmer] += 1
            else:
                kmers[kmer] = 1
    return kmers

def _save_reads_as_fastq(reads, file_name):
    ''' Save reads from input to file named file_name. '''
    file = open(file_name, 'w')
    for read in reads:
        file.write("@TEST\n")
        file.write(read + "\n")
        file.write("+TEST\n")
        file.write("I"*len(read) + "\n")
    file.close()


def _generate_not_existing_kmers(kmers, kmer_len):
    ''' Generate k-mers that are not present in the database.

        :kmers: existing k-mers
        :kmer_len: length of k-mers

    '''
    def increment_kmer(kmer, start):
        ''' Increments k-mer to next lexographical.

        Start from pos :start: (from end, i.e. start = 0 means last k-mer symbol). '''
        def replace_char(string, pos, new_char):
            ''' Create new string with character at :pos: changed to :new_char:. '''
            if pos < 0:
                pos = len(string) + pos
            return string[:pos] + new_char + string[pos+1:]
        for i in range(start, len(kmer)):
            if kmer[-1-i] == 'A':
                return replace_char(kmer, -1 - i, 'C')
            if kmer[-1-i] == 'C':
                return replace_char(kmer, -1 - i, 'G')
            if kmer[-1-i] == 'T':
                return replace_char(kmer, -1 - i, 'T')
            kmer = replace_char(kmer, -1 - i, 'T')
        return kmer

    absent_kmers = []

    for i in range(0, kmer_len):
        for kmer_str in kmers.keys():
            inc_kmer = increment_kmer(kmer_str, i)
            if not inc_kmer in kmers.keys():
                absent_kmers.append(inc_kmer)
    return absent_kmers

def _run_kmc(cutoff_min, kmer_len, memory, sig_len, reads_src):
    ''' Runs kmc. '''
    if init_sys_path.is_linux() or init_sys_path.is_mac():
        kmc_path = os.path.join(os.path.dirname(__file__), '../../bin/kmc')
    elif init_sys_path.is_windows():
        kmc_path = os.path.join(os.path.dirname(__file__), '../../x64/Release/kmer_counter.exe')

    if init_sys_path.is_mac():        
        resource.setrlimit(resource.RLIMIT_NOFILE, (2048, 2048))

    subprocess.call([kmc_path,
                     '-ci{}'.format(cutoff_min),
                     '-k{}'.format(kmer_len),
                     '-m{}'.format(memory),
                     '-p{}'.format(sig_len),
                     reads_src,
                     'kmc_db',
                     '.'
                    ])
    

def _open_for_listing():
    ''' Open kmc database for listing and check if opened sucessfully. '''
    kmc_file = pka.KMCFile()
    assert kmc_file.OpenForListing('kmc_db')
    return kmc_file

def _open_for_ra():
    ''' Open kmc database for random access and check if opened sucessfully. '''
    kmc_file = pka.KMCFile()
    assert kmc_file.OpenForRA('kmc_db')
    return kmc_file

def test_info(create_kmc_db):
    '''
    Test if some fields in object returned from Info are set properly.

    '''
    pattern = create_kmc_db
    kmc_file = _open_for_listing()
    info = kmc_file.Info()
    assert info.kmer_length == pattern['kmer_len']
    assert info.mode == 0 # no Quake mode (quake not supported anymore)
    assert info.counter_size == 1
    assert info.signature_len == pattern['sig_len']
    assert info.min_count == 1
    assert info.both_strands
    assert info.total_kmers == len(pattern['kmers'])

def test_kmc_file_next_kmer(create_kmc_db):
    ''' Test if all counted k-mers are returned by KMC API using NextKmer method. '''
    pattern = create_kmc_db['kmers']
    kmc_file = _open_for_listing()
    counter = pka.Count()
    kmer = pka.KmerAPI(create_kmc_db['kmer_len'])
    res = {}
    while kmc_file.ReadNextKmer(kmer, counter):
        res[str(kmer)] = counter.value
    assert res == pattern

def test_get_counters_for_read(create_kmc_db):
    ''' Test case for GetCountersForRead method of KMCFile. '''
    kmers = create_kmc_db['kmers']
    read = "GGCATTGCATGCAGTNNCAGTCATGCAGTCAGGCAGTCATGGCATGCGTAAACGACGATCAGTCATGGTCGAG"
    pattern = []
    kmer_len = create_kmc_db['kmer_len']
    for i in range(0, len(read) - kmer_len + 1):
        kmer = read[i:i+kmer_len]
        if 'N' in kmer:
            pattern.append(0)
            continue
        rev = kmer_utils.rev_comp(kmer)
        if rev < kmer:
            kmer = rev
        if not kmer in kmers.keys():
            pattern.append(0)
        else:
            pattern.append(kmers[kmer])

    kmc_file = _open_for_ra()
    res = pka.CountVec()
    kmc_file.GetCountersForRead(read, res)
    assert res.value == pattern


def test_check_kmer(create_kmc_db):
    '''
    Test case for CheckKmer method.

    Check if are k-mers from input are present in the database and
    if some not present in the input are absent in the output.
    '''
    kmers = create_kmc_db['kmers']
    kmer_len = create_kmc_db['kmer_len']
    kmer = pka.KmerAPI(kmer_len)
    counter = pka.Count()
    kmc_file = _open_for_ra()
    for kmer_str, count in kmers.items():
        kmer.from_string(kmer_str)
        assert kmc_file.CheckKmer(kmer, counter)
        assert counter.value == count
    absent_kmers = create_kmc_db['absent_kmers']
    for kmer_str in absent_kmers:
        kmer.from_string(kmer_str)
        assert not kmc_file.CheckKmer(kmer, counter)