File: pymodbus_plugin.py

package info (click to toggle)
pymodbus 2.1.0%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 2,708 kB
  • sloc: python: 17,594; makefile: 84; sh: 8
file content (63 lines) | stat: -rw-r--r-- 1,960 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""
"""
from zope.interface import implementer
from zope.interface import implements

from twisted.python import usage
from twisted.plugin import IPlugin
from twisted.application.service import IServiceMaker
from twisted.application import internet

from pymodbus.constants import Defaults
from pymodbus.server.asynchronous import ModbusServerFactory
from pymodbus.transaction import ModbusSocketFramer
from pymodbus.internal.ptwisted import InstallManagementConsole

class Options(usage.Options):
    """
    The following are the options available to the
    pymodbus server.

    Each ``optParameters`` entry has the form
    ``[long name, short name, default, description]`` followed by an
    optional callable used to coerce the value given on the command line.
    """
    optParameters = [
        # Coerced with int so that a malformed port is reported as a usage
        # error while parsing instead of failing later when the value is used.
        ["port", "p", Defaults.Port, "The port number to listen on.", int],
        ["type", "t", "tcp", "The type of server to host (tcp, udp, ascii, rtu)"],
        ["store", "s", "./datastore", "The pickled datastore to use"],
        # NOTE(review): this is a parameter, not a flag, so it takes a value;
        # code that checks it for truthiness will treat any non-empty string
        # (including "False") as enabled.
        ["console", "c", False, "Should the management console be started"],
    ]

@implementer(IServiceMaker, IPlugin)
class ModbusServiceMaker(object):
    """
    A helper class used to build a twisted plugin

    The interfaces are declared with the ``implementer`` class decorator
    because the ``implements`` class advice only works on python 2.
    """
    tapname = "pymodbus"
    description = "A modbus server"
    options = Options

    def makeService(self, options):
        """
        Construct a service from the given options

        :param options: The parsed options; the ``type``, ``store``,
            ``console`` and ``port`` keys are read
        :returns: The twisted server listening on the requested port
        """
        if options["type"] == "tcp":
            server = internet.TCPServer
        else:
            # Every type other than "tcp" falls back to UDP.
            # NOTE(review): twisted's UDPServer is normally handed a datagram
            # protocol rather than a factory -- confirm that this branch
            # works with ModbusServerFactory.
            server = internet.UDPServer

        framer = ModbusSocketFramer
        # Hand the unpickled datastore to the factory; when loading fails
        # the context is None, which is what was always passed before.
        context = self._build_context(options['store'])
        factory = ModbusServerFactory(context, framer)
        if options['console']:
            InstallManagementConsole({'server': factory})
        return server(int(options["port"]), factory)

    def _build_context(self, path):
        """
        A helper method to unpickle a datastore,
        note, this should be a ModbusServerContext.

        :param path: The filesystem path of the pickled datastore
        :returns: The unpickled object, or None if it could not be loaded
        """
        import pickle
        # NOTE(security): unpickling can execute arbitrary code, so the
        # store path must only ever point at a trusted file.
        try:
            # pickle.load needs a binary file object, not a path string.
            with open(path, "rb") as handle:
                return pickle.load(handle)
        except Exception:
            # Deliberately best effort: a missing or unreadable datastore
            # is not fatal, the caller simply receives no context.
            return None

# Module-level instance of the service maker; presumably this is the object
# that twisted's plugin machinery discovers through the IPlugin interface
# declared on the class -- verify against the twisted plugin documentation.
serviceMaker = ModbusServiceMaker()