From: Konstantin Taranov <kotaranov@microsoft.com>
Date: Tue, 11 Mar 2025 20:49:21 +0100
Subject: providers/mana: improve synchronization of the shadow queue

Use release/acquire semantics for loads and stores of the shadow queue
indices. This ensures proper synchronization between the sender and
poller threads: once the poller observes an updated producer index
through an acquire load, the WQE contents written before the matching
release store are guaranteed to be visible as well.

Signed-off-by: Konstantin Taranov <kotaranov@microsoft.com>
Origin: upstream, https://github.com/linux-rdma/rdma-core/pull/1584
---
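Note (kept below the ---, so it stays out of git history): the ordering
choices in this patch can be exercised with a standalone
single-producer/single-consumer sketch. The toy ring below is
illustrative only, not mana code; it merely mirrors the shadow queue
pairing: a relaxed producer load plus an acquire consumer load in the
full check, a release store on every advance, and an acquire load of
the producer index on the polling side.

/*
 * Hypothetical sketch, not part of the patch.
 * Build: cc -O2 -pthread spsc_sketch.c
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define LEN   8			/* power of two, like the shadow queue */
#define COUNT 1000

static uint64_t slots[LEN];		/* plain, non-atomic entries */
static _Atomic(uint64_t) prod_idx;	/* advanced by the sender only */
static _Atomic(uint64_t) cons_idx;	/* advanced by the poller only */

static void *sender(void *unused)
{
	(void)unused;
	for (uint64_t i = 0; i < COUNT; i++) {
		uint64_t p, c;

		/* same ordering as shadow_queue_full() */
		do {
			p = atomic_load_explicit(&prod_idx, memory_order_relaxed);
			c = atomic_load_explicit(&cons_idx, memory_order_acquire);
		} while (p - c >= LEN);

		slots[p & (LEN - 1)] = i;	/* fill the entry first... */
		/* ...then publish it, as shadow_queue_advance_producer() does */
		atomic_store_explicit(&prod_idx, p + 1, memory_order_release);
	}
	return NULL;
}

static void *poller(void *unused)
{
	uint64_t sum = 0;

	(void)unused;
	for (uint64_t i = 0; i < COUNT; i++) {
		uint64_t c, p;

		do {
			c = atomic_load_explicit(&cons_idx, memory_order_relaxed);
			/* pairs with the sender's release store, so the slot
			 * written before the publish is visible below */
			p = atomic_load_explicit(&prod_idx, memory_order_acquire);
		} while (c == p);

		sum += slots[c & (LEN - 1)];
		/* retire the entry, as shadow_queue_advance_consumer() does */
		atomic_store_explicit(&cons_idx, c + 1, memory_order_release);
	}

	printf("sum=%llu expect=%llu\n", (unsigned long long)sum,
	       (unsigned long long)COUNT * (COUNT - 1) / 2);
	return NULL;
}

int main(void)
{
	pthread_t s, p;

	pthread_create(&s, NULL, sender, NULL);
	pthread_create(&p, NULL, poller, NULL);
	pthread_join(s, NULL);
	pthread_join(p, NULL);
	return 0;
}

The sender's acquire load of cons_idx is what makes reusing a slot
safe: it pairs with the poller's release store, so the poller's read
of the old entry happens-before the sender overwrites it.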
 providers/mana/shadow_queue.h | 58 ++++++++++++++++++++++++++-----------------
 1 file changed, 35 insertions(+), 23 deletions(-)

diff --git a/providers/mana/shadow_queue.h b/providers/mana/shadow_queue.h
index 1073f7c..9343fec 100644
--- a/providers/mana/shadow_queue.h
+++ b/providers/mana/shadow_queue.h
@@ -12,6 +12,9 @@
 #include <infiniband/verbs.h>
 #include <sys/mman.h>
 #include <util/util.h>
+#include <stdatomic.h>
+
+typedef _Atomic(uint64_t) _atomic_t;
 
 #define MANA_NO_SIGNAL_WC (0xff)
 
@@ -83,8 +86,18 @@ static inline void destroy_shadow_queue(struct shadow_queue *queue)
 	}
 }
 
+static inline _atomic_t *producer(struct shadow_queue *queue)
+{
+	return (_atomic_t *)&queue->prod_idx;
+}
+
+static inline _atomic_t *consumer(struct shadow_queue *queue)
+{
+	return (_atomic_t *)&queue->cons_idx;
+}
+
 static inline struct shadow_wqe_header *
-shadow_queue_get_element(const struct shadow_queue *queue, uint64_t unmasked_index)
+shadow_queue_get_element(struct shadow_queue *queue, uint64_t unmasked_index)
 {
 	uint32_t index = unmasked_index & (queue->length - 1);
 
@@ -93,53 +106,51 @@ shadow_queue_get_element(const struct shadow_queue *queue, uint64_t unmasked_ind
 
 static inline bool shadow_queue_full(struct shadow_queue *queue)
 {
-	return (queue->prod_idx - queue->cons_idx) >= queue->length;
+	uint64_t prod_idx = atomic_load_explicit(producer(queue), memory_order_relaxed);
+	uint64_t cons_idx = atomic_load_explicit(consumer(queue), memory_order_acquire);
+
+	return (prod_idx - cons_idx) >= queue->length;
 }
 
 static inline struct shadow_wqe_header *
 shadow_queue_producer_entry(struct shadow_queue *queue)
 {
-	return shadow_queue_get_element(queue, queue->prod_idx);
+	uint64_t prod_idx = atomic_load_explicit(producer(queue), memory_order_relaxed);
+
+	return shadow_queue_get_element(queue, prod_idx);
 }
 
 static inline void shadow_queue_advance_producer(struct shadow_queue *queue)
 {
-	queue->prod_idx++;
-}
+	uint64_t prod_idx = atomic_load_explicit(producer(queue), memory_order_relaxed);
 
-static inline void shadow_queue_retreat_producer(struct shadow_queue *queue)
-{
-	queue->prod_idx--;
+	atomic_store_explicit(producer(queue), prod_idx + 1, memory_order_release);
 }
 
 static inline void shadow_queue_advance_consumer(struct shadow_queue *queue)
 {
-	queue->cons_idx++;
-}
+	uint64_t cons_idx = atomic_load_explicit(consumer(queue), memory_order_relaxed);
 
-static inline bool shadow_queue_empty(struct shadow_queue *queue)
-{
-	return queue->prod_idx == queue->cons_idx;
-}
-
-static inline uint32_t shadow_queue_get_pending_wqe_count(struct shadow_queue *queue)
-{
-	return (uint32_t)(queue->prod_idx - queue->next_to_complete_idx);
+	atomic_store_explicit(consumer(queue), cons_idx + 1, memory_order_release);
 }
 
 static inline struct shadow_wqe_header *
-shadow_queue_get_next_to_consume(const struct shadow_queue *queue)
+shadow_queue_get_next_to_consume(struct shadow_queue *queue)
 {
-	if (queue->cons_idx == queue->next_to_complete_idx)
+	uint64_t cons_idx = atomic_load_explicit(consumer(queue), memory_order_relaxed);
+
+	if (cons_idx == queue->next_to_complete_idx)
 		return NULL;
 
-	return shadow_queue_get_element(queue, queue->cons_idx);
+	return shadow_queue_get_element(queue, cons_idx);
 }
 
 static inline struct shadow_wqe_header *
 shadow_queue_get_next_to_complete(struct shadow_queue *queue)
 {
-	if (queue->next_to_complete_idx == queue->prod_idx)
+	uint64_t prod_idx = atomic_load_explicit(producer(queue), memory_order_acquire);
+
+	if (queue->next_to_complete_idx == prod_idx)
 		return NULL;
 
 	return shadow_queue_get_element(queue, queue->next_to_complete_idx);
@@ -153,10 +164,11 @@ static inline void shadow_queue_advance_next_to_complete(struct shadow_queue *qu
 static inline struct shadow_wqe_header *
 shadow_queue_get_next_to_signal(struct shadow_queue *queue)
 {
+	uint64_t prod_idx = atomic_load_explicit(producer(queue), memory_order_acquire);
 	struct shadow_wqe_header *wqe = NULL;
 
 	queue->next_to_signal_idx = max(queue->next_to_signal_idx, queue->next_to_complete_idx);
-	while (queue->next_to_signal_idx < queue->prod_idx) {
+	while (queue->next_to_signal_idx < prod_idx) {
 		wqe = shadow_queue_get_element(queue, queue->next_to_signal_idx);
 		queue->next_to_signal_idx++;
 		if (wqe->flags != MANA_NO_SIGNAL_WC)
