1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
// Creates a new thread that runs `start_routine`, passing it a NULL argument
// and using the default thread attributes. Creation failure is caught by an
// assert. Returns the handle of the created thread.
pthread_t start_worker_thread(void *(*start_routine)(void *))
{
  pthread_t handle;
  // The NULL attribute pointer selects default attributes; the trailing NULL
  // is the argument handed to `start_routine`.
  const int status = pthread_create(&handle, NULL, start_routine, NULL);
  assert(status == 0);
  return handle;
}
// Blocks until `thread` has terminated. The thread's exit value is not
// collected. A failing join is caught by an assert.
void join_thread(const pthread_t thread)
{
  void **const discarded_exit_value = NULL;
  const int status = pthread_join(thread, discarded_exit_value);
  assert(status == 0);
}
// Initialises an unnamed semaphore with the count `initial_value` and returns
// it by value. The semaphore is not shared between processes. Initialisation
// failure is caught by an assert.
//
// NOTE(review): the semaphore is initialised in a local and then copied out to
// the caller. POSIX leaves the result of operating on a copy of a `sem_t`
// undefined, so this relies on the platform tolerating such copies; confirm
// before reusing this helper elsewhere.
sem_t create_semaphore(int initial_value)
{
  sem_t result;
  // A zero `pshared` argument keeps the semaphore private to this process.
  const int status = sem_init(&result, 0, initial_value);
  assert(status == 0);
  return result;
}
// Destroys the unnamed semaphore that `semaphore` points to. A failing
// destroy is caught by an assert.
void destroy_semaphore(sem_t *const semaphore)
{
  const int status = sem_destroy(semaphore);
  assert(status == 0);
}
// This blocking queue structure supports waiting for free space before
// enqueuing and waiting for data before dequeuing, using a pair of semaphores.
// These functions do not employ a mutex, so a data race may occur if multiple
// threads attempt to enqueue at the same time or multiple threads attempt to
// dequeue at the same time. The expected valid use case is with two threads,
// where one of the two enqueues and the other thread dequeues.
//
// The storage is used as a ring buffer: `begin` and `end` are each reset to 0
// once they reach `size`.
struct blocking_queuet
{
// Number of slots in `elements`; also the value at which the indices wrap.
size_t size;
// Heap-allocated storage of `size` ints, released by free_blocking_queue().
int *elements;
// Index of the slot the next dequeue() reads from.
size_t begin;
// Index of the slot the next enqueue() writes to.
size_t end;
// Counts unoccupied slots: starts at `size`, enqueue() waits on it and
// dequeue() posts to it.
sem_t free_space;
// Counts occupied slots: starts at 0, dequeue() waits on it and enqueue()
// posts to it.
sem_t data_waiting;
};
// Builds an empty blocking queue with room for `size` ints and returns it by
// value.
//
// The element storage is zero-initialised heap memory that the caller must
// release with free_blocking_queue(). `free_space` starts at `size` and
// `data_waiting` starts at 0, so the first enqueue() proceeds immediately
// (for a non-zero `size`) and the first dequeue() blocks until data arrives.
// Allocation failure is caught by an assert, matching the error handling used
// for the semaphore helpers. Progress is traced on stdout.
struct blocking_queuet initialise_blocking_queue(const size_t size)
{
  printf("init queue begin\n");
  struct blocking_queuet queue;
  queue.size = size;
  queue.elements = calloc(size, sizeof *queue.elements);
  // calloc may legitimately return NULL for a zero-element request, so only a
  // failed non-empty allocation is treated as an error.
  assert(queue.elements != NULL || size == 0);
  queue.begin = 0;
  queue.end = 0;
  // create_semaphore() takes an int; the cast makes the narrowing from size_t
  // explicit.
  queue.free_space = create_semaphore((int)size);
  queue.data_waiting = create_semaphore(0);
  printf("init queue end\n");
  return queue;
}
// Releases the resources owned by `queue`: the element storage is freed and
// the pointer cleared, then both semaphores are destroyed. The queue structure
// itself is not freed.
void free_blocking_queue(struct blocking_queuet *queue)
{
  int *const storage = queue->elements;
  queue->elements = NULL;
  free(storage);

  destroy_semaphore(&queue->free_space);
  destroy_semaphore(&queue->data_waiting);
}
// Appends `data` to `queue`, blocking while the queue has no free slot.
//
// Waits on `free_space`, stores the value at index `end`, advances `end`
// (wrapping to 0 at `size`), then posts `data_waiting` so that a pending
// dequeue() can proceed. Semaphore failures are caught by asserts. Progress is
// traced on stdout.
void enqueue(struct blocking_queuet *queue, const int data)
{
  // size_t values are printed with %zu so that the conversion specifier
  // matches the argument type.
  // NOTE(review): `begin` is advanced by dequeue(), so when another thread is
  // dequeuing this read is unsynchronised; the value is used for the trace
  // only.
  printf(
    "enqueue begin:%zu end:%zu data:%d \n", queue->begin, queue->end, data);
  const int wait_status = sem_wait(&queue->free_space);
  assert(wait_status == 0);
  queue->elements[queue->end] = data;
  if(++queue->end == queue->size)
  {
    queue->end = 0;
  }
  const int post_status = sem_post(&queue->data_waiting);
  assert(post_status == 0);
  printf("enqueue done\n");
}
// Removes and returns the oldest value in `queue`, blocking while the queue
// holds no data.
//
// Waits on `data_waiting`, reads the value at index `begin`, advances `begin`
// (wrapping to 0 at `size`), then posts `free_space` so that a pending
// enqueue() can proceed. Semaphore failures are caught by asserts. Progress is
// traced on stdout.
int dequeue(struct blocking_queuet *queue)
{
  // size_t values are printed with %zu so that the conversion specifier
  // matches the argument type.
  // NOTE(review): `end` is advanced by enqueue(), so when another thread is
  // enqueuing this read is unsynchronised; the value is used for the trace
  // only.
  printf("dequeue begin:%zu end:%zu\n", queue->begin, queue->end);
  const int wait_status = sem_wait(&queue->data_waiting);
  assert(wait_status == 0);
  const int result = queue->elements[queue->begin];
  if(++queue->begin == queue->size)
  {
    queue->begin = 0;
  }
  const int post_status = sem_post(&queue->free_space);
  assert(post_status == 0);
  printf("dequeue done. data:%d \n", result);
  return result;
}
// Queue of values that main() enqueues and worker() dequeues; a value of 0
// makes the worker stop. Initialised in main().
struct blocking_queuet input_queue;
// Queue of squared values that worker() enqueues and main() dequeues.
// Initialised in main().
struct blocking_queuet output_queue;
// Thread entry point: repeatedly takes a value from `input_queue` and puts its
// square on `output_queue`. A dequeued value of 0 ends the loop without being
// forwarded, after which the thread exits with a NULL exit value.
// `arguments` is not used.
void *worker(void *arguments)
{
  for(;;)
  {
    const int value = dequeue(&input_queue);
    if(value == 0)
    {
      break;
    }
    enqueue(&output_queue, value * value);
  }
  pthread_exit(NULL);
}
// Drives the demo: starts one worker thread, feeds it the values 1..9 followed
// by the terminating 0 through `input_queue` (3 slots), waits for the worker
// to finish, and then checks that `output_queue` (10 slots) holds the squares
// 1, 4, ..., 81 in order.
int main(void)
{
  input_queue = initialise_blocking_queue(3);
  output_queue = initialise_blocking_queue(10);
  const pthread_t worker_thread1 = start_worker_thread(&worker);
  printf("worker_started\n");

  for(int value = 1; value <= 9; ++value)
  {
    enqueue(&input_queue, value);
  }
  // A zero tells the worker to stop.
  enqueue(&input_queue, 0);

  join_thread(worker_thread1);
  free_blocking_queue(&input_queue);

  for(int value = 1; value <= 9; ++value)
  {
    // The dequeue is performed outside the assert so that it still happens
    // when assertions are compiled out with NDEBUG.
    const int squared = dequeue(&output_queue);
    assert(squared == value * value);
    (void)squared;
  }

  free_blocking_queue(&output_queue);
  return EXIT_SUCCESS;
}
|