File: etcd.py

package info (click to toggle)
python-limits 4.4.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,064 kB
  • sloc: python: 7,833; makefile: 162; sh: 59
file content (146 lines) | stat: -rw-r--r-- 5,081 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from __future__ import annotations

import asyncio
import time
import urllib.parse
from typing import TYPE_CHECKING

from deprecated.sphinx import deprecated

from limits.aio.storage.base import Storage
from limits.errors import ConcurrentUpdateError

if TYPE_CHECKING:
    import aetcd


@deprecated(version="4.4")
class EtcdStorage(Storage):
    """
    Rate limit storage with etcd as backend.

    Depends on :pypi:`aetcd`.
    """

    STORAGE_SCHEME = ["async+etcd"]
    """The async storage scheme for etcd"""
    DEPENDENCIES = ["aetcd"]

    PREFIX = "limits"
    MAX_RETRIES = 5

    def __init__(
        self,
        uri: str,
        max_retries: int = MAX_RETRIES,
        wrap_exceptions: bool = False,
        **options: str,
    ) -> None:
        """
        :param uri: etcd location of the form
         ``async+etcd://host:port``,
        :param max_retries: Maximum number of attempts to retry
         in the case of concurrent updates to a rate limit key
        :param wrap_exceptions: Whether to wrap storage exceptions in
         :exc:`limits.errors.StorageError` before raising it.
        :param options: all remaining keyword arguments are passed
         directly to the constructor of :class:`aetcd.client.Client`
        :raise ConfigurationError: when :pypi:`aetcd` is not available
        """
        parsed = urllib.parse.urlparse(uri)
        self.lib = self.dependencies["aetcd"].module
        self.storage: aetcd.Client = self.lib.Client(
            host=parsed.hostname, port=parsed.port, **options
        )
        self.max_retries = max_retries
        super().__init__(uri, wrap_exceptions=wrap_exceptions)

    @property
    def base_exceptions(
        self,
    ) -> type[Exception] | tuple[type[Exception], ...]:  # pragma: no cover
        return self.lib.ClientError  # type: ignore[no-any-return]

    def prefixed_key(self, key: str) -> bytes:
        return f"{self.PREFIX}/{key}".encode()

    async def incr(
        self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1
    ) -> int:
        retries = 0
        etcd_key = self.prefixed_key(key)
        while retries < self.max_retries:
            now = time.time()
            lease = await self.storage.lease(expiry)
            window_end = now + expiry
            create_attempt = await self.storage.transaction(
                compare=[self.storage.transactions.create(etcd_key) == b"0"],
                success=[
                    self.storage.transactions.put(
                        etcd_key, f"{amount}:{window_end}".encode(), lease=lease.id
                    )
                ],
                failure=[self.storage.transactions.get(etcd_key)],
            )
            if create_attempt[0]:
                return amount
            else:
                cur = create_attempt[1][0][0][1]
                cur_value, window_end = cur.value.split(b":")
                window_end = float(window_end)
                if window_end <= now:
                    await asyncio.gather(
                        self.storage.revoke_lease(cur.lease),
                        self.storage.delete(etcd_key),
                    )
                else:
                    if elastic_expiry:
                        await self.storage.refresh_lease(cur.lease)
                        window_end = now + expiry
                    new = int(cur_value) + amount
                    if (
                        await self.storage.transaction(
                            compare=[
                                self.storage.transactions.value(etcd_key) == cur.value
                            ],
                            success=[
                                self.storage.transactions.put(
                                    etcd_key,
                                    f"{new}:{window_end}".encode(),
                                    lease=cur.lease,
                                )
                            ],
                            failure=[],
                        )
                    )[0]:
                        return new
                retries += 1
        raise ConcurrentUpdateError(key, retries)

    async def get(self, key: str) -> int:
        cur = await self.storage.get(self.prefixed_key(key))
        if cur:
            amount, expiry = cur.value.split(b":")
            if float(expiry) > time.time():
                return int(amount)
        return 0

    async def get_expiry(self, key: str) -> float:
        cur = await self.storage.get(self.prefixed_key(key))
        if cur:
            window_end = float(cur.value.split(b":")[1])
            return window_end
        return time.time()

    async def check(self) -> bool:
        try:
            await self.storage.status()
            return True
        except:  # noqa
            return False

    async def reset(self) -> int | None:
        return (await self.storage.delete_prefix(f"{self.PREFIX}/".encode())).deleted

    async def clear(self, key: str) -> None:
        await self.storage.delete(self.prefixed_key(key))