1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
|
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2024 University of New Hampshire
# Copyright(c) 2024 Arm Limited
"""Common functionality for interactive shell handling.
The base class, :class:`InteractiveShell`, is meant to be extended by subclasses that
contain functionality specific to that shell type. These subclasses will often modify things like
the prompt to expect or the arguments to pass into the application, but still utilize
the same method for sending a command and collecting output. How this output is handled however
is often application specific. If an application needs elevated privileges to start it is expected
that the method for gaining those privileges is provided when initializing the class.
This class is designed for applications like primary applications in DPDK where only one instance
of the application can be running at a given time and, for this reason, is managed using a context
manager. This context manager starts the application when you enter the context and cleans up the
application when you exit. Using a context manager for this is useful since it allows us to ensure
the application is cleaned up as soon as you leave the block regardless of the reason.
The :option:`--timeout` command line argument and the :envvar:`DTS_TIMEOUT`
environment variable configure the timeout of getting the output from command execution.
"""
from abc import ABC, abstractmethod
from collections.abc import Callable
from pathlib import PurePath
from typing import Any, ClassVar, Concatenate, ParamSpec, TypeAlias, TypeVar
from paramiko import Channel, channel
from typing_extensions import Self
from framework.context import get_ctx
from framework.exception import (
InteractiveCommandExecutionError,
InteractiveSSHSessionDeadError,
InteractiveSSHTimeoutError,
)
from framework.logger import DTSLogger, get_dts_logger
from framework.params import Params
from framework.settings import SETTINGS
from framework.testbed_model.node import Node
P = ParamSpec("P")
T = TypeVar("T", bound="InteractiveShell")
R = TypeVar("R")
InteractiveShellMethod = Callable[Concatenate[T, P], R]
InteractiveShellDecorator: TypeAlias = Callable[[InteractiveShellMethod], InteractiveShellMethod]
def only_active(func: InteractiveShellMethod) -> InteractiveShellMethod:
"""This decorator will skip running the method if the SSH channel is not active."""
def _wrapper(self: "InteractiveShell", *args: P.args, **kwargs: P.kwargs) -> R | None:
if self._ssh_channel.active:
return func(self, *args, **kwargs)
return None
return _wrapper
class InteractiveShell(ABC):
"""The base class for managing interactive shells.
This class shouldn't be instantiated directly, but instead be extended. It contains
methods for starting interactive shells as well as sending commands to these shells
and collecting input until reaching a certain prompt. All interactive applications
will use the same SSH connection, but each will create their own channel on that
session.
Interactive shells are started and stopped using a context manager. This allows for the start
and cleanup of the application to happen at predictable times regardless of exceptions or
interrupts.
Attributes:
is_alive: :data:`True` if the application has started successfully, :data:`False`
otherwise.
"""
_node: Node
_stdin: channel.ChannelStdinFile
_stdout: channel.ChannelFile
_ssh_channel: Channel
_logger: DTSLogger
_timeout: float
_app_params: Params
_privileged: bool
#: The number of times to try starting the application before considering it a failure.
_init_attempts: ClassVar[int] = 5
#: Prompt to expect at the end of output when sending a command.
#: This is often overridden by subclasses.
_default_prompt: ClassVar[str] = ""
#: Extra characters to add to the end of every command
#: before sending them. This is often overridden by subclasses and is
#: most commonly an additional newline character. This additional newline
#: character is used to force the line that is currently awaiting input
#: into the stdout buffer so that it can be consumed and checked against
#: the expected prompt.
_command_extra_chars: ClassVar[str] = ""
is_alive: bool = False
def __init__(
self,
node: Node,
name: str | None = None,
privileged: bool = False,
app_params: Params = Params(),
) -> None:
"""Create an SSH channel during initialization.
Args:
node: The node on which to run start the interactive shell.
name: Name for the interactive shell to use for logging. This name will be appended to
the name of the underlying node which it is running on.
privileged: Enables the shell to run as superuser.
app_params: The command line parameters to be passed to the application on startup.
"""
self._node = node
if name is None:
name = type(self).__name__
self._logger = get_dts_logger(f"{node.name}.{name}")
self._app_params = app_params
self._privileged = privileged
self._timeout = SETTINGS.timeout
def _setup_ssh_channel(self) -> None:
self._ssh_channel = self._node.main_session.interactive_session.session.invoke_shell()
self._stdin = self._ssh_channel.makefile_stdin("w")
self._stdout = self._ssh_channel.makefile("r")
self._ssh_channel.settimeout(self._timeout)
self._ssh_channel.set_combine_stderr(True) # combines stdout and stderr streams
def _make_start_command(self) -> str:
"""Makes the command that starts the interactive shell."""
start_command = f"{self._make_real_path()} {self._app_params or ''}"
if self._privileged:
start_command = self._node.main_session._get_privileged_command(start_command)
return start_command
def start_application(self, prompt: str | None = None, add_to_shell_pool: bool = True) -> None:
"""Starts a new interactive application based on the path to the app.
This method is often overridden by subclasses as their process for starting may look
different. Initialization of the shell on the host can be retried up to
`self._init_attempts` - 1 times. This is done because some DPDK applications need slightly
more time after exiting their script to clean up EAL before others can start.
Args:
prompt: When starting up the application, expect this string at the end of stdout when
the application is ready. If :data:`None`, the class' default prompt will be used.
add_to_shell_pool: If :data:`True`, the shell will be registered to the shell pool.
Raises:
InteractiveCommandExecutionError: If the application fails to start within the allotted
number of retries.
"""
self._setup_ssh_channel()
self._ssh_channel.settimeout(5)
start_command = self._make_start_command()
self.is_alive = True
for attempt in range(self._init_attempts):
try:
self.send_command(start_command, prompt)
break
except InteractiveSSHTimeoutError:
self._logger.info(
f"Interactive shell failed to start (attempt {attempt+1} out of "
f"{self._init_attempts})"
)
else:
self._ssh_channel.settimeout(self._timeout)
self.is_alive = False # update state on failure to start
raise InteractiveCommandExecutionError("Failed to start application.")
self._ssh_channel.settimeout(self._timeout)
if add_to_shell_pool:
get_ctx().shell_pool.register_shell(self)
def send_command(
self, command: str, prompt: str | None = None, skip_first_line: bool = False
) -> str:
"""Send `command` and get all output before the expected ending string.
Lines that expect input are not included in the stdout buffer, so they cannot
be used for expect.
Example:
If you were prompted to log into something with a username and password,
you cannot expect ``username:`` because it won't yet be in the stdout buffer.
A workaround for this could be consuming an extra newline character to force
the current `prompt` into the stdout buffer.
Args:
command: The command to send.
prompt: After sending the command, `send_command` will be expecting this string.
If :data:`None`, will use the class's default prompt.
skip_first_line: Skip the first line when capturing the output.
Returns:
All output in the buffer before expected string.
Raises:
InteractiveCommandExecutionError: If attempting to send a command to a shell that is
not currently running.
InteractiveSSHSessionDeadError: The session died while executing the command.
InteractiveSSHTimeoutError: If command was sent but prompt could not be found in
the output before the timeout.
"""
if not self.is_alive:
raise InteractiveCommandExecutionError(
f"Cannot send command {command} to application because the shell is not running."
)
self._logger.info(f"Sending: '{command}'")
if prompt is None:
prompt = self._default_prompt
out: str = ""
try:
self._stdin.write(f"{command}{self._command_extra_chars}\n")
self._stdin.flush()
for line in self._stdout:
if skip_first_line:
skip_first_line = False
continue
if line.rstrip().endswith(prompt):
break
out += line
except TimeoutError as e:
self._logger.exception(e)
self._logger.debug(
f"Prompt ({prompt}) was not found in output from command before timeout."
)
raise InteractiveSSHTimeoutError(command) from e
except OSError as e:
self._logger.exception(e)
raise InteractiveSSHSessionDeadError(
self._node.main_session.interactive_session.hostname
) from e
finally:
self._logger.debug(f"Got output: {out}")
return out
@only_active
def close(self) -> None:
"""Close the shell.
Raises:
InteractiveSSHTimeoutError: If the shell failed to exit within the set timeout.
"""
try:
# Ensure the primary application has terminated via readiness of 'stdout'.
if self._ssh_channel.recv_ready():
self._ssh_channel.recv(1) # 'Waits' for a single byte to enter 'stdout' buffer.
except TimeoutError as e:
self._logger.exception(e)
self._logger.debug("Application failed to exit before set timeout.")
raise InteractiveSSHTimeoutError("Application 'exit' command") from e
self._ssh_channel.close()
get_ctx().shell_pool.unregister_shell(self)
@property
@abstractmethod
def path(self) -> str | PurePath:
"""Path to the shell executable."""
def _make_real_path(self) -> PurePath:
return self._node.main_session.join_remote_path(self.path)
def __enter__(self) -> Self:
"""Enter the context block.
Upon entering a context block with this class, the desired behavior is to create the
channel for the application to use, and then start the application.
Returns:
Reference to the object for the application after it has been started.
"""
self.start_application()
return self
def __exit__(self, *_: Any) -> None:
"""Exit the context block.
Upon exiting a context block with this class, we want to ensure that the instance of the
application is explicitly closed and properly cleaned up using its close method. Note that
because this method returns :data:`None` if an exception was raised within the block, it is
not handled and will be re-raised after the application is closed.
The desired behavior is to close the application regardless of the reason for exiting the
context and then recreate that reason afterwards. All method arguments are ignored for
this reason.
"""
self.close()
|