1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
|
import logging
from os import kill # pylint: disable=[no-name-in-module]
from pathlib import Path
from typing import Any, Dict, Iterable, NoReturn, Optional, Union, cast
from xmlrpc.client import Fault, ServerProxy
import supervisor.xmlrpc # type: ignore[import]
from knot_resolver.controller.exceptions import SubprocessControllerError, SubprocessControllerExecError
from knot_resolver.controller.interface import (
KresID,
Subprocess,
SubprocessController,
SubprocessStatus,
SubprocessType,
)
from knot_resolver.controller.supervisord.config_file import SupervisordKresID, write_config_file
from knot_resolver.datamodel.config_schema import KresConfig, workers_max_count
from knot_resolver.manager.constants import supervisord_config_file, supervisord_pid_file, supervisord_sock_file
from knot_resolver.utils import which
from knot_resolver.utils.async_utils import call, readfile
from knot_resolver.utils.compat.asyncio import async_in_a_thread
logger = logging.getLogger(__name__)
async def _start_supervisord(config: KresConfig) -> None:
logger.debug("Writing supervisord config")
await write_config_file(config)
logger.debug("Starting supervisord")
res = await call(["supervisord", "--configuration", str(supervisord_config_file(config).absolute())])
if res != 0:
raise SubprocessControllerError(f"Supervisord exited with exit code {res}")
async def _exec_supervisord(config: KresConfig) -> NoReturn:
logger.debug("Writing supervisord config")
await write_config_file(config)
logger.debug("Execing supervisord")
raise SubprocessControllerExecError(
[
str(which.which("supervisord")),
"supervisord",
"--configuration",
str(supervisord_config_file(config).absolute()),
]
)
async def _reload_supervisord(config: KresConfig) -> None:
await write_config_file(config)
try:
supervisord = _create_supervisord_proxy(config)
supervisord.reloadConfig()
except Fault as e:
raise SubprocessControllerError(f"supervisord reload failed: {e}") from e
@async_in_a_thread
def _stop_supervisord(config: KresConfig) -> None:
supervisord = _create_supervisord_proxy(config)
# pid = supervisord.getPID()
try:
# we might be trying to shut down supervisord at a moment, when it's waiting
# for us to stop. Therefore, this shutdown request for supervisord might
# die and it's not a problem.
supervisord.shutdown()
except Fault as e:
if e.faultCode == 6 and e.faultString == "SHUTDOWN_STATE":
# supervisord is already stopping, so it's fine
pass
else:
# something wrong happened, let's be loud about it
raise
# It is always better to clean up.
# This way, we can be sure that we are starting with a newly generated configuration.
supervisord_config_file(config).unlink()
async def _is_supervisord_available() -> bool:
# yes, it is! The code in this file wouldn't be running without it due to imports :)
# so let's just check that we can find supervisord and supervisorctl binaries
try:
which.which("supervisord")
which.which("supervisorctl")
except RuntimeError:
logger.error("Failed to find supervisord or supervisorctl executables in $PATH")
return False
return True
async def _get_supervisord_pid(config: KresConfig) -> Optional[int]:
if not Path(supervisord_pid_file(config)).exists():
return None
return int(await readfile(supervisord_pid_file(config)))
def _is_process_runinng(pid: int) -> bool:
try:
# kill with signal 0 is a safe way to test that a process exists
kill(pid, 0)
return True
except ProcessLookupError:
return False
async def _is_supervisord_running(config: KresConfig) -> bool:
pid = await _get_supervisord_pid(config)
if pid is None:
return False
if not _is_process_runinng(pid):
supervisord_pid_file(config).unlink()
return False
return True
def _create_proxy(config: KresConfig) -> ServerProxy:
return ServerProxy(
"http://127.0.0.1",
transport=supervisor.xmlrpc.SupervisorTransport(
None, None, serverurl="unix://" + str(supervisord_sock_file(config))
),
)
def _create_supervisord_proxy(config: KresConfig) -> Any:
proxy = _create_proxy(config)
return getattr(proxy, "supervisor")
def _create_fast_proxy(config: KresConfig) -> Any:
proxy = _create_proxy(config)
return getattr(proxy, "fast")
def _convert_subprocess_status(proc: Any) -> SubprocessStatus:
conversion_tbl = {
# "STOPPED": None, # filtered out elsewhere
"STARTING": SubprocessStatus.RUNNING,
"RUNNING": SubprocessStatus.RUNNING,
"BACKOFF": SubprocessStatus.RUNNING,
"STOPPING": SubprocessStatus.RUNNING,
"EXITED": SubprocessStatus.EXITED,
"FATAL": SubprocessStatus.FATAL,
"UNKNOWN": SubprocessStatus.UNKNOWN,
}
if proc["statename"] in conversion_tbl:
status = conversion_tbl[proc["statename"]]
else:
logger.warning(f"Unknown supervisord process state {proc['statename']}")
status = SubprocessStatus.UNKNOWN
return status
def _list_running_subprocesses(config: KresConfig) -> Dict[SupervisordKresID, SubprocessStatus]:
try:
supervisord = _create_supervisord_proxy(config)
processes: Any = supervisord.getAllProcessInfo()
except Fault as e:
raise SubprocessControllerError(f"failed to get info from all running processes: {e}") from e
# there will be a manager process as well, but we don't want to report anything on ourselves
processes = [pr for pr in processes if pr["name"] != "manager"]
# convert all the names
return {
SupervisordKresID.from_string(f"{pr['group']}:{pr['name']}"): _convert_subprocess_status(pr)
for pr in processes
if pr["statename"] != "STOPPED"
}
class SupervisordSubprocess(Subprocess):
def __init__(
self,
config: KresConfig,
controller: "SupervisordSubprocessController",
base_id: Union[SubprocessType, SupervisordKresID],
):
if isinstance(base_id, SubprocessType):
super().__init__(config, SupervisordKresID.alloc(base_id))
else:
super().__init__(config, base_id)
self._controller: "SupervisordSubprocessController" = controller
@property
def name(self):
return str(self.id)
def status(self) -> SubprocessStatus:
try:
supervisord = _create_supervisord_proxy(self._config)
status = supervisord.getProcessInfo(self.name)
except Fault as e:
raise SubprocessControllerError(f"failed to get status from '{self.id}' process: {e}") from e
return _convert_subprocess_status(status)
@async_in_a_thread
def _start(self) -> None:
# +1 for canary process (same as in config_file.py)
assert int(self.id) <= int(workers_max_count()) + 1, "trying to spawn more than allowed limit of workers"
try:
supervisord = _create_fast_proxy(self._config)
supervisord.startProcess(self.name)
except Fault as e:
raise SubprocessControllerError(f"failed to start '{self.id}'") from e
@async_in_a_thread
def _stop(self) -> None:
supervisord = _create_supervisord_proxy(self._config)
supervisord.stopProcess(self.name)
@async_in_a_thread
def _restart(self) -> None:
supervisord = _create_supervisord_proxy(self._config)
supervisord.stopProcess(self.name)
fast = _create_fast_proxy(self._config)
fast.startProcess(self.name)
@async_in_a_thread
def get_pid(self) -> int:
if self._pid is None:
supervisord = _create_supervisord_proxy(self._config)
info = supervisord.getProcessInfo(self.name)
self._pid = info["pid"]
return self._pid
def get_used_config(self) -> KresConfig:
return self._config
class SupervisordSubprocessController(SubprocessController):
def __init__(self): # pylint: disable=super-init-not-called
self._controller_config: Optional[KresConfig] = None
def __str__(self):
return "supervisord"
async def is_controller_available(self, config: KresConfig) -> bool:
res = await _is_supervisord_available()
if not res:
logger.info("Failed to find usable supervisord.")
logger.debug("Detection - supervisord controller is available for use")
return res
async def get_all_running_instances(self) -> Iterable[Subprocess]:
assert self._controller_config is not None
if await _is_supervisord_running(self._controller_config):
states = _list_running_subprocesses(self._controller_config)
return [
SupervisordSubprocess(self._controller_config, self, id_)
for id_ in states
if states[id_] == SubprocessStatus.RUNNING
]
return []
async def initialize_controller(self, config: KresConfig) -> None:
self._controller_config = config
if not await _is_supervisord_running(config):
logger.info(
"We want supervisord to restart us when needed, we will therefore exec() it and let it start us again."
)
await _exec_supervisord(config)
else:
logger.info("Supervisord is already running, we will just update its config...")
await _reload_supervisord(config)
async def shutdown_controller(self) -> None:
assert self._controller_config is not None
await _stop_supervisord(self._controller_config)
async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess:
return SupervisordSubprocess(subprocess_config, self, subprocess_type)
@async_in_a_thread
def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
assert self._controller_config is not None
return cast(Dict[KresID, SubprocessStatus], _list_running_subprocesses(self._controller_config))
|