File: test_ust.py

package info (click to toggle)
ltt-control 2.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 21,860 kB
  • sloc: cpp: 192,012; sh: 28,777; ansic: 10,960; python: 7,108; makefile: 3,520; java: 109; xml: 46
file content (171 lines) | stat: -rwxr-xr-x 6,136 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
#
# SPDX-License-Identifier: GPL-2.0-only

import pathlib
import sys
import os
from typing import Any, Callable, Type

"""
Test the addition of various user space contexts.

This test successively sets up a session with a certain context enabled, traces
a test application, and then reads the resulting trace to determine if:
  - the context field is present in the trace
  - the context field has the expected value.

The vpid, vuid, vgid and java application contexts are validated by this test.
"""

# Import in-tree test utils
test_utils_import_path = pathlib.Path(__file__).absolute().parents[3] / "utils"
sys.path.append(str(test_utils_import_path))

import lttngtest
import bt2


def context_trace_field_name(context_type):
    # type: (Type[lttngtest.ContextType]) -> str
    if isinstance(context_type, lttngtest.VpidContextType):
        return "vpid"
    elif isinstance(context_type, lttngtest.VuidContextType):
        return "vuid"
    elif isinstance(context_type, lttngtest.VgidContextType):
        return "vgid"
    elif isinstance(context_type, lttngtest.JavaApplicationContextType):
        # Depends on the trace format and will need to be adapted for CTF 2.
        return "_app_{retriever}_{name}".format(
            retriever=context_type.retriever_name, name=context_type.field_name
        )
    else:
        raise NotImplementedError


def trace_stream_class_has_context_field_in_event_context(
    trace_location, context_field_name
):
    # type: (pathlib.Path, str) -> bool
    iterator = bt2.TraceCollectionMessageIterator(str(trace_location))

    # A bt2 message sequence is guaranteed to begin with a StreamBeginningMessage.
    # Since we only have one channel (one stream class) and one trace, it is
    # safe to use it to determine if the stream class contains the expected
    # context field.
    stream_begin_msg = next(iterator)

    trace_class = stream_begin_msg.stream.trace.cls
    # Ensure the trace class has only one stream class.
    assert len(trace_class)

    stream_class_id = next(iter(trace_class))
    stream_class = trace_class[stream_class_id]
    event_common_context_field_class = stream_class.event_common_context_field_class

    return context_field_name in event_common_context_field_class


def trace_events_have_context_value(trace_location, context_field_name, value):
    # type: (pathlib.Path, str, Any) -> bool
    for msg in bt2.TraceCollectionMessageIterator(str(trace_location)):
        if type(msg) is not bt2._EventMessageConst:
            continue

        if msg.event.common_context_field[context_field_name] != value:
            print(msg.event.common_context_field[context_field_name])
            return False
    return True


def test_static_context(tap, test_env, context_type, context_value_retriever):
    # type: (lttngtest.TapGenerator, lttngtest._Environment, lttngtest.ContextType, Callable[[lttngtest.WaitTraceTestApplication], Any]) -> None
    tap.diagnostic(
        "Test presence and expected value of context `{context_name}`".format(
            context_name=type(context_type).__name__
        )
    )

    session_output_location = lttngtest.LocalSessionOutputLocation(
        test_env.create_temporary_directory("trace")
    )

    client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)

    with tap.case("Create a session") as test_case:
        session = client.create_session(output=session_output_location)
    tap.diagnostic("Created session `{session_name}`".format(session_name=session.name))

    with tap.case(
        "Add a channel to session `{session_name}`".format(session_name=session.name)
    ) as test_case:
        channel = session.add_channel(lttngtest.TracingDomain.User)
    tap.diagnostic("Created channel `{channel_name}`".format(channel_name=channel.name))

    with tap.case(
        "Add {context_type} context to channel `{channel_name}`".format(
            context_type=type(context_type).__name__, channel_name=channel.name
        )
    ) as test_case:
        channel.add_context(context_type)

    test_app = test_env.launch_wait_trace_test_application(50)

    # Only track the test application
    session.user_vpid_process_attribute_tracker.track(test_app.vpid)
    expected_context_value = context_value_retriever(test_app)

    # Enable all user space events, the default for a user tracepoint event rule.
    channel.add_recording_rule(lttngtest.UserTracepointEventRule())

    session.start()
    test_app.trace()
    test_app.wait_for_exit()
    session.stop()
    session.destroy()

    tap.test(
        trace_stream_class_has_context_field_in_event_context(
            session_output_location.path, context_trace_field_name(context_type)
        ),
        "Stream class contains field `{context_field_name}`".format(
            context_field_name=context_trace_field_name(context_type)
        ),
    )

    tap.test(
        trace_events_have_context_value(
            session_output_location.path,
            context_trace_field_name(context_type),
            expected_context_value,
        ),
        "Trace's events contain the expected `{context_field_name}` value `{expected_context_value}`".format(
            context_field_name=context_trace_field_name(context_type),
            expected_context_value=expected_context_value,
        ),
    )


tap = lttngtest.TapGenerator(20)
tap.diagnostic("Test user space context tracing")

with lttngtest.test_environment(with_sessiond=True, log=tap.diagnostic) as test_env:
    test_static_context(
        tap, test_env, lttngtest.VpidContextType(), lambda test_app: test_app.vpid
    )
    test_static_context(
        tap, test_env, lttngtest.VuidContextType(), lambda test_app: os.getuid()
    )
    test_static_context(
        tap, test_env, lttngtest.VgidContextType(), lambda test_app: os.getgid()
    )
    test_static_context(
        tap,
        test_env,
        lttngtest.JavaApplicationContextType("mayo", "ketchup"),
        lambda test_app: {},
    )

sys.exit(0 if tap.is_successful else 1)