File: test_uprobes.py

package info (click to toggle)
bpfcc 0.18.0%2Bds-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 12,368 kB
  • sloc: ansic: 132,727; python: 36,226; cpp: 26,973; sh: 710; yacc: 525; makefile: 141; lex: 94
file content (140 lines) | stat: -rwxr-xr-x 4,616 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")

import bcc
import ctypes
import errno
import os
import subprocess
import shutil
import time
import unittest

class TestUprobes(unittest.TestCase):
    def test_simple_library(self):
        text = """
#include <uapi/linux/ptrace.h>
BPF_ARRAY(stats, u64, 1);
static void incr(int idx) {
    u64 *ptr = stats.lookup(&idx);
    if (ptr)
        ++(*ptr);
}
int count(struct pt_regs *ctx) {
    bpf_trace_printk("count() uprobe fired");
    u32 pid = bpf_get_current_pid_tgid();
    if (pid == PID)
        incr(0);
    return 0;
}"""
        test_pid = os.getpid()
        text = text.replace("PID", "%d" % test_pid)
        b = bcc.BPF(text=text)
        b.attach_uprobe(name="c", sym="malloc_stats", fn_name="count", pid=test_pid)
        b.attach_uretprobe(name="c", sym="malloc_stats", fn_name="count", pid=test_pid)
        libc = ctypes.CDLL("libc.so.6")
        libc.malloc_stats.restype = None
        libc.malloc_stats.argtypes = []
        libc.malloc_stats()
        self.assertEqual(b["stats"][ctypes.c_int(0)].value, 2)
        b.detach_uretprobe(name="c", sym="malloc_stats", pid=test_pid)
        b.detach_uprobe(name="c", sym="malloc_stats", pid=test_pid)

    def test_simple_binary(self):
        text = """
#include <uapi/linux/ptrace.h>
BPF_ARRAY(stats, u64, 1);
static void incr(int idx) {
    u64 *ptr = stats.lookup(&idx);
    if (ptr)
        ++(*ptr);
}
int count(struct pt_regs *ctx) {
    u32 pid = bpf_get_current_pid_tgid();
    incr(0);
    return 0;
}"""
        b = bcc.BPF(text=text)
        b.attach_uprobe(name="/usr/bin/python", sym="main", fn_name="count")
        b.attach_uretprobe(name="/usr/bin/python", sym="main", fn_name="count")
        with os.popen("/usr/bin/python -V") as f:
            pass
        self.assertGreater(b["stats"][ctypes.c_int(0)].value, 0)
        b.detach_uretprobe(name="/usr/bin/python", sym="main")
        b.detach_uprobe(name="/usr/bin/python", sym="main")

    def test_mount_namespace(self):
        text = """
#include <uapi/linux/ptrace.h>
BPF_TABLE("array", int, u64, stats, 1);
static void incr(int idx) {
    u64 *ptr = stats.lookup(&idx);
    if (ptr)
        ++(*ptr);
}
int count(struct pt_regs *ctx) {
    bpf_trace_printk("count() uprobe fired");
    u32 pid = bpf_get_current_pid_tgid();
    if (pid == PID)
        incr(0);
    return 0;
}"""
        # Need to import libc from ctypes to access unshare(2)
        libc = ctypes.CDLL("libc.so.6", use_errno=True)

        # Need to find path to libz.so.1
        libz_path = None
        p = subprocess.Popen(["ldconfig", "-p"], stdout=subprocess.PIPE)
        for l in p.stdout:
            n = l.split()
            if n[0] == b"libz.so.1":
                # if libz was already found, override only if new lib is more
                # specific (e.g. libc6,x86-64 vs libc6)
                if not libz_path or len(n[1].split(b",")) > 1:
                    libz_path = n[-1]
        p.wait()
        p.stdout.close()
        p = None

        self.assertIsNotNone(libz_path)

        # fork a child that we'll place in a separate mount namespace
        child_pid = os.fork()
        if child_pid == 0:
            # Unshare CLONE_NEWNS
            if libc.unshare(0x00020000) == -1:
                e = ctypes.get_errno()
                raise OSError(e, errno.errorcode[e])

            # Remount root MS_REC|MS_PRIVATE
            if libc.mount(None, b"/", None, (1<<14)|(1<<18) , None) == -1:
                e = ctypes.get_errno()
                raise OSError(e, errno.errorcode[e])

            if libc.mount(b"tmpfs", b"/tmp", b"tmpfs", 0, None) == -1:
                e = ctypes.get_errno()
                raise OSError(e, errno.errorcode[e])

            shutil.copy(libz_path, b"/tmp")

            libz = ctypes.CDLL("/tmp/libz.so.1")
            time.sleep(1)
            libz.zlibVersion()
            time.sleep(5)
            os._exit(0)

        libname = "/tmp/libz.so.1"
        symname = "zlibVersion"
        text = text.replace("PID", "%d" % child_pid)
        b = bcc.BPF(text=text)
        b.attach_uprobe(name=libname, sym=symname, fn_name="count", pid=child_pid)
        b.attach_uretprobe(name=libname, sym=symname, fn_name="count", pid=child_pid)
        time.sleep(1)
        self.assertEqual(b["stats"][ctypes.c_int(0)].value, 2)
        b.detach_uretprobe(name=libname, sym=symname, pid=child_pid)
        b.detach_uprobe(name=libname, sym=symname, pid=child_pid)
        os.wait()

if __name__ == "__main__":
    unittest.main()