1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
import pytest
import struct
from functools import partial
from ..validators import validate_response_mbap
from umodbus.client import tcp
@pytest.mark.parametrize('function_code, quantity', [
    # Quantity of zero.
    (1, 0),
    (2, 0),
    (3, 0),
    (4, 0),
    # Quantity one above 0x07D0 (function codes 1 and 2) or 0x007D
    # (function codes 3 and 4).
    (1, 0x07D0 + 1),
    (2, 0x07D0 + 1),
    (3, 0x007D + 1),
    (4, 0x007D + 1),
])
def test_request_returning_invalid_data_value_error(sock, mbap, function_code,
                                                    quantity):
    """ Send a request with the parametrized quantity and expect an exception
    response with error code 3.

    The request PDU is packed as function code, starting address 0 and
    quantity, and sent behind the given MBAP header. The response is checked
    against that header by `validate_response_mbap`; its last two bytes must
    be the function code increased by 0x80, followed by error code 3.
    """
    starting_address = 0
    pdu = struct.pack('>BHH', function_code, starting_address, quantity)

    sock.send(mbap + pdu)
    response = sock.recv(1024)

    validate_response_mbap(mbap, response)

    expected_tail = (0x80 + function_code, 3)
    assert struct.unpack('>BB', response[-2:]) == expected_tail
@pytest.mark.parametrize('function', [
    partial(tcp.read_coils, 1, 9, 2),
    partial(tcp.read_discrete_inputs, 1, 9, 2),
    partial(tcp.read_holding_registers, 1, 9, 2),
    partial(tcp.read_input_registers, 1, 9, 2),
    partial(tcp.write_single_coil, 1, 11, 0),
    partial(tcp.write_single_register, 1, 11, 1337),
    partial(tcp.write_multiple_coils, 1, 9, [1, 1]),
    partial(tcp.write_multiple_registers, 1, 9, [1337, 15]),
])
def test_request_returning_invalid_data_address_error(sock, function):
    """ Send the request built by `function` and expect an exception response
    with error code 2.

    `function` is a zero-argument callable returning the complete request
    ADU. Its first 7 bytes are used as the MBAP header and the byte after
    them as the function code. The response is checked against that header by
    `validate_response_mbap`; its last two bytes must be the function code
    increased by 0x80, followed by error code 2.
    """
    request = function()
    request_mbap = request[:7]
    # Slice rather than index so struct always receives a bytes object.
    (function_code,) = struct.unpack('>B', request[7:8])

    sock.send(request)
    response = sock.recv(1024)

    validate_response_mbap(request_mbap, response)

    expected_tail = (0x80 + function_code, 2)
    assert struct.unpack('>BB', response[-2:]) == expected_tail
@pytest.mark.parametrize('function', [
    partial(tcp.read_coils, 1, 666, 1),
    partial(tcp.read_discrete_inputs, 1, 666, 1),
    partial(tcp.read_holding_registers, 1, 666, 1),
    partial(tcp.read_input_registers, 1, 666, 1),
    partial(tcp.write_single_coil, 1, 666, 0),
    partial(tcp.write_single_register, 1, 666, 1337),
    partial(tcp.write_multiple_coils, 1, 666, [1]),
    partial(tcp.write_multiple_registers, 1, 666, [1337]),
])
def test_request_returning_server_device_failure_error(sock, function):
    """ Send the request built by `function` and expect an exception response
    with error code 4.

    `function` is a zero-argument callable returning the complete request
    ADU. Its first 7 bytes are used as the MBAP header and the byte after
    them as the function code. The response is checked against that header by
    `validate_response_mbap`; its last two bytes must be the function code
    increased by 0x80, followed by error code 4.
    """
    request = function()
    request_mbap = request[:7]
    # Slice rather than index so struct always receives a bytes object.
    (function_code,) = struct.unpack('>B', request[7:8])

    sock.send(request)
    response = sock.recv(1024)

    validate_response_mbap(request_mbap, response)

    expected_tail = (0x80 + function_code, 4)
    assert struct.unpack('>BB', response[-2:]) == expected_tail
|