from __future__ import annotations
import asyncio
import time
import urllib.parse
from typing import TYPE_CHECKING
from deprecated.sphinx import deprecated
from limits.aio.storage.base import Storage
from limits.errors import ConcurrentUpdateError
if TYPE_CHECKING:
import aetcd
@deprecated(version="4.4")
class EtcdStorage(Storage):
    """
    Rate limit storage with etcd as backend.

    Counters are stored under ``{PREFIX}/{key}`` encoded as
    ``b"<count>:<window_end>"``, with an etcd lease providing
    server-side expiry of the window.

    Depends on :pypi:`aetcd`.
    """

    STORAGE_SCHEME = ["async+etcd"]
    """The async storage scheme for etcd"""
    DEPENDENCIES = ["aetcd"]
    PREFIX = "limits"
    # Default cap on optimistic-concurrency retries in :meth:`incr`.
    MAX_RETRIES = 5

    def __init__(
        self,
        uri: str,
        max_retries: int = MAX_RETRIES,
        wrap_exceptions: bool = False,
        **options: str,
    ) -> None:
        """
        :param uri: etcd location of the form
         ``async+etcd://host:port``,
        :param max_retries: Maximum number of attempts to retry
         in the case of concurrent updates to a rate limit key
        :param wrap_exceptions: Whether to wrap storage exceptions in
         :exc:`limits.errors.StorageError` before raising it.
        :param options: all remaining keyword arguments are passed
         directly to the constructor of :class:`aetcd.client.Client`
        :raise ConfigurationError: when :pypi:`aetcd` is not available
        """
        parsed = urllib.parse.urlparse(uri)
        self.lib = self.dependencies["aetcd"].module
        self.storage: aetcd.Client = self.lib.Client(
            host=parsed.hostname, port=parsed.port, **options
        )
        self.max_retries = max_retries
        super().__init__(uri, wrap_exceptions=wrap_exceptions)

    @property
    def base_exceptions(
        self,
    ) -> type[Exception] | tuple[type[Exception], ...]:  # pragma: no cover
        return self.lib.ClientError  # type: ignore[no-any-return]

    def prefixed_key(self, key: str) -> bytes:
        """Return the namespaced etcd key for ``key`` as bytes."""
        return f"{self.PREFIX}/{key}".encode()

    async def incr(
        self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1
    ) -> int:
        """
        Increment the counter for ``key`` by ``amount`` within a window
        of ``expiry`` seconds, creating the window if necessary.

        Uses optimistic concurrency control: an etcd transaction either
        creates the key atomically (attaching a fresh lease) or, if it
        already exists, performs a compare-and-swap on the current value.
        On a lost race the whole attempt is retried, up to
        ``self.max_retries`` times.

        :param key: the rate limit key
        :param expiry: window length in seconds
        :param elastic_expiry: whether the window is extended on every hit
        :param amount: the number to increment by
        :raises ConcurrentUpdateError: if every attempt loses the
         compare-and-swap race
        """
        retries = 0
        etcd_key = self.prefixed_key(key)
        while retries < self.max_retries:
            now = time.time()
            lease = await self.storage.lease(expiry)
            window_end = now + expiry
            # Create the key only if it does not exist yet (a create
            # revision of 0 means "no such key"); otherwise fetch the
            # current value in the failure branch.
            create_attempt = await self.storage.transaction(
                compare=[self.storage.transactions.create(etcd_key) == b"0"],
                success=[
                    self.storage.transactions.put(
                        etcd_key, f"{amount}:{window_end}".encode(), lease=lease.id
                    )
                ],
                failure=[self.storage.transactions.get(etcd_key)],
            )
            if create_attempt[0]:
                return amount
            # The key already existed, so the lease created above was
            # never attached to anything: revoke it now instead of
            # leaking it server-side until its TTL fires.
            await self.storage.revoke_lease(lease.id)
            cur = create_attempt[1][0][0][1]
            cur_value, raw_window_end = cur.value.split(b":")
            window_end = float(raw_window_end)
            if window_end <= now:
                # The stored window has expired but its lease has not
                # fired yet: clear it so the next iteration can recreate
                # the key from scratch.
                await asyncio.gather(
                    self.storage.revoke_lease(cur.lease),
                    self.storage.delete(etcd_key),
                )
            else:
                if elastic_expiry:
                    # Sliding window: extend the existing lease and push
                    # the recorded window end forward.
                    await self.storage.refresh_lease(cur.lease)
                    window_end = now + expiry
                new = int(cur_value) + amount
                # Compare-and-swap: only write if the value is unchanged
                # since we read it; otherwise fall through and retry.
                if (
                    await self.storage.transaction(
                        compare=[
                            self.storage.transactions.value(etcd_key) == cur.value
                        ],
                        success=[
                            self.storage.transactions.put(
                                etcd_key,
                                f"{new}:{window_end}".encode(),
                                lease=cur.lease,
                            )
                        ],
                        failure=[],
                    )
                )[0]:
                    return new
            retries += 1
        raise ConcurrentUpdateError(key, retries)

    async def get(self, key: str) -> int:
        """Return the current counter for ``key``, or 0 if absent/expired."""
        cur = await self.storage.get(self.prefixed_key(key))
        if cur:
            amount, expiry = cur.value.split(b":")
            # The lease may not have fired yet; honor the recorded
            # window end rather than trusting key existence alone.
            if float(expiry) > time.time():
                return int(amount)
        return 0

    async def get_expiry(self, key: str) -> float:
        """Return the timestamp when the window for ``key`` ends.

        Falls back to the current time when the key does not exist.
        """
        cur = await self.storage.get(self.prefixed_key(key))
        if cur:
            window_end = float(cur.value.split(b":")[1])
            return window_end
        return time.time()

    async def check(self) -> bool:
        """Check if the connection to etcd is healthy."""
        try:
            await self.storage.status()
        except Exception:  # noqa
            # Narrowed from a bare ``except:`` so that task cancellation
            # (asyncio.CancelledError) still propagates instead of being
            # reported as an unhealthy backend.
            return False
        return True

    async def reset(self) -> int | None:
        """Delete all keys in the storage namespace.

        :return: the number of keys deleted
        """
        return (await self.storage.delete_prefix(f"{self.PREFIX}/".encode())).deleted

    async def clear(self, key: str) -> None:
        """Remove the rate limit state for ``key``."""
        await self.storage.delete(self.prefixed_key(key))