File: test_high_throughput.py

package info (click to toggle)
ltt-control 2.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 21,860 kB
  • sloc: cpp: 192,012; sh: 28,777; ansic: 10,960; python: 7,108; makefile: 3,520; java: 109; xml: 46
file content (105 lines) | stat: -rwxr-xr-x 3,280 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: 2025 Kienan Stewart <kstewart@efficios.com>
# SPDX-License-Identifier: GPL-2.1-only
#

"""
Validate that under heavy tracing load the sum of the recorded and dropped
events matches the expected output of the traced applications
"""

import concurrent.futures
import os
import pathlib
import subprocess
import sys
import time

# Extend the module search path with the `utils` directory located two levels
# above the directory containing this file. This must happen before the
# project imports below (presumably `lttngtest` lives there -- confirm against
# the tree layout).
test_utils_import_path = pathlib.Path(__file__).absolute().parents[2] / "utils"
# NOTE(review): this writes the resolved path to stdout every time the script
# is loaded; it looks like a leftover debugging aid -- confirm it is wanted.
print(test_utils_import_path)
sys.path.append(str(test_utils_import_path))

import bt2
import lttngtest


def test_high_throughput(tap, test_env, app_count=20, events_per_app=1000000):
    """
    Check that every kernel test event is accounted for under heavy load.

    `app_count` jobs each write `events_per_app` to
    `/proc/lttng-test-recursive-event` while the `lttng-test` kernel module
    is loaded and a kernel tracing session recording `lttng_test_*` events is
    active. The resulting trace is then read back with Babeltrace 2: the
    number of event messages plus the counts carried by discarded-events
    messages must equal `app_count * (events_per_app + 1)`.

    One TAP result is reported through `tap`. If this process cannot be
    pinned to the first online CPU, `tap.bail_out()` is called and the
    function returns without creating any tracing session.
    """
    online_cpus = list(lttngtest.online_cpus())
    first_cpu = online_cpus[0]

    # Pin this process *before* any tracing state exists, so that a pinning
    # failure cannot leave a started session behind.
    # NOTE(review): presumably the pinning makes all writer jobs contend for
    # a single CPU -- confirm the intent.
    taskset_status = subprocess.call(
        ["taskset", "-p", "-c", str(first_cpu), str(os.getpid())]
    )
    if taskset_status != 0:
        tap.bail_out(
            "Failed to taskset self to first online CPU `{}`: {}".format(
                first_cpu, taskset_status
            )
        )
        return

    client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
    output_path = test_env.create_temporary_directory()
    output = lttngtest.LocalSessionOutputLocation(trace_path=output_path)
    session = client.create_session(output=output)
    channel = session.add_channel(lttngtest.lttngctl.TracingDomain.Kernel)
    channel.add_recording_rule(lttngtest.KernelTracepointEventRule("lttng_test_*"))
    session.start()

    with lttngtest.kernel_module("lttng-test"):

        def submit_events(count):
            # Writing a count to this proc file asks the test module to emit
            # events (see the `expected` computation below for the totals).
            with open("/proc/lttng-test-recursive-event", "w") as f:
                f.write(str(count))

            return True

        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Map each future to its job index for the diagnostics below.
            futures = {
                executor.submit(submit_events, events_per_app): x
                for x in range(app_count)
            }

            for future in concurrent.futures.as_completed(futures):
                index = futures[future]
                # `future.result()` re-raises any exception hit by the job.
                tap.diagnostic("Job {} done: {}".format(index, future.result()))

        # Stop and destroy the session while the module is still loaded.
        session.stop()
        session.destroy()

    # There is a single lttng_test_recursive_event per app run not counted in events_per_app
    expected = app_count * (events_per_app + 1)
    received = 0
    discarded = 0
    for msg in bt2.TraceCollectionMessageIterator(str(output_path)):
        if type(msg) is bt2._EventMessageConst:
            received += 1
            continue

        if type(msg) is bt2._DiscardedEventsMessageConst:
            # A discarded-events message stands for `count` dropped events.
            discarded += msg.count

    total = received + discarded
    tap.diagnostic(
        "received={}, discarded={}, total={}, expected={}".format(
            received, discarded, total, expected
        )
    )
    tap.test(
        total == expected,
        "Total events {} match expected total {}".format(total, expected),
    )


if __name__ == "__main__":
    # The plan announces exactly one test.
    tap = lttngtest.TapGenerator(1)
    kernel_tests_enabled = lttngtest._Environment.run_kernel_tests()

    if kernel_tests_enabled:
        # Run the test within an environment that has a session daemon and
        # the kernel domain enabled.
        with lttngtest.test_environment(
            with_sessiond=True, enable_kernel_domain=True, log=tap.diagnostic
        ) as test_env:
            test_high_throughput(tap, test_env)
        exit_status = 0 if tap.is_successful else 1
    else:
        # Skipping is not a failure: report it and exit successfully.
        tap.skip_all_remaining("Kernel tests not enabled")
        exit_status = 0

    sys.exit(exit_status)