File: test_per_application_leaks.py

package info (click to toggle)
ltt-control 2.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 21,860 kB
  • sloc: cpp: 192,012; sh: 28,777; ansic: 10,960; python: 7,108; makefile: 3,520; java: 109; xml: 46
file content (172 lines) | stat: -rwxr-xr-x 5,951 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/env python3
#
# SPDX-FileCyoprightText: Kienan Stewart <kstewart@efficios.com>
# SPDX-License-Identifier: GPL-2.0-only

"""
Test that the consumerd doesn't leak file descriptor allocations in /dev/shm
when the relayd exits before instrumented applications start.

@see https://bugs.lttng.org/issues/1411
"""

import os
import pathlib
import subprocess
import sys
import time

test_utils_import_path = pathlib.Path(__file__).absolute().parents[3] / "utils"
sys.path.append(str(test_utils_import_path))

import lttngtest


def get_consumerd_pid(tap, parent, match_string):
    pid = None
    try:
        process = subprocess.Popen(
            ["pgrep", "-P", str(parent), "-f", match_string],
            stdout=subprocess.PIPE,
        )
        process.wait()
        output = str(process.stdout.read(), encoding="UTF-8").splitlines()
        if len(output) > 1:
            raise Exception(
                "Unexpected number of output lines (got {}): {}".format(
                    len(output), output
                )
            )
        elif len(output) == 1:
            pid = int(output[0])
    except Exception as e:
        tap.diagnostic(
            "Failed to find child process of '{}' matching '{}': '{}'".format(
                parent, match_string, str(e)
            )
        )
    return pid


def count_process_dev_shm_fds(pid):
    count = 0
    if pid is None:
        return count
    dir = os.path.join("/proc", str(pid), "fd")
    for root, dirs, files in os.walk(dir):
        for f in files:
            filename = pathlib.Path(os.path.join(root, f))
            try:
                # The symlink in /proc/PID may exist, but point to an unlinked
                # file - shm_unlink is called but either the kernel hasn't yet
                # finished the clean-up or the consumer hasn't called close()
                # on the FD yet.
                if filename.is_symlink() and str(filename.resolve()).startswith(
                    "/dev/shm/shm-ust-consumer"
                ):
                    count += 1
            except FileNotFoundError:
                # As /proc/XX/fd/ is being walked, fds may be added or removed
                continue
    return count


def count_dev_shm_fds(tap, test_env):
    consumer32_pid = get_consumerd_pid(tap, test_env._sessiond.pid, "ustconsumerd32")
    fds_consumerd32 = count_process_dev_shm_fds(consumer32_pid)
    consumer64_pid = get_consumerd_pid(tap, test_env._sessiond.pid, "ustconsumerd64")
    fds_consumerd64 = count_process_dev_shm_fds(consumer64_pid)
    return (fds_consumerd32, fds_consumerd64)


def test_fd_leak(tap, test_env, buffer_sharing_policy, kill_relayd=True):
    tap.diagnostic(
        "test_fd_leak with buffer sharing policy {}, kill relayd: {}".format(
            buffer_sharing_policy, kill_relayd
        )
    )
    client = lttngtest.LTTngClient(test_env, log=tap.diagnostic)
    output = lttngtest.NetworkSessionOutputLocation(
        "net://localhost:{}:{}/".format(
            test_env.lttng_relayd_control_port, test_env.lttng_relayd_data_port
        )
    )

    session = client.create_session(output=output, live=True)
    channel = session.add_channel(
        lttngtest.lttngctl.TracingDomain.User,
        buffer_sharing_policy=buffer_sharing_policy,
    )
    channel.add_recording_rule(lttngtest.lttngctl.UserTracepointEventRule())
    session.start()

    count_post_start = count_dev_shm_fds(tap, test_env)

    # Kill the relayd
    if kill_relayd:
        test_env._terminate_relayd()

    test_env.launch_wait_trace_test_application(10)
    count_post_app1 = count_dev_shm_fds(tap, test_env)

    test_env.launch_wait_trace_test_application(10)
    count_post_app2 = count_dev_shm_fds(tap, test_env)

    test_env.launch_wait_trace_test_application(10)
    count_post_app3 = count_dev_shm_fds(tap, test_env)

    session.stop()
    session.destroy()

    # As there is not method to know exactly when the final close of the
    # shm happens (it is timing dependant from an external point of view),
    # this test iterates waiting for the post-destroy count to reach the
    # post-start count. In a failure, this will loop infinitely.
    tap.diagnostic(
        "Waiting for post-destroy shm count to drop back to post-start level"
    )
    while True:
        count_post_destroy = count_dev_shm_fds(tap, test_env)
        if count_post_destroy == count_post_start:
            break
        time.sleep(0.1)

    tap.diagnostic(
        "FD counts post-start: {}, post-destroy: {}".format(
            count_post_start, count_post_destroy
        )
    )
    tap.test(
        count_post_start == count_post_destroy,
        "Count of consumerd FDs in /dev/shm are equal after session start then after destroy",
    )

    tap.diagnostic(
        "FD counts post-app-1: {}, post-app-2: {}, post-app-3: {}".format(
            count_post_app1, count_post_app2, count_post_app3
        )
    )
    if buffer_sharing_policy == lttngtest.lttngctl.BufferSharingPolicy.PerUID:
        tap.test(
            (count_post_app1 == count_post_app2)
            and (count_post_app2 == count_post_app3),
            "Count of consumerd FDs in /dev/shm doesn't leak over several application invocations",
        )
    else:
        tap.skip(
            "Count of consumerds FDs in /dev/shm doesn't leak over several application invocations - no mechanism is available to guarantee buffer reclamation within a given time frame"
        )


tap = lttngtest.TapGenerator(8)
for kill_relayd in [True, False]:
    for buffer_sharing_policy in [
        lttngtest.lttngctl.BufferSharingPolicy.PerUID,
        lttngtest.lttngctl.BufferSharingPolicy.PerPID,
    ]:
        with lttngtest.test_environment(
            log=tap.diagnostic, with_relayd=True, with_sessiond=True
        ) as test_env:
            test_fd_leak(tap, test_env, buffer_sharing_policy, kill_relayd)

sys.exit(0 if tap.is_successful else 1)