File: config_store.py

package info (click to toggle)
knot-resolver 6.0.17-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 16,376 kB
  • sloc: javascript: 42,732; ansic: 40,311; python: 12,580; cpp: 2,121; sh: 1,988; xml: 193; makefile: 181
file content (124 lines) | stat: -rw-r--r-- 4,748 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import asyncio
from asyncio import Lock
from typing import Any, Awaitable, Callable, List, Tuple

from knot_resolver.datamodel import KresConfig
from knot_resolver.utils.functional import Result
from knot_resolver.utils.modeling.exceptions import DataParsingError
from knot_resolver.utils.modeling.types import NoneType

from .exceptions import KresManagerException

VerifyCallback = Callable[[KresConfig, KresConfig, bool], Awaitable[Result[None, str]]]
UpdateCallback = Callable[[KresConfig, bool], Awaitable[None]]


class ConfigStore:
    def __init__(self, initial_config: KresConfig):
        self._config = initial_config
        self._verifiers: List[VerifyCallback] = []
        self._callbacks: List[UpdateCallback] = []
        self._update_lock: Lock = Lock()

    async def update(self, config: KresConfig, force: bool = False) -> None:
        # invoke pre-change verifiers
        results: Tuple[Result[None, str], ...] = tuple(
            await asyncio.gather(*[ver(self._config, config, force) for ver in self._verifiers])
        )
        err_res = filter(lambda r: r.is_err(), results)
        errs = list(map(lambda r: r.unwrap_err(), err_res))
        if len(errs) > 0:
            raise KresManagerException("Configuration validation failed. The reasons are:\n - " + "\n - ".join(errs))

        async with self._update_lock:
            # update the stored config with the new version
            self._config = config

            # invoke change callbacks
            for call in self._callbacks:
                await call(config, force)

    async def renew(self, force: bool = False) -> None:
        await self.update(self._config, force)

    async def register_verifier(self, verifier: VerifyCallback) -> None:
        self._verifiers.append(verifier)
        res = await verifier(self.get(), self.get(), False)
        if res.is_err():
            raise DataParsingError(f"Initial config verification failed with error: {res.unwrap_err()}")

    async def register_on_change_callback(self, callback: UpdateCallback) -> None:
        """
        Registers new callback and immediatelly calls it with current config
        """

        self._callbacks.append(callback)
        await callback(self.get(), False)

    def get(self) -> KresConfig:
        return self._config


def only_on_no_changes_update(selector: Callable[[KresConfig], Any]) -> Callable[[UpdateCallback], UpdateCallback]:
    def decorator(orig_func: UpdateCallback) -> UpdateCallback:
        original_value_set: Any = False
        original_value: Any = None

        async def new_func_update(config: KresConfig, force: bool = False) -> None:
            nonlocal original_value_set
            nonlocal original_value
            if not original_value_set:
                original_value_set = True
            elif original_value == selector(config):
                await orig_func(config, force)
            elif force:
                await orig_func(config, force)
            original_value = selector(config)

        return new_func_update

    return decorator


def only_on_real_changes_update(selector: Callable[[KresConfig], Any]) -> Callable[[UpdateCallback], UpdateCallback]:
    def decorator(orig_func: UpdateCallback) -> UpdateCallback:
        original_value_set: Any = False
        original_value: Any = None

        async def new_func_update(config: KresConfig, force: bool) -> None:
            nonlocal original_value_set
            nonlocal original_value
            if not original_value_set:
                original_value_set = True
                await orig_func(config, force)
            elif original_value != selector(config):
                await orig_func(config, force)
            elif force:
                await orig_func(config, force)
            original_value = selector(config)

        return new_func_update

    return decorator


def only_on_real_changes_verifier(selector: Callable[[KresConfig], Any]) -> Callable[[VerifyCallback], VerifyCallback]:
    def decorator(orig_func: VerifyCallback) -> VerifyCallback:
        original_value_set: Any = False
        original_value: Any = None

        async def new_func_verifier(old: KresConfig, new: KresConfig, force: bool) -> Result[NoneType, str]:
            nonlocal original_value_set
            nonlocal original_value
            if not original_value_set:
                original_value_set = True
                original_value = selector(new)
                await orig_func(old, new, force)
            elif original_value != selector(new):
                original_value = selector(new)
                await orig_func(old, new, force)
            return Result.ok(None)

        return new_func_verifier

    return decorator