1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
|
# type: ignore
# ruff: noqa: SLF001
# pylint: disable=c-extension-no-member
from knot_resolver.constants import NOTIFY_SUPPORT
if NOTIFY_SUPPORT:
import os
import signal
import time
from functools import partial
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
from supervisor.events import ProcessStateEvent, ProcessStateStartingEvent, subscribe
from supervisor.medusa.asyncore_25 import compact_traceback
from supervisor.process import Subprocess
from supervisor.states import ProcessStates
from supervisor.supervisord import Supervisor
from knot_resolver.controller.supervisord.plugin import notify
starting_processes: List[Subprocess] = []
def is_type_notify(proc: Subprocess) -> bool:
return (
proc.config.environment is not None and proc.config.environment.get("X-SUPERVISORD-TYPE", None) == "notify"
)
class NotifySocketDispatcher:
"""
See supervisor.dispatcher
"""
def __init__(self, supervisor: Supervisor, fd: int):
self._supervisor = supervisor
self.fd = fd
self.closed = False # True if close() has been called
def __repr__(self):
return f"<{self.__class__.__name__} with fd={self.fd}>"
def readable(self):
return True
def writable(self):
return False
def handle_read_event(self):
logger: Any = self._supervisor.options.logger
res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
if res is None:
return # there was some junk
pid, data = res
# pylint: disable=undefined-loop-variable
for proc in starting_processes:
if proc.pid == pid:
break
else:
logger.warn(f"ignoring ready notification from unregistered PID={pid}")
return
if data.startswith(b"READY=1"):
# handle case, when some process is really ready
if is_type_notify(proc):
proc._assertInState(ProcessStates.STARTING)
proc.change_state(ProcessStates.RUNNING)
logger.info(
f"success: {proc.config.name} entered RUNNING state, process sent notification via $NOTIFY_SOCKET"
)
else:
logger.warn(
f"ignoring READY notification from {proc.config.name}, which is not configured to send it"
)
elif data.startswith(b"STOPPING=1"):
# just accept the message, filter unwanted notifications and do nothing else
if is_type_notify(proc):
logger.info(
f"success: {proc.config.name} entered STOPPING state, process sent notification via $NOTIFY_SOCKET"
)
else:
logger.warn(
f"ignoring STOPPING notification from {proc.config.name}, which is not configured to send it"
)
else:
# handle case, when we got something unexpected
logger.warn(f"ignoring unrecognized data on $NOTIFY_SOCKET sent from PID={pid}, data='{data!r}'")
return
def handle_write_event(self):
raise ValueError("this dispatcher is not writable")
def handle_error(self):
_nil, t, v, tbinfo = compact_traceback()
self._supervisor.options.logger.error(
f"uncaptured python exception, closing notify socket {repr(self)} ({t}:{v} {tbinfo})"
)
self.close()
def close(self):
if not self.closed:
os.close(self.fd)
self.closed = True
def flush(self):
pass
def keep_track_of_starting_processes(event: ProcessStateEvent) -> None:
global starting_processes
proc: Subprocess = event.process
if isinstance(event, ProcessStateStartingEvent):
# process is starting
# if proc not in starting_processes:
starting_processes.append(proc)
else:
# not starting
starting_processes = [p for p in starting_processes if p.pid is not proc.pid]
notify_dispatcher: Optional[NotifySocketDispatcher] = None
def process_transition(slf: Subprocess) -> None:
if not is_type_notify(slf):
return slf
# modified version of upstream process transition code
if slf.state == ProcessStates.STARTING:
if time.time() - slf.laststart > slf.config.startsecs:
# STARTING -> STOPPING if the process has not sent ready notification
# within proc.config.startsecs
slf.config.options.logger.warn(
f"process '{slf.config.name}' did not send ready notification within {slf.config.startsecs} secs, killing"
)
slf.kill(signal.SIGKILL)
slf.x_notifykilled = True # used in finish() function to set to FATAL state
slf.laststart = time.time() + 1 # prevent immediate state transition to RUNNING from happening
# return self for chaining
return slf
def subprocess_finish_tail(slf, pid, sts) -> Tuple[Any, Any, Any]:
if getattr(slf, "x_notifykilled", False):
# we want FATAL, not STOPPED state after timeout waiting for startup notification
# why? because it's likely not gonna help to try starting the process up again if
# it failed so early
slf.change_state(ProcessStates.FATAL)
# clear the marker value
del slf.x_notifykilled
# return for chaining
return slf, pid, sts
def supervisord_get_process_map(supervisord: Any, mp: Dict[Any, Any]) -> Dict[Any, Any]:
global notify_dispatcher
if notify_dispatcher is None:
notify_dispatcher = NotifySocketDispatcher(supervisord, notify.init_socket())
supervisord.options.logger.info("notify: injected $NOTIFY_SOCKET into event loop")
# add our dispatcher to the result
assert notify_dispatcher.fd not in mp
mp[notify_dispatcher.fd] = notify_dispatcher
return mp
def process_spawn_as_child_add_env(slf: Subprocess, *args: Any) -> Tuple[Any, ...]:
if is_type_notify(slf):
slf.config.environment["NOTIFY_SOCKET"] = os.getcwd() + "/supervisor-notify-socket"
return (slf, *args)
T = TypeVar("T")
U = TypeVar("U")
def chain(first: Callable[..., U], second: Callable[[U], T]) -> Callable[..., T]:
def wrapper(*args: Any, **kwargs: Any) -> T:
res = first(*args, **kwargs)
if isinstance(res, tuple):
return second(*res)
return second(res)
return wrapper
def append(first: Callable[..., T], second: Callable[..., None]) -> Callable[..., T]:
def wrapper(*args: Any, **kwargs: Any) -> T:
res = first(*args, **kwargs)
second(*args, **kwargs)
return res
return wrapper
def monkeypatch(supervisord: Supervisor) -> None:
"""Inject ourselves into supervisord code"""
# append notify socket handler to event loop
supervisord.get_process_map = chain(
supervisord.get_process_map, partial(supervisord_get_process_map, supervisord)
)
# prepend timeout handler to transition method
Subprocess.transition = chain(process_transition, Subprocess.transition)
Subprocess.finish = append(Subprocess.finish, subprocess_finish_tail)
# add environment variable $NOTIFY_SOCKET to starting processes
Subprocess._spawn_as_child = chain(process_spawn_as_child_add_env, Subprocess._spawn_as_child)
# keep references to starting subprocesses
subscribe(ProcessStateEvent, keep_track_of_starting_processes)
def inject(supervisord: Supervisor, **_config: Any) -> Any: # pylint: disable=useless-return
monkeypatch(supervisord)
# this method is called by supervisord when loading the plugin,
# it should return XML-RPC object, which we don't care about
# That's why why are returning just None
return None
|