1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.
import pytest
import zmq
import zmq.asyncio
from zmq.utils.monitor import recv_monitor_message
from zmq_test_utils import require_zmq_4
pytestmark = require_zmq_4
@pytest.fixture(params=["zmq", "asyncio"])
async def Context(request):
if request.param == "asyncio":
return zmq.asyncio.Context
else:
return zmq.Context
async def test_monitor(context, socket):
"""Test monitoring interface for sockets."""
s_rep = socket(zmq.REP)
s_req = socket(zmq.REQ)
s_req.bind("tcp://127.0.0.1:6666")
# try monitoring the REP socket
s_rep.monitor(
"inproc://monitor.rep",
zmq.EVENT_CONNECT_DELAYED | zmq.EVENT_CONNECTED | zmq.EVENT_MONITOR_STOPPED,
)
# create listening socket for monitor
s_event = socket(zmq.PAIR)
s_event.connect("inproc://monitor.rep")
s_event.linger = 0
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6666")
m = recv_monitor_message(s_event)
if isinstance(context, zmq.asyncio.Context):
m = await m
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
assert m['endpoint'] == b"tcp://127.0.0.1:6666"
# test receive event for connected event
m = recv_monitor_message(s_event)
if isinstance(context, zmq.asyncio.Context):
m = await m
assert m['event'] == zmq.EVENT_CONNECTED
assert m['endpoint'] == b"tcp://127.0.0.1:6666"
# test monitor can be disabled.
s_rep.disable_monitor()
m = recv_monitor_message(s_event)
if isinstance(context, zmq.asyncio.Context):
m = await m
assert m['event'] == zmq.EVENT_MONITOR_STOPPED
async def test_monitor_repeat(context, socket, sockets):
s = socket(zmq.PULL)
m = s.get_monitor_socket()
sockets.append(m)
m2 = s.get_monitor_socket()
assert m is m2
s.disable_monitor()
evt = recv_monitor_message(m)
if isinstance(context, zmq.asyncio.Context):
evt = await evt
assert evt['event'] == zmq.EVENT_MONITOR_STOPPED
m.close()
s.close()
async def test_monitor_connected(context, socket, sockets):
"""Test connected monitoring socket."""
s_rep = socket(zmq.REP)
s_req = socket(zmq.REQ)
s_req.bind("tcp://127.0.0.1:6667")
# try monitoring the REP socket
# create listening socket for monitor
s_event = s_rep.get_monitor_socket()
s_event.linger = 0
sockets.append(s_event)
# test receive event for connect event
s_rep.connect("tcp://127.0.0.1:6667")
m = recv_monitor_message(s_event)
if isinstance(context, zmq.asyncio.Context):
m = await m
if m['event'] == zmq.EVENT_CONNECT_DELAYED:
assert m['endpoint'] == b"tcp://127.0.0.1:6667"
# test receive event for connected event
m = recv_monitor_message(s_event)
if isinstance(context, zmq.asyncio.Context):
m = await m
assert m['event'] == zmq.EVENT_CONNECTED
assert m['endpoint'] == b"tcp://127.0.0.1:6667"
|