File: netlink_monitor.py

package info (click to toggle)
wsdd 2%3A0.8-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 408 kB
  • sloc: python: 1,560; sh: 259; makefile: 21; xml: 14
file content (102 lines) | stat: -rwxr-xr-x 2,886 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/python3

# Not really a test case, but a PoC for getting notified about changes in
# network addreses on Linux using netlink sockets.

import socket
import struct

# from rtnetlink.h
RTMGRP_LINK = 1
RTMGRP_IPV4_IFADDR = 0x10
RTMGRP_IPV6_IFADDR = 0x100

RTM_NEWADDR = 20
RTM_DELADDR = 21
RTM_GETADDR = 22

# from netlink.h
NLM_F_REQUEST = 0x01

NLM_F_ROOT = 0x100
NLM_F_MATCH = 0x200
NLM_F_DUMP = NLM_F_ROOT | NLM_F_MATCH

# from if_addr.h
IFA_ADDRESS = 1
IFA_LOCAL = 2
IFA_LABEL = 3
IFA_FLAGS = 8

# self_defines

NLM_HDR_LEN = 16
NLM_HDR_ALIGNTO = 4

IFA_MSG_LEN = 8

# hardcoded as 4 in rtnetlink.h
RTA_ALIGNTO = 4


def align_to(x, n):
    return ((x + n - 1) // n) * n


s = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE)
s.bind((0, RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR))

kernel = (0, 0)
req = struct.pack('@IHHIIB', NLM_HDR_LEN + 1, RTM_GETADDR,
                  NLM_F_REQUEST | NLM_F_DUMP, 1, 0, socket.AF_PACKET)

s.sendto(req, kernel)

while True:
    buf, src = s.recvfrom(4096)

    offset = 0
    while offset < len(buf):
        (h_len, h_type, h_flags, _, _) = struct.unpack_from(
            '@IHHII', buf, offset)

        msg_len = h_len - NLM_HDR_LEN
        if msg_len < 0:
            # print('invalid message size')
            break

        if h_type != RTM_NEWADDR and h_type != RTM_DELADDR:
            offset += align_to(msg_len, NLM_HDR_ALIGNTO)
            # print('not interested in message type ', h_type)
            # print('new offset: ', offset)
            continue

        offset += NLM_HDR_LEN
        # decode ifaddrmsg as in rtnetlink.h
        ifa_family, _, ifa_flags, ifa_scope, ifa_idx = struct.unpack_from(
            '@BBBBI', buf, offset)

        ifa_name = ''
        addr = ''
        # look for some details in attributes
        i = offset + IFA_MSG_LEN
        while i - offset < msg_len:
            attr_len, attr_type = struct.unpack_from('HH', buf, i)
            if attr_type == IFA_LABEL:
                ifa_name, = struct.unpack_from(str(attr_len - 4 - 1) + 's',
                                               buf, i + 4)
            elif attr_type == IFA_LOCAL and ifa_family == socket.AF_INET:
                b = buf[i + 4:i + 4 + 4]
                addr = socket.inet_ntop(socket.AF_INET, b)
            elif attr_type == IFA_ADDRESS and ifa_family == socket.AF_INET6:
                b = buf[i + 4:i + 4 + 16]
                addr = socket.inet_ntop(socket.AF_INET6, b)
            elif attr_type == IFA_FLAGS:
                _, ifa_flags = struct.unpack_from('HI', buf, i)
            i += align_to(attr_len, RTA_ALIGNTO)

        msg_type = 'NEW' if h_type == RTM_NEWADDR else 'DEL'
        print('{} addr on interface {} {} [{}]: {}'.format(msg_type, ifa_name,
              ifa_idx, hex(ifa_flags), addr))

        offset += align_to(msg_len, NLM_HDR_ALIGNTO)