1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
|
#
# This file is part of pysnmp software.
#
# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
# License: https://www.pysnmp.com/pysnmp/license.html
#
import os
import shutil
import sys
import tempfile
import warnings
from typing import Any, Dict
from pyasn1.type import univ
from pysnmp import debug, error
from pysnmp.carrier.base import AbstractTransportAddress, AbstractTransportDispatcher
from pysnmp.entity import observer
from pysnmp.proto.acmod import rfc3415, void
from pysnmp.proto.mpmod.base import AbstractMessageProcessingModel
from pysnmp.proto.mpmod.rfc2576 import (
SnmpV1MessageProcessingModel,
SnmpV2cMessageProcessingModel,
)
from pysnmp.proto.mpmod.rfc3412 import SnmpV3MessageProcessingModel
from pysnmp.proto.rfc1902 import OctetString
from pysnmp.proto.rfc3412 import MsgAndPduDispatcher
from pysnmp.proto.secmod.base import AbstractSecurityModel
from pysnmp.proto.secmod.rfc2576 import SnmpV1SecurityModel, SnmpV2cSecurityModel
from pysnmp.proto.secmod.rfc3414 import SnmpUSMSecurityModel
from pysnmp.smi.builder import MibBuilder
__all__ = ["SnmpEngine"]
class SnmpEngine:
    """Creates SNMP engine object.

    SNMP engine object is central in SNMP v3 architecture. It is an umbrella
    object that coordinates interactions between all parts of SNMP v3 system.
    See :RFC:`3412#section-2.1` (where it is termed *The Dispatcher*).

    With PySNMP design, `SnmpEngine` is the only stateful object, all SNMP
    v3 operations require an instance of SNMP engine. Users do not normally
    request services directly from `SnmpEngine`, but pass it around to
    other PySNMP interfaces.

    It is possible to run multiple instances of `SnmpEngine` in the
    application. In a multithreaded environment, each thread that
    works with SNMP must have its own `SnmpEngine` instance.

    Parameters
    ----------
    snmpEngineID : :py:class:`~pysnmp.proto.rfc1902.OctetString`
        Unique and unambiguous identifier of an SNMP engine.
        If not given, `snmpEngineID` is autogenerated and stored on
        the filesystem. See :RFC:`3411#section-3.1.1` for details.
    maxMessageSize : int
        Value written into the `snmpEngineMaxMessageSize` MIB object.
    msgAndPduDsp : MsgAndPduDispatcher
        Message and PDU dispatcher to use. If not given, a new
        `MsgAndPduDispatcher` is created.

    Examples
    --------
    >>> SnmpEngine()
    SnmpEngine(snmpEngineID=OctetString(hexValue='0x80004fb80567726f6d6d69742'))
    >>>

    """

    # Annotations are quoted throughout the class, so none of them is
    # evaluated when the class is created.
    transport_dispatcher: "AbstractTransportDispatcher"
    message_dispatcher: "MsgAndPduDispatcher"
    # NOTE(review): nothing in this class assigns `engine_id`; `__init__`
    # stores the identifier as `snmpEngineID` -- confirm which name is intended.
    engine_id: "OctetString"
    cache: "Dict[str, Any]"
    security_models: "Dict[int, AbstractSecurityModel]"
    message_processing_subsystems: "Dict[univ.Integer, AbstractMessageProcessingModel]"
    access_control_model: Dict[int, "void.Vacm | rfc3415.Vacm"]

    def __init__(
        self,
        snmpEngineID: "OctetString | None" = None,
        maxMessageSize: int = 65507,
        msgAndPduDsp: "MsgAndPduDispatcher | None" = None,
    ):
        """Create an SNMP engine object.

        Instantiates the message processing, security and access control
        models, then initializes `snmpEngineMaxMessageSize`,
        `snmpEngineBoots` and `snmpEngineID` in `__SNMP-FRAMEWORK-MIB`.

        Raises
        ------
        PySnmpError
            If the message dispatcher has no MIB instrumentation controller.
        """
        self.cache = {}

        self.observer = observer.MetaObserver()

        if msgAndPduDsp is None:
            self.message_dispatcher = MsgAndPduDispatcher()
        else:
            self.message_dispatcher = msgAndPduDsp

        self.message_processing_subsystems = {
            SnmpV1MessageProcessingModel.MESSAGE_PROCESSING_MODEL_ID: SnmpV1MessageProcessingModel(),
            SnmpV2cMessageProcessingModel.MESSAGE_PROCESSING_MODEL_ID: SnmpV2cMessageProcessingModel(),
            SnmpV3MessageProcessingModel.MESSAGE_PROCESSING_MODEL_ID: SnmpV3MessageProcessingModel(),
        }
        self.security_models = {
            SnmpV1SecurityModel.SECURITY_MODEL_ID: SnmpV1SecurityModel(),
            SnmpV2cSecurityModel.SECURITY_MODEL_ID: SnmpV2cSecurityModel(),
            SnmpUSMSecurityModel.SECURITY_MODEL_ID: SnmpUSMSecurityModel(),
        }
        self.access_control_model: dict[int, "void.Vacm | rfc3415.Vacm"] = {
            void.Vacm.ACCESS_MODEL_ID: void.Vacm(),
            rfc3415.Vacm.ACCESS_MODEL_ID: rfc3415.Vacm(),
        }

        self.transport_dispatcher = None  # type: ignore

        if self.message_dispatcher.mib_instrum_controller is None:
            raise error.PySnmpError("MIB instrumentation does not yet exist")

        (
            snmpEngineMaxMessageSize,
        ) = self.get_mib_builder().import_symbols(  # type: ignore
            "__SNMP-FRAMEWORK-MIB", "snmpEngineMaxMessageSize"
        )
        snmpEngineMaxMessageSize.syntax = snmpEngineMaxMessageSize.syntax.clone(
            maxMessageSize
        )

        (snmpEngineBoots,) = self.get_mib_builder().import_symbols(  # type: ignore
            "__SNMP-FRAMEWORK-MIB", "snmpEngineBoots"
        )
        snmpEngineBoots.syntax += 1

        (origSnmpEngineID,) = self.get_mib_builder().import_symbols(  # type: ignore
            "__SNMP-FRAMEWORK-MIB", "snmpEngineID"
        )

        if snmpEngineID is None:
            # Keep whatever engine ID the MIB module already carries.
            self.snmpEngineID = origSnmpEngineID.syntax
        else:
            origSnmpEngineID.syntax = origSnmpEngineID.syntax.clone(snmpEngineID)
            self.snmpEngineID = origSnmpEngineID.syntax

            debug.logger & debug.FLAG_APP and debug.logger(
                "SnmpEngine: using custom SNMP Engine ID: %s"
                % self.snmpEngineID.prettyPrint()
            )

            # Attempt to make some of snmp Engine settings persistent.
            # This should probably be generalized as a non-volatile MIB store.
            self._persist_engine_boots(snmpEngineBoots)

    def _persist_engine_boots(self, snmpEngineBoots):
        """Reload, increment and store the engine boots counter (best effort).

        The counter lives in a ``boots`` file inside a per-engine-ID directory
        under the system temporary directory. Every filesystem failure is
        tolerated: the engine then keeps the in-memory counter value.
        """
        persistentPath = os.path.join(
            tempfile.gettempdir(), "__pysnmp", self.snmpEngineID.prettyPrint()
        )

        debug.logger & debug.FLAG_APP and debug.logger(
            "SnmpEngine: using persistent directory: %s" % persistentPath
        )

        # exist_ok avoids the check-then-create race between concurrent engines.
        try:
            os.makedirs(persistentPath, exist_ok=True)
        except OSError:
            return

        storedBoots = self._read_boots_file(os.path.join(persistentPath, "boots"))
        if storedBoots is not None:
            try:
                snmpEngineBoots.syntax = snmpEngineBoots.syntax.clone(storedBoots)
            except Exception:
                # Unparsable content: carry on with the in-memory counter.
                pass

        try:
            snmpEngineBoots.syntax += 1
        except Exception:
            # Increment failed -- start over from 1.
            snmpEngineBoots.syntax = snmpEngineBoots.syntax.clone(1)

        try:
            self._write_boots_file(
                persistentPath,
                snmpEngineBoots.syntax.prettyPrint().encode("iso-8859-1"),
            )
        except Exception as exc:
            debug.logger & debug.FLAG_APP and debug.logger(
                "SnmpEngine: could not stored SNMP Engine Boots: %s" % exc
            )
        else:
            debug.logger & debug.FLAG_APP and debug.logger(
                "SnmpEngine: stored SNMP Engine Boots: %s"
                % snmpEngineBoots.syntax.prettyPrint()
            )

    @staticmethod
    def _read_boots_file(bootsPath):
        """Return the text stored at `bootsPath`, or None if it cannot be read."""
        try:
            # The context manager closes the handle even if read() fails.
            with open(bootsPath) as bootsFile:
                return bootsFile.read()
        except (OSError, ValueError):
            # Missing/unreadable file (OSError) or undecodable bytes (ValueError).
            return None

    @staticmethod
    def _write_boots_file(persistentPath, payload):
        """Write `payload` (bytes) to ``<persistentPath>/boots`` via a temp file.

        The data goes to a temporary file in the same directory which is then
        moved over the target. On any failure the descriptor is closed, the
        temporary file is removed and the exception propagates to the caller.
        """
        fd, tempName = tempfile.mkstemp(dir=persistentPath)
        try:
            try:
                os.write(fd, payload)
            finally:
                os.close(fd)
            shutil.move(tempName, os.path.join(persistentPath, "boots"))
        except BaseException:
            try:
                os.unlink(tempName)
            except OSError:
                # Nothing left to clean up (file already moved or removed).
                pass
            raise

    def __repr__(self):
        """Return a string representation of the SNMP engine object."""
        return f"{self.__class__.__name__}(snmpEngineID={self.snmpEngineID!r})"

    def _close(self):
        """
        Close the SNMP engine to test memory leak.

        This method is intended for unit testing purposes only.
        It calls `_close()` on every registered security model.
        """
        for securityModel in self.security_models.values():
            securityModel._close()

    def open_dispatcher(self, timeout: float = 0):
        """
        Open the dispatcher used by SNMP engine.

        This method is called when SNMP engine is ready to process SNMP
        messages. It calls `run_dispatcher(timeout)` on the registered
        transport dispatcher; without a dispatcher it does nothing.
        """
        if self.transport_dispatcher:
            self.transport_dispatcher.run_dispatcher(timeout)

    def close_dispatcher(self):
        """
        Close the dispatcher used by SNMP engine.

        This method is called when SNMP engine is no longer needed. It
        closes the registered transport dispatcher and unregisters it from
        the engine; without a dispatcher it does nothing.
        """
        if self.transport_dispatcher:
            self.transport_dispatcher.close_dispatcher()
            self.unregister_transport_dispatcher()

    # Transport dispatcher bindings

    def __receive_message_callback(
        self,
        transportDispatcher: "AbstractTransportDispatcher",
        transportDomain: "tuple[int, ...]",
        transportAddress: "AbstractTransportAddress",
        wholeMsg,
    ):
        # Forward an incoming message to the message dispatcher; the
        # `transportDispatcher` argument itself is not used here.
        self.message_dispatcher.receive_message(
            self, transportDomain, transportAddress, wholeMsg
        )

    def __receive_timer_tick_callback(self, timeNow: float):
        # Fan the timer tick out to the message dispatcher, then to every
        # message processing model and every security model.
        self.message_dispatcher.receive_timer_tick(self, timeNow)
        for mpHandler in self.message_processing_subsystems.values():
            mpHandler.receive_timer_tick(self, timeNow)
        for smHandler in self.security_models.values():
            smHandler.receive_timer_tick(self, timeNow)

    def register_transport_dispatcher(
        self,
        transportDispatcher: "AbstractTransportDispatcher",
        recvId: "tuple[int, ...] | str | None" = None,
    ):
        """Register transport dispatcher.

        The receive callback is registered under `recvId` on every call; the
        timer callback is registered only when no dispatcher was attached yet.

        Raises
        ------
        PySnmpError
            If a different transport dispatcher is already registered.
        """
        if (
            self.transport_dispatcher is not None
            and self.transport_dispatcher is not transportDispatcher
        ):
            raise error.PySnmpError("Transport dispatcher already registered")

        transportDispatcher.register_recv_callback(
            self.__receive_message_callback, recvId
        )

        if self.transport_dispatcher is None:
            transportDispatcher.register_timer_callback(
                self.__receive_timer_tick_callback
            )
            self.transport_dispatcher = transportDispatcher

    def unregister_transport_dispatcher(self, recvId: "tuple[int, ...] | None" = None):
        """Remove transport dispatcher.

        Unregisters the receive callback for `recvId` and the timer callback,
        then detaches the dispatcher from the engine.

        Raises
        ------
        PySnmpError
            If no transport dispatcher is registered.
        """
        if self.transport_dispatcher is None:
            raise error.PySnmpError("Transport dispatcher not registered")

        self.transport_dispatcher.unregister_recv_callback(recvId)
        self.transport_dispatcher.unregister_timer_callback()
        self.transport_dispatcher = None  # type: ignore

    def get_mib_builder(self) -> "MibBuilder":
        """Get MIB builder of the message dispatcher's MIB instrumentation."""
        return self.message_dispatcher.mib_instrum_controller.get_mib_builder()

    # User app may attach opaque objects to SNMP Engine

    def set_user_context(self, **kwargs):
        """Attach user context to the SNMP engine.

        Each keyword argument is stored in the engine cache under its name
        prefixed with a double underscore.
        """
        self.cache.update({"__%s" % name: value for name, value in kwargs.items()})

    def get_user_context(self, arg) -> "dict[str, Any] | None":
        """Get user context stored under `arg`, or None if there is none."""
        # NOTE(review): the stored value is whatever was passed to
        # set_user_context(), not necessarily a dict -- the return annotation
        # may be narrower than reality.
        return self.cache.get("__%s" % arg)

    def delete_user_context(self, arg):
        """Delete user context stored under `arg`; a missing entry is ignored."""
        self.cache.pop("__%s" % arg, None)

    def __enter__(self, *args, **kwargs):
        """Open the SNMP engine."""
        return self

    def __exit__(self, *args, **kwargs):
        """Close the SNMP engine."""
        # NOTE(review): unlike close_dispatcher(), this closes the transport
        # dispatcher without unregistering it -- confirm that is intended.
        if self.transport_dispatcher:
            self.transport_dispatcher.close_dispatcher()

    # compatibility with legacy code
    # Old to new attribute mapping
    deprecated_attributes = {
        "transportDispatcher": "transport_dispatcher",
        "openDispatcher": "open_dispatcher",
        "closeDispatcher": "close_dispatcher",
        "msgAndPduDsp": "message_dispatcher",
    }

    def __getattr__(self, attr: str):
        """Handle deprecated attributes.

        Only invoked when normal attribute lookup fails. A legacy name listed
        in `deprecated_attributes` emits a `DeprecationWarning` and resolves to
        its replacement; any other name raises `AttributeError`.
        """
        if new_attr := self.deprecated_attributes.get(attr):
            warnings.warn(
                f"{attr} is deprecated. Please use {new_attr} instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            return getattr(self, new_attr)

        raise AttributeError(
            f"'{self.__class__.__name__}' object has no attribute '{attr}'"
        )
|