xenium
generic_epoch_based.hpp
//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef GENERIC_EPOCH_BASED_IMPL
#error "This is an impl file and must not be included directly!"
#endif

#include <xenium/detail/port.hpp>
#include <algorithm> // for std::any_of and std::min used below
#include <array>

#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4127) // conditional expression is constant
#endif

namespace xenium { namespace reclamation {
  namespace scan {
    struct all_threads {
      template<class Reclaimer>
      struct type {
        bool scan(typename Reclaimer::epoch_t epoch) {
          auto prevents_update = [epoch](const typename Reclaimer::thread_control_block &data) -> bool {
            // TSan does not support explicit fences, so we cannot rely on the acquire-fence (6)
            // but have to perform an acquire-load here to avoid false positives.
            constexpr auto memory_order = TSAN_MEMORY_ORDER(std::memory_order_acquire, std::memory_order_relaxed);
            return data.is_in_critical_region.load(memory_order) &&
                   data.local_epoch.load(memory_order) != epoch;
          };

          // If any thread hasn't advanced to the current epoch, abort the attempt.
          return !std::any_of(Reclaimer::global_thread_block_list.begin(),
                              Reclaimer::global_thread_block_list.end(),
                              prevents_update);
        }

        void reset() {}
      };
    };

    template<unsigned N>
    struct n_threads {
      template<class Reclaimer>
      struct type {
        type() { reset(); }

        bool scan(typename Reclaimer::epoch_t epoch) {
          for (unsigned i = 0; i < N; ++i) {
            // TSan does not support explicit fences, so we cannot rely on the acquire-fence (6)
            // but have to perform an acquire-load here to avoid false positives.
            constexpr auto memory_order = TSAN_MEMORY_ORDER(std::memory_order_acquire, std::memory_order_relaxed);
            if (!thread_iterator->is_in_critical_region.load(memory_order) ||
                thread_iterator->local_epoch.load(memory_order) == epoch) {
              if (++thread_iterator == Reclaimer::global_thread_block_list.end())
                return true;
            }
          }
          return false;
        }

        void reset() {
          thread_iterator = Reclaimer::global_thread_block_list.begin();
        }

      private:
        typename detail::thread_block_list<typename Reclaimer::thread_control_block>::iterator thread_iterator;
      };
    };

    struct one_thread {
      template<class Reclaimer>
      using type = n_threads<1>::type<Reclaimer>;
    };
  }
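
  // Interface sketch (not part of the original file): a scan strategy only has to
  // provide scan(epoch) and reset(). The reclaimer instantiates the nested
  // type<Reclaimer> and drives it from do_enter_critical() below, roughly like this:
  //
  //   typename Traits::scan_strategy::template type<Reclaimer> strategy;
  //   if (strategy.scan(current_epoch)) {
  //     // every inspected thread is either outside its critical region or has
  //     // already observed current_epoch -> try to advance the global epoch
  //   }
  //   strategy.reset(); // restart from the first thread after an epoch change
  //
  // all_threads checks every registered thread in one call (EBR/NEBR style), while
  // n_threads<N> checks at most N threads per call and only reports success once it
  // has worked its way through the entire thread list (DEBRA-style amortization).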

  namespace abandon {
    struct never {
      using retire_list = detail::retire_list<>;
      static void apply(retire_list&, detail::orphan_list<>&) {}
    };

    struct always {
      using retire_list = detail::retire_list<>;
      static void apply(retire_list& retire_list, detail::orphan_list<>& orphans)
      {
        if (!retire_list.empty())
          orphans.add(retire_list.steal());
      }
    };

    template <size_t Threshold>
    struct when_exceeds_threshold {
      using retire_list = detail::counting_retire_list<>;
      static void apply(retire_list& retire_list, detail::orphan_list<>& orphans)
      {
        if (retire_list.size() >= Threshold)
          orphans.add(retire_list.steal());
      }
    };
  }
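
  // Usage sketch (not part of the original file): an abandon strategy provides a
  // retire_list type plus a static apply(). The reclaimer calls it for every
  // epoch's retire list when a thread leaves its critical region (see
  // clear_critical_region_flag below):
  //
  //   for (unsigned i = 0; i < number_epochs; ++i)
  //     Traits::abandon_strategy::apply(retire_lists[i], orphans[i]);
  //
  // With `never` this is a no-op, with `always` every non-empty retire list is
  // handed over to the global orphan lists, and with `when_exceeds_threshold<N>`
  // a list is only abandoned once it holds at least N retired nodes.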

  template <class Traits>
  generic_epoch_based<Traits>::region_guard::region_guard() noexcept
  {
    local_thread_data.enter_region();
  }

  template <class Traits>
  generic_epoch_based<Traits>::region_guard::~region_guard() noexcept
  {
    local_thread_data.leave_region();
  }
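
  // Usage sketch (not part of the original file): a region_guard lets a thread
  // keep its region open across many short operations, so that with
  // region_extension::eager or ::lazy the is_in_critical_region flag can stay set
  // across the individual guard_ptr acquisitions inside a loop
  // (`my_reclaimer`, `queue` and `try_pop` are hypothetical names):
  //
  //   {
  //     my_reclaimer::region_guard rg{};
  //     while (queue.try_pop(value)) {
  //       // guard_ptrs acquired inside try_pop reuse the already entered region
  //     }
  //   } // the region is left when the guard is destroyed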

  template <class Traits>
  template <class T, class MarkedPtr>
  generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::guard_ptr(const MarkedPtr& p) noexcept :
    base(p)
  {
    if (this->ptr)
      local_thread_data.enter_critical();
  }

  template <class Traits>
  template <class T, class MarkedPtr>
  generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::guard_ptr(const guard_ptr& p) noexcept :
    guard_ptr(MarkedPtr(p))
  {}

  template <class Traits>
  template <class T, class MarkedPtr>
  generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::guard_ptr(guard_ptr&& p) noexcept :
    base(p.ptr)
  {
    p.ptr.reset();
  }

  template <class Traits>
  template <class T, class MarkedPtr>
  auto generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::operator=(const guard_ptr& p) noexcept
    -> guard_ptr&
  {
    if (&p == this)
      return *this;

    reset();
    this->ptr = p.ptr;
    if (this->ptr)
      local_thread_data.enter_critical();

    return *this;
  }

  template <class Traits>
  template <class T, class MarkedPtr>
  auto generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::operator=(guard_ptr&& p) noexcept
    -> guard_ptr&
  {
    if (&p == this)
      return *this;

    reset();
    this->ptr = std::move(p.ptr);
    p.ptr.reset();

    return *this;
  }

  template <class Traits>
  template <class T, class MarkedPtr>
  void generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::acquire(
    const concurrent_ptr<T>& p, std::memory_order order) noexcept
  {
    if (p.load(std::memory_order_relaxed) == nullptr)
    {
      reset();
      return;
    }

    if (!this->ptr)
      local_thread_data.enter_critical();
    // (1) - this load operation potentially synchronizes-with any release operation on p.
    this->ptr = p.load(order);
    if (!this->ptr)
      local_thread_data.leave_critical();
  }

  template <class Traits>
  template <class T, class MarkedPtr>
  bool generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::acquire_if_equal(
    const concurrent_ptr<T>& p, const MarkedPtr& expected, std::memory_order order) noexcept
  {
    auto actual = p.load(std::memory_order_relaxed);
    if (actual == nullptr || actual != expected)
    {
      reset();
      return actual == expected;
    }

    if (!this->ptr)
      local_thread_data.enter_critical();
    // (2) - this load operation potentially synchronizes-with any release operation on p.
    this->ptr = p.load(order);
    if (!this->ptr || this->ptr != expected)
    {
      local_thread_data.leave_critical();
      this->ptr.reset();
    }

    return this->ptr == expected;
  }
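
  // Usage sketch (not part of the original file): a guard_ptr keeps its owner
  // inside a critical region for as long as it holds a non-null pointer, so the
  // protected object cannot be reclaimed until the guard is reset. Assuming the
  // usual xenium pattern where the guard type is the nested guard_ptr of a
  // concurrent_ptr (`reclaimer`, `Node` and `head` are hypothetical names):
  //
  //   reclaimer::concurrent_ptr<Node> head;
  //   reclaimer::concurrent_ptr<Node>::guard_ptr guard;
  //   guard.acquire(head, std::memory_order_acquire); // enters the critical region
  //   // ... safely dereference the protected node ...
  //   guard.reset();                                  // leaves the critical region
  //
  // acquire_if_equal additionally re-checks the pointer after entering the
  // critical region and backs out again if it changed in the meantime.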

  template <class Traits>
  template <class T, class MarkedPtr>
  void generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::reset() noexcept
  {
    if (this->ptr)
      local_thread_data.leave_critical();
    this->ptr.reset();
  }

  template <class Traits>
  template <class T, class MarkedPtr>
  void generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::reclaim(Deleter d) noexcept
  {
    this->ptr->set_deleter(std::move(d));
    local_thread_data.add_retired_node(this->ptr.get());
    reset();
  }

  template <class Traits>
  struct generic_epoch_based<Traits>::thread_control_block :
    detail::thread_block_list<thread_control_block>::entry
  {
    thread_control_block() :
      is_in_critical_region(false),
      local_epoch(number_epochs)
    {}

    std::atomic<bool> is_in_critical_region;
    std::atomic<epoch_t> local_epoch;
  };

  template <class Traits>
  struct generic_epoch_based<Traits>::thread_data
  {
    ~thread_data()
    {
      if (control_block == nullptr)
        return; // no control_block -> nothing to do

      for (unsigned i = 0; i < number_epochs; ++i)
        if (!retire_lists[i].empty())
          orphans[i].add(retire_lists[i].steal());

      assert(control_block->is_in_critical_region.load(std::memory_order_relaxed) == false);
      global_thread_block_list.release_entry(control_block);
      control_block = nullptr;
    }

    void enter_region()
    {
      ensure_has_control_block();
      if (Traits::region_extension_type != region_extension::none && ++region_entries == 1)
      {
        if (Traits::region_extension_type == region_extension::eager)
          set_critical_region_flag();
      }
    }

    void leave_region()
    {
      if (Traits::region_extension_type != region_extension::none && --region_entries == 0)
      {
        clear_critical_region_flag();
      }
    }

    void enter_critical()
    {
      enter_region();
      if (++nested_critical_entries == 1)
        do_enter_critical();
    }

    void leave_critical()
    {
      if (--nested_critical_entries == 0 && Traits::region_extension_type == region_extension::none)
        clear_critical_region_flag();
      leave_region();
    }
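
    // Nesting sketch (not part of the original file): enter_critical/leave_critical
    // are reference counted, so several live guard_ptrs share a single critical region:
    //
    //   enter_critical(); // nested_critical_entries 0 -> 1, do_enter_critical() runs
    //   enter_critical(); // 1 -> 2, no additional work
    //   leave_critical(); // 2 -> 1, still inside the critical region
    //   leave_critical(); // 1 -> 0, the flag is cleared (or kept alive by an
    //                     // enclosing region_guard when the region is extended)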

  private:
    void ensure_has_control_block()
    {
      if (XENIUM_UNLIKELY(control_block == nullptr))
        acquire_control_block();
    }

    XENIUM_NOINLINE void acquire_control_block()
    {
      assert(control_block == nullptr);
      control_block = global_thread_block_list.acquire_entry();
      assert(control_block->is_in_critical_region.load() == false);
      auto epoch = global_epoch.load(std::memory_order_relaxed);
      control_block->local_epoch.store(epoch, std::memory_order_relaxed);
      local_epoch_idx = epoch % number_epochs;
      scan_strategy.reset();
    }

    void set_critical_region_flag()
    {
      assert(!control_block->is_in_critical_region.load(std::memory_order_relaxed));
      control_block->is_in_critical_region.store(true, std::memory_order_relaxed);
      // (3) - this seq_cst-fence enforces a total order with itself, and
      //       synchronizes-with the acquire-fence (6)
      std::atomic_thread_fence(std::memory_order_seq_cst);
    }

    void clear_critical_region_flag()
    {
      //if (Traits::region_extension_type == region_extension::lazy && control_block == nullptr)
      //  return;

      assert(control_block != nullptr);
      assert(control_block->is_in_critical_region.load(std::memory_order_relaxed));

      // (4) - this release-store synchronizes-with the acquire-fence (6)
      control_block->is_in_critical_region.store(false, std::memory_order_release);
      for (unsigned i = 0; i < number_epochs; ++i)
        Traits::abandon_strategy::apply(retire_lists[i], orphans[i]);
    }

    void do_enter_critical()
    {
      if (Traits::region_extension_type == region_extension::none)
        set_critical_region_flag();
      else if (Traits::region_extension_type == region_extension::lazy)
      {
        if (!control_block->is_in_critical_region.load(std::memory_order_relaxed))
          set_critical_region_flag();
      }

      assert(control_block->is_in_critical_region.load(std::memory_order_relaxed));

      // (5) - this acquire-load synchronizes-with the release-CAS (7)
      auto epoch = global_epoch.load(std::memory_order_acquire);
      if (control_block->local_epoch.load(std::memory_order_relaxed) != epoch) // New epoch?
      {
        critical_entries_since_update = 0;
        update_local_epoch(epoch);
      }
      else if (critical_entries_since_update++ == Traits::scan_frequency)
      {
        critical_entries_since_update = 0;
        if (scan_strategy.scan(epoch))
        {
          epoch = update_global_epoch(epoch, epoch + 1);
          update_local_epoch(epoch);
        }
      }
    }
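
    // Example (not part of the original file): with Traits::scan_frequency == 100, a
    // thread that keeps entering critical regions without observing a new global
    // epoch runs the scan strategy roughly once every 100 entries; only if that scan
    // succeeds does it try to advance the global epoch and immediately adopt the new
    // epoch locally via update_local_epoch().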

    void update_local_epoch(epoch_t new_epoch)
    {
      // we either just updated the global_epoch or we are observing a new epoch
      // from some other thread - either way we can reclaim all the objects from
      // the old 'incarnation' of this epoch, as well as from other epochs we
      // might have missed when we were not in a critical region.

      const auto old_epoch = control_block->local_epoch.load(std::memory_order_relaxed);
      assert(new_epoch > old_epoch);
      // TSan does not support explicit fences, so we cannot rely on the fences (3) and (6)
      // but have to perform a release-store here to avoid false positives.
      constexpr auto memory_order = TSAN_MEMORY_ORDER(std::memory_order_release, std::memory_order_relaxed);
      control_block->local_epoch.store(new_epoch, memory_order);

      auto diff = std::min<int>(static_cast<int>(number_epochs), static_cast<int>(new_epoch - old_epoch));
      epoch_t epoch_idx = local_epoch_idx;
      for (int i = diff - 1; i >= 0; --i)
      {
        epoch_idx = (new_epoch - i) % number_epochs;
        auto nodes = retire_lists[epoch_idx].steal();
        detail::delete_objects(nodes.first);
      }
      local_epoch_idx = epoch_idx;

      scan_strategy.reset();
    }
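
    // Worked example (not part of the original file): with number_epochs == 3, a
    // thread whose local epoch jumps from 4 to 7 gets diff == 3 and empties the
    // retire lists for epochs 5, 6 and 7, i.e. slots (7-2)%3 == 2, (7-1)%3 == 0 and
    // 7%3 == 1; afterwards local_epoch_idx == 1, so newly retired nodes go into the
    // slot that belongs to the current epoch.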

    epoch_t update_global_epoch(epoch_t curr_epoch, epoch_t new_epoch)
    {
      if (global_epoch.load(std::memory_order_relaxed) == curr_epoch)
      {
        // (6) - due to the load operations in scan, this acquire-fence synchronizes-with
        //       the release-store (4) and the seq-cst fence (3)
        std::atomic_thread_fence(std::memory_order_acquire);

        // (7) - this release-CAS synchronizes-with the acquire-load (5)
        bool success = global_epoch.compare_exchange_strong(curr_epoch, new_epoch,
                                                            std::memory_order_release,
                                                            std::memory_order_relaxed);
        if (XENIUM_LIKELY(success))
          reclaim_orphans(new_epoch);
      }
      return new_epoch;
    }
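
    // Note (not part of the original file): if the CAS fails, another thread has
    // already installed a newer epoch. Because this thread is still inside a
    // critical region with local_epoch == curr_epoch, no scan can succeed until it
    // catches up, so the global epoch cannot have moved past curr_epoch + 1 and
    // returning new_epoch is still the value the caller should adopt.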

    void add_retired_node(detail::deletable_object* p)
    {
      retire_lists[local_epoch_idx].push(p);
    }

    void reclaim_orphans(epoch_t epoch)
    {
      auto idx = epoch % number_epochs;
      auto nodes = orphans[idx].adopt();
      detail::delete_objects(nodes);
    }

    unsigned critical_entries_since_update = 0;
    unsigned nested_critical_entries = 0;
    unsigned region_entries = 0;
    typename Traits::scan_strategy::template type<generic_epoch_based> scan_strategy;
    thread_control_block* control_block = nullptr;
    epoch_t local_epoch_idx = 0;
    std::array<typename Traits::abandon_strategy::retire_list, number_epochs> retire_lists = {};

    friend class generic_epoch_based;
    ALLOCATION_COUNTER(generic_epoch_based);
  };

#ifdef TRACK_ALLOCATIONS
  template <class Traits>
  inline void generic_epoch_based<Traits>::count_allocation()
  { local_thread_data.allocation_counter.count_allocation(); }

  template <class Traits>
  inline void generic_epoch_based<Traits>::count_reclamation()
  { local_thread_data.allocation_counter.count_reclamation(); }
#endif
}}

#ifdef _MSC_VER
#pragma warning(pop)
#endif