1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
|
#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: 2025 Olivier Dion <odion@efficios.com>
# SPDX-License-Identifier: GPL-2.0-only
import os
import pathlib
import random
import sys
# Import in-tree test utils
test_utils_import_path = pathlib.Path(__file__).absolute().parents[3] / "utils"
sys.path.append(str(test_utils_import_path))
import bt2
import lttngtest
"""
This test suite validates the following properties of the option
`--buffer-allocation=per-channel' of the `enable-channel' command:
- Only a single file stream is generated in the trace output
- By default, no CPU ID is provided as a context
- When adding the `cpu_id' context, the correct CPU ID is added to the context
  of each event
- Kernel domains cannot use the per-channel allocation
These tests are run against 3 variants:
- Normal session
- Snapshot session
- Live session
"""
def make_ust_per_channel_buffers_or_fail(session):
    """
    Add a user space (UST) channel using the per-channel buffer allocation
    policy to SESSION and return it.

    If anything goes wrong, a TAP failure is reported through the
    module-level `tap' generator and the exception is re-raised to the
    caller.
    """
    try:
        ust_channel = session.add_channel(
            lttngtest.TracingDomain.User,
            buffer_allocation_policy=lttngtest.BufferAllocationPolicy.PerChannel,
        )
    except Exception as error:
        # Report the failure in the TAP output before propagating the error.
        tap.fail("Could not create UST channel with per-channel buffers")
        raise error

    return ust_channel
def trace_stream_count(channel_name, trace_location):
    """
    Count the stream files of CHANNEL_NAME found anywhere under
    TRACE_LOCATION.

    A file is counted when its name starts with CHANNEL_NAME and does not
    end with ".idx". The directory tree is walked recursively.
    """
    total = 0

    for _dirpath, _dirnames, names in os.walk(trace_location):
        total += sum(
            1
            for name in names
            if name.startswith(channel_name) and not name.endswith(".idx")
        )

    return total
def trace_stream_all_context_values(context_name, events):
    """
    Return the set of distinct values taken by the context CONTEXT_NAME
    across EVENTS.

    For each event, the common context field is looked up first. The
    specific context field is only consulted when the common one is
    unset/empty or does not contain CONTEXT_NAME. Events carrying
    CONTEXT_NAME in neither field contribute nothing.
    """
    seen = set()

    for event in events:
        common = event.common_context_field
        if common and context_name in common:
            seen.add(common[context_name])
            continue

        specific = event.specific_context_field
        if specific and context_name in specific:
            seen.add(specific[context_name])

    return seen
def test_per_channel_buffers_ust_single_stream(tap, client, session, get_events):
    """
    Ensure that only a single stream is in the trace for
    channels with the per-channel allocation policy.

    TAP is the TAP generator used to report the result, SESSION is the
    session under test and GET_EVENTS is a callable that runs the traced
    application and collects the events. CLIENT is unused.

    Exactly one TAP result is emitted: either the stream count check, or a
    failure when the output directory of a streamed session cannot be found.
    """
    channel = make_ust_per_channel_buffers_or_fail(session)
    channel.add_recording_rule(lttngtest.UserTracepointEventRule())

    # Only the files produced on disk matter here; the returned events are
    # not used.
    get_events()

    if isinstance(session.output, lttngtest.LocalSessionOutputLocation):
        trace_path = session.output.path
    else:
        # The way to determine the output path of a streamed session is
        # convoluted.
        #
        # The output path format used by the relayd is "ABI", so we can rely on
        # it. By default, it follows the pattern:
        #   $LTTNG_HOME/lttng-traces/hostname/SESSION_NAME-CREATION_TIMESTAMP/"
        #
        # Unfortunately, the session's creation timestamp is not available from
        # the CLI; it is only exposed by liblttng-ctl's API (see
        # lttng_session_get_creation_time()). The following assumes that the
        # session name is unique enough to be used as a unique identifier.
        host_output_path = (
            test_env.lttng_home_location / "lttng-traces" / os.uname().nodename
        )

        # Choose the first path that matches the session name. A default is
        # passed to next() so that a missing directory is reported as a TAP
        # failure rather than as a bare StopIteration.
        trace_path = next(
            (p for p in host_output_path.iterdir() if p.name.startswith(session.name)),
            None,
        )

        if trace_path is None:
            tap.fail(
                "No trace output directory found for session `{}' in `{}'".format(
                    session.name, host_output_path
                )
            )
            return

    tap.test(
        1 == trace_stream_count(channel.name, str(trace_path)),
        "Only a single stream is created for per-channel buffers",
    )
def test_per_channel_buffers_no_cpu_id_by_default(tap, client, session, get_events):
    """
    Check that events recorded by a channel with per-channel buffers do
    not carry a `cpu_id' context unless one was explicitly added.

    The result is reported through TAP as a single test point.
    """
    ust_channel = make_ust_per_channel_buffers_or_fail(session)
    ust_channel.add_recording_rule(lttngtest.UserTracepointEventRule())

    # No context was added to the channel, so collecting the values of
    # `cpu_id' over all the events is expected to yield an empty set.
    no_cpu_id_seen = not trace_stream_all_context_values("cpu_id", get_events())

    tap.test(no_cpu_id_seen, "No cpu_id context field found in the trace")
def test_per_channel_buffers_correct_cpu_id_context(tap, client, session, get_events):
    """
    Ensure that `cpu_id' context is added to event when enabled.

    The test works by selecting a random CPU part of the current
    process affinity. The tracee created inherit this affinity
    and will only produce events for that CPU. It is expected
    that in the trace, only the ID of the selected CPU is emitted.

    Exactly one TAP result is emitted: the check itself, or a skip when
    the list of online CPUs cannot be obtained or when no CPU of the
    current affinity is online.
    """
    channel = make_ust_per_channel_buffers_or_fail(session)
    channel.add_context(lttngtest.CPUidContextType())
    channel.add_recording_rule(lttngtest.UserTracepointEventRule())

    # Before starting the tracee. Change the CPU affinity of this process to a
    # single CPU randomly selected from the current CPU affinity.
    saved_cpu_affinity = os.sched_getaffinity(0)

    try:
        online_cpus = lttngtest.online_cpus()
    except Exception as e:
        tap.skip("Could not get list of online CPUS: {}".format(e), 1)
        return

    # Only CPUs that are both part of the current affinity and online are
    # valid candidates. Using the union here could select a CPU on which
    # this process is not allowed to run.
    candidate_cpus = list(saved_cpu_affinity.intersection(online_cpus))

    if not candidate_cpus:
        tap.skip("No online CPU found in the current CPU affinity", 1)
        return

    new_affinity = {random.choice(candidate_cpus)}

    try:
        os.sched_setaffinity(0, new_affinity)
        events = get_events()
        cpu_id_seen = trace_stream_all_context_values("cpu_id", events)
        tap.test(cpu_id_seen == new_affinity, "Only desired cpu_id in the trace")
    finally:
        # Restore the CPU affinity of the process in any case.
        os.sched_setaffinity(0, saved_cpu_affinity)
def test_per_channel_buffers_kernel(tap, client, session, get_events):
    """
    Ensure that per-channel buffers cannot be used with kernel domain channels.

    Adding the channel to SESSION is expected to raise
    `lttngtest.LTTngClientError' with an output mentioning that the buffer
    allocation is not supported for the kernel domain. Any other outcome
    (success or a different exception) is reported as a TAP failure.
    CLIENT and GET_EVENTS are unused.
    """
    try:
        session.add_channel(
            lttngtest.TracingDomain.Kernel,
            buffer_allocation_policy=lttngtest.BufferAllocationPolicy.PerChannel,
        )
    except lttngtest.LTTngClientError as exn:
        tap.test(
            "Buffer allocation not supported for the kernel domain" in exn._output,
            "Cannot enable a channel with per-channel allocation policy in the kernel domain",
        )
    except Exception as e:
        tap.fail(
            "Unknown exception thrown while adding kernel channel with per-channel buffers: {}".format(
                e
            )
        )
    else:
        # The channel creation was expected to be refused.
        tap.fail("Kernel channel was created with per-channel buffers")
def run_test(test, tap, client, snapshot=False, live=False):
    """
    Run TEST against a newly created session, then destroy the session.

    TEST is called as `test(tap, client, session, get_events)', where
    `get_events' is a callable that traces a test application and returns
    the list of recorded events.

    When LIVE is true, the session streams to the relay daemon of the
    module-level `test_env' and events are read through a live viewer.
    Otherwise, the session outputs to a temporary local directory and events
    are read from it with Babeltrace 2. When SNAPSHOT is true, a snapshot is
    recorded before the session is stopped.

    The session is destroyed even if TEST raises.
    """
    if live:
        session_output_location = lttngtest.NetworkSessionOutputLocation(
            "net://localhost:{}:{}/".format(
                test_env.lttng_relayd_control_port, test_env.lttng_relayd_data_port
            )
        )
    else:
        session_output_location = lttngtest.LocalSessionOutputLocation(
            test_env.create_temporary_directory("trace")
        )

    # The session is created outside of the `try' block on purpose: if its
    # creation fails, there is nothing to destroy, and the `finally' clause
    # would otherwise mask the real error with an UnboundLocalError.
    session = client.create_session(
        output=session_output_location, snapshot=snapshot, live=live
    )

    try:
        if live:

            def prepare_consumer():
                viewer = test_env.launch_live_viewer(session.name)
                viewer.wait_until_connected()
                return viewer

            def drain_consumer(viewer):
                viewer.wait()
                return [msg.event for msg in viewer.messages]

        else:

            def prepare_consumer():
                return str(session.output.path)

            def drain_consumer(trace_location):
                return [
                    msg.event
                    for msg in bt2.TraceCollectionMessageIterator(trace_location)
                    if type(msg) is bt2._EventMessageConst
                ]

        # Launch a test application, and start a session.
        #
        # Prepare the consumer (either live viewer or simple trace location).
        #
        # Run the test application and wait for it to terminate.
        #
        # If the session is in snapshot mode, ask for a snapshot record.
        #
        # Stop the session and ask the consumer to drain all events from the
        # trace.
        def get_events():
            test_app = test_env.launch_wait_trace_test_application(50)
            session.start()
            consumer = prepare_consumer()
            test_app.trace()
            test_app.wait_for_exit()
            if snapshot:
                session.record_snapshot()
            session.stop()
            return drain_consumer(consumer)

        test(tap, client, session, get_events)
    finally:
        session.destroy()
# Tests exercising channels of the user space (UST) domain.
ust_domain_tests = (
    test_per_channel_buffers_ust_single_stream,
    test_per_channel_buffers_no_cpu_id_by_default,
    test_per_channel_buffers_correct_cpu_id_context,
)

# Tests exercising channels of the kernel domain.
kernel_domain_tests = (test_per_channel_buffers_kernel,)

# Keyword arguments passed to run_test() for each session variant: normal,
# snapshot and live.
variants = (
    {},
    {"snapshot": True},
    {"live": True},
)

# Every test is run once per variant and reports a single TAP result.
tests_per_variant = len(ust_domain_tests) + len(kernel_domain_tests)
tap = lttngtest.TapGenerator(len(variants) * tests_per_variant)

with lttngtest.test_environment(
    with_sessiond=True, with_relayd=True, log=tap.diagnostic
) as test_env:
    client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)

    for variant_kwargs in variants:
        tap.diagnostic("Running variant: {}".format(variant_kwargs))

        for ust_test in ust_domain_tests:
            run_test(ust_test, tap, client, **variant_kwargs)

        for kernel_test in kernel_domain_tests:
            # Kernel tests are skipped when the environment cannot run them.
            if not test_env.run_kernel_tests():
                tap.skip(
                    "'{}' test require root to create kernel domain buffers".format(
                        kernel_test.__name__
                    ),
                    1,
                )
                continue

            run_test(kernel_test, tap, client, **variant_kwargs)

exit_status = 0 if tap.is_successful else 1
sys.exit(exit_status)
|