File: completion.py

package info (click to toggle)
python-nubia 0.2.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 772 kB
  • sloc: python: 4,182; makefile: 9; sh: 1
file content (329 lines) | stat: -rw-r--r-- 12,485 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#

import itertools
import logging
from typing import TYPE_CHECKING, Iterable, Callable, List

import pyparsing as pp
from prompt_toolkit.completion import CompleteEvent, Completion
from prompt_toolkit.document import Document

from nubia.internal import parser
from nubia.internal.helpers import function_to_str

if TYPE_CHECKING:
    from nubia.internal.cmdbase import AutoCommand  # noqa


class TokenParse:
    """
    This class captures an interactive shell token that cannot be fully parser
    by the interactive shell parser and analyze it.
    """

    def __init__(self, token: str) -> None:
        self._token = token
        self._key = ""
        self._is_argument = False
        self._is_list = False
        self._is_dict = False
        self._last_value = ""
        self.parse()

    def parse(self):
        key, delim, value = self._token.partition("=")
        # Is everything before the = sane?
        if any(x in key for x in "[]{}\"'"):
            # We will treat this as positional in this case
            return

        # This is key=value
        if delim == "=":
            self._is_argument = True
            self._key = key
        else:
            # This is positional, the value is the key
            value = self._key
            assert len(value) == 0
        value = value.strip()
        if len(value) > 0:
            # Let's parse the value, is it a single, list, dict?
            if value[0] == "[":
                self._is_list = True
                value = value.strip("[")
                list_values = value.rpartition(",")
                self._last_value = list_values[len(list_values) - 1].lstrip()
            elif value[0] == "{":
                self._is_dict = True
            else:
                self._last_value = value

    @property
    def is_argument(self) -> bool:
        return self._is_argument

    @property
    def is_positional(self) -> bool:
        return not self._is_argument

    # Talks about the type of the value
    @property
    def is_list(self) -> bool:
        return self._is_list

    @property
    def is_dict(self) -> bool:
        return self._is_dict

    @property
    def argument_name(self) -> str:
        assert self._is_argument
        return self._key

    def keys(self) -> Iterable[str]:
        return []

    def values(self) -> Iterable[str]:
        return []

    @property
    def last_value(self) -> str:
        return self._last_value

    @property
    def is_single_value(self) -> bool:
        return not (self._is_dict or self._is_list)


class AutoCommandCompletion:
    """
    This is the interactive completion state machine, it tracks the
    parsed tokens out of a command input and builds a data model that is
    used to understand what would be the next natural completion
    token(s).
    """

    def __init__(
        self,
        cmd_obj: "AutoCommand",
        document: Document,
        complete_event: CompleteEvent,
    ) -> None:
        self.doc = document
        self.cmd = cmd_obj
        self.meta = self.cmd.metadata
        self.event = complete_event

        # current state

    def get_completions(self) -> Iterable[Completion]:
        """
        Returns a
        """
        logger = logging.getLogger(f"{type(self).__name__}.get_completions")

        # This is a rather sad piece of code. Since nubia uses <keyword>= as
        # the completion string, when we specify choices for a keyword, and
        # select the keyword from the autocompletion suggestion by hitting
        # SPACE, the additional space confuses the autocompleter for the
        # values associated with the space. For example, if you had state=
        # as a keyword, and "up" and "down" as possible choices for the keyword
        # once you've selected state= from the completion list by hitting SPACE
        # up and down pop up as possible choices, but typing the first letter
        # of a choice such as u(for up), doesn't trim the selection to those
        # starting with u. Selection just stops because the parser gets confsed
        # about the space after "keyword= " and doesn't provide any more auto
        # complete suggestions. The lines that follow fix this by removing the
        # space after = from the text, allowing the parser to do its job.
        if self.doc.char_before_cursor != " ":
            pos = self.doc.find_backwards('= ')
            if pos:
                spos = self.doc.cursor_position + pos + 1
                epos = self.doc.cursor_position + pos + 2
                self.doc._text = self.doc._text[:spos] + self.doc._text[epos:]
                self.doc._cursor_position -= 1

        remaining = None
        try:
            parsed = parser.parse(
                self.doc.text, expect_subcommand=self.cmd.super_command
            )
        except parser.CommandParseError as e:
            parsed = e.partial_result
            remaining = e.remaining

        # This is a funky but reliable way to figure that last token we are
        # interested in manually parsing, This will return the last key=value
        # including if the value is a 'value', [list], or {dict} or combination
        # of these. This also matches positional arguments.

        if self.doc.char_before_cursor in " ]}":
            last_token = ""
        else:
            last_space = self.doc.find_backwards(
                " ", in_current_line=True) or -1
            last_token = self.doc.text[(last_space + 1):]  # noqa

        # We pick the bigger match here. The reason we want to look into
        # remaining is to capture the state that we are in an open list,
        # dictionary, or any other value that may have spaces in it but fails
        # parsing (yet).
        if remaining and len(remaining) > len(last_token):
            last_token = remaining
        try:
            return self._prepare_args_completions(
                parsed_command=parsed, last_token=last_token
            )
        except Exception as e:
            logger.exception(str(e))
            return []

    def _prepare_args_completions(
        self, parsed_command: pp.ParseResults, last_token
    ) -> Iterable[Completion]:
        assert parsed_command is not None
        args_meta = self.meta.arguments.values()
        subcommand = None
        # are we expecting a sub command?
        if self.cmd.super_command:
            # We have a sub-command (supposedly)
            subcommand = parsed_command.get("__subcommand__")
            assert subcommand
            sub_meta = self.cmd.subcommand_metadata(subcommand)
            if not sub_meta:
                logging.debug("Parsing unknown sub-command failed!")
                return []
            # we did find the sub-command, yay!
            # In this case we chain the arguments from super and the
            # sub-command together
            args_meta = itertools.chain(args_meta, sub_meta.arguments.values())
        # Now let's see if we can figure which argument we are talking about
        args_meta = self._filter_arguments_by_prefix(last_token, args_meta)
        # Which arguments did we fully parse already? let's avoid printing them
        # in completions
        parsed_keys = parsed_command.asDict().get("kv", [])
        # We are either completing an argument name, argument value, or
        # positional value.
        # Dissect the last_token and figure what is the right completion
        parsed_token = TokenParse(last_token)

        ret = []
        if parsed_token.is_positional:
            # TODO: Handle positional argument completions too
            # To figure which positional we are in right now, we need to run the
            # same logic that figures if all required arguments has been
            # supplied and how many positionals have been processed and which
            # one is next.
            # This code is already in cmdbase.py run_interactive but needs to be
            # refactored to be reusable here.
            pass
        elif parsed_token.is_argument:
            argument_name = parsed_token.argument_name
            arg = self._find_argument_by_name(argument_name)
            if not arg or arg.choices in [False, None]:
                return []
            # TODO: Support dictionary keys/named tuples completion
            if parsed_token.is_dict:
                return []
            # We are completing a value, in this case, we need to get the last
            # meaninful piece of the token `x=[Tr` => `Tr`
            if isinstance(arg.choices, Callable):
                choices = arg.choices(
                    self.cmd.metadata.command.name, subcommand, last_token,
                    self.doc.text)
                if not isinstance(choices, List):
                    raise ValueError('autocomplete function MUST provide list of strings'
                                     f', got {choices}')
            else:
                if parsed_token.last_value:
                    choices = [c for c in arg.choices
                               if str(c).startswith(parsed_token.last_value)]
                else:
                    choices = arg.choices

            ret = [
                Completion(
                    text=str(choice),
                    start_position=-len(parsed_token.last_value),
                )
                for choice in choices[:self.cmd._options.limit_visible_choices]
            ]
            return ret

        # We are completing arguments, or positionals.
        # TODO: We would like to only show positional choices if we exhaust all
        # required arguments. This will make it easier for the user to figure
        # that there are still required named arguments. After that point we
        # will show optional arguments and positionals as possible completions
        ret = [
            Completion(
                text=arg_meta.name + "=",
                start_position=-len(last_token),
                display_meta=self._get_arg_help(arg_meta),
            )
            for arg_meta in args_meta
            if arg_meta.name not in parsed_keys
        ]
        return ret

    def _filter_arguments_by_prefix(self, prefix: str, arguments=None):
        arguments = arguments or self.meta.arguments.values()
        if prefix:
            return [
                arg_meta for arg_meta in arguments if arg_meta.name.startswith(prefix)
            ]
        return arguments

    def _prepare_value_completions(self, prefix, partial_result):
        parsed_keys = map(lambda x: x[0], partial_result.get("kv", []))
        argument, rest = prefix.split("=", 1)
        arguments = self._filter_arguments_by_prefix(argument)
        if len(arguments) < 1:
            return []
        if len(arguments) == 1:
            argument_obj = self._find_argument_by_name(argument)
            assert argument_obj
            # was that argument used before?
            if argument in parsed_keys:
                logging.debug(
                    "Argument {} was used already, not generating "
                    "completions".format(argument)
                )
                return []
        return []

    def _find_argument_by_name(self, name):
        args_meta = list(self.meta.arguments.values())
        if self.cmd.super_command:
            # We need to get the subcommand name
            subcommand_name = self.doc.text.split(" ")[0]
            for _, sub in self.meta.subcommands:
                if sub.command.name == subcommand_name:
                    args_meta.extend(list(sub.arguments.values()))
        filtered = filter(lambda arg: arg.name == name, args_meta)
        return next(filtered, None)

    def _get_arg_help(self, arg_meta):
        sb = ["["]
        if arg_meta.type:
            sb.append(function_to_str(arg_meta.type, False, False))
            sb.append(", ")
        if arg_meta.default_value_set:
            sb.append("default: ")
            sb.append(arg_meta.default_value)
        else:
            sb.append("required")
        sb.append("] ")
        sb.append(
            arg_meta.description
            if arg_meta.description
            else "<no description provided>"
        )
        return "".join(str(item) for item in sb)