1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
/* SPDX-License-Identifier: LGPL-3.0-or-later */
/*
* threadpool.c
*
* Copyright (C) 2021 David Oberhollenzer <goliath@infraroot.at>
*/
#include "config.h"
#include "util/threadpool.h"
#include "util/test.h"
#if defined(_WIN32) || defined(__WINDOWS__)
#define WIN32_LEAN_AND_MEAN
#define VC_EXTRALEAN
#include <windows.h>
/* Lock protecting the shared ticket counter below. */
static CRITICAL_SECTION mutex;

/* Number that ticket_wait() currently lets through. */
static unsigned int ticket;

/* Create the critical section and start counting tickets at zero. */
static void ticket_init(void)
{
	InitializeCriticalSection(&mutex);
	ticket = 0;
}

/* Destroy the critical section and reset the ticket counter. */
static void ticket_cleanup(void)
{
	DeleteCriticalSection(&mutex);
	ticket = 0;
}

/*
 * Wait until the shared ticket counter equals `value`, then bump the
 * counter by one and return.
 *
 * The counter is only inspected and modified while holding the critical
 * section. If it is not our turn yet, the lock is dropped and the rest
 * of the time slice is given up via SwitchToThread() before retrying.
 */
static void ticket_wait(unsigned int value)
{
	int my_turn;

	do {
		EnterCriticalSection(&mutex);
		my_turn = (value == ticket);
		if (my_turn)
			ticket += 1;
		LeaveCriticalSection(&mutex);

		if (!my_turn)
			SwitchToThread();
	} while (!my_turn);
}
#elif defined(HAVE_PTHREAD)
#include <pthread.h>
/* Lock protecting the shared ticket counter below. */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Number that ticket_wait() currently lets through. */
static unsigned int ticket;

/* Start counting tickets at zero; the mutex is statically initialized. */
static void ticket_init(void)
{
	ticket = 0;
}

/* Nothing to release for the statically initialized mutex. */
static void ticket_cleanup(void)
{
}

/*
 * Wait until the shared ticket counter equals `value`, then bump the
 * counter by one and return.
 *
 * The counter is only inspected and modified while holding the mutex.
 * If it is not our turn yet, the lock is dropped and the CPU is yielded
 * via sched_yield() before retrying.
 */
static void ticket_wait(unsigned int value)
{
	int my_turn;

	do {
		pthread_mutex_lock(&mutex);
		my_turn = (value == ticket);
		if (my_turn)
			ticket += 1;
		pthread_mutex_unlock(&mutex);

		if (!my_turn)
			sched_yield();
	} while (!my_turn);
}
#else
/*
 * Fallback used when neither the Windows nor the pthread variant is
 * compiled in: all ticket functions are no-ops.
 */

/* No state to set up. */
static void ticket_init(void)
{
	/* intentionally empty */
}

/* No state to tear down. */
static void ticket_cleanup(void)
{
	/* intentionally empty */
}

/* Returns immediately; `value` is ignored. */
static void ticket_wait(unsigned int value)
{
	(void)value;
}
#endif
/*
 * Work callback handed to the parallel pool.
 *
 * `work_item` points to an unsigned int holding the ticket number this
 * item has to wait for. Once ticket_wait() lets it through, the integer
 * is overwritten with 42 to mark the item as processed.
 *
 * `user` is unused. Always returns 0.
 */
static int worker(void *user, void *work_item)
{
	unsigned int *slot = work_item;
	unsigned int turn = *slot;

	(void)user;

	ticket_wait(turn);
	*slot = 42;
	return 0;
}
/*
 * Work callback handed to the serial reference pool.
 *
 * Overwrites the unsigned int that `work_item` points to with 42 without
 * waiting for a ticket. `user` is unused. Always returns 0.
 */
static int worker_serial(void *user, void *work_item)
{
	unsigned int *slot = work_item;

	(void)user;

	*slot = 42;
	return 0;
}
/*
 * Run the shared checks against `pool`:
 *
 *  - the reported worker count is at least 1,
 *  - dequeue on an empty pool returns NULL,
 *  - after submitting every element of a local array, the elements are
 *    dequeued in submission order and each one has been set to 42,
 *  - the pool is empty again afterwards.
 *
 * Element i is initialized to (number of elements - 1 - i), i.e. the
 * values count down while the submission index counts up.
 */
static void test_case(thread_pool_t *pool)
{
	unsigned int values[10];
	const size_t total = sizeof(values) / sizeof(values[0]);
	unsigned int *ptr;
	size_t i, count;
	int ret;

	/* the worker count has to be a sane value */
	count = pool->get_worker_count(pool);
	TEST_ASSERT(count >= 1);

	/* asking an empty pool for a result MUST NOT lock up */
	ptr = pool->dequeue(pool);
	TEST_NULL(ptr);

	/* hand in the work items, ticket values in descending order */
	ticket_init();

	for (i = 0; i < total; ++i) {
		values[i] = (total - 1) - i;

		ret = pool->submit(pool, &values[i]);
		TEST_EQUAL_I(ret, 0);
	}

	/* results have to come back in the order they were submitted */
	for (i = 0; i < total; ++i) {
		ptr = pool->dequeue(pool);
		TEST_NOT_NULL(ptr);
		TEST_ASSERT(ptr == (values + i));
		TEST_EQUAL_UI(*ptr, 42);
	}

	ticket_cleanup();

	/* nothing may be left in the queue */
	ptr = pool->dequeue(pool);
	TEST_NULL(ptr);
}
/*
 * Test driver: runs test_case() first on a pool created with
 * thread_pool_create() and then on one created with
 * thread_pool_create_serial(). Command line arguments are ignored.
 */
int main(int argc, char **argv)
{
	thread_pool_t *pool;

	(void)argc;
	(void)argv;

	/* first pass: the actual parallel implementation */
	pool = thread_pool_create(10, worker);
	TEST_NOT_NULL(pool);
	test_case(pool);
	pool->destroy(pool);

	/* second pass: repeat the test with the serial reference implementation */
	pool = thread_pool_create_serial(worker_serial);
	TEST_NOT_NULL(pool);
	test_case(pool);
	pool->destroy(pool);

	return EXIT_SUCCESS;
}
|