File: test_v3-trap.py

package info (click to toggle)
python-pysnmp4 7.1.21-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,564 kB
  • sloc: python: 33,654; makefile: 166; javascript: 4
file content (123 lines) | stat: -rw-r--r-- 5,015 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import socket
import pytest


from pysnmp.hlapi.v3arch.asyncio import *
from tests.manager_context import MANAGER_PORT, ManagerContextManager


@pytest.mark.asyncio
async def test_send_v3_trap_notification():
    async with ManagerContextManager() as (_, message_count):
        # snmptrap -v3 -l authPriv -u usr-md5-des -A authkey1 -X privkey1 -e 8000000001020304 localhost:MANAGER_PORT 0 1.3.6.1.6.3.1.1.5.1 1.3.6.1.2.1.1.1.0 s "my system"
        snmpEngine = SnmpEngine(OctetString(hexValue="8000000001020304"))
        errorIndication, errorStatus, errorIndex, varBinds = await send_notification(
            snmpEngine,
            UsmUserData("usr-md5-des", "authkey1", "privkey1"),
            await UdpTransportTarget.create(("localhost", MANAGER_PORT)),
            ContextData(),
            "trap",
            NotificationType(ObjectIdentity("IF-MIB", "linkDown")),
        )

        snmpEngine.close_dispatcher()
        await asyncio.sleep(1)
        assert message_count == [1]


@pytest.mark.asyncio
async def test_send_v3_trap_notification_none():
    async with ManagerContextManager() as (_, message_count):
        # snmptrap -v3 -l noAuthNoPriv -u usr-none-none -e 8000000001020305 localhost:MANAGER_PORT 0 1.3.6.1.6.3.1.1.5.1 1.3.6.1.2.1.1.1.0 s "my system"
        snmpEngine = SnmpEngine(OctetString(hexValue="8000000001020305"))
        errorIndication, errorStatus, errorIndex, varBinds = await send_notification(
            snmpEngine,
            UsmUserData("usr-none-none", None, None),
            await UdpTransportTarget.create(("localhost", MANAGER_PORT)),
            ContextData(),
            "trap",
            NotificationType(ObjectIdentity("IF-MIB", "linkDown")),
        )

        snmpEngine.close_dispatcher()
        await asyncio.sleep(1)
        assert message_count == [1]


@pytest.mark.asyncio
async def test_send_v3_trap_notification_invalid_user():
    async with ManagerContextManager() as (_, message_count):
        # snmptrap -v3 -l authPriv -u usr-md5-des -A authkey1 -X privkey1 -e 8000000001020304 localhost:MANAGER_PORT 0 1.3.6.1.6.3.1.1.5.1 1.3.6.1.2.1.1.1.0 s "my system"
        snmpEngine = SnmpEngine(OctetString(hexValue="8000000001020304"))
        errorIndication, errorStatus, errorIndex, varBinds = await send_notification(
            snmpEngine,
            UsmUserData("usr-md5-des1", "authkey1", "privkey1"),
            await UdpTransportTarget.create(("localhost", MANAGER_PORT)),
            ContextData(),
            "trap",
            NotificationType(ObjectIdentity("IF-MIB", "linkDown")),
        )

        snmpEngine.close_dispatcher()
        await asyncio.sleep(1)
        assert message_count == [0]


hex_dump = """
30 81 BC 02  01 03 30 11  02 04 61 03  A8 E3 02 03
00 FF E3 04  01 03 02 01  03 04 3A 30  38 04 08 80
00 00 00 01  02 03 04 02  01 01 02 04  05 35 84 3C
04 0B 75 73  72 2D 6D 64  35 2D 64 65  73 04 0C 36
04 72 CD 9B  57 02 49 45  C2 86 F6 04  08 00 00 00
0C CB C9 9A  08 04 68 2B  7D 70 58 59  12 E1 9F CC
A7 74 AD D7  1F 89 D4 BD  6B 8A D4 93  50 E5 31 82
98 45 C5 2A  23 CA 0A D2  3B A0 CF 59  C4 96 58 D3
CC 1A 8C 8F  87 82 FC 27  E4 AC 6B 54  8A 28 B9 D3
FA CD 92 E6  62 0C FB 65  42 62 9E CE  76 34 A9 02
3C CF 3A 05  7C 1F 3C B2  D3 0E B4 2F  8A 66 CB B4
10 66 A8 F4  A9 E3 D8 3C  44 BE 28 52  AC 70 27
"""
# copied from snmptrap -d -v3 -l authPriv -u usr-md5-des -A authkey1 -X privkey1 -e 8000000001020304 localhost:1622 0 1.3.6.1.6.3.1.1.5.1 1.3.6.1.2.1.1.1.0 s "my system"


def hex_dump_to_bytes(hex_dump):
    # Split the hex dump into lines
    lines = hex_dump.strip().split("\n")

    # Extract the hex bytes from each line
    hex_bytes = []
    for line in lines:
        # Skip the beginning of the line (offset) and drop the end of the line (non-hex characters)
        hex_part = line.replace(" ", "")
        hex_bytes.append(hex_part)

    # Join all hex bytes into a single string
    hex_string = "".join(hex_bytes)

    # Convert the hex string to a byte array
    return bytes.fromhex(hex_string)


@pytest.mark.asyncio
async def test_send_v3_trap_notification_raw():
    async with ManagerContextManager() as (_, message_count):
        raw_bytes = hex_dump_to_bytes(hex_dump)
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(raw_bytes, ("localhost", MANAGER_PORT))
        await asyncio.sleep(1)
        assert message_count == [1]


@pytest.mark.asyncio
async def test_send_v3_trap_notification_invalid_msg_flags():
    async with ManagerContextManager() as (_, message_count):
        raw_bytes = bytearray(hex_dump_to_bytes(hex_dump))
        assert raw_bytes[21] == 0x03  # check the original value
        raw_bytes[
            21
        ] = 0x02  # manually changed line 2, "04  01 03" to "04  01 02" to make the message invalid
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(raw_bytes, ("localhost", MANAGER_PORT))
        await asyncio.sleep(1)
        assert message_count == [0]