File: terminal.py

package info (click to toggle)
python-plumbum 1.9.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,300 kB
  • sloc: python: 10,016; makefile: 130; sh: 8
file content (240 lines) | stat: -rw-r--r-- 7,221 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
"""
Terminal-related utilities
--------------------------
"""

from __future__ import annotations

import contextlib
import os
import sys

from plumbum import local

from .progress import Progress
from .termsize import get_terminal_size

__all__ = [
    "readline",
    "ask",
    "choose",
    "prompt",
    "get_terminal_size",
    "Progress",
    "get_terminal_size",
]


def __dir__() -> list[str]:
    return __all__


def readline(message: str = "") -> str:
    """Gets a line of input from the user (stdin)"""
    sys.stdout.write(message)
    sys.stdout.flush()
    return sys.stdin.readline()


def ask(question: str, default: bool | None = None) -> bool:
    """
    Presents the user with a yes/no question.

    :param question: The question to ask
    :param default: If ``None``, the user must answer. If ``True`` or ``False``, lack of response is
                    interpreted as the default option

    :returns: the user's choice
    """
    question = question.rstrip().rstrip("?").rstrip() + "?"
    if default is None:
        question += " (y/n) "
    elif default:
        question += " [Y/n] "
    else:
        question += " [y/N] "

    while True:
        try:
            answer = readline(question).strip().lower()
        except EOFError:
            answer = None
        if answer in {"y", "yes"}:
            return True
        if answer in {"n", "no"}:
            return False
        if not answer and default is not None:
            return default
        sys.stdout.write("Invalid response, please try again\n")


def choose(question, options, default=None):
    """Prompts the user with a question and a set of options, from which the user needs to choose.

    :param question: The question to ask
    :param options: A set of options. It can be a list (of strings or two-tuples, mapping text
                    to returned-object) or a dict (mapping text to returned-object).``
    :param default: If ``None``, the user must answer. Otherwise, lack of response is interpreted
                    as this answer

    :returns: The user's choice

    Example::

        ans = choose("What is your favorite color?", ["blue", "yellow", "green"], default = "yellow")
        # `ans` will be one of "blue", "yellow" or "green"

        ans = choose("What is your favorite color?",
                {"blue" : 0x0000ff, "yellow" : 0xffff00 , "green" : 0x00ff00}, default = 0x00ff00)
        # this will display "blue", "yellow" and "green" but return a numerical value
    """
    if hasattr(options, "items"):
        options = options.items()
    sys.stdout.write(question.rstrip() + "\n")
    choices = {}
    defindex = None
    for i, item in enumerate(options, 1):
        if isinstance(item, (tuple, list)) and len(item) == 2:
            text = item[0]
            val = item[1]
        else:
            text = item
            val = item
        choices[i] = val
        if default is not None and default == val:
            defindex = i
        sys.stdout.write(f"({i}) {text}\n")
    if default is not None:
        msg = f"Choice [{default}]: " if defindex is None else f"Choice [{defindex}]: "
    else:
        msg = "Choice: "
    while True:
        try:
            choice = readline(msg).strip()
        except EOFError:
            choice = ""
        if not choice and default:
            return default
        try:
            choice = int(choice)
            if choice not in choices:
                raise ValueError()
        except ValueError:
            sys.stdout.write("Invalid choice, please try again\n")
            continue
        return choices[choice]


def prompt(
    question,
    type=str,  # pylint: disable=redefined-builtin
    default=NotImplemented,
    validator=lambda _: True,
):
    """
    Presents the user with a validated question, keeps asking if validation does not pass.

    :param question: The question to ask
    :param type: The type of the answer, defaults to str
    :param default: The default choice
    :param validator: An extra validator called after type conversion, can raise ValueError or return False to trigger a retry.

    :returns: the user's choice
    """
    question = question.rstrip(" \t:")
    if default is not NotImplemented:
        question += f" [{default}]"
    question += ": "
    while True:
        try:
            ans = readline(question).strip()
        except EOFError:
            ans = ""

        if not ans:
            if default is not NotImplemented:
                # sys.stdout.write("\b%s\n" % (default,))
                return default
            continue
        try:
            ans = type(ans)
        except (TypeError, ValueError) as ex:
            sys.stdout.write(f"Invalid value ({ex}), please try again\n")
            continue

        try:
            valid = validator(ans)
        except ValueError as ex:
            sys.stdout.write(f"{ex}, please try again\n")
            continue

        if not valid:
            sys.stdout.write("Value not in specified range, please try again\n")
            continue
        return ans


def hexdump(data_or_stream, bytes_per_line=16, aggregate=True):
    """Convert the given bytes (or a stream with a buffering ``read()`` method) to hexdump-formatted lines,
    with possible aggregation of identical lines. Returns a generator of formatted lines.
    """
    if hasattr(data_or_stream, "read"):

        def read_chunk():
            while True:
                buf = data_or_stream.read(bytes_per_line)
                if not buf:
                    break
                yield buf

    else:

        def read_chunk():
            for i in range(0, len(data_or_stream), bytes_per_line):
                yield data_or_stream[i : i + bytes_per_line]

    prev = None
    skipped = False
    for i, chunk in enumerate(read_chunk()):
        hexd = " ".join(f"{ord(ch):02x}" for ch in chunk)
        text = "".join(ch if 32 <= ord(ch) < 127 else "." for ch in chunk)
        if aggregate and prev == chunk:
            skipped = True
            continue
        prev = chunk
        if skipped:
            yield "*"
        hexd_ljust = hexd.ljust(bytes_per_line * 3, " ")
        yield f"{i*bytes_per_line:06x} | {hexd_ljust}| {text}"
        skipped = False


def pager(rows, pagercmd=None):  # pragma: no cover
    """Opens a pager (e.g., ``less``) to display the given text. Requires a terminal.

    :param rows: a ``bytes`` or a list/iterator of "rows" (``bytes``)
    :param pagercmd: the pager program to run. Defaults to ``less -RSin``
    """
    if not pagercmd:
        pagercmd = local["less"]["-RSin"]
    if hasattr(rows, "splitlines"):
        rows = rows.splitlines()

    pg = pagercmd.popen(stdout=None, stderr=None)
    try:
        for row in rows:
            line = f"{row}\n"
            try:
                pg.stdin.write(line)
                pg.stdin.flush()
            except OSError:
                break
        pg.stdin.close()
        pg.wait()
    finally:
        with contextlib.suppress(Exception):
            rows.close()
        if pg and pg.poll() is None:
            with contextlib.suppress(Exception):
                pg.terminate()
            os.system("reset")