File: consumer_testpoints.cpp

package info (click to toggle)
ltt-control 2.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 21,860 kB
  • sloc: cpp: 192,012; sh: 28,777; ansic: 10,960; python: 7,108; makefile: 3,520; java: 109; xml: 46
file content (144 lines) | stat: -rw-r--r-- 3,610 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/*
 * SPDX-FileCopyrightText: 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */

#include <common/compat/getenv.hpp>
#include <common/consumer/consumer.hpp>
#include <common/error.hpp>
#include <common/pipe.hpp>

#include <lttng/constant.h>
#include <lttng/lttng-export.h>

#include <dlfcn.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

static char *pause_pipe_path;
static struct lttng_pipe *pause_pipe;
static int *data_consumption_state;
using lttng_consumer_get_type_func = enum lttng_consumer_type (*)();
static lttng_consumer_get_type_func lttng_consumer_get_type;

int lttng_opt_verbose;

static void __attribute__((destructor)) pause_pipe_fini()
{
	int ret;

	if (pause_pipe_path) {
		ret = unlink(pause_pipe_path);
		if (ret) {
			PERROR("unlink pause pipe");
		}
	}

	free(pause_pipe_path);
	lttng_pipe_destroy(pause_pipe);
}

/*
 * We use this testpoint, invoked at the start of the consumerd's data handling
 * thread to create a named pipe/FIFO which a test application can use to either
 * pause or resume the consumption of data.
 */
extern "C" LTTNG_EXPORT int __testpoint_consumerd_thread_data(void);
int __testpoint_consumerd_thread_data(void)
{
	int ret = 0;
	const char *pause_pipe_path_prefix, *domain;

	/*
	 * lttng_opt_verbose does not refer to the same one as loaded in the consumerd.
	 * For the debug mode for this TU, as it's only used in testing.
	 */
	lttng_opt_verbose = 3;

	pause_pipe_path_prefix = lttng_secure_getenv("CONSUMER_PAUSE_PIPE_PATH");
	if (!pause_pipe_path_prefix) {
		ret = -1;
		goto end;
	}

	/*
	 * These symbols are exclusive to the consumerd process, hence we can't
	 * rely on their presence in the sessiond. Not looking-up these symbols
	 * dynamically would not allow this shared object to be LD_PRELOAD-ed
	 * when launching the session daemon.
	 */
	data_consumption_state = (int *) dlsym(nullptr, "data_consumption_paused");
	LTTNG_ASSERT(data_consumption_state);
	lttng_consumer_get_type =
		(lttng_consumer_type(*)()) dlsym(nullptr, "lttng_consumer_get_type");
	LTTNG_ASSERT(lttng_consumer_get_type);

	switch (lttng_consumer_get_type()) {
	case LTTNG_CONSUMER_KERNEL:
		domain = "kernel";
		break;
	case LTTNG_CONSUMER32_UST:
		domain = "ust32";
		break;
	case LTTNG_CONSUMER64_UST:
		domain = "ust64";
		break;
	default:
		abort();
	}

	ret = asprintf(&pause_pipe_path, "%s-%s", pause_pipe_path_prefix, domain);
	if (ret < 1) {
		ERR("Failed to allocate pause pipe path");
		goto end;
	}

	DBG("Creating pause pipe at %s", pause_pipe_path);
	pause_pipe = lttng_pipe_named_open(
		pause_pipe_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP, O_NONBLOCK);
	if (!pause_pipe) {
		ERR("Failed to create pause pipe at %s", pause_pipe_path);
		ret = -1;
		goto end;
	}

	/* Only the read end of the pipe is useful to us. */
	ret = lttng_pipe_write_close(pause_pipe);
end:
	return ret;
}

extern "C" LTTNG_EXPORT int __testpoint_consumerd_thread_data_poll(void);
int __testpoint_consumerd_thread_data_poll(void)
{
	int ret = 0;
	uint8_t value;
	bool value_read = false;

	if (!pause_pipe) {
		ret = -1;
		goto end;
	}

	/* Purge pipe and only consider the freshest value. */
	do {
		ret = lttng_pipe_read(pause_pipe, &value, sizeof(value));
		if (ret == sizeof(value)) {
			value_read = true;
		}
	} while (ret == sizeof(value));

	ret = (errno == EAGAIN) ? 0 : -errno;

	if (value_read) {
		*data_consumption_state = !!value;
		DBG("Message received on pause pipe: %s data consumption",
		    *data_consumption_state ? "paused" : "resumed");
	}
end:
	return ret;
}