LLVM 23.0.0git
Parallel.cpp
Go to the documentation of this file.
1//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
10#include "llvm/ADT/ScopeExit.h"
11#include "llvm/Config/llvm-config.h"
16
17#include <atomic>
18#include <future>
19#include <memory>
20#include <mutex>
21#include <thread>
22#include <vector>
23
24using namespace llvm;
25using namespace llvm::parallel;
26
28
29#if LLVM_ENABLE_THREADS
30
#ifdef _WIN32
// On Windows the TLS slot is kept file-local and read through
// getThreadIndex(); GET_THREAD_INDEX_IMPL is a macro supplied by a header
// that is not visible in this extract.
static thread_local unsigned threadIndex = UINT_MAX;

unsigned parallel::getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// On other platforms the thread-index TLS variable declared in the
// llvm::parallel namespace (header not visible here) is defined directly.
thread_local unsigned parallel::threadIndex = UINT_MAX;
#endif
38
39namespace {
40
/// Runs closures on a thread pool in filo order.
///
/// Tasks are pushed to and popped from the back of WorkStack, so the most
/// recently queued closure runs first (LIFO, i.e. "filo").
class ThreadPoolExecutor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
    // When coordinating with a GNU Make jobserver, keep a client handle so
    // workers can acquire/release job slots (see work() below).
    if (S.UseJobserver)
      TheJobserver = JobserverClient::getInstance();

    // NOTE(review): ThreadCount is read below but never assigned anywhere in
    // the visible code -- a line (presumably computing it from S) appears to
    // have been lost in extraction. Verify against the original file.

    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    // Use operator[] before creating the thread to avoid data race in .size()
    // in 'safe libc++' mode.
    auto &Thread0 = Threads[0];
    Thread0 = std::thread([this, S] {
      // Thread 0 creates the remaining workers, signals ThreadsCreated, then
      // becomes a worker itself.
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([this, S, I] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  // To make sure the thread pool executor can only be created with a parallel
  // strategy.
  ThreadPoolExecutor() = delete;

  // Signal shutdown and join the workers. Idempotent; when called from one of
  // the pool's own threads, that thread is detached instead of self-joined.
  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    // Wait until thread 0 has finished populating Threads so iterating the
    // vector below is race-free.
    ThreadsCreated.get_future().wait();

    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  ~ThreadPoolExecutor() { stop(); }

  // Hooks for ManagedStatic (used by getDefaultExecutor() on Windows).
  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    // Only stops (joins) the pool; the executor object itself is not deleted
    // here.
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  // A queued closure paired with the latch to decrement once it has run.
  struct WorkItem {
    std::function<void()> F;
    std::reference_wrapper<parallel::detail::Latch> L;
    void operator()() {
      F();
      L.get().dec();
    }
  };

  // Enqueue F and wake one worker. L must outlive the task; callers
  // synchronize on the latch to learn when their work has finished.
  void add(std::function<void()> F, parallel::detail::Latch &L) {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      WorkStack.push_back({std::move(F), std::ref(L)});
    }
    Cond.notify_one();
  }

  size_t getThreadCount() const { return ThreadCount; }

private:
  // Pop one task from the queue and run it. Must be called with Lock held;
  // releases Lock before executing the task.
  void popAndRun(std::unique_lock<std::mutex> &Lock) {
    auto Item = std::move(WorkStack.back());
    WorkStack.pop_back();
    Lock.unlock();
    Item();
  }

  // Per-thread main loop: record the thread index, apply the strategy's
  // affinity, then pull tasks until shutdown.
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    // Note on jobserver deadlock avoidance:
    // GNU Make grants each invoked process one implicit job slot. Our
    // JobserverClient models this by returning an implicit JobSlot on the
    // first successful tryAcquire() in a process. This guarantees forward
    // progress without requiring a dedicated "always-on" thread here.

    while (true) {
      if (TheJobserver) {
        // Jobserver-mode scheduling:
        // - Acquire one job slot (with exponential backoff to avoid busy-wait).
        // - While holding the slot, drain and run tasks from the local queue.
        // - Release the slot when the queue is empty or when shutting down.
        // Rationale: Holding a slot amortizes acquire/release overhead over
        // multiple tasks and avoids requeue/yield churn, while still enforcing
        // the jobserver's global concurrency limit. With K available slots,
        // up to K workers run tasks in parallel; within each worker tasks run
        // sequentially until the local queue is empty.
        ExponentialBackoff Backoff(std::chrono::hours(24));
        JobSlot Slot;
        do {
          if (Stop)
            return;
          Slot = TheJobserver->tryAcquire();
          if (Slot.isValid())
            break;
        } while (Backoff.waitForNextAttempt());

        // Return the slot however we leave the inner loop below.
        llvm::scope_exit SlotReleaser(
            [&] { TheJobserver->release(std::move(Slot)); });

        while (true) {
          std::unique_lock<std::mutex> Lock(Mutex);
          Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
          if (Stop && WorkStack.empty())
            return;
          if (WorkStack.empty())
            break; // Queue drained: release the slot, re-acquire next round.
          popAndRun(Lock);
        }
      } else {
        std::unique_lock<std::mutex> Lock(Mutex);
        Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
        if (Stop)
          break;
        popAndRun(Lock);
      }
    }
  }

  std::atomic<bool> Stop{false};
  // LIFO queue of pending tasks; guarded by Mutex.
  std::vector<WorkItem> WorkStack;
  std::mutex Mutex;
  std::condition_variable Cond;
  // Fulfilled by thread 0 once all worker threads have been created.
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
  unsigned ThreadCount;

  // Non-null only when coordinating with a GNU Make jobserver.
  JobserverClient *TheJobserver = nullptr;
};
190} // namespace
191
// Returns the lazily-constructed, process-wide executor shared by TaskGroup
// and parallelFor.
static ThreadPoolExecutor *getDefaultExecutor() {
#ifdef _WIN32
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() on Windows. This is important to avoid various race
  // conditions at process exit that can cause crashes or deadlocks.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  // The Deleter above only stop()s the pool; this unique_ptr takes ownership
  // so the object is freed during static destruction, after llvm_shutdown()
  // has already joined the workers.
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
#else
  // ManagedStatic is not desired on other platforms. When `Exec` is destroyed
  // by llvm_shutdown(), worker threads will clean up and invoke TLS
  // destructors. This can lead to race conditions if other threads attempt to
  // access TLS objects that have already been destroyed.
  static ThreadPoolExecutor Exec(strategy);
  return &Exec;
#endif
}
212
214 return getDefaultExecutor()->getThreadCount();
215}
216#endif
217
// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent the
// deadlock, only allow the root TaskGroup to run tasks in parallel. In the
// scenario of nested parallel_for_each(), only the outermost one runs in
// parallel.
223 : Parallel(
224#if LLVM_ENABLE_THREADS
225 strategy.ThreadsRequested != 1 && threadIndex == UINT_MAX
226#else
227 false
228#endif
229 ) {
230}
232 // We must ensure that all the workloads have finished before decrementing the
233 // instances count.
234 L.sync();
235}
236
237void TaskGroup::spawn(std::function<void()> F) {
238#if LLVM_ENABLE_THREADS
239 if (Parallel) {
240 L.inc();
241 getDefaultExecutor()->add(std::move(F), L);
242 return;
243 }
244#endif
245 F();
246}
247
// Invoke Fn(i) for every i in [Begin, End). Uses the default executor when a
// parallel strategy is in effect; otherwise falls back to a plain serial loop.
void llvm::parallelFor(size_t Begin, size_t End,
                       function_ref<void(size_t)> Fn) {
#if LLVM_ENABLE_THREADS
  if (strategy.ThreadsRequested != 1) {
    size_t NumItems = End - Begin;
    if (NumItems == 0)
      return;
    // Distribute work via an atomic counter shared by NumWorkers threads,
    // keeping the task count (and thus Linux futex calls) at O(ThreadCount).
    // For lld, per-file work is somewhat uneven, so a multiplier > 1 is safer.
    // While 2 vs 4 vs 8 makes no measurable difference, 4 is used as a
    // reasonable default.
    size_t NumWorkers = std::min<size_t>(NumItems, getThreadCount());
    size_t ChunkSize = std::max(size_t(1), NumItems / (NumWorkers * 4));
    std::atomic<size_t> Idx{Begin};
    auto Worker = [&] {
      // Claim ChunkSize indices at a time until the counter passes End.
      while (true) {
        size_t I = Idx.fetch_add(ChunkSize, std::memory_order_relaxed);
        if (I >= End)
          break;
        size_t IEnd = std::min(I + ChunkSize, End);
        for (; I < IEnd; ++I)
          Fn(I);
      }
    };

    // ~TaskGroup waits for all spawned workers, so the by-reference captures
    // in Worker (Idx, Fn, ChunkSize, End) stay valid while they run.
    TaskGroup TG;
    for (size_t I = 0; I != NumWorkers; ++I)
      TG.spawn(Worker);
    return;
  }
#endif

  // Serial fallback (threads disabled or explicitly requested as 1).
  for (; Begin != End; ++Begin)
    Fn(Begin);
}
#define F(x, y, z)
Definition MD5.cpp:54
#define I(x, y, z)
Definition MD5.cpp:57
#define T
if(PassOpts->AAPipeline)
const SmallVectorImpl< MachineOperand > & Cond
This file defines the make_scope_exit function, which executes user-defined cleanup logic at scope ex...
static cl::opt< int > ThreadCount("threads", cl::init(0))
static LLVM_ABI_FOR_TEST JobserverClient * getInstance()
Returns the singleton instance of the JobserverClient.
ManagedStatic - This transparently changes the behavior of global statics to be lazily constructed on...
This tells how a thread pool will be used.
Definition Threading.h:115
LLVM_ABI void apply_thread_strategy(unsigned ThreadPoolNum) const
Assign the current thread to an ideal hardware CPU or NUMA node.
LLVM_ABI unsigned compute_thread_count() const
Retrieves the max available threads for the current strategy.
Definition Threading.cpp:43
bool UseJobserver
If true, the thread pool will attempt to coordinate with a GNU Make jobserver, acquiring a job slot b...
Definition Threading.h:149
An efficient, type-erasing, non-owning reference to a callable.
LLVM_ABI void spawn(std::function< void()> f)
Definition Parallel.cpp:237
LLVM_ABI ThreadPoolStrategy strategy
Definition Parallel.cpp:27
unsigned getThreadIndex()
Definition Parallel.h:56
size_t getThreadCount()
Definition Parallel.h:57
SmartMutex< false > Mutex
Mutex - A standard, always enforced mutex.
Definition Mutex.h:66
This is an optimization pass for GlobalISel generic memory operations.
scope_exit(Callable) -> scope_exit< Callable >
LLVM_ABI void parallelFor(size_t Begin, size_t End, function_ref< void(size_t)> Fn)
Definition Parallel.cpp:248