Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_impl_H
18 #define __TBB_flow_graph_impl_H
19 
20 #include "../tbb_stddef.h"
21 #include "../task.h"
22 #include "../task_arena.h"
23 #include "../flow_graph_abstractions.h"
24 
25 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
26 #include "../concurrent_priority_queue.h"
27 #endif
28 
29 #include <list>
30 
31 #if TBB_DEPRECATED_FLOW_ENQUEUE
32 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
33 #else
34 #define FLOW_SPAWN(a) tbb::task::spawn((a))
35 #endif
36 
37 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
38 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr
39 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority
40 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority
41 #else
42 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )
43 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )
44 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1
45 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
46 
47 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
48 #define __TBB_DEPRECATED_LIMITER_EXPR( expr ) expr
49 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1, arg2
50 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg3, arg4
51 #else
52 #define __TBB_DEPRECATED_LIMITER_EXPR( expr )
53 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1
54 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg2
55 #endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
56 
57 namespace tbb {
58 namespace flow {
59 
60 namespace internal {
61 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
62 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
63 typedef unsigned int node_priority_t;
64 static const node_priority_t no_priority = node_priority_t(0);
65 #endif
66 }
67 
68 namespace interface10 {
69 class graph;
70 }
71 
72 namespace interface11 {
73 
75 
76 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
80 struct graph_task : public task {
81  graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
83 };
84 #else
85 typedef task graph_task;
86 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
87 
88 class graph_node;
89 
90 template <typename GraphContainerType, typename GraphNodeType>
93  friend class graph_node;
94 public:
95  typedef size_t size_type;
96  typedef GraphNodeType value_type;
97  typedef GraphNodeType* pointer;
98  typedef GraphNodeType& reference;
99  typedef const GraphNodeType& const_reference;
100  typedef std::forward_iterator_tag iterator_category;
101 
103  graph_iterator() : my_graph(NULL), current_node(NULL) {}
104 
107  my_graph(other.my_graph), current_node(other.current_node)
108  {}
109 
112  if (this != &other) {
113  my_graph = other.my_graph;
114  current_node = other.current_node;
115  }
116  return *this;
117  }
118 
120  reference operator*() const;
121 
123  pointer operator->() const;
124 
126  bool operator==(const graph_iterator& other) const {
127  return ((my_graph == other.my_graph) && (current_node == other.current_node));
128  }
129 
131  bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
132 
135  internal_forward();
136  return *this;
137  }
138 
141  graph_iterator result = *this;
142  operator++();
143  return result;
144  }
145 
146 private:
147  // the graph over which we are iterating
148  GraphContainerType *my_graph;
149  // pointer into my_graph's my_nodes list
150  pointer current_node;
151 
153  graph_iterator(GraphContainerType *g, bool begin);
154  void internal_forward();
155 }; // class graph_iterator
156 
157 // flags to modify the behavior of the graph reset(). Can be combined.
160  rf_reset_bodies = 1 << 0, // delete the current node body, reset to a copy of the initial node body.
161  rf_clear_edges = 1 << 1 // delete edges
162 };
163 
164 namespace internal {
165 
173 
174 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
176  bool operator()(const graph_task* left, const graph_task* right) {
177  return left->priority < right->priority;
178  }
179 };
180 
182 
183 class priority_task_selector : public task {
184 public:
185  priority_task_selector(graph_task_priority_queue_t& priority_queue)
186  : my_priority_queue(priority_queue) {}
188  graph_task* t = NULL;
189  bool result = my_priority_queue.try_pop(t);
190  __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
191  " in graph's priority queue mismatched" );
193  "Incorrect task submitted to graph priority queue" );
195  "Tasks from graph's priority queue must have priority" );
196  task* t_next = t->execute();
197  task::destroy(*t);
198  return t_next;
199  }
200 private:
201  graph_task_priority_queue_t& my_priority_queue;
202 };
203 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
204 
205 }
206 
207 } // namespace interfaceX
208 namespace interface10 {
210 
213 
214  template< typename Body >
216  public:
217  run_task(Body& body
220  ) : tbb::flow::interface11::graph_task(node_priority),
221 #else
222  ) :
223 #endif
224  my_body(body) { }
226  my_body();
227  return NULL;
228  }
229  private:
230  Body my_body;
231  };
232 
233  template< typename Receiver, typename Body >
235  public:
236  run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
238  tbb::task *res = my_receiver.try_put_task(my_body());
239  if (res == tbb::flow::interface11::SUCCESSFULLY_ENQUEUED) res = NULL;
240  return res;
241  }
242  private:
243  Receiver &my_receiver;
244  Body my_body;
245  };
246  typedef std::list<tbb::task *> task_list_type;
247 
248  class wait_functor {
250  public:
251  wait_functor(tbb::task* t) : graph_root_task(t) {}
252  void operator()() const { graph_root_task->wait_for_all(); }
253  };
254 
258  public:
259  spawn_functor(tbb::task& t) : spawn_task(t) {}
260  void operator()() const {
261  FLOW_SPAWN(spawn_task);
262  }
263  };
264 
265  void prepare_task_arena(bool reinit = false) {
266  if (reinit) {
267  __TBB_ASSERT(my_task_arena, "task arena is NULL");
268  my_task_arena->terminate();
269  my_task_arena->initialize(tbb::task_arena::attach());
270  }
271  else {
272  __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
273  my_task_arena = new tbb::task_arena(tbb::task_arena::attach());
274  }
275  if (!my_task_arena->is_active()) // failed to attach
276  my_task_arena->initialize(); // create a new, default-initialized arena
277  __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
278  }
279 
280 public:
282  graph();
283 
285  explicit graph(tbb::task_group_context& use_this_context);
286 
288 
289  ~graph();
290 
291 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
292  void set_name(const char *name);
293 #endif
294 
296  reserve_wait();
297  }
298 
300  release_wait();
301  }
302 
304 
306  void reserve_wait() __TBB_override;
307 
309 
311  void release_wait() __TBB_override;
312 
314 
316  template< typename Receiver, typename Body >
317  void run(Receiver &r, Body body) {
319  task* rtask = new (task::allocate_additional_child_of(*root_task()))
321  my_task_arena->execute(spawn_functor(*rtask));
322  }
323  }
324 
326 
328  template< typename Body >
329  void run(Body body) {
331  task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
332  my_task_arena->execute(spawn_functor(*rtask));
333  }
334  }
335 
337 
338  void wait_for_all() {
339  cancelled = false;
340  caught_exception = false;
341  if (my_root_task) {
342 #if TBB_USE_EXCEPTIONS
343  try {
344 #endif
345  my_task_arena->execute(wait_functor(my_root_task));
346 #if __TBB_TASK_GROUP_CONTEXT
347  cancelled = my_context->is_group_execution_cancelled();
348 #endif
349 #if TBB_USE_EXCEPTIONS
350  }
351  catch (...) {
352  my_root_task->set_ref_count(1);
353  my_context->reset();
354  caught_exception = true;
355  cancelled = true;
356  throw;
357  }
358 #endif
359 #if __TBB_TASK_GROUP_CONTEXT
360  // TODO: the "if" condition below is just a work-around to support the concurrent wait
361  // mode. The cancellation and exception mechanisms are still broken in this mode.
362  // Consider using task group not to re-implement the same functionality.
363  if (!(my_context->traits() & tbb::task_group_context::concurrent_wait)) {
364  my_context->reset(); // consistent with behavior in catch()
365 #endif
366  my_root_task->set_ref_count(1);
367 #if __TBB_TASK_GROUP_CONTEXT
368  }
369 #endif
370  }
371  }
372 
375  return my_root_task;
376  }
377 
378  // ITERATORS
379  template<typename C, typename N>
381 
382  // Graph iterator typedefs
385 
386  // Graph iterator constructors
388  iterator begin();
390  iterator end();
392  const_iterator begin() const;
394  const_iterator end() const;
396  const_iterator cbegin() const;
398  const_iterator cend() const;
399 
401  bool is_cancelled() { return cancelled; }
402  bool exception_thrown() { return caught_exception; }
403 
404  // thread-unsafe state reset.
406 
407 private:
409 #if __TBB_TASK_GROUP_CONTEXT
411 #endif
413  bool cancelled;
416  task_list_type my_reset_task_list;
417 
419 
421  void register_node(tbb::flow::interface11::graph_node *n);
422  void remove_node(tbb::flow::interface11::graph_node *n);
423 
425 
426 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
428 #endif
429 
437 
439 
440 }; // class graph
441 } // namespace interface10
442 
443 namespace interface11 {
444 
446 
447 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
448 namespace internal{
449 class get_graph_helper;
450 }
451 #endif
452 
455  friend class graph;
456  template<typename C, typename N>
457  friend class graph_iterator;
458 
459 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
460  friend class internal::get_graph_helper;
461 #endif
462 
463 protected:
465  graph_node *next, *prev;
466 public:
467  explicit graph_node(graph& g);
468 
469  virtual ~graph_node();
470 
471 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
472  virtual void set_name(const char *name) = 0;
473 #endif
474 
475 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
476  virtual void extract() = 0;
477 #endif
478 
479 protected:
480  // performs the reset on an individual node.
481  virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
482 }; // class graph_node
483 
484 namespace internal {
485 
486 inline void activate_graph(graph& g) {
487  g.my_is_active = true;
488 }
489 
490 inline void deactivate_graph(graph& g) {
491  g.my_is_active = false;
492 }
493 
494 inline bool is_graph_active(graph& g) {
495  return g.my_is_active;
496 }
497 
498 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
500  task* critical_task = &t;
501  // TODO: change flow graph's interfaces to work with graph_task type instead of tbb::task.
502  graph_task* gt = static_cast<graph_task*>(&t);
503  if( gt->priority != no_priority ) {
508  critical_task = new( gt->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
509  tbb::internal::make_critical( *critical_task );
510  g.my_priority_queue.push(gt);
511  }
512  return *critical_task;
513 }
514 #else
516  return t;
517 }
518 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
519 
521 inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
522  if (is_graph_active(g)) {
523  graph::spawn_functor s_fn(prioritize_task(g, arena_task));
525  g.my_task_arena->execute(s_fn);
526  }
527 }
528 
530 inline void enqueue_in_graph_arena(graph &g, tbb::task& arena_task) {
531  if (is_graph_active(g)) {
532  __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
533  task::enqueue(prioritize_task(g, arena_task), *g.my_task_arena);
534  }
535 }
536 
538  g.my_reset_task_list.push_back(tp);
539 }
540 
541 } // namespace internal
542 
543 } // namespace interfaceX
544 } // namespace flow
545 } // namespace tbb
546 
547 #endif // __TBB_flow_graph_impl_H
void prepare_task_arena(bool reinit=false)
internal::allocate_continuation_proxy & allocate_continuation()
Returns proxy for overloaded new that allocates a continuation task of *this.
Definition: task.h:665
priority_task_selector(graph_task_priority_queue_t &priority_queue)
tbb::flow::interface11::graph_iterator< const graph, const tbb::flow::interface11::graph_node > const_iterator
static tbb::task *const SUCCESSFULLY_ENQUEUED
Base class for user-defined tasks.
Definition: task.h:604
void run(Body body)
Spawns a task that runs a function object.
void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
void add_task_to_graph_reset_list(tbb::flow::interface10::graph &g, tbb::task *tp)
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
bool is_graph_active(tbb::flow::interface10::graph &g)
#define __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
Definition: tbb_config.h:860
A lock that occupies a single byte.
Definition: spin_mutex.h:39
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert ...
Definition: tbb_stddef.h:167
bool operator!=(const graph_iterator &other) const
Inequality.
bool operator==(const graph_iterator &other) const
Equality.
tbb::flow::interface11::graph_node * my_nodes_last
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
run_task(Body &body, tbb::flow::interface11::node_priority_t node_priority=tbb::flow::interface11::no_priority)
void activate_graph(tbb::flow::interface10::graph &g)
tbb::task * root_task()
Returns the root task of the graph.
bool is_cancelled()
return status of graph execution
Used to form groups of tasks.
Definition: task.h:347
virtual task * execute()=0
Should be overridden by derived classes.
tbb::concurrent_priority_queue< graph_task *, graph_task_comparator > graph_task_priority_queue_t
Base class for tasks generated by graph nodes.
tbb::task_group_context * my_context
void wait_for_all()
Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls...
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
graph_iterator & operator++()
Pre-increment.
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
unsigned int node_priority_t
task * execute() __TBB_override
Should be overridden by derived classes.
tbb::task & prioritize_task(tbb::flow::interface10::graph &g, tbb::task &arena_task)
void make_critical(task &t)
Definition: task.h:1002
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
#define __TBB_override
Definition: tbb_stddef.h:240
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
void run(Receiver &r, Body body)
Spawns a task that runs a body and puts its output to a specific receiver.
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
tbb::flow::interface11::graph_iterator< graph, tbb::flow::interface11::graph_node > iterator
bool operator==(const cache_aligned_allocator< T > &, const cache_aligned_allocator< U > &)
graph_iterator & operator=(const graph_iterator &other)
Assignment.
graph_iterator operator++(int)
Post-increment.
tbb::flow::interface11::internal::graph_task_priority_queue_t my_priority_queue
graph_task(node_priority_t node_priority=no_priority)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
std::forward_iterator_tag iterator_category
static void enqueue(task &t)
Enqueue task for starvation-resistant execution.
Definition: task.h:825
Tag class used to indicate the "attaching" constructor.
Definition: task_arena.h:239
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
void wait_for_all()
Wait for reference count to become one, and set reference count to zero.
Definition: task.h:808
The base of all graph nodes.
void deactivate_graph(tbb::flow::interface10::graph &g)
bool operator()(const graph_task *left, const graph_task *right)
static const node_priority_t no_priority
internal::return_type_or_void< F >::type execute(F &f)
Definition: task_arena.h:346
Pure virtual template classes that define interfaces for async communication.
graph_iterator(const graph_iterator &other)
Copy constructor.
#define FLOW_SPAWN(a)
The graph class.
std::list< tbb::task * > task_list_type

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.