File: utils.py

package info (click to toggle)
python-enaml 0.19.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 13,284 kB
  • sloc: python: 31,443; cpp: 4,499; makefile: 140; javascript: 68; lisp: 53; sh: 20
file content (330 lines) | stat: -rw-r--r-- 9,097 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
#------------------------------------------------------------------------------
# Copyright (c) 2013-2025, Nucleic Development Team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
#------------------------------------------------------------------------------
"""Generic utility functions for testing.

"""
import os
import sys
from contextlib import contextmanager

from atom.api import Atom, Bool

import enaml
from enaml.application import timed_call
from enaml.core.enaml_compiler import EnamlCompiler
from enaml.core.parser import parse
from enaml.widgets.api import Window, Dialog, PopupView
with enaml.imports():
    from enaml.stdlib.message_box import MessageBox

# Timeout for qtbot wait (the value used is large due to Travis being sometimes
# very slow).
TIMEOUT = 2000


def compile_source(source, item, filename='<test>', namespace=None):
    """ Compile Enaml source code and return the target item.

    Parameters
    ----------
    source : str
        The Enaml source code string to compile.

    item : str
        The name of the item in the resulting namespace to return.

    filename : str, optional
        The filename to use when compiling the code. The default
        is '<test>'.

    namespace : dict
        Namespace in which to execute the code

    Returns
    -------
    result : object
        The named object from the resulting namespace.

    """
    ast = parse(source, filename)
    code = EnamlCompiler.compile(ast, filename)
    namespace = namespace or {}
    exec(code, namespace)
    return namespace[item]


def run_pending_tasks(qtbot, timeout=1000):
    """Run all enaml pending tasks.

    WARNING: this may not run the Qt event loop if no task is pending.
    This will only deal with tasks scheduled through the schedule function
    (or Application method)

    Parameters
    ----------
    timeout : int, optional
        Timeout after which the operation should fail in ms

    """
    def check_pending_tasks():
        assert not qtbot.enaml_app.has_pending_tasks()
    qtbot.wait_until(check_pending_tasks, timeout=TIMEOUT)


def get_window(qtbot, cls=Window, timeout=1000):
    """Convenience function running the event loop and returning the first
    window found in the set of active windows.

    Parameters
    ----------
    cls : type, optional
        Type of the window which should be returned.

    timeout : int, optional
        Timeout after which the operation should fail in ms

    Returns
    -------
    window : Window or None
        Return the first window found matching the specified class

    Raises
    ------
    AssertionError : raised if no window is found in the given time

    """
    def check_window_presence():
        assert [w for w in Window.windows if isinstance(w, cls)]

    qtbot.wait_until(check_window_presence, timeout=TIMEOUT)
    for w in Window.windows:
        if isinstance(w, cls):
            return w


def get_popup(qtbot, cls=PopupView, timeout=1000):
    """Convenience function running the event loop and returning the first
    popup found in the set of active popups.

    Parameters
    ----------
    cls : type, optional
        Type of the window which should be returned.

    timeout : int, optional
        Timeout after which the operation should fail in ms

    Returns
    -------
    popup : PopupView or None
        Return the first window found matching the specified class

    Raises
    ------
    AssertionError : raised if no popup is found in the given time

    """
    def check_popup_presence():
        assert [p for p in PopupView.popup_views if isinstance(p, cls)]

    qtbot.wait_until(check_popup_presence, timeout=TIMEOUT)
    for p in PopupView.popup_views:
        if isinstance(p, cls):
            return p


def wait_for_window_displayed(qtbot, window, timeout=1000):
    """Wait for a window to be displayed.

    This method should be called on already activated windows (the show method
    should have been called).

    """
    if not window.proxy_is_active or not window.proxy.widget:
        msg = 'Window must be activated before waiting for display'
        raise RuntimeError(msg)
    qtbot.wait_exposed(window.proxy.widget, timeout=timeout)
    qtbot.wait(25)


class EventObserver(Atom):
    """Simple observer registering the fact it was called once.

    """
    called = Bool()

    def callback(self, change):
        self.called = True

    def assert_called(self):
        assert self.called


def wait_for_destruction(qtbot, widget):
    """Wait for a widget to get destroyed.

    """
    if widget.is_destroyed:
        return
    obs = EventObserver()
    widget.observe('destroyed', obs.callback)
    qtbot.wait_until(obs.assert_called, timeout=TIMEOUT)


def close_window_or_popup(qtbot, window_or_popup):
    """Close a window/popup and run the event loop to make sure the closing
    complete.

    """
    if window_or_popup.is_destroyed:
        return
    obs = EventObserver()
    window_or_popup.observe('destroyed', obs.callback)
    window_or_popup.close()
    qtbot.wait_until(obs.assert_called, timeout=TIMEOUT)


@contextmanager
def close_all_windows(qtbot):
    """Close all opened windows.

    """
    yield
    run_pending_tasks(qtbot)
    while Window.windows:
        windows = list(Window.windows)
        # First close non top level windows to avoid a window to lose its
        # parent and not remove itself from the set of windows.
        non_top_level_windows = [w for w in windows if w.parent is not None]
        for window in non_top_level_windows:
            close_window_or_popup(qtbot, window)
        for window in windows:
            close_window_or_popup(qtbot, window)


@contextmanager
def close_all_popups(qtbot):
    """Close all opened popups.

    """
    yield
    run_pending_tasks(qtbot)
    while PopupView.popup_views:
        popups = list(PopupView.popup_views)
        # First close non top level popups to avoid a up/window to lose its
        # parent and not remove itself from the set of windows.
        non_top_level_popups = [p for p in popups if p.parent is not None]
        for popup in non_top_level_popups:
            close_window_or_popup(qtbot, popup)
        for popup in popups:
            close_window_or_popup(qtbot, popup)


class ScheduledClosing(object):
    """Scheduled closing of dialog.

    """

    def __init__(self, bot, cls, handler, op, skip_answer):
        self.cls = cls
        self.handler = handler
        self.op = op
        self.bot = bot
        self.skip_answer = skip_answer
        self.called = False

    def __call__(self):
        self.called = True
        from conftest import DIALOG_SLEEP
        dial = get_window(self.bot, cls=self.cls)
        wait_for_window_displayed(self.bot, dial)
        self.bot.wait(DIALOG_SLEEP*1000)
        obs = EventObserver()
        dial.observe('finished', obs.callback)

        try:
            self.handler(self.bot, dial)
        finally:
            if not self.skip_answer:
                getattr(dial, self.op)()
            self.bot.wait_until(obs.assert_called, timeout=TIMEOUT)

    def was_called(self):
        """Assert the scheduler was called.

        """
        assert self.called


@contextmanager
def handle_dialog(qtbot, op='accept', handler=lambda qtbot, window: window,
                  cls=Dialog, time=100, skip_answer=False):
    """Automatically close a dialog opened during the context.

    Parameters
    ----------
    op : {'accept', 'reject'}, optional
        Whether to accept or reject the dialog.

    handler : callable, optional
        Callable taking as arguments the bot and the dialog, called before
        accepting or rejecting the dialog.

    cls : type, optional
        Dialog class to identify.

    time : float, optional
        Time to wait before handling the dialog in ms.

    skip_answer : bool, optional
        Skip answering to the dialog. If this is True the handler should handle
        the answer itself.

    """
    sch = ScheduledClosing(qtbot, cls, handler, op, skip_answer)
    timed_call(time, sch)
    try:
        yield
    except Exception:
        raise
    else:
        qtbot.wait_until(sch.was_called, timeout=TIMEOUT)


@contextmanager
def handle_question(app, answer):
    """Handle question dialog.

    """
    def answer_question(app, dial):
        """Mark the right button as clicked.

        """
        dial.buttons[0 if answer == 'yes' else 1].was_clicked = True

    with handle_dialog(app, 'accept' if answer == 'yes' else 'reject',
                       handler=answer_question, cls=MessageBox):
        yield


@contextmanager
def cd(path, add_to_sys_path=False):
    """ cd to the directory then return to the cwd

    """
    cwd = os.getcwd()
    if add_to_sys_path:
        abspath = os.path.abspath(path)
        sys.path.append(abspath)
    try:
        os.chdir(path)
        yield
    finally:
        os.chdir(cwd)
        if add_to_sys_path:
            sys.path.remove(abspath)