File: test_mpmc_queue.cpp

package info (click to toggle)
pytorch 2.9.1%2Bdfsg-1~exp2
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 180,096 kB
  • sloc: python: 1,473,255; cpp: 942,030; ansic: 79,796; asm: 7,754; javascript: 2,502; java: 1,962; sh: 1,809; makefile: 628; xml: 8
file content (134 lines) | stat: -rw-r--r-- 3,840 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#include <atomic>
#include <thread>

#include <gtest/gtest.h>

#include <torch/nativert/detail/MPMCQueue.h>

using torch::nativert::detail::MPMCQueue;

TEST(MPMCQueueTest, EmptyQueue) {
  MPMCQueue<int> queue(5);
  int out = 0;
  EXPECT_FALSE(queue.readIfNotEmpty(out));
}

TEST(MPMCQueueTest, SingleElement) {
  MPMCQueue<int> queue(5);
  EXPECT_TRUE(queue.writeIfNotFull(10));
  int out = 0;
  EXPECT_TRUE(queue.readIfNotEmpty(out));
  EXPECT_EQ(out, 10);
}

TEST(MPMCQueueTest, MultipleElements) {
  MPMCQueue<int> queue(5);
  for (int i = 0; i < 5; ++i) {
    EXPECT_TRUE(queue.writeIfNotFull(i));
  }
  for (int i = 0; i < 5; ++i) {
    int out = 0;
    EXPECT_TRUE(queue.readIfNotEmpty(out));
    EXPECT_EQ(out, i);
  }
}

TEST(MPMCQueueTest, FullQueue) {
  MPMCQueue<int> queue(5);
  for (int i = 0; i < 5; ++i) {
    EXPECT_TRUE(queue.writeIfNotFull(i));
  }
  EXPECT_FALSE(queue.writeIfNotFull(10));
}

TEST(MPMCQueueTest, ConcurrentAccess) {
  MPMCQueue<int> queue(10);
  std::thread writer([&queue]() {
    for (int i = 0; i < 5; ++i) {
      queue.writeIfNotFull(i);
    }
  });
  std::thread reader([&queue]() {
    for (int i = 0; i < 5; ++i) {
      int out = 0;
      while (!queue.readIfNotEmpty(out)) {
        // Wait until an element is available
        // TODO We could provide a blocking version of read() instead of
        // looping here. We only provide a non blocking wait API because
        // for now the queue is paired with a semaphore in executor.
        std::this_thread::yield();
      }
      EXPECT_LT(out, 5);
    }
  });
  writer.join();
  reader.join();
}

TEST(MPMCQueueTest, MPMCConcurrentAccess) {
  const size_t queueCapacity = 100000;
  const size_t numWriters = 5;
  const size_t numReaders = 5;
  const size_t numElementsPerWriter = 10000;
  MPMCQueue<int> queue(queueCapacity);
  // Writer threads
  std::vector<std::thread> writers;
  writers.reserve(numWriters);
  for (size_t i = 0; i < numWriters; ++i) {
    writers.emplace_back([&]() {
      for (size_t j = 0; j < numElementsPerWriter; ++j) {
        size_t value = i * numElementsPerWriter + j;
        while (!queue.writeIfNotFull(static_cast<int>(value))) {
          // Retry until the queue has space
          // TODO We could provide a blocking version of read() instead of
          // looping here. We only provide a non blocking wait API because
          // for now the queue is paired with a semaphore in executor.
          std::this_thread::yield();
        }
      }
    });
  }
  // Reader threads
  std::vector<std::thread> readers;
  std::atomic<size_t> totalReadCount{0};
  readers.reserve(numReaders);
  for (size_t i = 0; i < numReaders; ++i) {
    readers.emplace_back([&]() {
      int value = 0;
      while (totalReadCount < numWriters * numElementsPerWriter) {
        if (queue.readIfNotEmpty(value)) {
          ++totalReadCount;
        } else {
          // TODO We could provide a blocking version of read() instead of
          // looping here. We only provide a non blocking wait API because
          // for now the queue is paired with a semaphore in executor.
          std::this_thread::yield();
        }
      }
    });
  }
  // Join all threads
  for (auto& writer : writers) {
    writer.join();
  }
  for (auto& reader : readers) {
    reader.join();
  }
  // Verify that all elements were read
  EXPECT_EQ(totalReadCount, numWriters * numElementsPerWriter);
}

TEST(MPMCQueueTest, MoveOnlyType) {
  struct MoveOnly {
    MoveOnly() = default;
    MoveOnly(const MoveOnly&) = delete;
    MoveOnly& operator=(const MoveOnly&) = delete;
    MoveOnly(MoveOnly&&) = default;
    MoveOnly& operator=(MoveOnly&&) = default;
    ~MoveOnly() = default;
  };
  MPMCQueue<MoveOnly> queue(5);
  EXPECT_TRUE(queue.writeIfNotFull(MoveOnly()));
  MoveOnly out;
  EXPECT_TRUE(queue.readIfNotEmpty(out));
}