File: garbage.c

package info (click to toggle)
rtpengine 13.5.1.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 13,676 kB
  • sloc: ansic: 86,775; perl: 59,422; python: 3,193; sh: 1,037; makefile: 687; asm: 211
file content (96 lines) | stat: -rw-r--r-- 2,449 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#include "garbage.h"
#include <glib.h>
#include <pthread.h>
#include "log.h"


// One deferred-free request, queued until every poller thread that existed
// at creation time has acknowledged it.
typedef struct {
	void *ptr;                  // object handed to free_func when the entry is released
	void (*free_func)(void *);  // release callback, invoked exactly once with ptr
	int *wait_threads;          // array_len flags indexed by thread number; non-zero = that thread has seen the entry
	unsigned int array_len;     // number of thread numbers handed out when the entry was created
	unsigned int threads_left;  // how many of those threads have not yet marked the entry
} garbage_t;


// Held by garbage_add() and garbage_collect() while they touch the queue and
// the per-entry bookkeeping.
static pthread_mutex_t garbage_lock = PTHREAD_MUTEX_INITIALIZER;
// Pending garbage_t entries, oldest first (entries are pushed at the tail).
static GQueue garbage = G_QUEUE_INIT;
// Count of thread numbers handed out so far; only accessed via g_atomic_int_*.
static volatile int garbage_thread_num;


// Hands out the next unused thread number. The atomic add yields the counter
// value from before the increment, so the first caller receives 0.
unsigned int garbage_new_thread_num(void) {
	int assigned = g_atomic_int_add(&garbage_thread_num, 1);

	return assigned;
}


// Queue `ptr` for deferred release through `free_func`.
//
// Each running poller thread has a unique number associated with it, starting
// with 0. A garbage entry carries one boolean flag per thread number that had
// been handed out when the entry was created, recording which threads have
// seen the entry. Once every such thread has seen it, the free function is
// finally called. This is to make sure that all poller threads have left
// epoll_wait() after an fd has been removed from the watch list.
//
// NOTE: an entry created while no thread numbers have been handed out has
// array_len == 0; garbage_collect() skips such entries, so they remain queued
// until garbage_collect_all() runs.
void garbage_add(void *ptr, free_func_t *free_func) {
	garbage_t *garb = g_new(garbage_t, 1);
	garb->ptr = ptr;
	garb->free_func = free_func;

	pthread_mutex_lock(&garbage_lock);

	garb->array_len = g_atomic_int_get(&garbage_thread_num);
	garb->threads_left = garb->array_len;

	// calloc() zero-fills the flags and checks the count * size product for
	// overflow. A NULL result is legitimate only for a zero-length request.
	garb->wait_threads = calloc(garb->array_len, sizeof(*garb->wait_threads));
	if (!garb->wait_threads && garb->array_len) {
		// Same abort-on-OOM policy as g_new() above, instead of writing
		// through a NULL pointer.
		abort();
	}

	g_queue_push_tail(&garbage, garb);

	pthread_mutex_unlock(&garbage_lock);
}


// Releases one garbage entry: runs the stored callback on the stored object,
// then frees the per-thread flag array and the entry itself.
static void garbage_collect1(garbage_t *garb) {
	void (*release)(void *) = garb->free_func;
	void *payload = garb->ptr;

	release(payload);

	free(garb->wait_threads);
	g_free(garb);
}


// Marks every pending entry as seen by thread `num`. Any entry for which this
// was the last outstanding thread is unlinked and released. The release
// itself runs with the lock dropped, after which the scan starts over from
// the head of the queue.
void garbage_collect(unsigned int num) {
	dbg("running garbage collection thread %u", num);

	for (;;) {
		garbage_t *finished = NULL;

		pthread_mutex_lock(&garbage_lock);

		for (GList *link = garbage.head; link; link = link->next) {
			garbage_t *entry = link->data;

			// skip entries that predate this thread or that it already marked
			if (num >= entry->array_len || entry->wait_threads[num])
				continue;

			dbg("marking garbage entry %p as seen by %u with %u threads left", entry, num,
					entry->threads_left);
			entry->wait_threads[num] = 1;

			if (--entry->threads_left == 0) {
				// last thread to see it: unlink now, release after unlocking
				g_queue_delete_link(&garbage, link);
				finished = entry;
				break;
			}
		}

		pthread_mutex_unlock(&garbage_lock);

		if (!finished)
			return;

		garbage_collect1(finished);
	}
}


void garbage_collect_all(void) {
	garbage_t *garb;
	while ((garb = g_queue_pop_head(&garbage)))
		garbage_collect1(garb);
}