File: test_bpf.py

package info (click to toggle)
drgn 0.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 7,852 kB
  • sloc: python: 74,992; ansic: 54,589; awk: 423; makefile: 351; sh: 99
file content (118 lines) | stat: -rw-r--r-- 4,038 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# Copyright (c) Meta Platforms, Inc. and affiliates.
# SPDX-License-Identifier: LGPL-2.1-or-later

import contextlib
import errno
import os
from unittest import SkipTest

from tests.linux_kernel.bpf import (
    BPF_LD_MAP_FD,
    BPF_MAP_TYPE_HASH,
    BPF_PROG_TYPE_KPROBE,
    BPF_PROG_TYPE_SOCKET_FILTER,
    BPF_REG_0,
    bpf_map_create,
    bpf_map_get_info_by_fd,
    bpf_prog_get_info_by_fd,
    bpf_prog_ids,
    bpf_prog_load,
)
from tests.linux_kernel.crash_commands import CrashCommandTestCase
from tests.linux_kernel.helpers.test_bpf import BpfTestCase


class TestBpf(CrashCommandTestCase, BpfTestCase):
    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # This command isn't supported before Linux kernel commits dc4bb0e23561
        # ("bpf: Introduce bpf_prog ID") and f3f1c054c288 ("bpf: Introduce bpf_map
        # ID") (in v4.13).
        try:
            next(bpf_prog_ids())
        except StopIteration:
            # Kernel supports BPF IDs but no programs exist yet
            pass
        except OSError as e:
            if e.errno != errno.EINVAL:
                raise
            raise SkipTest("This kernel version doesn't support BPF object IDs")

    def test_bpf(self):
        with contextlib.ExitStack() as exit_stack:
            map_fd = bpf_map_create(BPF_MAP_TYPE_HASH, 8, 8, 8)
            exit_stack.callback(os.close, map_fd)

            prog1_fd = bpf_prog_load(BPF_PROG_TYPE_KPROBE, self.INSNS, b"GPL")
            exit_stack.callback(os.close, prog1_fd)

            prog2_insns = BPF_LD_MAP_FD(BPF_REG_0, map_fd) + self.INSNS
            prog2_fd = bpf_prog_load(BPF_PROG_TYPE_SOCKET_FILTER, prog2_insns, b"GPL")
            exit_stack.callback(os.close, prog2_fd)

            map_id = bpf_map_get_info_by_fd(map_fd).id
            prog1_id = bpf_prog_get_info_by_fd(prog1_fd).id
            prog2_info = bpf_prog_get_info_by_fd(prog2_fd)
            prog2_id = prog2_info.id
            prog2_tag = "".join(f"{b:02x}" for b in prog2_info.tag)

            cmd = self.check_crash_command("bpf")

            self.assertIn("BPF_PROG_TYPE", cmd.stdout)
            self.assertRegex(
                cmd.stdout,
                rf"(?sm)BPF_PROG.*^\s*{prog1_id}\s+.*KPROBE\s+[0-9a-f]{{16}}\s*$",
            )
            self.assertRegex(
                cmd.stdout,
                rf"(?sm)BPF_PROG.*^\s*{prog2_id}\s+.*SOCKET_FILTER\s+.*{prog2_tag}\s+{map_id}$",
            )
            self.assertIn("BPF_MAP_TYPE", cmd.stdout)
            self.assertRegex(
                cmd.stdout,
                rf"(?sm)BPF_MAP.*^\s*{map_id}\s+.*HASH\s+",
            )

    def test_bpf_program_by_id(self):
        with contextlib.ExitStack() as exit_stack:
            prog_fd = bpf_prog_load(BPF_PROG_TYPE_KPROBE, self.INSNS, b"GPL")
            exit_stack.callback(os.close, prog_fd)

            prog_info = bpf_prog_get_info_by_fd(prog_fd)
            prog_id = prog_info.id
            prog_tag = "".join(f"{b:02x}" for b in prog_info.tag)

            cmd = self.check_crash_command(f"bpf -p {prog_id}")

            self.assertRegex(
                cmd.stdout,
                rf"(?sm)ID\s+.*BPF_PROG_TYPE.*^\s*{prog_id}\s+.*KPROBE\s+.*{prog_tag}\s*$",
            )

            self.assertRegex(
                cmd.stdout,
                r"(?sm)^\s*XLATED:\s*\d+\s+JITED:\s*\d+\s+MEMLOCK:\s*\d+",
            )

            self.assertRegex(
                cmd.stdout,
                r"(?sm)^\s*LOAD_TIME:\s*((\w+\s+){3}\d{2}:\d{2}:\d{2}\s+\d{4}|\(unknown\))",
            )

            self.assertRegex(
                cmd.stdout,
                r"(?sm)^\s*GPL_COMPATIBLE:\s*(yes|no)\s+NAME:\s*(\(unused\)|\(unknown\)|\S+)\s+UID:\s*\d+",
            )

    def test_bpf_program_by_id_invalid(self):
        self.assertRaises(
            Exception,
            self.check_crash_command,
            "bpf -p 1a",
        )
        self.assertRaises(
            Exception,
            self.check_crash_command,
            "bpf -p 1$^7",
        )