1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
|
"""Test server working as slave on a multidrop RS485 line."""
import pytest
from pymodbus.exceptions import ModbusIOException
from pymodbus.framer import FramerAscii, FramerRTU
from pymodbus.pdu import DecodePDU
class TestMultidrop:
"""Test that server works on a multidrop line."""
good_frame = b"\x02\x03\x00\x01\x00}\xd4\x18"
@pytest.fixture(name="framer")
def fixture_framer(self):
"""Prepare framer."""
return FramerRTU(DecodePDU(True))
def test_ok_frame(self, framer):
"""Test ok frame."""
serial_event = self.good_frame
used_len, pdu = framer.processIncomingFrame(serial_event)
assert pdu
assert used_len == len(serial_event)
def test_ok_2frame(self, framer):
"""Test ok frame."""
serial_event = self.good_frame + self.good_frame
used_len, pdu = framer.processIncomingFrame(serial_event)
assert pdu
assert used_len == len(self.good_frame)
used_len, pdu = framer.processIncomingFrame(serial_event[used_len:])
assert pdu
assert used_len == len(self.good_frame)
def test_bad_crc(self, framer):
"""Test bad crc."""
serial_event = b"\x02\x03\x00\x01\x00}\xd4\x19" # Manually mangled crc
_, pdu = framer.processIncomingFrame(serial_event)
assert not pdu
def test_big_split_response_frame_from_other_id(self, framer):
"""Test split response."""
# This is a single *response* from device id 1 after being queried for 125 holding register values
# Because the response is so long it spans several serial events
framer = FramerRTU(DecodePDU(False))
serial_events = [
b'\x01\x03\xfa\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00',
b'\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00',
b'\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00',
b'\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00',
b'\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00',
b'\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00',
b'\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00',
b'\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00',
b'\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00',
b'\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00',
b'\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00',
]
final = b'\x11\x00\x11\x00\x11\x00\x11\x00\x11\x00\x11\xa5\x8f'
data = b''
for serial_event in serial_events:
data += serial_event
used_len, pdu = framer.processIncomingFrame(data)
assert not pdu
assert not used_len
used_len, pdu = framer.processIncomingFrame(data + final)
assert pdu
assert used_len == len(data + final)
def test_split_frame(self, framer):
"""Test split frame."""
used_len, pdu = framer.processIncomingFrame(self.good_frame[:5])
assert not pdu
assert not used_len
used_len, pdu = framer.processIncomingFrame(self.good_frame)
assert pdu
assert used_len == len(self.good_frame)
def test_complete_frame_trailing_data_without_id(self, framer):
"""Test trailing data."""
garbage = b"\x05\x04\x03" # without id
serial_event = garbage + self.good_frame
used_len, pdu = framer.processIncomingFrame(serial_event)
assert pdu
assert used_len == len(serial_event)
def test_complete_frame_trailing_data_with_id(self, framer):
"""Test trailing data."""
garbage = b"\x05\x04\x03\x02\x01\x00" # with id
serial_event = garbage + self.good_frame
used_len, pdu = framer.processIncomingFrame(serial_event)
assert pdu
assert used_len == len(serial_event)
def test_split_frame_trailing_data_with_id(self, framer):
"""Test split frame."""
garbage = b"ABCDEF"
serial_events = garbage + self.good_frame
used_len, pdu = framer.processIncomingFrame(serial_events[:11])
assert not pdu
serial_events = serial_events[used_len:]
used_len, pdu = framer.processIncomingFrame(serial_events)
assert pdu
assert used_len == len(serial_events)
@pytest.mark.parametrize(
("garbage"), [
b"\x02\x90\x07",
b"\x02\x10\x07",
b"\x02\x10\x07\x10",
])
def test_coincidental(self, garbage, framer):
"""Test conincidental."""
serial_events = garbage + self.good_frame
used_len, pdu = framer.processIncomingFrame(serial_events[:5])
assert not pdu
serial_events = serial_events[used_len:]
used_len, pdu = framer.processIncomingFrame(serial_events)
assert pdu
assert used_len == len(serial_events)
def test_wrapped_frame(self, framer):
"""Test wrapped frame."""
garbage = b"\x05\x04\x03\x02\x01\x00"
serial_event = garbage + self.good_frame + garbage
# We probably should not respond in this case; in this case we've likely become desynchronized
# i.e. this probably represents a case where a command came for us, but we didn't get
# to the serial buffer in time (some other co-routine or perhaps a block on the USB bus)
# and the master moved on and queried another device
_, pdu = framer.processIncomingFrame(serial_event)
assert pdu
def test_frame_with_trailing_data(self, framer):
"""Test trailing data."""
garbage = b"\x05\x04\x03\x02\x01\x00"
serial_event = self.good_frame + garbage
# We should not respond in this case for identical reasons as test_wrapped_frame
_, pdu = framer.processIncomingFrame(serial_event)
assert pdu
def test_wrong_class(self):
"""Test conincidental."""
def return_none(_data):
"""Return none."""
return None
framer = FramerAscii(DecodePDU(True))
framer.decoder.decode = return_none
with pytest.raises(ModbusIOException):
framer.processIncomingFrame(b':1103007C00026E\r\n')
def test_getFrameStart(self, framer):
"""Test getFrameStart."""
framer_ok = b"\x02\x03\x00\x01\x00\x7d\xd4\x18"
_, pdu = framer.processIncomingFrame(framer_ok)
assert framer_ok[1:-2] == pdu.function_code.to_bytes(1,'big')+pdu.encode()
framer_2ok = framer_ok + framer_ok
used_len, pdu = framer.processIncomingFrame(framer_2ok)
assert pdu
framer_2ok = framer_2ok[used_len:]
used_len, pdu = framer.processIncomingFrame(framer_2ok)
assert pdu
assert used_len == len(framer_2ok)
def test_rtu_split_frame(self, framer):
"""Test test_rtu_split_frame."""
msg1 = (b'\x01\x08\x00\x00\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb'
b'\xcc\xdd\xee\xff\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99')
msg2 = (b'\xaa\xbb\xcc\xdd\xee\xff\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99'
b'\xaa\xbb\xcc\xdd\xee\xff\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99'
b'\xaa\xbb\xcc\xdd\xee\xff\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99'
b'\xaa\xbb\xcc\xdd\xee\xff\xe7\x65')
used_len, pdu = framer.processIncomingFrame(msg1+msg2)
assert pdu
assert used_len == len(msg1 + msg2)
used_len, pdu = framer.processIncomingFrame(msg1)
assert not pdu
assert not used_len
used_len, pdu = framer.processIncomingFrame(msg1+msg2)
assert pdu
assert used_len == len(msg1 + msg2)
|