File: test_fork.py

package info (click to toggle)
ltt-control 2.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 21,860 kB
  • sloc: cpp: 192,012; sh: 28,777; ansic: 10,960; python: 7,108; makefile: 3,520; java: 109; xml: 46
file content (138 lines) | stat: -rw-r--r-- 3,872 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
#
# SPDX-License-Identifier: GPL-2.0-only

import os
import subprocess
import re
import shutil
import sys

test_path = os.path.dirname(os.path.abspath(__file__)) + "/"
test_utils_path = test_path
for i in range(4):
    test_utils_path = os.path.dirname(test_utils_path)
test_utils_path = test_utils_path + "/utils"
sys.path.append(test_utils_path)
from test_utils import *


NR_TESTS = 6
current_test = 1
print("1..{0}".format(NR_TESTS))

# Check if a sessiond is running... bail out if none found.
if session_daemon_alive() == 0:
    bail(
        'No sessiond running. Please make sure you are running this test with the "run" shell script and verify that the lttng tools are properly installed.'
    )

session_info = create_session()
enable_ust_tracepoint_event(session_info, "ust_tests_fork*")
start_session(session_info)

fork_process = subprocess.Popen(
    [test_path + "fork", test_path + "fork2"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
parent_pid = -1
child_pid = -1
for line in fork_process.stdout:
    line = line.decode("utf-8").replace("\n", "")
    match = re.search(r"child_pid (\d+)", line)
    if match:
        child_pid = match.group(1)
    match = re.search(r"parent_pid (\d+)", line)
    if match:
        parent_pid = match.group(1)

fork_process.wait()

print_test_result(
    fork_process.returncode == 0, current_test, "Fork test application exited normally"
)
current_test += 1

stop_session(session_info)

# Check both events (normal exit and suicide messages) are present in the resulting trace
try:
    babeltrace_process = subprocess.Popen(
        [BABELTRACE_BIN, session_info.trace_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
except FileNotFoundError:
    bail("Could not open {}. Please make sure it is installed.".format(BABELTRACE_BIN))

event_lines = []
for event_line in babeltrace_process.stdout:
    event_line = event_line.decode("utf-8").replace("\n", "")
    if (
        re.search(r"warning", event_line) is not None
        or re.search(r"error", event_line) is not None
    ):
        print("# " + event_line)
    else:
        event_lines.append(event_line)

babeltrace_process.wait()

print_test_result(
    babeltrace_process.returncode == 0, current_test, "Resulting trace is readable"
)
current_test += 1

if babeltrace_process.returncode != 0:
    bail("Unreadable trace; can't proceed with analysis.", session_info)

event_before_fork = False
event_after_fork_parent = False
event_after_fork_child = False
event_after_exec = False

for event_line in event_lines:
    match = re.search(r".*pid = (\d+)", event_line)
    if match is not None:
        event_pid = match.group(1)
    else:
        continue

    if re.search(r"before_fork", event_line):
        event_before_fork = event_pid == parent_pid
    if re.search(r"after_fork_parent", event_line):
        event_after_fork_parent = event_pid == parent_pid
    if re.search(r"after_fork_child", event_line):
        event_after_fork_child = event_pid == child_pid
    if re.search(r"after_exec", event_line):
        event_after_exec = event_pid == child_pid

print_test_result(
    event_before_fork,
    current_test,
    "before_fork event logged by parent process found in trace",
)
current_test += 1
print_test_result(
    event_after_fork_parent,
    current_test,
    "after_fork_parent event logged by parent process found in trace",
)
current_test += 1
print_test_result(
    event_after_fork_child,
    current_test,
    "after_fork_child event logged by child process found in trace",
)
current_test += 1
print_test_result(
    event_after_exec,
    current_test,
    "after_exec event logged by child process found in trace",
)
current_test += 1

shutil.rmtree(session_info.tmp_directory)