1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
|
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#
import itertools
import logging
from typing import TYPE_CHECKING, Iterable, Callable, List
import pyparsing as pp
from prompt_toolkit.completion import CompleteEvent, Completion
from prompt_toolkit.document import Document
from nubia.internal import parser
from nubia.internal.helpers import function_to_str
if TYPE_CHECKING:
from nubia.internal.cmdbase import AutoCommand # noqa
class TokenParse:
    """
    This class captures an interactive shell token that cannot be fully
    parsed by the interactive shell parser and analyzes it.

    The token is split on its first ``=``. When the part before the ``=``
    contains no quote or bracket characters the token is treated as a
    ``key=value`` argument; otherwise (or when there is no ``=``) it is
    treated as positional. The value is then classified as a list (starts
    with ``[``), a dict (starts with ``{``) or a single value, and the piece
    of text currently being typed is exposed through ``last_value``.
    """

    def __init__(self, token: str) -> None:
        self._token = token
        self._key = ""
        self._is_argument = False
        self._is_list = False
        self._is_dict = False
        self._last_value = ""
        self.parse()

    def parse(self) -> None:
        """Analyze the token and populate the flags read by the properties."""
        key, delim, value = self._token.partition("=")

        # Is everything before the = sane?
        if any(x in key for x in "[]{}\"'"):
            # We will treat this as positional in this case and leave every
            # other field at its default.
            return

        if delim == "=":
            # This is key=value
            self._is_argument = True
            self._key = key
        else:
            # This is positional: str.partition found no "=", so `value` is
            # empty and the whole token (held in `key`) is the value.
            value = key

        value = value.strip()
        if not value:
            return

        # Let's parse the value, is it a single, list, dict?
        if value[0] == "[":
            self._is_list = True
            # Only the text after the last comma is still being typed.
            self._last_value = value.strip("[").rpartition(",")[-1].lstrip()
        elif value[0] == "{":
            self._is_dict = True
        else:
            self._last_value = value

    @property
    def is_argument(self) -> bool:
        """True when the token has the ``key=value`` form."""
        return self._is_argument

    @property
    def is_positional(self) -> bool:
        """True when the token is not a ``key=value`` argument."""
        return not self._is_argument

    # Talks about the type of the value
    @property
    def is_list(self) -> bool:
        """True when the value starts with ``[``."""
        return self._is_list

    @property
    def is_dict(self) -> bool:
        """True when the value starts with ``{``."""
        return self._is_dict

    @property
    def argument_name(self) -> str:
        """The text before ``=``; only valid for ``key=value`` tokens."""
        assert self._is_argument
        return self._key

    def keys(self) -> Iterable[str]:
        """Placeholder: currently always returns an empty list."""
        return []

    def values(self) -> Iterable[str]:
        """Placeholder: currently always returns an empty list."""
        return []

    @property
    def last_value(self) -> str:
        """
        The piece of the value currently being typed: the whole value for a
        single value, the text after the last comma for a list, and an empty
        string for a dict.
        """
        return self._last_value

    @property
    def is_single_value(self) -> bool:
        """True when the value is neither a list nor a dict."""
        return not (self._is_dict or self._is_list)
class AutoCommandCompletion:
    """
    This is the interactive completion state machine, it tracks the
    parsed tokens out of a command input and builds a data model that is
    used to understand what would be the next natural completion
    token(s).
    """

    def __init__(
        self,
        cmd_obj: "AutoCommand",
        document: Document,
        complete_event: CompleteEvent,
    ) -> None:
        self.doc = document
        self.cmd = cmd_obj
        self.meta = self.cmd.metadata
        self.event = complete_event

    def get_completions(self) -> Iterable[Completion]:
        """
        Return the completions for the command text held in ``self.doc``.

        The text is parsed with ``parser.parse``; on a ``CommandParseError``
        the partial result and the unparsed remainder are used instead. The
        last (possibly incomplete) token is then handed to
        ``_prepare_args_completions``. Any exception raised while preparing
        the completions is logged and an empty list is returned.
        """
        logger = logging.getLogger(f"{type(self).__name__}.get_completions")

        # This is a rather sad piece of code. Since nubia uses <keyword>= as
        # the completion string, when we specify choices for a keyword, and
        # select the keyword from the autocompletion suggestion by hitting
        # SPACE, the additional space confuses the autocompleter for the
        # values associated with the space. For example, if you had state=
        # as a keyword, and "up" and "down" as possible choices for the
        # keyword, once you've selected state= from the completion list by
        # hitting SPACE, up and down pop up as possible choices, but typing
        # the first letter of a choice such as u (for up) doesn't trim the
        # selection to those starting with u. Selection just stops because
        # the parser gets confused about the space after "keyword= " and
        # doesn't provide any more autocomplete suggestions. The lines that
        # follow fix this by removing the space after = from the text,
        # allowing the parser to do its job.
        if self.doc.char_before_cursor != " ":
            pos = self.doc.find_backwards("= ")
            if pos:
                cursor = self.doc.cursor_position
                spos = cursor + pos + 1
                epos = cursor + pos + 2
                text = self.doc.text
                # Build a fresh Document rather than poking at the private
                # fields of the one we were handed.
                self.doc = Document(
                    text=text[:spos] + text[epos:],
                    cursor_position=cursor - 1,
                )

        remaining = None
        try:
            parsed = parser.parse(
                self.doc.text, expect_subcommand=self.cmd.super_command
            )
        except parser.CommandParseError as e:
            parsed = e.partial_result
            remaining = e.remaining

        # This is a funky but reliable way to figure that last token we are
        # interested in manually parsing. This will return the last key=value
        # including if the value is a 'value', [list], or {dict} or
        # combination of these. This also matches positional arguments.
        if self.doc.char_before_cursor in " ]}":
            last_token = ""
        else:
            # `or -1` makes the slice below start at index 0 when no space
            # was found.
            # NOTE(review): the offset is used as an index from the end of
            # the text, which presumably assumes the cursor is at the end of
            # the input - confirm.
            last_space = (
                self.doc.find_backwards(" ", in_current_line=True) or -1
            )
            last_token = self.doc.text[(last_space + 1):]  # noqa

        # We pick the bigger match here. The reason we want to look into
        # remaining is to capture the state that we are in an open list,
        # dictionary, or any other value that may have spaces in it but fails
        # parsing (yet).
        if remaining and len(remaining) > len(last_token):
            last_token = remaining

        try:
            return self._prepare_args_completions(
                parsed_command=parsed, last_token=last_token
            )
        except Exception as e:
            logger.exception(str(e))
            return []

    def _prepare_args_completions(
        self, parsed_command: pp.ParseResults, last_token
    ) -> Iterable[Completion]:
        """
        Build the completions for ``last_token``.

        For a ``key=value`` token this completes the value from the
        argument's ``choices`` (a collection, or a callable that returns a
        list). Otherwise it offers ``name=`` completions for the arguments
        whose name starts with ``last_token`` and that were not parsed yet.

        Raises:
            ValueError: if a ``choices`` callable returns something that is
                not a list.
        """
        assert parsed_command is not None
        args_meta = self.meta.arguments.values()
        subcommand = None

        # are we expecting a sub command?
        if self.cmd.super_command:
            # We have a sub-command (supposedly)
            subcommand = parsed_command.get("__subcommand__")
            assert subcommand
            sub_meta = self.cmd.subcommand_metadata(subcommand)
            if not sub_meta:
                logging.debug("Parsing unknown sub-command failed!")
                return []
            # we did find the sub-command, yay!
            # In this case we chain the arguments from super and the
            # sub-command together
            args_meta = itertools.chain(args_meta, sub_meta.arguments.values())

        # Now let's see if we can figure which argument we are talking about
        args_meta = self._filter_arguments_by_prefix(last_token, args_meta)

        # Which arguments did we fully parse already? let's avoid printing
        # them in completions
        parsed_keys = parsed_command.asDict().get("kv", [])

        # We are either completing an argument name, argument value, or
        # positional value.
        # Dissect the last_token and figure what is the right completion
        parsed_token = TokenParse(last_token)

        # TODO: Handle positional argument completions too
        # To figure which positional we are in right now, we need to run the
        # same logic that figures if all required arguments has been
        # supplied and how many positionals have been processed and which
        # one is next.
        # This code is already in cmdbase.py run_interactive but needs to be
        # refactored to be reusable here.
        if parsed_token.is_argument:
            argument_name = parsed_token.argument_name
            arg = self._find_argument_by_name(argument_name)
            if not arg or arg.choices in [False, None]:
                return []
            # TODO: Support dictionary keys/named tuples completion
            if parsed_token.is_dict:
                return []
            # We are completing a value, in this case, we need to get the
            # last meaningful piece of the token `x=[Tr` => `Tr`
            if callable(arg.choices):
                choices = arg.choices(
                    self.cmd.metadata.command.name,
                    subcommand,
                    last_token,
                    self.doc.text,
                )
                if not isinstance(choices, list):
                    raise ValueError(
                        'autocomplete function MUST provide list of strings'
                        f', got {choices}'
                    )
            elif parsed_token.last_value:
                choices = [
                    c
                    for c in arg.choices
                    if str(c).startswith(parsed_token.last_value)
                ]
            else:
                choices = arg.choices
            limit = self.cmd._options.limit_visible_choices
            return [
                Completion(
                    text=str(choice),
                    start_position=-len(parsed_token.last_value),
                )
                for choice in choices[:limit]
            ]

        # We are completing arguments, or positionals.
        # TODO: We would like to only show positional choices if we exhaust
        # all required arguments. This will make it easier for the user to
        # figure that there are still required named arguments. After that
        # point we will show optional arguments and positionals as possible
        # completions
        return [
            Completion(
                text=arg_meta.name + "=",
                start_position=-len(last_token),
                display_meta=self._get_arg_help(arg_meta),
            )
            for arg_meta in args_meta
            if arg_meta.name not in parsed_keys
        ]

    def _filter_arguments_by_prefix(self, prefix: str, arguments=None):
        """
        Return the arguments whose name starts with ``prefix``.

        ``arguments`` defaults to this command's own arguments when it is not
        supplied. With an empty ``prefix`` the collection is returned as is.
        """
        # Only fall back when nothing was passed: an explicitly supplied
        # empty collection must stay empty.
        if arguments is None:
            arguments = self.meta.arguments.values()
        if prefix:
            return [
                arg_meta
                for arg_meta in arguments
                if arg_meta.name.startswith(prefix)
            ]
        return arguments

    def _prepare_value_completions(self, prefix, partial_result):
        """
        Placeholder for value completions of a ``name=value`` prefix.

        Currently always returns an empty list; it only logs when the named
        argument was already supplied.
        """
        kv_entries = partial_result.get("kv", [])
        argument, _rest = prefix.split("=", 1)
        arguments = self._filter_arguments_by_prefix(argument)
        if len(arguments) == 1:
            argument_obj = self._find_argument_by_name(argument)
            if argument_obj is None:
                # The prefix matched one argument but is not its exact name,
                # so there is nothing to complete a value for.
                return []
            # was that argument used before?
            if any(entry[0] == argument for entry in kv_entries):
                logging.debug(
                    "Argument {} was used already, not generating "
                    "completions".format(argument)
                )
                return []
        return []

    def _find_argument_by_name(self, name):
        """
        Return the metadata of the argument called ``name``, or ``None``.

        For a super command the arguments of the sub-command named by the
        first space-separated word of the input are searched as well.
        """
        args_meta = list(self.meta.arguments.values())
        if self.cmd.super_command:
            # We need to get the subcommand name
            subcommand_name = self.doc.text.split(" ")[0]
            for _, sub in self.meta.subcommands:
                if sub.command.name == subcommand_name:
                    args_meta.extend(sub.arguments.values())
        return next((arg for arg in args_meta if arg.name == name), None)

    def _get_arg_help(self, arg_meta):
        """
        Render the help text shown next to an argument completion, in the
        form ``[<type>, default: <value>|required] <description>``.
        """
        sb = ["["]
        if arg_meta.type:
            sb.append(function_to_str(arg_meta.type, False, False))
            sb.append(", ")
        if arg_meta.default_value_set:
            sb.append("default: ")
            sb.append(arg_meta.default_value)
        else:
            sb.append("required")
        sb.append("] ")
        sb.append(arg_meta.description or "<no description provided>")
        return "".join(str(item) for item in sb)
|