1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
|
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
import logging
import os
from typing import Any, List, Tuple
from prompt_toolkit import PromptSession
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import Completer
from prompt_toolkit.document import Document
from prompt_toolkit.enums import EditingMode
from prompt_toolkit.formatted_text import PygmentsTokens
from prompt_toolkit.history import FileHistory, InMemoryHistory
from prompt_toolkit.layout.processors import HighlightMatchingBracketProcessor
from prompt_toolkit.lexers import PygmentsLexer
from prompt_toolkit.patch_stdout import patch_stdout
from termcolor import cprint
from nubia.internal.helpers import catchall, find_approx, suggestions_msg, try_await
from nubia.internal.io.eventbus import Listener
from nubia.internal.options import Options
from nubia.internal.ui.lexer import NubiaLexer
from nubia.internal.ui.style import shell_style
def split_command(text):
return text.lstrip(" ").split(" ", 1)
class IOLoop(Listener):
def __init__(self, context, plugin, usagelogger, options: Options):
self._ctx = context
self._command_registry = self._ctx.registry
self._plugin = plugin
self._options = options
self._blacklist = self._plugin.getBlacklistPlugin()
self._status_bar = self._plugin.get_status_bar(context)
self._completer = ShellCompleter(self._command_registry)
self._command_registry.register_listener(self)
self._usagelogger = usagelogger
def _build_cli(self):
if self._options.persistent_history:
history = FileHistory(
os.path.join(
os.path.expanduser("~"), ".{}_history".format(self._ctx.binary_name)
)
)
else:
history = InMemoryHistory()
# If EDITOR does not exist, take EMACS
# if it does, try fit the EMACS/VI pattern using upper
editor = getattr(
EditingMode, os.environ.get("EDITOR", "EMACS").upper(), EditingMode.EMACS
)
return PromptSession(
history=history,
auto_suggest=AutoSuggestFromHistory(),
lexer=PygmentsLexer(NubiaLexer),
completer=self._completer,
input_processors=[HighlightMatchingBracketProcessor(chars="[](){}")],
style=shell_style,
bottom_toolbar=self._get_bottom_toolbar,
editing_mode=editor,
complete_in_thread=True,
refresh_interval=1,
include_default_pygments_style=False,
)
def _get_prompt_tokens(self) -> List[Tuple[Any, str]]:
return self._plugin.get_prompt_tokens(self._ctx)
def _get_bottom_toolbar(self) -> List[Tuple[Any, str]]:
return PygmentsTokens(self._status_bar.get_tokens())
async def parse_and_evaluate(self, input):
command_parts = split_command(input)
if command_parts and command_parts[0]:
cmd = command_parts[0]
args = command_parts[1] if len(command_parts) > 1 else None
return await self.evaluate_command(cmd, args, input)
async def evaluate_command(self, cmd, args, raw):
args = args or ""
if cmd in self._command_registry:
cmd_instance = self._command_registry.find_command(cmd)
else:
suggestions = find_approx(
cmd, self._command_registry.get_all_commands_map()
)
if self._options.auto_execute_single_suggestions and len(suggestions) == 1:
print()
cprint(
"Auto-correcting '{}' to '{}'".format(cmd, suggestions[0]),
"red",
attrs=["bold"],
)
cmd_instance = self._command_registry.find_command(suggestions[0])
else:
print()
cprint(
"Unknown Command '{}'{} type `help` to see all "
"available commands".format(cmd, suggestions_msg(suggestions)),
"red",
attrs=["bold"],
)
cmd_instance = None
if cmd_instance is not None:
try:
ret = self._blacklist.is_blacklisted(cmd)
if ret:
return ret
except Exception as e:
err_message = (
"Blacklist executing failed, "
"all commands are available.\n"
"{}".format(str(e))
)
cprint(err_message, "red")
logging.error(err_message)
try:
catchall(self._usagelogger.pre_exec)
result = await try_await(cmd_instance.run_interactive(cmd, args, raw))
catchall(self._usagelogger.post_exec, cmd, args, result, False)
self._status_bar.set_last_command_status(result)
return result
except NotImplementedError as e:
cprint("[NOT IMPLEMENTED]: {}".format(str(e)), "yellow", attrs=["bold"])
# not implemented error code
return 99
async def run(self):
prompt = self._build_cli()
self._status_bar.start()
try:
while True:
try:
with patch_stdout():
text = await prompt.prompt_async(
PygmentsTokens(self._get_prompt_tokens()),
rprompt=PygmentsTokens(
self._status_bar.get_rprompt_tokens()
),
)
session_logger = self._plugin.get_session_logger(self._ctx)
if session_logger:
# Commands don't get written to stdout, so we have to
# explicitly dump them to the session log.
session_logger.log_command(text)
with session_logger.patch():
await self.parse_and_evaluate(text)
else:
await self.parse_and_evaluate(text)
except KeyboardInterrupt:
pass
except EOFError:
# Application exiting.
pass
self._status_bar.stop()
async def on_connected(self, *args, **kwargs):
pass
class ShellCompleter(Completer):
def __init__(self, command_registry):
super(Completer, self).__init__()
self._command_registry = command_registry
def get_completions(self, document, complete_event):
if document.on_first_line:
cmd_and_args = split_command(document.text_before_cursor)
# are we the first word? suggest from command names
if len(cmd_and_args) > 1:
cmd, args = cmd_and_args
# pass to the children
# let's find the parent command first
cmd_instance = self._command_registry.find_command(cmd)
if not cmd_instance:
return []
return cmd_instance.get_completions(
cmd,
Document(
args, document.cursor_position - len(document.text) + len(args)
),
complete_event,
)
else:
return self._command_registry.get_completions(document, complete_event)
return []
|