Intel(R) Threading Building Blocks Doxygen Documentation
version 4.2.3
|
Go to the documentation of this file.
17 #ifndef __TBB__concurrent_queue_impl_H
18 #define __TBB__concurrent_queue_impl_H
20 #ifndef __TBB_concurrent_queue_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
24 #include "../tbb_stddef.h"
25 #include "../tbb_machine.h"
26 #include "../atomic.h"
27 #include "../spin_mutex.h"
28 #include "../cache_aligned_allocator.h"
29 #include "../tbb_exception.h"
30 #include "../tbb_profiling.h"
32 #include __TBB_STD_SWAP_HEADER
37 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
40 namespace strict_ppl {
41 template<
typename T,
typename A>
class concurrent_queue;
44 template<
typename T,
typename A>
class concurrent_bounded_queue;
49 namespace strict_ppl {
73 static const size_t phi = 3;
77 static const size_t n_queue = 8;
103 return uintptr_t(
p)>1;
121 #if _MSC_VER && !defined(__INTEL_COMPILER)
123 #pragma warning( push )
124 #pragma warning( disable: 4146 )
133 typedef void (*item_constructor_t)(T* location,
const void* src);
145 void copy_item(
page& dst,
size_t dindex,
const void* src, item_constructor_t construct_item ) {
146 construct_item( &get_ref(dst, dindex), src );
150 item_constructor_t construct_item )
152 T& src_item = get_ref( const_cast<page&>(src), sindex );
153 construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
157 T& from = get_ref(src,index);
177 return (&static_cast<padded_page*>(static_cast<void*>(&
p))->
last)[index];
189 item_constructor_t construct_item ) ;
194 item_constructor_t construct_item ) ;
197 size_t end_in_page,
ticket& g_index, item_constructor_t construct_item ) ;
199 void invalidate_page_and_rethrow(
ticket k ) ;
216 item_constructor_t construct_item )
226 ++base.
my_rep->n_invalid_entries;
227 invalidate_page_and_rethrow( k );
233 if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.
my_rep );
249 copy_item( *
p, index, item, construct_item );
255 ++base.
my_rep->n_invalid_entries;
272 bool success =
false;
275 if(
p->mask & uintptr_t(1)<<index ) {
277 assign_and_destroy_item( dst, *
p, index );
279 --base.
my_rep->n_invalid_entries;
287 item_constructor_t construct_item )
294 ticket g_index = head_counter;
298 size_t end_in_first_page = (index+n_items<base.
my_rep->items_per_page)?(index+n_items):base.
my_rep->items_per_page;
300 head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
301 page* cur_page = head_page;
305 cur_page->
next = make_copy( base, srcp, 0, base.
my_rep->items_per_page, g_index, construct_item );
306 cur_page = cur_page->
next;
311 if( last_index==0 ) last_index = base.
my_rep->items_per_page;
313 cur_page->
next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
314 cur_page = cur_page->
next;
316 tail_page = cur_page;
318 invalidate_page_and_rethrow( g_index );
321 head_page = tail_page = NULL;
329 page* invalid_page = (
page*)uintptr_t(1);
335 q->
next = invalid_page;
337 head_page = invalid_page;
338 tail_page = invalid_page;
346 ticket& g_index, item_constructor_t construct_item )
350 new_page->
next = NULL;
352 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
353 if( new_page->
mask & uintptr_t(1)<<begin_in_page )
354 copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
367 my_ticket(k), my_queue(queue), my_page(
p), allocator(b)
378 my_queue.head_page = q;
380 my_queue.tail_page = NULL;
385 allocator.deallocate_page(
p );
389 #if _MSC_VER && !defined(__INTEL_COMPILER)
390 #pragma warning( pop )
391 #endif // warning 4146 is back
406 return k*phi%n_queue;
411 return array[index(k)];
441 return reinterpret_cast<page*>(allocate_block ( n ));
447 deallocate_block( reinterpret_cast<void*>(
p), n );
451 virtual void *allocate_block(
size_t n ) = 0;
454 virtual void deallocate_block(
void *
p,
size_t n ) = 0;
462 for(
size_t i=0; i<nq; i++ )
463 __TBB_ASSERT( my_rep->
array[i].tail_page==NULL,
"pages were not freed properly" );
472 r.
choose(k).push( src, k, *
this, construct_item );
477 bool internal_try_pop(
void* dst ) ;
480 size_t internal_size()
const ;
483 bool internal_empty()
const ;
487 void internal_finish_clear() ;
497 #if __TBB_CPP11_RVALUE_REF_PRESENT
507 const size_t item_size =
sizeof(T);
514 my_rep->item_size = item_size;
515 my_rep->items_per_page = item_size<= 8 ? 32 :
516 item_size<= 16 ? 16 :
536 #if defined(_MSC_VER) && defined(_Wp64)
537 #pragma warning (push)
538 #pragma warning (disable: 4267)
541 #if defined(_MSC_VER) && defined(_Wp64)
542 #pragma warning (pop)
548 }
while( !r.
choose( k ).pop( dst, k, *
this ) );
555 __TBB_ASSERT(
sizeof(ptrdiff_t)<=
sizeof(
size_t), NULL );
560 ptrdiff_t sz = tc-hc-nie;
561 return sz<0 ? 0 : size_t(sz);
577 for(
size_t i=0; i<nq; ++i ) {
581 deallocate_page( tp );
582 r.
array[i].tail_page = NULL;
601 for(
size_t i = 0; i < r.
n_queue; ++i )
602 r.
array[i].assign( src.
my_rep->array[i], *
this, construct_item);
605 "the source concurrent queue should not be concurrently modified." );
618 head_counter(queue.my_rep->head_counter),
621 for(
size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
622 array[k] = queue.
my_rep->array[k].head_page;
626 bool get_item( T*& item,
size_t k ) ;
631 if( k==my_queue.my_rep->tail_counter ) {
639 return (
p->mask & uintptr_t(1)<<i)!=0;
645 template<
typename Value>
651 template<
typename C,
typename T,
typename U>
654 template<
typename C,
typename T,
typename U>
662 #if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
669 : my_rep(NULL), my_item(NULL) {
694 template<
typename Value>
699 if( !my_rep->get_item(my_item, k) ) advance();
702 template<
typename Value>
704 if( my_rep!=other.
my_rep ) {
717 template<
typename Value>
719 __TBB_ASSERT( my_item,
"attempt to increment iterator past end of queue" );
720 size_t k = my_rep->head_counter;
724 my_rep->get_item(tmp,k);
733 my_rep->head_counter = ++k;
734 if( !my_rep->get_item(my_item, k) ) advance();
747 template<
typename Container,
typename Value>
749 public std::iterator<std::forward_iterator_tag,Value> {
750 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
751 template<
typename T,
class A>
752 friend class ::tbb::strict_ppl::concurrent_queue;
779 return *static_cast<Value*>(this->my_item);
792 Value* result = &operator*();
799 template<
typename C,
typename T,
typename U>
804 template<
typename C,
typename T,
typename U>
854 #if __TBB_PROTECTED_NESTED_CLASS_BROKEN
868 virtual void copy_item(
page& dst,
size_t index,
const void* src ) = 0;
869 virtual void assign_and_destroy_item(
void* dst,
page& src,
size_t index ) = 0;
915 #if __TBB_CPP11_RVALUE_REF_PRESENT
926 void internal_insert_item(
const void* src, copy_specifics op_type );
929 bool internal_insert_if_not_full(
const void* src, copy_specifics op_type );
934 virtual void copy_page_item(
page& dst,
size_t dindex,
const page& src,
size_t sindex ) = 0;
955 virtual void move_item(
page& dst,
size_t index,
const void* src ) = 0;
965 template<
typename C,
typename T,
typename U>
968 template<
typename C,
typename T,
typename U>
1011 template<
typename Container,
typename Value>
1013 public std::iterator<std::forward_iterator_tag,Value> {
1015 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
1016 template<
typename T,
class A>
1017 friend class ::tbb::concurrent_bounded_queue;
1045 return *static_cast<Value*>(
my_item);
1065 template<
typename C,
typename T,
typename U>
1070 template<
typename C,
typename T,
typename U>
void __TBB_EXPORTED_METHOD internal_push_move(const void *src)
Enqueue item at tail of queue using move operation.
atomic< page * > head_page
Base class for types that should not be copied or assigned.
~concurrent_queue_iterator_base_v3()
Destructor.
size_t items_per_page
Always a power of 2.
void pause()
Pause for a while.
bool get_item(T *&item, size_t k)
Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
void assign(const concurrent_queue_iterator_base_v3< Value > &other)
Assignment.
concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base
static const size_t n_queue
concurrent_queue_rep< T >::page page
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
micro_queue< T >::padded_page padded_page
void internal_push(const void *src, item_constructor_t construct_item)
Enqueue item at tail of queue.
static T & get_ref(page &p, size_t index)
page * make_copy(concurrent_queue_base_v3< T > &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, item_constructor_t construct_item)
Value * operator++(int)
Post increment.
virtual void move_item(page &dst, size_t index, const void *src)=0
A lock that occupies a single byte.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
bool is_valid_page(const concurrent_queue_rep_base::page *p)
concurrent_queue_iterator_rep< Value > * my_rep
Represents concurrent_queue over which we are iterating.
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Value * my_item
Pointer to current item.
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
virtual void move_page_item(page &dst, size_t dindex, const page &src, size_t sindex)=0
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
Value * operator->() const
static size_t index(ticket k)
Map ticket to an array index.
Base class for types that should not be assigned.
virtual ~concurrent_queue_page_allocator()
concurrent_queue_iterator_base_v3 & operator=(const concurrent_queue_iterator_base_v3 &i)
A queue using simple locking.
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
atomic< ticket > tail_counter
Class used to ensure exception-safety of method "pop".
T last
Must be last field.
bool pop(void *dst, ticket k, concurrent_queue_base_v3< T > &base)
parts of concurrent_queue_rep that do not have references to micro_queue
void internal_finish_clear()
free any remaining pages
Type-independent portion of concurrent_queue_iterator.
atomic< size_t > n_invalid_entries
number of invalid entries in the queue
ptrdiff_t my_capacity
Capacity of the queue.
micro_queue< T > & choose(ticket k)
bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(const void *src)
Attempt to enqueue item onto queue using move operation.
concurrent_queue_iterator_rep * my_rep
concurrent_queue over which we are iterating.
micro_queue< T >::padded_page padded_page
atomic< ticket > head_counter
Class that implements exponential backoff.
concurrent_queue_iterator()
void swap(atomic< T > &lhs, atomic< T > &rhs)
size_t item_size
Size of an item.
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
base class of concurrent_queue
void copy_item(page &dst, size_t dindex, const void *src, item_constructor_t construct_item)
atomic< ticket > tail_counter
atomic< ticket > head_counter
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
Meets requirements of a forward iterator for STL.
virtual ~concurrent_queue_base_v3()
friend bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
A queue using simple locking.
void push(const void *item, ticket k, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
void assign_and_destroy_item(void *dst, page &src, size_t index)
virtual concurrent_queue_rep_base::page * allocate_page()=0
void internal_throw_exception() const
Obsolete.
Value * operator++(int)
Post increment.
Meets requirements of a forward iterator for STL.
concurrent_queue_iterator & operator++()
Advance to next item in queue.
~micro_queue_pop_finalizer()
concurrent_queue_iterator & operator++()
Advance to next item in queue.
void move(tbb_thread &t1, tbb_thread &t2)
size_t items_per_page
Always a power of 2.
concurrent_queue_iterator_base_v3()
Default constructor.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
size_t internal_size() const
Get size of queue; result may be invalid if queue is modified concurrently.
void __TBB_EXPORTED_METHOD move_content(concurrent_queue_base_v8 &src)
move items
Value & operator*() const
Reference to current item.
micro_queue< T > array[n_queue]
concurrent_queue_rep_base::page page
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
Value & operator*() const
Reference to current item.
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
Represents acquisition of a mutex.
virtual page * allocate_page() __TBB_override
virtual void deallocate_page(concurrent_queue_rep_base::page *p) __TBB_override
concurrent_queue_iterator_rep(const concurrent_queue_base_v3< T > &queue)
micro_queue & assign(const micro_queue &src, concurrent_queue_base_v3< T > &base, item_constructor_t construct_item)
bool operator!=(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
Internal representation of a ConcurrentQueue.
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
size_t item_size
Size of an item.
concurrent_queue_page_allocator & allocator
void itt_hide_store_word(T &dst, T src)
micro_queue_pop_finalizer(micro_queue< T > &queue, concurrent_queue_base_v3< T > &b, ticket k, page *p)
argument_integer_type modulo_power_of_two(argument_integer_type arg, divisor_integer_type divisor)
A function to compute arg modulo divisor where divisor is a power of 2.
concurrent_queue_iterator(const concurrent_queue_base_v3 &queue)
Construct iterator pointing to head of queue.
void advance()
Advance iterator one step towards tail of queue.
const concurrent_queue_base_v3< T > & my_queue
concurrent_queue_iterator_base_v3()
Default constructor.
size_t __TBB_EXPORTED_FUNC NFS_GetLineSize()
Cache/sector line size.
Abstract class to define interface for page allocation/deallocation.
friend bool operator==(const concurrent_queue_iterator< C, T > &i, const concurrent_queue_iterator< C, U > &j)
concurrent_queue_base_v8(size_t item_sz)
concurrent_queue_rep_base::page page
padded_page()
Not defined anywhere - exists to quiet warnings.
representation of concurrent_queue_base
micro_queue< T > & my_queue
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Similar to C++0x std::remove_cv.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
T last
Must be last field.
void internal_swap(concurrent_queue_base_v3 &src)
swap internal representation
concurrent_queue_iterator()
concurrent_queue_rep * my_rep
Internal representation.
void operator=(const padded_page &)
Not defined anywhere - exists to quiet warnings.
void copy_item(page &dst, size_t dindex, const page &src, size_t sindex, item_constructor_t construct_item)
concurrent_queue_iterator & operator=(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Iterator assignment.
Identifiers declared inside namespace internal should never be used directly by client code.
void spin_wait_until_my_turn(atomic< ticket > &counter, ticket k, concurrent_queue_rep_base &rb) const
Value * operator->() const
auto last(Container &c) -> decltype(begin(c))
void const char const char int ITT_FORMAT __itt_group_sync p
#define __TBB_EXPORTED_METHOD
void call_itt_notify(notify_type, void *)
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
bool internal_empty() const
check if the queue is empty; thread safe
void invalidate_page_and_rethrow(ticket k)
atomic< page * > tail_page
void * my_item
Pointer to current item.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
concurrent_queue_rep< T > * my_rep
Internal representation.
#define __TBB_compiler_fence()
bool internal_try_pop(void *dst)
Attempt to dequeue item from queue.
void assign(const concurrent_queue_base_v3 &src, item_constructor_t construct_item)
copy or move internal representation
Constness-independent portion of concurrent_queue_iterator.
micro_queue< T >::item_constructor_t item_constructor_t
concurrent_queue_base_v3()
Copyright © 2005-2020 Intel Corporation. All Rights Reserved.
Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are
registered trademarks or trademarks of Intel Corporation or its
subsidiaries in the United States and other countries.
* Other names and brands may be claimed as the property of others.