File: ttaskqueue.cpp

package info (click to toggle)
aoflagger 3.5.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,004 kB
  • sloc: cpp: 67,891; python: 497; sh: 242; makefile: 22
file content (137 lines) | stat: -rw-r--r-- 3,695 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <aocommon/taskqueue.h>

#include <atomic>
#include <thread>
#include <vector>

#include <boost/test/data/test_case.hpp>
#include <boost/test/unit_test.hpp>

using aocommon::TaskQueue;

BOOST_AUTO_TEST_SUITE(task_queue)

BOOST_DATA_TEST_CASE(single_thread, boost::unit_test::data::make({false, true}),
                     use_wait) {
  const std::vector<int> kValues{42, 43, 44, 45};
  constexpr int kDummyValue = 142;

  // Using a unique pointer ensures that TaskQueue cannot copy tasks.
  TaskQueue<std::unique_ptr<int>> queue;
  for (const int& value : kValues) {
    queue.Emplace(std::make_unique<int>(value));
  }

  for (const int& value : kValues) {
    std::unique_ptr<int> popped;
    BOOST_TEST(queue.Pop(popped));
    BOOST_REQUIRE(popped);
    BOOST_TEST(*popped == value);
  }

  if (use_wait) queue.WaitForIdle(0);

  queue.Finish();
  auto dummy = std::make_unique<int>(kDummyValue);
  BOOST_TEST(!queue.Pop(dummy));
  BOOST_REQUIRE(dummy);
  BOOST_TEST(*dummy == kDummyValue);
}

BOOST_DATA_TEST_CASE(multiple_threads_pop,
                     boost::unit_test::data::make({false, true}), use_wait) {
  const std::vector<int> kValues{42, 43, 44, 45};
  const size_t kLimit = 2;
  TaskQueue<int> queue{kLimit};
  std::mutex mutex;
  std::condition_variable notify;
  int popped_in_thread = 0;

  std::vector<std::thread> pop_threads;
  for (size_t i = 0; i < kValues.size(); ++i) {
    pop_threads.emplace_back([&] {
      int popped = 0;
      const bool result = queue.Pop(popped);
      std::lock_guard<std::mutex> lock(mutex);
      BOOST_TEST_REQUIRE(result);
      popped_in_thread = popped;
      notify.notify_one();
    });
  }

  if (use_wait) queue.WaitForIdle(pop_threads.size());

  for (const int& value : kValues) {
    popped_in_thread = 0;
    queue.Emplace(value);
    std::unique_lock<std::mutex> lock(mutex);
    while (popped_in_thread == 0) notify.wait(lock);
    BOOST_TEST(popped_in_thread == value);
  }

  for (std::thread& thread : pop_threads) thread.join();

  if (use_wait) queue.WaitForIdle(0);
}

BOOST_AUTO_TEST_CASE(multiple_threads_done) {
  constexpr size_t kNThreads = 42;
  constexpr int kDummyValue = 142;
  TaskQueue<int> queue;

  std::mutex mutex;
  std::vector<std::thread> threads;
  for (size_t i = 0; i < kNThreads; ++i) {
    threads.emplace_back([&] {
      int dummy = kDummyValue;
      std::lock_guard<std::mutex> lock(mutex);
      BOOST_TEST(!queue.Pop(dummy));
      BOOST_TEST(dummy == kDummyValue);
    });
  }

  queue.Finish();

  // Joining the threads also tests that all threads are done.
  for (std::thread& thread : threads) thread.join();
}

BOOST_AUTO_TEST_CASE(wait_for_idle) {
  // Test that WaitForIdle really waits until kNThreads call Pop().
  const size_t kNThreads = 42;

  TaskQueue<int> queue;

  std::atomic<bool> waiting = false;
  std::atomic<bool> done_waiting = false;
  std::thread wait_thread([&] {
    waiting = true;
    queue.WaitForIdle(kNThreads);
    done_waiting = true;
  });
  // Wait until wait_thread starts waiting.
  while (!waiting) std::this_thread::yield();

  std::vector<std::thread> pop_threads;
  for (size_t i = 0; i < kNThreads; ++i) {
    BOOST_TEST(waiting);
    BOOST_TEST(!done_waiting);
    std::atomic<bool> popping = false;
    pop_threads.emplace_back([&] {
      popping = true;
      int dummy;
      queue.Pop(dummy);
    });
    // Wait until the thread starts popping.
    while (!popping) std::this_thread::yield();
  }

  // Wait until wait_thread stops waiting.
  while (!done_waiting) std::this_thread::yield();

  wait_thread.join();
  queue.Finish();
  for (std::thread& pop_thread : pop_threads) pop_thread.join();
}

BOOST_AUTO_TEST_SUITE_END()