1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
|
# (C) Copyright 2004-2023 Enthought, Inc., Austin, TX
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in LICENSE.txt and may be redistributed only under
# the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!
""" Tests for traitsui.testing._gui """
import os
import unittest
from pyface.api import GUI
from traitsui.tests._tools import (
is_qt,
is_wx,
is_mac_os,
process_cascade_events,
requires_toolkit,
ToolkitName,
)
if is_qt():
# Create a QObject that will emit a new event to itself as long as
# it has not received enough.
from pyface.qt import QtCore
class DummyQObject(QtCore.QObject):
def __init__(self, max_n_events):
super().__init__()
self.max_n_events = max_n_events
self.n_events = 0
def event(self, event):
if event.type() != QtCore.QEvent.Type.User:
return super().event(event)
self.n_events += 1
if self.n_events < self.max_n_events:
new_event = QtCore.QEvent(QtCore.QEvent.Type.User)
QtCore.QCoreApplication.postEvent(self, new_event)
return True
if is_wx():
# Create a wx.EvtHandler that will emit a new event to itself as long as
# it has not received enough.
import wx
import wx.lib.newevent
NewEvent, EVT_SOME_NEW_EVENT = wx.lib.newevent.NewEvent()
class DummyWxHandler(wx.EvtHandler):
def __init__(self, max_n_events):
super().__init__()
self.max_n_events = max_n_events
self.n_events = 0
def TryBefore(self, event):
self.n_events += 1
if self.n_events < self.max_n_events:
self.post_event()
return True
def post_event(self):
event = NewEvent()
wx.PostEvent(self, event)
class TestProcessEventsRepeated(unittest.TestCase):
"""Test process_events actually processes all events, including the events
posted by the processed events.
"""
@requires_toolkit([ToolkitName.qt])
def test_qt_process_events_process_all(self):
from pyface.qt import QtCore
if QtCore.__version_info__ < (5, 0, 0) and is_mac_os:
# On Qt4 and OSX, Qt QEventLoop.processEvents says nothing was "
# processed even when there are events processed, causing the "
# loop to break too soon. (See enthought/traitsui#951)"
self.skipTest(
"process_cascade_events is not reliable on Qt4 + OSX"
)
is_appveyor = os.environ.get("APPVEYOR", None) is not None
if QtCore.__version_info__ <= (5, 12, 6) and is_appveyor:
# With Qt + Appveyor, process_cascade_events may
# _occasionally_ break out of its loop too early. This is only
# seen on Appveyor but has not been reproducible on other Windows
# machines (See enthought/traitsui#951)
self.skipTest(
"process_cascade_events is not reliable on Qt + Appveyor"
)
def cleanup(q_object):
q_object.deleteLater()
# If the test fails, run process events at least the same
# number of times as max_n_events
for _ in range(q_object.max_n_events):
QtCore.QCoreApplication.processEvents(
QtCore.QEventLoop.ProcessEventsFlag.AllEvents
)
max_n_events = 10
q_object = DummyQObject(max_n_events=max_n_events)
self.addCleanup(cleanup, q_object)
QtCore.QCoreApplication.postEvent(
q_object, QtCore.QEvent(QtCore.QEvent.Type.User)
)
# sanity check calling processEvents does not process
# cascade of events.
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.ProcessEventsFlag.AllEvents)
self.assertEqual(q_object.n_events, 1)
# when
process_cascade_events()
# then
actual = q_object.n_events
# If process_cascade_events did not do what it promises, then there
# are still pending tasks left. Run process events at least the same
# number of times as max_n_events to verify
for _ in range(max_n_events):
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.ProcessEventsFlag.AllEvents)
n_left_behind_events = q_object.n_events - actual
msg = (
"Expected {max_n_events} events processed on the objects and zero "
"events left on the queue after running process_cascade_events. "
"Found {actual} processed with {n_left_behind_events} left "
"behind.".format(
max_n_events=max_n_events,
actual=actual,
n_left_behind_events=n_left_behind_events,
)
)
self.assertEqual(n_left_behind_events, 0, msg)
# If the previous assertion passes but this one fails, that means some
# events have gone missing, and that would likely be a problem for the
# test setup, not for the process_cascade_events.
self.assertEqual(actual, max_n_events, msg)
@requires_toolkit([ToolkitName.wx])
def test_wx_process_events_process_all(self):
def cleanup(wx_handler):
# In case of test failure, always flush the GUI event queue.
GUI.process_events()
wx_handler.Destroy()
max_n_events = 10
wx_handler = DummyWxHandler(max_n_events=max_n_events)
self.addCleanup(cleanup, wx_handler)
wx_handler.post_event()
# when
process_cascade_events()
# then
self.assertEqual(wx_handler.n_events, max_n_events)
|