File: engine.py

package info (click to toggle)
python-pysnmp4 7.1.21-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,564 kB
  • sloc: python: 33,654; makefile: 166; javascript: 4
file content (313 lines) | stat: -rw-r--r-- 11,884 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#
# This file is part of pysnmp software.
#
# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
# License: https://www.pysnmp.com/pysnmp/license.html
#
import os
import shutil
import sys
import tempfile
import warnings
from typing import Any, Dict


from pyasn1.type import univ
from pysnmp import debug, error
from pysnmp.carrier.base import AbstractTransportAddress, AbstractTransportDispatcher
from pysnmp.entity import observer
from pysnmp.proto.acmod import rfc3415, void
from pysnmp.proto.mpmod.base import AbstractMessageProcessingModel
from pysnmp.proto.mpmod.rfc2576 import (
    SnmpV1MessageProcessingModel,
    SnmpV2cMessageProcessingModel,
)
from pysnmp.proto.mpmod.rfc3412 import SnmpV3MessageProcessingModel
from pysnmp.proto.rfc1902 import OctetString
from pysnmp.proto.rfc3412 import MsgAndPduDispatcher
from pysnmp.proto.secmod.base import AbstractSecurityModel
from pysnmp.proto.secmod.rfc2576 import SnmpV1SecurityModel, SnmpV2cSecurityModel
from pysnmp.proto.secmod.rfc3414 import SnmpUSMSecurityModel
from pysnmp.smi.builder import MibBuilder

# Names exported by ``from <this module> import *``: only the engine class.
__all__ = ["SnmpEngine"]


class SnmpEngine:
    """Creates SNMP engine object.

    SNMP engine object is central in SNMP v3 architecture. It is an umbrella
    object that coordinates interactions between all parts of SNMP v3 system.
    See :RFC:`3412#section-2.1` (where it is termed *The Dispatcher*).

    With PySNMP design, `SnmpEngine` is the only stateful object, all SNMP
    v3 operations require an instance of SNMP engine. Users do not normally
    request services directly from `SnmpEngine`, but pass it around to
    other PySNMP interfaces.

    It is possible to run multiple instances of `SnmpEngine` in the
    application. In a multithreaded environment, each thread that
    works with SNMP must have its own `SnmpEngine` instance.

    Parameters
    ----------
    snmpEngineID : :py:class:`~pysnmp.proto.rfc1902.OctetString`
        Unique and unambiguous identifier of an SNMP engine.
        If not given, the value already held by the `snmpEngineID` object
        of `__SNMP-FRAMEWORK-MIB` is used. If given, it replaces that value
        and the `snmpEngineBoots` counter is additionally persisted (best
        effort) under the system temporary directory.
        See :RFC:`3411#section-3.1.1`  for details.
    maxMessageSize : int
        Value stored into the `snmpEngineMaxMessageSize` MIB object.
    msgAndPduDsp : MsgAndPduDispatcher
        Message and PDU dispatcher to use. A new `MsgAndPduDispatcher` is
        created when not given.

    Raises
    ------
    PySnmpError
        If the message dispatcher has no MIB instrumentation controller.

    Examples
    --------
    >>> SnmpEngine()
    SnmpEngine(snmpEngineID=OctetString(hexValue='0x80004fb80567726f6d6d69742'))
    >>>

    """

    # Annotations that name project types are quoted so they are not
    # evaluated when the class body executes.
    transport_dispatcher: "AbstractTransportDispatcher"
    message_dispatcher: "MsgAndPduDispatcher"
    # NOTE(review): `engine_id` is declared but never assigned by this class;
    # the engine identifier is stored in `snmpEngineID` (see `__init__`).
    engine_id: "OctetString"
    snmpEngineID: "OctetString"
    cache: Dict[str, Any]
    security_models: "Dict[int, AbstractSecurityModel]"
    message_processing_subsystems: (
        "Dict[univ.Integer, AbstractMessageProcessingModel]"
    )
    access_control_model: Dict[int, "void.Vacm | rfc3415.Vacm"]

    def __init__(
        self,
        snmpEngineID: "OctetString | None" = None,
        maxMessageSize: int = 65507,
        msgAndPduDsp: "MsgAndPduDispatcher | None" = None,
    ):
        """Create an SNMP engine object.

        Sets up the message dispatcher, the message processing, security and
        access control model tables, then initialises the
        `snmpEngineMaxMessageSize`, `snmpEngineBoots` and `snmpEngineID`
        objects of `__SNMP-FRAMEWORK-MIB`.
        """
        self.cache = {}

        self.observer = observer.MetaObserver()

        self.message_dispatcher = (
            MsgAndPduDispatcher() if msgAndPduDsp is None else msgAndPduDsp
        )
        self.message_processing_subsystems = {
            SnmpV1MessageProcessingModel.MESSAGE_PROCESSING_MODEL_ID: SnmpV1MessageProcessingModel(),
            SnmpV2cMessageProcessingModel.MESSAGE_PROCESSING_MODEL_ID: SnmpV2cMessageProcessingModel(),
            SnmpV3MessageProcessingModel.MESSAGE_PROCESSING_MODEL_ID: SnmpV3MessageProcessingModel(),
        }
        self.security_models = {
            SnmpV1SecurityModel.SECURITY_MODEL_ID: SnmpV1SecurityModel(),
            SnmpV2cSecurityModel.SECURITY_MODEL_ID: SnmpV2cSecurityModel(),
            SnmpUSMSecurityModel.SECURITY_MODEL_ID: SnmpUSMSecurityModel(),
        }
        self.access_control_model = {
            void.Vacm.ACCESS_MODEL_ID: void.Vacm(),
            rfc3415.Vacm.ACCESS_MODEL_ID: rfc3415.Vacm(),
        }

        # No transport is attached until register_transport_dispatcher().
        self.transport_dispatcher = None  # type: ignore

        if self.message_dispatcher.mib_instrum_controller is None:
            raise error.PySnmpError("MIB instrumentation does not yet exist")

        (
            snmpEngineMaxMessageSize,
        ) = self.get_mib_builder().import_symbols(  # type: ignore
            "__SNMP-FRAMEWORK-MIB", "snmpEngineMaxMessageSize"
        )
        snmpEngineMaxMessageSize.syntax = snmpEngineMaxMessageSize.syntax.clone(
            maxMessageSize
        )

        (snmpEngineBoots,) = self.get_mib_builder().import_symbols(  # type: ignore
            "__SNMP-FRAMEWORK-MIB", "snmpEngineBoots"
        )
        snmpEngineBoots.syntax += 1

        (origSnmpEngineID,) = self.get_mib_builder().import_symbols(  # type: ignore
            "__SNMP-FRAMEWORK-MIB", "snmpEngineID"
        )

        if snmpEngineID is None:
            # Keep whatever engine ID the MIB module provides; nothing is
            # persisted in this case.
            self.snmpEngineID = origSnmpEngineID.syntax
            return

        origSnmpEngineID.syntax = origSnmpEngineID.syntax.clone(snmpEngineID)
        self.snmpEngineID = origSnmpEngineID.syntax

        debug.logger & debug.FLAG_APP and debug.logger(
            "SnmpEngine: using custom SNMP Engine ID: %s"
            % self.snmpEngineID.prettyPrint()
        )

        self._persist_engine_boots(snmpEngineBoots)

    @staticmethod
    def _read_text_file(path: str) -> "str | None":
        """Return the text stored at *path*, or None if it cannot be read.

        The file is always closed before returning.
        """
        try:
            with open(path) as stream:
                return stream.read()
        except (OSError, ValueError):
            # Missing/unreadable file or undecodable content: nothing stored.
            return None

    @staticmethod
    def _replace_file(directory: str, target: str, payload: bytes) -> None:
        """Write *payload* to a temp file in *directory*, then move it to *target*.

        The descriptor returned by ``mkstemp`` is always closed, and the
        temporary file is removed again if writing or moving fails, so a
        failure leaves no stray files behind. The exception is re-raised.
        """
        fd, tmp_name = tempfile.mkstemp(dir=directory)
        try:
            try:
                os.write(fd, payload)
            finally:
                os.close(fd)
            shutil.move(tmp_name, target)
        except BaseException:
            try:
                os.unlink(tmp_name)
            except OSError:
                pass
            raise

    def _persist_engine_boots(self, snmpEngineBoots) -> None:
        """Restore, increment and re-store the boots counter (best effort).

        Attempt to make some of snmp Engine settings persistent.
        This should probably be generalized as a non-volatile MIB store.

        The counter lives in ``<tempdir>/__pysnmp/<engine id>/boots``. Any
        filesystem failure is tolerated: the in-memory counter is still
        incremented, and only a debug message is emitted.
        """
        persistentPath = os.path.join(
            tempfile.gettempdir(), "__pysnmp", self.snmpEngineID.prettyPrint()
        )

        debug.logger & debug.FLAG_APP and debug.logger(
            "SnmpEngine: using persistent directory: %s" % persistentPath
        )

        # exist_ok avoids the check-then-create race between engines that
        # start concurrently with the same engine ID.
        try:
            os.makedirs(persistentPath, exist_ok=True)
        except OSError:
            return

        boots_file = os.path.join(persistentPath, "boots")

        stored = self._read_text_file(boots_file)
        if stored is not None:
            try:
                snmpEngineBoots.syntax = snmpEngineBoots.syntax.clone(stored)
            except Exception as exc:
                # Corrupt content: fall back to the in-memory counter.
                debug.logger & debug.FLAG_APP and debug.logger(
                    "SnmpEngine: ignoring stored SNMP Engine Boots: %s" % (exc,)
                )

        try:
            snmpEngineBoots.syntax += 1
        except Exception:
            # Increment rejected by the value type: restart counting from 1.
            snmpEngineBoots.syntax = snmpEngineBoots.syntax.clone(1)

        try:
            self._replace_file(
                persistentPath,
                boots_file,
                snmpEngineBoots.syntax.prettyPrint().encode("iso-8859-1"),
            )
        except Exception as exc:
            debug.logger & debug.FLAG_APP and debug.logger(
                "SnmpEngine: could not stored SNMP Engine Boots: %s" % (exc,)
            )
        else:
            debug.logger & debug.FLAG_APP and debug.logger(
                "SnmpEngine: stored SNMP Engine Boots: %s"
                % snmpEngineBoots.syntax.prettyPrint()
            )

    def __repr__(self):
        """Return a string representation of the SNMP engine object."""
        return f"{self.__class__.__name__}(snmpEngineID={self.snmpEngineID!r})"

    def _close(self):
        """
        Close the SNMP engine to test memory leak.

        This method is intended for unit testing purposes only.
        It calls ``_close()`` on every registered security model.
        """
        for securityModel in self.security_models.values():
            securityModel._close()

    def open_dispatcher(self, timeout: float = 0):
        """
        Open the dispatcher used by SNMP engine.

        This method is called when SNMP engine is ready to process SNMP
        messages. It runs the registered transport dispatcher, passing it
        `timeout`. Does nothing when no transport dispatcher is registered.
        """
        if self.transport_dispatcher:
            self.transport_dispatcher.run_dispatcher(timeout)

    def close_dispatcher(self):
        """
        Close the dispatcher used by SNMP engine.

        This method is called when SNMP engine is no longer needed. It
        closes the registered transport dispatcher and unregisters it from
        this engine. Does nothing when no transport dispatcher is registered.
        """
        if self.transport_dispatcher:
            self.transport_dispatcher.close_dispatcher()
            self.unregister_transport_dispatcher()

    # Transport dispatcher bindings

    def __receive_message_callback(
        self,
        transportDispatcher: "AbstractTransportDispatcher",
        transportDomain: "tuple[int, ...]",
        transportAddress: "AbstractTransportAddress",
        wholeMsg,
    ):
        # Receive callback handed to the transport dispatcher: forwards the
        # raw message to the message dispatcher on behalf of this engine.
        self.message_dispatcher.receive_message(
            self, transportDomain, transportAddress, wholeMsg
        )

    def __receive_timer_tick_callback(self, timeNow: float):
        # Timer callback handed to the transport dispatcher: fans the tick
        # out to the message dispatcher, then to every message processing
        # and security model.
        self.message_dispatcher.receive_timer_tick(self, timeNow)
        for mpHandler in self.message_processing_subsystems.values():
            mpHandler.receive_timer_tick(self, timeNow)
        for smHandler in self.security_models.values():
            smHandler.receive_timer_tick(self, timeNow)

    def register_transport_dispatcher(
        self,
        transportDispatcher: "AbstractTransportDispatcher",
        recvId: "tuple[int, ...] | str | None" = None,
    ):
        """Register transport dispatcher.

        The same dispatcher may be registered repeatedly (e.g. with
        different `recvId` values); the timer callback is only installed
        the first time.

        Raises
        ------
        PySnmpError
            If a different transport dispatcher is already registered.
        """
        if (
            self.transport_dispatcher is not None
            and self.transport_dispatcher is not transportDispatcher
        ):
            raise error.PySnmpError("Transport dispatcher already registered")
        transportDispatcher.register_recv_callback(
            self.__receive_message_callback, recvId
        )
        if self.transport_dispatcher is None:
            transportDispatcher.register_timer_callback(
                self.__receive_timer_tick_callback
            )
            self.transport_dispatcher = transportDispatcher

    def unregister_transport_dispatcher(self, recvId: "tuple[int, ...] | None" = None):
        """Remove transport dispatcher.

        Unregisters the receive callback for `recvId` and the timer
        callback, then forgets the dispatcher.

        Raises
        ------
        PySnmpError
            If no transport dispatcher is registered.
        """
        if self.transport_dispatcher is None:
            raise error.PySnmpError("Transport dispatcher not registered")
        self.transport_dispatcher.unregister_recv_callback(recvId)
        self.transport_dispatcher.unregister_timer_callback()
        self.transport_dispatcher = None  # type: ignore

    def get_mib_builder(self) -> "MibBuilder":
        """Get MIB builder of the message dispatcher's MIB instrumentation."""
        return self.message_dispatcher.mib_instrum_controller.get_mib_builder()

    # User app may attach opaque objects to SNMP Engine
    def set_user_context(self, **kwargs):
        """Attach user context to the SNMP engine.

        Each keyword argument is stored in the engine cache under its name
        prefixed with ``__``.
        """
        self.cache.update({"__%s" % name: value for name, value in kwargs.items()})

    def get_user_context(self, arg) -> Any:
        """Get the user context object stored under `arg`, or None if absent."""
        return self.cache.get("__%s" % arg)

    def delete_user_context(self, arg):
        """Delete the user context stored under `arg`; no-op if absent."""
        self.cache.pop("__%s" % arg, None)

    def __enter__(self, *args, **kwargs):
        """Open the SNMP engine."""
        return self

    def __exit__(self, *args, **kwargs):
        """Close the SNMP engine.

        Closes the registered transport dispatcher, if any. Unlike
        `close_dispatcher`, the dispatcher is left registered.
        """
        if self.transport_dispatcher:
            self.transport_dispatcher.close_dispatcher()

    # compatibility with legacy code
    # Old to new attribute mapping
    deprecated_attributes = {
        "transportDispatcher": "transport_dispatcher",
        "openDispatcher": "open_dispatcher",
        "closeDispatcher": "close_dispatcher",
        "msgAndPduDsp": "message_dispatcher",
    }

    def __getattr__(self, attr: str):
        """Handle deprecated attributes.

        Only invoked when normal attribute lookup fails. Legacy camelCase
        names listed in `deprecated_attributes` are resolved to their new
        names with a `DeprecationWarning`; anything else raises
        `AttributeError`.
        """
        if new_attr := self.deprecated_attributes.get(attr):
            warnings.warn(
                f"{attr} is deprecated. Please use {new_attr} instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            return getattr(self, new_attr)
        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{attr}'"
        )