File: test_nullmodem.py

package info (click to toggle)
pymodbus 3.8.6-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,720 kB
  • sloc: python: 14,867; makefile: 27; sh: 17
file content (297 lines) | stat: -rw-r--r-- 11,339 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
"""Test transport."""
import asyncio
from unittest import mock

import pytest

from pymodbus.transport.transport import NullModem


class TestTransportNullModem:
    """Test null modem module.

    The tests exercise ``NullModem`` directly: creating and closing modems,
    registering listeners, connecting to a listener, sending data in both
    directions and manipulating data in flight.  ``NullModem.listeners`` and
    ``NullModem.connections`` are class-level containers, so every test closes
    what it opened to leave them empty for the next test.
    """

    @staticmethod
    @pytest.fixture(name="use_port")
    def get_port_in_class(base_ports):
        """Return next port.

        The counter for this class is advanced by 2 because some tests also
        listen on ``use_port + 1``.
        """
        base_ports[__class__.__name__] += 2
        return base_ports[__class__.__name__]

    async def test_init(self, dummy_protocol):
        """Test that creating a modem does not call the protocol callbacks."""
        prot = dummy_protocol()
        prot.connection_made = mock.Mock()
        prot.connection_lost = mock.Mock()
        NullModem(prot)
        prot.connection_made.assert_not_called()
        prot.connection_lost.assert_not_called()

    async def test_close(self, dummy_protocol):
        """Test that close() reports connection_lost exactly once."""
        prot = dummy_protocol()
        prot.connection_made = mock.Mock()
        prot.connection_lost = mock.Mock()
        modem = NullModem(prot)
        modem.close()
        prot.connection_made.assert_not_called()
        prot.connection_lost.assert_called_once()

    async def test_no_close(self, dummy_protocol):
        """Test that a modem can be created and dropped without close()."""
        prot = dummy_protocol()
        NullModem(prot)

    async def test_close_no_protocol(self):
        """Test that close() works for a modem created without a protocol."""
        modem = NullModem(None)
        modem.close()

    async def test_close_twice(self, dummy_protocol):
        """Test that a second close() is accepted."""
        prot = dummy_protocol()
        prot.connection_made = mock.Mock()
        prot.connection_lost = mock.Mock()
        modem = NullModem(prot)
        modem.close()
        prot.connection_made.assert_not_called()
        prot.connection_lost.assert_called_once()
        modem.close()  # test _is_closing works.

    async def test_listen(self, dummy_protocol, use_port):
        """Test listener registration and removal (shared list)."""
        prot = dummy_protocol(is_server=True)
        prot.connection_made = mock.Mock()
        prot.connection_lost = mock.Mock()
        listen = NullModem.set_listener(use_port, prot)
        assert NullModem.listeners[use_port] == prot
        assert len(NullModem.listeners) == 1
        assert not NullModem.connections
        listen.close()
        assert not NullModem.listeners
        # Listening and closing the listener must not touch the callbacks.
        prot.connection_made.assert_not_called()
        prot.connection_lost.assert_not_called()

    async def test_listen_twice(self, dummy_protocol, use_port):
        """Test listening again on a port after the first listener closed."""
        listen1 = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        listen1.close()
        listen2 = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        assert len(NullModem.listeners) == 1
        listen2.close()
        assert not NullModem.listeners

    async def test_listen_twice_fail(self, dummy_protocol, use_port):
        """Test exception when listening twice on a port still in use."""
        listen1 = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        with pytest.raises(AssertionError):
            NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        listen1.close()
        assert not NullModem.listeners

    async def test_listen_triangle(self, dummy_protocol, use_port):
        """Test two listeners on adjacent ports being closed independently."""
        listen1 = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        listen2 = NullModem.set_listener(use_port + 1, dummy_protocol(is_server=True))
        listen1.close()
        assert use_port not in NullModem.listeners
        assert len(NullModem.listeners) == 1
        listen2.close()
        assert not NullModem.listeners

    async def test_is_dirty(self, dummy_protocol, use_port):
        """Test is_dirty() with only a listener."""
        assert not NullModem.is_dirty()
        listen1 = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        assert NullModem.is_dirty()
        listen1.close()
        assert not NullModem.is_dirty()

    async def test_connect(self, dummy_protocol, use_port):
        """Test connect to a listener and the bookkeeping it creates."""
        prot_listen = dummy_protocol(is_server=True)
        prot_listen.connection_made = mock.Mock()
        prot_listen.connection_lost = mock.Mock()
        listen = NullModem.set_listener(use_port, prot_listen)
        prot1 = dummy_protocol()
        prot1.connection_made = mock.Mock()
        prot1.connection_lost = mock.Mock()
        modem, _ = NullModem.set_connection(use_port, prot1)
        modem_b = modem.other_modem
        # Both ends must have their own protocol, distinct from the listener's.
        assert modem.protocol != listen.protocol
        assert modem.protocol != modem_b.protocol
        # One entry per end; the peer end is expected under the negated port.
        assert len(NullModem.connections) == 2
        assert NullModem.connections[modem] == use_port
        assert NullModem.connections[modem_b] == -use_port
        modem.close()
        # Closing one end is expected to remove the peer as well.
        assert modem_b not in NullModem.connections
        listen.close()
        prot_listen.connection_made.assert_not_called()
        prot_listen.connection_lost.assert_not_called()
        prot1.connection_made.assert_called_once()
        prot1.connection_lost.assert_called_once()

    async def test_connect_no_listen(self, dummy_protocol, use_port):
        """Test connect without listen."""
        with pytest.raises(asyncio.TimeoutError):
            NullModem.set_connection(use_port, dummy_protocol())

    async def test_listen_close(self, dummy_protocol, use_port):
        """Test closing the listener while a connection is still open."""
        listen = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        modem, _ = NullModem.set_connection(use_port, dummy_protocol())
        listen.close()
        # The established connection must survive the listener going away.
        assert len(NullModem.connections) == 2
        assert not NullModem.listeners
        modem.close()

    async def test_connect_multiple(self, dummy_protocol, use_port):
        """Test multiple connect."""
        listen1 = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        modem1, _ = NullModem.set_connection(use_port, dummy_protocol())
        modem1b = modem1.other_modem
        modem2, _ = NullModem.set_connection(use_port, dummy_protocol())
        modem2b = modem2.other_modem
        protocol_list = [
            modem1.protocol,
            modem1b.protocol,
            modem2.protocol,
            modem2b.protocol,
            listen1.protocol,
        ]
        entries = [modem1, modem1b, modem2, modem2b]
        for inx, entry in enumerate(entries):
            # Each modem's protocol must differ from every other protocol,
            # including the listener's (last element of protocol_list).
            others = protocol_list[:inx] + protocol_list[inx + 1 :]
            assert entry.protocol not in others
        assert len(NullModem.connections) == 4
        listen1.close()
        modem1.close()
        # Only the first connection pair is gone, the second is untouched.
        assert len(NullModem.connections) == 2
        assert modem1b not in NullModem.connections
        assert modem2b in NullModem.connections
        modem2.close()

    async def test_is_dirty_with_connection(self, dummy_protocol, use_port):
        """Test is_dirty() with a listener and a connection."""
        assert not NullModem.is_dirty()
        listen = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        modem, _ = NullModem.set_connection(use_port, dummy_protocol())
        assert NullModem.is_dirty()
        modem.close()
        listen.close()
        assert not NullModem.is_dirty()

    async def test_flow_write(self, dummy_protocol, use_port):
        """Test flow write."""
        listen = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        modem, _ = NullModem.set_connection(use_port, dummy_protocol())
        modem_b = modem.other_modem
        test_data1 = b"abcd"
        test_data2 = b"efgh"
        modem.write(test_data1)
        modem_b.write(test_data2)
        # Data written on one end must arrive at the protocol of the other.
        assert modem_b.protocol.recv_buffer == test_data1
        assert modem.protocol.recv_buffer == test_data2
        modem.close()
        listen.close()

    async def test_flow_sendto(self, dummy_protocol, use_port):
        """Test flow sendto."""
        listen = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        modem, _ = NullModem.set_connection(use_port, dummy_protocol())
        modem_b = modem.other_modem
        test_data1 = b"abcd"
        test_data2 = b"efgh"
        modem.sendto(test_data1)
        modem_b.sendto(test_data2)
        assert modem_b.protocol.recv_buffer == test_data1
        assert modem.protocol.recv_buffer == test_data2
        modem.close()
        listen.close()

    async def test_flow_not_connected(self, dummy_protocol):
        """Test flow not connected."""
        modem = NullModem(dummy_protocol())
        # Send bytes like the connected tests do; nothing may be received
        # because this modem has no peer.
        modem.sendto(b"no connection")
        assert modem.protocol.recv_buffer == b""
        modem.close()

    async def test_triangle_flow(self, dummy_protocol, use_port):
        """Test triangle flow (two connections to one listener)."""
        listen = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        modem1, _ = NullModem.set_connection(use_port, dummy_protocol())
        modem1b = modem1.other_modem
        modem2, _ = NullModem.set_connection(use_port, dummy_protocol())
        modem2b = modem2.other_modem
        test_data1 = b"abcd"
        test_data2 = b"efgh"
        test_data3 = b"ijkl"
        test_data4 = b"mnop"
        modem1.write(test_data1)
        modem1b.write(test_data2)
        # Traffic on the first connection must not leak into the second.
        assert modem1b.protocol.recv_buffer == test_data1
        assert modem1.protocol.recv_buffer == test_data2
        assert modem2b.protocol.recv_buffer == b""
        assert modem2.protocol.recv_buffer == b""
        modem2.write(test_data3)
        modem2b.write(test_data4)
        # And traffic on the second must leave the first unchanged.
        assert modem1b.protocol.recv_buffer == test_data1
        assert modem1.protocol.recv_buffer == test_data2
        assert modem2b.protocol.recv_buffer == test_data3
        assert modem2.protocol.recv_buffer == test_data4
        modem1.close()
        modem2.close()
        listen.close()

    @pytest.mark.parametrize(
        "add_text",
        [
            [b""],
            [b"MANIPULATED"],
            [b"MANIPULATED", b"and more"],
            [b"MANIPULATED", b"and", b"much more"],
        ],
    )
    async def test_manipulator(self, add_text, dummy_protocol, use_port):
        """Test manipulator."""

        def manipulator(data):
            """Return the written data followed by the extra chunks."""
            data = [data]
            data.extend(add_text)
            return data

        listen = NullModem.set_listener(use_port, dummy_protocol(is_server=True))
        modem, _ = NullModem.set_connection(use_port, dummy_protocol())
        modem_b = modem.other_modem
        # Only the client side gets a manipulator.
        modem.set_manipulator(manipulator)
        data1 = b"abcd"
        data2 = b"efgh"
        modem.write(data1)
        modem.write(data2)
        modem_b.write(data1)
        modem_b.write(data2)
        added = b"".join(add_text)
        # Data sent by modem_b arrives unchanged; data sent by modem has the
        # extra chunks appended after every write.
        assert modem.protocol.recv_buffer == data1 + data2
        assert modem_b.protocol.recv_buffer == data1 + added + data2 + added
        modem.close()
        listen.close()

    async def test_abstract_methods(self, dummy_protocol):
        """Test asyncio abstract methods.

        No results are checked, the test only verifies the calls do not raise.
        """
        modem = NullModem(dummy_protocol())
        modem.abort()
        modem.can_write_eof()
        modem.get_write_buffer_size()
        modem.get_write_buffer_limits()
        modem.set_write_buffer_limits(1024, 1)
        modem.write_eof()
        modem.get_protocol()
        modem.set_protocol(None)
        modem.is_closing()
        modem.is_reading()
        modem.pause_reading()
        modem.resume_reading()