1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
|
#!/usr/bin/env python
#
# USAGE: test_usdt2.py
#
# Copyright 2017 Facebook, Inc
# Licensed under the Apache License, Version 2.0 (the "License")
from __future__ import print_function
from bcc import BPF, USDT
from unittest import main, TestCase
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
import ctypes as ct
import inspect
import os
import signal
class TestUDST(TestCase):
def setUp(self):
# Application, minimum, to define three trace points
app_text = b"""
#include <stdlib.h>
#include <unistd.h>
#include "folly/tracing/StaticTracepoint.h"
int main(int argc, char **argv) {
int t = atoi(argv[1]);
while (1) {
FOLLY_SDT(test, probe_point_1, t);
FOLLY_SDT(test, probe_point_2, t + 1);
FOLLY_SDT(test, probe_point_3, t + 2);
sleep(1);
}
return 1;
}
"""
# BPF program
self.bpf_text = """
#include <uapi/linux/ptrace.h>
BPF_PERF_OUTPUT(event1);
BPF_PERF_OUTPUT(event2);
BPF_PERF_OUTPUT(event3);
BPF_PERF_OUTPUT(event4);
BPF_PERF_OUTPUT(event5);
BPF_PERF_OUTPUT(event6);
int do_trace1(struct pt_regs *ctx) {
u32 pid = bpf_get_current_pid_tgid();
int result = 0;
bpf_usdt_readarg(1, ctx, &result);
if (FILTER)
event1.perf_submit(ctx, &result, sizeof(result));
else
event4.perf_submit(ctx, &result, sizeof(result));
return 0;
};
int do_trace2(struct pt_regs *ctx) {
u32 pid = bpf_get_current_pid_tgid();
int result = 0;
bpf_usdt_readarg(1, ctx, &result);
if (FILTER)
event2.perf_submit(ctx, &result, sizeof(result));
else
event5.perf_submit(ctx, &result, sizeof(result));
return 0;
}
int do_trace3(struct pt_regs *ctx) {
u32 pid = bpf_get_current_pid_tgid();
int result = 0;
bpf_usdt_readarg(1, ctx, &result);
if (FILTER)
event3.perf_submit(ctx, &result, sizeof(result));
else
event6.perf_submit(ctx, &result, sizeof(result));
return 0;
}
"""
# Compile and run the application
self.ftemp = NamedTemporaryFile(delete=False)
self.ftemp.close()
comp = Popen(["gcc", "-I", "%s/include" % os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))),
"-x", "c++", "-o", self.ftemp.name, "-"],
stdin=PIPE)
comp.stdin.write(app_text)
comp.stdin.close()
self.assertEqual(comp.wait(), 0)
# create 3 applications, 2 applications will have usdt attached and
# the third one does not, and the third one should not call into
# bpf program.
self.app = Popen([self.ftemp.name, "1"])
self.app2 = Popen([self.ftemp.name, "11"])
self.app3 = Popen([self.ftemp.name, "21"])
def test_attach1(self):
# Enable USDT probe from given PID and verifier generated BPF programs.
u = USDT(pid=int(self.app.pid))
u.enable_probe(probe="probe_point_1", fn_name="do_trace1")
u.enable_probe(probe="probe_point_2", fn_name="do_trace2")
u2 = USDT(pid=int(self.app2.pid))
u2.enable_probe(probe="probe_point_2", fn_name="do_trace2")
u2.enable_probe(probe="probe_point_3", fn_name="do_trace3")
self.bpf_text = self.bpf_text.replace("FILTER", "pid == %d" % self.app.pid)
b = BPF(text=self.bpf_text, usdt_contexts=[u, u2])
# Event states for each event:
# 0 - probe not caught, 1 - probe caught with correct value,
# 2 - probe caught with incorrect value
self.evt_st_1 = 0
self.evt_st_2 = 0
self.evt_st_3 = 0
self.evt_st_4 = 0
self.evt_st_5 = 0
self.evt_st_6 = 0
def check_event_val(data, event_state, expected_val):
result = ct.cast(data, ct.POINTER(ct.c_int)).contents
if result.value == expected_val:
if (event_state == 0 or event_state == 1):
return 1
return 2
def print_event1(cpu, data, size):
self.evt_st_1 = check_event_val(data, self.evt_st_1, 1)
def print_event2(cpu, data, size):
self.evt_st_2 = check_event_val(data, self.evt_st_2, 2)
def print_event3(cpu, data, size):
self.evt_st_3 = check_event_val(data, self.evt_st_3, 3)
def print_event4(cpu, data, size):
self.evt_st_4 = check_event_val(data, self.evt_st_4, 11)
def print_event5(cpu, data, size):
self.evt_st_5 = check_event_val(data, self.evt_st_5, 12)
def print_event6(cpu, data, size):
self.evt_st_6 = check_event_val(data, self.evt_st_6, 13)
# loop with callback to print_event
b["event1"].open_perf_buffer(print_event1)
b["event2"].open_perf_buffer(print_event2)
b["event3"].open_perf_buffer(print_event3)
b["event4"].open_perf_buffer(print_event4)
b["event5"].open_perf_buffer(print_event5)
b["event6"].open_perf_buffer(print_event6)
# three iterations to make sure we get some probes and have time to process them
for i in range(5):
b.perf_buffer_poll()
# note that event1 and event4 do not really fire, so their state should be 0
# use separate asserts so that if test fails we know which one is the culprit
self.assertTrue(self.evt_st_1 == 1)
self.assertTrue(self.evt_st_2 == 1)
self.assertTrue(self.evt_st_3 == 0)
self.assertTrue(self.evt_st_4 == 0)
self.assertTrue(self.evt_st_5 == 1)
self.assertTrue(self.evt_st_6 == 1)
def tearDown(self):
# kill the subprocess, clean the environment
self.app.kill()
self.app.wait()
self.app2.kill()
self.app2.wait()
self.app3.kill()
self.app3.wait()
os.unlink(self.ftemp.name)
if __name__ == "__main__":
main()
|