1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
|
/* yarn.c -- generic thread operations implemented using pthread functions
* Copyright (C) 2008, 2012 Mark Adler
* Version 1.3 13 Jan 2012 Mark Adler
* For conditions of distribution and use, see copyright notice in yarn.h
*/
/* Basic thread operations implemented using the POSIX pthread library. All
pthread references are isolated within this module to allow alternate
implementations with other thread libraries. See yarn.h for the description
of these operations. */
/* Version history:
1.0 19 Oct 2008 First version
1.1 26 Oct 2008 No need to set the stack size -- remove
Add yarn_abort() function for clean-up on error exit
1.2 19 Dec 2011 (changes reversed in 1.3)
1.3 13 Jan 2012 Add large file #define for consistency with pigz.c
Update thread portability #defines per IEEE 1003.1-2008
Fix documentation in yarn.h for yarn_prefix
*/
/* for thread portability */
#define _XOPEN_SOURCE 700
#define _POSIX_C_SOURCE 200809L
#define _THREAD_SAFE
/* use large file functions if available */
#define _FILE_OFFSET_BITS 64
/* external libraries and entities referenced */
#include <stdio.h> /* fprintf(), stderr */
#include <stdlib.h> /* exit(), malloc(), free(), NULL */
#include <pthread.h> /* pthread_t, pthread_create(), pthread_join(), */
/* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
pthread_self(), pthread_equal(),
pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */
#include <errno.h> /* ENOMEM, EAGAIN, EINVAL */
/* interface definition */
#include "yarn.h"
/* constants */
#define local static /* for non-exported functions and globals */
/* error handling external globals, resettable by application */
char *yarn_prefix = "yarn";
void (*yarn_abort)(int) = NULL;
/* immediately exit -- use for errors that shouldn't ever happen */
local void fail(int err)
{
fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix,
err == ENOMEM ? "out of memory" : "internal pthread error", err);
if (yarn_abort != NULL)
yarn_abort(err);
exit(err == ENOMEM || err == EAGAIN ? err : EINVAL);
}
/* memory handling routines provided by user -- if none are provided, malloc()
and free() are used, which are therefore assumed to be thread-safe */
typedef void *(*malloc_t)(size_t);
typedef void (*free_t)(void *);
local malloc_t my_malloc_f = malloc;
local free_t my_free = free;
/* use user-supplied allocation routines instead of malloc() and free() */
void yarn_mem(malloc_t lease, free_t vacate)
{
my_malloc_f = lease;
my_free = vacate;
}
/* memory allocation that cannot fail (from the point of view of the caller) */
local void *my_malloc(size_t size)
{
void *block;
if ((block = my_malloc_f(size)) == NULL)
fail(ENOMEM);
return block;
}
/* -- lock functions -- */
struct lock_s {
pthread_mutex_t mutex;
pthread_cond_t cond;
long value;
};
lock *new_lock(long initial)
{
int ret;
lock *bolt;
bolt = my_malloc(sizeof(struct lock_s));
if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) ||
(ret = pthread_cond_init(&(bolt->cond), NULL)))
fail(ret);
bolt->value = initial;
return bolt;
}
void possess(lock *bolt)
{
int ret;
if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0)
fail(ret);
}
void release(lock *bolt)
{
int ret;
if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0)
fail(ret);
}
void twist(lock *bolt, enum twist_op op, long val)
{
int ret;
if (op == TO)
bolt->value = val;
else if (op == BY)
bolt->value += val;
if ((ret = pthread_cond_broadcast(&(bolt->cond))) ||
(ret = pthread_mutex_unlock(&(bolt->mutex))))
fail(ret);
}
#define until(a) while(!(a))
void wait_for(lock *bolt, enum wait_op op, long val)
{
int ret;
switch (op) {
case TO_BE:
until (bolt->value == val)
if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
fail(ret);
break;
case NOT_TO_BE:
until (bolt->value != val)
if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
fail(ret);
break;
case TO_BE_MORE_THAN:
until (bolt->value > val)
if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
fail(ret);
break;
case TO_BE_LESS_THAN:
until (bolt->value < val)
if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
fail(ret);
}
}
long peek_lock(lock *bolt)
{
return bolt->value;
}
void free_lock(lock *bolt)
{
int ret;
if ((ret = pthread_cond_destroy(&(bolt->cond))) ||
(ret = pthread_mutex_destroy(&(bolt->mutex))))
fail(ret);
my_free(bolt);
}
/* -- thread functions (uses lock functions above) -- */
struct thread_s {
pthread_t id;
int done; /* true if this thread has exited */
thread *next; /* for list of all launched threads */
};
/* list of threads launched but not joined, count of threads exited but not
joined (incremented by ignition() just before exiting) */
local lock threads_lock = {
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_COND_INITIALIZER,
0 /* number of threads exited but not joined */
};
local thread *threads = NULL; /* list of extant threads */
/* structure in which to pass the probe and its payload to ignition() */
struct capsule {
void (*probe)(void *);
void *payload;
};
/* mark the calling thread as done and alert join_all() */
local void reenter(void *dummy)
{
thread *match, **prior;
pthread_t me;
(void)dummy;
/* find this thread in the threads list by matching the thread id */
me = pthread_self();
possess(&(threads_lock));
prior = &(threads);
while ((match = *prior) != NULL) {
if (pthread_equal(match->id, me))
break;
prior = &(match->next);
}
if (match == NULL)
fail(EINVAL);
/* mark this thread as done and move it to the head of the list */
match->done = 1;
if (threads != match) {
*prior = match->next;
match->next = threads;
threads = match;
}
/* update the count of threads to be joined and alert join_all() */
twist(&(threads_lock), BY, +1);
}
/* all threads go through this routine so that just before the thread exits,
it marks itself as done in the threads list and alerts join_all() so that
the thread resources can be released -- use cleanup stack so that the
marking occurs even if the thread is cancelled */
local void *ignition(void *arg)
{
struct capsule *capsule = arg;
/* run reenter() before leaving */
pthread_cleanup_push(reenter, NULL);
/* execute the requested function with argument */
capsule->probe(capsule->payload);
my_free(capsule);
/* mark this thread as done and let join_all() know */
pthread_cleanup_pop(1);
/* exit thread */
return NULL;
}
/* not all POSIX implementations create threads as joinable by default, so that
is made explicit here */
thread *launch(void (*probe)(void *), void *payload)
{
int ret;
thread *th;
struct capsule *capsule;
pthread_attr_t attr;
/* construct the requested call and argument for the ignition() routine
(allocated instead of automatic so that we're sure this will still be
there when ignition() actually starts up -- ignition() will free this
allocation) */
capsule = my_malloc(sizeof(struct capsule));
capsule->probe = probe;
capsule->payload = payload;
/* assure this thread is in the list before join_all() or ignition() looks
for it */
possess(&(threads_lock));
/* create the thread and call ignition() from that thread */
th = my_malloc(sizeof(struct thread_s));
if ((ret = pthread_attr_init(&attr)) ||
(ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) ||
(ret = pthread_create(&(th->id), &attr, ignition, capsule)) ||
(ret = pthread_attr_destroy(&attr)))
fail(ret);
/* put the thread in the threads list for join_all() */
th->done = 0;
th->next = threads;
threads = th;
release(&(threads_lock));
return th;
}
void join(thread *ally)
{
int ret;
thread *match, **prior;
/* wait for thread to exit and return its resources */
if ((ret = pthread_join(ally->id, NULL)) != 0)
fail(ret);
/* find the thread in the threads list */
possess(&(threads_lock));
prior = &(threads);
while ((match = *prior) != NULL) {
if (match == ally)
break;
prior = &(match->next);
}
if (match == NULL)
fail(EINVAL);
/* remove thread from list and update exited count, free thread */
if (match->done)
threads_lock.value--;
*prior = match->next;
release(&(threads_lock));
my_free(ally);
}
/* This implementation of join_all() only attempts to join threads that have
announced that they have exited (see ignition()). When there are many
threads, this is faster than waiting for some random thread to exit while a
bunch of other threads have already exited. */
int join_all(void)
{
int ret, count;
thread *match, **prior;
/* grab the threads list and initialize the joined count */
count = 0;
possess(&(threads_lock));
/* do until threads list is empty */
while (threads != NULL) {
/* wait until at least one thread has reentered */
wait_for(&(threads_lock), NOT_TO_BE, 0);
/* find the first thread marked done (should be at or near the top) */
prior = &(threads);
while ((match = *prior) != NULL) {
if (match->done)
break;
prior = &(match->next);
}
if (match == NULL)
fail(EINVAL);
/* join the thread (will be almost immediate), remove from the threads
list, update the reenter count, and free the thread */
if ((ret = pthread_join(match->id, NULL)) != 0)
fail(ret);
threads_lock.value--;
*prior = match->next;
my_free(match);
count++;
}
/* let go of the threads list and return the number of threads joined */
release(&(threads_lock));
return count;
}
/* cancel and join the thread -- the thread will cancel when it gets to a file
operation, a sleep or pause, or a condition wait */
void destruct(thread *off_course)
{
int ret;
if ((ret = pthread_cancel(off_course->id)) != 0)
fail(ret);
join(off_course);
}
|