File: triggers.py

package info (click to toggle)
knot-resolver 6.0.17-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 16,376 kB
  • sloc: javascript: 42,732; ansic: 40,311; python: 12,580; cpp: 2,121; sh: 1,988; xml: 193; makefile: 181
file content (139 lines) | stat: -rw-r--r-- 5,291 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import logging
from threading import Timer
from typing import Dict, Optional
from urllib.parse import quote

from knot_resolver.controller.registered_workers import command_registered_workers
from knot_resolver.datamodel import KresConfig
from knot_resolver.utils import compat
from knot_resolver.utils.requests import SocketDesc, request

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Shared Triggers instance; stays None until one of the module-level
# trigger_* functions below creates it on first use.
_triggers: Optional["Triggers"] = None


class Triggers:
    """Schedule delayed worker commands and configuration renew/reload requests.

    Every action is deferred by a 5-second ``threading.Timer``. Triggering an
    action whose timer is still alive is skipped, except that a forced renew or
    reload replaces a scheduled non-forced one of the same kind.
    """

    def __init__(self, config: KresConfig) -> None:
        """Store ``config`` and choose the socket that renew/reload requests are sent to.

        The unix socket from ``config.management`` is the default; when
        ``config.management.interface`` is set, its address and port are used instead.
        """
        self._config = config

        # timers of the delayed actions (absent/None until first triggered)
        self._cmd_timers: Dict[str, Timer] = {}
        self._renew_timer: Optional[Timer] = None
        self._reload_timer: Optional[Timer] = None
        # whether the currently scheduled renew/reload was requested with force
        self._renew_force = False
        self._reload_force = False

        mgmt = config.management
        endpoint = SocketDesc(
            f'http+unix://{quote(str(mgmt.unix_socket), safe="")}/',
            'Key "/management/unix-socket" in validated configuration',
        )
        iface = mgmt.interface
        if iface:
            # a configured network interface takes precedence over the unix socket
            endpoint = SocketDesc(
                f"http://{iface.addr}:{iface.port}",
                'Key "/management/interface" in validated configuration',
            )
        self._socket = endpoint

    def trigger_cmd(self, cmd: str) -> None:
        """Send ``cmd`` to the registered workers after a 5-second delay.

        Nothing is scheduled when a timer for the same ``cmd`` is still alive.
        """
        pending = self._cmd_timers.get(cmd)
        if pending is not None and pending.is_alive():
            logger.info(f"Skipping sending '{cmd}' command, it was already triggered")
            return

        def _send() -> None:
            # executed by the timer: run the coroutine on a fresh event loop
            # unless one is already running, in which case schedule it as a task
            if not compat.asyncio.is_event_loop_running():
                compat.asyncio.run(command_registered_workers(cmd))
            else:
                compat.asyncio.create_task(command_registered_workers(cmd))
            logger.info(f"Sending '{cmd}' command to reload watched files has finished")

        logger.info(f"Delayed send of '{cmd}' command has started")
        timer = Timer(5, _send)
        self._cmd_timers[cmd] = timer
        timer.start()

    def cancel_cmd(self, cmd: str) -> None:
        """Cancel the timer registered for ``cmd``; do nothing if there is none."""
        timer = self._cmd_timers.get(cmd)
        if timer is not None:
            timer.cancel()

    def trigger_renew(self, force: bool = False) -> None:
        """POST a ``renew`` (or ``renew/force``) request after a 5-second delay.

        The request is not scheduled when a reload, or another renew, with at
        least the same ``force`` level has a timer that is still alive. A forced
        renew cancels a scheduled non-forced renew and takes its place.
        """

        def _renew() -> None:
            path = "renew/force" if force else "renew"
            response = request(self._socket, "POST", path)
            if response.status != 200:
                logger.error(f"Failed to renew configuration: {response.body}")
            # logged and reset regardless of the response status
            logger.info("Renewing configuration has finished")
            self._renew_force = False

        # a scheduled reload that is at least as forceful makes the renew unnecessary
        scheduled_reload = self._reload_timer
        if scheduled_reload is not None and scheduled_reload.is_alive() and self._reload_force >= force:
            logger.info("Skipping renewing configuration, reload was already triggered")
            return

        # skip if a renew was already triggered, unless this one upgrades it to forced
        scheduled_renew = self._renew_timer
        if scheduled_renew is not None and scheduled_renew.is_alive():
            if self._renew_force >= force:
                logger.info("Skipping renewing configuration, it was already triggered")
                return
            scheduled_renew.cancel()
            self._renew_force = False

        logger.info("Delayed configuration renew has started")
        timer = Timer(5, _renew)
        self._renew_timer = timer
        timer.start()
        self._renew_force = force

    def trigger_reload(self, force: bool = False) -> None:
        """POST a ``reload`` (or ``reload/force``) request after a 5-second delay.

        A scheduled renew is cancelled unless it is forced and this reload is
        not. The reload is not scheduled when another reload with at least the
        same ``force`` level has a timer that is still alive; a forced reload
        cancels a scheduled non-forced reload and takes its place.
        """

        def _reload() -> None:
            path = "reload/force" if force else "reload"
            response = request(self._socket, "POST", path)
            if response.status != 200:
                logger.error(f"Failed to reload configuration: {response.body}")
            # logged and reset regardless of the response status
            logger.info("Reloading configuration has finished")
            self._reload_force = False

        # cancel a scheduled renew that is no more forceful than this reload
        scheduled_renew = self._renew_timer
        if scheduled_renew is not None and scheduled_renew.is_alive() and force >= self._renew_force:
            scheduled_renew.cancel()
            self._renew_force = False

        # skip if a reload was already triggered, unless this one upgrades it to forced
        scheduled_reload = self._reload_timer
        if scheduled_reload is not None and scheduled_reload.is_alive():
            if self._reload_force >= force:
                logger.info("Skipping reloading configuration, it was already triggered")
                return
            logger.info("Cancelling already scheduled configuration reload, force reload triggered")
            scheduled_reload.cancel()
            self._reload_force = False

        logger.info("Delayed configuration reload has started")
        timer = Timer(5, _reload)
        self._reload_timer = timer
        timer.start()
        self._reload_force = force


def trigger_cmd(config: KresConfig, cmd: str) -> None:
    """Schedule delayed sending of ``cmd`` through the shared ``Triggers`` instance.

    The shared instance is built from ``config`` on first use; once it exists,
    ``config`` is not used.
    """
    global _triggers
    instance = _triggers
    if instance is None:
        instance = _triggers = Triggers(config)
    instance.trigger_cmd(cmd)


def cancel_cmd(cmd: str) -> None:
    """Cancel the delayed sending of ``cmd``; do nothing if no ``Triggers`` instance exists yet."""
    instance = _triggers
    if instance is None:
        return
    instance.cancel_cmd(cmd)


def trigger_renew(config: KresConfig, force: bool = False) -> None:
    """Schedule a delayed configuration renew through the shared ``Triggers`` instance.

    The shared instance is built from ``config`` on first use; once it exists,
    ``config`` is not used.
    """
    global _triggers
    instance = _triggers
    if instance is None:
        instance = _triggers = Triggers(config)
    instance.trigger_renew(force)


def trigger_reload(config: KresConfig, force: bool = False) -> None:
    """Schedule a delayed configuration reload through the shared ``Triggers`` instance.

    The shared instance is built from ``config`` on first use; once it exists,
    ``config`` is not used.
    """
    global _triggers
    instance = _triggers
    if instance is None:
        instance = _triggers = Triggers(config)
    instance.trigger_reload(force)