File: test_hypothesis.py

package info (click to toggle)
rapidfuzz 3.12.2+ds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,436 kB
  • sloc: python: 7,571; cpp: 7,481; sh: 30; makefile: 23
file content (468 lines) | stat: -rw-r--r-- 14,447 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
from __future__ import annotations

import random
from itertools import product
from string import ascii_letters, digits, punctuation

import hypothesis.strategies as st
import pytest
from hypothesis import assume, given, settings

from rapidfuzz import fuzz, process, utils_cpp, utils_py
from rapidfuzz.distance import Levenshtein_py, metrics_cpp
from tests.distance.common import Indel, JaroWinkler, Levenshtein


def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
    """Return True when a and b are approximately equal (mirrors math.isclose)."""
    diff = abs(a - b)
    tolerance = max(rel_tol * max(abs(a), abs(b)), abs_tol)
    return diff <= tolerance


def levenshtein(s1, s2, weights=(1, 1, 1)):
    """
    Pure python implementation of a generic weighted Levenshtein distance.

    This is far less error prone than the bitparallel C implementations
    and is therefore used as a reference to test them, at the cost of
    being very slow even for testing purposes.

    weights is an (insert, delete, substitute) cost tuple.
    """
    insert, delete, substitute = weights

    # rolling two-row dynamic program: prev holds the costs for the
    # prefix of s1 that is one character shorter than the current one
    prev = [col * insert for col in range(len(s2) + 1)]

    for row, ch1 in enumerate(s1, start=1):
        # first column: delete the whole prefix of s1
        cur = [row * delete]
        for col, ch2 in enumerate(s2, start=1):
            sub_cost = prev[col - 1] + (0 if ch1 == ch2 else substitute)
            cur.append(
                min(
                    prev[col] + delete,  # deletion
                    cur[col - 1] + insert,  # insertion
                    sub_cost,  # substitution / match
                )
            )
        prev = cur

    return prev[-1]


def normalize_distance(dist, s1, s2, weights=(1, 1, 1)):
    """
    Normalize a weighted edit distance into a similarity in [0, 1].

    The maximum possible distance is the cheaper of:
    * deleting all of s1 and inserting all of s2
    * substituting the overlapping part and inserting/deleting the rest
    """
    insert, delete, substitute = weights
    len1, len2 = len(s1), len(s2)

    rewrite_all = len1 * delete + len2 * insert
    if len1 > len2:
        # replace all characters and delete the remaining characters from s1
        trim = len2 * substitute + (len1 - len2) * delete
    else:
        # replace all characters and insert the remaining characters into s1
        trim = len1 * substitute + (len2 - len1) * insert

    max_dist = min(rewrite_all, trim)
    if not max_dist:
        # two empty strings are identical
        return 1
    return 1 - float(dist) / float(max_dist)


def jaro_similarity(pattern, text):
    """
    Pure python reference implementation of the Jaro similarity.

    Returns 1.0 when both strings are empty and 0 when the strings
    share no characters inside the matching window; otherwise a
    value in (0, 1].
    """
    if not pattern and not text:
        return 1.0

    # flags marking which positions have already been matched
    # (one spare slot so the scans below never index out of range)
    P_flag = [0] * (len(pattern) + 1)
    T_flag = [0] * (len(text) + 1)

    # characters only count as common when their positions differ
    # by at most Bound
    Bound = max(len(pattern), len(text)) // 2
    Bound = max(Bound - 1, 0)

    # count characters of text that match a not-yet-used character of
    # pattern within the window [i - Bound, i + Bound]
    CommonChars = 0
    for i in range(len(text)):
        lowlim = i - Bound if i >= Bound else 0
        hilim = i + Bound if i + Bound <= len(pattern) - 1 else len(pattern) - 1

        for j in range(lowlim, hilim + 1):
            if not P_flag[j] and pattern[j] == text[i]:
                T_flag[i] = 1
                P_flag[j] = 1
                CommonChars += 1
                break

    if not CommonChars:
        return 0

    # pair up the matched characters of text (in order) with the matched
    # characters of pattern (in order); each mismatching pair is half a
    # transposition, hence the // 2 below
    Transpositions = 0
    k = 0
    for i in range(len(text)):
        if T_flag[i]:
            # advance to the next flagged position in pattern
            j = k
            while j < len(pattern):
                if P_flag[j]:
                    k = j + 1
                    break
                j += 1

            if text[i] != pattern[j]:
                Transpositions += 1

    Transpositions = Transpositions // 2

    sim = CommonChars / len(pattern) + CommonChars / len(text) + (CommonChars - Transpositions) / CommonChars
    return sim / 3


def jaro_winkler_similarity(pattern, text, prefix_weight=0.1):
    """
    Pure python reference implementation of the Jaro-Winkler similarity:
    the Jaro similarity boosted by a bonus for a common prefix
    (up to 4 characters) whenever the Jaro similarity exceeds 0.7.
    """
    # length of the common prefix, capped at 4 characters
    common_prefix = 0
    for p_ch, t_ch in zip(pattern[:4], text[:4]):
        if p_ch != t_ch:
            break
        common_prefix += 1

    sim = jaro_similarity(pattern, text)
    if sim > 0.7:
        sim += common_prefix * prefix_weight * (1.0 - sim)
    return sim


def partial_ratio_short_needle_impl(s1, s2):
    """
    Brute-force reference for fuzz.partial_ratio: slide the shorter
    string across every window of the longer one and keep the best
    fuzz.ratio score.
    """
    if not s1 and not s2:
        return 100
    if not s1 or not s2:
        return 0
    if len(s1) > len(s2):
        # always use the shorter string as the needle
        s1, s2 = s2, s1

    best = 0
    for start in range(-len(s1), len(s2)):
        window = s2[max(0, start) : min(len(s2), start + len(s1))]
        best = max(best, fuzz.ratio(s1, window))
    return best


def partial_ratio_short_needle(s1, s2):
    """
    Symmetric wrapper around partial_ratio_short_needle_impl:
    for equal-length strings both argument orders are evaluated.
    """
    score = partial_ratio_short_needle_impl(s1, s2)
    if len(s1) == len(s2):
        score = max(score, partial_ratio_short_needle_impl(s2, s1))
    return score


def cdist_scorer(queries, choices, scorer):
    """
    Naive pairwise reference for process.cdist with a similarity scorer.

    Returns a len(queries) x len(choices) uint8 matrix where entry
    [i, j] is scorer(queries[i], choices[j]).
    """
    import numpy as np

    matrix = np.empty((len(queries), len(choices)), dtype=np.uint8)
    for row, query in enumerate(queries):
        matrix[row] = [scorer(query, choice) for choice in choices]
    return matrix


def cdist_distance(queries, choices, scorer):
    """
    Naive pairwise reference for process.cdist with a distance scorer.

    Returns a len(queries) x len(choices) int32 matrix where entry
    [i, j] is scorer(queries[i], choices[j]).
    """
    import numpy as np

    matrix = np.empty((len(queries), len(choices)), dtype=np.int32)
    for row, query in enumerate(queries):
        matrix[row] = [scorer(query, choice) for choice in choices]
    return matrix


# printable ASCII alphabet for hypothesis text strategies
# (not referenced by the tests visible in this file)
HYPOTHESIS_ALPHABET = ascii_letters + digits + punctuation

# every fuzz scorer, for scorer-agnostic tests
SCORERS = [
    fuzz.ratio,
    fuzz.partial_ratio,
    fuzz.token_set_ratio,
    fuzz.token_sort_ratio,
    fuzz.token_ratio,
    fuzz.partial_token_set_ratio,
    fuzz.partial_token_sort_ratio,
    fuzz.partial_token_ratio,
    fuzz.WRatio,
    fuzz.QRatio,
]

# scorers that compare the strings as a whole (no partial matching)
FULL_SCORERS = [fuzz.ratio, fuzz.WRatio, fuzz.QRatio]

# identity preprocessing plus the C++ and python default_process variants
PROCESSORS = [lambda x: x, utils_cpp.default_process, utils_py.default_process]


@given(s1=st.text(), s2=st.text())
@settings(max_examples=100, deadline=None)
def test_matching_blocks(s1, s2):
    """
    Converting editops to matching blocks directly and via opcodes
    must produce the same result.
    """
    editops = Levenshtein.editops(s1, s2)
    direct = editops.as_matching_blocks()
    via_opcodes = editops.as_opcodes().as_matching_blocks()
    assert direct == via_opcodes


@given(s1=st.text(), s2=st.text())
@settings(max_examples=100, deadline=None)
def test_levenshtein_editops(s1, s2):
    """
    Applying the operations from Levenshtein.editops to s1 must
    reproduce s2 for strings of any size.
    """
    assert Levenshtein.editops(s1, s2).apply(s1, s2) == s2


@given(s1=st.text(min_size=65), s2=st.text(min_size=65))
@settings(max_examples=50, deadline=None)
def test_levenshtein_editops_block(s1, s2):
    """
    Applying the operations from Levenshtein.editops to s1 must
    reproduce s2 for long strings (blockwise implementation).
    """
    assert Levenshtein.editops(s1, s2).apply(s1, s2) == s2


@given(s1=st.text(), s2=st.text())
@settings(max_examples=100, deadline=None)
def test_indel_editops(s1, s2):
    """
    Applying the operations from Indel.editops to s1 must
    reproduce s2 for strings of any size.
    """
    assert Indel.editops(s1, s2).apply(s1, s2) == s2


@given(s1=st.text(min_size=65), s2=st.text(min_size=65))
@settings(max_examples=50, deadline=None)
def test_indel_editops_block(s1, s2):
    """
    Applying the operations from Indel.editops to s1 must
    reproduce s2 for long strings (blockwise implementation).
    """
    assert Indel.editops(s1, s2).apply(s1, s2) == s2


@given(s1=st.text(), s2=st.text())
@settings(max_examples=100, deadline=None)
def test_levenshtein_opcodes(s1, s2):
    """
    Applying the operations from Levenshtein.opcodes to s1 must
    reproduce s2 for strings of any size.
    """
    assert Levenshtein.opcodes(s1, s2).apply(s1, s2) == s2


@given(s1=st.text(min_size=65), s2=st.text(min_size=65))
@settings(max_examples=50, deadline=None)
def test_levenshtein_opcodes_block(s1, s2):
    """
    Applying the operations from Levenshtein.opcodes to s1 must
    reproduce s2 for long strings (blockwise implementation).
    """
    assert Levenshtein.opcodes(s1, s2).apply(s1, s2) == s2


@given(s1=st.text(), s2=st.text())
@settings(max_examples=100, deadline=None)
def test_indel_opcodes(s1, s2):
    """
    Applying the operations from Indel.opcodes to s1 must
    reproduce s2 for strings of any size.
    """
    assert Indel.opcodes(s1, s2).apply(s1, s2) == s2


@given(s1=st.text(min_size=65), s2=st.text(min_size=65))
@settings(max_examples=50, deadline=None)
def test_indel_opcodes_block(s1, s2):
    """
    Applying the operations from Indel.opcodes to s1 must
    reproduce s2 for long strings (blockwise implementation).
    """
    assert Indel.opcodes(s1, s2).apply(s1, s2) == s2


@given(s1=st.text(max_size=64), s2=st.text())
@settings(max_examples=50, deadline=1000)
def test_partial_ratio_short_needle(s1, s2):
    """
    fuzz.partial_ratio must agree with the brute-force reference
    when the needle is short (<= 64 characters).
    """
    expected = partial_ratio_short_needle(s1, s2)
    assert isclose(fuzz.partial_ratio(s1, s2), expected)


@given(s1=st.text(), s2=st.text())
@settings(max_examples=50, deadline=1000)
def test_token_ratio(s1, s2):
    """
    token_ratio is defined as the maximum of token_sort_ratio
    and token_set_ratio.
    """
    best = max(fuzz.token_sort_ratio(s1, s2), fuzz.token_set_ratio(s1, s2))
    assert fuzz.token_ratio(s1, s2) == best


@given(s1=st.text(), s2=st.text())
@settings(max_examples=50, deadline=1000)
def test_partial_token_ratio(s1, s2):
    """
    partial_token_ratio is defined as the maximum of
    partial_token_sort_ratio and partial_token_set_ratio.
    """
    best = max(fuzz.partial_token_sort_ratio(s1, s2), fuzz.partial_token_set_ratio(s1, s2))
    assert fuzz.partial_token_ratio(s1, s2) == best


@given(s1=st.text(max_size=64), s2=st.text(max_size=64))
@settings(max_examples=50, deadline=None)
def test_levenshtein_word(s1, s2):
    """
    Test the short-string (single machine word) Levenshtein and Indel
    implementations against the simple python reference.
    """
    # uniform Levenshtein: distance and normalized similarity
    expected = levenshtein(s1, s2)
    assert Levenshtein.distance(s1, s2) == expected
    assert isclose(Levenshtein.normalized_similarity(s1, s2), normalize_distance(expected, s1, s2))

    # Indel distance: substitutions cost as much as one delete + one insert
    expected = levenshtein(s1, s2, weights=(1, 1, 2))
    assert Indel.distance(s1, s2) == expected
    assert isclose(Indel.normalized_similarity(s1, s2), normalize_distance(expected, s1, s2, weights=(1, 1, 2)))


@given(s1=st.text(min_size=65), s2=st.text(min_size=65))
@settings(max_examples=50, deadline=None)
def test_levenshtein_block(s1, s2):
    """
    Test the blockwise Levenshtein and Indel implementations (strings
    longer than one machine word) against the simple python reference.
    """
    # uniform Levenshtein: distance and normalized similarity
    expected = levenshtein(s1, s2)
    assert Levenshtein.distance(s1, s2) == expected
    assert isclose(Levenshtein.normalized_similarity(s1, s2), normalize_distance(expected, s1, s2))

    # Indel distance: substitutions cost as much as one delete + one insert
    expected = levenshtein(s1, s2, weights=(1, 1, 2))
    assert Indel.distance(s1, s2) == expected
    assert isclose(Indel.normalized_similarity(s1, s2), normalize_distance(expected, s1, s2, weights=(1, 1, 2)))


@given(s1=st.text(), s2=st.text())
@settings(max_examples=50, deadline=None)
def test_levenshtein_random(s1, s2):
    """
    Test strings of arbitrary length so all Levenshtein and Indel
    implementations are exercised against the simple python reference.
    """
    # uniform Levenshtein: distance and normalized similarity
    expected = levenshtein(s1, s2)
    assert Levenshtein.distance(s1, s2) == expected
    assert isclose(Levenshtein.normalized_similarity(s1, s2), normalize_distance(expected, s1, s2))

    # Indel distance: substitutions cost as much as one delete + one insert
    expected = levenshtein(s1, s2, weights=(1, 1, 2))
    assert Indel.distance(s1, s2) == expected
    assert isclose(Indel.normalized_similarity(s1, s2), normalize_distance(expected, s1, s2, weights=(1, 1, 2)))


@given(sentence=st.text())
@settings(max_examples=50, deadline=1000)
def test_multiple_processor_runs(sentence):
    """
    default_process must be idempotent: running it on an already
    processed sentence is a no-op (python and C++ implementations).
    """
    for process_func in (utils_py.default_process, utils_cpp.default_process):
        once = process_func(sentence)
        assert process_func(once) == once


@pytest.mark.parametrize(("scorer", "processor"), list(product(FULL_SCORERS, PROCESSORS)))
@given(choices=st.lists(st.text(), min_size=1))
@settings(max_examples=50, deadline=1000)
def test_only_identical_strings_extracted(scorer, processor, choices):
    """
    With a full (non-partial) scorer, a perfect score of 100 may only be
    reached by strings that are identical after preprocessing.

    :param scorer: full scorer under test
    :param processor: preprocessing function applied to query and choices
    :param choices: list of candidate strings (the query is drawn from it)
    """
    query = random.choice(choices)
    # skip inputs where preprocessing leaves an empty query
    assume(processor(query))

    matches = process.extract(query, choices, scorer=scorer, processor=processor, score_cutoff=100, limit=None)

    # the query itself is one of the choices, so a match always exists
    assert matches != []
    for match in matches:
        assert processor(match[0]) == processor(query)


@given(queries=st.lists(st.text(), min_size=1), choices=st.lists(st.text(), min_size=1))
@settings(max_examples=50, deadline=1000)
def test_cdist(queries, choices):
    """
    process.cdist must match a naive pairwise reference, both with the
    C++ and the pure python Levenshtein scorer, for distinct input lists
    and for the self-comparison case (queries vs queries).
    """
    pytest.importorskip("numpy")
    for rows, cols in ((queries, choices), (queries, queries)):
        reference_matrix = cdist_distance(rows, cols, scorer=metrics_cpp.levenshtein_distance)
        matrix1 = process.cdist(rows, cols, scorer=metrics_cpp.levenshtein_distance)
        matrix2 = process.cdist(rows, cols, scorer=Levenshtein_py.distance)
        assert (matrix1 == reference_matrix).all()
        assert (matrix2 == reference_matrix).all()


@given(s1=st.text(max_size=64), s2=st.text(max_size=64))
@settings(max_examples=50, deadline=1000)
def test_jaro_winkler_word(s1, s2):
    """Short-string JaroWinkler implementation must match the python reference."""
    expected = jaro_winkler_similarity(s1, s2)
    assert isclose(expected, JaroWinkler.similarity(s1, s2))


@given(s1=st.text(min_size=65), s2=st.text(min_size=65))
@settings(max_examples=50, deadline=None)
def test_jaro_winkler_block(s1, s2):
    """Blockwise (long-string) JaroWinkler implementation must match the python reference."""
    expected = jaro_winkler_similarity(s1, s2)
    assert isclose(expected, JaroWinkler.similarity(s1, s2))


@given(s1=st.text(), s2=st.text())
@settings(max_examples=50, deadline=None)
def test_jaro_winkler_random(s1, s2):
    """
    Test strings of arbitrary length so all JaroWinkler implementations
    are exercised against the python reference.
    """
    # (a leftover debug print of the inputs was removed here; hypothesis
    # already reports the failing example on assertion failure)
    assert isclose(jaro_winkler_similarity(s1, s2), JaroWinkler.similarity(s1, s2))