1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
|
import asyncio
from asyncio import Lock
from typing import Any, Awaitable, Callable, List, Tuple
from knot_resolver.datamodel import KresConfig
from knot_resolver.utils.functional import Result
from knot_resolver.utils.modeling.exceptions import DataParsingError
from knot_resolver.utils.modeling.types import NoneType
from .exceptions import KresManagerException
VerifyCallback = Callable[[KresConfig, KresConfig, bool], Awaitable[Result[None, str]]]
UpdateCallback = Callable[[KresConfig, bool], Awaitable[None]]
class ConfigStore:
def __init__(self, initial_config: KresConfig):
self._config = initial_config
self._verifiers: List[VerifyCallback] = []
self._callbacks: List[UpdateCallback] = []
self._update_lock: Lock = Lock()
async def update(self, config: KresConfig, force: bool = False) -> None:
# invoke pre-change verifiers
results: Tuple[Result[None, str], ...] = tuple(
await asyncio.gather(*[ver(self._config, config, force) for ver in self._verifiers])
)
err_res = filter(lambda r: r.is_err(), results)
errs = list(map(lambda r: r.unwrap_err(), err_res))
if len(errs) > 0:
raise KresManagerException("Configuration validation failed. The reasons are:\n - " + "\n - ".join(errs))
async with self._update_lock:
# update the stored config with the new version
self._config = config
# invoke change callbacks
for call in self._callbacks:
await call(config, force)
async def renew(self, force: bool = False) -> None:
await self.update(self._config, force)
async def register_verifier(self, verifier: VerifyCallback) -> None:
self._verifiers.append(verifier)
res = await verifier(self.get(), self.get(), False)
if res.is_err():
raise DataParsingError(f"Initial config verification failed with error: {res.unwrap_err()}")
async def register_on_change_callback(self, callback: UpdateCallback) -> None:
"""
Registers new callback and immediatelly calls it with current config
"""
self._callbacks.append(callback)
await callback(self.get(), False)
def get(self) -> KresConfig:
return self._config
def only_on_no_changes_update(selector: Callable[[KresConfig], Any]) -> Callable[[UpdateCallback], UpdateCallback]:
def decorator(orig_func: UpdateCallback) -> UpdateCallback:
original_value_set: Any = False
original_value: Any = None
async def new_func_update(config: KresConfig, force: bool = False) -> None:
nonlocal original_value_set
nonlocal original_value
if not original_value_set:
original_value_set = True
elif original_value == selector(config):
await orig_func(config, force)
elif force:
await orig_func(config, force)
original_value = selector(config)
return new_func_update
return decorator
def only_on_real_changes_update(selector: Callable[[KresConfig], Any]) -> Callable[[UpdateCallback], UpdateCallback]:
def decorator(orig_func: UpdateCallback) -> UpdateCallback:
original_value_set: Any = False
original_value: Any = None
async def new_func_update(config: KresConfig, force: bool) -> None:
nonlocal original_value_set
nonlocal original_value
if not original_value_set:
original_value_set = True
await orig_func(config, force)
elif original_value != selector(config):
await orig_func(config, force)
elif force:
await orig_func(config, force)
original_value = selector(config)
return new_func_update
return decorator
def only_on_real_changes_verifier(selector: Callable[[KresConfig], Any]) -> Callable[[VerifyCallback], VerifyCallback]:
def decorator(orig_func: VerifyCallback) -> VerifyCallback:
original_value_set: Any = False
original_value: Any = None
async def new_func_verifier(old: KresConfig, new: KresConfig, force: bool) -> Result[NoneType, str]:
nonlocal original_value_set
nonlocal original_value
if not original_value_set:
original_value_set = True
original_value = selector(new)
await orig_func(old, new, force)
elif original_value != selector(new):
original_value = selector(new)
await orig_func(old, new, force)
return Result.ok(None)
return new_func_verifier
return decorator
|