1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
from __future__ import annotations
import time
import urllib.parse
from deprecated.sphinx import deprecated
from limits.errors import ConcurrentUpdateError
from limits.storage.base import Storage
from limits.typing import TYPE_CHECKING
if TYPE_CHECKING:
import etcd3
@deprecated(version="4.4")
class EtcdStorage(Storage):
    """
    Rate limit storage with etcd as backend.

    Depends on :pypi:`etcd3`.

    Every counter is stored under the key ``limits/<key>`` as the byte
    string ``<count>:<window_end>`` (``window_end`` being a unix timestamp
    taken from :func:`time.time`) and is attached to an etcd lease created
    with the limit's expiry as its TTL.
    """

    STORAGE_SCHEME = ["etcd"]
    """The storage scheme for etcd"""

    DEPENDENCIES = ["etcd3"]

    # Prefix prepended (followed by ``/``) to every key written by this storage.
    PREFIX = "limits"

    # Default number of attempts made by :meth:`incr` before giving up.
    MAX_RETRIES = 5

    def __init__(
        self,
        uri: str,
        max_retries: int = MAX_RETRIES,
        wrap_exceptions: bool = False,
        **options: str,
    ) -> None:
        """
        :param uri: etcd location of the form
         ``etcd://host:port``,
        :param max_retries: Maximum number of attempts to retry
         in the case of concurrent updates to a rate limit key
        :param wrap_exceptions: Whether to wrap storage exceptions in
         :exc:`limits.errors.StorageError` before raising it.
        :param options: all remaining keyword arguments are passed
         directly to the constructor of :class:`etcd3.Etcd3Client`
        :raise ConfigurationError: when :pypi:`etcd3` is not available
        """
        parsed = urllib.parse.urlparse(uri)
        self.lib = self.dependencies["etcd3"].module
        self.storage: etcd3.Etcd3Client = self.lib.client(
            parsed.hostname, parsed.port, **options
        )
        self.max_retries = max_retries
        super().__init__(uri, wrap_exceptions=wrap_exceptions)

    @property
    def base_exceptions(
        self,
    ) -> type[Exception] | tuple[type[Exception], ...]:  # pragma: no cover
        """The etcd3 client's base exception class."""
        return self.lib.Etcd3Exception  # type: ignore[no-any-return]

    def prefixed_key(self, key: str) -> bytes:
        """
        Build the encoded etcd key used to store ``key``.

        :param key: the rate limit key
        :return: ``"<PREFIX>/<key>"`` encoded to bytes
        """
        return f"{self.PREFIX}/{key}".encode()

    def incr(
        self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1
    ) -> int:
        """
        Increment the counter for a given rate limit key.

        Each attempt first tries to create the key inside a transaction. If
        the key already exists its current value is read back and updated
        with a compare-and-swap on that value; a lost race consumes one
        retry.

        :param key: the key to increment
        :param expiry: amount in seconds for the key to expire in
        :param elastic_expiry: whether to keep extending the rate limit
         window every hit.
        :param amount: the number to increment by
        :return: the counter value after the increment
        :raise ConcurrentUpdateError: when the update could not be applied
         within ``max_retries`` attempts
        """
        retries = 0
        etcd_key = self.prefixed_key(key)

        while retries < self.max_retries:
            now = time.time()
            lease = self.storage.lease(expiry)
            window_end = now + expiry

            # Only succeeds when the key does not exist yet; otherwise the
            # failure branch returns the current value and its metadata.
            create_attempt = self.storage.transaction(
                compare=[self.storage.transactions.create(etcd_key) == "0"],
                success=[
                    self.storage.transactions.put(
                        etcd_key,
                        f"{amount}:{window_end}".encode(),
                        lease=lease.id,
                    )
                ],
                failure=[self.storage.transactions.get(etcd_key)],
            )
            if create_attempt[0]:
                return amount

            cur, meta = create_attempt[1][0][0]
            cur_value, raw_window_end = cur.split(b":")
            window_end = float(raw_window_end)

            if window_end <= now:
                # The stored window is over: drop the stale entry and let the
                # next iteration create a fresh counter.
                self.storage.revoke_lease(meta.lease_id)
                self.storage.delete(etcd_key)
            else:
                if elastic_expiry:
                    # NOTE(review): the return value is discarded. If the
                    # installed etcd3 client implements ``refresh_lease`` as
                    # a generator, the keep-alive is never sent unless the
                    # result is consumed -- confirm against the etcd3 version
                    # in use.
                    self.storage.refresh_lease(meta.lease_id)
                    window_end = now + expiry
                new = int(cur_value) + amount

                # Compare-and-swap on the value read above so that a
                # concurrent writer causes a retry rather than a lost update.
                if self.storage.transaction(
                    compare=[self.storage.transactions.value(etcd_key) == cur],
                    success=[
                        self.storage.transactions.put(
                            etcd_key,
                            f"{new}:{window_end}".encode(),
                            lease=meta.lease_id,
                        )
                    ],
                    failure=[],
                )[0]:
                    return new
            retries += 1

        raise ConcurrentUpdateError(key, retries)

    def get(self, key: str) -> int:
        """
        Read the current counter for ``key``.

        :param key: the key to get the counter value for
        :return: the stored count, or ``0`` when the key is missing or its
         stored window end is not in the future
        """
        value, meta = self.storage.get(self.prefixed_key(key))

        if value:
            amount, expiry = value.split(b":")

            if float(expiry) > time.time():
                return int(amount)

        return 0

    def get_expiry(self, key: str) -> float:
        """
        Read the window end stored for ``key``.

        :param key: the key to get the expiry for
        :return: the stored window end timestamp, or the current time when
         the key is missing
        """
        value, _ = self.storage.get(self.prefixed_key(key))

        if value:
            return float(value.split(b":")[1])

        return time.time()

    def check(self) -> bool:
        """
        Check whether the etcd server can be reached.

        :return: ``True`` when the status call succeeds, ``False`` when it
         raises
        """
        try:
            self.storage.status()
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must propagate instead of being reported as an
            # unhealthy storage.
            return False

        return True

    def reset(self) -> int | None:
        """
        Delete every key stored under the ``limits/`` prefix.

        :return: the ``deleted`` field of the etcd delete-prefix response
        """
        return self.storage.delete_prefix(f"{self.PREFIX}/").deleted

    def clear(self, key: str) -> None:
        """
        Delete the counter stored for ``key``.

        :param key: the key to clear rate limits for
        """
        self.storage.delete(self.prefixed_key(key))
|