File: _zset.py

package info (click to toggle)
python-fakeredis 2.29.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,772 kB
  • sloc: python: 19,002; sh: 8; makefile: 5
file content (100 lines) | stat: -rw-r--r-- 3,482 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from typing import Any, Tuple, Optional, Generator, Dict, ItemsView

import sortedcontainers


class ZSet:
    def __init__(self) -> None:
        self._bylex: Dict[bytes, float] = {}  # Maps value to score
        self._byscore = sortedcontainers.SortedList()

    def __contains__(self, value: bytes) -> bool:
        return value in self._bylex

    def add(self, value: bytes, score: float) -> bool:
        """Update the item and return whether it modified the zset"""
        old_score = self._bylex.get(value, None)
        if old_score is not None:
            if score == old_score:
                return False
            self._byscore.remove((old_score, value))
        self._bylex[value] = score
        self._byscore.add((score, value))
        return True

    def __setitem__(self, value: bytes, score: float) -> None:
        self.add(value, score)

    def __getitem__(self, key: bytes) -> float:
        return self._bylex[key]

    def get(self, key: bytes, default: Optional[float] = None) -> Optional[float]:
        return self._bylex.get(key, default)

    def __len__(self) -> int:
        return len(self._bylex)

    def __iter__(self) -> Generator[Any, Any, None]:
        def gen() -> Generator[Any, Any, None]:
            for score, value in self._byscore:
                yield value

        return gen()

    def discard(self, key: bytes) -> None:
        try:
            score = self._bylex.pop(key)
        except KeyError:
            return
        else:
            self._byscore.remove((score, key))

    def zcount(self, _min: float, _max: float) -> int:
        pos1: int = self._byscore.bisect_left(_min)
        pos2: int = self._byscore.bisect_left(_max)
        return max(0, pos2 - pos1)

    def zlexcount(self, min_value: float, min_exclusive: bool, max_value: float, max_exclusive: bool) -> int:
        pos1: int
        pos2: int
        if not self._byscore:
            return 0
        score = self._byscore[0][0]
        if min_exclusive:
            pos1 = self._byscore.bisect_right((score, min_value))
        else:
            pos1 = self._byscore.bisect_left((score, min_value))
        if max_exclusive:
            pos2 = self._byscore.bisect_left((score, max_value))
        else:
            pos2 = self._byscore.bisect_right((score, max_value))
        return max(0, pos2 - pos1)

    def islice_score(self, start: int, stop: int, reverse: bool = False) -> Any:
        return self._byscore.islice(start, stop, reverse)

    def irange_lex(
        self, start: bytes, stop: bytes, inclusive: Tuple[bool, bool] = (True, True), reverse: bool = False
    ) -> Any:
        if not self._byscore:
            return iter([])
        default_score = self._byscore[0][0]
        start_score = self._bylex.get(start, default_score)
        stop_score = self._bylex.get(stop, default_score)
        it = self._byscore.irange(
            (start_score, start),
            (stop_score, stop),
            inclusive=inclusive,
            reverse=reverse,
        )
        return (item[1] for item in it)

    def irange_score(self, start: Tuple[Any, bytes], stop: Tuple[Any, bytes], reverse: bool) -> Any:
        return self._byscore.irange(start, stop, reverse=reverse)

    def rank(self, member: bytes) -> Tuple[int, float]:
        ind: int = self._byscore.index((self._bylex[member], member))
        return ind, self._byscore[ind][0]

    def items(self) -> ItemsView[bytes, Any]:
        return self._bylex.items()