1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
|
# Copyright (c) Meta Platforms, Inc. and affiliates.
# SPDX-License-Identifier: LGPL-2.1-or-later
import contextlib
import errno
import os
from unittest import SkipTest
from tests.linux_kernel.bpf import (
BPF_LD_MAP_FD,
BPF_MAP_TYPE_HASH,
BPF_PROG_TYPE_KPROBE,
BPF_PROG_TYPE_SOCKET_FILTER,
BPF_REG_0,
bpf_map_create,
bpf_map_get_info_by_fd,
bpf_prog_get_info_by_fd,
bpf_prog_ids,
bpf_prog_load,
)
from tests.linux_kernel.crash_commands import CrashCommandTestCase
from tests.linux_kernel.helpers.test_bpf import BpfTestCase
class TestBpf(CrashCommandTestCase, BpfTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
# This command isn't supported before Linux kernel commits dc4bb0e23561
# ("bpf: Introduce bpf_prog ID") and f3f1c054c288 ("bpf: Introduce bpf_map
# ID") (in v4.13).
try:
next(bpf_prog_ids())
except StopIteration:
# Kernel supports BPF IDs but no programs exist yet
pass
except OSError as e:
if e.errno != errno.EINVAL:
raise
raise SkipTest("This kernel version doesn't support BPF object IDs")
def test_bpf(self):
with contextlib.ExitStack() as exit_stack:
map_fd = bpf_map_create(BPF_MAP_TYPE_HASH, 8, 8, 8)
exit_stack.callback(os.close, map_fd)
prog1_fd = bpf_prog_load(BPF_PROG_TYPE_KPROBE, self.INSNS, b"GPL")
exit_stack.callback(os.close, prog1_fd)
prog2_insns = BPF_LD_MAP_FD(BPF_REG_0, map_fd) + self.INSNS
prog2_fd = bpf_prog_load(BPF_PROG_TYPE_SOCKET_FILTER, prog2_insns, b"GPL")
exit_stack.callback(os.close, prog2_fd)
map_id = bpf_map_get_info_by_fd(map_fd).id
prog1_id = bpf_prog_get_info_by_fd(prog1_fd).id
prog2_info = bpf_prog_get_info_by_fd(prog2_fd)
prog2_id = prog2_info.id
prog2_tag = "".join(f"{b:02x}" for b in prog2_info.tag)
cmd = self.check_crash_command("bpf")
self.assertIn("BPF_PROG_TYPE", cmd.stdout)
self.assertRegex(
cmd.stdout,
rf"(?sm)BPF_PROG.*^\s*{prog1_id}\s+.*KPROBE\s+[0-9a-f]{{16}}\s*$",
)
self.assertRegex(
cmd.stdout,
rf"(?sm)BPF_PROG.*^\s*{prog2_id}\s+.*SOCKET_FILTER\s+.*{prog2_tag}\s+{map_id}$",
)
self.assertIn("BPF_MAP_TYPE", cmd.stdout)
self.assertRegex(
cmd.stdout,
rf"(?sm)BPF_MAP.*^\s*{map_id}\s+.*HASH\s+",
)
def test_bpf_program_by_id(self):
with contextlib.ExitStack() as exit_stack:
prog_fd = bpf_prog_load(BPF_PROG_TYPE_KPROBE, self.INSNS, b"GPL")
exit_stack.callback(os.close, prog_fd)
prog_info = bpf_prog_get_info_by_fd(prog_fd)
prog_id = prog_info.id
prog_tag = "".join(f"{b:02x}" for b in prog_info.tag)
cmd = self.check_crash_command(f"bpf -p {prog_id}")
self.assertRegex(
cmd.stdout,
rf"(?sm)ID\s+.*BPF_PROG_TYPE.*^\s*{prog_id}\s+.*KPROBE\s+.*{prog_tag}\s*$",
)
self.assertRegex(
cmd.stdout,
r"(?sm)^\s*XLATED:\s*\d+\s+JITED:\s*\d+\s+MEMLOCK:\s*\d+",
)
self.assertRegex(
cmd.stdout,
r"(?sm)^\s*LOAD_TIME:\s*((\w+\s+){3}\d{2}:\d{2}:\d{2}\s+\d{4}|\(unknown\))",
)
self.assertRegex(
cmd.stdout,
r"(?sm)^\s*GPL_COMPATIBLE:\s*(yes|no)\s+NAME:\s*(\(unused\)|\(unknown\)|\S+)\s+UID:\s*\d+",
)
def test_bpf_program_by_id_invalid(self):
self.assertRaises(
Exception,
self.check_crash_command,
"bpf -p 1a",
)
self.assertRaises(
Exception,
self.check_crash_command,
"bpf -p 1$^7",
)
|