File: test_channel_allocation_policy_per_channel.py

package info (click to toggle)
ltt-control 2.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 21,860 kB
  • sloc: cpp: 192,012; sh: 28,777; ansic: 10,960; python: 7,108; makefile: 3,520; java: 109; xml: 46
file content (304 lines) | stat: -rwxr-xr-x 10,121 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: 2025 Olivier Dion <odion@efficios.com>
# SPDX-License-Identifier: GPL-2.0-only

import os
import pathlib
import random
import sys

# Import in-tree test utils
test_utils_import_path = pathlib.Path(__file__).absolute().parents[3] / "utils"
sys.path.append(str(test_utils_import_path))

import bt2
import lttngtest

"""
This test suite validates the following properties of the option
`--buffer-allocation=per-channel' of the `enable-channel' command:

  - Only a single file stream is generated in the trace output

  - By default, no CPU ID is provided as a context

  - When adding the `cpu_id' context, the correct CPU ID is added to the context
    of each event

  - Kernel domains cannot use the per-channel allocation

These tests are run against 3 variants:

  - Normal session

  - Snapshot session

  - Live session
"""


def make_ust_per_channel_buffers_or_fail(session):
    """
    Add a user space (UST) channel using the per-channel buffer allocation
    policy to SESSION and return it.

    When the channel cannot be added, a failure is reported through the
    module-level `tap' generator and the original exception is propagated
    to the caller.
    """
    try:
        channel = session.add_channel(
            lttngtest.TracingDomain.User,
            buffer_allocation_policy=lttngtest.BufferAllocationPolicy.PerChannel,
        )
    except Exception:
        # Record the failure in the TAP output before letting the error
        # bubble up.
        tap.fail("Could not create UST channel with per-channel buffers")
        raise
    else:
        return channel


def trace_stream_count(channel_name, trace_location):
    """
    Count the stream files belonging to CHANNEL_NAME under TRACE_LOCATION.

    The directory tree rooted at TRACE_LOCATION is walked recursively. A
    file is counted when its name starts with CHANNEL_NAME and does not end
    with `.idx'.
    """

    return sum(
        1
        for _dirpath, _dirnames, entries in os.walk(trace_location)
        for entry in entries
        if entry.startswith(channel_name) and not entry.endswith(".idx")
    )


def trace_stream_all_context_values(context_name, events):
    """
    Return the set of values seen for the context named CONTEXT_NAME across
    EVENTS.

    For each event, the common context is looked up first; the specific
    context is only consulted when the common context is empty/absent or
    does not hold CONTEXT_NAME. Events carrying the context in neither
    scope contribute nothing.
    """

    seen = set()

    for evt in events:
        # Scopes are probed in priority order; the first hit wins.
        for scope_attr in ("common_context_field", "specific_context_field"):
            scope = getattr(evt, scope_attr)
            if scope and context_name in scope:
                seen.add(scope[context_name])
                break

    return seen


def test_per_channel_buffers_ust_single_stream(tap, client, session, get_events):
    """
    Ensure that only a single stream is in the trace for
    channels with the per-channel allocation policy.

    TAP is the generator used to report the result, SESSION is the session
    under test and GET_EVENTS is a callable that runs the traced
    application and collects the events. CLIENT is unused here; it is part
    of the signature shared by all tests. The events returned by GET_EVENTS
    are discarded: only the files produced on disk are inspected.
    """

    channel = make_ust_per_channel_buffers_or_fail(session)
    channel.add_recording_rule(lttngtest.UserTracepointEventRule())
    get_events()

    if isinstance(session.output, lttngtest.LocalSessionOutputLocation):
        trace_path = session.output.path
    else:
        # The way to determine the output path of a streamed session is
        # convoluted.
        #
        # The output path format used by the relayd is "ABI", so we can rely on
        # it. By default, it follows the pattern:
        #   $LTTNG_HOME/lttng-traces/hostname/SESSION_NAME-CREATION_TIMESTAMP/
        #
        # Unfortunately, the session's creation timestamp is not available from
        # the CLI; it is only exposed by liblttng-ctl's API (see lttng_session_get_creation_time()).
        # The following assumes that the session name is unique enough to be
        # used as a unique identifier.
        host_output_path = (
            test_env.lttng_home_location / "lttng-traces" / os.uname().nodename
        )
        # Choose the first path that matches the session name.
        trace_path = next(
            p for p in host_output_path.iterdir() if p.name.startswith(session.name)
        )

    # Report once, with the same description whatever the output type, so
    # that the TAP output is consistent across session variants.
    tap.test(
        1 == trace_stream_count(channel.name, str(trace_path)),
        "Only a single stream is created for per-channel buffers",
    )


def test_per_channel_buffers_no_cpu_id_by_default(tap, client, session, get_events):
    """
    Check that events recorded through a per-channel buffers channel carry
    no `cpu_id' context unless it is explicitly requested.

    The channel is created without adding any context, the traced
    application is run through GET_EVENTS, and the test passes when no
    `cpu_id' value is found in the collected events.
    """

    per_channel = make_ust_per_channel_buffers_or_fail(session)
    rule = lttngtest.UserTracepointEventRule()
    per_channel.add_recording_rule(rule)

    observed_cpu_ids = trace_stream_all_context_values("cpu_id", get_events())
    tap.test(not observed_cpu_ids, "No cpu_id context field found in the trace")


def test_per_channel_buffers_correct_cpu_id_context(tap, client, session, get_events):
    """
    Ensure that `cpu_id' context is added to event when enabled.

    The test works by selecting a random CPU part of the current
    process affinity.  The tracee created inherit this affinity
    and will only produce events for that CPU.  It is expected
    that in the trace, only the ID of the selected CPU is emitted.

    The test is skipped when the list of online CPUs cannot be obtained.
    The CPU affinity of the current process is restored before returning,
    whether the test passes, fails or raises.
    """

    channel = make_ust_per_channel_buffers_or_fail(session)
    channel.add_context(lttngtest.CPUidContextType())
    channel.add_recording_rule(lttngtest.UserTracepointEventRule())

    # Before starting the tracee. Change the CPU affinity of this process to a
    # single CPU randomly selected from the current CPU affinity.
    saved_cpu_affinity = os.sched_getaffinity(0)

    try:
        online_cpus = lttngtest.online_cpus()
    except Exception as e:
        tap.skip("Could not get list of online CPUS: {}".format(e), 1)
        return

    try:
        # Only CPUs that are both allowed by the current affinity and online
        # are valid candidates: picking a CPU outside of the current affinity
        # (as a union would allow) can make sched_setaffinity() fail when the
        # process is confined to a subset of the CPUs. Should the two sets
        # not overlap, fall back on the affinity reported by the kernel.
        candidate_cpus = (
            saved_cpu_affinity.intersection(online_cpus) or saved_cpu_affinity
        )
        new_affinity = {random.choice(sorted(candidate_cpus))}
        os.sched_setaffinity(0, new_affinity)
        events = get_events()
        cpu_id_seen = trace_stream_all_context_values("cpu_id", events)
        tap.test(cpu_id_seen == new_affinity, "Only desired cpu_id in the trace")
    finally:
        # Restore the CPU affinity of the process in any case.
        os.sched_setaffinity(0, saved_cpu_affinity)


def test_per_channel_buffers_kernel(tap, client, session, get_events):
    """
    Ensure that per-channel buffers cannot be used with kernel domain channels.

    The test passes when adding the channel raises an LTTngClientError whose
    output mentions that buffer allocation is not supported for the kernel
    domain. It fails when the channel is created, or when any other
    exception is raised. CLIENT and GET_EVENTS are unused; they are part of
    the signature shared by all tests.
    """

    try:
        session.add_channel(
            lttngtest.TracingDomain.Kernel,
            buffer_allocation_policy=lttngtest.BufferAllocationPolicy.PerChannel,
        )
    except lttngtest.LTTngClientError as exn:
        tap.test(
            "Buffer allocation not supported for the kernel domain" in exn._output,
            "Cannot enable a channel with per-channel allocation policy in the kernel domain",
        )
    except Exception as e:
        tap.fail(
            "Unknown exception thrown while creating a kernel channel with per-channel buffers: {}".format(
                e
            )
        )
    else:
        # Reported outside of the `try' body so that an error raised while
        # reporting is not mistaken for a channel creation error.
        tap.fail("Kernel channel was created with per-channel buffers")


def run_test(test, tap, client, snapshot=False, live=False):
    """
    Run TEST against a freshly created session and destroy the session
    afterwards.

    TEST is called as `test(tap, client, session, get_events)', where
    `get_events' is a callable that traces a test application with the
    session and returns the list of recorded events.

    When LIVE is true, the session output is a network location on
    localhost using the relayd ports of the module-level `test_env', and
    events are collected through a live viewer. Otherwise, the output is a
    temporary local directory that is read back with Babeltrace 2.

    When SNAPSHOT is true, a snapshot is recorded before the session is
    stopped.

    Any exception raised while setting up the session or running TEST is
    propagated to the caller.
    """
    # Bound before entering the `try' block so that the cleanup below does
    # not raise (and mask the original error) when the session creation
    # itself fails.
    session = None

    try:
        if live:
            session_output_location = lttngtest.NetworkSessionOutputLocation(
                "net://localhost:{}:{}/".format(
                    test_env.lttng_relayd_control_port, test_env.lttng_relayd_data_port
                )
            )
        else:
            session_output_location = lttngtest.LocalSessionOutputLocation(
                test_env.create_temporary_directory("trace")
            )

        session = client.create_session(
            output=session_output_location, snapshot=snapshot, live=live
        )

        if live:

            def prepare_consumer():
                viewer = test_env.launch_live_viewer(session.name)
                viewer.wait_until_connected()
                return viewer

            def drain_consumer(viewer):
                viewer.wait()
                return [msg.event for msg in viewer.messages]

        else:

            def prepare_consumer():
                return str(session.output.path)

            def drain_consumer(trace_location):
                # Keep event messages only; other message kinds produced by
                # the iterator carry no event.
                return [
                    msg.event
                    for msg in bt2.TraceCollectionMessageIterator(trace_location)
                    if type(msg) is bt2._EventMessageConst
                ]

        # Launch a test application, and start a session.
        #
        # Prepare the consumer (either live viewer or simple trace location).
        #
        # Run the test application and wait for it to terminate.
        #
        # If the session is in snapshot mode, ask for a snapshot record.
        #
        # Stop the session and ask the consumer to drain all events from the
        # trace.
        def get_events():
            test_app = test_env.launch_wait_trace_test_application(50)
            session.start()
            consumer = prepare_consumer()
            test_app.trace()
            test_app.wait_for_exit()
            if snapshot:
                session.record_snapshot()
            session.stop()
            return drain_consumer(consumer)

        test(tap, client, session, get_events)
    finally:
        if session is not None:
            session.destroy()


# Tests exercising per-channel buffers in the user space domain.
ust_domain_tests = (
    test_per_channel_buffers_ust_single_stream,
    test_per_channel_buffers_no_cpu_id_by_default,
    test_per_channel_buffers_correct_cpu_id_context,
)

# Tests exercising per-channel buffers in the kernel domain.
kernel_domain_tests = (test_per_channel_buffers_kernel,)

# Keyword arguments forwarded to run_test() for each session flavour:
# normal, snapshot and live.
variants = (
    dict(),
    dict(snapshot=True),
    dict(live=True),
)

# Every test is planned once per variant (skipped tests are accounted for
# through tap.skip()).
tests_per_variant = len(ust_domain_tests) + len(kernel_domain_tests)
tap = lttngtest.TapGenerator(len(variants) * tests_per_variant)

with lttngtest.test_environment(
    with_sessiond=True, with_relayd=True, log=tap.diagnostic
) as test_env:
    client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)

    for variant in variants:
        tap.diagnostic("Running variant: {}".format(variant))

        for test in ust_domain_tests:
            run_test(test, tap, client, **variant)

        for test in kernel_domain_tests:
            if not test_env.run_kernel_tests():
                skip_reason = (
                    "'{}' test require root to create kernel domain buffers".format(
                        test.__name__
                    )
                )
                tap.skip(skip_reason, 1)
                continue

            run_test(test, tap, client, **variant)


exit_status = 0 if tap.is_successful else 1
sys.exit(exit_status)