File: __init__.py

package info (click to toggle)
pynput 1.8.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 596 kB
  • sloc: python: 6,309; makefile: 10; sh: 1
file content (260 lines) | stat: -rw-r--r-- 7,940 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# coding=utf-8
# pynput
# Copyright (C) 2015-2024 Moses Palmér
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import contextlib
import functools
import time
import unittest

from six.moves import input

import pynput


#: The name of the current backend; this is the last dotted component of the
#: name of the module implementing the keyboard controller, with its first
#: character removed
BACKEND = pynput.keyboard.Controller.__module__.rpartition('.')[2][1:]


def _backend(name, f):
    """Selects ``f`` only when ``name`` is the backend currently in use.

    The name is compared with :data:`BACKEND`.

    :param str name: The name of the backend for which ``f`` applies.

    :param f: Any value; typically the object being decorated.

    :return: ``f`` if ``name`` equals :data:`BACKEND``, otherwise ``None``
    """
    if name == BACKEND:
        return f
    return None


def notify(message, delay=None, columns=50):
    """Displays a framed, word-wrapped message on standard output.

    Every line of ``message`` is wrapped on whitespace, and an empty row is
    inserted between consecutive lines. Words are never split, so a word
    longer than the available width makes its row wider than the frame.

    :param str message: The text to show.

    :param delay: The number of seconds to sleep after printing; a false
        value means that the function returns immediately.
    :type delay: float or None

    :param int columns: The total width of the frame, in characters.
    """
    # Two characters on each side are taken by the frame and its padding
    width = columns - 4

    rows = []
    for paragraph in message.splitlines():
        # Separate the lines of the original message by an empty row
        if rows:
            rows.append('')
        for word in paragraph.split():
            current = rows[-1] if rows else ''
            if current and len(current) + 1 + len(word) <= width:
                rows[-1] = current + ' ' + word
            else:
                # Either there is no row to extend, the previous row is a
                # separator, or the word does not fit
                rows.append(word)

    # The template is the same for every row, so build it only once
    row_template = '| {:<%ds} |' % width
    inner_width = columns - 2

    print('')
    print('+' + '=' * inner_width + '+')
    for row in rows:
        print(row_template.format(row))
    print('+' + '-' * inner_width + '+')

    if delay:
        time.sleep(delay)



#: A decorator to make a test run only on macOS; it returns the decorated
#: object unchanged when the ``darwin`` backend is in use, and ``None`` for
#: any other backend
darwin = functools.partial(_backend, 'darwin')

#: A decorator to make a test run only on Windows; it returns the decorated
#: object unchanged when the ``win32`` backend is in use, and ``None`` for
#: any other backend
win32 = functools.partial(_backend, 'win32')

#: A decorator to make a test run only on Linux; strictly, it returns the
#: decorated object unchanged when the ``xorg`` backend is in use, and
#: ``None`` for any other backend
xorg = functools.partial(_backend, 'xorg')


class EventTest(unittest.TestCase):
    """A base class for test cases that verify events seen by a listener.

    Subclasses must set :attr:`LISTENER_CLASS`, and may set
    :attr:`CONTROLLER_CLASS` and :attr:`NOTIFICATION`.
    """
    #: The message displayed when this test suite is started; nothing is
    #: displayed if this is ``None``
    NOTIFICATION = None

    #: The controller class; if this is defined, :attr:`controller` will be
    #: instantiated for every test
    CONTROLLER_CLASS = None

    #: The listener class; this must be defined for subclasses
    LISTENER_CLASS = None

    #: The maximum number of seconds to wait before failing in
    #: :meth:`assert_stop`
    STOP_MAX_WAIT = 3.0

    #: The minimum number of events to accumulate before checking for changes
    #: in :meth:`assert_cumulative`
    CHANGE_MIN_EVENTS = 50

    @classmethod
    def setUpClass(cls):
        """Displays :attr:`NOTIFICATION` and resets the list of listeners.
        """
        # A missing notification cannot be split into lines, so skip the
        # banner, and its delay, instead of failing the whole suite
        if cls.NOTIFICATION is not None:
            cls.notify(cls.NOTIFICATION, 4)
        cls.listeners = []

    @classmethod
    def tearDownClass(cls):
        """Ensures that no listener created by :meth:`listener` is still
        running when the suite ends.

        Every listener is given half a second to terminate; those still
        alive after that are stopped explicitly and then joined.
        """
        remaining = []
        for listener in cls.listeners:
            listener.join(0.5)
            # is_alive is a method; it must be called, since the bound
            # method itself is always true
            if listener.is_alive():
                remaining.append(listener)

        for listener in remaining:
            # Request termination first, so that the join below, which has
            # no timeout, is not left waiting for a listener nobody stops
            listener.stop()
            listener.join()

    def setUp(self):
        """Creates :attr:`controller`, if a controller class is defined, and
        resets :attr:`suppress`.
        """
        if self.CONTROLLER_CLASS is not None:
            self.controller = self.CONTROLLER_CLASS()
        self.suppress = False

    @classmethod
    def notify(cls, message, delay=None, columns=50):
        """Prints a notification on screen.

        This forwards all arguments to the module level function with the
        same name.

        :param str message: The message to display.

        :param delay: An optional delay, in seconds, before returning.
        :type delay: float or None

        :param int columns: The number of columns for the notification.
        """
        notify(message, delay, columns)

    def listener(self, *args, **kwargs):
        """Creates a listener and registers it in :attr:`listeners`.

        The listener is an instance of :attr:`LISTENER_CLASS`, created with
        ``suppress`` set to the current value of :attr:`suppress`. All
        arguments are passed to the constructor.

        :return: the new listener
        """
        listener = self.LISTENER_CLASS(
            *args, suppress=self.suppress, **kwargs)
        self.listeners.append(listener)
        return listener

    @contextlib.contextmanager
    def assert_event(self, failure_message, **kwargs):
        """Asserts that a specific event is emitted when a code block is
        executed.

        This is a context manager. A listener runs while the code block is
        executed, and for up to three seconds afterwards; the assertion
        succeeds if any callback returned a true value in that time.

        :param str failure_message: The message to display upon failure.

        :param kwargs: The callbacks to pass to the listener constructor, by
            argument name. A callback returns a true value to signal that
            the expected event has been seen. False values are passed on as
            ``None``.
        """
        def wrapper(callback):
            if not callback:
                return None

            def inner(*a):
                if callback(*a):
                    listener.success = True
                    # pynput stops a listener when a callback returns False
                    return False

            return inner

        callbacks = {
            name: wrapper(callback)
            for name, callback in kwargs.items()}
        with self.listener(**callbacks) as listener:
            # Let the listener settle, then discard any match seen before
            # the code block has had a chance to run
            time.sleep(0.1)
            listener.success = False
            yield

            # Poll for at most 30 * 0.1 seconds after the code block
            for _ in range(30):
                time.sleep(0.1)
                if listener.success:
                    break

        self.assertTrue(
            listener.success,
            failure_message)

    def assert_stop(self, failure_message, **callbacks):
        """Asserts that a listener stops within :attr:`STOP_MAX_WAIT`
        seconds.

        The ``running`` attribute of the listener is polled ten times,
        evenly spread over the waiting period.

        :param str failure_message: The message to display upon failure.

        :param callbacks: The callbacks to pass to the listener constructor,
            by argument name.
        """
        success = False
        listener = self.listener(**callbacks)
        with listener:
            for _ in range(10):
                time.sleep(self.STOP_MAX_WAIT * 0.1)
                if not listener.running:
                    success = True
                    break

        self.assertTrue(
            success,
            failure_message)

    def assert_cumulative(self, failure_message, **callbacks):
        """Asserts that the callback returns true for more than two thirds
        of the accumulated events.

        Every callback is called with two consecutive events, each a tuple
        of the arguments passed by the listener. The check is performed for
        every event once more than :attr:`CHANGE_MIN_EVENTS` events have
        been accumulated, and the listener is stopped as soon as it passes.
        The assertion fails if that does not happen within
        :attr:`STOP_MAX_WAIT` seconds.

        :param str failure_message: The message to display upon failure.

        :param callbacks: The callbacks for checking whether change has
            occurred, by listener constructor argument name.
        """
        # The lists of accumulated events
        events = {
            name: []
            for name in callbacks}

        def wrapper(name, callback):
            if not callback:
                return None

            def inner(*a):
                cache = events[name]
                cache.append(a)

                total_length = len(cache)
                if total_length > self.CHANGE_MIN_EVENTS:
                    # Count the pairs of consecutive events that the
                    # callback regards as a change
                    change_length = sum(
                        1
                        for previous, current in zip(cache, cache[1:])
                        if callback(previous, current))

                    if change_length > (2 * total_length) / 3:
                        # pynput stops a listener when a callback returns
                        # False
                        return False

            return inner

        self.assert_stop(failure_message, **{
            name: wrapper(name, callback)
            for name, callback in callbacks.items()})

    def confirm(self, statement, *fmt):
        """Asks the user to confirm a statement.

        The question is repeated until the user responds with one of
        ``yes``, ``y``, ``no`` or ``n``, in any letter case.

        :param str statement: The statement to confirm. This is used as a
            format string for ``fmt``.

        :param fmt: The values to interpolate into ``statement``.

        :raises AssertionError: if the user does not confirm
        """
        valid_responses = ('yes', 'y', 'no', 'n')
        accept_responses = valid_responses[:2]

        message = ('\n' + statement % fmt) + ' '
        while True:
            response = input(message).lower()
            if response in valid_responses:
                self.assertIn(
                    response, accept_responses,
                    'User declined statement "%s"' % message)
                return

            print(
                'Please respond %s' % ', '.join(
                    '"%s"' % r for r in valid_responses))