Libosmium  2.17.3
Fast and flexible C++ library for working with OpenStreetMap data
queue.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_THREAD_QUEUE_HPP
2 #define OSMIUM_THREAD_QUEUE_HPP
3 
4 /*
5 
6 This file is part of Osmium (https://osmcode.org/libosmium).
7 
8 Copyright 2013-2022 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
36 #include <atomic>
37 #include <chrono>
38 #include <condition_variable>
39 #include <cstddef>
40 #include <mutex>
41 #include <queue>
42 #include <string>
43 #include <utility> // IWYU pragma: keep
44 
45 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
46 # include <iostream>
47 #endif
48 
49 namespace osmium {
50 
51  namespace thread {
52 
56  template <typename T>
57  class Queue {
58 
61  const std::size_t m_max_size;
62 
64  const std::string m_name;
65 
66  mutable std::mutex m_mutex;
67 
68  std::queue<T> m_queue;
69 
71  std::condition_variable m_data_available;
72 
74  std::condition_variable m_space_available;
75 
76  std::atomic<bool> m_in_use{true};
77 
78 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
80  std::size_t m_largest_size;
81 
83  std::atomic<int> m_push_counter;
84 
87  std::atomic<int> m_full_counter;
88 
93  std::atomic<int> m_pop_counter;
94 
97  std::atomic<int> m_empty_counter;
98 #endif
99 
100  public:
101 
109  explicit Queue(std::size_t max_size = 0, std::string name = "") :
110  m_max_size(max_size),
111  m_name(std::move(name)),
112  m_queue()
113 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
114  ,
115  m_largest_size(0),
116  m_push_counter(0),
117  m_full_counter(0),
118  m_pop_counter(0),
119  m_empty_counter(0)
120 #endif
121  {
122  }
123 
124  Queue(const Queue&) = delete;
125  Queue& operator=(const Queue&) = delete;
126 
127  Queue(Queue&&) = delete;
128  Queue& operator=(Queue&&) = delete;
129 
130 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
131  ~Queue() {
132  std::cerr << "queue '" << m_name
133  << "' with max_size=" << m_max_size
134  << " had largest size " << m_largest_size
135  << " and was full " << m_full_counter
136  << " times in " << m_push_counter
137  << " push() calls and was empty " << m_empty_counter
138  << " times in " << m_pop_counter
139  << " pop() calls\n";
140  }
141 #else
142  ~Queue() = default;
143 #endif
144 
149  void push(T value) {
150  if (!m_in_use) {
151  return;
152  }
153  constexpr const std::chrono::milliseconds max_wait{10};
154 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
155  ++m_push_counter;
156 #endif
157  if (m_max_size) {
158  while (size() >= m_max_size) {
159  std::unique_lock<std::mutex> lock{m_mutex};
160  m_space_available.wait_for(lock, max_wait, [this] {
161  return m_queue.size() < m_max_size;
162  });
163 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
164  ++m_full_counter;
165 #endif
166  }
167  }
168  std::lock_guard<std::mutex> lock{m_mutex};
169  m_queue.push(std::move(value));
170 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
171  if (m_largest_size < m_queue.size()) {
172  m_largest_size = m_queue.size();
173  }
174 #endif
175  m_data_available.notify_one();
176  }
177 
178  void wait_and_pop(T& value) {
179 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
180  ++m_pop_counter;
181 #endif
182  std::unique_lock<std::mutex> lock{m_mutex};
183 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
184  if (m_queue.empty()) {
185  ++m_empty_counter;
186  }
187 #endif
188  m_data_available.wait(lock, [this] {
189  return !m_in_use || !m_queue.empty();
190  });
191  if (!m_queue.empty()) {
192  value = std::move(m_queue.front());
193  m_queue.pop();
194  lock.unlock();
195  if (m_max_size) {
196  m_space_available.notify_one();
197  }
198  }
199  }
200 
201  bool try_pop(T& value) {
202 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
203  ++m_pop_counter;
204 #endif
205  {
206  std::lock_guard<std::mutex> lock{m_mutex};
207  if (m_queue.empty()) {
208 #ifdef OSMIUM_DEBUG_QUEUE_SIZE
209  ++m_empty_counter;
210 #endif
211  return false;
212  }
213  value = std::move(m_queue.front());
214  m_queue.pop();
215  }
216  if (m_max_size) {
217  m_space_available.notify_one();
218  }
219  return true;
220  }
221 
222  bool empty() const {
223  std::lock_guard<std::mutex> lock{m_mutex};
224  return m_queue.empty();
225  }
226 
227  std::size_t size() const {
228  std::lock_guard<std::mutex> lock{m_mutex};
229  return m_queue.size();
230  }
231 
232  bool in_use() const noexcept {
233  return m_in_use;
234  }
235 
236  void shutdown() {
237  m_in_use = false;
238  std::lock_guard<std::mutex> lock{m_mutex};
239  while (!m_queue.empty()) {
240  m_queue.pop();
241  }
242  m_data_available.notify_all();
243  }
244 
245  }; // class Queue
246 
247  } // namespace thread
248 
249 } // namespace osmium
250 
251 #endif // OSMIUM_THREAD_QUEUE_HPP
Definition: queue.hpp:57
bool try_pop(T &value)
Definition: queue.hpp:201
std::mutex m_mutex
Definition: queue.hpp:66
Queue & operator=(const Queue &)=delete
bool empty() const
Definition: queue.hpp:222
bool in_use() const noexcept
Definition: queue.hpp:232
void push(T value)
Definition: queue.hpp:149
void wait_and_pop(T &value)
Definition: queue.hpp:178
std::atomic< bool > m_in_use
Definition: queue.hpp:76
Queue(const Queue &)=delete
std::condition_variable m_space_available
Used to signal producers when queue is not full.
Definition: queue.hpp:74
Queue(std::size_t max_size=0, std::string name="")
Definition: queue.hpp:109
std::size_t size() const
Definition: queue.hpp:227
std::queue< T > m_queue
Definition: queue.hpp:68
Queue & operator=(Queue &&)=delete
Queue(Queue &&)=delete
const std::size_t m_max_size
Definition: queue.hpp:61
std::condition_variable m_data_available
Used to signal consumers when data is available in the queue.
Definition: queue.hpp:71
void shutdown()
Definition: queue.hpp:236
const std::string m_name
Name of this queue (for debugging only).
Definition: queue.hpp:64
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:555