File: _topk.py

package info (click to toggle)
python-fakeredis 2.29.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,772 kB
  • sloc: python: 19,002; sh: 8; makefile: 5
file content (103 lines) | stat: -rw-r--r-- 3,399 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import heapq
import random
import time
from typing import List, Optional, Tuple


class Bucket(object):
    def __init__(self, counter: int, fingerprint: int):
        self.counter = counter
        self.fingerprint = fingerprint

    def add(self, fingerprint: int, incr: int, decay: float) -> int:
        if self.fingerprint == fingerprint:
            self.counter += incr
            return self.counter
        elif self._decay(decay):
            self.counter += incr
            self.fingerprint = fingerprint
            return self.counter
        return 0

    def count(self, fingerprint: int) -> int:
        if self.fingerprint == fingerprint:
            return self.counter
        return 0

    def _decay(self, decay: float) -> bool:
        if self.counter > 0:
            probability = decay**self.counter
            if probability >= 1 or random.random() < probability:
                self.counter -= 1
        return self.counter == 0


class HashArray(object):
    def __init__(self, width: int, decay: float):
        self.width = width
        self.decay = decay
        self.array = [Bucket(0, 0) for _ in range(width)]
        self._seed = random.getrandbits(32)

    def count(self, item: bytes) -> int:
        return self.get_bucket(item).count(self._hash(item))

    def add(self, item: bytes, incr: int) -> int:
        bucket = self.get_bucket(item)
        return bucket.add(self._hash(item), incr, self.decay)

    def get_bucket(self, item: bytes) -> Bucket:
        return self.array[self._hash(item) % self.width]

    def _hash(self, item: bytes) -> int:
        return hash(item) ^ self._seed


class HeavyKeeper(object):
    is_topk_initialized = False

    def __init__(self, k: int, width: int = 1024, depth: int = 5, decay: float = 0.9) -> None:
        if not HeavyKeeper.is_topk_initialized:
            random.seed(time.time())
        self.k = k
        self.width = width
        self.depth = depth
        self.decay = decay
        self.hash_arrays = [HashArray(width, decay) for _ in range(depth)]
        self.min_heap: List[Tuple[int, bytes]] = list()

    def _index(self, val: bytes) -> int:
        for ind, item in enumerate(self.min_heap):
            if item[1] == val:
                return ind
        return -1

    def add(self, item: bytes, incr: int) -> Optional[bytes]:
        max_count = 0
        for i in range(self.depth):
            count = self.hash_arrays[i].add(item, incr)
            max_count = max(max_count, count)
        if len(self.min_heap) < self.k:
            heapq.heappush(self.min_heap, (max_count, item))
            return None
        ind = self._index(item)
        if ind >= 0:
            self.min_heap[ind] = (max_count, item)
            heapq.heapify(self.min_heap)
            return None
        if max_count > self.min_heap[0][0]:
            expelled = heapq.heapreplace(self.min_heap, (max_count, item))
            return expelled[1]
        return None

    def count(self, item: bytes) -> int:
        ind = self._index(item)
        if ind > 0:
            return self.min_heap[ind][0]
        return max([ha.count(item) for ha in self.hash_arrays])

    def list(self, k: Optional[int] = None) -> List[Tuple[int, bytes]]:
        sorted_list = sorted(self.min_heap, key=lambda x: x[0], reverse=True)
        if k is None:
            return sorted_list
        return sorted_list[:k]