File: test_other_messages.py

package info (click to toggle)
pymodbus 2.1.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 2,704 kB
  • sloc: python: 17,594; makefile: 83; sh: 8
file content (108 lines) | stat: -rw-r--r-- 4,027 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/env python
import unittest
from pymodbus.other_message import *

class ModbusOtherMessageTest(unittest.TestCase):
    '''
    Unit tests for the message classes of pymodbus.other_message.

    Requests are decoded, encoded and executed; responses are encoded and
    decoded. Results are compared against fixed byte strings and values.
    '''

    def setUp(self):
        ''' Builds the zero-argument factories used by the string test '''
        # Request classes are instantiated without arguments, so the
        # classes themselves serve as factories.
        request_factories = [
            ReadExceptionStatusRequest,
            GetCommEventCounterRequest,
            GetCommEventLogRequest,
            ReportSlaveIdRequest,
        ]
        self.requests = request_factories

        # Responses constructed with a value are wrapped in a lambda so
        # that every entry can be called the same way, without arguments.
        response_factories = [
            lambda: ReadExceptionStatusResponse(0x12),
            lambda: GetCommEventCounterResponse(0x12),
            GetCommEventLogResponse,
            lambda: ReportSlaveIdResponse(0x12),
        ]
        self.responses = response_factories

    def tearDown(self):
        ''' Drops the factory lists created in setUp '''
        del self.requests, self.responses

    def _checkRequest(self, request_class, function_code):
        ''' Runs the checks shared by all request tests

        :param request_class: The request class to instantiate
        :param function_code: The function code expected on the result
                              of execute()
        :returns: The request instance that was checked
        '''
        req = request_class()
        req.decode(b'\x12')
        # The requests under test are expected to encode to no payload.
        self.assertEqual(req.encode(), b'')
        executed = req.execute()
        self.assertEqual(executed.function_code, function_code)
        return req

    def _checkEventLog(self, log, status, message_count, event_count, events):
        ''' Compares the fields of an event log response

        :param log: The response instance to inspect
        :param status: The expected status attribute
        :param message_count: The expected message_count attribute
        :param event_count: The expected event_count attribute
        :param events: The expected events list
        '''
        self.assertEqual(log.status, status)
        self.assertEqual(log.message_count, message_count)
        self.assertEqual(log.event_count, event_count)
        self.assertEqual(log.events, events)

    def testOtherMessagesToString(self):
        # Requests are visited first, then responses. str() always returns
        # a string, so the assertion itself cannot fail; what this really
        # exercises is that construction and __str__ do not raise.
        for factory in self.requests + self.responses:
            self.assertIsNotNone(str(factory()))

    def testReadExceptionStatus(self):
        # Request side: execute() must answer with function code 0x07.
        self._checkRequest(ReadExceptionStatusRequest, 0x07)

        # Response side: status 0x12 must round-trip through one byte.
        resp = ReadExceptionStatusResponse(0x12)
        encoded = resp.encode()
        self.assertEqual(encoded, b'\x12')
        resp.decode(b'\x12')
        self.assertEqual(resp.status, 0x12)

    def testGetCommEventCounter(self):
        # Request side: execute() must answer with function code 0x0b.
        self._checkRequest(GetCommEventCounterRequest, 0x0b)

        # Response side: the same frame is expected from encode() and is
        # then fed back into decode().
        resp = GetCommEventCounterResponse(0x12)
        frame = b'\x00\x00\x00\x12'
        self.assertEqual(resp.encode(), frame)
        resp.decode(frame)
        self.assertEqual(resp.status, True)
        self.assertEqual(resp.count, 0x12)

        # With status False the leading word is expected to be 0xFFFF.
        resp.status = False
        self.assertEqual(resp.encode(), b'\xFF\xFF\x00\x12')

    def testGetCommEventLog(self):
        # Request side: execute() must answer with function code 0x0c.
        self._checkRequest(GetCommEventLogRequest, 0x0c)

        # Response side: a default response carries no events.
        resp = GetCommEventLogResponse()
        self.assertEqual(resp.encode(), b'\x06\x00\x00\x00\x00\x00\x00')
        resp.decode(b'\x06\x00\x00\x00\x12\x00\x12')
        self._checkEventLog(resp, True, 0x12, 0x12, [])

        # With status False the status word is expected to be 0xFFFF,
        # while the counters keep their decoded values.
        resp.status = False
        self.assertEqual(resp.encode(), b'\x06\xff\xff\x00\x12\x00\x12')

    def testGetCommEventLogWithEvents(self):
        # Three events are expected to be appended after the counters and
        # to raise the leading byte from 0x06 to 0x09.
        resp = GetCommEventLogResponse(events=[0x12,0x34,0x56])
        self.assertEqual(resp.encode(), b'\x09\x00\x00\x00\x00\x00\x00\x12\x34\x56')
        resp.decode(b'\x09\x00\x00\x00\x12\x00\x12\x12\x34\x56')
        self._checkEventLog(resp, True, 0x12, 0x12, [0x12,0x34,0x56])

    def testReportSlaveId(self):
        # Request side: execute() must answer with function code 0x11.
        req = self._checkRequest(ReportSlaveIdRequest, 0x11)

        # The response is built from the identifier reported by a second
        # execute() call, with status True.
        identifier = req.execute().identifier
        resp = ReportSlaveIdResponse(identifier, True)

        self.assertEqual(resp.encode(), b'\tPymodbus\xff')
        resp.decode(b'\x03\x12\x00')
        self.assertEqual(resp.status, False)
        self.assertEqual(resp.identifier, b'\x12\x00')

        # With status False the trailing byte is expected to be 0x00.
        resp.status = False
        self.assertEqual(resp.encode(), b'\x03\x12\x00\x00')

#---------------------------------------------------------------------------#
# Main
#---------------------------------------------------------------------------#
# Run the tests in this module when the file is executed directly as a
# script; nothing happens here when the module is merely imported.
if __name__ == "__main__":
    unittest.main()