File: interactive.py

package info (click to toggle)
python-nubia 0.2.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 772 kB
  • sloc: python: 4,182; makefile: 9; sh: 1
file content (203 lines) | stat: -rw-r--r-- 7,784 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
#


import logging
import os
from typing import Any, List, Tuple

from prompt_toolkit import PromptSession
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import Completer
from prompt_toolkit.document import Document
from prompt_toolkit.enums import EditingMode
from prompt_toolkit.formatted_text import PygmentsTokens
from prompt_toolkit.history import FileHistory, InMemoryHistory
from prompt_toolkit.layout.processors import HighlightMatchingBracketProcessor
from prompt_toolkit.lexers import PygmentsLexer
from prompt_toolkit.patch_stdout import patch_stdout
from termcolor import cprint

from nubia.internal.helpers import catchall, find_approx, suggestions_msg, try_await
from nubia.internal.io.eventbus import Listener
from nubia.internal.options import Options
from nubia.internal.ui.lexer import NubiaLexer
from nubia.internal.ui.style import shell_style


def split_command(text):
    return text.lstrip(" ").split(" ", 1)


class IOLoop(Listener):
    def __init__(self, context, plugin, usagelogger, options: Options):
        self._ctx = context
        self._command_registry = self._ctx.registry
        self._plugin = plugin
        self._options = options
        self._blacklist = self._plugin.getBlacklistPlugin()
        self._status_bar = self._plugin.get_status_bar(context)
        self._completer = ShellCompleter(self._command_registry)
        self._command_registry.register_listener(self)
        self._usagelogger = usagelogger

    def _build_cli(self):
        if self._options.persistent_history:
            history = FileHistory(
                os.path.join(
                    os.path.expanduser("~"), ".{}_history".format(self._ctx.binary_name)
                )
            )
        else:
            history = InMemoryHistory()

        # If EDITOR does not exist, take EMACS
        # if it does, try fit the EMACS/VI pattern using upper
        editor = getattr(
            EditingMode, os.environ.get("EDITOR", "EMACS").upper(), EditingMode.EMACS
        )

        return PromptSession(
            history=history,
            auto_suggest=AutoSuggestFromHistory(),
            lexer=PygmentsLexer(NubiaLexer),
            completer=self._completer,
            input_processors=[HighlightMatchingBracketProcessor(chars="[](){}")],
            style=shell_style,
            bottom_toolbar=self._get_bottom_toolbar,
            editing_mode=editor,
            complete_in_thread=True,
            refresh_interval=1,
            include_default_pygments_style=False,
        )

    def _get_prompt_tokens(self) -> List[Tuple[Any, str]]:
        return self._plugin.get_prompt_tokens(self._ctx)

    def _get_bottom_toolbar(self) -> List[Tuple[Any, str]]:
        return PygmentsTokens(self._status_bar.get_tokens())

    async def parse_and_evaluate(self, input):
        command_parts = split_command(input)
        if command_parts and command_parts[0]:
            cmd = command_parts[0]
            args = command_parts[1] if len(command_parts) > 1 else None
            return await self.evaluate_command(cmd, args, input)

    async def evaluate_command(self, cmd, args, raw):
        args = args or ""

        if cmd in self._command_registry:
            cmd_instance = self._command_registry.find_command(cmd)
        else:
            suggestions = find_approx(
                cmd, self._command_registry.get_all_commands_map()
            )
            if self._options.auto_execute_single_suggestions and len(suggestions) == 1:
                print()
                cprint(
                    "Auto-correcting '{}' to '{}'".format(cmd, suggestions[0]),
                    "red",
                    attrs=["bold"],
                )
                cmd_instance = self._command_registry.find_command(suggestions[0])
            else:
                print()
                cprint(
                    "Unknown Command '{}'{} type `help` to see all "
                    "available commands".format(cmd, suggestions_msg(suggestions)),
                    "red",
                    attrs=["bold"],
                )
                cmd_instance = None

        if cmd_instance is not None:
            try:
                ret = self._blacklist.is_blacklisted(cmd)
                if ret:
                    return ret
            except Exception as e:
                err_message = (
                    "Blacklist executing failed, "
                    "all commands are available.\n"
                    "{}".format(str(e))
                )
                cprint(err_message, "red")
                logging.error(err_message)
            try:
                catchall(self._usagelogger.pre_exec)
                result = await try_await(cmd_instance.run_interactive(cmd, args, raw))
                catchall(self._usagelogger.post_exec, cmd, args, result, False)
                self._status_bar.set_last_command_status(result)
                return result
            except NotImplementedError as e:
                cprint("[NOT IMPLEMENTED]: {}".format(str(e)), "yellow", attrs=["bold"])
                # not implemented error code
                return 99

    async def run(self):
        prompt = self._build_cli()
        self._status_bar.start()
        try:
            while True:
                try:
                    with patch_stdout():
                        text = await prompt.prompt_async(
                            PygmentsTokens(self._get_prompt_tokens()),
                            rprompt=PygmentsTokens(
                                self._status_bar.get_rprompt_tokens()
                            ),
                        )
                    session_logger = self._plugin.get_session_logger(self._ctx)
                    if session_logger:
                        # Commands don't get written to stdout, so we have to
                        # explicitly dump them to the session log.
                        session_logger.log_command(text)
                        with session_logger.patch():
                            await self.parse_and_evaluate(text)
                    else:
                        await self.parse_and_evaluate(text)
                except KeyboardInterrupt:
                    pass
        except EOFError:
            # Application exiting.
            pass
        self._status_bar.stop()

    async def on_connected(self, *args, **kwargs):
        pass


class ShellCompleter(Completer):
    def __init__(self, command_registry):
        super(Completer, self).__init__()
        self._command_registry = command_registry

    def get_completions(self, document, complete_event):
        if document.on_first_line:
            cmd_and_args = split_command(document.text_before_cursor)
            # are we the first word? suggest from command names
            if len(cmd_and_args) > 1:
                cmd, args = cmd_and_args
                # pass to the children
                # let's find the parent command first
                cmd_instance = self._command_registry.find_command(cmd)
                if not cmd_instance:
                    return []
                return cmd_instance.get_completions(
                    cmd,
                    Document(
                        args, document.cursor_position - len(document.text) + len(args)
                    ),
                    complete_event,
                )
            else:
                return self._command_registry.get_completions(document, complete_event)

        return []