1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
"""Test example server/client sync.
This is a thorough test of the
- client_sync.py
- server_sync.py
examples.
These are basis for most examples and thus tested separately
"""
import os
from threading import Thread
from time import sleep
import pytest
from examples.client_sync import (
main,
run_a_few_calls,
run_sync_client,
setup_sync_client,
)
from examples.server_async import setup_server
from examples.server_sync import run_sync_server
from pymodbus.exceptions import ConnectionException
from pymodbus.server import ServerStop
# Seconds passed to sleep() after starting the server thread and between
# client runs; Windows ("nt") gets a longer delay than other platforms.
SLEEPING = 1 if os.name != "nt" else 5
@pytest.mark.parametrize("use_host", ["localhost"])
@pytest.mark.parametrize(
    ("use_comm", "use_framer"),
    [
        ("tcp", "socket"),
        ("tcp", "rtu"),
        # awaiting fix: ("tls", "tls"),
        ("udp", "socket"),
        ("udp", "rtu"),
        ("serial", "rtu"),
    ],
)
class TestClientServerSyncExamples:
    """Test sync client/server example combinations.

    Every test is run once per (use_comm, use_framer) pair listed in the
    parametrization above, always with use_host set to "localhost".
    """

    @staticmethod
    @pytest.fixture(name="use_port")
    def get_port_in_class(base_ports):
        """Return next port.

        Increments the counter stored in ``base_ports`` under this class'
        name and returns the new value.
        """
        base_ports[__class__.__name__] += 1
        return base_ports[__class__.__name__]

    @staticmethod
    def _start_server(mock_cls):
        """Start the sync example server in a daemon thread and wait for it.

        :param mock_cls: server command line, handed to ``setup_server``.
        :returns: the started thread.
        """
        server_args = setup_server(cmdline=mock_cls)
        thread = Thread(target=run_sync_server, args=(server_args,))
        # Daemon thread: a server that fails to stop must not block interpreter exit.
        thread.daemon = True
        thread.start()
        # No readiness signal is used here; wait a fixed time before continuing.
        sleep(SLEEPING)
        return thread

    def test_combinations(
        self,
        mock_clc,
        mock_cls,
    ):
        """Run sync client and server."""
        self._start_server(mock_cls)
        try:
            main(cmdline=mock_clc)
        finally:
            # Stop the server even when the client fails.
            ServerStop()

    def test_server_no_client(self, mock_cls):
        """Run sync server without client."""
        self._start_server(mock_cls)
        ServerStop()

    def test_server_client_twice(self, mock_cls, mock_clc, use_comm):
        """Run sync server and run the same client against it twice."""
        if use_comm == "serial":
            pytest.skip("cannot open the usb port multiple times")
        self._start_server(mock_cls)
        try:
            test_client = setup_sync_client(cmdline=mock_clc)
            run_sync_client(test_client, modbus_calls=run_a_few_calls)
            sleep(SLEEPING)
            run_sync_client(test_client, modbus_calls=run_a_few_calls)
        finally:
            # Stop the server even when one of the client runs fails.
            ServerStop()

    def test_client_no_server(self, mock_clc):
        """Run sync client without server and expect it to fail."""
        # NOTE(review): mock_clc[1] is presumably the comm type - confirm against the fixture.
        if mock_clc[1] == "udp":
            # udp is connectionless, so it is not possible to detect a proper connection
            # instead it fails on first message exchange
            pytest.skip("udp is connectionless, a missing server is not detected on connect")
        test_client = setup_sync_client(cmdline=mock_clc)
        with pytest.raises((AssertionError, ConnectionException)):
            run_sync_client(test_client, modbus_calls=run_a_few_calls)
|