1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
|
"""Provision hosts for running tests."""
from __future__ import annotations
import collections.abc as c
import dataclasses
import functools
import itertools
import os
import pickle
import sys
import time
import traceback
import typing as t
from .config import (
EnvironmentConfig,
)
from .util import (
ApplicationError,
HostConnectionError,
display,
open_binary_file,
verify_sys_executable,
version_to_str,
type_guard,
)
from .util_common import (
ExitHandler,
)
from .thread import (
WrappedThread,
)
from .host_profiles import (
ControllerHostProfile,
DockerProfile,
HostProfile,
SshConnection,
SshTargetHostProfile,
create_host_profile,
)
from .pypi_proxy import (
run_pypi_proxy,
)
THostProfile = t.TypeVar('THostProfile', bound=HostProfile)
TEnvironmentConfig = t.TypeVar('TEnvironmentConfig', bound=EnvironmentConfig)
class PrimeContainers(ApplicationError):
"""Exception raised to end execution early after priming containers."""
@dataclasses.dataclass(frozen=True)
class HostState:
"""State of hosts and profiles to be passed to ansible-test during delegation."""
controller_profile: ControllerHostProfile
target_profiles: list[HostProfile]
@property
def profiles(self) -> list[HostProfile]:
"""Return all the profiles as a list."""
return [t.cast(HostProfile, self.controller_profile)] + self.target_profiles
def serialize(self, path: str) -> None:
"""Serialize the host state to the given path."""
with open_binary_file(path, 'wb') as state_file:
pickle.dump(self, state_file)
@staticmethod
def deserialize(args: EnvironmentConfig, path: str) -> HostState:
"""Deserialize host state from the given args and path."""
with open_binary_file(path) as state_file:
host_state: HostState = pickle.load(state_file)
host_state.controller_profile.args = args
for target in host_state.target_profiles:
target.args = args
return host_state
def get_controller_target_connections(self) -> list[SshConnection]:
"""Return SSH connection(s) for accessing all target hosts from the controller."""
return list(itertools.chain.from_iterable([target.get_controller_target_connections() for
target in self.target_profiles if isinstance(target, SshTargetHostProfile)]))
def targets(self, profile_type: t.Type[THostProfile]) -> list[THostProfile]:
"""The list of target(s), verified to be of the specified type."""
if not self.target_profiles:
raise Exception('No target profiles found.')
assert type_guard(self.target_profiles, profile_type)
return t.cast(list[THostProfile], self.target_profiles)
def prepare_profiles(
args: TEnvironmentConfig,
targets_use_pypi: bool = False,
skip_setup: bool = False,
requirements: t.Optional[c.Callable[[HostProfile], None]] = None,
) -> HostState:
"""
Create new profiles, or load existing ones, and return them.
If a requirements callback was provided, it will be used before configuring hosts if delegation has already been performed.
"""
if args.host_path:
host_state = HostState.deserialize(args, os.path.join(args.host_path, 'state.dat'))
else:
run_pypi_proxy(args, targets_use_pypi)
host_state = HostState(
controller_profile=t.cast(ControllerHostProfile, create_host_profile(args, args.controller, True)),
target_profiles=[create_host_profile(args, target, False) for target in args.targets],
)
if args.prime_containers:
for host_profile in host_state.profiles:
if isinstance(host_profile, DockerProfile):
host_profile.provision()
raise PrimeContainers()
ExitHandler.register(functools.partial(cleanup_profiles, host_state))
def provision(profile: HostProfile) -> None:
"""Provision the given profile."""
profile.provision()
if not skip_setup:
profile.setup()
dispatch_jobs([(profile, WrappedThread(functools.partial(provision, profile))) for profile in host_state.profiles])
host_state.controller_profile.configure()
if not args.delegate:
check_controller_python(args, host_state)
if requirements:
requirements(host_state.controller_profile)
def configure(profile: HostProfile) -> None:
"""Configure the given profile."""
profile.wait()
if not skip_setup:
profile.configure()
if requirements:
requirements(profile)
dispatch_jobs([(profile, WrappedThread(functools.partial(configure, profile))) for profile in host_state.target_profiles])
return host_state
def check_controller_python(args: EnvironmentConfig, host_state: HostState) -> None:
"""Check the running environment to make sure it is what we expected."""
sys_version = version_to_str(sys.version_info[:2])
controller_python = host_state.controller_profile.python
if expected_executable := verify_sys_executable(controller_python.path):
raise ApplicationError(f'Running under Python interpreter "{sys.executable}" instead of "{expected_executable}".')
expected_version = controller_python.version
if expected_version != sys_version:
raise ApplicationError(f'Running under Python version {sys_version} instead of {expected_version}.')
args.controller_python = controller_python
def cleanup_profiles(host_state: HostState) -> None:
"""Cleanup provisioned hosts when exiting."""
for profile in host_state.profiles:
profile.deprovision()
def dispatch_jobs(jobs: list[tuple[HostProfile, WrappedThread]]) -> None:
"""Run the given profile job threads and wait for them to complete."""
for profile, thread in jobs:
thread.daemon = True
thread.start()
while any(thread.is_alive() for profile, thread in jobs):
time.sleep(1)
failed = False
connection_failures = 0
for profile, thread in jobs:
try:
thread.wait_for_result()
except HostConnectionError as ex:
display.error(f'Host {profile.config} connection failed:\n{ex}')
failed = True
connection_failures += 1
except ApplicationError as ex:
display.error(f'Host {profile.config} job failed:\n{ex}')
failed = True
except Exception as ex: # pylint: disable=broad-except
name = f'{"" if ex.__class__.__module__ == "builtins" else ex.__class__.__module__ + "."}{ex.__class__.__qualname__}'
display.error(f'Host {profile.config} job failed:\nTraceback (most recent call last):\n'
f'{"".join(traceback.format_tb(ex.__traceback__)).rstrip()}\n{name}: {ex}')
failed = True
if connection_failures:
raise HostConnectionError(f'Host job(s) failed, including {connection_failures} connection failure(s). See previous error(s) for details.')
if failed:
raise ApplicationError('Host job(s) failed. See previous error(s) for details.')
|