/*
* Copyright (c) 2021 NVIDIA Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MPSC_QUEUE_H
#define MPSC_QUEUE_H 1
#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include <openvswitch/thread.h>
#include <openvswitch/util.h>
#include "ovs-atomic.h"
/* Multi-producer, single-consumer queue
* =====================================
*
* This data structure is a lockless queue implementation with
* the following properties:
*
* * Multi-producer: multiple threads can write concurrently.
* Insertion in the queue is thread-safe, no inter-thread
* synchronization is necessary.
*
* * Single-consumer: only a single thread can safely remove
* nodes from the queue. The queue must be 'acquired' using
* 'mpsc_queue_acquire()' before removing nodes.
*
* * Unbounded: the queue is backed by a linked-list and is not
* limited in number of elements.
*
* * Intrusive: queue elements are allocated as part of larger
* objects. Objects are retrieved by offset manipulation (see the
* sketch below).
*
* * Per-producer FIFO: Elements in the queue are kept in the
* order their producer inserted them. The consumer retrieves
* them in the same insertion order. When multiple producers
* insert at the same time, each of them proceeds; no ordering
* is guaranteed between elements from different producers.
*
* This queue is well-suited for message passing between threads,
* where any number of threads can insert a message and a single
* thread is meant to receive and process it.
*
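* As an illustration of the intrusive design, a minimal sketch follows.
* The element type 'struct my_msg' and its fields are hypothetical and
* not part of this API:
*
*     struct my_msg {
*         int payload;
*         struct mpsc_queue_node node;
*     };
*
* Given a 'struct mpsc_queue_node *n' taken from the queue, the
* enclosing object is recovered by offset manipulation, e.g. with
* CONTAINER_OF(n, struct my_msg, node) from "openvswitch/util.h".
*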
* Thread-safety
* =============
*
* The consumer thread must acquire the queue using 'mpsc_queue_acquire()'.
* Once the queue is protected against concurrent reads, the thread can call
* the consumer API:
*
* * mpsc_queue_poll() to remove the tail of the queue, reporting the
* queue state
* * mpsc_queue_pop() to remove the tail of the queue, returning NULL
* if nothing could be removed
* * mpsc_queue_tail() to read the current tail
* * mpsc_queue_push_front() to enqueue an element at the tail, i.e. at
* the consumer end of the queue
* * MPSC_QUEUE_FOR_EACH() to iterate over the current elements,
* without removing them.
* * MPSC_QUEUE_FOR_EACH_POP() to iterate over the elements while
* removing them.
*
* When a thread is finished with reading the queue, it can release the
* reader lock using 'mpsc_queue_release()'.
*
* Producers can always insert elements in the queue, even if no consumer
* acquired the reader lock. No inter-producer synchronization is needed.
*
* The consumer thread is also allowed to insert elements while it holds the
* reader lock.
*
* Producer threads must never be cancelled while writing to the queue.
* This would block the consumer, which would then lose any subsequent
* elements in the queue. Producers should ideally be cooperatively
* managed, or the queue insertion should be done within non-cancellable
* sections.
*
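* As a sketch only, assuming a 'struct mpsc_queue queue' shared by all
* threads and the hypothetical 'struct my_msg' element type described
* above, a producer and the consumer could interact as follows:
*
*     Producer side (any thread, no lock needed):
*
*         mpsc_queue_insert(&queue, &msg->node);
*
*     Consumer side (single thread, holding the reader lock):
*
*         struct mpsc_queue_node *node;
*
*         mpsc_queue_acquire(&queue);
*         while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
*             process(CONTAINER_OF(node, struct my_msg, node));
*         }
*         mpsc_queue_release(&queue);
*
* 'msg' and 'process()' are placeholders for the application's element
* and handling code.
*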
* Queue state
* ===========
*
* When polling the queue, three states can be observed: 'empty', 'non-empty',
* and 'inconsistent'. Three polling results are defined, respectively:
*
* * MPSC_QUEUE_EMPTY: the queue is empty.
* * MPSC_QUEUE_ITEM: an item was available and has been removed.
* * MPSC_QUEUE_RETRY: the queue is inconsistent.
*
* If 'MPSC_QUEUE_RETRY' is returned, then a producer has not yet finished
* writing to the queue and the list of nodes is not coherent. The consumer
* can retry shortly to check if the producer has finished.
*
* This behavior is the reason the removal function is called
* 'mpsc_queue_poll()'.
*
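* As an illustration, a consumer that needs to tell 'empty' apart from
* 'inconsistent' can handle the three results as sketched below; how
* long to wait before retrying is left to the application:
*
*     struct mpsc_queue_node *node;
*
*     switch (mpsc_queue_poll(&queue, &node)) {
*     case MPSC_QUEUE_EMPTY:
*         break;
*     case MPSC_QUEUE_ITEM:
*         process(CONTAINER_OF(node, struct my_msg, node));
*         break;
*     case MPSC_QUEUE_RETRY:
*         break;
*     }
*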
*/
struct mpsc_queue_node {
ATOMIC(struct mpsc_queue_node *) next;
};
struct mpsc_queue {
ATOMIC(struct mpsc_queue_node *) head;
ATOMIC(struct mpsc_queue_node *) tail;
struct mpsc_queue_node stub;
struct ovs_mutex read_lock;
};
#define MPSC_QUEUE_INITIALIZER(Q) { \
.head = &(Q)->stub, \
.tail = &(Q)->stub, \
.stub = { .next = NULL }, \
.read_lock = OVS_MUTEX_INITIALIZER, \
}
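/* For example, a queue with static storage can be defined ready for use;
* 'my_queue' is only an illustrative name:
*
*     static struct mpsc_queue my_queue = MPSC_QUEUE_INITIALIZER(&my_queue);
*
* A queue that is not defined this way must be initialized with
* mpsc_queue_init() before use.
*/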
/* Consumer API. */
/* Initialize the queue. Not necessary if 'MPSC_QUEUE_INITIALIZER' was used. */
void mpsc_queue_init(struct mpsc_queue *queue);
/* The reader lock must be released prior to destroying the queue. */
void mpsc_queue_destroy(struct mpsc_queue *queue);
/* Acquire and release the consumer lock. */
#define mpsc_queue_acquire(q) do { \
ovs_mutex_lock(&(q)->read_lock); \
} while (0)
#define mpsc_queue_release(q) do { \
ovs_mutex_unlock(&(q)->read_lock); \
} while (0)
enum mpsc_queue_poll_result {
/* Queue is empty. */
MPSC_QUEUE_EMPTY,
/* Polling the queue returned an item. */
MPSC_QUEUE_ITEM,
/* Data has been enqueued but one or more producer threads have not
* finished writing it. The queue is in an inconsistent state.
* Retrying shortly, if the producer threads are still active, will
* return the data.
*/
MPSC_QUEUE_RETRY,
};
/* Set 'node' to the item removed from the queue if 'MPSC_QUEUE_ITEM' is
* returned; otherwise 'node' is not set.
*/
enum mpsc_queue_poll_result mpsc_queue_poll(struct mpsc_queue *queue,
struct mpsc_queue_node **node)
OVS_REQUIRES(queue->read_lock);
/* Pop an element if there is one in the queue; return NULL otherwise. */
struct mpsc_queue_node *mpsc_queue_pop(struct mpsc_queue *queue)
OVS_REQUIRES(queue->read_lock);
/* Insert at the front of the queue. Only the consumer can do it. */
void mpsc_queue_push_front(struct mpsc_queue *queue,
struct mpsc_queue_node *node)
OVS_REQUIRES(queue->read_lock);
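/* For example, mpsc_queue_push_front() lets the consumer put back an element
* that it cannot process yet, so that it remains the next element to be
* polled; 'deferred()' is a hypothetical application predicate:
*
*     struct mpsc_queue_node *node;
*
*     if (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM
*         && deferred(node)) {
*         mpsc_queue_push_front(&queue, node);
*     }
*/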
/* Get the current queue tail. */
struct mpsc_queue_node *mpsc_queue_tail(struct mpsc_queue *queue)
OVS_REQUIRES(queue->read_lock);
/* Get the next element of a node. */
struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,
struct mpsc_queue_node *prev)
OVS_REQUIRES(queue->read_lock);
#define MPSC_QUEUE_FOR_EACH(node, queue) \
for (node = mpsc_queue_tail(queue); node != NULL; \
node = mpsc_queue_next((queue), node))
#define MPSC_QUEUE_FOR_EACH_POP(node, queue) \
for (node = mpsc_queue_pop(queue); node != NULL; \
node = mpsc_queue_pop(queue))
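/* As an illustration, both iteration macros require the reader lock; the
* element type 'struct my_msg' and the 'inspect()' helper are assumptions:
*
*     struct mpsc_queue_node *node;
*
*     mpsc_queue_acquire(&queue);
*
*     MPSC_QUEUE_FOR_EACH (node, &queue) {
*         inspect(CONTAINER_OF(node, struct my_msg, node));
*     }
*
*     MPSC_QUEUE_FOR_EACH_POP (node, &queue) {
*         free(CONTAINER_OF(node, struct my_msg, node));
*     }
*
*     mpsc_queue_release(&queue);
*
* The second loop empties the queue, freeing each element as it is removed,
* assuming the elements were heap-allocated.
*/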
/* Producer API. */
void mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node);
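/* For example, given a pointer 'msg' to a hypothetical 'struct my_msg' as
* sketched in the header comment, publishing it to the consumer requires no
* lock on the producer side:
*
*     msg->payload = 42;
*     mpsc_queue_insert(&queue, &msg->node);
*/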
#endif /* MPSC_QUEUE_H */