File: cache.py

package info (click to toggle)
python-pysnmp4 7.1.21-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 3,564 kB
  • sloc: python: 33,654; makefile: 166; javascript: 4
file content (113 lines) | stat: -rw-r--r-- 4,368 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#
# This file is part of pysnmp software.
#
# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
# License: https://www.pysnmp.com/pysnmp/license.html
#
from pysnmp import nextid
from pysnmp.proto import error


class Cache:
    """SNMP message cache."""

    __state_reference = nextid.Integer(0xFFFFFF)
    __message_id = nextid.Integer(0xFFFFFF)

    def __init__(self):
        """Create a cache object."""
        self.__msgIdIndex = {}
        self.__stateReferenceIndex = {}
        self.__sendPduHandleIdx = {}
        # Message expiration mechanics
        self.__expirationQueue = {}
        self.__expirationTimer = 0

    # Server mode cache handling

    def new_state_reference(self):
        """Generate a new state reference."""
        return self.__state_reference()

    def push_by_state_reference(self, stateReference, **msgInfo):
        """Push state information."""
        if stateReference in self.__stateReferenceIndex:
            raise error.ProtocolError(
                f"Cache dup for stateReference={stateReference} at {self}"
            )
        expireAt = self.__expirationTimer + 600
        self.__stateReferenceIndex[stateReference] = msgInfo, expireAt

        # Schedule to expire
        if expireAt not in self.__expirationQueue:
            self.__expirationQueue[expireAt] = {}
        if "stateReference" not in self.__expirationQueue[expireAt]:
            self.__expirationQueue[expireAt]["stateReference"] = {}
        self.__expirationQueue[expireAt]["stateReference"][stateReference] = 1

    def pop_by_state_reference(self, stateReference):
        """Release state information."""
        if stateReference in self.__stateReferenceIndex:
            cacheInfo = self.__stateReferenceIndex[stateReference]
        else:
            raise error.ProtocolError(
                f"Cache miss for stateReference={stateReference} at {self}"
            )
        del self.__stateReferenceIndex[stateReference]
        cacheEntry, expireAt = cacheInfo
        del self.__expirationQueue[expireAt]["stateReference"][stateReference]
        return cacheEntry

    # Client mode cache handling

    def new_message_id(self):
        """Generate a new message ID."""
        return self.__message_id()

    def push_by_message_id(self, msgId, **msgInfo):
        """Cache state information."""
        if msgId in self.__msgIdIndex:
            raise error.ProtocolError(f"Cache dup for msgId={msgId} at {self}")
        expireAt = self.__expirationTimer + 600
        self.__msgIdIndex[msgId] = msgInfo, expireAt

        self.__sendPduHandleIdx[msgInfo["sendPduHandle"]] = msgId

        # Schedule to expire
        if expireAt not in self.__expirationQueue:
            self.__expirationQueue[expireAt] = {}
        if "msgId" not in self.__expirationQueue[expireAt]:
            self.__expirationQueue[expireAt]["msgId"] = {}
        self.__expirationQueue[expireAt]["msgId"][msgId] = 1

    def pop_by_message_id(self, msgId):
        """Release state information."""
        if msgId in self.__msgIdIndex:
            cacheInfo = self.__msgIdIndex[msgId]
        else:
            raise error.ProtocolError(f"Cache miss for msgId={msgId} at {self}")
        msgInfo, expireAt = cacheInfo
        del self.__sendPduHandleIdx[msgInfo["sendPduHandle"]]
        del self.__msgIdIndex[msgId]
        cacheEntry, expireAt = cacheInfo
        del self.__expirationQueue[expireAt]["msgId"][msgId]
        return cacheEntry

    def pop_by_send_pdu_handle(self, sendPduHandle):
        """Release state information."""
        if sendPduHandle in self.__sendPduHandleIdx:
            self.pop_by_message_id(self.__sendPduHandleIdx[sendPduHandle])

    def expire_caches(self):
        """Expire pending messages."""
        # Uses internal clock to expire pending messages
        if self.__expirationTimer in self.__expirationQueue:
            cacheInfo = self.__expirationQueue[self.__expirationTimer]
            if "stateReference" in cacheInfo:
                for stateReference in cacheInfo["stateReference"]:
                    del self.__stateReferenceIndex[stateReference]
            if "msgId" in cacheInfo:
                for msgId in cacheInfo["msgId"]:
                    del self.__msgIdIndex[msgId]
            del self.__expirationQueue[self.__expirationTimer]
        self.__expirationTimer += 1