// Unit tests for torch::nativert::detail::MPMCQueue.
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

#include <gtest/gtest.h>

#include <torch/nativert/detail/MPMCQueue.h>

using torch::nativert::detail::MPMCQueue;
// A freshly constructed queue holds nothing, so a non-blocking read must
// fail and leave the output argument untouched by any queued element.
TEST(MPMCQueueTest, EmptyQueue) {
  MPMCQueue<int> queue(5);
  int value = 0;
  EXPECT_FALSE(queue.readIfNotEmpty(value));
}
// One write followed by one read round-trips a single value.
TEST(MPMCQueueTest, SingleElement) {
  MPMCQueue<int> queue(5);
  EXPECT_TRUE(queue.writeIfNotFull(10));
  int received = 0;
  EXPECT_TRUE(queue.readIfNotEmpty(received));
  EXPECT_EQ(received, 10);
}
// Filling the queue to capacity and draining it returns every element
// in FIFO order.
TEST(MPMCQueueTest, MultipleElements) {
  constexpr int kCount = 5;
  MPMCQueue<int> queue(kCount);
  for (int value = 0; value < kCount; ++value) {
    EXPECT_TRUE(queue.writeIfNotFull(value));
  }
  for (int expected = 0; expected < kCount; ++expected) {
    int received = 0;
    EXPECT_TRUE(queue.readIfNotEmpty(received));
    EXPECT_EQ(received, expected);
  }
}
// A write into a full queue must fail, and the rejected write must not
// disturb the elements already stored: draining afterwards still yields
// every original element in FIFO order.
TEST(MPMCQueueTest, FullQueue) {
  constexpr int kCapacity = 5;
  MPMCQueue<int> queue(kCapacity);
  for (int i = 0; i < kCapacity; ++i) {
    EXPECT_TRUE(queue.writeIfNotFull(i));
  }
  // Queue is at capacity; further writes are rejected.
  EXPECT_FALSE(queue.writeIfNotFull(10));
  // The failed write must not have corrupted or dropped queued elements.
  for (int i = 0; i < kCapacity; ++i) {
    int out = -1;
    EXPECT_TRUE(queue.readIfNotEmpty(out));
    EXPECT_EQ(out, i);
  }
}
// Single-producer / single-consumer smoke test. With one writer and one
// reader, FIFO ordering means the reader must observe exactly 0..4 in
// order — the original `EXPECT_LT(out, 5)` was strictly weaker and could
// not detect duplicated or reordered elements. The capacity (10) exceeds
// the number of writes (5), so every write must succeed even if the
// reader has not consumed anything yet.
TEST(MPMCQueueTest, ConcurrentAccess) {
  MPMCQueue<int> queue(10);
  std::thread writer([&queue]() {
    for (int i = 0; i < 5; ++i) {
      // Capacity 10 > 5 total writes, so the queue can never be full here.
      EXPECT_TRUE(queue.writeIfNotFull(i));
    }
  });
  std::thread reader([&queue]() {
    for (int i = 0; i < 5; ++i) {
      int out = 0;
      while (!queue.readIfNotEmpty(out)) {
        // Wait until an element is available
        // TODO We could provide a blocking version of read() instead of
        // looping here. We only provide a non blocking wait API because
        // for now the queue is paired with a semaphore in executor.
        std::this_thread::yield();
      }
      // One writer + one reader => strict FIFO: the i-th read is i.
      EXPECT_EQ(out, i);
    }
  });
  writer.join();
  reader.join();
}
// Multi-producer / multi-consumer stress test: 5 writers each enqueue
// 10000 distinct values while 5 readers drain concurrently. Verifies both
// that every element is read (count) and that the set of values read is
// the set written (checksum), so duplicated or corrupted elements fail.
TEST(MPMCQueueTest, MPMCConcurrentAccess) {
  const size_t queueCapacity = 100000;
  const size_t numWriters = 5;
  const size_t numReaders = 5;
  const size_t numElementsPerWriter = 10000;
  const size_t totalElements = numWriters * numElementsPerWriter;
  MPMCQueue<int> queue(queueCapacity);

  // Writer threads. Each writer i produces the disjoint value range
  // [i * numElementsPerWriter, (i + 1) * numElementsPerWriter).
  std::vector<std::thread> writers;
  writers.reserve(numWriters);
  for (size_t i = 0; i < numWriters; ++i) {
    // BUG FIX: `i` must be captured BY VALUE. The original `[&]` capture
    // took `i` by reference, racing with the loop increment — threads
    // could observe stale or already-incremented indices and write
    // duplicate or out-of-range values.
    writers.emplace_back([&, i]() {
      for (size_t j = 0; j < numElementsPerWriter; ++j) {
        size_t value = i * numElementsPerWriter + j;
        while (!queue.writeIfNotFull(static_cast<int>(value))) {
          // Retry until the queue has space
          // TODO We could provide a blocking version of write() instead of
          // looping here. We only provide a non blocking wait API because
          // for now the queue is paired with a semaphore in executor.
          std::this_thread::yield();
        }
      }
    });
  }

  // Reader threads. Each reader drains until the shared count shows that
  // every element has been consumed by someone.
  std::vector<std::thread> readers;
  std::atomic<size_t> totalReadCount{0};
  std::atomic<size_t> readSum{0};
  readers.reserve(numReaders);
  for (size_t i = 0; i < numReaders; ++i) {
    readers.emplace_back([&]() {
      int value = 0;
      while (totalReadCount < totalElements) {
        if (queue.readIfNotEmpty(value)) {
          readSum += static_cast<size_t>(value);
          ++totalReadCount;
        } else {
          // TODO We could provide a blocking version of read() instead of
          // looping here. We only provide a non blocking wait API because
          // for now the queue is paired with a semaphore in executor.
          std::this_thread::yield();
        }
      }
    });
  }

  // Join all threads
  for (auto& writer : writers) {
    writer.join();
  }
  for (auto& reader : readers) {
    reader.join();
  }

  // Verify that all elements were read...
  EXPECT_EQ(totalReadCount, totalElements);
  // ...and that the values read sum to 0 + 1 + ... + (total - 1), i.e. the
  // exact multiset of written values (catches duplicates/corruption that a
  // bare count would miss).
  EXPECT_EQ(readSum, totalElements * (totalElements - 1) / 2);
}
// The queue must accept element types that are movable but not copyable:
// a write via rvalue and a read via move-assignment both succeed.
TEST(MPMCQueueTest, MoveOnlyType) {
  struct MoveOnly {
    MoveOnly() = default;
    ~MoveOnly() = default;
    // Copying is forbidden; only moves are permitted.
    MoveOnly(const MoveOnly&) = delete;
    MoveOnly& operator=(const MoveOnly&) = delete;
    MoveOnly(MoveOnly&&) = default;
    MoveOnly& operator=(MoveOnly&&) = default;
  };
  MPMCQueue<MoveOnly> queue(5);
  EXPECT_TRUE(queue.writeIfNotFull(MoveOnly()));
  MoveOnly received;
  EXPECT_TRUE(queue.readIfNotEmpty(received));
}