Eclipse SUMO - Simulation of Urban MObility
TaskQueue.h
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3 // Copyright (C) 2020-2022 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials are made available under the
5 // terms of the Eclipse Public License 2.0 which is available at
6 // https://www.eclipse.org/legal/epl-2.0/
7 // This Source Code may also be made available under the following Secondary
8 // Licenses when the conditions for such availability set forth in the Eclipse
9 // Public License 2.0 are satisfied: GNU General Public License, version 2
10 // or later which is available at
11 // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 /****************************************************************************/
18 // Threadpool implementation,
19 // based on https://github.com/vukis/Cpp-Utilities/tree/master/ThreadPool
20 /****************************************************************************/
21 #pragma once
22 #include <config.h>
23 
24 #include <condition_variable>
25 #include <functional>
26 #include <future>
27 #include <queue>
28 
29 template <typename C>
30 class TaskBase {
31 public:
32  virtual ~TaskBase() = default;
33  virtual void exec(const C& context) = 0;
34 };
35 
36 template <typename T, typename C>
37 class Task : public TaskBase<C> {
38 public:
39  Task(T&& t) : task(std::move(t)) {}
40  void exec(const C& context) override {
41  task(context);
42  }
43 
44  T task;
45 };
46 
47 template <typename C>
48 class TaskQueue {
49  using LockType = std::unique_lock<std::mutex>;
50 
51 public:
52  using TaskPtrType = std::unique_ptr<TaskBase<C> >;
53  TaskQueue() = default;
54  ~TaskQueue() = default;
55 
56  void setEnabled(bool enabled) {
57  {
58  LockType lock{ myMutex };
59  myEnabled = enabled;
60  }
61  if (!enabled) {
62  myReady.notify_all();
63  }
64  }
65 
66  bool isEnabled() const {
67  LockType lock{ myMutex };
68  return myEnabled;
69  }
70 
71  bool waitAndPop(TaskPtrType& task) {
72  LockType lock{ myMutex };
73  myReady.wait(lock, [this] { return !myEnabled || !myQueue.empty(); });
74  if (myEnabled && !myQueue.empty()) {
75  task = std::move(myQueue.front());
76  myQueue.pop();
77  return true;
78  }
79  return false;
80  }
81 
82  template <typename TaskT>
83  auto push(TaskT&& task) -> std::future<decltype(task(std::declval<C>()))> {
84  using PkgTask = std::packaged_task<decltype(task(std::declval<C>()))(C)>;
85  auto job = std::unique_ptr<Task<PkgTask, C>>(new Task<PkgTask, C>(PkgTask(std::forward<TaskT>(task))));
86  auto future = job->task.get_future();
87  {
88  LockType lock{ myMutex };
89  myQueue.emplace(std::move(job));
90  }
91 
92  myReady.notify_one();
93  return future;
94  }
95 
96  bool tryPop(TaskPtrType& task) {
97  LockType lock{ myMutex, std::try_to_lock };
98  if (!lock || !myEnabled || myQueue.empty()) {
99  return false;
100  }
101  task = std::move(myQueue.front());
102  myQueue.pop();
103  return true;
104  }
105 
106  template <typename TaskT>
107  auto tryPush(TaskT&& task, bool& success) -> std::future<decltype(task(std::declval<C>()))> {
108  std::future<decltype(task(std::declval<C>()))> future;
109  success = false;
110  {
111  LockType lock{ myMutex, std::try_to_lock };
112  if (!lock) {
113  return future;
114  }
115  using PkgTask = std::packaged_task<decltype(task(std::declval<C>()))(C)>;
116  auto job = std::unique_ptr<Task<PkgTask, C>>(new Task<PkgTask, C>(PkgTask(std::forward<TaskT>(task))));
117  future = job->task.get_future();
118  success = true;
119  myQueue.emplace(std::move(job));
120  }
121 
122  myReady.notify_one();
123  return future;
124  }
125 
126 private:
127  TaskQueue(const TaskQueue&) = delete;
128  TaskQueue& operator=(const TaskQueue&) = delete;
129 
130  std::queue<TaskPtrType> myQueue;
131  bool myEnabled = true;
132  mutable std::mutex myMutex;
133  std::condition_variable myReady;
134 };
virtual ~TaskBase()=default
virtual void exec(const C &context)=0
Definition: TaskQueue.h:37
T task
Definition: TaskQueue.h:44
Task(T &&t)
Definition: TaskQueue.h:39
void exec(const C &context) override
Definition: TaskQueue.h:40
auto tryPush(TaskT &&task, bool &success) -> std::future< decltype(task(std::declval< C >()))>
Definition: TaskQueue.h:107
bool waitAndPop(TaskPtrType &task)
Definition: TaskQueue.h:71
bool tryPop(TaskPtrType &task)
Definition: TaskQueue.h:96
std::condition_variable myReady
Definition: TaskQueue.h:133
TaskQueue()=default
TaskQueue & operator=(const TaskQueue &)=delete
std::queue< TaskPtrType > myQueue
Definition: TaskQueue.h:130
bool myEnabled
Definition: TaskQueue.h:131
bool isEnabled() const
Definition: TaskQueue.h:66
TaskQueue(const TaskQueue &)=delete
std::unique_lock< std::mutex > LockType
Definition: TaskQueue.h:49
auto push(TaskT &&task) -> std::future< decltype(task(std::declval< C >()))>
Definition: TaskQueue.h:83
std::unique_ptr< TaskBase< C > > TaskPtrType
Definition: TaskQueue.h:52
~TaskQueue()=default
std::mutex myMutex
Definition: TaskQueue.h:132
void setEnabled(bool enabled)
Definition: TaskQueue.h:56