1 #ifndef OSMIUM_THREAD_QUEUE_HPP 2 #define OSMIUM_THREAD_QUEUE_HPP 37 #include <condition_variable> 44 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 76 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 77 std::size_t m_largest_size;
81 std::atomic<int> m_push_counter;
85 std::atomic<int> m_full_counter;
91 std::atomic<int> m_pop_counter;
95 std::atomic<int> m_empty_counter;
107 explicit Queue(std::size_t max_size = 0, std::string name =
"") :
108 m_max_size(max_size),
109 m_name(
std::move(name)),
111 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
128 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 130 std::cerr <<
"queue '" << m_name
131 <<
"' with max_size=" << m_max_size
132 <<
" had largest size " << m_largest_size
133 <<
" and was full " << m_full_counter
134 <<
" times in " << m_push_counter
135 <<
" push() calls and was empty " << m_empty_counter
136 <<
" times in " << m_pop_counter
148 constexpr
const std::chrono::milliseconds max_wait{10};
149 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 153 while (
size() >= m_max_size) {
154 std::unique_lock<std::mutex> lock{m_mutex};
155 m_space_available.wait_for(lock, max_wait, [
this] {
158 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 163 std::lock_guard<std::mutex> lock{m_mutex};
164 m_queue.push(std::move(value));
165 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 166 if (m_largest_size < m_queue.size()) {
167 m_largest_size = m_queue.size();
170 m_data_available.notify_one();
174 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 177 std::unique_lock<std::mutex> lock{m_mutex};
178 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 179 if (m_queue.empty()) {
183 m_data_available.wait(lock, [
this] {
184 return !m_queue.empty();
186 if (!m_queue.empty()) {
187 value = std::move(m_queue.front());
191 m_space_available.notify_one();
197 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 201 std::lock_guard<std::mutex> lock{m_mutex};
202 if (m_queue.empty()) {
203 #ifdef OSMIUM_DEBUG_QUEUE_SIZE 208 value = std::move(m_queue.front());
212 m_space_available.notify_one();
218 std::lock_guard<std::mutex> lock{m_mutex};
219 return m_queue.empty();
223 std::lock_guard<std::mutex> lock{m_mutex};
224 return m_queue.size();
233 #endif // OSMIUM_THREAD_QUEUE_HPP
Definition: location.hpp:554
std::mutex m_mutex
Definition: queue.hpp:66
Queue(std::size_t max_size=0, std::string name="")
Definition: queue.hpp:107
std::queue< T > m_queue
Definition: queue.hpp:68
const std::string m_name
Name of this queue (for debugging only).
Definition: queue.hpp:64
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
std::condition_variable m_space_available
Used to signal producers when queue is not full.
Definition: queue.hpp:74
bool try_pop(T &value)
Definition: queue.hpp:196
Queue & operator=(const Queue &)=delete
void wait_and_pop(T &value)
Definition: queue.hpp:173
void push(T value)
Definition: queue.hpp:147
bool empty() const
Definition: queue.hpp:217
std::size_t size() const
Definition: queue.hpp:222
const std::size_t m_max_size
Definition: queue.hpp:61
std::condition_variable m_data_available
Used to signal consumers when data is available in the queue.
Definition: queue.hpp:71