File: threadset.c

package info (click to toggle)
vips 8.14.1-3%2Bdeb12u2
  • links: PTS
  • area: main
  • in suites: bookworm
  • size: 35,912 kB
  • sloc: ansic: 165,449; cpp: 10,987; python: 4,462; xml: 4,212; sh: 471; perl: 40; makefile: 23
file content (357 lines) | stat: -rw-r--r-- 8,051 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
/* A set of threads. 
 *
 * Creating and destroying threads can be expensive on some platforms, so we
 * try to only create once, then reuse.
 */

/*

    This file is part of VIPS.
    
    VIPS is free software; you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
    02110-1301  USA

 */

/*

    These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk

 */

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /*HAVE_CONFIG_H*/
#include <glib/gi18n-lib.h>

#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /*HAVE_UNISTD_H*/
#include <errno.h>

/*
#define VIPS_DEBUG
 */

#include <vips/vips.h>
#include <vips/internal.h>
#include <vips/thread.h>
#include <vips/debug.h>

typedef struct _VipsThreadsetMember {
	/* The set we are part of.
	 */
	VipsThreadset *set;

	/* The underlying glib thread object.
	 */
	GThread *thread;

	/* The task the thread should run next.
	 */
	const char *domain;
	GFunc func; 
	void *data;
	void *user_data;

	/* The thread waits on this when it's free.
	 */
	VipsSemaphore idle;

	/* Set by our controller to request exit.
	 */
	gboolean kill;
} VipsThreadsetMember;

struct _VipsThreadset {
	GMutex *lock;

	/* All the VipsThreadsetMember we have created.
	 */
	GSList *members;

	/* The set of currently idle threads.
	 */
	GSList *free;

	/* The current number of threads, the highwater mark, and 
	 * the max we allow before blocking thread creation.
	 */
	int n_threads;
	int n_threads_highwater;
	int max_threads;
};

/* The maximum relative time (in microseconds) that a thread waits
 * for work before being stopped.
 */
static const gint64 max_idle_time = 15 * G_TIME_SPAN_SECOND;

/* The thread work function.
 */
static void *
vips_threadset_work( void *pointer )
{
	VipsThreadsetMember *member = (VipsThreadsetMember *) pointer;
	VipsThreadset *set = member->set;

	VIPS_DEBUG_MSG( "vips_threadset_work: starting %p\n", member );

	for(;;) {
		/* Wait for at least 15 seconds to be given work.
		 */
		if( vips_semaphore_down_timeout( &member->idle, 
			max_idle_time ) == -1 )
			break;

		/* Killed or no task available? Leave this thread.
		 */
		if( member->kill ||
			!member->func ) 
			break;

		/* If we're profiling, attach a prof struct to this thread.
		 */
		if( vips__thread_profile ) 
			vips__thread_profile_attach( member->domain );

		/* Execute the task.
		 */
		member->func( member->data, member->user_data );

		/* Free any thread-private resources -- they will not be
		 * useful for the next task to use this thread.
		 */
		vips_thread_shutdown();

		member->domain = NULL;
		member->func = NULL;
		member->data = NULL;
		member->user_data = NULL;

		/* We are free ... back on the free list!
		 */
		g_mutex_lock( set->lock );
		set->free = g_slist_prepend( set->free, member );
		g_mutex_unlock( set->lock );
	}

	/* Timed-out or kill has been requested ... remove from both free
	 * and member list.
	 */
	g_mutex_lock( set->lock );
	set->free = g_slist_remove( set->free, member );
	set->members = g_slist_remove( set->members, member );
	set->n_threads -= 1;
	VIPS_DEBUG_MSG( "vips_threadset_work: stopping %p (%d remaining)\n", 
		member, set->n_threads );
	g_mutex_unlock( set->lock );

	vips_semaphore_destroy( &member->idle );

	VIPS_FREE( member );

	return( NULL );
}

/* Create a new idle member for the set.
 */
static VipsThreadsetMember *
vips_threadset_add( VipsThreadset *set )
{
	VipsThreadsetMember *member;

	if( set->max_threads &&
		set->n_threads >= set->max_threads ) {
		vips_error( "VipsThreadset", 
			"%s", _( "threadset is exhausted" ) );
		return( NULL );
	}

	member = g_new0( VipsThreadsetMember, 1 );
	member->set = set;

	vips_semaphore_init( &member->idle, 0, "idle" );

	if( !(member->thread = vips_g_thread_new( "libvips worker", 
		vips_threadset_work, member )) ) {
		vips_semaphore_destroy( &member->idle );
		VIPS_FREE( member );

		return( NULL );
	}

	/* Ensure idle threads are freed on exit, this
	 * ref is increased before the thread is joined.
	 */
	g_thread_unref( member->thread );

	g_mutex_lock( set->lock );
	set->members = g_slist_prepend( set->members, member );
	set->n_threads += 1;
	set->n_threads_highwater = 
		VIPS_MAX( set->n_threads_highwater, set->n_threads );
	g_mutex_unlock( set->lock );

	return( member );
}

/** 
 * vips_threadset_new:
 * @max_threads: maxium number of system threads
 *
 * Create a new threadset. 
 *
 * If @max_threads is 0, new threads will be created when necessary by
 * vips_threadset_run(), with no limit on the number of threads.
 *
 * If @max_threads is > 0, then that many threads will be created by
 * vips_threadset_new() during startup and vips_threadset_run() will fail if
 * no free threads are available.
 *
 * Returns: the new threadset.
 */
VipsThreadset *
vips_threadset_new( int max_threads )
{
	VipsThreadset *set;

	set = g_new0( VipsThreadset, 1 );
	set->lock = vips_g_mutex_new();
	set->max_threads = max_threads;

	if( set->max_threads > 0 )
		for( int i = 0; i < set->max_threads; i++ ) {
			VipsThreadsetMember *member;

			if( !(member = vips_threadset_add( set )) ) {
				vips_threadset_free( set );
				return( NULL );
			}

			set->free = g_slist_prepend( set->free, member );
		}

	return( set );
}

/**
 * vips_threadset_run: 
 * @set: the threadset to run the task in
 * @domain: the name of the task (useful for debugging)
 * @func: the task to execute
 * @data: the task's data
 *
 * Execute a task in a thread. If there are no idle threads, create a new one,
 * provided we are under @max_threads.
 *
 * See also: vips_threadset_new().
 *
 * Returns: 0 on success, or -1 on error.
 */
int
vips_threadset_run( VipsThreadset *set, 
	const char *domain, GFunc func, gpointer data )
{
	VipsThreadsetMember *member;

	member = NULL;

	/* Try to get an idle thread.
	 */
	g_mutex_lock( set->lock );
	if( set->free ) {
		member = (VipsThreadsetMember *) set->free->data;
		set->free = g_slist_remove( set->free, member );
	}
	g_mutex_unlock( set->lock );

	/* None? Make a new idle but not free member.
	 */
	if( !member )
		member = vips_threadset_add( set );

	/* Still nothing? Thread create has failed.
	 */
	if( !member )
		return( -1 );

	/* Allocate the task and set it going.
	 */
	member->domain = domain;
	member->func = func;
	member->data = data;
	member->user_data = NULL;
	vips_semaphore_up( &member->idle );

	return( 0 );
}

/* Kill a member.
 */
static void
vips_threadset_kill_member( VipsThreadsetMember *member )
{
	GThread *thread;

	thread = g_thread_ref( member->thread );
	member->kill = TRUE;

	vips_semaphore_up( &member->idle );

	(void) g_thread_join( thread );

	/* member is freed on thread exit.
	 */
}

/** 
 * vips_threadset_free:
 * @set: the threadset to free
 *
 * Free a threadset. This call will block until all pending tasks are
 * finished.
 */
void
vips_threadset_free( VipsThreadset *set )
{
	VIPS_DEBUG_MSG( "vips_threadset_free: %p\n", set );

	/* Try to get and finish a thread.
	 */
	for(;;) {
		VipsThreadsetMember *member;

		member = NULL;
		g_mutex_lock( set->lock );
		if( set->members )
			member = (VipsThreadsetMember *) set->members->data;
		g_mutex_unlock( set->lock );

		if( !member )
			break;

		vips_threadset_kill_member( member );
	}

	if( vips__leak )
		printf( "vips_threadset_free: peak of %d threads\n", 
			set->n_threads_highwater );

	VIPS_FREEF( vips_g_mutex_free, set->lock );
	VIPS_FREE( set );
}