File: cbus.c

package info (click to toggle)
tarantool 2.6.0-1.4
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 85,412 kB
  • sloc: ansic: 513,775; cpp: 69,493; sh: 25,650; python: 19,190; perl: 14,973; makefile: 4,178; yacc: 1,329; sql: 1,074; pascal: 620; ruby: 190; awk: 18; lisp: 7
file content (248 lines) | stat: -rw-r--r-- 6,544 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
#include "memory.h"
#include "fiber.h"
#include "cbus.h"
#include "unit.h"
#include "trigger.h"

/**
 * Test triggers on cpipe flush. Cpipe flush send all buffered
 * messages to a consumer. Flush is called either at the end of
 * an event loop, or when a messages queue is full. This event
 * can be used to make some prepare actions before flush.
 */

/** Counter of flush events. */
static int flushed_cnt = 0;
/** Expected value of flushed_cnt at the end of the test. */
static int expected_flushed_cnt = 0;

/**
 * Worker thread. In the test only one worker is started and the
 * main thread sends to it messages to trigger tests one by one.
 */
struct cord worker;
/** Queue of messages from the main to the worker thread. */
struct cpipe pipe_to_worker;
/** Queue of messages from the worker to the main thread. */
struct cpipe pipe_to_main;
/**
 * Trigger which is called on flush to the main thread event. Here
 * we test only this flush direction (from worker to main), becase
 * the direction from the main to the worker works in the same
 * way.
 */
struct trigger on_flush_to_main;

/** Common callbacks. {{{ ------------------------------------- */

/** Dummy callback to fill cmsg rotes with more hops. */
static void
do_nothing(struct cmsg *m)
{
	(void) m;
}

/** Callback called on each flush to the main thread. */
static int
flush_cb(struct trigger *t, void *e)
{
	(void) t;
	(void) e;
	++flushed_cnt;
	printf("flush event, counter = %d\n", flushed_cnt);
	return 0;
}

/** Callback to finish the test. It breaks the main event loop. */
static void
finish_execution(struct cmsg *m)
{
	(void) m;
	fiber_cancel(fiber());
	printf("break main fiber and finish test\n");
	is(flushed_cnt, expected_flushed_cnt,
	   "flushed_cnt at the end of the test");
}

/** }}} Common callbacks. ------------------------------------- */

/** Worker routines. {{{ -------------------------------------- */

static int
worker_f(va_list ap)
{
	(void) ap;
	cpipe_create(&pipe_to_main, "main");
	struct cbus_endpoint endpoint;
	cbus_endpoint_create(&endpoint, "worker", fiber_schedule_cb, fiber());
	cbus_loop(&endpoint);
	cbus_endpoint_destroy(&endpoint, cbus_process);
	cpipe_destroy(&pipe_to_main);
	return 0;
}

static void
worker_start()
{
	printf("start worker\n");
	fail_if(cord_costart(&worker, "worker", worker_f, NULL) != 0);
	cpipe_create(&pipe_to_worker, "worker");
}

static void
worker_stop()
{
	printf("finish worker\n");
	cbus_stop_loop(&pipe_to_worker);
	cpipe_destroy(&pipe_to_worker);
	fail_if(cord_join(&worker) != 0);
}

/** }}} Worker routines. -------------------------------------- */

/**
 * Test that if messages are not too many, the flush callback
 * is called only once per event loop, even if multiple flush
 * events are created. {{{ ---------------------------------------
 */
static void
do_forced_flush(struct cmsg *m)
{
	(void) m;
	static struct cmsg_hop forced_flush_rote = { do_nothing, NULL };
	static struct cmsg_hop finish_route = { finish_execution, NULL };
	static struct cmsg forced_flush_msg;
	static struct cmsg finish_msg;
	cmsg_init(&forced_flush_msg, &forced_flush_rote);
	cmsg_init(&finish_msg, &finish_route);
	cpipe_push(&pipe_to_main, &forced_flush_msg);
	cpipe_flush_input(&pipe_to_main);
	cpipe_push(&pipe_to_main, &finish_msg);
	expected_flushed_cnt = 1;
}

static void
test_forced_flush(struct cmsg *m)
{
	(void) m;
	is(flushed_cnt, 1, "1 flush after test_several_messages");
	printf("\n*** Test forced flush ***\n");
	flushed_cnt = 0;
	static struct cmsg_hop test_forced_flush_route =
		{ do_forced_flush, NULL };
	static struct cmsg test_forced_flush_msg;
	cmsg_init(&test_forced_flush_msg, &test_forced_flush_route);
	cpipe_push(&pipe_to_worker, &test_forced_flush_msg);
}

/** }}} Test forced flush. ------------------------------------ */

/**
 * Test that flush is called once per event loop event if several
 * messages was pushed. {{{ --------------------------------------
 */

/** Do some event and check flush to was not called. */
static void
do_some_event(struct cmsg *m)
{
	(void) m;
	is(flushed_cnt, 0, "no flush during loop");
}

/**
 * Create the following scenario for the worker:
 * do_some_event() -> do_some_event() -> do_nothing() -> flush().
 * Each do_some_event cheks, that flush was not called.
 */
static void
test_several_messages(struct cmsg *m)
{
	(void) m;
	is(flushed_cnt, 1, "1 flush after test_single_msg");
	printf("\n*** Test several messages ***\n");
	flushed_cnt = 0;
	static struct cmsg_hop test_event_route[] = {
		{ do_some_event, &pipe_to_main },
		{ do_nothing, NULL },
	};
	static struct cmsg_hop test_several_msg_route[] = {
		{ do_some_event, &pipe_to_main },
		{ test_forced_flush, NULL },
	};
	static struct cmsg test_event_msg[2];
	static struct cmsg test_several_msg;
	cmsg_init(&test_event_msg[0], test_event_route);
	cmsg_init(&test_event_msg[1], test_event_route);
	cmsg_init(&test_several_msg, test_several_msg_route);
	cpipe_push(&pipe_to_worker, &test_event_msg[0]);
	cpipe_push(&pipe_to_worker, &test_event_msg[1]);
	cpipe_push(&pipe_to_worker, &test_several_msg);
}

/** }}} Test several messages. -------------------------------- */

/**
 * Test that flush trigger works for a single message.
 * {{{ -----------------------------------------------------------
 */

static void
test_single_msg()
{
	printf("\n*** Test single message ***\n");
	static struct cmsg_hop test_single_flush_route[] = {
		{ do_nothing, &pipe_to_main },
		/* Schedule the next test. */
		{ test_several_messages, NULL },
	};
	static struct cmsg test_msg;
	cmsg_init(&test_msg, test_single_flush_route);
	cpipe_push(&pipe_to_worker, &test_msg);
}

/** }}} Test single message. ---------------------------------- */

static int
main_f(va_list ap)
{
	(void) ap;
	struct cbus_endpoint endpoint;
	cbus_endpoint_create(&endpoint, "main", fiber_schedule_cb, fiber());
	worker_start();
	trigger_create(&on_flush_to_main, flush_cb, NULL, NULL);
	trigger_add(&pipe_to_main.on_flush, &on_flush_to_main);

	test_single_msg();

	cbus_loop(&endpoint);
	worker_stop();
	cbus_endpoint_destroy(&endpoint, cbus_process);
	ev_break(loop(), EVBREAK_ALL);
	return 0;
}

int
main()
{
	header();
	plan(6);

	memory_init();
	fiber_init(fiber_c_invoke);
	cbus_init();
	printf("start main fiber\n");
	struct fiber *main_fiber = fiber_new("main", main_f);
	assert(main_fiber != NULL);
	fiber_wakeup(main_fiber);
	printf("start main loop\n");
	ev_run(loop(), 0);
	printf("finish main loop\n");
	cbus_free();
	fiber_free();
	memory_free();

	int rc = check_plan();
	footer();
	return rc;
}