Eclipse SUMO - Simulation of Urban MObility
WorkStealingThreadPool.h
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3 // Copyright (C) 2020-2022 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials are made available under the
5 // terms of the Eclipse Public License 2.0 which is available at
6 // https://www.eclipse.org/legal/epl-2.0/
7 // This Source Code may also be made available under the following Secondary
8 // Licenses when the conditions for such availability set forth in the Eclipse
9 // Public License 2.0 are satisfied: GNU General Public License, version 2
10 // or later which is available at
11 // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 /****************************************************************************/
18 // Threadpool implementation,
19 // based on https://github.com/vukis/Cpp-Utilities/tree/master/ThreadPool
20 /****************************************************************************/
21 #pragma once
22 #include <config.h>
23 
24 #include "TaskQueue.h"
25 #include <algorithm>
26 #include <thread>
27 
28 template<typename CONTEXT = int>
30 public:
31 
32  explicit WorkStealingThreadPool(const bool workSteal, const std::vector<CONTEXT>& context)
33  : myQueues{ context.size() }, myTryoutCount(workSteal ? 1 : 0) {
34  size_t index = 0;
35  for (const CONTEXT& c : context) {
36  if (workSteal) {
37  myThreads.emplace_back([this, index, c] { workStealRun(index, c); });
38  } else {
39  myThreads.emplace_back([this, index, c] { run(index, c); });
40  }
41  index++;
42  }
43  }
44 
46  for (auto& queue : myQueues) {
47  queue.setEnabled(false);
48  }
49  for (auto& thread : myThreads) {
50  thread.join();
51  }
52  }
53 
54  template<typename TaskT>
55  auto executeAsync(TaskT&& task, int idx = -1) -> std::future<decltype(task(std::declval<CONTEXT>()))> {
56  const auto index = idx == -1 ? myQueueIndex++ : idx;
57  if (myTryoutCount > 0) {
58  for (size_t n = 0; n != myQueues.size() * myTryoutCount; ++n) {
59  // Here we need not to std::forward just copy task.
60  // Because if the universal reference of task has bound to an r-value reference
61  // then std::forward will have the same effect as std::move and thus task is not required to contain a valid task.
62  // Universal reference must only be std::forward'ed a exactly zero or one times.
63  bool success = false;
64  auto result = myQueues[(index + n) % myQueues.size()].tryPush(task, success);
65 
66  if (success) {
67  return result;
68  }
69  }
70  }
71  return myQueues[index % myQueues.size()].push(std::forward<TaskT>(task));
72  }
73 
74  void waitAll() {
75  std::vector<std::future<void>> results;
76  for (int n = 0; n != (int)myQueues.size(); ++n) {
77  results.push_back(executeAsync([](CONTEXT) {}, n));
78  }
79  for (auto& r : results) {
80  r.wait();
81  }
82  }
83 
84 private:
85  void run(size_t queueIndex, const CONTEXT& context) {
86  while (myQueues[queueIndex].isEnabled()) {
87  typename TaskQueue<CONTEXT>::TaskPtrType task;
88  if (myQueues[queueIndex].waitAndPop(task)) {
89  task->exec(context);
90  }
91  }
92  }
93 
94  void workStealRun(size_t queueIndex, const CONTEXT& context) {
95  while (myQueues[queueIndex].isEnabled()) {
96  typename TaskQueue<CONTEXT>::TaskPtrType task;
97  for (size_t n = 0; n != myQueues.size()*myTryoutCount; ++n) {
98  if (myQueues[(queueIndex + n) % myQueues.size()].tryPop(task)) {
99  break;
100  }
101  }
102  if (!task && !myQueues[queueIndex].waitAndPop(task)) {
103  return;
104  }
105  task->exec(context);
106  }
107  }
108 
109 private:
110  std::vector<TaskQueue<CONTEXT> > myQueues;
111  std::atomic<size_t> myQueueIndex{ 0 };
112  const size_t myTryoutCount;
113  std::vector<std::thread> myThreads;
114 };
std::unique_ptr< TaskBase< C > > TaskPtrType
Definition: TaskQueue.h:52
std::vector< TaskQueue< CONTEXT > > myQueues
auto executeAsync(TaskT &&task, int idx=-1) -> std::future< decltype(task(std::declval< CONTEXT >()))>
void workStealRun(size_t queueIndex, const CONTEXT &context)
std::vector< std::thread > myThreads
WorkStealingThreadPool(const bool workSteal, const std::vector< CONTEXT > &context)
void run(size_t queueIndex, const CONTEXT &context)
std::atomic< size_t > myQueueIndex