File: threadset.c

package info (click to toggle)
vips 8.16.1-2
  • links: PTS
  • area: main
  • in suites: sid
  • size: 51,940 kB
  • sloc: ansic: 169,179; cpp: 11,890; python: 4,620; xml: 4,353; sh: 732; perl: 40; makefile: 19
file content (371 lines) | stat: -rw-r--r-- 8,367 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
/* A set of threads.
 *
 * Creating and destroying threads can be expensive on some platforms, so we
 * try to only create once, then reuse.
 */

/*

	This file is part of VIPS.

	VIPS is free software; you can redistribute it and/or modify
	it under the terms of the GNU Lesser General Public License as published by
	the Free Software Foundation; either version 2 of the License, or
	(at your option) any later version.

	This program is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
	GNU Lesser General Public License for more details.

	You should have received a copy of the GNU Lesser General Public License
	along with this program; if not, write to the Free Software
	Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
	02110-1301  USA

 */

/*

	These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk

 */

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <glib/gi18n-lib.h>

#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <errno.h>

/*
#define VIPS_DEBUG
 */

#include <vips/vips.h>
#include <vips/internal.h>
#include <vips/thread.h>
#include <vips/debug.h>

typedef struct _VipsThreadExec {
	/* The source of this function.
	 */
	const char *domain;

	/* The function to execute within the thread.
	 */
	GFunc func;

	/* User data that is handed over to func when it is called.
	 */
	gpointer data;
} VipsThreadExec;

struct _VipsThreadset {
	/* An asynchronous queue of tasks.
	 */
	GAsyncQueue *queue;

	/* Idle threads wait on this semaphore.
	 */
	VipsSemaphore idle;

	/* The current number of (idle-)threads, the highwater mark,
	 * and the max we allow before blocking thread creation.
	 */
	int n_threads;
	int n_threads_highwater;
	int n_idle_threads;
	int max_threads;

	/* Set by our controller to request exit.
	 */
	gboolean exit;
};

/* The maximum relative time (in microseconds) that a thread waits
 * for work before being stopped.
 */
static const gint64 max_idle_time = 15 * G_TIME_SPAN_SECOND;

/* The maximum number of idle threads.
 */
static const int max_idle_threads = 2;

static gboolean
vips_threadset_reuse_wait(VipsThreadset *set)
{
	int result;

	/* A superfluous thread? Leave this thread.
	 */
	if (++set->n_idle_threads > max_idle_threads)
		return FALSE;

	g_async_queue_unlock(set->queue);

	/* Wait for at least 15 seconds before leaving this thread.
	 */
	result = vips_semaphore_down_timeout(&set->idle, max_idle_time);

	g_async_queue_lock(set->queue);

	return result != -1;
}

static void
vips_threadset_free_internal(VipsThreadset *set)
{
	VIPS_FREEF(g_async_queue_unref, set->queue);
	vips_semaphore_destroy(&set->idle);
	VIPS_FREE(set);
}

/* The thread work function.
 */
static void *
vips_threadset_work(void *pointer)
{
	VipsThreadset *set = (VipsThreadset *) pointer;
	gboolean cleanup = FALSE;

	VIPS_DEBUG_MSG("vips_threadset_work: starting %p\n", g_thread_self());

	g_async_queue_lock(set->queue);

	for (;;) {
		/* Pop a task from the queue. If the number of threads is limited,
		 * this will block until a task becomes available. Otherwise, it
		 * waits for at least 1/2 second before being marked as idle.
		 */
		VipsThreadExec *task = set->max_threads > 0
			? g_async_queue_pop_unlocked(set->queue)
			: g_async_queue_timeout_pop_unlocked(set->queue,
				  G_USEC_PER_SEC / 2);

		/* Request to exit? Leave this thread.
		 */
		if (set->exit) {
			/* The last thread should cleanup the set.
			 */
			cleanup = set->n_threads == 1;
			break;
		}

		/* No task available? Wait for being reused.
		 */
		if (task == NULL) {
			if (!vips_threadset_reuse_wait(set)) {
				set->n_idle_threads--;
				break;
			}

			continue;
		}

		/* A task was received and there was no request to exit.
		 */
		g_async_queue_unlock(set->queue);

		/* If we're profiling, attach a prof struct to this thread.
		 */
		if (vips__thread_profile)
			vips__thread_profile_attach(task->domain);

		/* Execute the task.
		 */
		task->func(task->data, NULL);

		/* Free any thread-private resources -- they will not be
		 * useful for the next task to use this thread.
		 */
		vips_thread_shutdown();
		VIPS_FREE(task);

		g_async_queue_lock(set->queue);
	}

	/* Timed-out or exit has been requested, decrement number of threads.
	 */
	set->n_threads--;
	VIPS_DEBUG_MSG(
		"vips_threadset_work: stopping %p (%d remaining, %d idle)\n",
		g_thread_self(), set->n_threads, set->n_idle_threads);

	g_async_queue_unlock(set->queue);

	if (cleanup)
		vips_threadset_free_internal(set);

	return NULL;
}

/* Add a new thread to the set.
 */
static gboolean
vips_threadset_add_thread(VipsThreadset *set)
{
	gboolean reused = FALSE;

	/* There are already sufficient threads running.
	 */
	if (set->max_threads > 0 &&
		set->n_threads >= set->max_threads)
		return TRUE;

	if (set->n_idle_threads > 0) {
		vips_semaphore_up(&set->idle);

		set->n_idle_threads--;
		reused = TRUE;
	}

	if (!reused) {
		/* No idle thread was found, we have to start a new one.
		 */
		GThread *thread;

		if (!(thread = vips_g_thread_new("libvips worker",
				  vips_threadset_work, set)))
			return FALSE;

		/* Ensure threads are freed on exit.
		 */
		g_thread_unref(thread);

		set->n_threads++;
		set->n_threads_highwater =
			VIPS_MAX(set->n_threads_highwater, set->n_threads);
	}

	return TRUE;
}

/**
 * vips_threadset_new:
 * @max_threads: maximum number of system threads
 *
 * Create a new threadset.
 *
 * If @max_threads is 0, new threads will be created when necessary by
 * vips_threadset_run(), with no limit on the number of threads.
 *
 * If @max_threads is > 0, then that many threads will be created by
 * vips_threadset_new() during startup and vips_threadset_run() will
 * not spawn any additional threads.
 *
 * Returns: the new threadset.
 */
VipsThreadset *
vips_threadset_new(int max_threads)
{
	VipsThreadset *set;

	set = g_new0(VipsThreadset, 1);
	set->queue = g_async_queue_new();
	vips_semaphore_init(&set->idle, 0, "idle");
	set->max_threads = max_threads;

	if (set->max_threads > 0)
		for (int i = 0; i < set->max_threads; i++) {
			if (!vips_threadset_add_thread(set)) {
				vips_threadset_free(set);
				return NULL;
			}
		}

	return set;
}

/**
 * vips_threadset_run:
 * @set: the threadset to run the task in
 * @domain: the name of the task (useful for debugging)
 * @func: the task to execute
 * @data: the task's data
 *
 * Execute a task in a thread. If there are no idle threads and the maximum
 * thread limit specified by @max_threads has not been reached, a new thread
 * will be spawned.
 *
 * See also: vips_threadset_new().
 *
 * Returns: 0 on success, or -1 on error.
 */
int
vips_threadset_run(VipsThreadset *set,
	const char *domain, GFunc func, gpointer data)
{
	VipsThreadExec *task;

	g_async_queue_lock(set->queue);

	/* Create a new thread if there are no waiting threads in the queue.
	 */
	if (g_async_queue_length_unlocked(set->queue) >= 0)
		if (!vips_threadset_add_thread(set)) {
			g_async_queue_unlock(set->queue);

			/* Thread create has failed.
			 */
			return -1;
		}

	/* Allocate the task and push it into the queue.
	 */
	task = g_new0(VipsThreadExec, 1);
	task->domain = domain;
	task->func = func;
	task->data = data;

	g_async_queue_push_unlocked(set->queue, task);
	g_async_queue_unlock(set->queue);

	return 0;
}

/**
 * vips_threadset_free:
 * @set: the threadset to free
 *
 * Free a threadset. This call returns immediately.
 */
void
vips_threadset_free(VipsThreadset *set)
{
	VIPS_DEBUG_MSG("vips_threadset_free: %p\n", set);

	g_async_queue_lock(set->queue);

	if (vips__leak)
		printf("vips_threadset_free: peak of %d threads\n",
			set->n_threads_highwater);

	set->exit = TRUE;

	/* No threads left, we cleanup.
	 */
	if (set->n_threads == 0) {
		g_async_queue_unlock(set->queue);
		vips_threadset_free_internal(set);
		return;
	}

	/* Wake up idle threads, if any.
	 */
	if (set->n_idle_threads > 0)
		vips_semaphore_upn(&set->idle, set->n_idle_threads);

	/* Send dummy data to the queue, causing threads to wake up and check
	 * the above set->exit condition.
	 */
	for (int i = 0; i < set->n_threads; i++)
		g_async_queue_push_unlocked(set->queue, GUINT_TO_POINTER(1));

	g_async_queue_unlock(set->queue);
}