1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
|
######################################################################
#
# File: b2/_internal/_cli/autocomplete_install.py
#
# Copyright 2023 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations
import abc
import io
import logging
import os
import re
import shlex
import shutil
import signal
import subprocess
import textwrap
from datetime import datetime
from importlib.util import find_spec
from pathlib import Path
from shlex import quote
import argcomplete
from class_registry import ClassRegistry, RegistryKeyError
from b2._internal._utils.python_compat import shlex_join
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Registry of installer classes keyed by shell name; filled by the
# @SHELL_REGISTRY.register(...) decorators on the installer classes below.
SHELL_REGISTRY = ClassRegistry()
def autocomplete_install(prog: str, shell: str = 'bash') -> None:
    """
    Install autocomplete for the given program.

    :param prog: name of the program to install shell completion for
    :param shell: name of the target shell; must be registered in SHELL_REGISTRY
    :raises AutocompleteInstallError: if the shell is not supported or the installer fails
    """
    try:
        autocomplete_installer = SHELL_REGISTRY.get(shell, prog=prog)
    except RegistryKeyError:
        # The message already names the offending shell; suppress the implicit
        # "During handling of the above exception..." context of the lookup error.
        raise AutocompleteInstallError(f"Unsupported shell: {shell}") from None
    autocomplete_installer.install()
    logger.info("Autocomplete for %s has been enabled.", prog)
class ShellAutocompleteInstaller(abc.ABC):
    """
    Base class for shell-specific autocomplete installers.

    Subclasses set ``shell_exec`` and implement :meth:`get_script_path`,
    :meth:`is_enabled` and :meth:`force_enable`.
    """

    # name of the shell executable, used both to run commands and to generate shellcode
    shell_exec: str

    def __init__(self, prog: str):
        """
        :param prog: name of the program autocomplete is installed for
        """
        self.prog = prog

    def install(self) -> None:
        """
        Install autocomplete for the given program.

        Writes the completion script, and if the shell does not pick it up on its own,
        tries to force-enable it.

        :raises AutocompleteInstallError: if completion is still not enabled afterwards
        """
        script_path = self.create_script()
        if not self.is_enabled():
            logger.info(
                "%s completion doesn't seem to be autoloaded from %s.", self.shell_exec,
                script_path.parent
            )
            try:
                self.force_enable(script_path)
            except NotImplementedError as e:
                # report through the module logger, like every other message in this module
                logger.warning(
                    "Autocomplete wasn't automatically picked up and cannot force enable it: %s", e
                )
            if not self.is_enabled():
                logger.error("Autocomplete is still not enabled.")
                raise AutocompleteInstallError(f"Autocomplete for {self.prog} install failed.")

    def create_script(self) -> Path:
        """
        Create autocomplete script for the given program.

        Parent directories are created when missing.

        :return: path of the written script
        """
        shellcode = self.get_shellcode()
        script_path = self.get_script_path()
        logger.info("Creating autocompletion script under %s", script_path)
        script_path.parent.mkdir(exist_ok=True, parents=True, mode=0o755)
        script_path.write_text(shellcode)
        return script_path

    @abc.abstractmethod
    def force_enable(self, completion_script: Path) -> None:
        """
        Enable autocomplete for the given program.

        Used as fallback if shell doesn't automatically enable autocomplete.

        :param completion_script: path of the already created completion script
        """
        raise NotImplementedError

    def get_shellcode(self) -> str:
        """Get autocomplete shellcode for the given program, as generated by argcomplete."""
        return argcomplete.shellcode([self.prog], shell=self.shell_exec)

    @abc.abstractmethod
    def get_script_path(self) -> Path:
        """Get autocomplete script path for the given program."""
        raise NotImplementedError

    def program_in_path(self) -> bool:
        """
        Check if the given program is in PATH.

        Runs ``<shell_exec> -c <prog>`` and reports whether it exited with code 0.
        """
        return _silent_success_run([self.shell_exec, '-c', self.prog])

    @abc.abstractmethod
    def is_enabled(self) -> bool:
        """Check if autocompletion is enabled."""
        raise NotImplementedError
class BashLikeAutocompleteInstaller(ShellAutocompleteInstaller):
    """Installer logic shared by shells that are configured through an rc file."""

    shell_exec: str
    # rc file location, may start with ``~``
    rc_file_path: str

    def get_rc_path(self) -> Path:
        """Return the rc file path with ``~`` expanded."""
        return Path(self.rc_file_path).expanduser()

    def force_enable(self, completion_script: Path) -> None:
        """
        Source the completion script from the shell rc file.

        A non-blank rc file is copied to a timestamped ``.bak`` file first.

        :param completion_script: path of the completion script to source
        :raises AutocompleteInstallError: if the backup copy cannot be made
        """
        rc_file = self.get_rc_path()
        needs_backup = rc_file.exists() and rc_file.read_text().strip()
        if needs_backup:
            timestamp_suffix = f".{datetime.now():%Y-%m-%dT%H-%M-%S}.bak"
            backup_file = rc_file.with_suffix(timestamp_suffix)
            logger.warning("Backing up %s to %s", rc_file, backup_file)
            try:
                shutil.copyfile(rc_file, backup_file)
            except OSError as e:
                raise AutocompleteInstallError(
                    f"Failed to backup {rc_file} under {backup_file}"
                ) from e

        logger.warning("Explicitly adding %s to %s", completion_script, rc_file)
        section_body = self.get_rc_section(completion_script)
        add_or_update_shell_section(
            rc_file, f"{self.prog} autocomplete", self.prog, section_body
        )

    def get_rc_section(self, completion_script: Path) -> str:
        """Return the rc file line that sources the (shell-quoted) completion script."""
        quoted_script = quote(str(completion_script))
        return f"source {quoted_script}"

    def get_script_path(self) -> Path:
        """Return ``~/.<shell_exec>_completion.d/<prog>`` with ``~`` expanded."""
        completion_dir = Path(f"~/.{self.shell_exec}_completion.d/").expanduser()
        return completion_dir / self.prog

    def is_enabled(self) -> bool:
        """Check, in an interactive shell, whether ``complete -p <prog>`` succeeds."""
        probe = f'complete -p {quote(self.prog)}'
        return _silent_success_run([self.shell_exec, '-i', '-c', probe])
@SHELL_REGISTRY.register('bash')
class BashAutocompleteInstaller(BashLikeAutocompleteInstaller):
    """Autocomplete installer for bash, registered under the 'bash' key."""

    rc_file_path = "~/.bashrc"
    shell_exec = 'bash'
@SHELL_REGISTRY.register('zsh')
class ZshAutocompleteInstaller(BashLikeAutocompleteInstaller):
    """Autocomplete installer for zsh, registered under the 'zsh' key."""

    shell_exec = 'zsh'
    rc_file_path = "~/.zshrc"

    def get_rc_section(self, completion_script: Path) -> str:
        """
        Return the rc file snippet for zsh.

        The snippet runs ``compinit`` when ``$_comps`` is empty and stdin is a terminal,
        then sources the (shell-quoted) completion script.
        """
        return textwrap.dedent(
            f"""\
            if [[ -z "$_comps" ]] && [[ -t 0 ]]; then autoload -Uz compinit && compinit -i -D; fi
            source {quote(str(completion_script))}
            """
        )

    def get_script_path(self) -> Path:
        """Return ``~/.zsh/completion/_<prog>`` with ``~`` expanded."""
        return Path("~/.zsh/completion/").expanduser() / f"_{self.prog}"

    def is_enabled(self) -> bool:
        """
        Check if autocompletion is enabled.

        Creates an empty rc file when it is missing, runs ``compaudit`` (its result is
        ignored, the output is only logged), then checks in an interactive shell whether
        ``_comps`` has an entry for the program.
        """
        rc_path = self.get_rc_path()
        if not rc_path.exists():
            # if zshrc is missing `zshrc -i` may hang on creation wizard when emulating tty
            rc_path.touch(mode=0o750)

        audit_cmd = [self.shell_exec, '-c', 'autoload -Uz compaudit; echo AUDIT; compaudit']
        # pexpect is optional (_silent_success_run_with_tty degrades gracefully without it),
        # but the pty runner imports it unconditionally; fall back to a plain subprocess so
        # this audit cannot crash the whole check with ModuleNotFoundError.
        audit_runner = _silent_success_run_with_pty if find_spec('pexpect') else _silent_success_run
        audit_runner(audit_cmd)

        cmd = [self.shell_exec, '-i', '-c', f'[[ -v _comps[{quote(self.prog)}] ]]']
        return _silent_success_run_with_tty(cmd)
@SHELL_REGISTRY.register('fish')
class FishAutocompleteInstaller(ShellAutocompleteInstaller):
    """Autocomplete installer for fish, registered under the 'fish' key."""

    shell_exec = 'fish'
    rc_file_path = "~/.config/fish/config.fish"

    def force_enable(self, completion_script: Path) -> None:
        """Always raises: there is no force-enable fallback for fish."""
        raise NotImplementedError("Fish shell doesn't support manual completion enabling.")

    def get_script_path(self) -> Path:
        """
        Pick the directory for the completion script from fish's ``$fish_complete_path``.

        The user completions directory is preferred when fish lists it; otherwise the first
        listed directory is used.  An empty list falls back to the user directory.
        """
        fish_result = subprocess.run(
            [self.shell_exec, '-c', 'echo $fish_complete_path'],
            timeout=30,
            text=True,
            check=True,
            capture_output=True
        )
        complete_paths = [Path(entry) for entry in shlex.split(fish_result.stdout)]
        user_path = Path("~/.config/fish/completions").expanduser()

        if not complete_paths:
            logger.warning("$fish_complete_path is empty, falling back to %r", user_path)
            chosen_dir = user_path
        elif user_path in complete_paths:
            chosen_dir = user_path
        else:
            chosen_dir = complete_paths[0]
        return chosen_dir / f"{self.prog}.fish"

    def is_enabled(self) -> bool:
        """
        Check if autocompletion is enabled.

        Fish seems to load completions lazily, so completion is triggered first.
        Triggering alone is not a reliable signal, because fish tends to always offer some
        completion (e.g. similarly named files); hence the registered completions for the
        program are queried afterwards.
        """
        environ = os.environ.copy()
        environ.setdefault("TERM", "xterm")  # TERM has to be set for fish to load completions
        trigger_arg = quote(f"{self.prog} ")
        prog_arg = quote(self.prog)
        check = (
            f'string length -q -- (complete -C{trigger_arg} >/dev/null && complete -c {prog_arg})'
        )
        return _silent_success_run_with_tty([self.shell_exec, '-i', '-c', check], env=environ)
def _silent_success_run_with_tty(
    cmd: list[str], timeout: int = 30, env: dict | None = None
) -> bool:
    """
    Run a command that expects a terminal and return whether it succeeded.

    When stdin is a tty the command runs as a plain subprocess.  Otherwise a terminal is
    emulated through pexpect, or, if pexpect is not installed, a warning is logged and the
    plain subprocess runner is used.
    """
    if os.isatty(0):
        return _silent_success_run(cmd, timeout=timeout, env=env)

    # no tty on stdin, which is the case under GHA or pytest-xdist
    if find_spec('pexpect'):
        return _silent_success_run_with_pty(cmd, timeout=timeout, env=env)

    logger.warning(
        "pexpect is needed to check autocomplete installation correctness without tty. "
        "You can install it via `pip install pexpect`."
    )
    return _silent_success_run(cmd, timeout=timeout, env=env)
def _silent_success_run(cmd: list[str], timeout: int = 30, env: dict | None = None) -> bool:
    """
    Run a command with captured output and no stdin; return whether it exited with code 0.

    The captured output is only logged.  A command that exceeds ``timeout`` seconds is
    killed.
    """
    proc = subprocess.Popen(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        start_new_session=True,  # prevents `zsh -i` messing with parent tty under pytest-xdist
    )
    try:
        out, err = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        out, err = proc.communicate(timeout=1)
        logger.warning("Command %r timed out, stdout: %r, stderr: %r", cmd, out, err)
        return proc.returncode == 0

    succeeded = proc.returncode == 0
    level = logging.DEBUG if succeeded else logging.WARNING
    logger.log(
        level, "Command %r exited with code %r, stdout: %r, stderr: %r", cmd, proc.returncode,
        out, err
    )
    return succeeded
def _silent_success_run_with_pty(
    cmd: list[str], timeout: int = 30, env: dict | None = None
) -> bool:
    """
    Run a command under a pexpect-emulated terminal; return whether it exited with status 0.

    Everything the command prints is collected and only logged.  On timeout the child is
    killed with SIGKILL and ``False`` is returned.
    """
    import pexpect

    captured = io.BytesIO()
    child = pexpect.spawn(shlex_join(cmd), timeout=timeout, env=env)
    try:
        child.logfile_read = captured
        child.expect(pexpect.EOF)
    except pexpect.TIMEOUT:
        logger.warning("Command %r timed out, output: %r", cmd, captured.getvalue())
        child.kill(signal.SIGKILL)
        return False
    finally:
        child.close()

    level = logging.DEBUG if child.exitstatus == 0 else logging.WARNING
    logger.log(
        level, "Command %r exited with code %r, output: %r", cmd, child.exitstatus,
        captured.getvalue()
    )
    return child.exitstatus == 0
def add_or_update_shell_section(
    path: Path, section: str, managed_by: str, content: str, comment_sign="#"
) -> None:
    """
    Add or update a section in a file.

    The section is delimited by the marker lines ``{comment_sign} >>> {section} >>>`` and
    ``{comment_sign} <<< {section} <<<``.  If the file already contains such a section, it
    is replaced in place; otherwise the section is appended at the end of the file.
    A missing file is treated as empty and gets created.

    :param path: file to modify
    :param section: section name used in the start and end markers
    :param managed_by: name put in the "managed by" notice inside the section
    :param content: text placed between the notice and the end marker; must not contain
                    the end marker itself
    :param comment_sign: prefix that starts a comment line in the target file
    """
    section_start = f"{comment_sign} >>> {section} >>>"
    section_end = f"{comment_sign} <<< {section} <<<"
    assert section_end not in content

    try:
        file_content = path.read_text()
    except FileNotFoundError:
        file_content = ""

    full_content = "\n".join(
        [
            section_start,
            f"{comment_sign} This section is managed by {managed_by} . Manual edit may break automated updates.",
            content,
            section_end,
        ]
    ).strip()

    pattern = re.compile(
        rf'^{re.escape(section_start)}.*?^{re.escape(section_end)}', flags=re.MULTILINE | re.DOTALL
    )
    # A callable replacement is inserted verbatim.  A plain string would be parsed as a
    # regex replacement template, so backslashes in `content` (e.g. in a quoted path) would
    # be mangled or rejected with re.error.
    updated_content, replaced_count = pattern.subn(lambda _match: full_content, file_content)
    if replaced_count:
        file_content = updated_content
    else:
        file_content += f"\n{full_content}\n"
    path.write_text(file_content)
class AutocompleteInstallError(Exception):
    """Signals that installing shell autocomplete did not succeed."""
# Sorted names of all shells registered in SHELL_REGISTRY; computed at import time,
# after the installer classes above have been registered.
SUPPORTED_SHELLS = sorted(SHELL_REGISTRY.keys())
|