File: test_gui.py

package info (click to toggle)
python-traitsui 8.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 18,232 kB
  • sloc: python: 58,982; makefile: 113
file content (176 lines) | stat: -rw-r--r-- 5,938 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# (C) Copyright 2004-2023 Enthought, Inc., Austin, TX
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in LICENSE.txt and may be redistributed only under
# the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!

""" Tests for traitsui.testing._gui """

import os
import unittest

from pyface.api import GUI

from traitsui.tests._tools import (
    is_qt,
    is_wx,
    is_mac_os,
    process_cascade_events,
    requires_toolkit,
    ToolkitName,
)


if is_qt():

    # Create a QObject that will emit a new event to itself as long as
    # it has not received enough.

    from pyface.qt import QtCore

    class DummyQObject(QtCore.QObject):
        def __init__(self, max_n_events):
            super().__init__()
            self.max_n_events = max_n_events
            self.n_events = 0

        def event(self, event):
            if event.type() != QtCore.QEvent.Type.User:
                return super().event(event)

            self.n_events += 1

            if self.n_events < self.max_n_events:
                new_event = QtCore.QEvent(QtCore.QEvent.Type.User)
                QtCore.QCoreApplication.postEvent(self, new_event)
            return True


if is_wx():

    # Create a wx.EvtHandler that will emit a new event to itself as long as
    # it has not received enough.

    import wx
    import wx.lib.newevent

    NewEvent, EVT_SOME_NEW_EVENT = wx.lib.newevent.NewEvent()

    class DummyWxHandler(wx.EvtHandler):
        def __init__(self, max_n_events):
            super().__init__()
            self.max_n_events = max_n_events
            self.n_events = 0

        def TryBefore(self, event):
            self.n_events += 1
            if self.n_events < self.max_n_events:
                self.post_event()
            return True

        def post_event(self):
            event = NewEvent()
            wx.PostEvent(self, event)


class TestProcessEventsRepeated(unittest.TestCase):
    """Test process_events actually processes all events, including the events
    posted by the processed events.
    """

    @requires_toolkit([ToolkitName.qt])
    def test_qt_process_events_process_all(self):
        from pyface.qt import QtCore

        if QtCore.__version_info__ < (5, 0, 0) and is_mac_os:
            # On Qt4 and OSX, Qt QEventLoop.processEvents says nothing was "
            # processed even when there are events processed, causing the "
            # loop to break too soon. (See enthought/traitsui#951)"
            self.skipTest(
                "process_cascade_events is not reliable on Qt4 + OSX"
            )

        is_appveyor = os.environ.get("APPVEYOR", None) is not None
        if QtCore.__version_info__ <= (5, 12, 6) and is_appveyor:
            # With Qt + Appveyor, process_cascade_events may
            # _occasionally_ break out of its loop too early. This is only
            # seen on Appveyor but has not been reproducible on other Windows
            # machines (See enthought/traitsui#951)
            self.skipTest(
                "process_cascade_events is not reliable on Qt + Appveyor"
            )

        def cleanup(q_object):
            q_object.deleteLater()
            # If the test fails, run process events at least the same
            # number of times as max_n_events
            for _ in range(q_object.max_n_events):
                QtCore.QCoreApplication.processEvents(
                    QtCore.QEventLoop.ProcessEventsFlag.AllEvents
                )

        max_n_events = 10
        q_object = DummyQObject(max_n_events=max_n_events)
        self.addCleanup(cleanup, q_object)

        QtCore.QCoreApplication.postEvent(
            q_object, QtCore.QEvent(QtCore.QEvent.Type.User)
        )

        # sanity check calling processEvents does not process
        # cascade of events.
        QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.ProcessEventsFlag.AllEvents)
        self.assertEqual(q_object.n_events, 1)

        # when
        process_cascade_events()

        # then
        actual = q_object.n_events

        # If process_cascade_events did not do what it promises, then there
        # are still pending tasks left. Run process events at least the same
        # number of times as max_n_events to verify
        for _ in range(max_n_events):
            QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.ProcessEventsFlag.AllEvents)

        n_left_behind_events = q_object.n_events - actual
        msg = (
            "Expected {max_n_events} events processed on the objects and zero "
            "events left on the queue after running process_cascade_events. "
            "Found {actual} processed with {n_left_behind_events} left "
            "behind.".format(
                max_n_events=max_n_events,
                actual=actual,
                n_left_behind_events=n_left_behind_events,
            )
        )
        self.assertEqual(n_left_behind_events, 0, msg)

        # If the previous assertion passes but this one fails, that means some
        # events have gone missing, and that would likely be a problem for the
        # test setup, not for the process_cascade_events.
        self.assertEqual(actual, max_n_events, msg)

    @requires_toolkit([ToolkitName.wx])
    def test_wx_process_events_process_all(self):
        def cleanup(wx_handler):
            # In case of test failure, always flush the GUI event queue.
            GUI.process_events()
            wx_handler.Destroy()

        max_n_events = 10
        wx_handler = DummyWxHandler(max_n_events=max_n_events)
        self.addCleanup(cleanup, wx_handler)

        wx_handler.post_event()

        # when
        process_cascade_events()

        # then
        self.assertEqual(wx_handler.n_events, max_n_events)