File: modbus_simulator.py

package info (click to toggle)
pymodbus 2.1.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 2,708 kB
  • sloc: python: 17,594; makefile: 84; sh: 8
file content (139 lines) | stat: -rw-r--r-- 4,419 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python
"""
An example of creating a fully implemented modbus server
with read/write data as well as user configurable base data
"""

import pickle
from optparse import OptionParser
from twisted.internet import reactor

from pymodbus.server.asynchronous import StartTcpServer
from pymodbus.datastore import ModbusServerContext,ModbusSlaveContext

# -------------------------------------------------------------------------- #
# Logging
# -------------------------------------------------------------------------- #
import logging
# Set up the default logging configuration before any logger is used.
logging.basicConfig()

# Named loggers whose level main() raises to DEBUG when tracing is enabled.
server_log, protocol_log = (
    logging.getLogger("pymodbus.server"),
    logging.getLogger("pymodbus.protocol"),
)

# -------------------------------------------------------------------------- #
# Extra Global Functions
# -------------------------------------------------------------------------- #
# These are extra helper functions that don't belong in a class
# -------------------------------------------------------------------------- #
import getpass


def root_test():
    """ Report whether the script is allowed to start

    The real check (comparing ``getpass.getuser()`` with ``"root"``)
    is switched off for the time being because it isn't portable,
    so every caller is accepted.

    :returns: Always True
    """
    return True

# -------------------------------------------------------------------------- #
# Helper Classes
# -------------------------------------------------------------------------- #


class ConfigurationException(Exception):
    """ Error raised when the simulator configuration is unusable """

    def __init__(self, string):
        """ Stores the message and initializes the base exception

        :param string: The detail text shown after the error prefix
        """
        super(ConfigurationException, self).__init__(string)
        self.string = string

    def __str__(self):
        """ Renders the error together with its fixed prefix

        :returns: The detail text prefixed with 'Configuration Error: '
        """
        template = 'Configuration Error: %s'
        return template % self.string



class Configuration:
    """
    Class used to parse a configuration file and create a modbus
    datastore.

    The format of the configuration file is actually just a
    python pickle, i.e. a serialized memory dump from the scraper.
    """

    def __init__(self, config):
        """
        Tries to open the configuration file for binary reading

        :param config: Path of the pickled datastore
        :raises ConfigurationException: If the file cannot be opened
        """
        try:
            self.file = open(config, "rb")
        except Exception as e:
            # Fetch the logger here: no module-level ``_logger`` exists,
            # and a NameError at this point would hide the real problem
            # from callers that expect a ConfigurationException.
            logging.getLogger(__name__).critical(str(e))
            raise ConfigurationException("File not found %s" % config)

    def parse(self):
        """ Parses the config file and creates a server context

        The file opened by the constructor is closed once it has been
        read, whether or not unpickling succeeds.

        :returns: A ModbusServerContext wrapping a single slave context
        :raises ConfigurationException: If the 'di', 'ci', 'hr' or 'ir'
            entry cannot be read from the unpickled object
        """
        # NOTE(security): pickle.load can execute arbitrary code while
        # loading; only point the simulator at trusted configuration files.
        try:
            handle = pickle.load(self.file)
        finally:
            self.file.close()
        try:  # test for existence, or bomb
            dsd = handle['di']
            csd = handle['ci']
            hsd = handle['hr']
            isd = handle['ir']
        except Exception:
            raise ConfigurationException("Invalid Configuration")
        # ModbusSlaveContext looks its blocks up under the 'di', 'co',
        # 'hr' and 'ir' keywords; other keywords are ignored and default
        # blocks are used instead.
        # NOTE(review): confirm against the installed pymodbus version.
        slave = ModbusSlaveContext(di=dsd, co=csd, hr=hsd, ir=isd)
        return ModbusServerContext(slaves=slave)

# -------------------------------------------------------------------------- #
# Main start point
# -------------------------------------------------------------------------- #


def main():
    """ Parse the command line and launch the simulator

    Recognised options:

    * ``-c`` / ``--conf``: the configuration file to load
    * ``-D`` / ``--debug``: raise both pymodbus loggers to DEBUG

    A ConfigurationException is reported by printing it, followed
    by the option help.
    """
    parser = OptionParser()
    parser.add_option(
        "-c", "--conf", dest="file",
        help="The configuration file to load")
    parser.add_option(
        "-D", "--debug", dest="debug",
        action="store_true", default=False,
        help="Turn on to enable tracing")
    options, _positional = parser.parse_args()

    # switch both pymodbus loggers to DEBUG when tracing was requested
    if options.debug:
        try:
            for log in (server_log, protocol_log):
                log.setLevel(logging.DEBUG)
        except Exception:
            print("Logging is not supported on this system")

    # build the datastore from the configuration file and serve it
    try:
        context = Configuration(options.file).parse()
        StartTcpServer(context=context)
    except ConfigurationException as problem:
        print(problem)
        parser.print_help()

# -------------------------------------------------------------------------- #
# Main jumper
# -------------------------------------------------------------------------- #


if __name__ == "__main__":
    # Only launch the server when the privilege check passes.
    if not root_test():
        print("This script must be run as root!")
    else:
        main()