File: terminal.py

package info (click to toggle)
netplan.io 1.2-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 4,268 kB
  • sloc: python: 34,640; ansic: 14,096; xml: 4,989; javascript: 2,165; sh: 513; makefile: 118
file content (157 lines) | stat: -rw-r--r-- 5,307 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/python3
#
# Copyright (C) 2018 Canonical, Ltd.
# Author: Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Terminal / input handling
"""

import fcntl
import os
import termios
import select
import sys


class Terminal(object):
    """
    Do minimal terminal mangling to prompt users for input
    """

    def __init__(self, fd):
        self.fd = fd
        self.orig_flags = None
        self.orig_term = None
        self.save()

    def enable_echo(self):
        if sys.stdin.isatty():
            attrs = termios.tcgetattr(self.fd)
            attrs[3] = attrs[3] | termios.ICANON
            attrs[3] = attrs[3] | termios.ECHO
            termios.tcsetattr(self.fd, termios.TCSANOW, attrs)

    def disable_echo(self):
        if sys.stdin.isatty():
            attrs = termios.tcgetattr(self.fd)
            attrs[3] = attrs[3] & ~termios.ICANON
            attrs[3] = attrs[3] & ~termios.ECHO
            termios.tcsetattr(self.fd, termios.TCSANOW, attrs)

    def enable_nonblocking_io(self):
        flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
        fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def disable_nonblocking_io(self):
        flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
        fcntl.fcntl(self.fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)

    def get_confirmation_input(self, timeout=120, message=None):  # pragma: nocover (requires user input)
        """
        Get a "confirmation" input from the user, for at most (timeout)
        seconds. Optionally, customize the message to be displayed.

        timeout -- timeout to wait for input (default 120)
        message -- optional customized message ("Press ENTER to (message)")

        raises:
        InputAccepted -- the user confirmed the changes
        InputRejected -- the user rejected the changes
        """
        print("Do you want to keep these settings?\n\n")

        settings = dict()
        self.save(settings)
        self.disable_echo()
        self.enable_nonblocking_io()

        if not message:
            message = "accept the new configuration"

        print("Press ENTER before the timeout to {}\n\n".format(message))
        timeout_now = timeout
        while (timeout_now > 0):
            print("Changes will revert in {:>{}} seconds".format(timeout_now, len(str(timeout))), end='\r')

            # wait at most 1 second for usable input from stdin
            select.select([sys.stdin], [], [], 1)
            try:
                # retrieve any input from the terminal. select() either has
                # timed out with no input, or found something we can retrieve.
                c = sys.stdin.read()
                if (c == '\n'):
                    self.reset(settings)
                    # Yay, user has accepted the changes!
                    raise InputAccepted()
            except TypeError:
                # read() above is non-blocking, if there is nothing to read it
                # will return TypeError, which we should ignore -- on to the
                # next iteration until timeout.
                pass
            timeout_now -= 1

        # We reached the timeout for our loop, now revert our change for
        # non-blocking I/O and signal the caller the changes were essentially
        # rejected.
        self.reset(settings)
        raise InputRejected()

    def save(self, dest=None):
        """
        Save the terminal's current attributes and flags

        Optional argument:
            - dest: if set, save settings to this dict
        """
        orig_flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
        orig_term = None
        if sys.stdin.isatty():
            orig_term = termios.tcgetattr(self.fd)
        if dest is not None:
            dest.update({'flags': orig_flags,
                         'term': orig_term})
        else:
            self.orig_flags = orig_flags
            self.orig_term = orig_term

    def reset(self, orig=None):
        """
        Reset the terminal to its original attributes and flags

        Optional argument:
            - orig: if set, reset to settings from this dict
        """
        orig_term = None
        orig_flags = None
        if orig is not None:
            orig_term = orig.get('term')
            orig_flags = orig.get('flags')
        else:
            orig_term = self.orig_term
            orig_flags = self.orig_flags
        if sys.stdin.isatty():
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, orig_term)
        fcntl.fcntl(self.fd, fcntl.F_SETFL, orig_flags)


class InputAccepted(Exception):
    """ Denotes has accepted input"""
    pass


class InputRejected(Exception):
    """ Denotes that the user has rejected input"""
    pass