File: blocking_queue.c

package info (click to toggle)
cbmc 6.6.0-4
  • links: PTS
  • area: main
  • in suites: forky, sid, trixie
  • size: 153,852 kB
  • sloc: cpp: 386,459; ansic: 114,466; java: 28,405; python: 6,003; yacc: 4,552; makefile: 4,041; lex: 2,487; xml: 2,388; sh: 2,050; perl: 557; pascal: 184; javascript: 163; ada: 36
file content (153 lines) | stat: -rw-r--r-- 4,280 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

pthread_t start_worker_thread(void *(*start_routine)(void *))
{
  pthread_t worker_thread;
  const pthread_attr_t *const attributes = NULL;
  void *const worker_argument = NULL;
  const int create_status =
    pthread_create(&worker_thread, attributes, start_routine, worker_argument);
  assert(create_status == 0);
  return worker_thread;
}

void join_thread(const pthread_t thread)
{
  const int join_status = pthread_join(thread, NULL);
  assert(join_status == 0);
}

sem_t create_semaphore(int initial_value)
{
  const int shared_between_processes = false;
  sem_t semaphore;
  const int init_error =
    sem_init(&semaphore, shared_between_processes, initial_value);
  assert(init_error == false);
  return semaphore;
}

void destroy_semaphore(sem_t *const semaphore)
{
  int destroy_error = sem_destroy(semaphore);
  assert(destroy_error == false);
}

// This blocking queue structure supports waiting for free space before
// enqueuing and waiting for data before dequeuing, using a pair of semaphores.
// These functions do not employ a mutex, so a data race may occur if multiple
// threads attempt to enqueue at the same time or multiple threads attempt to
// dequeue at the same time. The expected valid use case is with two threads,
// where one of the two enqueues and the other thread dequeues.
struct blocking_queuet
{
  size_t size;
  int *elements;
  size_t begin;
  size_t end;
  sem_t free_space;
  sem_t data_waiting;
};

struct blocking_queuet initialise_blocking_queue(const size_t size)
{
  printf("init queue begin\n");
  struct blocking_queuet queue;
  queue.size = size;
  queue.elements = calloc(size, sizeof(int));
  queue.begin = 0;
  queue.end = 0;
  queue.free_space = create_semaphore(size);
  queue.data_waiting = create_semaphore(0);
  printf("init queue end\n");
  return queue;
}

void free_blocking_queue(struct blocking_queuet *queue)
{
  free(queue->elements);
  queue->elements = NULL;
  destroy_semaphore(&queue->free_space);
  destroy_semaphore(&queue->data_waiting);
}

void enqueue(struct blocking_queuet *queue, const int data)
{
  printf(
    "enqueue begin:%lu end:%lu data:%d \n", queue->begin, queue->end, data);
  const int free_wait_error = sem_wait(&queue->free_space);
  assert(free_wait_error == false);
  queue->elements[queue->end] = data;
  if(++queue->end == queue->size)
  {
    queue->end = 0;
  }
  const int post_error = sem_post(&queue->data_waiting);
  assert(post_error == false);
  printf("enqueue done\n");
}

int dequeue(struct blocking_queuet *queue)
{
  printf("dequeue begin:%lu end:%lu\n", queue->begin, queue->end);
  const int data_wait_error = sem_wait(&queue->data_waiting);
  assert(data_wait_error == false);
  int result = queue->elements[queue->begin];
  if(++queue->begin == queue->size)
  {
    queue->begin = 0;
  }
  const int free_error = sem_post(&queue->free_space);
  assert(free_error == false);
  printf("dequeue done. data:%d \n", result);
  return result;
}

struct blocking_queuet input_queue;
struct blocking_queuet output_queue;

void *worker(void *arguments)
{
  int data;
  while(data = dequeue(&input_queue))
  {
    enqueue(&output_queue, data * data);
  }
  pthread_exit(NULL);
}

int main(void)
{
  input_queue = initialise_blocking_queue(3);
  output_queue = initialise_blocking_queue(10);
  const pthread_t worker_thread1 = start_worker_thread(&worker);
  printf("worker_started\n");
  enqueue(&input_queue, 1);
  enqueue(&input_queue, 2);
  enqueue(&input_queue, 3);
  enqueue(&input_queue, 4);
  enqueue(&input_queue, 5);
  enqueue(&input_queue, 6);
  enqueue(&input_queue, 7);
  enqueue(&input_queue, 8);
  enqueue(&input_queue, 9);
  enqueue(&input_queue, 0);
  join_thread(worker_thread1);
  free_blocking_queue(&input_queue);
  assert(dequeue(&output_queue) == 1);
  assert(dequeue(&output_queue) == 4);
  assert(dequeue(&output_queue) == 9);
  assert(dequeue(&output_queue) == 16);
  assert(dequeue(&output_queue) == 25);
  assert(dequeue(&output_queue) == 36);
  assert(dequeue(&output_queue) == 49);
  assert(dequeue(&output_queue) == 64);
  assert(dequeue(&output_queue) == 81);
  free_blocking_queue(&output_queue);
  return EXIT_SUCCESS;
}