1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726
|
import asyncio
import errno
import json
import logging
import os
import signal
import sys
from functools import partial
from http import HTTPStatus
from pathlib import Path
from pwd import getpwuid
from time import time
from typing import Any, Dict, List, Literal, Optional, Set, Union, cast
from aiohttp import web
from aiohttp.web import middleware
from aiohttp.web_app import Application
from aiohttp.web_response import json_response
from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
from knot_resolver.constants import USER
from knot_resolver.controller import get_best_controller_implementation
from knot_resolver.controller.exceptions import SubprocessControllerError, SubprocessControllerExecError
from knot_resolver.controller.interface import SubprocessType
from knot_resolver.controller.registered_workers import command_single_registered_worker
from knot_resolver.datamodel import kres_config_json_schema
from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
from knot_resolver.datamodel.config_schema import KresConfig, get_rundir_without_validation
from knot_resolver.datamodel.globals import Context, set_global_validation_context
from knot_resolver.datamodel.management_schema import ManagementSchema
from knot_resolver.manager import files, metrics
from knot_resolver.utils import custom_atexit as atexit
from knot_resolver.utils import ignore_exceptions_optional
from knot_resolver.utils.async_utils import readfile
from knot_resolver.utils.compat import asyncio as asyncio_compat
from knot_resolver.utils.etag import structural_etag
from knot_resolver.utils.functional import Result
from knot_resolver.utils.modeling.exceptions import AggregateDataValidationError, DataParsingError, DataValidationError
from knot_resolver.utils.modeling.parsing import DataFormat, data_combine, try_to_parse
from knot_resolver.utils.modeling.query import query
from knot_resolver.utils.modeling.types import NoneType
from knot_resolver.utils.systemd_notify import systemd_notify
from .config_store import ConfigStore
from .constants import PID_FILE_NAME, init_user_constants
from .exceptions import KresManagerException
from .logger import logger_init
from .manager import KresManager
logger = logging.getLogger(__name__)
@middleware
async def error_handler(request: web.Request, handler: Any) -> web.Response:
    """
    Generic error handler for route handlers.

    Maps known exception types raised during request processing onto
    appropriate HTTP error responses; anything else propagates to aiohttp.
    """
    try:
        return await handler(request)
    except (AggregateDataValidationError, DataValidationError) as exc:
        body, code = str(exc), HTTPStatus.BAD_REQUEST
    except DataParsingError as exc:
        body, code = f"request processing error:\n{exc}", HTTPStatus.BAD_REQUEST
    except KresManagerException as exc:
        body, code = f"request processing failed:\n{exc}", HTTPStatus.INTERNAL_SERVER_ERROR
    return web.Response(text=body, status=code)
def from_mime_type(mime_type: str) -> DataFormat:
    """Map an HTTP Content-Type value onto the DataFormat used for request bodies."""
    supported = {
        "application/json": DataFormat.JSON,
        "application/octet-stream": DataFormat.JSON,  # default in aiohttp
    }
    fmt = supported.get(mime_type)
    if fmt is None:
        raise DataParsingError(f"unsupported MIME type '{mime_type}', expected: {str(supported)[1:-1]}")
    return fmt
def parse_from_mime_type(data: str, mime_type: str) -> Any:
    """Parse `data` into a dict using the format implied by `mime_type`."""
    fmt = from_mime_type(mime_type)
    return fmt.parse_to_dict(data)
class Server:
    """
    Top-level object of the manager: owns the management HTTP API (aiohttp),
    reacts to configuration changes and signals, and drives graceful shutdown.
    """

    # pylint: disable=too-many-instance-attributes
    # This is top-level class containing pretty much everything. Instead of global
    # variables, we use instance attributes. That's why there are so many and it's
    # ok.

    def __init__(self, store: ConfigStore, config_path: Optional[List[Path]], manager: KresManager):
        """
        :param store: validated configuration store the server reads from and writes to
        :param config_path: config file paths used for reloads, or None for inlined config
        :param manager: subprocess manager (used for process listing and shutdown wiring)
        """
        # config store & server dynamic reconfiguration
        self.config_store = store

        # HTTP server
        self.app = Application(middlewares=[error_handler])
        self.runner = AppRunner(self.app)
        # `listen`/`site` describe the currently bound management endpoint; both
        # are None until `_reconfigure_listen_address` first runs
        self.listen: Optional[ManagementSchema] = None
        self.site: Union[NoneType, TCPSite, UnixSite] = None
        # serializes listen-address changes (rebinding is a multi-step operation)
        self.listen_lock = asyncio.Lock()
        self._config_path: Optional[List[Path]] = config_path
        self._exit_code: int = 0
        self._shutdown_event = asyncio.Event()
        self._manager = manager

    async def _reconfigure(self, config: KresConfig, force: bool = False) -> None:
        """On-change callback: re-bind the management API when the config changes."""
        await self._reconfigure_listen_address(config)

    async def _deny_management_changes(
        self, config_old: KresConfig, config_new: KresConfig, force: bool = False
    ) -> Result[None, str]:
        """Config verifier: reject runtime changes of the management API address."""
        if config_old.management != config_new.management:
            return Result.err(
                "/server/management: Changing management API address/unix-socket dynamically is not allowed as it's really dangerous."
                " If you really need this feature, please contact the developers and explain why. Technically,"
                " there are no problems in supporting it. We are only blocking the dynamic changes because"
                " we think the consequences of leaving this footgun unprotected are worse than its usefulness."
            )
        return Result.ok(None)

    async def _deny_cache_garbage_collector_changes(
        self, config_old: KresConfig, config_new: KresConfig, _force: bool = False
    ) -> Result[None, str]:
        """Config verifier: reject runtime changes of the cache garbage collector config."""
        if config_old.cache.garbage_collector != config_new.cache.garbage_collector:
            return Result.err(
                "/cache/garbage-collector/*: Changing configuration dynamically is not allowed."
                " To change this configuration, you must edit the configuration file and restart the entire resolver."
            )
        return Result.ok(None)

    async def _reload_config(self, force: bool = False) -> None:
        """Re-read, re-combine and re-apply the configuration files; keep old config on any error."""
        if self._config_path is None:
            logger.warning("The manager was started with inlined configuration - can't reload")
        else:
            try:
                data: Dict[str, Any] = {}
                for file in self._config_path:
                    file_data = try_to_parse(await readfile(file))
                    data = data_combine(data, file_data)
                config = KresConfig(data)
                await self.config_store.update(config, force)
                logger.info("Configuration file successfully reloaded")
            except FileNotFoundError:
                # `file` is the config file whose read raised; the loop has
                # necessarily bound it before readfile() could fail
                logger.error(
                    f"Configuration file was not found at '{file}'."
                    " Something must have happened to it while we were running."
                )
                logger.error("Configuration has NOT been changed.")
            except (DataParsingError, DataValidationError) as e:
                logger.error(f"Failed to parse the updated configuration file: {e}")
                logger.error("Configuration has NOT been changed.")
            except KresManagerException as e:
                logger.error(f"Reloading of the configuration file failed: {e}")
                logger.error("Configuration has NOT been changed.")

    async def _renew_config(self, force: bool = False) -> None:
        """Re-run change callbacks on the current configuration without re-reading files."""
        try:
            await self.config_store.renew(force)
            logger.info("Configuration successfully renewed")
        except KresManagerException as e:
            logger.error(f"Renewing the configuration failed: {e}")
            logger.error("Configuration has NOT been renewed.")

    async def sigint_handler(self) -> None:
        """SIGINT: graceful shutdown with exit code 0."""
        logger.info("Received SIGINT, triggering graceful shutdown")
        self.trigger_shutdown(0)

    async def sigterm_handler(self) -> None:
        """SIGTERM: graceful shutdown with exit code 0."""
        logger.info("Received SIGTERM, triggering graceful shutdown")
        self.trigger_shutdown(0)

    async def sighup_handler(self) -> None:
        """SIGHUP: reload configuration files, notifying systemd around the reload."""
        logger.info("Received SIGHUP, reloading configuration file")
        systemd_notify(RELOADING="1")
        await self._reload_config()
        systemd_notify(READY="1")

    @staticmethod
    def all_handled_signals() -> Set[signal.Signals]:
        """The set of signals this server installs handlers for."""
        return {signal.SIGHUP, signal.SIGINT, signal.SIGTERM}

    def bind_signal_handlers(self):
        """Install async handlers for all signals in `all_handled_signals()`."""
        asyncio_compat.add_async_signal_handler(signal.SIGTERM, self.sigterm_handler)
        asyncio_compat.add_async_signal_handler(signal.SIGINT, self.sigint_handler)
        asyncio_compat.add_async_signal_handler(signal.SIGHUP, self.sighup_handler)

    def unbind_signal_handlers(self):
        """Remove the handlers installed by `bind_signal_handlers`."""
        asyncio_compat.remove_signal_handler(signal.SIGTERM)
        asyncio_compat.remove_signal_handler(signal.SIGINT)
        asyncio_compat.remove_signal_handler(signal.SIGHUP)

    async def start(self) -> None:
        """Set up routes, the aiohttp runner, and register config verifiers/callbacks."""
        self._setup_routes()
        await self.runner.setup()
        await self.config_store.register_verifier(self._deny_management_changes)
        await self.config_store.register_verifier(self._deny_cache_garbage_collector_changes)
        # registering the on-change callback also invokes it with the current
        # config, which performs the initial listen-address bind
        await self.config_store.register_on_change_callback(self._reconfigure)

    async def wait_for_shutdown(self) -> None:
        """Block until `trigger_shutdown` is called."""
        await self._shutdown_event.wait()

    def trigger_shutdown(self, exit_code: int) -> None:
        """Record the exit code and wake `wait_for_shutdown` waiters."""
        # record the exit code before waking waiters so they always observe it
        self._exit_code = exit_code
        self._shutdown_event.set()

    async def _handler_index(self, _request: web.Request) -> web.Response:
        """
        Dummy index handler to indicate that the server is indeed running...
        """
        return json_response(
            {
                "msg": "Knot Resolver Manager is running! The configuration endpoint is at /config",
                "status": "RUNNING",
            }
        )

    async def _handler_config_query(self, request: web.Request) -> web.Response:
        """
        Route handler for reading and changing resolver configuration
        (GET/PUT/PATCH/DELETE on /v1/config{path}), with ETag-based
        conditional-request support.
        """
        # There are a lot of local variables in here, but they are usually immutable (almost SSA form :) )
        # pylint: disable=too-many-locals

        # parse the incoming data
        if request.method == "GET":
            update_with: Optional[Dict[str, Any]] = None
        else:
            update_with = parse_from_mime_type(await request.text(), request.content_type)
        document_path = request.match_info["path"]
        getheaders = ignore_exceptions_optional(List[str], None, KeyError)(request.headers.getall)
        etags = getheaders("if-match")
        not_etags = getheaders("if-none-match")
        current_config: Dict[str, Any] = self.config_store.get().get_unparsed_data()

        # stop processing if etags
        def strip_quotes(s: str) -> str:
            return s.strip('"')

        # WARNING: this check is prone to race conditions. When changing, make sure that the current config
        # is really the latest current config (i.e. no await in between obtaining the config and the checks)
        # NOTE(review): RFC 7232 prescribes 412 for a failed If-Match regardless of
        # method; here GET/HEAD answer 304 for both precondition headers — confirm intent
        status = HTTPStatus.NOT_MODIFIED if request.method in ("GET", "HEAD") else HTTPStatus.PRECONDITION_FAILED
        if etags is not None and structural_etag(current_config) not in map(strip_quotes, etags):
            return web.Response(status=status)
        if not_etags is not None and structural_etag(current_config) in map(strip_quotes, not_etags):
            return web.Response(status=status)

        # run query
        op = cast(Literal["get", "delete", "patch", "put"], request.method.lower())
        new_config, to_return = query(current_config, op, document_path, update_with)

        # update the config
        if request.method != "GET":
            # validate
            config_validated = KresConfig(new_config)
            # apply
            await self.config_store.update(config_validated)

        # serialize the response (the `to_return` object is a Dict/list/scalar, we want to return json)
        resp_text: Optional[str] = json.dumps(to_return) if to_return is not None else None

        # create the response and return it
        res = web.Response(status=HTTPStatus.OK, text=resp_text, content_type="application/json")
        res.headers.add("ETag", f'"{structural_etag(new_config)}"')
        return res

    async def _handler_metrics(self, request: web.Request) -> web.Response:
        """Legacy /metrics endpoint: permanent redirect to the JSON variant."""
        raise web.HTTPMovedPermanently("/metrics/json")

    async def _handler_metrics_json(self, _request: web.Request) -> web.Response:
        """Serve resolver metrics as JSON."""
        config = self.config_store.get()
        return web.Response(
            body=await metrics.report_json(config),
            content_type="application/json",
            charset="utf8",
        )

    async def _handler_metrics_prometheus(self, _request: web.Request) -> web.Response:
        """Serve resolver metrics in Prometheus text format; 404 when unavailable."""
        metrics_report = await metrics.report_prometheus()
        if not metrics_report:
            raise web.HTTPNotFound()
        return web.Response(
            body=metrics_report,
            content_type="text/plain",
            charset="utf8",
        )

    async def _handler_cache_clear(self, request: web.Request) -> web.Response:
        """RPC endpoint: clear (part of) the resolver cache via a single worker."""
        data = parse_from_mime_type(await request.text(), request.content_type)
        config = CacheClearRPCSchema(data)
        _, result = await command_single_registered_worker(config.render_lua())
        return web.Response(
            body=json.dumps(result),
            content_type="application/json",
            charset="utf8",
        )

    async def _handler_schema(self, _request: web.Request) -> web.Response:
        """Serve the configuration JSON schema (CORS-enabled for external viewers)."""
        return web.json_response(
            kres_config_json_schema(), headers={"Access-Control-Allow-Origin": "*"}, dumps=partial(json.dumps, indent=4)
        )

    async def _handle_view_schema(self, _request: web.Request) -> web.Response:
        """
        Provides a UI for visuallising and understanding JSON schema.

        The feature in the Knot Resolver Manager to render schemas is unwanted, as it's completely
        out of scope. However, it can be convinient. We therefore rely on a public web-based viewers
        and provide just a redirect. If this feature ever breaks due to disapearance of the public
        service, we can fix it. But we are not guaranteeing, that this will always work.
        """
        return web.Response(
            text="""
        <html>
        <head><title>Redirect to schema viewer</title></head>
        <body>
        <script>
            // we are using JS in order to use proper host
            let protocol = window.location.protocol;
            let host = window.location.host;
            let url = encodeURIComponent(`${protocol}//${host}/schema`);
            window.location.replace(`https://json-schema.app/view/%23?url=${url}`);
        </script>
        <h1>JavaScript required for a dynamic redirect...</h1>
        </body>
        </html>
        """,
            content_type="text/html",
        )

    async def _handler_stop(self, _request: web.Request) -> web.Response:
        """
        Route handler for shutting down the server (and whole manager)
        """
        # go through trigger_shutdown so the exit code is set explicitly,
        # consistent with the signal handlers and the manager's shutdown trigger
        self.trigger_shutdown(0)
        logger.info("Shutdown event triggered...")
        return web.Response(text="Shutting down...")

    async def _handler_reload(self, request: web.Request) -> web.Response:
        """
        Route handler for reloading the configuration
        """
        logger.info("Reloading event triggered...")
        # the /reload/force variant bypasses verifier denials
        await self._reload_config(force=bool(request.path.endswith("/force")))
        return web.Response(text="Reloading...")

    async def _handler_renew(self, request: web.Request) -> web.Response:
        """
        Route handler for renewing the configuration
        """
        logger.info("Renewing configuration event triggered...")
        await self._renew_config(force=bool(request.path.endswith("/force")))
        return web.Response(text="Renewing configuration...")

    async def _handler_processes(self, request: web.Request) -> web.Response:
        """
        Route handler for listing PIDs of subprocesses
        """
        proc_type: Optional[SubprocessType] = None

        if "path" in request.match_info and len(request.match_info["path"]) > 0:
            ptstr = request.match_info["path"]
            if ptstr == "/kresd":
                proc_type = SubprocessType.KRESD
            elif ptstr == "/gc":
                proc_type = SubprocessType.GC
            elif ptstr == "/all":
                proc_type = None
            else:
                return web.Response(text=f"Invalid process type '{ptstr}'", status=400)

        return web.json_response(
            await self._manager.get_processes(proc_type),
            headers={"Access-Control-Allow-Origin": "*"},
            dumps=partial(json.dumps, indent=4),
        )

    def _setup_routes(self) -> None:
        """Register all API routes on the aiohttp application."""
        self.app.add_routes(
            [
                web.get("/", self._handler_index),
                web.get(r"/v1/config{path:.*}", self._handler_config_query),
                web.put(r"/v1/config{path:.*}", self._handler_config_query),
                web.delete(r"/v1/config{path:.*}", self._handler_config_query),
                web.patch(r"/v1/config{path:.*}", self._handler_config_query),
                web.post("/stop", self._handler_stop),
                web.post("/reload", self._handler_reload),
                web.post("/reload/force", self._handler_reload),
                web.post("/renew", self._handler_renew),
                web.post("/renew/force", self._handler_renew),
                web.get("/schema", self._handler_schema),
                web.get("/schema/ui", self._handle_view_schema),
                web.get("/metrics", self._handler_metrics),
                web.get("/metrics/json", self._handler_metrics_json),
                web.get("/metrics/prometheus", self._handler_metrics_prometheus),
                web.post("/cache/clear", self._handler_cache_clear),
                web.get("/processes{path:.*}", self._handler_processes),
            ]
        )

    async def _reconfigure_listen_address(self, config: KresConfig) -> None:
        """Bind the management API to the configured address, stopping any previous listen."""
        async with self.listen_lock:
            mgn = config.management

            # if the listen address did not change, do nothing
            if self.listen == mgn:
                return

            # start the new listen address
            nsite: Union[web.TCPSite, web.UnixSite]
            if mgn.unix_socket:
                nsite = web.UnixSite(self.runner, str(mgn.unix_socket))
                logger.info(f"Starting API HTTP server on http+unix://{mgn.unix_socket}")
            elif mgn.interface:
                nsite = web.TCPSite(self.runner, str(mgn.interface.addr), int(mgn.interface.port))
                logger.info(f"Starting API HTTP server on http://{mgn.interface.addr}:{mgn.interface.port}")
            else:
                raise KresManagerException("Requested API on unsupported configuration format.")
            await nsite.start()

            # stop the old listen
            assert (self.listen is None) == (self.site is None)
            if self.listen is not None and self.site is not None:
                if self.listen.unix_socket:
                    # FIX: report the socket actually being stopped (the old one),
                    # not the newly configured `mgn.unix_socket`
                    logger.info(f"Stopping API HTTP server on http+unix://{self.listen.unix_socket}")
                elif self.listen.interface:
                    logger.info(
                        f"Stopping API HTTP server on http://{self.listen.interface.addr}:{self.listen.interface.port}"
                    )
                await self.site.stop()

            # save new state
            self.listen = mgn
            self.site = nsite

    async def shutdown(self) -> None:
        """Stop the active listen site (if any) and clean up the aiohttp runner."""
        if self.site is not None:
            await self.site.stop()
        await self.runner.cleanup()

    def get_exit_code(self) -> int:
        """Exit code recorded by the last `trigger_shutdown` call (0 by default)."""
        return self._exit_code
async def _load_raw_config(config: Union[Path, Dict[str, Any]]) -> Dict[str, Any]:
    """
    Return the raw (unvalidated) configuration dict.

    When given a Path, read and parse the file; when given a dict, pass it through.
    Raises KresManagerException if the configured file does not exist.
    """
    if not isinstance(config, Path):
        # already parsed data — just sanity-check the type and hand it back
        assert isinstance(config, dict)
        return config

    if not config.exists():
        raise KresManagerException(
            f"Manager is configured to load config file at {config} on startup, but the file does not exist."
        )

    logger.info(f"Loading configuration from '{config}' file.")
    parsed = try_to_parse(await readfile(config))
    assert isinstance(parsed, dict)
    return parsed
async def _load_config(config: Dict[str, Any]) -> KresConfig:
    """Validate the raw configuration dict and wrap it in a KresConfig."""
    validated = KresConfig(config)
    return validated
async def _init_config_store(config: Dict[str, Any]) -> ConfigStore:
    """Validate the raw config data and build the initial ConfigStore around it."""
    return ConfigStore(await _load_config(config))
async def _init_manager(config_store: ConfigStore) -> KresManager:
    """
    Called asynchronously when the application initializes.

    Picks the best available subprocess controller for this system and builds
    the KresManager on top of it.
    """
    # Instantiate subprocess controller (if we wanted to, we could switch it at this point)
    controller = await get_best_controller_implementation(config_store.get())

    # Create KresManager. This will perform autodetection of available service managers and
    # select the most appropriate to use (or use the one configured directly)
    manager = await KresManager.create(controller, config_store)

    logger.info("Initial configuration applied. Process manager initialized...")
    return manager
async def _deny_working_directory_changes(
    config_old: KresConfig, config_new: KresConfig, force: bool = False
) -> Result[None, str]:
    """Config verifier: reject any runtime change of the manager's `rundir`."""
    if config_old.rundir == config_new.rundir:
        return Result.ok(None)
    return Result.err("Changing manager's `rundir` during runtime is not allowed.")
def _set_working_directory(config_raw: Dict[str, Any]) -> None:
    """
    Chdir into the rundir extracted from the raw (unvalidated) configuration.

    Raises DataValidationError (at /rundir) when the value cannot be read.
    """
    try:
        rundir = get_rundir_without_validation(config_raw)
    except ValueError as e:
        raise DataValidationError(str(e), "/rundir") from e

    target = rundir.to_path()
    logger.debug(f"Changing working directory to '{target.absolute()}'.")
    os.chdir(target)
def _lock_working_directory(attempt: int = 0) -> None:
    """
    Take an exclusive lock on the current working directory via a PID file.

    Raises KresManagerException when another live manager already holds the lock.
    If the PID file exists but its owner is dead, the stale file is removed and
    the acquisition is retried exactly once (guarded by `attempt`).
    """
    # the following syscall is atomic, it's essentially the same as acquiring a lock
    try:
        pidfile_fd = os.open(PID_FILE_NAME, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
    except OSError as e:
        if e.errno == errno.EEXIST and attempt == 0:
            # the pid file exists, let's check PID
            with open(PID_FILE_NAME, "r", encoding="utf-8") as f:
                pid = int(f.read().strip())
            try:
                # signal 0 performs no delivery, only an existence/permission check
                os.kill(pid, 0)
            except OSError as e2:
                if e2.errno == errno.ESRCH:
                    # the recorded process no longer exists: drop the stale
                    # PID file and retry once (attempt=1 prevents recursion loops)
                    os.unlink(PID_FILE_NAME)
                    _lock_working_directory(attempt=attempt + 1)
                    return
            # PID file exists and its owner is alive (or unprobeable) => locked
            raise KresManagerException(
                "Another manager is running in the same working directory."
                f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
            ) from e
        # any other OSError, or a second EEXIST after the stale-file retry
        raise KresManagerException(
            "Another manager is running in the same working directory."
            f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
        ) from e

    # now we know that we are the only manager running in this directory

    # write PID to the pidfile and close it afterwards
    pidfile = os.fdopen(pidfile_fd, "w")
    pid = os.getpid()
    pidfile.write(f"{pid}\n")
    pidfile.close()

    # make sure that the file is deleted on shutdown
    atexit.register(lambda: os.unlink(PID_FILE_NAME))
async def _sigint_while_shutting_down():
    """Handler installed during teardown: ignore SIGINT, point users at SIGTERM."""
    msg = (
        "Received SIGINT while already shutting down. Ignoring."
        " If you want to forcefully stop the manager right now, use SIGTERM."
    )
    logger.warning(msg)
async def _sigterm_while_shutting_down():
    """Handler installed during teardown: SIGTERM aborts the graceful shutdown immediately."""
    logger.warning("Received SIGTERM. Invoking dirty shutdown!")
    # conventional "killed by signal" exit status (128 + signal number)
    raise SystemExit(128 + signal.SIGTERM)
async def start_server(config: List[str]) -> int:  # noqa: PLR0915
    """
    Initialize and run the manager until shutdown; returns the process exit code.

    The sequence is: block signals -> load & combine config files -> chdir to
    rundir and lock it -> build config store, logging, metrics, watchdog and the
    subprocess manager -> start the API server -> unblock signals and wait for
    shutdown -> tear everything down in reverse.
    """
    # This function is quite long, but it describes how manager runs. So let's silence pylint
    # pylint: disable=too-many-statements

    start_time = time()
    working_directory_on_startup = os.getcwd()
    manager: Optional[KresManager] = None

    # Block signals during initialization to force their processing once everything is ready
    signal.pthread_sigmask(signal.SIG_BLOCK, Server.all_handled_signals())

    # Check if we are running under the intended user, if not, log a warning message
    pw_username = getpwuid(os.getuid()).pw_name
    if pw_username != USER:
        logger.warning(
            f"Knot Resolver does not run as the default '{USER}' user, but as '{pw_username}' instead."
            " This may or may not affect the configuration validation and the proper functioning of the resolver."
        )
        if os.geteuid() == 0:
            logger.warning(" It is not recommended to run under root privileges unless there is no other option.")

    # before starting server, initialize the subprocess controller, config store, etc. Any errors during inicialization
    # are fatal
    try:
        # Make sure that the config paths does not change meaning when we change working directory
        config_absolute = [Path(path).absolute() for path in config]

        config_data: Dict[str, Any] = {}
        for file in config_absolute:
            # warning about the different parent directories of each config file
            # compared to the first one which is used as the prefix path
            if config_absolute[0].parent != file.parent:
                logger.warning(
                    f"The configuration file '{file}' has a parent directory that is different"
                    f" from '{config_absolute[0]}', which is used as the prefix for relative paths."
                    "This can cause issues with files that are configured with relative paths."
                )
            # Preprocess config - load from file or in general take it to the last step before validation.
            config_raw = await _load_raw_config(file)
            # combine data from all config files (later files override earlier ones)
            config_data = data_combine(config_data, config_raw)

        # before processing any configuration, set validation context
        #  - resolve_root: root against which all relative paths will be resolved
        #  - strict_validation: check for path existence during configuration validation
        #  - permissions_default: validate dirs/files rwx permissions against default user:group in constants
        set_global_validation_context(Context(config_absolute[0].parent, True, False))

        # We want to change cwd as soon as possible. Some parts of the codebase are using os.getcwd() to get the
        # working directory.
        #
        # If we fail to read rundir from unparsed config, the first config validation error comes from here
        _set_working_directory(config_data)

        # We don't want more than one manager in a single working directory. So we lock it with a PID file.
        # Warning - this does not prevent multiple managers with the same naming of kresd service.
        _lock_working_directory()

        # set_global_validation_context(Context(config.parent))

        # After the working directory is set, we can initialize proper config store with a newly parsed configuration.
        config_store = await _init_config_store(config_data)

        # Some "constants" need to be loaded from the initial config, some need to be stored from the initial run conditions
        await init_user_constants(config_store, working_directory_on_startup)

        # This behaviour described above with paths means, that we MUST NOT allow `rundir` change after initialization.
        # It would cause strange problems because every other path configuration depends on it. Therefore, we have to
        # add a check to the config store, which disallows changes.
        await config_store.register_verifier(_deny_working_directory_changes)

        # Up to this point, we have been logging to memory buffer. But now, when we have the configuration loaded, we
        # can flush the buffer into the proper place
        await logger_init(config_store)

        # With configuration on hand, we can initialize monitoring. We want to do this before any subprocesses are
        # started, therefore before initializing manager
        await metrics.init_prometheus(config_store)

        await files.init_files_watchdog(config_store)

        # After we have loaded the configuration, we can start worrying about subprocess management.
        manager = await _init_manager(config_store)

        # prepare instance of the server (no side effects)
        server = Server(config_store, config_absolute, manager)

        # add Server's shutdown trigger to the manager
        manager.add_shutdown_trigger(server.trigger_shutdown)

    except SubprocessControllerExecError as e:
        # if we caught this exception, some component wants to perform a reexec during startup. Most likely, it would
        # be a subprocess manager like supervisord, which wants to make sure the manager runs under supervisord in
        # the process tree. So now we stop everything, and exec what we are told to. We are assuming, that the thing
        # we'll exec will invoke us again.
        logger.info("Exec requested with arguments: %s", str(e.exec_args))

        # unblock signals, this could actually terminate us straight away
        signal.pthread_sigmask(signal.SIG_UNBLOCK, Server.all_handled_signals())

        # run exit functions
        atexit.run_callbacks()

        # and finally exec what we were told to exec
        os.execl(*e.exec_args)

    except SubprocessControllerError as e:
        logger.error(f"Server initialization failed: {e}")
        return 1

    except KresManagerException as e:
        # We caught an error with a pretty error message. Just print it and exit.
        logger.error(e)
        return 1

    except BaseException:
        logger.error("Uncaught generic exception during manager inicialization...", exc_info=True)
        return 1

    # At this point, all backend functionality-providing components are initialized. It's therefore save to start
    # the API server.
    try:
        await server.start()
    except OSError as e:
        if e.errno in (errno.EADDRINUSE, errno.EADDRNOTAVAIL):
            # fancy error reporting of network binding errors
            logger.error(str(e))
            await manager.stop()
            return 1
        raise

    # At this point, pretty much everything is ready to go. We should just make sure the user can shut
    # the manager down with signals.
    server.bind_signal_handlers()
    signal.pthread_sigmask(signal.SIG_UNBLOCK, Server.all_handled_signals())

    logger.info(f"Manager fully initialized and running in {round(time() - start_time, 3)} seconds")

    # notify systemd/anything compatible that we are ready
    systemd_notify(READY="1")

    await server.wait_for_shutdown()

    # notify systemd that we are shutting down
    systemd_notify(STOPPING="1")

    # Ok, now we are tearing everything down.

    # First of all, let's block all unwanted interruptions. We don't want to be reconfiguring kresd's while
    # shutting down.
    signal.pthread_sigmask(signal.SIG_BLOCK, Server.all_handled_signals())
    server.unbind_signal_handlers()
    # on the other hand, we want to immediatelly stop when the user really wants us to stop
    asyncio_compat.add_async_signal_handler(signal.SIGTERM, _sigterm_while_shutting_down)
    asyncio_compat.add_async_signal_handler(signal.SIGINT, _sigint_while_shutting_down)
    signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGTERM, signal.SIGINT})

    # After triggering shutdown, we neet to clean everything up
    logger.info("Stopping API service...")
    await server.shutdown()
    logger.info("Stopping kresd manager...")
    await manager.stop()

    logger.info(f"The manager run for {round(time() - start_time)} seconds...")
    return server.get_exit_code()
|