LLVM 22.0.0git
ThreadPool.h
//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_SUPPORT_THREADPOOL_H
#define LLVM_SUPPORT_THREADPOOL_H

#include "llvm/ADT/DenseMap.h"
#include "llvm/ADT/FunctionExtras.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Compiler.h"
#include "llvm/Support/Jobserver.h"
#include "llvm/Support/RWMutex.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/thread.h"

#include <future>

#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

namespace llvm {

class ThreadPoolTaskGroup;

/// This defines the abstract base interface for a ThreadPool allowing
/// asynchronous parallel execution on a defined number of threads.
///
/// It is possible to reuse one thread pool for different groups of tasks
/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
/// the same queue, but it is possible to wait only for a specific group of
/// tasks to finish.
///
/// It is also possible for worker threads to submit new tasks and wait for
/// them. Note that this may result in a deadlock in cases such as when a task
/// (directly or indirectly) tries to wait for its own completion, or when all
/// available threads are used up by tasks waiting for a task that has no thread
/// left to run on (this includes waiting on the returned future). It should be
/// generally safe to wait() for a group as long as groups do not form a cycle.
class LLVM_ABI ThreadPoolInterface {
  /// The actual method to enqueue a task, to be defined by the concrete
  /// implementation.
  virtual void asyncEnqueue(llvm::unique_function<void()> Task,
                            ThreadPoolTaskGroup *Group) = 0;

public:
  /// Destroying the pool will drain the pending tasks and wait. The current
  /// thread may participate in the execution of the pending tasks.
  virtual ~ThreadPoolInterface();

  /// Blocking wait for all the threads to complete and the queue to be empty.
  /// It is an error to try to add new tasks while blocking on this call.
  /// Calling wait() from a task would deadlock waiting for itself.
  virtual void wait() = 0;

  /// Blocking wait for all the tasks in the given group to complete.
  /// It is possible to wait even inside a task, but waiting (directly or
  /// indirectly) on itself will deadlock. If called from a task running on a
  /// worker thread, the call may process pending tasks while waiting in order
  /// not to waste the thread.
  virtual void wait(ThreadPoolTaskGroup &Group) = 0;

  /// Returns the maximum number of worker threads this pool can eventually
  /// grow to.
  virtual unsigned getMaxConcurrency() const = 0;

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename Function, typename... Args>
  auto async(Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(std::move(Task));
  }

  /// Overload: the submitted task will belong to the given task group.
  template <typename Function, typename... Args>
  auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(Group, std::move(Task));
  }

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename Func>
  auto async(Func &&F) -> std::shared_future<decltype(F())> {
    return asyncImpl(
        llvm::unique_function<decltype(F())()>(std::forward<Func>(F)), nullptr);
  }

  /// Overload: the submitted task will belong to the given task group.
  template <typename Func>
  auto async(ThreadPoolTaskGroup &Group, Func &&F)
      -> std::shared_future<decltype(F())> {
    return asyncImpl(
        llvm::unique_function<decltype(F())()>(std::forward<Func>(F)), &Group);
  }

private:
  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename ResTy>
  std::shared_future<ResTy> asyncImpl(llvm::unique_function<ResTy()> Task,
                                      ThreadPoolTaskGroup *Group) {
    auto Future = std::async(std::launch::deferred, std::move(Task)).share();
    asyncEnqueue([Future]() { Future.wait(); }, Group);
    return Future;
  }
};
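
// Illustrative usage sketch (not part of the upstream header): submit work,
// keep the returned std::shared_future for the result, and drain the queue.
// `DefaultThreadPool` refers to the alias declared further down in this file.
//
//   DefaultThreadPool Pool;
//   std::shared_future<int> Sum = Pool.async([] { return 2 + 2; });
//   Pool.async([] { /* fire-and-forget work */ });
//   int Value = Sum.get(); // blocks until the task has produced its result
//   Pool.wait();           // drains the queue; never call this from a task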

#if LLVM_ENABLE_THREADS
/// A ThreadPool implementation using std::threads.
///
/// The pool keeps a vector of threads alive, waiting on a condition variable
/// for some work to become available.
class LLVM_ABI StdThreadPool : public ThreadPoolInterface {
public:
  /// Construct a pool using the hardware strategy \p S for mapping hardware
  /// execution resources (threads, cores, CPUs).
  /// Defaults to using the maximum execution resources in the system, while
  /// accounting for the affinity mask.
  StdThreadPool(ThreadPoolStrategy S = hardware_concurrency());

  /// Blocking destructor: the pool will wait for all the threads to complete.
  ~StdThreadPool() override;

  /// Blocking wait for all the threads to complete and the queue to be empty.
  /// It is an error to try to add new tasks while blocking on this call.
  /// Calling wait() from a task would deadlock waiting for itself.
  void wait() override;

  /// Blocking wait for all the tasks in the given group to complete.
  /// It is possible to wait even inside a task, but waiting (directly or
  /// indirectly) on itself will deadlock. If called from a task running on a
  /// worker thread, the call may process pending tasks while waiting in order
  /// not to waste the thread.
  void wait(ThreadPoolTaskGroup &Group) override;

  /// Returns the maximum number of worker threads in the pool, not the current
  /// number of threads!
  unsigned getMaxConcurrency() const override { return MaxThreadCount; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Returns true if all tasks in the given group have finished (nullptr means
  /// all tasks regardless of their group). QueueLock must be locked.
  bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;

  /// Enqueue a task on the shared queue, wake up a worker, and grow the pool
  /// if more threads are needed. Implements ThreadPoolInterface::asyncEnqueue().
  void asyncEnqueue(llvm::unique_function<void()> Task,
                    ThreadPoolTaskGroup *Group) override {
    int requestedThreads;
    {
      // Lock the queue and push the new task.
      std::unique_lock<std::mutex> LockGuard(QueueLock);

      // Don't allow enqueueing after disabling the pool.
      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
      Tasks.emplace_back(std::make_pair(std::move(Task), Group));
      requestedThreads = ActiveThreads + Tasks.size();
    }
    QueueCondition.notify_one();
    grow(requestedThreads);
  }

  /// Grow to ensure that we have at least `requested` threads, but do not go
  /// over MaxThreadCount.
  void grow(int requested);

  /// Main loop run by the worker threads (and by wait(ThreadPoolTaskGroup &)
  /// when called from a worker): pops and executes queued tasks. If
  /// \p WaitingForGroup is non-null, returns once that group has no work left.
  void processTasks(ThreadPoolTaskGroup *WaitingForGroup);

  /// Variant of the worker loop that coordinates with TheJobserver to limit
  /// how many tasks run concurrently.
  void processTasksWithJobserver();

  /// Threads in flight.
  std::vector<llvm::thread> Threads;
  /// Lock protecting access to the Threads vector.
  mutable llvm::sys::RWMutex ThreadsLock;

  /// Tasks waiting for execution in the pool.
  std::deque<std::pair<llvm::unique_function<void()>, ThreadPoolTaskGroup *>>
      Tasks;

  /// Locking and signaling for accessing the Tasks queue.
  std::mutex QueueLock;
  std::condition_variable QueueCondition;

  /// Signaling for job completion (all tasks or all tasks in a group).
  std::condition_variable CompletionCondition;

  /// Keep track of the number of threads actually busy.
  unsigned ActiveThreads = 0;
  /// Number of threads active for tasks in each group; only groups with a
  /// non-zero count are present in the map.
  DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;

  /// Signal for the destruction of the pool, asking threads to exit.
  bool EnableFlag = true;

  /// The strategy this pool was created with.
  const ThreadPoolStrategy Strategy;

  /// Maximum number of threads to potentially grow this pool to.
  const unsigned MaxThreadCount;

  /// Jobserver client used to limit how many tasks run at once; null when no
  /// jobserver is configured.
  JobserverClient *TheJobserver = nullptr;
};
#endif // LLVM_ENABLE_THREADS
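
// Illustrative sketch (not from the upstream sources): sizing work to the
// pool by creating roughly one task per available worker thread. The names
// `N`, `I`, and the slice-processing lambda body are hypothetical.
//
//   StdThreadPool Pool;
//   const unsigned N = Pool.getMaxConcurrency();
//   for (unsigned I = 0; I < N; ++I)
//     Pool.async([I, N] { /* process the I-th of N slices */ });
//   Pool.wait();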

/// A non-threaded implementation.
class LLVM_ABI SingleThreadExecutor : public ThreadPoolInterface {
public:
  /// Construct a non-threaded pool; the hardware strategy is ignored.
  SingleThreadExecutor(ThreadPoolStrategy ignored = {});

  /// Blocking destructor: the pool will first execute the pending tasks.
  ~SingleThreadExecutor() override;

  /// Blocking wait that first executes all the pending tasks.
  void wait() override;

  /// Blocking wait for all the tasks in the given group to complete.
  void wait(ThreadPoolTaskGroup &Group) override;

  /// Always returns 1: there is no concurrency.
  unsigned getMaxConcurrency() const override { return 1; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Queue the task; pending tasks are executed by wait() (or the destructor).
  /// Implements ThreadPoolInterface::asyncEnqueue().
  void asyncEnqueue(llvm::unique_function<void()> Task,
                    ThreadPoolTaskGroup *Group) override {
    Tasks.emplace_back(std::make_pair(std::move(Task), Group));
  }

  /// Tasks waiting for execution in the pool.
  std::deque<std::pair<llvm::unique_function<void()>, ThreadPoolTaskGroup *>>
      Tasks;
};
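
// Illustrative sketch (not from the upstream sources): with the non-threaded
// executor, queued work only runs once something blocks on it.
//
//   SingleThreadExecutor Pool;
//   auto F = Pool.async([] { return 42; });
//   // Nothing has executed yet; the task sits in the queue.
//   int V = F.get(); // runs the deferred task on the calling thread
//   Pool.wait();     // executes any remaining queued tasks here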

#if LLVM_ENABLE_THREADS
using DefaultThreadPool = StdThreadPool;
#else
using DefaultThreadPool = SingleThreadExecutor;
#endif
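
// Illustrative sketch (not from the upstream sources): code written against
// DefaultThreadPool builds the same way with or without LLVM_ENABLE_THREADS;
// only the amount of concurrency changes. `Chunks` and `process` are
// hypothetical.
//
//   DefaultThreadPool Pool(hardware_concurrency());
//   for (auto &Chunk : Chunks)
//     Pool.async([&Chunk] { process(Chunk); });
//   Pool.wait(); // in single-threaded builds, everything executes here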

/// A group of tasks to be run on a thread pool. Thread pool tasks in different
/// groups can run on the same thread pool but can be waited for separately.
/// It is even possible for tasks of one group to submit and wait for tasks
/// of another group, as long as this does not form a loop.
class ThreadPoolTaskGroup {
public:
  /// The ThreadPool argument is the thread pool to forward calls to.
  ThreadPoolTaskGroup(ThreadPoolInterface &Pool) : Pool(Pool) {}

  /// Blocking destructor: will wait for all the tasks in the group to complete
  /// by calling ThreadPool::wait().
  ~ThreadPoolTaskGroup() { wait(); }

  /// Calls ThreadPool::async() for this group.
  template <typename Function, typename... Args>
  inline auto async(Function &&F, Args &&...ArgList) {
    return Pool.async(*this, std::forward<Function>(F),
                      std::forward<Args>(ArgList)...);
  }

  /// Calls ThreadPool::wait() for this group.
  void wait() { Pool.wait(*this); }

private:
  ThreadPoolInterface &Pool;
};
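
// Illustrative sketch (not from the upstream sources): independent task groups
// sharing one pool, including a wait issued from inside a task, which is safe
// as long as the groups form no cycle. The group names are hypothetical.
//
//   DefaultThreadPool Pool;
//   ThreadPoolTaskGroup Parsing(Pool), Codegen(Pool);
//   Parsing.async([&Pool] {
//     ThreadPoolTaskGroup Inner(Pool);
//     Inner.async([] { /* leaf work */ });
//     Inner.wait(); // a worker may process other pending tasks while waiting
//   });
//   Codegen.async([] { /* unrelated work on the same pool */ });
//   Parsing.wait(); // waits only for the parsing tasks
//   // Codegen tasks may still be running; its destructor will wait for them.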

} // namespace llvm

#endif // LLVM_SUPPORT_THREADPOOL_H