File: shared_queue.hpp

package info (click to toggle)
vart 2.5-5
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 4,404 kB
  • sloc: cpp: 30,188; python: 7,493; sh: 969; makefile: 37; ansic: 36
file content (159 lines) | stat: -rw-r--r-- 4,728 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
 * Copyright 2019 Xilinx Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef DEEPHI_SHARED_QUEUE_HPP_
#define DEEPHI_SHARED_QUEUE_HPP_

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>

namespace vitis {
namespace ai {
/**
 * A thread safe queue.
 */
template <typename T>
class SharedQueue {
 public:
  /**
   * Return the size of the queue.
   */
  virtual std::size_t size() const {
    std::lock_guard<std::mutex> lock(this->mtx_);
    return this->internal_size();
  }

  /**
   * Return true if the queue is empty, and false otherwise.
   */
  virtual bool empty() const {
    std::lock_guard<std::mutex> lock(this->mtx_);
    return this->internal_empty();
  }

  /**
   * Copy the value to the end of this queue.
   */
  virtual void push(const T& new_value) {
    std::lock_guard<std::mutex> lock(this->mtx_);
    this->internal_push(new_value);
    this->cond_not_empty_.notify_one();
  }

  /**
   * Get the first element in the queue and remove it from the queue.
   * This is blocking
   */
  virtual void pop(T& value) {
    std::unique_lock<std::mutex> lock(this->mtx_);
    this->cond_not_empty_.wait(lock,
                               [this]() { return !(this->internal_empty()); });
    this->internal_pop(value);
  }

  /**
   * Get the first element in the queue and remove it from the queue.
   * This will fail and return false if blocked for more than rel_time.
   */
  virtual bool pop(T& value, const std::chrono::milliseconds& rel_time) {
    std::unique_lock<std::mutex> lock(this->mtx_);
    if (this->cond_not_empty_.wait_for(lock, rel_time, [this]() {
          return !(this->internal_empty());
        }) == false) {
      return false;
    }
    this->internal_pop(value);
    return true;
  }

  /**
   * Get the first element in the queue that satisfies cond, and remove it
   * from the queue.
   * This will fail and return false if no such element.
   */
  virtual bool pop(T& value, std::function<bool(const T&)>& cond) {
    std::lock_guard<std::mutex> lock(this->mtx_);
    if (this->internal_empty()) return false;
    auto it = std::find_if(internal_.begin(), internal_.end(), cond);
    if (it != internal_.end()) {
      value = std::move(*it);
      internal_.erase(it);
      return true;
    }
    return false;
  }

  /**
   * Get the first element in the queue that satisfies cond, and remove it
   * from the queue.
   * This will fail and return false if no such element, or blocked for more
   * than rel_time.
   */
  virtual bool pop(T& value, std::function<bool(const T&)>& cond,
                   const std::chrono::milliseconds& rel_time) {
    auto now = std::chrono::steady_clock::now();
    std::unique_lock<std::mutex> lock(this->mtx_);
    // Wait until not empty
    if (!this->cond_not_empty_.wait_for(
            lock, rel_time, [this]() { return !this->internal_empty(); })) {
      return false;
    }
    auto it =
        std::find_if(this->internal_.begin(), this->internal_.end(), cond);
    while (it == this->internal_.end()) {
      if (this->cond_not_empty_.wait_until(lock, now + rel_time) ==
          std::cv_status::timeout) {
        break;
      }
      it = std::find_if(this->internal_.begin(), this->internal_.end(), cond);
    }
    it = std::find_if(this->internal_.begin(), this->internal_.end(), cond);
    if (it != this->internal_.end()) {
      value = std::move(*it);
      this->internal_.erase(it);
      return true;
    }
    return false;
  }

 protected:
  inline virtual std::size_t internal_size() const {
    return this->internal_.size();
  }
  inline virtual bool internal_empty() const { return this->internal_.empty(); }
  inline virtual void internal_push(const T& new_value) {
    this->internal_.emplace_back(new_value);
  }
  inline virtual void internal_pop(T& value) {
    value = std::move(this->internal_.front());
    this->internal_.pop_front();
  }
  inline virtual void internal_top(T& value) {
    value = this->internal_.front();
  }

  mutable std::mutex mtx_;
  std::condition_variable cond_not_empty_;

  std::deque<T> internal_;
};
}  // namespace ai
}  // namespace vitis
#endif