1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
from unittest import mock
import pytest
@pytest.mark.parametrize(
"gtk,gtk_available",
[
(False, False),
(False, True),
(True, False),
(True, True),
],
)
def test_install(gtk, gtk_available):
import sys
from gbulb import install
called = False
def set_event_loop_policy(pol):
nonlocal called
called = True
cls_name = pol.__class__.__name__
if gtk:
assert cls_name == "GtkEventLoopPolicy"
else:
assert cls_name == "GLibEventLoopPolicy"
if gtk and "gbulb.gtk" in sys.modules:
del sys.modules["gbulb.gtk"]
mock_repository = mock.Mock()
if not gtk_available:
del mock_repository.Gtk
with mock.patch.dict("sys.modules", {"gi.repository": mock_repository}):
with mock.patch("asyncio.set_event_loop_policy", set_event_loop_policy):
import_error = gtk and not gtk_available
try:
install(gtk=gtk)
except ImportError:
assert import_error
else:
assert not import_error
assert called
def test_get_event_loop():
import asyncio
import gbulb
try:
loop = gbulb.new_event_loop()
asyncio.set_event_loop(loop)
assert asyncio.get_event_loop() is gbulb.get_event_loop()
finally:
loop.close()
def test_wait_signal(glib_loop):
import asyncio
from gi.repository import GObject
from gbulb import wait_signal
class TestObject(GObject.GObject):
__gsignals__ = {
"foo": (GObject.SignalFlags.RUN_LAST, None, (str,)),
}
t = TestObject()
async def emitter():
t.emit("foo", "frozen brains tell no tales")
called = False
async def waiter():
nonlocal called
r = await wait_signal(t, "foo")
assert r == (t, "frozen brains tell no tales")
called = True
glib_loop.run_until_complete(
asyncio.wait(
[
glib_loop.create_task(waiter()),
glib_loop.create_task(emitter()),
],
timeout=1,
)
)
assert called
def test_wait_signal_cancel(glib_loop):
import asyncio
from gi.repository import GObject
from gbulb import wait_signal
class TestObject(GObject.GObject):
__gsignals__ = {
"foo": (GObject.SignalFlags.RUN_LAST, None, (str,)),
}
t = TestObject()
async def emitter():
t.emit("foo", "frozen brains tell no tales")
called = False
cancelled = False
async def waiter():
nonlocal cancelled
# Yield to the event loop
await asyncio.sleep(0)
r = wait_signal(t, "foo")
@r.add_done_callback
def caller(r):
nonlocal called
called = True
r.cancel()
assert r.cancelled()
cancelled = True
glib_loop.run_until_complete(
asyncio.wait(
[
glib_loop.create_task(waiter()),
glib_loop.create_task(emitter()),
],
timeout=1,
)
)
assert cancelled
assert called
def test_wait_signal_cancel_state():
from gbulb import wait_signal
m = wait_signal(mock.Mock(), "anything")
assert m.cancel()
assert not m.cancel()
|