#if __TBB_STATISTICS_STDOUT
#if __TBB_TASK_GROUP_CONTEXT
#if __TBB_TASK_PRIORITY
my_ref_top_priority = &a->my_top_priority;
my_ref_reload_epoch = &a->my_reload_epoch;
my_local_reload_epoch = *my_ref_reload_epoch;
return !slot && as_atomic( slot ).compare_and_swap( &s, NULL ) == NULL;
size_t index = s.my_arena_index;
if ( index < lower || index >= upper ) index = s.my_random.get() % (upper - lower) + lower;
for ( size_t i = index; i < upper; ++i )
for ( size_t i = lower; i < index; ++i )
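// --- Illustrative sketch (not part of arena.cpp) ---------------------------
// The fragments above probe the slot range [lower, upper): start at a
// (possibly randomized) index, scan forward to 'upper', then wrap around and
// scan [lower, index), claiming a slot with a single compare-and-swap.
// Below is a minimal self-contained rendering of that pattern using
// std::atomic; Slot, try_occupy and occupy_free_slot_in_range_demo are
// hypothetical names invented for this example only.
#include <atomic>
#include <cstddef>

struct Slot { std::atomic<void*> owner{nullptr}; };

// Mirrors "!slot && as_atomic(slot).compare_and_swap(&s, NULL) == NULL".
inline bool try_occupy( Slot& slot, void* who ) {
    void* expected = nullptr;
    return slot.owner.load(std::memory_order_relaxed) == nullptr
           && slot.owner.compare_exchange_strong(expected, who);
}

// Mirrors the two scan loops above: [index, upper) first, then [lower, index).
inline std::size_t occupy_free_slot_in_range_demo( Slot* slots, std::size_t lower,
                                                   std::size_t upper, std::size_t index,
                                                   void* who ) {
    for ( std::size_t i = index; i < upper; ++i )
        if ( try_occupy(slots[i], who) ) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if ( try_occupy(slots[i], who) ) return i;
    return upper; // no free slot found in the range
}
// ---------------------------------------------------------------------------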
template <bool as_worker>
__TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
s.attach_arena( this, index, /*is_master*/false );
#if !__TBB_FP_CONTEXT
#if __TBB_ARENA_OBSERVER
__TBB_ASSERT( !s.my_last_local_observer, "There cannot be notified local observers when entering arena" );
my_observers.notify_entry_observers( s.my_last_local_observer, /*worker=*/true );
s.local_wait_for_all( *s.my_dummy_task, NULL );
__TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
              "Worker cannot leave arena while its task pool is not reset" );
s.my_innermost_running_task = s.my_dummy_task;
s.local_wait_for_all( *s.my_dummy_task, t );
#if __TBB_ARENA_OBSERVER
my_observers.notify_exit_observers( s.my_last_local_observer, /*worker=*/true );
s.my_last_local_observer = NULL;
#if __TBB_TASK_PRIORITY
if ( s.my_offloaded_tasks )
    orphan_offloaded_tasks( s );
++s.my_counters.arena_roundtrips;
*my_slots[index].my_counters += s.my_counters;
s.my_counters.reset();
__TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
on_thread_leaving<ref_worker>();
__TBB_ASSERT( !my_guard, "improperly allocated arena?" );
#if __TBB_TASK_PRIORITY
__TBB_ASSERT( !my_reload_epoch && !my_orphaned_tasks && !my_skipped_fifo_priority, "New arena object is not zeroed" );
#if __TBB_TASK_PRIORITY
my_bottom_priority = my_top_priority = normalized_normal_priority;
#if __TBB_ARENA_OBSERVER
my_observers.my_arena = this;
#if __TBB_PREVIEW_RESUMABLE_TASKS
my_co_cache.init(4 * num_slots);
#if __TBB_PREVIEW_RESUMABLE_TASKS
#if __TBB_PREVIEW_CRITICAL_TASKS
my_slots[i].my_counters = new ( NFS_Allocate(1, sizeof(statistics_counters), NULL) ) statistics_counters;
#if __TBB_PREVIEW_CRITICAL_TASKS
ITT_SYNC_CREATE(&my_critical_task_stream, SyncType_Scheduler, SyncObj_CriticalTaskStream);
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
my_local_concurrency_mode = false;
my_global_concurrency_mode = false;
#if !__TBB_FP_CONTEXT
unsigned char* storage = (unsigned char*)NFS_Allocate( 1, n, NULL );
memset( storage, 0, n );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
#if !__TBB_STATISTICS_EARLY_DUMP
intptr_t drained = 0;
#if __TBB_PREVIEW_RESUMABLE_TASKS
my_co_cache.cleanup();
#if __TBB_PREVIEW_CRITICAL_TASKS
__TBB_ASSERT( my_critical_task_stream.drain()==0, "Not all critical tasks were executed" );
#if __TBB_COUNT_TASK_NODES
my_market->update_task_node_count( -drained );
#if __TBB_TASK_GROUP_CONTEXT
__TBB_ASSERT( my_default_ctx, "Master thread never entered the arena?" );
my_default_ctx->~task_group_context();
#if __TBB_ARENA_OBSERVER
if ( !my_observers.empty() )
    my_observers.clear();
#if TBB_USE_ASSERT > 1
void arena::dump_arena_statistics () {
    statistics_counters total;
#if __TBB_STATISTICS_EARLY_DUMP
    *my_slots[i].my_counters += s->my_counters;
    dump_statistics( *my_slots[i].my_counters, i );
    dump_statistics( *my_slots[0].my_counters, 0 );
#if __TBB_STATISTICS_STDOUT
#if !__TBB_STATISTICS_TOTALS_ONLY
    printf( "----------------------------------------------\n" );
    dump_statistics( total, workers_counters_total );
    dump_statistics( total, arena_counters_total );
#if !__TBB_STATISTICS_TOTALS_ONLY
    printf( "==============================================\n" );
#if __TBB_TASK_PRIORITY
inline bool arena::may_have_tasks ( generic_scheduler* s, bool& tasks_present, bool& dequeuing_possible ) {
    if ( !s || s->my_arena != this )
    dequeuing_possible |= s->worker_outermost_level();
    if ( s->my_pool_reshuffling_pending ) {
        tasks_present = true;
    if ( s->my_offloaded_tasks ) {
        tasks_present = true;
        if ( s->my_local_reload_epoch < *s->my_ref_reload_epoch ) {
void arena::orphan_offloaded_tasks(generic_scheduler& s) {
    ++my_abandonment_epoch;
    __TBB_ASSERT( s.my_offloaded_task_list_tail_link && !*s.my_offloaded_task_list_tail_link, NULL );
        orphans = const_cast<task*>(my_orphaned_tasks);
        *s.my_offloaded_task_list_tail_link = orphans;
    } while ( as_atomic(my_orphaned_tasks).compare_and_swap(s.my_offloaded_tasks, orphans) != orphans );
    s.my_offloaded_tasks = NULL;
    s.my_offloaded_task_list_tail_link = NULL;
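// --- Illustrative sketch (not part of arena.cpp) ---------------------------
// orphan_offloaded_tasks() above splices a thread's private chain of offloaded
// tasks onto the shared my_orphaned_tasks list: the chain's tail is pointed at
// the current global head, then the chain's head is CAS-installed as the new
// head, retrying if another thread won the race. The same "publish a whole
// chain" pattern with std::atomic; Node and splice_chain are hypothetical
// names for this example only.
#include <atomic>

struct Node { Node* next; };

inline void splice_chain( std::atomic<Node*>& list_head, Node* chain_head, Node** chain_tail_link ) {
    Node* old_head;
    do {
        old_head = list_head.load(std::memory_order_relaxed);
        *chain_tail_link = old_head;   // same role as *s.my_offloaded_task_list_tail_link = orphans;
    } while ( !list_head.compare_exchange_weak(old_head, chain_head) );
}
// ---------------------------------------------------------------------------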
advertise_new_work<work_enqueued>();
#if __TBB_TASK_PRIORITY
if ( p < my_bottom_priority || p > my_top_priority )
#if __TBB_TASK_PRIORITY
intptr_t top_priority = my_top_priority;
for( k=0; k<n; ++k ) {
bool work_absent = k == n;
#if __TBB_PREVIEW_CRITICAL_TASKS
bool no_critical_tasks = my_critical_task_stream.empty(0);
work_absent &= no_critical_tasks;
#if __TBB_TASK_PRIORITY
bool tasks_present = !work_absent || my_orphaned_tasks;
bool dequeuing_possible = false;
uintptr_t abandonment_epoch = my_abandonment_epoch;
the_context_state_propagation_mutex.lock();
work_absent = !may_have_tasks( my_slots[0].my_scheduler, tasks_present, dequeuing_possible );
the_context_state_propagation_mutex.unlock();
for( k = 1; work_absent && k < n; ++k ) {
    work_absent = !may_have_tasks( my_slots[k].my_scheduler, tasks_present, dequeuing_possible );
work_absent = work_absent
              && abandonment_epoch == my_abandonment_epoch;
#if __TBB_TASK_PRIORITY
work_absent = work_absent && (!dequeuing_possible || no_fifo_tasks)
              && top_priority == my_top_priority && reload_epoch == my_reload_epoch;
work_absent = work_absent && no_fifo_tasks;
#if __TBB_TASK_PRIORITY
if ( top_priority > my_bottom_priority ) {
    if ( my_market->lower_arena_priority(*this, top_priority - 1, reload_epoch)
        atomic_update( my_skipped_fifo_priority, top_priority, std::less<intptr_t>());
else if ( !tasks_present && !my_orphaned_tasks && no_fifo_tasks ) {
#if __TBB_TASK_PRIORITY
#if __TBB_COUNT_TASK_NODES
intptr_t arena::workers_task_node_count() {
    result += s->my_task_node_count;
#if __TBB_RECYCLE_TO_ENQUEUE
__TBB_ASSERT( ref_count!=0, "attempt to enqueue task whose parent has a ref_count==0 (forgot to set_ref_count?)" );
__TBB_ASSERT( ref_count>0, "attempt to enqueue task whose parent has a ref_count<0" );
#if __TBB_PREVIEW_CRITICAL_TASKS
if( s && s->my_arena_slot ) {
#if __TBB_TASK_ISOLATION
unsigned& lane = s->my_arena_slot->hint_for_critical;
advertise_new_work<work_spawned>();
#if __TBB_TASK_PRIORITY
intptr_t p = prio ? normalize_priority(priority_t(prio)) : normalized_normal_priority;
assert_priority_valid(p);
#if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD
if ( p != my_top_priority )
__TBB_ASSERT_EX(prio == 0, "the library is not configured to respect the task priority");
#if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD
advertise_new_work<work_enqueued>();
#if __TBB_TASK_PRIORITY
if ( p != my_top_priority )
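// --- Usage sketch (not part of arena.cpp) ----------------------------------
// The enqueue path above validates the parent's ref_count, normalizes the
// requested priority when __TBB_TASK_PRIORITY is enabled, and advertises the
// new work to the market. From user code this is reached through the public
// task_arena::enqueue(); a minimal sketch, assuming TBB 2020-era headers.
#include <cstdio>
#include <tbb/task_arena.h>

inline void enqueue_demo() {
    tbb::task_arena arena(4);                                     // at most 4 threads
    arena.enqueue( []{ std::printf("fire-and-forget work\n"); } );
    // enqueue() returns immediately; the lambda runs asynchronously inside the
    // arena, which is why the implementation calls advertise_new_work<work_enqueued>().
}
// ---------------------------------------------------------------------------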
#if __TBB_PREVIEW_RESUMABLE_TASKS
s->nested_arena_entry(a, slot_index);
#if __TBB_TASK_GROUP_CONTEXT
#if __TBB_TASK_PRIORITY
#if __TBB_PREVIEW_CRITICAL_TASKS
#if __TBB_TASK_GROUP_CONTEXT
#if __TBB_TASK_PRIORITY
if ( my_offloaded_tasks )
    my_arena->orphan_offloaded_tasks( *this );
my_offloaded_tasks = NULL;
#if __TBB_ARENA_OBSERVER
my_last_local_observer = 0;
my_arena->my_observers.notify_entry_observers( my_last_local_observer, /*worker=*/false );
#if __TBB_PREVIEW_RESUMABLE_TASKS
#if __TBB_ARENA_OBSERVER
my_arena->my_observers.notify_exit_observers( my_last_local_observer, /*worker=*/false );
#if __TBB_TASK_PRIORITY
if ( my_offloaded_tasks )
    my_arena->orphan_offloaded_tasks( *this );
#if __TBB_PREVIEW_RESUMABLE_TASKS
class resume_task : public task {
if (s->prepare_resume(my_target)) {
    s->resume(my_target);
prefix().state = task::to_resume;
generic_scheduler* co_sched = curr.my_arena->my_co_cache.pop();
co_sched->my_arena = curr.my_arena;
co_sched->my_dummy_task->prefix().context = co_sched->my_arena->my_default_ctx;
void internal_suspend(void* suspend_callback, void* user_callback) {
    __TBB_ASSERT(s.my_arena_slot->my_scheduler_is_recalled != NULL, NULL);
    bool is_recalled = *s.my_arena_slot->my_scheduler_is_recalled;
    generic_scheduler& target = is_recalled ? *s.my_arena_slot->my_scheduler : create_coroutine(s);
    generic_scheduler::callback_t callback = {
        (generic_scheduler::suspend_callback_t)suspend_callback, user_callback, &s };
    target.set_post_resume_action(generic_scheduler::PRA_CALLBACK, &callback);
void internal_resume(task::suspend_point tag) {
    generic_scheduler& s = *static_cast<generic_scheduler*>(tag);
    task* t = new( &s.allocate_task(sizeof(resume_task), __TBB_CONTEXT_ARG(NULL, s.my_dummy_task->context())) ) resume_task(s);
    arena& a = *s.my_arena;
task::suspend_point internal_current_suspend_point() {
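// --- Usage sketch (not part of arena.cpp) ----------------------------------
// internal_suspend()/internal_resume() above back the resumable-tasks preview
// API. A minimal sketch, assuming a TBB 2020-era build with the preview
// feature enabled; the macro name and the task::suspend/task::resume
// signatures are the ones documented for that preview and may differ in other
// versions.
#define TBB_PREVIEW_RESUMABLE_TASKS 1
#include <tbb/task.h>
#include <tbb/task_arena.h>
#include <thread>

inline void suspend_demo() {
    tbb::task_arena arena;
    arena.execute( []{
        tbb::task::suspend( []( tbb::task::suspend_point tag ) {
            // The calling worker is released to run other arena work. Hand the
            // tag to some asynchronous activity that calls resume() when done;
            // a detached thread stands in for that activity here.
            std::thread( [tag]{ tbb::task::resume(tag); } ).detach();
        } );
        // Execution continues here once resume(tag) has been called.
    } );
}
// ---------------------------------------------------------------------------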
namespace interface7 {
#if __TBB_NUMA_SUPPORT
#if __TBB_TASK_GROUP_CONTEXT
new_arena->my_default_ctx = new ( NFS_Allocate(1, sizeof(task_group_context), NULL) )
new_arena->my_default_ctx->capture_fp_settings();
m.release( /*is_public=*/true, /*blocking_terminate=*/false );
#if __TBB_TASK_GROUP_CONTEXT
#if __TBB_TASK_GROUP_CONTEXT || __TBB_NUMA_SUPPORT
#if __TBB_TASK_GROUP_CONTEXT
#if __TBB_NUMA_SUPPORT
my_arena->my_numa_binding_observer = tbb::internal::construct_binding_observer(
#if __TBB_NUMA_SUPPORT
if( my_arena->my_numa_binding_observer != NULL ) {
    tbb::internal::destroy_binding_observer(my_arena->my_numa_binding_observer);
    my_arena->my_numa_binding_observer = NULL;
#if __TBB_TASK_GROUP_CONTEXT
if( s && s->my_arena ) {
#if __TBB_TASK_GROUP_CONTEXT
#if __TBB_TASK_GROUP_CONTEXT
    "The task will not be executed because default task_group_context of task_arena is cancelled. Has previously enqueued task thrown an exception?");
class delegated_task : public task {
    internal::delegate_base & my_delegate;
    concurrent_monitor & my_monitor;
    generic_scheduler& s = *(generic_scheduler*)prefix().owner;
    __TBB_ASSERT(s.outermost_level(), "expected to be enqueued and received on the outermost level");
    struct outermost_context : internal::no_copy {
        generic_scheduler & s;
        task_group_context * orig_ctx;
        scheduler_properties orig_props;
        outermost_context(delegated_task *_t, generic_scheduler &_s)
            : t(_t), s(_s), orig_dummy(s.my_dummy_task), orig_props(s.my_properties) {
#if __TBB_TASK_GROUP_CONTEXT
            orig_ctx = t->prefix().context;
            t->prefix().context = s.my_arena->my_default_ctx;
        ~outermost_context() {
#if __TBB_TASK_GROUP_CONTEXT
            t->prefix().context = orig_ctx;
            s.my_properties = orig_props;
            s.my_dummy_task = orig_dummy;
    task_prefix& prefix = my_root->prefix();
#if __TBB_PREVIEW_RESUMABLE_TASKS
    reference_count old_ref_count = __TBB_FetchAndStoreW(&prefix.ref_count, 1);
    if (old_ref_count == internal::abandon_flag + 2) {
        tbb::task::resume(prefix.abandoned_scheduler);
    my_monitor.notify(*this);
    delegated_task( internal::delegate_base & d, concurrent_monitor & s, task * t )
        : my_delegate(d), my_monitor(s), my_root(t) {}
    bool operator()(uintptr_t ctx) const { return (void*)ctx == (void*)&my_delegate; }
bool same_arena = s->my_arena == my_arena;
size_t index1 = s->my_arena_index;
#if __TBB_USE_OPTIONAL_RTTI
internal::delegated_function< graph_funct, void >* deleg_funct =
    dynamic_cast< internal::delegated_function< graph_funct, void>* >(&d);
        (internal::forward< graph_funct >(deleg_funct->my_func)), 0);
concurrent_monitor::thread_context waiter;
#if __TBB_TASK_GROUP_CONTEXT
#if __TBB_FP_CONTEXT
root.prefix().ref_count = 2;
s->local_wait_for_all(root, NULL);
#if TBB_USE_EXCEPTIONS
#if TBB_USE_EXCEPTIONS
TbbRethrowException(pe);
#if __TBB_USE_OPTIONAL_RTTI
context_guard_helper<false> context_guard;
#if TBB_USE_EXCEPTIONS
#if TBB_USE_EXCEPTIONS
context_guard.restore_default();
exception_container.register_pending_exception();
TbbRethrowException(exception_container.my_exception);
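// --- Usage sketch (not part of arena.cpp) ----------------------------------
// The delegated_task / exception_container machinery above implements
// task_arena::execute(): the delegate runs inside the arena, the caller waits,
// and a pending exception is rethrown to the caller (the TbbRethrowException
// calls). A minimal sketch, assuming TBB 2020-era headers; parallel_for is
// only sample work.
#include <tbb/task_arena.h>
#include <tbb/parallel_for.h>
#include <stdexcept>

inline int execute_demo() {
    tbb::task_arena arena(2);
    try {
        return arena.execute( []() -> int {
            tbb::parallel_for( 0, 100, []( int ) { /* work */ } );
            return 42;                       // execute() forwards the result
        } );
    } catch ( const std::exception& ) {
        return -1;                           // exceptions propagate to the caller
    }
}
// ---------------------------------------------------------------------------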
class wait_task : public task {
    binary_semaphore & my_signal;
    __TBB_ASSERT( s->outermost_level(), "The enqueued task can be processed only on outermost level" );
    if ( s->is_worker() ) {
        s->my_innermost_running_task = s->my_dummy_task;
        s->local_wait_for_all( *s->my_dummy_task, NULL );
        s->my_innermost_running_task = this;
    } else s->my_arena->is_out_of_work();
    wait_task ( binary_semaphore & sema ) : my_signal(sema) {}
__TBB_ASSERT( s->my_arena != my_arena || s->my_arena_index == 0, "task_arena::wait_until_empty() is not supported within a worker context" );
if( !s->my_arena_index )
    s->wait_until_empty();
s->wait_until_empty();
binary_semaphore waiter;
return s ? int(s->my_arena_index) : -1;
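// --- Usage sketch (not part of arena.cpp) ----------------------------------
// The slot-index query just above appears to back
// this_task_arena::current_thread_index(). A minimal sketch, assuming TBB
// 2020-era headers; outside any arena the call returns a negative sentinel.
#include <cstdio>
#include <tbb/task_arena.h>

inline void thread_index_demo() {
    tbb::task_arena arena;
    arena.execute( []{
        std::printf( "slot index inside the arena: %d\n",
                     tbb::this_task_arena::current_thread_index() );
    } );
}
// ---------------------------------------------------------------------------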
#if __TBB_TASK_ISOLATION
isolation_guard( isolation_tag &isolation ) : guarded( isolation ), previous_value( isolation ) {}
~isolation_guard() {
    guarded = previous_value;
__TBB_ASSERT( s, "this_task_arena::isolate() needs an initialized scheduler" );
isolation_tag& current_isolation = s->my_innermost_running_task->prefix().isolation;
isolation_guard guard( current_isolation );
current_isolation = isolation ? isolation : reinterpret_cast<isolation_tag>(&d);
__TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
return a->my_num_reserved_slots + a->my_max_num_workers;
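// --- Usage sketch (not part of arena.cpp) ----------------------------------
// isolate_within_arena() above runs the delegate under a fresh isolation tag
// (either the caller-supplied value or the delegate's own address), so a
// thread blocked inside the region will not steal unrelated outer tasks.
// A minimal sketch of the public this_task_arena::isolate(), assuming TBB
// 2020-era headers; the nested parallel_for is only sample work.
#include <tbb/task_arena.h>
#include <tbb/parallel_for.h>

inline void isolate_demo() {
    tbb::parallel_for( 0, 8, []( int ) {
        tbb::this_task_arena::isolate( []{
            // Work stolen while this region blocks is restricted to tasks that
            // carry the same isolation tag.
            tbb::parallel_for( 0, 8, []( int ) { /* inner work */ } );
        } );
    } );
}
// ---------------------------------------------------------------------------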