File: timerthread.c

package info (click to toggle)
rtpengine 13.5.1.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 13,676 kB
  • sloc: ansic: 86,764; perl: 59,422; python: 3,193; sh: 1,030; makefile: 693; asm: 211
file content (403 lines) | stat: -rw-r--r-- 10,802 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
#include "timerthread.h"

#include "helpers.h"
#include "log_funcs.h"
#include "poller.h"
#include "main.h"
#include "uring.h"


// Ordering function for the per-thread timer tree. Objects with an earlier
// `next_check` sort first, objects whose `next_check` is zero sort last, and
// objects with identical `next_check` are ordered by their address, so only
// the very same object compares as equal.
static int tt_obj_cmp(const void *a, const void *b) {
	const struct timerthread_obj *lhs = a;
	const struct timerthread_obj *rhs = b;

	if (lhs->next_check != rhs->next_check) {
		// the values differ, so at most one of them is zero: zero goes to the end
		if (lhs->next_check == 0)
			return 1;
		if (rhs->next_check == 0)
			return -1;
		// both set: the earlier one goes first
		return lhs->next_check < rhs->next_check ? -1 : 1;
	}

	// same timestamp (or both zero): fall back to pointer order
	if (lhs == rhs)
		return 0;
	return lhs < rhs ? -1 : 1;
}

// Sets up one worker slot of a timerthread: links it to `parent`, clears the
// pre-picked object and the next wake-up time, initialises lock and condition,
// and creates the object tree ordered by tt_obj_cmp().
static void timerthread_thread_init(struct timerthread_thread *tt, struct timerthread *parent) {
	tt->parent = parent;
	tt->obj = NULL;
	ZERO(tt->next_wake);

	mutex_init(&tt->lock);
	cond_init(&tt->cond);

	tt->tree = g_tree_new(tt_obj_cmp);
}

// Prepares `tt` to run `func` on `num` worker slots: stores the callback and
// the slot count, allocates the slot array and initialises every slot.
// No threads are started here.
void timerthread_init(struct timerthread *tt, unsigned int num, void (*func)(void *)) {
	tt->num_threads = num;
	tt->func = func;
	tt->threads = g_malloc(sizeof(*tt->threads) * num);

	unsigned int idx = 0;
	while (idx < num) {
		timerthread_thread_init(&tt->threads[idx], tt);
		idx++;
	}
}

// g_tree_foreach() callback: drops the reference held on each tree value.
// Key `k` and user data `p` are not used. Returning FALSE keeps the
// traversal going over all nodes.
static int __tt_put_all(void *k, void *d, void *p) {
	struct timerthread_obj *held = d;

	obj_put(held);
	return FALSE;
}

// Tears down one worker slot: releases the reference of every object still in
// the tree, destroys the tree, releases the pre-picked object (if any) and
// finally destroys the slot's mutex.
static void timerthread_thread_destroy(struct timerthread_thread *tt) {
	// drop the references first, then the tree that was holding them
	g_tree_foreach(tt->tree, __tt_put_all, tt);
	g_tree_destroy(tt->tree);

	// an object that was already taken out of the tree carries its own reference
	if (tt->obj != NULL)
		obj_put(tt->obj);

	mutex_destroy(&tt->lock);
}

// Destroys all worker slots of `tt` and frees the slot array.
// The `tt` structure itself is not freed.
void timerthread_free(struct timerthread *tt) {
	unsigned int idx = 0;

	while (idx < tt->num_threads) {
		timerthread_thread_destroy(&tt->threads[idx]);
		idx++;
	}

	g_free(tt->threads);
}

// Main loop of one timer worker. `p` is the worker's struct timerthread_thread.
//
// The loop repeatedly takes the first object of the worker's tree (or reuses
// the object it pre-picked in an earlier iteration), and either runs the
// parent's callback on it if it is due within the configured accuracy, or
// remembers it in tt->obj and waits on the condition variable until it is due.
// tt->lock is held for the whole loop except while the callback is running.
// The loop ends once rtpe_shutdown is set.
static void timerthread_run(void *p) {
	struct timerthread_thread *tt = p;
	struct timerthread *parent = tt->parent;

	// register lock + condition with the waker list
	// NOTE(review): presumably so that this thread can be woken up from
	// elsewhere (e.g. at shutdown) -- confirm in thread_waker_add()
	struct thread_waker waker = { .lock = &tt->lock, .cond = &tt->cond };
	thread_waker_add(&waker);

	// objects due within this margin are run right away instead of sleeping
	int64_t accuracy = rtpe_config.timer_accuracy;

	mutex_lock(&tt->lock);

	while (!rtpe_shutdown) {
		rtpe_now = now_us();

		// default/maximum sleep if there is nothing to do
		// NOTE(review): presumably microseconds (time comes from now_us()), i.e. 10 seconds
		int64_t sleeptime = 10000000;
		// find the first element if we haven't determined it yet
		struct timerthread_obj *tt_obj = tt->obj;
		if (!tt_obj) {
			tt_obj = rtpe_g_tree_first(tt->tree);
			if (!tt_obj)
				goto sleep_now; // empty tree: sleep for the default time

			// immediately steal reference
			// XXX ideally we would have a tree_steal_first() function
			g_tree_remove(tt->tree, tt_obj);
		}

		// scheduled to run? if not, then we remember this object/reference and go to sleep
		sleeptime = tt_obj->next_check - rtpe_now;

		if (sleeptime > accuracy) {
			tt->obj = tt_obj;
			goto sleep;
		}

		// pretend we're running exactly at the scheduled time
		rtpe_now = tt_obj->next_check;
		// mark the object as unscheduled and clear our own state before
		// giving up the lock
		tt_obj->next_check = 0;
		tt_obj->last_run = rtpe_now;
		tt->next_wake = 0;
		tt->obj = NULL;
		mutex_unlock(&tt->lock);

		// run and release
		parent->func(tt_obj);
		// drop the reference that was taken over from the tree
		obj_put(tt_obj);

		log_info_reset();
		uring_methods.thread_loop();

		// re-acquire the lock and look for the next object without sleeping
		mutex_lock(&tt->lock);
		continue;

sleep:
		/* figure out how long we should sleep */
		sleeptime = MIN(10000000, sleeptime);
sleep_now:;
		// absolute wake-up time; also published in tt->next_wake so that
		// timerthread_obj_schedule_abs_nl() can tell whether a newly
		// scheduled object needs an earlier wake-up
		int64_t tv = rtpe_now;
		tv += sleeptime;
		tt->next_wake = tv;
		cond_timedwait(&tt->cond, &tt->lock, tv);
	}

	mutex_unlock(&tt->lock);
	thread_waker_del(&waker);
}

// Starts one detached thread running timerthread_run() for every worker slot
// of `tt`. `scheduler`, `prio` and `name` are passed through unchanged to
// thread_create_detach_prio().
void timerthread_launch(struct timerthread *tt, const char *scheduler, int prio, const char *name) {
	unsigned int idx = 0;

	while (idx < tt->num_threads) {
		thread_create_detach_prio(timerthread_run, &tt->threads[idx], scheduler, prio, name);
		idx++;
	}
}

// Schedules `tt_obj` to run at the absolute time `tv`, unless it is already
// scheduled for that time or earlier. A NULL `tt_obj` is ignored.
//
// This function takes no lock itself while it modifies the worker's tree and
// pre-picked object.
// NOTE(review): the `_nl` suffix presumably stands for "no lock", i.e. the
// caller is expected to hold tt->lock -- confirm against the callers.
// NOTE(review): tt_obj->thread is dereferenced without a NULL check (unlike in
// timerthread_obj_deschedule()), so it is assumed to be assigned already.
void timerthread_obj_schedule_abs_nl(struct timerthread_obj *tt_obj, int64_t tv) {
	if (!tt_obj)
		return;
	struct timerthread_thread *tt = tt_obj->thread;

	// (stale debug line: `tv` is a plain int64_t now, not a struct timeval pointer)
	//ilog(LOG_DEBUG, "scheduling timer object at %llu.%06lu", (unsigned long long) tv->tv_sec,
			//(unsigned long) tv->tv_usec);

	if (tt_obj->next_check && tt_obj->next_check <= tv)
		return; /* already scheduled sooner */
	// Take the object out of the tree while it still carries its old
	// next_check (the tree is ordered by that value). Reference handling:
	// - it was in the tree: the tree's reference is reused for the re-insert
	// - it was the worker's pre-picked object: that reference is reused
	// - otherwise: a new reference is taken for the tree
	if (!g_tree_remove(tt->tree, tt_obj)) {
		if (tt->obj == tt_obj)
			tt->obj = NULL;
		else
			obj_hold(tt_obj); /* if it wasn't removed, we make a new reference */
	}
	tt_obj->next_check = tv;
	g_tree_insert(tt->tree, tt_obj, tt_obj);
	// need to wake the thread?
	// (only if it has published a wake-up time that is later than `tv`)
	if (tt->next_wake && tv < tt->next_wake) {
		// make sure we can get picked first: move pre-picked object back into tree
		if (tt->obj && tt->obj != tt_obj) {
			g_tree_insert(tt->tree, tt->obj, tt->obj);
			tt->obj = NULL;
		}
		cond_signal(&tt->cond);
	}
}

// Cancels a pending run of `tt_obj`. Does nothing if `tt_obj` is NULL, has no
// worker assigned, or is not scheduled (zero `next_check`).
// Otherwise, under the worker's lock, the object is taken out of the tree (or
// un-picked if the worker had already pre-picked it), its `next_check` is
// cleared, and the reference that the tree/worker was holding is released.
void timerthread_obj_deschedule(struct timerthread_obj *tt_obj) {
	if (!tt_obj)
		return;

	struct timerthread_thread *tt = tt_obj->thread;
	if (!tt)
		return;

	mutex_lock(&tt->lock);

	if (tt_obj->next_check) {
		// the removal must happen before next_check is cleared, as the tree
		// is ordered by that value
		gboolean had_ref = g_tree_remove(tt->tree, tt_obj);

		if (!had_ref && tt->obj == tt_obj) {
			// not in the tree, but held by the worker as its next object
			tt->obj = NULL;
			had_ref = TRUE;
		}

		ZERO(tt_obj->next_check);

		if (had_ref)
			obj_put(tt_obj);
	}
	/* else: already descheduled */

	mutex_unlock(&tt->lock);
}

// Runs `run_func` on `ttqe` if the entry is due, and returns 0. An entry is
// due if its `when` is zero, is not later than rtpe_now, or is no more than
// 1000 time units (1 ms according to the original author) in the future.
// Returns -1 without calling `run_func` if the entry is not due yet.
static int timerthread_queue_run_one(struct timerthread_queue *ttq,
		struct timerthread_queue_entry *ttqe,
		void (*run_func)(struct timerthread_queue *, void *)) {
	// entries less than 1 ms ahead are not worth queueing and are run right away
	if (ttqe->when && ttqe->when > rtpe_now && ttqe->when - rtpe_now > 1000)
		return -1; // not yet

	run_func(ttq, ttqe);
	return 0;
}


// Timer callback for a timerthread_queue (`ptr`). Takes entries out of the
// queue in order and runs `run_later_func` on each one that is due, with the
// queue lock released during the call. The first entry that is not due yet is
// put back into the queue, and the queue's timer object is rescheduled for
// that entry's time.
void timerthread_queue_run(void *ptr) {
	struct timerthread_queue *queue = ptr;
	int64_t resched_at = 0;

	mutex_lock(&queue->lock);

	for (;;) {
		if (!g_tree_nnodes(queue->entries))
			break;

		struct timerthread_queue_entry *head = rtpe_g_tree_first(queue->entries);
		assert(head != NULL);
		g_tree_remove(queue->entries, head); // XXX use g_tree_remove_node if ever available

		// the handler runs without the queue lock
		mutex_unlock(&queue->lock);
		int not_due = timerthread_queue_run_one(queue, head, queue->run_later_func);
		mutex_lock(&queue->lock);

		if (not_due) {
			// couldn't send this one: put it back and remember its time
			g_tree_insert(queue->entries, head, head);
			resched_at = head->when;
			break;
		}
	}

	mutex_unlock(&queue->lock);

	if (resched_at)
		timerthread_obj_schedule_abs(&queue->tt_obj, resched_at);
}

// g_tree_foreach() callback: hands the tree key `k` to the queue's
// `entry_free_func`, if one is set. `d` is the owning timerthread_queue; the
// value `v` is not used. Returning FALSE keeps the traversal going.
static int ttqe_free_all(void *k, void *v, void *d) {
	struct timerthread_queue *owner = d;

	if (!owner->entry_free_func)
		return FALSE;

	owner->entry_free_func(k);
	return FALSE;
}

// Free handler passed to obj_alloc0_gen() for timerthread_queue objects:
// frees all remaining entries, destroys the entry tree and the queue lock,
// and finally calls the user's `free_func` on the queue, if one is set.
static void __timerthread_queue_free(void *p) {
	struct timerthread_queue *queue = p;

	// entries first, then the tree that contains them
	g_tree_foreach(queue->entries, ttqe_free_all, queue);
	g_tree_destroy(queue->entries);

	mutex_destroy(&queue->lock);

	if (queue->free_func != NULL)
		queue->free_func(p);
}

// Ordering function for the entry tree of a timerthread_queue. Entries with an
// earlier `when` sort first, entries whose `when` is zero sort last, and
// entries with the same `when` are ordered by ascending `idx`.
static int ttqe_compare(const void *a, const void *b) {
	const struct timerthread_queue_entry *lhs = a;
	const struct timerthread_queue_entry *rhs = b;

	if (lhs->when != rhs->when) {
		// the values differ, so at most one of them is zero: zero goes to the end
		if (lhs->when == 0)
			return 1;
		if (rhs->when == 0)
			return -1;
		// both set: the earlier one goes first
		return lhs->when < rhs->when ? -1 : 1;
	}

	// same timestamp: order by index
	if (lhs->idx == rhs->idx)
		return 0;
	return lhs->idx < rhs->idx ? -1 : 1;
}
 
// Creates a new timerthread_queue object of `size` bytes via obj_alloc0_gen(),
// attached to the timerthread `tt`.
//   type            - type name, passed to the allocator and stored in the queue
//   run_now_func    - handler used by timerthread_queue_push() for entries that
//                     are already due; may be NULL
//   run_later_func  - handler used when an entry is run from the queue;
//                     falls back to run_now_func if NULL
//   free_func       - optional, called on the queue when it is freed
//   entry_free_func - optional, called on entries that are dropped unprocessed
// `tt` must have been set up with timerthread_queue_run() as its callback
// (asserted). Returns the new queue.
void *timerthread_queue_new(const char *type, size_t size,
		struct timerthread *tt,
		void (*run_now_func)(struct timerthread_queue *, void *),
		void (*run_later_func)(struct timerthread_queue *, void *),
		void (*free_func)(void *),
		void (*entry_free_func)(void *))
{
	struct timerthread_queue *queue = obj_alloc0_gen(type, size, __timerthread_queue_free);

	queue->type = type;
	queue->tt_obj.tt = tt;
	assert(tt->func == timerthread_queue_run);

	queue->run_now_func = run_now_func;
	queue->run_later_func = run_later_func ? run_later_func : run_now_func;
	queue->free_func = free_func;
	queue->entry_free_func = entry_free_func;

	mutex_init(&queue->lock);
	queue->entries = g_tree_new(ttqe_compare);

	return queue;
}

// g_tree_search() callback used to find the highest `idx` among all queued
// entries that share a timestamp with a reference entry.
//   a - the tree key currently being visited
//   b - a `void *[2]`: [0] is the reference entry, [1] is the highest idx seen
//       so far, stored as a pointer-encoded unsigned int (updated in place)
// Never returns 0, so the search itself never "finds" anything; the result is
// what has been collected in [1]. The return value only steers the search:
// 1 continues among larger keys, -1 among smaller keys.
int __ttqe_find_last_idx(const void *a, const void *b) {
	const struct timerthread_queue_entry *visited = a;
	void **state = (void **) b;
	const struct timerthread_queue_entry *wanted = state[0];

	if (visited->when < wanted->when)
		return 1;
	if (visited->when > wanted->when)
		return -1;

	// same timestamp. track highest seen idx
	unsigned int best = GPOINTER_TO_UINT(state[1]);
	if (best < visited->idx)
		state[1] = GUINT_TO_POINTER(visited->idx);

	return 1; // and continue to higher idx
}
// Submits `ttqe` to the queue `ttq`.
//
// If the queue has a `run_now_func` and the entry is already due (as decided
// by timerthread_queue_run_one()), the entry is processed immediately and
// never enters the queue. Otherwise it is inserted into the entry tree,
// ordered by its `when`; entries sharing the same `when` get ascending `idx`
// values so that they keep their submission order. If the new entry ended up
// first in the queue, the queue's timer object is (re)scheduled for its time.
void timerthread_queue_push(struct timerthread_queue *ttq, struct timerthread_queue_entry *ttqe) {
	// can we send immediately?
	if (ttq->run_now_func && timerthread_queue_run_one(ttq, ttqe, ttq->run_now_func) == 0)
		return;

	// queue for sending
	// XXX a debug log line (queue type, scheduled time, RTP seq/timestamp) used
	// to live here and still needs its fields recovered

	ttqe->idx = 0;

	mutex_lock(&ttq->lock);

	// Most common case: no timestamp collision exists and idx 0 is free.
	// The tree compares by (when, idx), so this lookup only hits if an entry
	// with the same timestamp and idx 0 is already queued.
	if (g_tree_lookup(ttq->entries, ttqe)) {
		// something else exists with the same timestamp. find the highest idx
		void *data[2];
		data[0] = ttqe;
		data[1] = NULL;
		g_tree_search(ttq->entries, __ttqe_find_last_idx, data);
		// convert back to an integer first and only then add: arithmetic on
		// the `void *` itself is not valid ISO C
		ttqe->idx = GPOINTER_TO_UINT(data[1]) + 1;
	}

	// this hands over ownership of ttqe, so we must copy the timestamp out
	int64_t tv_send = ttqe->when;
	g_tree_insert(ttq->entries, ttqe, ttqe);
	struct timerthread_queue_entry *first_ttqe = rtpe_g_tree_first(ttq->entries);
	mutex_unlock(&ttq->lock);

	// first packet in? we're probably not scheduled yet
	// (pointer comparison only -- ttqe is not dereferenced after the unlock)
	if (first_ttqe == ttqe)
		timerthread_obj_schedule_abs(&ttq->tt_obj, tv_send);
}

// Match predicate for rtpe_g_tree_find_all(): non-zero if the entry `ent`
// has `ptr` as its `source`.
static int ttqe_ptr_match(const void *ent, const void *ptr) {
	const struct timerthread_queue_entry *entry = ent;

	return ptr == entry->source;
}
// Removes all entries whose `source` equals `ptr` from the queue without
// processing them, handing each one to `entry_free_func` if that is set.
// Runs entirely under the queue lock. Returns the number of entries removed;
// a NULL `ttq` yields 0.
unsigned int timerthread_queue_flush(struct timerthread_queue *ttq, void *ptr) {
	if (!ttq)
		return 0;

	unsigned int removed = 0;
	GQueue hits = G_QUEUE_INIT;

	mutex_lock(&ttq->lock);

	// collect first, then remove: the tree is not modified while it is walked
	rtpe_g_tree_find_all(&hits, ttq->entries, ttqe_ptr_match, ptr);

	for (; hits.length; removed++) {
		struct timerthread_queue_entry *victim = g_queue_pop_head(&hits);

		g_tree_remove(ttq->entries, victim);
		if (ttq->entry_free_func)
			ttq->entry_free_func(victim);
	}

	mutex_unlock(&ttq->lock);

	return removed;
}

// Drains the timerthread_queue `ptr`: every queued entry is taken out in order
// and passed to `run_later_func` right away, regardless of its scheduled time.
// The queue lock is released around each handler call.
void timerthread_queue_flush_data(void *ptr) {
	struct timerthread_queue *queue = ptr;

	mutex_lock(&queue->lock);

	for (;;) {
		if (!g_tree_nnodes(queue->entries))
			break;

		struct timerthread_queue_entry *head = rtpe_g_tree_first(queue->entries);
		assert(head != NULL);
		g_tree_remove(queue->entries, head);

		// the handler runs without the queue lock
		mutex_unlock(&queue->lock);
		queue->run_later_func(queue, head);
		mutex_lock(&queue->lock);
	}

	mutex_unlock(&queue->lock);
}