xenium
vyukov_hash_map.hpp
1//
2// Copyright (c) 2018-2020 Manuel Pöter.
3// Licensed under the MIT License. See LICENSE file in the project root for full license information.
4//
5
6#ifndef XENIUM_VYUKOV_HASH_MAP_IMPL
7#error "This is an impl file and must not be included directly!"
8#endif
9
10#include <xenium/acquire_guard.hpp>
11#include <xenium/backoff.hpp>
12#include <xenium/parameter.hpp>
13#include <xenium/policy.hpp>
14
15#include <atomic>
16#include <cstring>
17
18#ifdef _MSC_VER
19#pragma warning(push)
20#pragma warning(disable: 26495) // uninitialized member variable
21#pragma warning(disable: 4324) // structure was padded due to alignment specifier
22#endif
23
24namespace xenium {
25
26template <class Key, class Value, class... Policies>
27struct vyukov_hash_map<Key, Value, Policies...>::bucket_state {
28 bucket_state() noexcept : value() {}
29
30 bucket_state locked() const noexcept { return value | lock; }
31 bucket_state clear_lock() const {
32 assert(value & lock);
33 return value ^ lock;
34 }
35 bucket_state new_version() const noexcept { return value + version_inc; }
36 bucket_state inc_item_count() const {
37 assert(item_count() < bucket_item_count);
38 bucket_state result(value + item_count_inc);
39 assert(result.item_count() == item_count() + 1);
40 return result;
41 }
42 bucket_state dec_item_count() const {
43 assert(item_count() > 0);
44 bucket_state result(value - item_count_inc);
45 assert(result.item_count() == item_count() - 1);
46 return result;
47 }
48 bucket_state set_delete_marker(std::uint32_t marker) const {
49 assert(delete_marker() == 0);
50 bucket_state result(value | (marker << delete_marker_shift));
51 assert(result.delete_marker() == marker);
52 return result;
53 }
54
55 bool operator==(bucket_state r) const noexcept { return this->value == r.value; }
56 bool operator!=(bucket_state r) const noexcept { return this->value != r.value; }
57
58 std::uint32_t item_count() const noexcept { return (value >> 1) & item_count_mask; }
59 std::uint32_t delete_marker() const noexcept { return (value >> delete_marker_shift) & item_count_mask; }
60 std::uint32_t version() const noexcept { return value >> version_shift; }
61 bool is_locked() const noexcept { return (value & lock) != 0; }
62
63private:
64 bucket_state(std::uint32_t value) noexcept : value(value) {}
65
66 std::uint32_t value;
67
68 /*
69 value has the same structure as the following bit field:
70 struct {
71 // the lock bit
72 unsigned lock : 1;
73
74 // the number of items in the bucket array
75 unsigned item_count : find_last_bit_set(bucket_item_count);
76
77 // marker for the item that is currently beeing removed
78 unsigned delete_marker : find_last_bit_set(bucket_item_count);
79
80 // version counter - gets incremented at the end of every remove operation
81 unsigned version : sizeof(unsigned) * 8 - 2 * find_last_bit_set(bucket_item_count) - 1;
82 };
83 */
84
85 static constexpr std::size_t item_counter_bits = utils::find_last_bit_set(bucket_item_count);
86
87 static constexpr std::size_t item_count_shift = 1;
88 static constexpr std::size_t delete_marker_shift = item_count_shift + item_counter_bits;
89 static constexpr std::size_t version_shift = delete_marker_shift + item_counter_bits;
90
91 static constexpr std::uint32_t lock = 1;
92 static constexpr std::uint32_t version_inc = 1 << version_shift;
93 static constexpr std::uint32_t item_count_inc = 1 << item_count_shift;
94
95 static constexpr std::uint32_t item_count_mask = (1 << item_counter_bits) - 1;
96};
97
98template <class Key, class Value, class... Policies>
99struct vyukov_hash_map<Key, Value, Policies...>::bucket {
100 std::atomic<bucket_state> state;
101 std::atomic<extension_item*> head;
102 typename traits::storage_key_type key[bucket_item_count];
103 typename traits::storage_value_type value[bucket_item_count];
104};
105
106template <class Key, class Value, class... Policies>
107struct vyukov_hash_map<Key, Value, Policies...>::extension_item {
108 typename traits::storage_key_type key;
109 typename traits::storage_value_type value;
110 std::atomic<extension_item*> next;
111};
112
113template <class Key, class Value, class... Policies>
114struct vyukov_hash_map<Key, Value, Policies...>::extension_bucket {
115 std::atomic<std::uint32_t> lock;
116 std::atomic<extension_item*> head;
117 extension_item items[extension_item_count];
118
119 void acquire_lock() {
120 backoff backoff;
121 for (;;) {
122 while (lock.load(std::memory_order_relaxed))
123 ;
124 // (1) - this acquire-exchange synchronizes-with the release-store (2)
125 if (lock.exchange(1, std::memory_order_acquire) == 0)
126 return;
127 backoff();
128 }
129 }
130 void release_lock() {
131 // (2) - this release-store synchronizes-with the acquire-exchange (1)
132 lock.store(0, std::memory_order_release);
133 }
134};
135
136template <class Key, class Value, class... Policies>
137struct alignas(64) vyukov_hash_map<Key, Value, Policies...>::block :
138 reclaimer::template enable_concurrent_ptr<block>
139{
140 std::uint32_t mask;
141 std::uint32_t bucket_count;
142 std::uint32_t extension_bucket_count;
143 extension_bucket* extension_buckets;
144
145 // TODO - adapt to be customizable via map_to_bucket policy
146 std::uint32_t index(const key_type& key) const { return static_cast<std::uint32_t>(key & mask); }
147 bucket* buckets() { return reinterpret_cast<bucket*>(this+1); }
148
149 void operator delete(void* p) { ::operator delete(p, cacheline_size); }
150};
151
152template <class Key, class Value, class... Policies>
153struct vyukov_hash_map<Key, Value, Policies...>::unlocker {
154 unlocker(bucket& locked_bucket, bucket_state state) : state(state), locked_bucket(locked_bucket) {}
155 ~unlocker() {
156 if (enabled) {
157 assert(locked_bucket.state.load().is_locked());
158 locked_bucket.state.store(state, std::memory_order_relaxed);
159 }
160 }
161 void unlock(bucket_state new_state, std::memory_order order) {
162 assert(enabled);
163 assert(locked_bucket.state.load().is_locked());
164 locked_bucket.state.store(new_state, order);
165 enabled = false;
166 }
167 void disable() { enabled = false; }
168private:
169 bool enabled = true;
170 bucket_state state;
171 bucket& locked_bucket;
172};
173
174template <class Key, class Value, class... Policies>
175vyukov_hash_map<Key, Value, Policies...>::vyukov_hash_map(std::size_t initial_capacity) :
176 resize_lock(0)
177{
178 auto b = allocate_block(static_cast<std::uint32_t>(utils::next_power_of_two(initial_capacity)));
179 if (b == nullptr)
180 throw std::bad_alloc();
181 data_block.store(b, std::memory_order_relaxed);
182}
183
184template <class Key, class Value, class... Policies>
185vyukov_hash_map<Key, Value, Policies...>::~vyukov_hash_map() {
186 // delete all remaining entries - this also reclaims any internally allocated
187 // nodes as well as managed_ptr instances.
188 // This could be implemented in a more efficient way, but it works for now!
189 auto it = begin();
190 while (it != end())
191 erase(it);
192 delete data_block.load().get();
193}
194
195template <class Key, class Value, class... Policies>
196bool vyukov_hash_map<Key, Value, Policies...>::emplace(key_type key, value_type value) {
197 return do_get_or_emplace<false>(
198 std::move(key),
199 [v = std::move(value)]() mutable { return std::move(v); },
200 [](accessor&&, auto&) {});
201}
202
203template <class Key, class Value, class... Policies>
204template <class... Args>
205auto vyukov_hash_map<Key, Value, Policies...>::get_or_emplace(key_type key, Args&&... args)
206 -> std::pair<accessor, bool>
207{
208 std::pair<accessor, bool> result;
209 result.second = do_get_or_emplace<true>(
210 std::move(key),
211 [&]{ return value_type(std::forward<Args>(args)...); },
212 [&result](accessor&& acc, auto&) {
213 result.first = std::move(acc);
214 });
215 return result;
216}
217
218template <class Key, class Value, class... Policies>
219template <class Factory>
220auto vyukov_hash_map<Key, Value, Policies...>::get_or_emplace_lazy(key_type key, Factory&& factory)
221 -> std::pair<accessor, bool>
222{
223 std::pair<accessor, bool> result;
224 result.second = do_get_or_emplace<true>(
225 std::move(key),
226 std::forward<Factory>(factory),
227 [&result](accessor&& acc, auto&) {
228 result.first = std::move(acc);
229 });
230 return result;
231}
232
233template <class Key, class Value, class... Policies>
234template <bool AcquireAccessor, class Factory, class Callback>
235bool vyukov_hash_map<Key, Value, Policies...>::do_get_or_emplace(Key&& key, Factory&& factory, Callback&& callback) {
236 const hash_t h = hash{}(key);
237
238 accessor acc;
239retry:
240 guarded_block b;
241 bucket_state state;
242 bucket& bucket = lock_bucket(h, b, state);
243
244 unlocker unlocker(bucket, state);
245
246 std::uint32_t item_count = state.item_count();
247
248 for (std::uint32_t i = 0; i != item_count; ++i)
249 if (traits::template compare_key<AcquireAccessor>(bucket.key[i], bucket.value[i], key, h, acc)) {
250 callback(std::move(acc), bucket.value[i]);
251 unlocker.unlock(state, std::memory_order_relaxed);
252 return false;
253 }
254
255 if (item_count < bucket_item_count) {
256 traits::template store_item<AcquireAccessor>(bucket.key[item_count], bucket.value[item_count], h,
257 std::move(key), factory(), std::memory_order_relaxed, acc);
258 callback(std::move(acc), bucket.value[item_count]);
259 // release the bucket lock and increment the item count
260 // (3) - this release-store synchronizes-with the acquire-CAS (7, 30, 34, 37) and the acquire-load (23)
261 unlocker.unlock(state.inc_item_count(), std::memory_order_release);
262 return true;
263 }
264
265 // TODO - keep extension items sorted
266 for (extension_item* extension = bucket.head.load(std::memory_order_relaxed);
267 extension != nullptr;
268 extension = extension->next.load(std::memory_order_relaxed)) {
269 if (traits::template compare_key<AcquireAccessor>(extension->key, extension->value, key, h, acc)) {
270 callback(std::move(acc), extension->value);
271 unlocker.unlock(state, std::memory_order_relaxed);
272 return false;
273 }
274 }
275
276 extension_item* extension = allocate_extension_item(b.get(), h);
277 if (extension == nullptr) {
278 unlocker.disable(); // bucket is unlocked in grow()
279 grow(bucket, state);
280 goto retry;
281 }
282 try {
283 traits::template store_item<AcquireAccessor>(extension->key, extension->value, h,
284 std::move(key), factory(), std::memory_order_relaxed, acc);
285 } catch(...) {
286 free_extension_item(extension);
287 throw;
288 }
289 callback(std::move(acc), extension->value);
290 auto old_head = bucket.head.load(std::memory_order_relaxed);
291 extension->next.store(old_head, std::memory_order_relaxed);
292 // (4) - this release-store synchronizes-with the acquire-load (25)
293 bucket.head.store(extension, std::memory_order_release);
294 // release the bucket lock
295 // (5) - this release-store synchronizes-with the acquire-CAS (7, 30, 34, 37) and the acquire-load (23)
296 unlocker.unlock(state, std::memory_order_release);
297
298 return true;
299}
300
301template <class Key, class Value, class... Policies>
303 accessor acc;
304 bool result = do_extract(key, acc);
305 traits::reclaim(acc);
306 return result;
307}
308
309template <class Key, class Value, class... Policies>
310bool vyukov_hash_map<Key, Value, Policies...>::extract(const key_type& key, accessor& acc) {
311 bool result = do_extract(key, acc);
312 traits::reclaim_internal(acc);
313 return result;
314}
315
316template <class Key, class Value, class... Policies>
317bool vyukov_hash_map<Key, Value, Policies...>::do_extract(const key_type& key, accessor& result) {
318 const hash_t h = hash{}(key);
319 backoff backoff;
320 guarded_block b;
321
322restart:
323 // (6) - this acquire-load synchronizes-with the release-store (31)
324 b.acquire(data_block, std::memory_order_acquire);
325 const std::size_t bucket_idx = h & b->mask;
326 bucket& bucket = b->buckets()[bucket_idx];
327 bucket_state state = bucket.state.load(std::memory_order_relaxed);
328 std::uint32_t item_count = state.item_count();
329
330 if (item_count == 0)
331 return false; // no items to check
332
333 if (state.is_locked())
334 goto restart;
335
336 auto locked_state = state.locked();
337 // (7) - this acquire-CAS synchronizes-with the release-store (3, 5, 11, 13, 14, 36, 38)
338 if (!bucket.state.compare_exchange_strong(state, locked_state, std::memory_order_acquire,
339 std::memory_order_relaxed)) {
340 backoff();
341 goto restart;
342 }
343
344 unlocker unlocker(bucket, state);
345
346 // we have the lock - now look for the key
347
348 for (std::uint32_t i = 0; i != item_count; ++i) {
349 if (traits::template compare_key<true>(bucket.key[i], bucket.value[i], key, h, result)) {
350 extension_item* extension = bucket.head.load(std::memory_order_relaxed);
351 if (extension) {
352 // signal which item we are deleting
353 bucket.state.store(locked_state.set_delete_marker(i + 1), std::memory_order_relaxed);
354
355 auto k = extension->key.load(std::memory_order_relaxed);
356 auto v = extension->value.load(std::memory_order_relaxed);
357 bucket.key[i].store(k, std::memory_order_relaxed);
358 // (8) - this release-store synchronizes-with the acquire-load (24)
359 bucket.value[i].store(v, std::memory_order_release);
360
361 // reset the delete marker
362 locked_state = locked_state.new_version();
363 // (9) - this release-store synchronizes-with the acquire-load (23)
364 bucket.state.store(locked_state, std::memory_order_release);
365
366 extension_item* extension_next = extension->next.load(std::memory_order_relaxed);
367 // (10) - this release-store synchronizes-with the acquire-load (25)
368 bucket.head.store(extension_next, std::memory_order_release);
369
370 // release the bucket lock and increase the version
371 // (11) - this release-store synchronizes-with the acquire-CAS (7, 30, 34, 37) and the acquire-load (23)
372 unlocker.unlock(locked_state.new_version().clear_lock(), std::memory_order_release);
373
374 free_extension_item(extension);
375 } else {
376 if (i != item_count - 1) {
377 // signal which item we are deleting
378 bucket.state.store(locked_state.set_delete_marker(i + 1), std::memory_order_relaxed);
379
380 auto k = bucket.key[item_count - 1].load(std::memory_order_relaxed);
381 auto v = bucket.value[item_count - 1].load(std::memory_order_relaxed);
382 bucket.key[i].store(k, std::memory_order_relaxed);
383 // (12) - this release-store synchronizes-with the acquire-load (24)
384 bucket.value[i].store(v, std::memory_order_release);
385 }
386
387 // release the bucket lock, reset the delete marker (if it is set), increase the version
388 // and decrement the item counter.
389 // (13) - this release-store synchronizes-with the acquire-CAS (7, 30, 34, 37) and the acquire-load (23)
390 unlocker.unlock(state.new_version().dec_item_count(), std::memory_order_release);
391 }
392 return true;
393 }
394 }
395
396 auto extension_prev = &bucket.head;
397 extension_item* extension = extension_prev->load(std::memory_order_relaxed);
398 while (extension) {
399 if (traits::template compare_key<true>(extension->key, extension->value, key, h, result)) {
400 extension_item* extension_next = extension->next.load(std::memory_order_relaxed);
401 extension_prev->store(extension_next, std::memory_order_relaxed);
402
403 // release the bucket lock and increase the version
404 // (14) - this release-store synchronizes-with the acquire-CAS (7, 30, 34, 37) and the acquire-load (23)
405 unlocker.unlock(state.new_version(), std::memory_order_release);
406
407 free_extension_item(extension);
408 return true;
409 }
410 extension_prev = &extension->next;
411 extension = extension_prev->load(std::memory_order_relaxed);
412 }
413
414 // key not found
415
416 // release the bucket lock
417 unlocker.unlock(state, std::memory_order_relaxed);
418
419 return false;
420}
421
422template <class Key, class Value, class... Policies>
424 if (it.extension) {
425 // the item we are currently looking at is an extension item
426 auto next = it.extension->next.load(std::memory_order_relaxed);
427 it.prev->store(next, std::memory_order_relaxed);
428 auto new_state = it.current_bucket_state.locked().new_version();
429 // (15) - this release-store synchronizes-with the acquire-load (23)
430 it.current_bucket->state.store(new_state, std::memory_order_release);
431
432 free_extension_item(it.extension);
433 it.extension = next;
434 if (next == nullptr)
435 it.move_to_next_bucket();
436 return;
437 }
438
439 // the item we are currently looking at is in the bucket array
440
441 auto extension = it.current_bucket->head.load(std::memory_order_relaxed);
442 if (extension) {
443 auto locked_state = it.current_bucket_state.locked();
444 auto marked_state = locked_state.set_delete_marker(it.index + 1);
445 it.current_bucket->state.store(marked_state, std::memory_order_relaxed);
446 assert(it.current_bucket->state.load().is_locked());
447
448 auto k = extension->key.load(std::memory_order_relaxed);
449 auto v = extension->value.load(std::memory_order_relaxed);
450 it.current_bucket->key[it.index].store(k, std::memory_order_relaxed);
451 // (16) - this release-store synchronizes-with the acquire-load (24)
452 it.current_bucket->value[it.index].store(v, std::memory_order_release);
453
454 // reset the delete marker
455 locked_state = locked_state.new_version();
456 // (17) - this release-store synchronizes-with the acquire-load (23)
457 it.current_bucket->state.store(locked_state, std::memory_order_release);
458 assert(it.current_bucket->state.load().is_locked());
459
460 auto next = extension->next.load(std::memory_order_relaxed);
461 // (18) - this release-store synchronizes-with the acquire-load (25)
462 it.current_bucket->head.store(next, std::memory_order_release);
463
464 // increase the version but keep the lock
465 // (19) - this release-store synchronizes-with the acquire-load (23)
466 it.current_bucket->state.store(locked_state.new_version(), std::memory_order_release);
467 assert(it.current_bucket->state.load().is_locked());
468 free_extension_item(extension);
469 } else {
470 auto max_index = it.current_bucket_state.item_count() - 1;
471 if (it.index != max_index) {
472 // signal which item we are deleting
473 auto locked_state = it.current_bucket_state.locked();
474 auto marked_state = locked_state.set_delete_marker(it.index + 1);
475 it.current_bucket->state.store(marked_state, std::memory_order_relaxed);
476 assert(it.current_bucket->state.load().is_locked());
477
478 auto k = it.current_bucket->key[max_index].load(std::memory_order_relaxed);
479 auto v = it.current_bucket->value[max_index].load(std::memory_order_relaxed);
480 it.current_bucket->key[it.index].store(k, std::memory_order_relaxed);
481 // (20) - this release-store synchronizes-with the acquire-load (24)
482 it.current_bucket->value[it.index].store(v, std::memory_order_release);
483 }
484
485 auto new_state = it.current_bucket_state.new_version().dec_item_count();
486 it.current_bucket_state = new_state;
487
488 // (21) - this release store synchronizes-with the acquire-load (23)
489 it.current_bucket->state.store(new_state.locked(), std::memory_order_release);
490 assert(it.current_bucket->state.load().is_locked());
491 if (it.index == new_state.item_count()) {
492 it.prev = &it.current_bucket->head;
493 it.extension = it.current_bucket->head.load(std::memory_order_relaxed);
494 if (it.extension == nullptr)
495 it.move_to_next_bucket();
496 }
497 }
498}
499
500template <class Key, class Value, class... Policies>
501bool vyukov_hash_map<Key, Value, Policies...>::try_get_value(const key_type& key, accessor& value) const {
502 const hash_t h = hash{}(key);
503
504 // (22) - this acquire-load synchronizes-with the release-store (31)
505 guarded_block b = acquire_guard(data_block, std::memory_order_acquire);
506 const std::size_t bucket_idx = h & b->mask;
507 bucket& bucket = b->buckets()[bucket_idx];
508
509retry:
510 // data_block (and therefore the bucket) can only change due to a concurrent grow()
511 // operation, but since grow() does not change the content of a bucket we can ignore
512 // it and don't have to reread everything during a retry.
513
514 // (23) - this acquire-load synchronizes-with the release-store (3, 5, 9, 11, 13, 14, 15, 17, 19, 21, 36, 38)
515 bucket_state state = bucket.state.load(std::memory_order_acquire);
516
517 std::uint32_t item_count = state.item_count();
518 for (std::uint32_t i = 0; i != item_count; ++i) {
519 if (traits::compare_trivial_key(bucket.key[i], key, h)) {
520 // use acquire semantic here - should synchronize-with the release store to value
521 // in remove() to ensure that if we see the changed value here we also see the
522 // changed state in the subsequent reload of state
523 // (24) - this acquire-load synchronizes-with the release-store (8, 12, 16, 20)
524 accessor acc = traits::acquire(bucket.value[i], std::memory_order_acquire);
525
526 // ensure that we can use the value we just read
527 const auto state2 = bucket.state.load(std::memory_order_relaxed);
528 if (state.version() != state2.version()) {
529 // a deletion has occured in the meantime -> we have to retry
530 state = state2;
531 goto retry;
532 }
533
534 const auto delete_marker = i + 1;
535 if (state2.delete_marker() == delete_marker) {
536 // Some other thread is currently deleting the entry at this index.
537 // The key we just read can be either the original one (i.e., the key that is
538 // currently beeing deleted), or another key that is beeing moved to this slot
539 // to replace the deleted entry.
540 // Unfortunately we cannot differentiate between these two cases, so we simply
541 // ignore this item and continue with our search. If we can finish our search
542 // before the deleting thread can finish moving some key/value pair into this
543 // slot everything is fine. If the delete operation finished before our search,
544 // we will recognize this by an updated state and retry.
545 continue;
546 }
547
548 if (!traits::compare_nontrivial_key(acc, key))
549 continue;
550
551 value = std::move(acc);
552 return true;
553 }
554 }
555
556 // (25) - this acquire-load synchronizes-with the release-store (4, 10, 18)
557 extension_item* extension = bucket.head.load(std::memory_order_acquire);
558 while (extension) {
559 if (traits::compare_trivial_key(extension->key, key, h)) {
560 // TODO - this acquire does not synchronize with anything ATM.
561 // However, this is probably required when introducing an update-method that
562 // allows to store a new value.
563 // (26) - this acquire-load synchronizes-with <nothing>
564 accessor acc = traits::acquire(extension->value, std::memory_order_acquire);
565
566 auto state2 = bucket.state.load(std::memory_order_relaxed);
567 if (state.version() != state2.version()) {
568 // a deletion has occured in the meantime -> we have to retry
569 state = state2;
570 goto retry;
571 }
572
573 if (!traits::compare_nontrivial_key(acc, key))
574 continue;
575
576 value = std::move(acc);
577 return true;
578 }
579
580 // (27) - this acquire-load synchronizes-with the release-store (35)
581 extension = extension->next.load(std::memory_order_acquire);
582 auto state2 = bucket.state.load(std::memory_order_relaxed);
583 if (state.version() != state2.version()) {
584 // a deletion has occured in the meantime -> we have to retry
585 state = state2;
586 goto retry;
587 }
588 }
589
590 auto state2 = bucket.state.load(std::memory_order_relaxed);
591 if (state.version() != state2.version()) {
592 state = state2;
593 // a deletion has occured -> we have to retry since the entry we are looking for might
594 // have been moved while we were searching
595 goto retry;
596 }
597
598 return false;
599}
600
601template <class Key, class Value, class... Policies>
602void vyukov_hash_map<Key, Value, Policies...>::grow(bucket& bucket, bucket_state state)
603{
604 // try to acquire the resizeLock
605 const int already_resizing = resize_lock.exchange(1, std::memory_order_relaxed);
606
607 // release the bucket lock
608 bucket.state.store(state, std::memory_order_relaxed);
609
610 // we intentionally release the bucket lock only after we tried to acquire
611 // the resize_lock, to avoid the situation where we might miss a resize
612 // performed by some other thread.
613
614 if (already_resizing) {
615 backoff backoff;
616 // another thread is already resizing -> wait for it to finish
617 // (28) - this acquire-load synchronizes-with the release-store (32)
618 while (resize_lock.load(std::memory_order_acquire) != 0)
619 backoff();
620
621 return;
622 }
623
624 do_grow();
625}
626
627template <class Key, class Value, class... Policies>
628void vyukov_hash_map<Key, Value, Policies...>::do_grow()
629{
630 // Note: since we hold the resize lock, nobody can replace the current block
631 // (29) - this acquire-load synchronizes-with the release-store (31)
632 auto old_block = data_block.load(std::memory_order_acquire);
633 const auto bucket_count = old_block->bucket_count;
634 block* new_block = allocate_block(bucket_count * 2);
635 if (new_block == nullptr) {
636 resize_lock.store(0, std::memory_order_relaxed);
637 throw std::bad_alloc();
638 }
639
640 // lock all buckets
641 auto old_buckets = old_block->buckets();
642 for (std::uint32_t i = 0; i != bucket_count; ++i) {
643 auto& bucket = old_buckets[i];
644 backoff backoff;
645 for (;;) {
646 auto st = bucket.state.load(std::memory_order_relaxed);
647 if (st.is_locked())
648 continue;
649
650 // (30) - this acquire-CAS synchronizes-with the release-store (3, 5, 11, 13, 14, 36, 38)
651 if (bucket.state.compare_exchange_strong(st, st.locked(),
652 std::memory_order_acquire, std::memory_order_relaxed))
653 break; // we've got the lock
654
655 backoff();
656 }
657 }
658
659 auto new_buckets = new_block->buckets();
660 for (std::uint32_t bucket_idx = 0; bucket_idx != bucket_count; ++bucket_idx) {
661 auto& old_bucket = old_buckets[bucket_idx];
662 const std::uint32_t item_count = old_bucket.state.load(std::memory_order_relaxed).item_count();
663 for (std::uint32_t i = 0; i != item_count; ++i) {
664 auto k = old_bucket.key[i].load(std::memory_order_relaxed);
665 hash_t h = traits::template rehash<hash>(k);
666 auto& new_bucket = new_buckets[h & new_block->mask];
667 auto new_bucket_state = new_bucket.state.load(std::memory_order_relaxed);
668 auto new_bucket_count = new_bucket_state.item_count();
669 auto v = old_bucket.value[i].load(std::memory_order_relaxed);
670 new_bucket.key[new_bucket_count].store(k, std::memory_order_relaxed);
671 new_bucket.value[new_bucket_count].store(v, std::memory_order_relaxed);
672 new_bucket.state.store(new_bucket_state.inc_item_count(), std::memory_order_relaxed);
673 }
674
675 // relaxed ordering is fine since we own the bucket lock
676 for (extension_item* extension = old_bucket.head;
677 extension != nullptr;
678 extension = extension->next.load(std::memory_order_relaxed))
679 {
680 auto k = extension->key.load(std::memory_order_relaxed);
681 hash_t h = traits::template rehash<hash>(k);
682 auto& new_bucket = new_buckets[h & new_block->mask];
683 auto new_bucket_state = new_bucket.state.load(std::memory_order_relaxed);
684 auto new_bucket_count = new_bucket_state.item_count();
685 auto v = extension->value.load(std::memory_order_relaxed);
686 if (new_bucket_count < bucket_item_count) {
687 new_bucket.key[new_bucket_count].store(k, std::memory_order_relaxed);
688 new_bucket.value[new_bucket_count].store(v, std::memory_order_relaxed);
689 new_bucket.state.store(new_bucket_state.inc_item_count(), std::memory_order_relaxed);
690 } else {
691 extension_item* new_extension = allocate_extension_item(new_block, h);
692 assert(new_extension);
693 new_extension->key.store(k, std::memory_order_relaxed);
694 new_extension->value.store(v, std::memory_order_relaxed);
695 new_extension->next.store(new_bucket.head, std::memory_order_relaxed);
696 new_bucket.head = new_extension;
697 }
698 }
699 }
700 // (31) - this release-store synchronizes-with (6, 22, 29, 33)
701 data_block.store(new_block, std::memory_order_release);
702 // (32) - this release-store synchronizes-with the acquire-load (28)
703 resize_lock.store(0, std::memory_order_release);
704
705 // reclaim the old data block
706 guarded_block g(old_block);
707 g.reclaim();
708}
709
710template <class Key, class Value, class... Policies>
711auto vyukov_hash_map<Key, Value, Policies...>::allocate_block(std::uint32_t bucket_count) -> block* {
712 std::uint32_t extension_bucket_count = bucket_count / bucket_to_extension_ratio;
713 std::size_t size = sizeof(block) +
714 sizeof(bucket) * bucket_count +
715 sizeof(extension_bucket) * (static_cast<size_t>(extension_bucket_count) + 1);
716
717 void* mem = ::operator new(size, cacheline_size, std::nothrow);
718 if (mem == nullptr)
719 return nullptr;
720
721 std::memset(mem, 0, size);
722 block* b = new (mem) block;
723 b->mask = bucket_count - 1;
724 b->bucket_count = bucket_count;
725 b->extension_bucket_count = extension_bucket_count;
726
727 std::size_t extension_bucket_addr =
728 reinterpret_cast<std::size_t>(b) + sizeof(block) + sizeof(bucket) * bucket_count;
729 if (extension_bucket_addr % sizeof(extension_bucket) != 0) {
730 extension_bucket_addr += sizeof(extension_bucket) - (extension_bucket_addr % sizeof(extension_bucket));
731 }
732 b->extension_buckets = reinterpret_cast<extension_bucket*>(extension_bucket_addr);
733
734 for (std::uint32_t i = 0; i != extension_bucket_count; ++i) {
735 auto& bucket = b->extension_buckets[i];
736 extension_item* head = nullptr;
737 for (std::size_t j = 0; j != extension_item_count; ++j) {
738 bucket.items[j].next.store(head, std::memory_order_relaxed);
739 head = &bucket.items[j];
740 }
741 bucket.head.store(head, std::memory_order_relaxed);
742 }
743
744 return b;
745}
746
747template <class Key, class Value, class... Policies>
749 const auto h = hash{}(key);
750 iterator result;
751
752 result.current_bucket = &lock_bucket(h, result.block, result.current_bucket_state);
753 auto& bucket = *result.current_bucket;
754
755 accessor acc;
756 auto item_count = result.current_bucket_state.item_count();
757 for (std::uint32_t i = 0; i != item_count; ++i) {
758 if (traits::template compare_key<false>(bucket.key[i], bucket.value[i], key, h, acc)) {
759 result.index = i;
760 return result;
761 }
762 }
763
764 auto extension = bucket.head.load(std::memory_order_relaxed);
765 while (extension) {
766 if (traits::template compare_key<false>(extension->key, extension->value, key, h, acc)) {
767 result.extension = extension;
768 return result;
769 }
770 extension = extension->next.load(std::memory_order_relaxed);
771 }
772
773 return end();
774}
775
776template <class Key, class Value, class... Policies>
778 iterator result;
779 result.current_bucket = &lock_bucket(0, result.block, result.current_bucket_state);
780 if (result.current_bucket_state.item_count() == 0)
781 result.move_to_next_bucket();
782 return result;
783}
784
785template <class Key, class Value, class... Policies>
786auto vyukov_hash_map<Key, Value, Policies...>::lock_bucket(hash_t hash, guarded_block& block, bucket_state& state)
787 -> bucket&
788{
789 backoff backoff;
790 for (;;) {
791 // (33) - this acquire-load synchronizes-with the release-store (31)
792 block.acquire(data_block, std::memory_order_acquire);
793 const std::size_t bucket_idx = hash & block->mask;
794 auto& bucket = block->buckets()[bucket_idx];
795 bucket_state st = bucket.state.load(std::memory_order_relaxed);
796 if (st.is_locked())
797 continue;
798
799 // (34) - this acquire-CAS synchronizes-with the release-store (3, 5, 11, 13, 14, 36, 38)
800 if (bucket.state.compare_exchange_strong(st, st.locked(), std::memory_order_acquire,
801 std::memory_order_relaxed)) {
802 state = st;
803 return bucket;
804 }
805 backoff();
806 }
807}
808
809template <class Key, class Value, class... Policies>
810auto vyukov_hash_map<Key, Value, Policies...>::allocate_extension_item(block* b, hash_t hash)
811 -> extension_item*
812{
813 const std::size_t extension_bucket_count = b->extension_bucket_count;
814 const std::size_t mod_mask = extension_bucket_count - 1;
815 for (std::size_t iter = 0; iter != 2; ++iter) {
816 for (std::size_t idx = 0; idx != extension_bucket_count; ++idx) {
817 const std::size_t extension_bucket_idx = (hash + idx) & mod_mask;
818 extension_bucket& extension_bucket = b->extension_buckets[extension_bucket_idx];
819
820 if (extension_bucket.head.load(std::memory_order_relaxed) == nullptr)
821 continue;
822
823 extension_bucket.acquire_lock();
824 auto item = extension_bucket.head.load(std::memory_order_relaxed);
825 if (item) {
826 extension_bucket.head.store(item->next, std::memory_order_relaxed);
827 extension_bucket.release_lock();
828 return item;
829 }
830 extension_bucket.release_lock();
831 }
832 }
833 return nullptr;
834}
835
836template <class Key, class Value, class... Policies>
837void vyukov_hash_map<Key, Value, Policies...>::free_extension_item(extension_item* item) {
838 std::size_t item_addr = reinterpret_cast<std::size_t>(item);
839 auto bucket = reinterpret_cast<extension_bucket*>(item_addr - item_addr % sizeof(extension_bucket));
840
841 bucket->acquire_lock();
842 auto head = bucket->head.load(std::memory_order_relaxed);
843 // (35) - this release-store synchronizes-with the acquire-load (27)
844 item->next.store(head, std::memory_order_release);
845 // we need to use release semantic here to ensure that threads in try_get_value
846 // that see the value written by this store also see the updated bucket_state.
847 bucket->head.store(item, std::memory_order_relaxed);
848 bucket->release_lock();
849}
850
851template <class Key, class Value, class... Policies>
852vyukov_hash_map<Key, Value, Policies...>::iterator::iterator() :
853 block(),
854 current_bucket(),
855 current_bucket_state(),
856 index(),
857 extension(),
858 prev()
859{}
860
861template <class Key, class Value, class... Policies>
862vyukov_hash_map<Key, Value, Policies...>::iterator::iterator(iterator&& other) :
863 block(std::move(other.block)),
864 current_bucket(std::move(other.current_bucket)),
865 current_bucket_state(std::move(other.current_bucket_state)),
866 index(std::move(other.index)),
867 extension(std::move(other.extension)),
868 prev(std::move(other.prev))
869{
870 other.block.reset();
871 other.current_bucket = nullptr;
872 other.current_bucket_state = bucket_state();
873 other.index = 0;
874 other.extension = nullptr;
875 other.prev = nullptr;
876}
877
878template <class Key, class Value, class... Policies>
879auto vyukov_hash_map<Key, Value, Policies...>::iterator::operator=(iterator&& other) -> iterator& {
880 block = std::move(other.block);
881 current_bucket = std::move(other.current_bucket);
882 current_bucket_state = std::move(other.current_bucket_state);
883 index = std::move(other.index);
884 extension = std::move(other.extension);
885 prev = std::move(other.prev);
886
887 other.block.reset();
888 other.current_bucket = nullptr;
889 other.current_bucket_state = bucket_state();
890 other.index = 0;
891 other.extension = nullptr;
892 other.prev = nullptr;
893
894 return *this;
895}
896
897template <class Key, class Value, class... Policies>
898vyukov_hash_map<Key, Value, Policies...>::iterator::~iterator() {
899 reset();
900}
901
902template <class Key, class Value, class... Policies>
904 // unlock the current bucket
905 if (current_bucket) {
906 assert(current_bucket->state.load().is_locked());
907 // (36) - this release-store synchronizes-with the acquire-CAS (7, 30, 34, 37) and the acquire-load (23)
908 current_bucket->state.store(current_bucket_state, std::memory_order_release);
909 }
910
911 block.reset();
912 current_bucket = nullptr;
913 current_bucket_state = bucket_state();
914 index = 0;
915 extension = nullptr;
916 prev = nullptr;
917}
918
919template <class Key, class Value, class... Policies>
921 return block == r.block &&
922 current_bucket == r.current_bucket &&
923 current_bucket_state == r.current_bucket_state &&
924 index == r.index &&
925 extension == r.extension;
926}
927
928template <class Key, class Value, class... Policies>
930 return !(*this == r);
931}
932
933template <class Key, class Value, class... Policies>
934auto vyukov_hash_map<Key, Value, Policies...>::iterator::operator++() -> iterator& {
935 if (extension) {
936 prev = &extension->next;
937 extension = extension->next.load(std::memory_order_relaxed);
938 if (extension == nullptr)
939 move_to_next_bucket();
940 return *this;
941 }
942
943 ++index;
944 if (index == current_bucket_state.item_count()) {
945 prev = &current_bucket->head;
946 extension = current_bucket->head.load(std::memory_order_relaxed);
947 if (extension == nullptr)
948 move_to_next_bucket();
949 }
950 return *this;
951}
952
953template <class Key, class Value, class... Policies>
954auto vyukov_hash_map<Key, Value, Policies...>::iterator::operator->() -> pointer {
955 static_assert(std::is_reference<reference>::value,
956 "operator-> is only available for instantiations with non-trivial key types. Use explicit "
957 "dereferenciation instead (operator*). The reason is that all other instantiations create "
958 "temporary std::pair<> instances since key and value are stored separately.");
959 return &this->operator*();
960}
961
962template <class Key, class Value, class... Policies>
963auto vyukov_hash_map<Key, Value, Policies...>::iterator::operator*() -> reference {
964 if (extension) {
965 return traits::deref_iterator(extension->key, extension->value);
966 }
967 return traits::deref_iterator(current_bucket->key[index], current_bucket->value[index]);
968}
969
970template <class Key, class Value, class... Policies>
971void vyukov_hash_map<Key, Value, Policies...>::iterator::move_to_next_bucket() {
972 assert(current_bucket != nullptr);
973 if (current_bucket == block->buckets() + block->bucket_count - 1) {
974 // we reached the end of the container -> reset the iterator
975 reset();
976 return;
977 }
978
979 auto old_bucket = current_bucket;
980 auto old_bucket_state = current_bucket_state;
981 ++current_bucket; // move pointer to the next bucket
982
983 // TODO - no need to lock if bucket is empty
984
985 backoff backoff;
986 for (;;) {
987 auto st = current_bucket->state.load(std::memory_order_relaxed);
988 if (st.is_locked())
989 continue;
990
991 // (37) - this acquire-CAS synchronizes-with the release-store (3, 5, 11, 13, 14, 36, 38)
992 if (current_bucket->state.compare_exchange_strong(st, st.locked(), std::memory_order_acquire,
993 std::memory_order_relaxed))
994 {
995 current_bucket_state = st;
996 break;
997 }
998 backoff();
999 }
1000
1001 // (38) - this release-store synchronizes-with the acquire-CAS (7, 30, 34, 37) and the acquire-load (23)
1002 old_bucket->state.store(old_bucket_state, std::memory_order_release); // unlock the previous bucket
1003
1004 index = 0;
1005 extension = nullptr;
1006 prev = nullptr;
1007 if (current_bucket_state.item_count() == 0)
1008 move_to_next_bucket();
1009}
1010
1011}
1012
1013#ifdef _MSC_VER
1014#pragma warning(pop)
1015#endif
A ForwardIterator to safely iterate vyukov_hash_map.
Definition: vyukov_hash_map.hpp:370
void reset()
Releases the bucket lock and resets the iterator.
Definition: vyukov_hash_map.hpp:903
Slim wrapper around std::hash with specialization for pointer types.
Definition: hash.hpp:25
A concurrent hash-map that uses fine-grained locking.
Definition: vyukov_hash_map.hpp:143