File: test_server_sync.py

package info (click to toggle)
pymodbus 2.1.0+dfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 2,708 kB
  • sloc: python: 17,594; makefile: 84; sh: 8
file content (365 lines) | stat: -rw-r--r-- 16,513 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
#!/usr/bin/env python
from pymodbus.compat import IS_PYTHON3
import unittest
if IS_PYTHON3: # Python 3
    from unittest.mock import patch, Mock
else: # Python 2
    from mock import patch, Mock
import serial
import socket

from pymodbus.device import ModbusDeviceIdentification
from pymodbus.server.sync import ModbusBaseRequestHandler
from pymodbus.server.sync import ModbusSingleRequestHandler
from pymodbus.server.sync import ModbusConnectedRequestHandler
from pymodbus.server.sync import ModbusDisconnectedRequestHandler
from pymodbus.server.sync import ModbusTcpServer, ModbusUdpServer, ModbusSerialServer
from pymodbus.server.sync import StartTcpServer, StartUdpServer, StartSerialServer
from pymodbus.exceptions import NotImplementedException
from pymodbus.bit_read_message import ReadCoilsRequest, ReadCoilsResponse
from pymodbus.datastore import ModbusServerContext

from pymodbus.compat import socketserver

import platform
from distutils.version import LooseVersion

# Pick the serial device that StartSerialServer is pointed at, per platform.
IS_DARWIN = platform.system().lower() == "darwin"
# macOS Sierra; releases strictly newer than this count as "High Sierra or above".
OSX_SIERRA = LooseVersion("10.12")
if IS_DARWIN:
    # Compare on major.minor only, so that a Sierra point release such as
    # 10.12.6 is not treated as newer than Sierra itself.
    _mac_release = ".".join(platform.mac_ver()[0].split(".")[:2])
    # This must be a real comparison: a bare LooseVersion object is always
    # truthy, which would make the '/dev/ptyp0' branch below unreachable.
    IS_HIGH_SIERRA_OR_ABOVE = LooseVersion(_mac_release) > OSX_SIERRA
    SERIAL_PORT = '/dev/ptyp0' if not IS_HIGH_SIERRA_OR_ABOVE else '/dev/ttyp0'
else:
    IS_HIGH_SIERRA_OR_ABOVE = False
    SERIAL_PORT = "/dev/ptmx"
#---------------------------------------------------------------------------#
# Mock Classes
#---------------------------------------------------------------------------#
class MockServer(object):
    '''
    Minimal stand-in for a server object: it only carries the
    framer, decoder, threads and context attributes.
    '''

    def __init__(self):
        def build_framer(_, client=None):
            # Ignores its arguments and hands back a fixed marker string.
            return "framer"

        self.context = {}
        self.threads = []
        self.decoder = "decoder"
        self.framer = build_framer


#---------------------------------------------------------------------------#
# Fixture
#---------------------------------------------------------------------------#
class SynchronousServerTest(unittest.TestCase):
    '''
    This is the unittest for the pymodbus.server.sync module
    '''

    #-----------------------------------------------------------------------#
    # Test Base Request Handler
    #-----------------------------------------------------------------------#

    def testBaseHandlerUndefinedMethods(self):
        ''' The base handler must reject send() and handle() as unimplemented '''
        # Create a plain socketserver handler with no request, address or
        # server, then swap its class for the Modbus base handler.
        base = socketserver.BaseRequestHandler(None, None, None)
        base.__class__ = ModbusBaseRequestHandler
        with self.assertRaises(NotImplementedException):
            base.send(None)
        with self.assertRaises(NotImplementedException):
            base.handle()

    def testBaseHandlerMethods(self):
        ''' Exercise construction and execute() of the base request handler '''
        server = MockServer()
        address = ('server', 12345)
        request = ReadCoilsRequest(1, 1)
        # handle() and send() are stubbed out for the whole test; send() is
        # the one whose call count is checked.
        with patch.object(ModbusBaseRequestHandler, 'handle') as handle_stub, \
                patch.object(ModbusBaseRequestHandler, 'send') as send_stub:
            handle_stub.return_value = True
            send_stub.return_value = True

            handler = ModbusBaseRequestHandler(request, address, server)
            self.assertEqual(handler.running, True)
            self.assertEqual(handler.framer, 'framer')

            # The mock server starts with an empty context; one send() is
            # still expected.
            handler.execute(request)
            self.assertEqual(send_stub.call_count, 1)

            # Same again once a bare object is registered under unit 0x00.
            server.context[0x00] = object()
            handler.execute(request)
            self.assertEqual(send_stub.call_count, 2)

    #-----------------------------------------------------------------------#
    # Test Single Request Handler
    #-----------------------------------------------------------------------#
    def testModbusSingleRequestHandlerSend(self):
        ''' Single handler send() writes to the request only when a reply is due '''
        handler = socketserver.BaseRequestHandler(None, None, None)
        handler.__class__ = ModbusSingleRequestHandler
        handler.request = Mock()
        handler.framer = Mock()
        handler.framer.buildPacket.return_value = b"message"
        sender = handler.request.send

        response = ReadCoilsResponse([1])
        handler.send(response)
        self.assertEqual(sender.call_count, 1)

        # With should_respond cleared, the send count must stay where it was.
        response.should_respond = False
        handler.send(response)
        self.assertEqual(sender.call_count, 1)

    def testModbusSingleRequestHandlerHandle(self):
        ''' Single handler handle() loops while running and survives an exception '''
        handler = socketserver.BaseRequestHandler(None, None, None)
        handler.__class__ = ModbusSingleRequestHandler
        handler.server = Mock()
        handler.socket = Mock()
        handler.request = Mock()
        handler.request.recv.return_value = b"\x12\x34"
        handler.framer = Mock()
        handler.framer.buildPacket.return_value = b"message"
        process = handler.framer.processIncomingPacket

        # A handler that is not running never reaches the framer.
        handler.running = False
        handler.handle()
        self.assertEqual(process.call_count, 0)

        # A running handler keeps looping, so the framer stub itself has to
        # clear the flag for handle() to return.
        def stop_loop(_data, _execute, *args, **kwargs):
            handler.running = False
        process.side_effect = stop_loop
        # Ugly hack (original author's words): swap in a real server context
        # in place of the Mock one.
        handler.server.context = ModbusServerContext(slaves={-1: None},
                                                     single=False)
        handler.running = True
        handler.handle()
        self.assertEqual(process.call_count, 1)

        # The second framer call overall raises. The count reaching 3 shows
        # that handle() ignored the exception and went round once more.
        def raise_then_stop(_data, _execute, *args, **kwargs):
            if process.call_count != 2:
                handler.running = False
                return
            raise Exception("example exception")
        process.side_effect = raise_then_stop
        handler.running = True
        handler.handle()
        self.assertEqual(process.call_count, 3)

    #-----------------------------------------------------------------------#
    # Test Connected Request Handler
    #-----------------------------------------------------------------------#
    def testModbusConnectedRequestHandlerSend(self):
        ''' Connected handler send() writes to the request only when a reply is due '''
        handler = socketserver.BaseRequestHandler(None, None, None)
        handler.__class__ = ModbusConnectedRequestHandler
        framer, connection = Mock(), Mock()
        framer.buildPacket.return_value = b"message"
        handler.framer = framer
        handler.request = connection

        response = ReadCoilsResponse([1])
        handler.send(response)
        self.assertEqual(connection.send.call_count, 1)

        # A message flagged as needing no response must not be sent.
        response.should_respond = False
        handler.send(response)
        self.assertEqual(connection.send.call_count, 1)

    def testModbusConnectedRequestHandlerHandle(self):
        ''' Connected handler handle() stops on errors and when no data arrives '''
        handler = socketserver.BaseRequestHandler(None, None, None)
        handler.__class__ = ModbusConnectedRequestHandler
        handler.server = Mock()
        handler.framer = Mock()
        handler.framer.buildPacket.return_value = b"message"
        handler.request = Mock()
        handler.request.recv.return_value = b"\x12\x34"
        process = handler.framer.processIncomingPacket

        def run_and_expect(total_calls):
            # Restart the loop, then check the cumulative framer call count.
            handler.running = True
            handler.handle()
            self.assertEqual(process.call_count, total_calls)

        # exit if we are not running
        handler.running = False
        handler.handle()
        self.assertEqual(process.call_count, 0)

        # run forever if we are running; the stub clears the flag itself
        def stop_loop(_data, _execute, *args, **kwargs):
            handler.running = False
        process.side_effect = stop_loop
        run_and_expect(1)

        # socket errors cause the client to disconnect
        process.side_effect = socket.error()
        run_and_expect(2)

        # every other exception causes the client to disconnect
        process.side_effect = Exception()
        run_and_expect(3)

        # receiving no data causes the client to disconnect
        handler.request.recv.return_value = None
        run_and_expect(4)

    #-----------------------------------------------------------------------#
    # Test Disconnected Request Handler
    #-----------------------------------------------------------------------#
    def testModbusDisconnectedRequestHandlerSend(self):
        ''' Disconnected handler send() uses socket.sendto only when a reply is due '''
        handler = socketserver.BaseRequestHandler(None, None, None)
        handler.__class__ = ModbusDisconnectedRequestHandler
        handler.server = Mock()
        handler.request = Mock()
        handler.socket = Mock()
        handler.framer = Mock()
        handler.framer.buildPacket.return_value = b"message"
        sendto = handler.socket.sendto

        response = ReadCoilsResponse([1])
        handler.send(response)
        self.assertEqual(sendto.call_count, 1)

        # Clearing should_respond must leave the sendto count unchanged.
        response.should_respond = False
        handler.send(response)
        self.assertEqual(sendto.call_count, 1)

    def testModbusDisconnectedRequestHandlerHandle(self):
        ''' Disconnected handler handle() stops on errors and when no data arrives '''
        handler = socketserver.BaseRequestHandler(None, None, None)
        handler.__class__ = ModbusDisconnectedRequestHandler
        handler.framer  = Mock()
        handler.server = Mock()
        handler.framer.buildPacket.return_value = b"message"
        # The request is a (data, socket) pair. It is rebuilt from this one
        # mock socket before every phase instead of wrapping the previous
        # request value, which produced ever-deeper nested tuples.
        sock = Mock()
        handler.request = (b"\x12\x34", sock)

        # exit if we are not running
        handler.running = False
        handler.handle()
        self.assertEqual(handler.framer.processIncomingPacket.call_count, 0)

        # run forever if we are running
        # The stub accepts any extra arguments, like the callbacks in the
        # sibling handler tests, so the loop is stopped by the callback
        # itself and not by a TypeError from a signature mismatch.
        def _callback(a, b, *args, **kwargs):
            handler.running = False # stop infinite loop
        handler.framer.processIncomingPacket.side_effect = _callback
        handler.running = True
        handler.handle()
        self.assertEqual(handler.framer.processIncomingPacket.call_count, 1)

        # socket errors cause the client to disconnect
        handler.request = (b"\x12\x34", sock)
        handler.framer.processIncomingPacket.side_effect = socket.error()
        handler.running = True
        handler.handle()
        self.assertEqual(handler.framer.processIncomingPacket.call_count, 2)

        # every other exception causes the client to disconnect
        handler.request = (b"\x12\x34", sock)
        handler.framer.processIncomingPacket.side_effect = Exception()
        handler.running = True
        handler.handle()
        self.assertEqual(handler.framer.processIncomingPacket.call_count, 3)

        # receiving no data causes the client to disconnect
        handler.request = (None, sock)
        handler.running = True
        handler.handle()
        self.assertEqual(handler.framer.processIncomingPacket.call_count, 4)

    #-----------------------------------------------------------------------#
    # Test TCP Server
    #-----------------------------------------------------------------------#
    def testTcpServerClose(self):
        ''' server_close() on the TCP server clears the running flag of its threads '''
        # socket.bind is patched out for the lifetime of the server.
        with patch.object(socket.socket, 'bind'):
            identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})
            server = ModbusTcpServer(context=None, identity=identity)
            worker = Mock(running=True)
            server.threads.append(worker)

            server.server_close()

            self.assertEqual(server.control.Identity.VendorName, 'VendorName')
            self.assertFalse(server.threads[0].running)

    def testTcpServerProcess(self):
        ''' process_request() on the TCP server reaches ThreadingTCPServer '''
        target = 'pymodbus.compat.socketserver.ThreadingTCPServer'
        with patch(target) as threading_server:
            tcp_server = ModbusTcpServer(None)
            tcp_server.process_request('request', 'client')
            self.assertTrue(threading_server.process_request.called)

    #-----------------------------------------------------------------------#
    # Test UDP Server
    #-----------------------------------------------------------------------#
    def testUdpServerClose(self):
        ''' server_close() on the UDP server clears the running flag of its threads '''
        # socket.bind is patched out for the lifetime of the server.
        with patch.object(socket.socket, 'bind'):
            identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})
            server = ModbusUdpServer(context=None, identity=identity)
            worker = Mock(running=True)
            server.threads.append(worker)

            server.server_close()

            self.assertEqual(server.control.Identity.VendorName, 'VendorName')
            self.assertFalse(server.threads[0].running)

    def testUdpServerProcess(self):
        ''' process_request() on the UDP server reaches ThreadingUDPServer '''
        target = 'pymodbus.compat.socketserver.ThreadingUDPServer'
        with patch(target) as threading_server:
            udp_server = ModbusUdpServer(None)
            # The request is passed as a (data, socket) pair.
            datagram = ('data', 'socket')
            udp_server.process_request(datagram, 'client')
            self.assertTrue(threading_server.process_request.called)

    #-----------------------------------------------------------------------#
    # Test Serial Server
    #-----------------------------------------------------------------------#
    def testSerialServerConnect(self):
        ''' Test serial server construction with a working and a failing port '''
        # Case 1: serial.Serial is patched and opens without error.
        with patch.object(serial, 'Serial') as mock_serial:
            # NOTE(review): write/read are set on the patched class itself,
            # not on mock_serial.return_value -- confirm that is intended.
            mock_serial.write = lambda x: len(x)
            mock_serial.read = lambda size: '\x00' * size
            identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})
            server = ModbusSerialServer(context=None, identity=identity, port="dummy")
            # assertEqual, not the deprecated assertEquals alias, which was
            # removed in Python 3.12.
            self.assertEqual(server.handler.__class__.__name__, "CustomSingleRequestHandler")
            self.assertEqual(server.control.Identity.VendorName, 'VendorName')

            server._connect()

        # Case 2: opening the port raises SerialException, so the server is
        # expected to end up without a socket.
        with patch.object(serial, 'Serial') as mock_serial:
            mock_serial.write = lambda x: len(x)
            mock_serial.read = lambda size: '\x00' * size
            mock_serial.side_effect = serial.SerialException()
            server = ModbusSerialServer(None, port="dummy")
            self.assertEqual(server.socket, None)

    def testSerialServerServeForever(self):
        ''' serve_forever() on the serial server calls the handler's handle() '''
        with patch.object(serial, 'Serial'), \
                patch('pymodbus.server.sync.CustomSingleRequestHandler') as handler_cls:
            server = ModbusSerialServer(None)
            handler = handler_cls.return_value
            # handle() closes the server as its side effect -- presumably this
            # is what allows serve_forever() to return.
            handler.handle.side_effect = server.server_close
            server.serve_forever()
            handler.handle.assert_any_call()

    def testSerialServerClose(self):
        ''' server_close() on the serial server closes the serial port object '''
        with patch.object(serial, 'Serial') as serial_cls:
            # The object the patched Serial class returns when instantiated.
            port = serial_cls.return_value
            serial_server = ModbusSerialServer(None)
            serial_server.server_close()
            port.close.assert_any_call()

    #-----------------------------------------------------------------------#
    # Test Synchronous Factories
    #-----------------------------------------------------------------------#
    def testStartTcpServer(self):
        ''' StartTcpServer() runs with serve_forever and server_bind stubbed out '''
        # No assertions: the test passes if the factory returns without raising.
        with patch.object(ModbusTcpServer, 'serve_forever'), \
                patch.object(socketserver.TCPServer, 'server_bind'):
            StartTcpServer()

    def testStartUdpServer(self):
        ''' StartUdpServer() runs with serve_forever and server_bind stubbed out '''
        # No assertions: the test passes if the factory returns without raising.
        with patch.object(ModbusUdpServer, 'serve_forever'), \
                patch.object(socketserver.UDPServer, 'server_bind'):
            StartUdpServer()

    def testStartSerialServer(self):
        ''' StartSerialServer() runs against SERIAL_PORT with serve_forever stubbed out '''
        # No assertions: the test passes if the factory returns without raising.
        serve_patch = patch.object(ModbusSerialServer, 'serve_forever')
        with serve_patch:
            StartSerialServer(port=SERIAL_PORT)

#---------------------------------------------------------------------------#
# Main
#---------------------------------------------------------------------------#
# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()