File: _cf_mixin.py

package info (click to toggle)
python-fakeredis 2.29.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,772 kB
  • sloc: python: 19,002; sh: 8; makefile: 5
file content (188 lines) | stat: -rw-r--r-- 7,328 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""Command mixin for emulating `redis-py`'s cuckoo filter functionality."""

import io
from typing import List, Any

from probables import CountingCuckooFilter, CuckooFilterFullError

from fakeredis import _msgs as msgs
from fakeredis._command_args_parsing import extract_args
from fakeredis._commands import command, CommandItem, Int, Key
from fakeredis._helpers import SimpleError, OK, casematch, SimpleString


class ScalableCuckooFilter(CountingCuckooFilter):
    """Counting cuckoo filter that also records insert/delete statistics.

    Wraps ``CountingCuckooFilter`` with ``insert``/``count``/``delete`` helpers
    and keeps the creation-time capacity plus running counters of successful
    insertions and deletions.
    """

    def __init__(self, capacity: int, bucket_size: int = 2, max_iterations: int = 20, expansion: int = 1):
        """Create the underlying filter and zero the bookkeeping counters.

        All four arguments are forwarded positionally, in this order, to
        ``CountingCuckooFilter.__init__``.
        """
        super().__init__(capacity, bucket_size, max_iterations, expansion)
        # Counters of successful insert() / delete() calls.
        self.inserted: int = 0
        self.deleted: int = 0
        # Capacity requested when the filter was created.
        self.initial_capacity: int = capacity

    def insert(self, item: bytes) -> bool:
        """Add `item`; return False instead of raising when the filter is full."""
        try:
            super().add(item)
        except CuckooFilterFullError:
            succeeded = False
        else:
            self.inserted += 1
            succeeded = True
        return succeeded

    def count(self, item: bytes) -> int:
        """Return whatever the parent's ``check`` reports for `item`."""
        occurrences = super().check(item)
        return occurrences

    def delete(self, item: bytes) -> bool:
        """Remove `item`; return True (and bump ``deleted``) only if the removal succeeded."""
        if not super().remove(item):
            return False
        self.deleted += 1
        return True


class CFCommandsMixin:
    @staticmethod
    def _cf_add(key: CommandItem, item: bytes) -> int:
        """Insert `item` into the filter held by `key`.

        When `key` holds no value yet, a ``ScalableCuckooFilter`` with capacity
        1024 is stored first. The key is marked as updated regardless of the
        insert outcome.

        Returns:
            1 if the filter accepted the item, 0 otherwise.
        """
        if key.value is None:
            key.update(ScalableCuckooFilter(1024))
        accepted = key.value.insert(item)  # type:ignore
        key.updated()
        return int(bool(accepted))

    @staticmethod
    def _cf_exist(key: CommandItem, item: bytes) -> int:
        """Report whether the filter held by `key` contains `item`.

        Args:
            key: Command item whose value is expected to be a cuckoo filter.
            item: Element to look up.

        Returns:
            1 if the filter reports the item as present; 0 if it does not, or
            if `key` holds no value at all.
        """
        cuckoo = key.value
        # A missing key has no filter to query; treat it as "not present"
        # rather than failing on `item in None`.
        if cuckoo is None:
            return 0
        return 1 if item in cuckoo else 0

    @command(name="CF.ADD", fixed=(Key(ScalableCuckooFilter), bytes), repeat=())
    def cf_add(self, key: CommandItem, value: bytes) -> int:
        """Handle ``CF.ADD`` by delegating to ``_cf_add``; returns its 1/0 result."""
        added = CFCommandsMixin._cf_add(key, value)
        return added

    @command(name="CF.ADDNX", fixed=(Key(ScalableCuckooFilter), bytes), repeat=())
    def cf_addnx(self, key: CommandItem, value: bytes) -> int:
        """Handle ``CF.ADDNX``: add `value` only if the filter does not already report it.

        Args:
            key: Command item holding the cuckoo filter (may hold no value yet).
            value: Element to add.

        Returns:
            0 if the filter already reports `value` as present; otherwise the
            1/0 result of ``_cf_add``.
        """
        existing = key.value
        # With no filter stored yet there is nothing to test membership
        # against; fall through so _cf_add creates the filter, as CF.ADD does.
        if existing is not None and value in existing:
            return 0
        return CFCommandsMixin._cf_add(key, value)

    @command(name="CF.COUNT", fixed=(Key(ScalableCuckooFilter), bytes), repeat=())
    def cf_count(self, key: CommandItem, item: bytes) -> int:
        """Handle ``CF.COUNT``: return how many times the filter reports `item`.

        Args:
            key: Command item holding the cuckoo filter.
            item: Element to count.

        Returns:
            The value of the filter's ``count(item)``, or 0 when `key` holds
            no value.
        """
        cuckoo = key.value
        if cuckoo is None:
            return 0
        # Use the counting filter's own tally instead of collapsing the
        # answer to a 0/1 membership flag.
        return cuckoo.count(item)

    @command(name="CF.DEL", fixed=(Key(ScalableCuckooFilter), bytes), repeat=())
    def cf_del(self, key: CommandItem, value: bytes) -> int:
        """Handle ``CF.DEL``: remove one occurrence of `value` from the filter.

        Returns:
            1 if the filter's ``delete`` succeeded, 0 otherwise.

        Raises:
            SimpleError: With ``msgs.NOT_FOUND_MSG`` when `key` holds no value.
        """
        cuckoo = key.value
        if cuckoo is None:
            raise SimpleError(msgs.NOT_FOUND_MSG)
        removed = cuckoo.delete(value)
        return int(bool(removed))

    @command(name="CF.EXISTS", fixed=(Key(ScalableCuckooFilter), bytes), repeat=())
    def cf_exist(self, key: CommandItem, value: bytes) -> int:
        """Handle ``CF.EXISTS`` by delegating to ``_cf_exist``; returns its 1/0 result."""
        present = CFCommandsMixin._cf_exist(key, value)
        return present

    @command(name="CF.INFO", fixed=(Key(),), repeat=())
    def cf_info(self, key: CommandItem) -> List[Any]:
        """Handle ``CF.INFO``: return a flat label/value list describing the filter.

        Returns:
            Alternating byte-string labels and values, in a fixed order: size,
            bucket count, filter count, inserted/deleted counters, bucket size,
            max iterations and expansion rate.

        Raises:
            SimpleError: When `key` holds no value or the value is not exactly
                a ``ScalableCuckooFilter``.
        """
        cuckoo = key.value
        if cuckoo is None or type(cuckoo) is not ScalableCuckooFilter:
            raise SimpleError("...")

        # Derived from how far capacity has grown relative to its initial value.
        filter_count = (cuckoo.capacity / cuckoo.initial_capacity) / cuckoo.expansion_rate
        labelled_values = (
            (b"Size", cuckoo.capacity),
            (b"Number of buckets", len(cuckoo.buckets)),
            (b"Number of filters", filter_count),
            (b"Number of items inserted", cuckoo.inserted),
            (b"Number of items deleted", cuckoo.deleted),
            (b"Bucket size", cuckoo.bucket_size),
            (b"Max iterations", cuckoo.max_swaps),
            (b"Expansion rate", cuckoo.expansion_rate),
        )
        reply: List[Any] = []
        for label, val in labelled_values:
            reply.extend((label, val))
        return reply

    @command(name="CF.INSERT", fixed=(Key(),), repeat=(bytes,))
    def cf_insert(self, key: CommandItem, *args: bytes) -> List[int]:
        """Handle ``CF.INSERT``: add several items, optionally creating the filter.

        `args` may start with ``CAPACITY <n>`` and/or ``NOCREATE`` and must then
        contain ``ITEMS`` followed by at least one item.

        Returns:
            One ``_cf_add`` result (1 or 0) per item, in input order.

        Raises:
            SimpleError: When the ``ITEMS`` clause is missing or empty, or with
                ``msgs.NOT_FOUND_MSG`` when ``NOCREATE`` is given and `key`
                holds no value.
        """
        (capacity, no_create), left_args = extract_args(
            args, ("+capacity", "nocreate"), error_on_unexpected=False, left_from_first_unexpected=True
        )
        # NOTE: NOCREATE combined with CAPACITY is accepted; a stricter check
        # was left disabled in the original code.
        has_items_clause = len(left_args) >= 2 and casematch(left_args[0], b"items")
        if not has_items_clause:
            raise SimpleError("...")
        items = left_args[1:]

        if key.value is None:
            if no_create:
                raise SimpleError(msgs.NOT_FOUND_MSG)
            key.value = ScalableCuckooFilter(capacity or 1024)

        results = [self._cf_add(key, item) for item in items]
        key.updated()
        return results

    @command(name="CF.INSERTNX", fixed=(Key(),), repeat=(bytes,))
    def cf_insertnx(self, key: CommandItem, *args: bytes) -> List[int]:
        """Handle ``CF.INSERTNX``: add each item only if the filter does not report it yet.

        `args` may start with ``CAPACITY <n>`` and/or ``NOCREATE`` and must then
        contain ``ITEMS`` followed by at least one item.

        Returns:
            Per item, 0 if it was already reported present, otherwise the
            ``_cf_add`` result (1 or 0).

        Raises:
            SimpleError: When the ``ITEMS`` clause is missing or empty, or with
                ``msgs.NOT_FOUND_MSG`` when ``NOCREATE`` is given and `key`
                holds no value.
        """
        (capacity, no_create), left_args = extract_args(
            args, ("+capacity", "nocreate"), error_on_unexpected=False, left_from_first_unexpected=True
        )
        # NOTE: NOCREATE combined with CAPACITY is accepted; a stricter check
        # was left disabled in the original code.
        has_items_clause = len(left_args) >= 2 and casematch(left_args[0], b"items")
        if not has_items_clause:
            raise SimpleError("...")
        items = left_args[1:]

        if key.value is None:
            if no_create:
                raise SimpleError(msgs.NOT_FOUND_MSG)
            key.value = ScalableCuckooFilter(capacity or 1024)

        # Membership is re-checked per item, so a duplicate later in the same
        # call sees the effect of the earlier insert.
        results = [0 if item in key.value else self._cf_add(key, item) for item in items]
        key.updated()
        return results

    @command(name="CF.MEXISTS", fixed=(Key(ScalableCuckooFilter), bytes), repeat=(bytes,))
    def cf_mexists(self, key: CommandItem, *values: bytes) -> List[int]:
        """Handle ``CF.MEXISTS``: return the ``_cf_exist`` result for each value, in order."""
        return [CFCommandsMixin._cf_exist(key, candidate) for candidate in values]

    @command(name="CF.RESERVE", fixed=(Key(), Int), repeat=(bytes,), flags=msgs.FLAG_LEAVE_EMPTY_VAL)
    def cf_reserve(self, key: CommandItem, capacity: int, *args: bytes) -> SimpleString:
        """Handle ``CF.RESERVE``: create an empty cuckoo filter at `key`.

        Args:
            key: Target key; it must not hold a value yet.
            capacity: Capacity passed to the new filter.
            *args: Optional ``BUCKETSIZE``, ``MAXITERATIONS`` and ``EXPANSION``
                clauses with their values.

        Returns:
            ``OK`` once the filter has been stored under `key`.

        Raises:
            SimpleError: With ``msgs.ITEM_EXISTS_MSG`` if `key` already holds a value.
        """
        if key.value is not None:
            raise SimpleError(msgs.ITEM_EXISTS_MSG)
        (bucket_size, max_iterations, expansion), _ = extract_args(
            args, ("+bucketsize", "+maxiterations", "+expansion")
        )

        # Options that were not supplied fall back to the same defaults the
        # ScalableCuckooFilter constructor declares.
        value = ScalableCuckooFilter(
            capacity,
            bucket_size=bucket_size or 2,
            max_iterations=max_iterations or 20,
            # EXPANSION was previously parsed and then silently discarded.
            expansion=expansion or 1,
        )
        key.update(value)
        return OK

    @command(name="CF.SCANDUMP", fixed=(Key(), Int), repeat=(), flags=msgs.FLAG_LEAVE_EMPTY_VAL)
    def cf_scandump(self, key: CommandItem, iterator: int) -> List[Any]:
        """Handle ``CF.SCANDUMP``: dump the whole filter in a single chunk.

        Returns:
            ``[1, data]`` with the serialized filter when `iterator` is 0;
            ``[0, None]`` for any other iterator value.

        Raises:
            SimpleError: With ``msgs.NOT_FOUND_MSG`` when `key` holds no value.
        """
        if key.value is None:
            raise SimpleError(msgs.NOT_FOUND_MSG)

        # Everything is emitted on the first call, so any later iterator
        # position has nothing left to return.
        if iterator != 0:
            return [0, None]

        with io.BytesIO() as buffer:
            key.value.tofile(buffer)
            payload = buffer.getvalue()
        return [1, payload]

    @command(name="CF.LOADCHUNK", fixed=(Key(), Int, bytes), repeat=(), flags=msgs.FLAG_LEAVE_EMPTY_VAL)
    def cf_loadchunk(self, key: CommandItem, _: int, data: bytes) -> SimpleString:
        """Handle ``CF.LOADCHUNK``: replace the value at `key` with a filter rebuilt from `data`.

        The iterator argument is accepted but ignored.

        Returns:
            ``OK`` after the rebuilt filter has been stored.

        Raises:
            SimpleError: With ``msgs.NOT_FOUND_MSG`` when `key` already holds a
                value that is not exactly a ``ScalableCuckooFilter``.
        """
        current = key.value
        holds_other_type = current is not None and type(current) is not ScalableCuckooFilter
        if holds_other_type:
            raise SimpleError(msgs.NOT_FOUND_MSG)

        restored = ScalableCuckooFilter.frombytes(data)
        key.value = restored
        key.updated()
        return OK