1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
import pytest
from pyqtgraph import SignalProxy
from pyqtgraph.Qt import QtCore, mkQApp
class Sender(QtCore.QObject):
signalSend = QtCore.Signal()
def __init__(self, parent=None):
super(Sender, self).__init__(parent)
class Receiver(QtCore.QObject):
def __init__(self, parent=None):
super(Receiver, self).__init__(parent)
self.counter = 0
@QtCore.Slot()
def slotReceive(self):
self.counter += 1
@pytest.fixture
def qapp():
app = mkQApp()
if app is None:
app = mkQApp()
yield app
app.processEvents(QtCore.QEventLoop.ProcessEventsFlag.AllEvents, 100)
def test_signal_proxy_slot(qapp):
"""Test the normal work mode of SignalProxy with `signal` and `slot`"""
sender = Sender(parent=qapp)
receiver = Receiver(parent=qapp)
proxy = SignalProxy(sender.signalSend, delay=0.0, rateLimit=0.6,
slot=receiver.slotReceive, threadSafe=False)
assert proxy.blockSignal is False
assert proxy is not None
assert sender is not None
assert receiver is not None
sender.signalSend.emit()
proxy.flush()
qapp.processEvents(QtCore.QEventLoop.ProcessEventsFlag.AllEvents, 10)
assert receiver.counter > 0
def test_signal_proxy_disconnect_slot(qapp):
"""Test the disconnect of SignalProxy with `signal` and `slot`"""
sender = Sender(parent=qapp)
receiver = Receiver(parent=qapp)
proxy = SignalProxy(sender.signalSend, delay=0.0, rateLimit=0.6,
slot=receiver.slotReceive, threadSafe=False)
assert proxy.blockSignal is False
assert proxy is not None
assert sender is not None
assert receiver is not None
assert proxy.slot is not None
proxy.disconnect()
assert proxy.slot is None
assert proxy.blockSignal is True
sender.signalSend.emit()
proxy.flush()
qapp.processEvents(QtCore.QEventLoop.ProcessEventsFlag.AllEvents, 10)
assert receiver.counter == 0
def test_signal_proxy_no_slot_start(qapp):
"""Test the connect mode of SignalProxy without slot at start`"""
sender = Sender(parent=qapp)
receiver = Receiver(parent=qapp)
proxy = SignalProxy(sender.signalSend, delay=0.0, rateLimit=0.6,
threadSafe=False)
assert proxy.blockSignal is True
assert proxy is not None
assert sender is not None
assert receiver is not None
sender.signalSend.emit()
proxy.flush()
qapp.processEvents(QtCore.QEventLoop.ProcessEventsFlag.AllEvents, 10)
assert receiver.counter == 0
proxy.connectSlot(receiver.slotReceive)
assert proxy.blockSignal is False
sender.signalSend.emit()
proxy.flush()
qapp.processEvents(QtCore.QEventLoop.ProcessEventsFlag.AllEvents, 10)
assert receiver.counter > 0
# An additional connect should raise an AssertionError
with pytest.raises(AssertionError):
proxy.connectSlot(receiver.slotReceive)
def test_signal_proxy_slot_block(qapp):
"""Test the block mode of SignalProxy with `signal` and `slot`"""
sender = Sender(parent=qapp)
receiver = Receiver(parent=qapp)
proxy = SignalProxy(sender.signalSend, delay=0.0, rateLimit=0.6,
slot=receiver.slotReceive, threadSafe=False)
assert proxy.blockSignal is False
assert proxy is not None
assert sender is not None
assert receiver is not None
with proxy.block():
sender.signalSend.emit()
sender.signalSend.emit()
sender.signalSend.emit()
proxy.flush()
qapp.processEvents(QtCore.QEventLoop.ProcessEventsFlag.AllEvents, 10)
assert receiver.counter == 0
sender.signalSend.emit()
proxy.flush()
qapp.processEvents(QtCore.QEventLoop.ProcessEventsFlag.AllEvents, 10)
assert receiver.counter > 0
|