xenium/kirsch_kfifo_queue.hpp
//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef XENIUM_KIRSCH_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <type_traits>

namespace xenium {
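/**
 * @brief An unbounded lock-free multi-producer/multi-consumer k-FIFO queue.
 *
 * Based on the k-FIFO queue proposed by Kirsch, Lippautz and Payer
 * ("Fast and Scalable, Lock-Free k-FIFO Queues"). FIFO order is relaxed:
 * an element may be dequeued up to k-1 positions out of order, where k is
 * the segment size passed to the constructor.
 *
 * A reclaimer policy must be specified. Minimal usage sketch; the
 * generic_epoch_based reclaimer and its header path are assumptions here:
 * @code
 * #include <xenium/reclamation/generic_epoch_based.hpp>
 *
 * using queue_t = xenium::kirsch_kfifo_queue<int*,
 *   xenium::policy::reclaimer<xenium::reclamation::generic_epoch_based<>>>;
 *
 * queue_t queue(8); // segments of k = 8 slots
 * queue.push(new int(42));
 * int* value = nullptr;
 * if (queue.try_pop(value))
 *   delete value;
 * @endcode
 */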
template <class T, class... Policies>
class kirsch_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;
  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");

  template <class... NewPolicies>
  using with = kirsch_kfifo_queue<T, NewPolicies..., Policies...>;

  kirsch_kfifo_queue(uint64_t k);
  ~kirsch_kfifo_queue();

  kirsch_kfifo_queue(const kirsch_kfifo_queue&) = delete;
  kirsch_kfifo_queue(kirsch_kfifo_queue&&) = delete;

  kirsch_kfifo_queue& operator=(const kirsch_kfifo_queue&) = delete;
  kirsch_kfifo_queue& operator=(kirsch_kfifo_queue&&) = delete;

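  /**
   * Pushes the given value to the queue.
   * @param value the value to push; must not be nullptr
   * @throws std::invalid_argument if value is nullptr
   */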
  void push(value_type value);

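  /**
   * Tries to pop an element from the queue.
   * @param result receives the popped value on success
   * @return true if an element was popped, false if the queue appeared empty
   */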
  [[nodiscard]] bool try_pop(value_type& result);

private:
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct padded_entry {
    std::atomic<marked_value> value;
    // we use max here to avoid arrays of size zero, which are not allowed by Visual C++
    char padding[std::max(padding_bytes, 1u)];
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };
  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;

public:
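  /**
   * Provides the effective size of a single queue entry (including padding).
   *
   * A hedged sketch of how a padding choice can be checked against this value;
   * the 8-byte pointer size and 64-byte cache line are assumptions:
   * @code
   * using queue_t = xenium::kirsch_kfifo_queue<int*,
   *   xenium::policy::reclaimer<xenium::reclamation::generic_epoch_based<>>,
   *   xenium::policy::padding_bytes<56>>;
   * static_assert(queue_t::entry_size == 64, "one entry per cache line");
   * @endcode
   */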
  static constexpr std::size_t entry_size = sizeof(entry);

private:
  struct segment;

  struct segment_deleter {
    void operator()(segment* seg) const { release_segment(seg); }
  };

  struct segment : reclaimer::template enable_concurrent_ptr<segment, 16, segment_deleter> {
    using concurrent_ptr = typename reclaimer::template concurrent_ptr<segment, 16>;

    explicit segment(uint64_t k) : k(k) {}
    ~segment() {
      for (unsigned i = 0; i < k; ++i) {
        assert(items()[i].value.load(std::memory_order_relaxed).get() == nullptr);
      }
    }

    void delete_remaining_items() {
      for (unsigned i = 0; i < k; ++i) {
        traits::delete_value(items()[i].value.load(std::memory_order_relaxed).get());
        items()[i].value.store(nullptr, std::memory_order_relaxed);
      }
    }

    entry* items() noexcept { return reinterpret_cast<entry*>(this + 1); }

    std::atomic<bool> deleted{false};
    const uint64_t k;
    concurrent_ptr next{};
  };

  using concurrent_ptr = typename segment::concurrent_ptr;
  using marked_ptr = typename concurrent_ptr::marked_ptr;
  using guard_ptr = typename concurrent_ptr::guard_ptr;

  segment* alloc_segment() const;
  static void release_segment(segment* seg);

  template <bool Empty>
  bool find_index(marked_ptr segment, uint64_t& value_index, marked_value& old) const noexcept;
  void advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept;
  void advance_tail(marked_ptr tail_old) noexcept;
  bool committed(marked_ptr segment, marked_value value, uint64_t index) noexcept;

  const std::size_t k_;
  concurrent_ptr head_;
  concurrent_ptr tail_;
};

template <class T, class... Policies>
kirsch_kfifo_queue<T, Policies...>::kirsch_kfifo_queue(uint64_t k) : k_(k) {
  const auto seg = alloc_segment();
  head_.store(seg, std::memory_order_relaxed);
  tail_.store(seg, std::memory_order_relaxed);
}

template <class T, class... Policies>
kirsch_kfifo_queue<T, Policies...>::~kirsch_kfifo_queue() {
  auto seg = head_.load(std::memory_order_relaxed).get();
  while (seg) {
    auto next = seg->next.load(std::memory_order_relaxed).get();
    seg->delete_remaining_items();
    release_segment(seg);
    seg = next;
  }
}

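// A segment and its k entries live in a single allocation: the block is sized
// sizeof(segment) + k * sizeof(entry), the segment header is placement-new'ed
// at the front, and the entries are constructed directly behind it - which is
// exactly where segment::items() points.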
template <class T, class... Policies>
auto kirsch_kfifo_queue<T, Policies...>::alloc_segment() const -> segment* {
  void* data = ::operator new(sizeof(segment) + k_ * sizeof(entry));
  auto result = new (data) segment(k_);
  for (std::size_t i = 0; i < k_; ++i)
    new (&result->items()[i]) entry();
  return result;
}

template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::release_segment(segment* seg) {
  seg->~segment();
  ::operator delete(seg);
}

template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::push(value_type value) {
  if (value == nullptr)
    throw std::invalid_argument("value cannot be nullptr");

  raw_value_type raw_value = traits::get_raw(value);
  guard_ptr tail_old;
  for (;;) {
    // (1) - this acquire-load synchronizes-with the release-CAS (9, 12, 14)
    tail_old.acquire(tail_, std::memory_order_acquire);

    // TODO - local linearizability

    uint64_t idx = 0;
    marked_value old_value;
    bool found_idx = find_index<true>(tail_old, idx, old_value);
    if (tail_old != tail_.load(std::memory_order_relaxed))
      continue;

    if (found_idx) {
      const marked_value new_value(raw_value, old_value.mark() + 1);
      // (2) - this release-CAS synchronizes-with the acquire-CAS (5)
      if (tail_old->items()[idx].value.compare_exchange_strong(old_value, new_value,
            std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value);
        // TODO - local linearizability
        return;
      }
    } else {
      advance_tail(tail_old);
    }
  }
}

template <class T, class... Policies>
bool kirsch_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  guard_ptr head_old;
  for (;;) {
    // (3) - this acquire-load synchronizes-with the release-CAS (10)
    head_old.acquire(head_, std::memory_order_acquire);
    uint64_t idx = 0;
    marked_value old_value;
    bool found_idx = find_index<false>(head_old, idx, old_value);
    if (head_old != head_.load(std::memory_order_relaxed))
      continue;

    // (4) - this acquire-load synchronizes-with the release-CAS (9, 12, 14)
    marked_ptr tail_old = tail_.load(std::memory_order_acquire);
    if (found_idx) {
      if (head_old.get() == tail_old.get())
        advance_tail(tail_old);

      const marked_value new_value(nullptr, old_value.mark() + 1);
      // (5) - this acquire-CAS synchronizes-with the release-CAS (2)
      if (head_old->items()[idx].value.compare_exchange_strong(old_value, new_value,
            std::memory_order_acquire, std::memory_order_relaxed)) {
        traits::store(result, old_value.get());
        return true;
      }
    } else {
      if (head_old.get() == tail_old.get() && tail_old == tail_.load(std::memory_order_relaxed))
        return false; // queue is empty
      advance_head(head_old, tail_old);
    }
  }
}

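// Scans the k slots of the given segment for an empty slot (Empty == true) or
// an occupied one (Empty == false). The scan starts at a random index so that
// concurrent threads probing the same segment tend to hit different slots.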
template <class T, class... Policies>
template <bool Empty>
bool kirsch_kfifo_queue<T, Policies...>::find_index(
  marked_ptr segment, uint64_t& value_index, marked_value& old) const noexcept {
  const uint64_t k = segment->k;
  const uint64_t random_index = utils::random() % k;
  for (std::size_t i = 0; i < k; i++) {
    uint64_t index = (random_index + i) % k;
    old = segment->items()[index].value.load(std::memory_order_relaxed);
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}

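// Called by push after a successful insert-CAS: verifies that the slot still
// belongs to the logical queue, since the tail segment may have been removed
// concurrently. Returns true if the insert counts (the segment is still live,
// or a concurrent try_pop already consumed the element); returns false after
// erasing the element again, in which case push retries.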
template <class T, class... Policies>
bool kirsch_kfifo_queue<T, Policies...>::committed(
  marked_ptr segment, marked_value value, uint64_t index) noexcept {
  if (value != segment->items()[index].value.load(std::memory_order_relaxed))
    return true; // the element has already been removed by a concurrent try_pop

  const marked_value empty_value(nullptr, value.mark() + 1);

  if (segment->deleted.load(std::memory_order_relaxed)) {
    // The segment we inserted into has been removed from the queue; the insert
    // only counts if a concurrent try_pop removed the element before we could
    // erase it again.
    return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
  }

  // (6) - this acquire-load synchronizes-with the release-CAS (10)
  marked_ptr head_current = head_.load(std::memory_order_acquire);
  if (segment.get() == head_current.get()) {
    // The segment we inserted into has become the head segment.
    marked_ptr new_head(head_current.get(), head_current.mark() + 1);
    // This relaxed-CAS is part of a release sequence headed by (10).
    if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed))
      // Bumping the head version causes any concurrent advance_head attempt to
      // fail, so the segment cannot be removed underneath us.
      return true;

    // Otherwise the insert only counts if a concurrent try_pop already removed the element.
    return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
  }

  if (!segment->deleted.load(std::memory_order_relaxed)) {
    // The segment we inserted into is still part of the queue.
    return true;
  }

  // Head and tail have moved beyond this segment. Try to erase the element;
  // the insert only counts if a concurrent try_pop got to it first.
  return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
}

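// Moves head_ to its successor segment once the current head segment has been
// fully consumed: the old segment is marked as deleted and handed to the
// reclaimer. If head has caught up with tail, tail is pushed forward first so
// that head never overtakes it.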
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept {
  // (7) - this acquire-load synchronizes-with the release-CAS (13)
  const marked_ptr head_next_segment = head_current->next.load(std::memory_order_acquire);
  if (head_current != head_.load(std::memory_order_relaxed))
    return;

  if (head_current.get() == tail_current.get()) {
    // (8) - this acquire-load synchronizes-with the release-CAS (13)
    const marked_ptr tail_next_segment = tail_current->next.load(std::memory_order_acquire);
    if (tail_next_segment.get() == nullptr)
      return;

    if (tail_current == tail_.load(std::memory_order_relaxed)) {
      marked_ptr new_tail(tail_next_segment.get(), tail_current.mark() + 1);
      // (9) - this release-CAS synchronizes-with the acquire-load (1, 4)
      tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
    }
  }

  head_current->deleted.store(true, std::memory_order_relaxed);

  marked_ptr expected = head_current;
  marked_ptr new_head(head_next_segment.get(), head_current.mark() + 1);
  // (10) - this release-CAS synchronizes-with the acquire-load (3, 6)
  if (head_.compare_exchange_strong(expected, new_head, std::memory_order_release, std::memory_order_relaxed)) {
    head_current.reclaim();
  }
}

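// Moves tail_ to the successor segment, appending a freshly allocated segment
// first if the current tail has none. If another thread wins the race to
// append, our speculative allocation is released again.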
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::advance_tail(marked_ptr tail_current) noexcept {
  // (11) - this acquire-load synchronizes-with the release-CAS (13)
  marked_ptr next_segment = tail_current->next.load(std::memory_order_acquire);
  if (tail_current != tail_.load(std::memory_order_relaxed))
    return;

  if (next_segment.get() != nullptr) {
    marked_ptr new_tail(next_segment.get(), next_segment.mark() + 1);
    // (12) - this release-CAS synchronizes-with the acquire-load (1, 4)
    tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
  } else {
    auto seg = alloc_segment();
    const marked_ptr new_segment(seg, next_segment.mark() + 1);
    // TODO - insert own value to simplify push?
    // (13) - this release-CAS synchronizes-with the acquire-load (7, 8, 11)
    if (tail_current->next.compare_exchange_strong(next_segment, new_segment,
          std::memory_order_release, std::memory_order_relaxed)) {
      marked_ptr new_tail(seg, tail_current.mark() + 1);
      // (14) - this release-CAS synchronizes-with the acquire-load (1, 4)
      tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
    } else {
      // Another thread appended a segment first - release our speculative allocation.
      release_segment(seg);
    }
  }
}
} // namespace xenium
#endif