File: test_high_throughput_snapshot.py

package info (click to toggle)
ltt-control 2.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 21,860 kB
  • sloc: cpp: 192,012; sh: 28,777; ansic: 10,960; python: 7,108; makefile: 3,520; java: 109; xml: 46
file content (185 lines) | stat: -rwxr-xr-x 6,372 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: 2025 Kienan Stewart <kstewart@efficios.com>
# SPDX-License-Identifier: GPL-2.0-only
#

"""
Validate that the trailing packet in snapshots contains the appropriate
discarded events count for the ring buffer.

See test_ust_local_snapshot_duplicate_seq_num in
tests/regression/tools/snapshots/ust_test.
"""

import os
import pathlib
import platform
import shutil
import subprocess
import sys
import time

test_utils_import_path = pathlib.Path(__file__).absolute().parents[3] / "utils"
sys.path.append(str(test_utils_import_path))

import bt2
import lttngtest


def get_bytes_at_offset(file_name, offset, length):
    """Return the bytes read from `file_name` starting at byte `offset`.

    At most `length` bytes are returned; fewer (possibly none) come back
    when the file ends before `offset + length`.
    """
    with open(file_name, "rb") as stream:
        stream.seek(offset)
        return stream.read(length)


def bytes_to_hex_str(b):
    """Render each value of `b` as two lowercase hex digits, space-separated."""
    hex_pairs = [format(octet, "02x") for octet in b]
    return " ".join(hex_pairs)


def _count_snapshot_events(trace_path):
    """Count the events recorded and discarded in the trace at `trace_path`.

    Returns a `(received, discarded)` tuple: the number of event messages
    and the sum of the `count` of every discarded events message yielded by
    the babeltrace2 message iterator for that trace.
    """
    received = 0
    discarded = 0
    for msg in bt2.TraceCollectionMessageIterator(str(trace_path)):
        if type(msg) is bt2._EventMessageConst:
            received += 1
        elif type(msg) is bt2._DiscardedEventsMessageConst:
            discarded += msg.count

    return received, discarded


def test_high_throughput_snapshot(tap, test_env, events_per_app=100):
    """Check that consecutive snapshots agree on the discarded events count.

    A snapshot-mode session with one user space channel (4 sub-buffers of
    4096 bytes) is traced by two test applications pinned to the first
    online CPU. Two snapshots are then recorded and three test points are
    reported through `tap`:

      1. both snapshots report the same number of discarded events;
      2. that number equals `events_per_app`;
      3. in the second snapshot, the `events_discarded` header field of the
         fourth packet and of the terminal packet of the pinned CPU's
         stream file are identical.

    tap: TAP generator used for diagnostics, test results and bail-outs.
    test_env: lttngtest environment used to launch applications and create
        temporary directories.
    events_per_app: number of events emitted by each test application.
    """
    client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
    session = client.create_session(output=None, snapshot=True)
    channel = session.add_channel(
        lttngtest.lttngctl.TracingDomain.User, subbuf_size=4096, subbuf_count=4
    )
    channel.add_recording_rule(lttngtest.lttngctl.UserTracepointEventRule("tp:tptest"))
    session.start()

    # 1. Trace application to set-up a situation where the ring-buffers have discarded events
    # Note: in snapshot mode this is less likely to happen than in discard mode, so an application
    # that explicitly emits events that are too large is used.
    online_cpus = list(lttngtest.online_cpus())
    # Every application is pinned to this CPU so that all events land in a
    # single per-CPU stream file, which is inspected at the end of the test.
    pinned_cpu = online_cpus[0]
    app_kwargs = [
        # If there are no events produced ever, the sub-buffers remain empty (despite
        # accruing lost events) and so they are never delivered. Therefore, an application
        # that doesn't have discarded events for being too large is run first.
        {"event_count": events_per_app},
        # Then this application is run and all the events it emits should be discarded
        # as too large due to the text_size + fill_text vs. configured sub-buffer size.
        {"event_count": events_per_app, "text_size": 2048, "fill_text": True},
    ]
    for app_kwarg in app_kwargs:
        app = test_env.launch_wait_trace_test_application(**app_kwarg)
        proc = subprocess.run(["taskset", "-c", "-p", str(pinned_cpu), str(app.vpid)])
        if proc.returncode != 0:
            tap.diagnostic(
                "Failed to taskset pid '{}' to CPU '{}': {}".format(
                    app.vpid, pinned_cpu, proc.returncode
                )
            )
            tap.bail_out("All tasksets need to succeed")
            return
        app.trace()
        app.wait_for_tracing_done()
        app.wait_for_exit()

    # 2. Snapshot and confirm that there are discarded events
    output_path_a = test_env.create_temporary_directory()
    client.snapshot_record(session.name, output_path_a)
    received_a, discarded_a = _count_snapshot_events(output_path_a)
    tap.diagnostic(
        "Total={}, Received={}, Discarded={}".format(
            received_a + discarded_a, received_a, discarded_a
        )
    )

    # 3. Snapshot and confirm that discarded events match, and that the trailing packets for the streams
    # have events discarded
    output_path_b = test_env.create_temporary_directory()
    client.snapshot_record(session.name, output_path_b)
    received_b, discarded_b = _count_snapshot_events(output_path_b)
    tap.diagnostic(
        "Total={}, Received={}, Discarded={}".format(
            received_b + discarded_b, received_b, discarded_b
        )
    )

    tap.test(
        discarded_a == discarded_b,
        "The events discarded in snapshot A ({}) and snapshot B ({}) match".format(
            discarded_a, discarded_b
        ),
    )
    tap.test(
        discarded_a == events_per_app,
        "The number of discarded events ({}) match the number events emitted by a single run of the application ({})".format(
            discarded_a, events_per_app
        ),
    )

    # Get the configured page size. `run()` collects the output and closes
    # the pipe, unlike a bare `Popen` followed by `wait()` and `read()`.
    proc = subprocess.run(["getconf", "PAGE_SIZE"], stdout=subprocess.PIPE)
    if proc.returncode != 0:
        tap.bail_out("Unable to determine page size")
        return

    page_size = int(proc.stdout.decode("utf-8").strip())

    # Get the file associated with the chosen CPU from Snapshot B.
    # The glob results already include `output_path_b`, so the first entry
    # is used directly as the root of the stream file path.
    snapshot_dir = next(pathlib.Path(output_path_b).glob("*"), None)
    if snapshot_dir is None:
        tap.bail_out("No trace found in snapshot output '{}'".format(output_path_b))
        return

    file_name = os.path.join(
        str(snapshot_dir),
        "ust",
        "uid",
        str(os.getuid()),
        "64-bit",
        "{}_{}".format(channel.name, pinned_cpu),
    )

    # start of last (of the 4 from the ringbuffer) packet = page_size * 3
    # start of terminal packet = page_size * 4
    # CTF1 x86_64 : events_discarded at offset (decimal) 72, 8 bytes long
    field_offset = 72
    field_length = 8
    events_discarded_fourth_packet = get_bytes_at_offset(
        file_name, field_offset + 3 * page_size, field_length
    )
    events_discarded_terminal_packet = get_bytes_at_offset(
        file_name, field_offset + 4 * page_size, field_length
    )

    tap.test(
        # A short read means one of the packets is missing from the file;
        # two empty reads must not be reported as matching fields.
        len(events_discarded_fourth_packet) == field_length
        and events_discarded_terminal_packet == events_discarded_fourth_packet,
        "events_discarded header field in fourth packet (`{}`) and terminal packet (`{}`) match.".format(
            bytes_to_hex_str(events_discarded_fourth_packet),
            bytes_to_hex_str(events_discarded_terminal_packet),
        ),
    )


if __name__ == "__main__":
    # The generator is created with 3, presumably the number of planned
    # test points.
    tap = lttngtest.TapGenerator(3)

    # The test only runs on 64-bit x86_64 hosts; anywhere else every test
    # point is skipped and the script exits successfully.
    skip_reason = None
    if platform.machine() != "x86_64":
        skip_reason = "Only run on x86_64"
    elif sys.maxsize <= 2**32:
        skip_reason = "Only run on 64-bit systems"

    if skip_reason is not None:
        tap.skip_all_remaining(skip_reason)
        sys.exit(0)

    with lttngtest.test_environment(log=tap.diagnostic, with_sessiond=True) as test_env:
        test_high_throughput_snapshot(tap, test_env)

    exit_status = 0 if tap.is_successful else 1
    sys.exit(exit_status)