1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
|
"""REPL (Read-Eval-Print Loop) for the Zabbix CLI."""
from __future__ import annotations
import os
import shlex
import sys
from collections import defaultdict
from collections.abc import Iterable
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import NamedTuple
from typing import NoReturn
from typing import Optional
import click
import click.parser
import click.shell_completion
from click.exceptions import Exit as ClickExit
from prompt_toolkit.history import InMemoryHistory
from prompt_toolkit.shortcuts import prompt
from zabbix_cli.exceptions import handle_exception
from zabbix_cli.output.console import err_console
from zabbix_cli.repl.completer import ClickCompleter
if TYPE_CHECKING:
from click.core import Context
from zabbix_cli.app import StatefulApp
class InternalCommandException(Exception):
    """Base class for exceptions raised by REPL-internal commands."""
class ExitReplException(InternalCommandException):
    """Raised to request that the REPL loop terminates."""
# Signature shared by all REPL-internal command targets. The REPL calls a
# target without arguments when its command is entered.
CommandCallable = Callable[..., Any]
"""Callable for internal commands."""
class InternalCommand(NamedTuple):
    """Registry entry describing one REPL-internal command."""

    # Callable executed when the command is entered in the REPL.
    command: CommandCallable
    # Short summary shown next to the command in the REPL help output.
    description: str
# Registry mapping each internal command name (without the ":" prefix) to its
# InternalCommand entry; populated through _register_internal_command().
_internal_commands: dict[str, InternalCommand] = {}
def _register_internal_command(
    names: Iterable[str], target: CommandCallable, description: str
):
    """Register *target* as an internal command under each of *names*.

    :param names: A single name or an iterable of names (aliases). Names are
        stored without the ":" prefix used to invoke them.
    :param target: Callable to run when one of the names is entered.
    :param description: Help text stored alongside the callable.
    """
    # A bare string is a single name, not an iterable of one-letter names.
    aliases = [names] if isinstance(names, str) else names
    _internal_commands.update(
        (alias, InternalCommand(target, description)) for alias in aliases
    )
def _get_registered_target(
    name: str, default: Optional[CommandCallable] = None
) -> Optional[CommandCallable]:
    """Look up the callable registered for the internal command *name*.

    :param name: Command name without the ":" prefix.
    :param default: Value returned when no command is registered as *name*.
    :return: The registered callable, or *default* if there is none.
    """
    entry = _internal_commands.get(name)
    return entry.command if entry else default
def _exit_internal() -> NoReturn:
    """Signal that the REPL should stop by raising :class:`ExitReplException`."""
    raise ExitReplException
def _help_internal() -> str:
    """Build the general REPL help text.

    The text explains the "!" prefix for external commands and the ":" prefix
    for internal commands, followed by a list of all registered internal
    commands.

    :return: The formatted help text.
    """
    out = click.HelpFormatter()
    out.write_heading("REPL help")
    out.indent()

    with out.section("External Commands"):
        out.write_text('prefix external commands with "!"')

    with out.section("Internal Commands"):
        out.write_text('prefix internal commands with ":"')

        # Group aliases by description so that all names sharing a
        # description end up on a single row.
        aliases_by_description: dict[str, list[str]] = {}
        for alias, entry in _internal_commands.items():
            aliases_by_description.setdefault(entry.description, []).append(alias)

        rows = []
        for description, aliases in aliases_by_description.items():
            label = ", ".join(f":{alias}" for alias in sorted(aliases))
            rows.append((label, description))
        out.write_dl(rows)

    return out.getvalue()
# Built-in internal commands, available in every REPL session.
_register_internal_command(
    names=("q", "quit", "exit"),
    target=_exit_internal,
    description="exits the repl",
)
_register_internal_command(
    names=("?", "h", "help"),
    target=_help_internal,
    description="displays general help information",
)
def bootstrap_prompt(
    prompt_kwargs: Optional[dict[str, Any]],
    group: click.Group,
    ctx: click.Context,
    *,
    show_only_unused: bool = False,
    shortest_only: bool = False,
) -> dict[str, Any]:
    """Fill in default prompt_toolkit kwargs without overriding user values.

    :param prompt_kwargs: The user specified prompt kwargs. A non-empty dict
        is updated in place and returned; ``None`` or an empty dict is
        replaced by a new dict.
    :param group: Click group passed to the default completer.
    :param ctx: Click context passed to the default completer.
    :param show_only_unused: Forwarded to the default completer.
    :param shortest_only: Forwarded to the default completer.
    :return: The prompt kwargs with ``history``, ``completer`` and ``message``
        guaranteed to be present.
    """
    kwargs = prompt_kwargs if prompt_kwargs else {}

    # The defaults are created unconditionally, in this order, and only
    # inserted for keys the user did not provide.
    default_history = InMemoryHistory()
    default_completer = ClickCompleter(
        group, ctx, show_only_unused=show_only_unused, shortest_only=shortest_only
    )

    kwargs.setdefault("history", default_history)
    kwargs.setdefault("completer", default_completer)
    kwargs.setdefault("message", "> ")
    return kwargs
def register_repl(group: click.Group, name: str = "repl"):
    """Register :func:`repl()` as sub-command *name* of *group*.

    :param group: Click group that receives the new sub-command.
    :param name: Name under which the REPL command is registered.
    """
    as_command = group.command(name=name)
    # pass_context makes Click hand the active context to repl() as its
    # first argument.
    callback = click.pass_context(repl)
    as_command(callback)
def exit() -> None:
    """Exit the repl.

    Delegates to the same helper that backs the internal exit commands, so
    calling this never returns normally.
    """
    _exit_internal()
def dispatch_repl_commands(command: str) -> bool:
    """Execute system commands entered in the repl.

    System commands are all commands starting with "!". Everything after the
    "!" is handed to the system shell.

    :param command: The raw line entered in the REPL.
    :return: ``True`` if the line was run as a system command, ``False`` if
        it is not a system command (including a lone "!").
    """
    is_shell_escape = len(command) > 1 and command.startswith("!")
    if not is_shell_escape:
        return False

    os.system(command[1:])
    return True
def handle_internal_commands(command: str) -> Any:
    """Run repl-internal commands.

    Repl-internal commands are all commands starting with ":".

    :param command: The raw line entered in the REPL.
    :return: Whatever the registered command returns, or ``None`` if the line
        is not an internal command or no command is registered under the
        given name.
    :raises ExitReplException: If the command is one of the exit commands.
    """
    if not command.startswith(":"):
        return None

    # Lines read from a non-TTY stdin keep their trailing newline, and users
    # may type trailing spaces; neither should prevent the lookup from
    # matching a registered name.
    name = command[1:].strip()
    target = _get_registered_target(name, default=None)
    if not target:
        return None
    return target()
def repl(  # noqa: C901
    old_ctx: Context,
    app: Optional[StatefulApp] = None,
    prompt_kwargs: Optional[dict[str, Any]] = None,
    *,
    allow_system_commands: bool = True,
    allow_internal_commands: bool = True,
) -> None:
    """Start an interactive shell. All subcommands are available in it.

    :param old_ctx: The current Click context.
    :param app: Optional application object. When given, its
        ``as_click_group()`` result is used as the group for every command
        that is run. Defaults to ``None`` so the function can be called with
        only a context, which is how the command created by
        :func:`register_repl` invokes it.
    :param prompt_kwargs: Parameters passed to
        :py:func:`prompt_toolkit.shortcuts.prompt`.
    :param allow_system_commands: Whether lines starting with "!" are run as
        system commands.
    :param allow_internal_commands: Whether lines starting with ":" are
        looked up as REPL-internal commands.
    :raises RuntimeError: If the command of the parent context (or of
        *old_ctx* when it has no parent) is not a Click group.

    If stdin is not a TTY, no prompt will be printed, but only commands read
    from stdin.
    """
    # parent should be available, but we're not going to bother if not
    group_ctx = old_ctx.parent or old_ctx
    group = group_ctx.command
    isatty = sys.stdin.isatty()

    if not isinstance(group, click.Group):
        raise RuntimeError("REPL must be started from a Typer or Click group.")
    available_commands = group.commands

    # Delete the REPL command from those available
    repl_command_name = old_ctx.command.name
    if repl_command_name:
        available_commands.pop(repl_command_name, None)

    # Remove hidden commands
    available_commands = {
        cmd_name: cmd_obj
        for cmd_name, cmd_obj in available_commands.items()
        if not cmd_obj.hidden
    }
    group.commands = available_commands
    prompt_kwargs = bootstrap_prompt(prompt_kwargs, group, group_ctx)

    if isatty:

        def get_command() -> str:
            return prompt(**prompt_kwargs)

    else:
        get_command = sys.stdin.readline

    while True:
        try:
            command = get_command()
        except KeyboardInterrupt:
            # Ctrl+C at the prompt discards the current input line.
            continue
        except EOFError:
            break

        if not command:
            if isatty:
                continue
            else:
                # readline() returns an empty string only at end of input.
                break

        if allow_system_commands and dispatch_repl_commands(command):
            continue

        if allow_internal_commands:
            try:
                result = handle_internal_commands(command)
                if isinstance(result, str):
                    click.echo(result)
                    continue
            except ExitReplException:
                break

        try:
            args = shlex.split(command)
        except ValueError as e:
            # e.g. unbalanced quotes; report and read the next command.
            click.echo(f"{type(e).__name__}: {e}")
            continue

        try:
            if app:
                group = app.as_click_group()
            with group.make_context(None, args, parent=group_ctx) as ctx:
                group.invoke(ctx)
                ctx.exit()
        except click.ClickException as e:
            e.show()
        except (ClickExit, SystemExit):
            # A finished command must not terminate the REPL itself.
            pass
        except ExitReplException:
            break
        except Exception as e:
            try:
                # Pass exception to the exception handler
                # which handles printing and logging the exception
                # The exception handler also calls sys.exit() (used in non-interactive mode)
                # so we need to catch it and ignore it.
                handle_exception(e)
            except SystemExit:
                pass
        except KeyboardInterrupt:
            # User likely pressed Ctrl+C during a prompt or when a spinner
            # was active. Ensure message is printed on a new line.
            # TODO: determine if last char in terminal was newline somehow! Can we?
            err_console.print("\n[red]Aborted.[/]")
|