LLVM 15.0.0git
ThreadPool.h
Go to the documentation of this file.
1 //===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 //
9 // This file defines a crude C++11 based thread pool.
10 //
11 //===----------------------------------------------------------------------===//
12 
13 #ifndef LLVM_SUPPORT_THREADPOOL_H
14 #define LLVM_SUPPORT_THREADPOOL_H
15 
#include "llvm/ADT/DenseMap.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/RWMutex.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/thread.h"

#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>
30 
31 namespace llvm {
32 
33 class ThreadPoolTaskGroup;
34 
35 /// A ThreadPool for asynchronous parallel execution on a defined number of
36 /// threads.
37 ///
38 /// The pool keeps a vector of threads alive, waiting on a condition variable
39 /// for some work to become available.
40 ///
41 /// It is possible to reuse one thread pool for different groups of tasks
42 /// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
43 /// the same queue, but it is possible to wait only for a specific group of
44 /// tasks to finish.
45 ///
46 /// It is also possible for worker threads to submit new tasks and wait for
47 /// them. Note that this may result in a deadlock in cases such as when a task
48 /// (directly or indirectly) tries to wait for its own completion, or when all
49 /// available threads are used up by tasks waiting for a task that has no thread
50 /// left to run on (this includes waiting on the returned future). It should be
51 /// generally safe to wait() for a group as long as groups do not form a cycle.
52 class ThreadPool {
53 public:
54  /// Construct a pool using the hardware strategy \p S for mapping hardware
55  /// execution resources (threads, cores, CPUs)
56  /// Defaults to using the maximum execution resources in the system, but
57  /// accounting for the affinity mask.
59 
60  /// Blocking destructor: the pool will wait for all the threads to complete.
61  ~ThreadPool();
62 
63  /// Asynchronous submission of a task to the pool. The returned future can be
64  /// used to wait for the task to finish and is *non-blocking* on destruction.
65  template <typename Function, typename... Args>
66  auto async(Function &&F, Args &&...ArgList) {
67  auto Task =
68  std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
69  return async(std::move(Task));
70  }
71 
72  /// Overload, task will be in the given task group.
73  template <typename Function, typename... Args>
74  auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
75  auto Task =
76  std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
77  return async(Group, std::move(Task));
78  }
79 
80  /// Asynchronous submission of a task to the pool. The returned future can be
81  /// used to wait for the task to finish and is *non-blocking* on destruction.
82  template <typename Func>
83  auto async(Func &&F) -> std::shared_future<decltype(F())> {
84  return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
85  nullptr);
86  }
87 
88  template <typename Func>
89  auto async(ThreadPoolTaskGroup &Group, Func &&F)
90  -> std::shared_future<decltype(F())> {
91  return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
92  &Group);
93  }
94 
95  /// Blocking wait for all the threads to complete and the queue to be empty.
96  /// It is an error to try to add new tasks while blocking on this call.
97  /// Calling wait() from a task would deadlock waiting for itself.
98  void wait();
99 
100  /// Blocking wait for only all the threads in the given group to complete.
101  /// It is possible to wait even inside a task, but waiting (directly or
102  /// indirectly) on itself will deadlock. If called from a task running on a
103  /// worker thread, the call may process pending tasks while waiting in order
104  /// not to waste the thread.
105  void wait(ThreadPoolTaskGroup &Group);
106 
107  // TODO: misleading legacy name warning!
108  // Returns the maximum number of worker threads in the pool, not the current
109  // number of threads!
110  unsigned getThreadCount() const { return MaxThreadCount; }
111 
112  /// Returns true if the current thread is a worker thread of this thread pool.
113  bool isWorkerThread() const;
114 
115 private:
116  /// Helpers to create a promise and a callable wrapper of \p Task that sets
117  /// the result of the promise. Returns the callable and a future to access the
118  /// result.
119  template <typename ResTy>
120  static std::pair<std::function<void()>, std::future<ResTy>>
121  createTaskAndFuture(std::function<ResTy()> Task) {
122  std::shared_ptr<std::promise<ResTy>> Promise =
123  std::make_shared<std::promise<ResTy>>();
124  auto F = Promise->get_future();
125  return {
126  [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); },
127  std::move(F)};
128  }
129  static std::pair<std::function<void()>, std::future<void>>
130  createTaskAndFuture(std::function<void()> Task) {
131  std::shared_ptr<std::promise<void>> Promise =
132  std::make_shared<std::promise<void>>();
133  auto F = Promise->get_future();
134  return {[Promise = std::move(Promise), Task]() {
135  Task();
136  Promise->set_value();
137  },
138  std::move(F)};
139  }
140 
141  /// Returns true if all tasks in the given group have finished (nullptr means
142  /// all tasks regardless of their group). QueueLock must be locked.
143  bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;
144 
145  /// Asynchronous submission of a task to the pool. The returned future can be
146  /// used to wait for the task to finish and is *non-blocking* on destruction.
147  template <typename ResTy>
148  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
149  ThreadPoolTaskGroup *Group) {
150 
151 #if LLVM_ENABLE_THREADS
152  /// Wrap the Task in a std::function<void()> that sets the result of the
153  /// corresponding future.
154  auto R = createTaskAndFuture(Task);
155 
156  int requestedThreads;
157  {
158  // Lock the queue and push the new task
159  std::unique_lock<std::mutex> LockGuard(QueueLock);
160 
161  // Don't allow enqueueing after disabling the pool
162  assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
163  Tasks.emplace_back(std::make_pair(std::move(R.first), Group));
164  requestedThreads = ActiveThreads + Tasks.size();
165  }
166  QueueCondition.notify_one();
167  grow(requestedThreads);
168  return R.second.share();
169 
170 #else // LLVM_ENABLE_THREADS Disabled
171 
172  // Get a Future with launch::deferred execution using std::async
173  auto Future = std::async(std::launch::deferred, std::move(Task)).share();
174  // Wrap the future so that both ThreadPool::wait() can operate and the
175  // returned future can be sync'ed on.
176  Tasks.emplace_back(std::make_pair([Future]() { Future.get(); }, Group));
177  return Future;
178 #endif
179  }
180 
181 #if LLVM_ENABLE_THREADS
182  // Grow to ensure that we have at least `requested` Threads, but do not go
183  // over MaxThreadCount.
184  void grow(int requested);
185 
186  void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
187 #endif
188 
189  /// Threads in flight
190  std::vector<llvm::thread> Threads;
191  /// Lock protecting access to the Threads vector.
192  mutable llvm::sys::RWMutex ThreadsLock;
193 
194  /// Tasks waiting for execution in the pool.
195  std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
196 
197  /// Locking and signaling for accessing the Tasks queue.
198  std::mutex QueueLock;
199  std::condition_variable QueueCondition;
200 
201  /// Signaling for job completion (all tasks or all tasks in a group).
202  std::condition_variable CompletionCondition;
203 
204  /// Keep track of the number of thread actually busy
205  unsigned ActiveThreads = 0;
206  /// Number of threads active for tasks in the given group (only non-zero).
207  DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;
208 
209 #if LLVM_ENABLE_THREADS // avoids warning for unused variable
210  /// Signal for the destruction of the pool, asking thread to exit.
211  bool EnableFlag = true;
212 #endif
213 
214  const ThreadPoolStrategy Strategy;
215 
216  /// Maximum number of threads to potentially grow this pool to.
217  const unsigned MaxThreadCount;
218 };
219 
220 /// A group of tasks to be run on a thread pool. Thread pool tasks in different
221 /// groups can run on the same threadpool but can be waited for separately.
222 /// It is even possible for tasks of one group to submit and wait for tasks
223 /// of another group, as long as this does not form a loop.
225 public:
226  /// The ThreadPool argument is the thread pool to forward calls to.
227  ThreadPoolTaskGroup(ThreadPool &Pool) : Pool(Pool) {}
228 
229  /// Blocking destructor: will wait for all the tasks in the group to complete
230  /// by calling ThreadPool::wait().
232 
233  /// Calls ThreadPool::async() for this group.
234  template <typename Function, typename... Args>
235  inline auto async(Function &&F, Args &&...ArgList) {
236  return Pool.async(*this, std::forward<Function>(F),
237  std::forward<Args>(ArgList)...);
238  }
239 
240  /// Calls ThreadPool::wait() for this group.
241  void wait() { Pool.wait(*this); }
242 
243 private:
244  ThreadPool &Pool;
245 };
246 
247 } // namespace llvm
248 
249 #endif // LLVM_SUPPORT_THREADPOOL_H
llvm::hardware_concurrency
ThreadPoolStrategy hardware_concurrency(unsigned ThreadCount=0)
Returns a default thread strategy where all available hardware resources are to be used,...
Definition: Threading.h:185
llvm
This is an optimization pass for GlobalISel generic memory operations.
Definition: AddressRanges.h:17
llvm::ThreadPool
A ThreadPool for asynchronous parallel execution on a defined number of threads.
Definition: ThreadPool.h:52
llvm::Function
Definition: Function.h:60
RWMutex.h
llvm::ThreadPoolStrategy
This tells how a thread pool will be used.
Definition: Threading.h:116
llvm::ThreadPoolTaskGroup::ThreadPoolTaskGroup
ThreadPoolTaskGroup(ThreadPool &Pool)
The ThreadPool argument is the thread pool to forward calls to.
Definition: ThreadPool.h:227
llvm::ThreadPool::ThreadPool
ThreadPool(ThreadPoolStrategy S=hardware_concurrency())
Construct a pool using the hardware strategy S for mapping hardware execution resources (threads,...
Definition: ThreadPool.cpp:35
DenseMap.h
F
#define F(x, y, z)
Definition: MD5.cpp:55
llvm::RISCVFenceField::R
@ R
Definition: RISCVBaseInfo.h:240
llvm::ThreadPoolTaskGroup::~ThreadPoolTaskGroup
~ThreadPoolTaskGroup()
Blocking destructor: will wait for all the tasks in the group to complete by calling ThreadPool::wait...
Definition: ThreadPool.h:231
llvm::ThreadPool::getThreadCount
unsigned getThreadCount() const
Definition: ThreadPool.h:110
Threading.h
llvm::ThreadPoolTaskGroup
A group of tasks to be run on a thread pool.
Definition: ThreadPool.h:224
move
compiles ldr LCPI1_0 ldr ldr mov lsr tst moveq r1 ldr LCPI1_1 and r0 bx lr It would be better to do something like to fold the shift into the conditional move
Definition: README.txt:546
llvm::ThreadPool::isWorkerThread
bool isWorkerThread() const
Returns true if the current thread is a worker thread of this thread pool.
Definition: ThreadPool.cpp:168
llvm::ThreadPoolTaskGroup::wait
void wait()
Calls ThreadPool::wait() for this group.
Definition: ThreadPool.h:241
llvm::ThreadPool::~ThreadPool
~ThreadPool()
Blocking destructor: the pool will wait for all the threads to complete.
Definition: ThreadPool.cpp:178
assert
assert(ImpDefSCC.getReg()==AMDGPU::SCC &&ImpDefSCC.isDef())
function
print Print MemDeps of function
Definition: MemDepPrinter.cpp:82
llvm::ThreadPool::async
auto async(ThreadPoolTaskGroup &Group, Func &&F) -> std::shared_future< decltype(F())>
Definition: ThreadPool.h:89
llvm::ThreadPool::async
auto async(Func &&F) -> std::shared_future< decltype(F())>
Asynchronous submission of a task to the pool.
Definition: ThreadPool.h:83
llvm::ThreadPool::async
auto async(Function &&F, Args &&...ArgList)
Asynchronous submission of a task to the pool.
Definition: ThreadPool.h:66
S
add sub stmia L5 ldr r0 bl L_printf $stub Instead of a and a wouldn t it be better to do three moves *Return an aggregate type is even return S
Definition: README.txt:210
llvm::ThreadPool::async
auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList)
Overload, task will be in the given task group.
Definition: ThreadPool.h:74
llvm::ThreadPoolTaskGroup::async
auto async(Function &&F, Args &&...ArgList)
Calls ThreadPool::async() for this group.
Definition: ThreadPool.h:235
llvm::ThreadPool::wait
void wait()
Blocking wait for all the threads to complete and the queue to be empty.
Definition: ThreadPool.cpp:143
llvm::sys::SmartRWMutex< false >
thread.h
llvm::AMDGPU::HSAMD::Kernel::Key::Args
constexpr char Args[]
Key for Kernel::Metadata::mArgs.
Definition: AMDGPUMetadata.h:394