File: etcd.py

package info (click to toggle)
python-limits 4.4.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,064 kB
  • sloc: python: 7,833; makefile: 162; sh: 59
file content (139 lines) | stat: -rw-r--r-- 4,745 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from __future__ import annotations

import time
import urllib.parse

from deprecated.sphinx import deprecated

from limits.errors import ConcurrentUpdateError
from limits.storage.base import Storage
from limits.typing import TYPE_CHECKING

if TYPE_CHECKING:
    import etcd3


@deprecated(version="4.4")
class EtcdStorage(Storage):
    """
    Rate limit storage with etcd as backend.

    Depends on :pypi:`etcd3`.
    """

    STORAGE_SCHEME = ["etcd"]
    """The storage scheme for etcd"""
    DEPENDENCIES = ["etcd3"]
    PREFIX = "limits"
    MAX_RETRIES = 5

    def __init__(
        self,
        uri: str,
        max_retries: int = MAX_RETRIES,
        wrap_exceptions: bool = False,
        **options: str,
    ) -> None:
        """
        :param uri: etcd location of the form
         ``etcd://host:port``,
        :param max_retries: Maximum number of attempts to retry
         in the case of concurrent updates to a rate limit key
        :param wrap_exceptions: Whether to wrap storage exceptions in
         :exc:`limits.errors.StorageError` before raising it.
        :param options: all remaining keyword arguments are passed
         directly to the constructor of :class:`etcd3.Etcd3Client`
        :raise ConfigurationError: when :pypi:`etcd3` is not available
        """
        parsed = urllib.parse.urlparse(uri)
        self.lib = self.dependencies["etcd3"].module
        self.storage: etcd3.Etcd3Client = self.lib.client(
            parsed.hostname, parsed.port, **options
        )
        self.max_retries = max_retries
        super().__init__(uri, wrap_exceptions=wrap_exceptions)

    @property
    def base_exceptions(
        self,
    ) -> type[Exception] | tuple[type[Exception], ...]:  # pragma: no cover
        return self.lib.Etcd3Exception  # type: ignore[no-any-return]

    def prefixed_key(self, key: str) -> bytes:
        return f"{self.PREFIX}/{key}".encode()

    def incr(
        self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1
    ) -> int:
        retries = 0
        etcd_key = self.prefixed_key(key)
        while retries < self.max_retries:
            now = time.time()
            lease = self.storage.lease(expiry)
            window_end = now + expiry
            create_attempt = self.storage.transaction(
                compare=[self.storage.transactions.create(etcd_key) == "0"],
                success=[
                    self.storage.transactions.put(
                        etcd_key,
                        f"{amount}:{window_end}".encode(),
                        lease=lease.id,
                    )
                ],
                failure=[self.storage.transactions.get(etcd_key)],
            )
            if create_attempt[0]:
                return amount
            else:
                cur, meta = create_attempt[1][0][0]
                cur_value, window_end = cur.split(b":")
                window_end = float(window_end)
                if window_end <= now:
                    self.storage.revoke_lease(meta.lease_id)
                    self.storage.delete(etcd_key)
                else:
                    if elastic_expiry:
                        self.storage.refresh_lease(meta.lease_id)
                        window_end = now + expiry
                    new = int(cur_value) + amount
                    if self.storage.transaction(
                        compare=[self.storage.transactions.value(etcd_key) == cur],
                        success=[
                            self.storage.transactions.put(
                                etcd_key,
                                f"{new}:{window_end}".encode(),
                                lease=meta.lease_id,
                            )
                        ],
                        failure=[],
                    )[0]:
                        return new
                retries += 1
        raise ConcurrentUpdateError(key, retries)

    def get(self, key: str) -> int:
        value, meta = self.storage.get(self.prefixed_key(key))
        if value:
            amount, expiry = value.split(b":")
            if float(expiry) > time.time():
                return int(amount)
        return 0

    def get_expiry(self, key: str) -> float:
        value, _ = self.storage.get(self.prefixed_key(key))
        if value:
            return float(value.split(b":")[1])
        return time.time()

    def check(self) -> bool:
        try:
            self.storage.status()
            return True
        except:  # noqa
            return False

    def reset(self) -> int | None:
        return self.storage.delete_prefix(f"{self.PREFIX}/").deleted

    def clear(self, key: str) -> None:
        self.storage.delete(self.prefixed_key(key))