llvm-project/llvm/lib/Support/Parallel.cpp
Yaxun (Sam) Liu ffc503edd0
[LLVM] Add GNU make jobserver support (#145131)
This patch introduces support for the jobserver protocol to control
parallelism for device offloading tasks.

When running a parallel build with a modern build system like `make -jN`
or `ninja -jN`, each Clang process might also be configured to use
multiple threads for its own tasks (e.g., via `--offload-jobs=4`). This
can lead to an explosion of threads (N * 4), causing heavy system load,
CPU contention, and ultimately slowing down the entire build.

This patch allows Clang to act as a cooperative client of the build
system's jobserver. It extends the `--offload-jobs` option to accept the
value 'jobserver'. With the recent addition of jobserver support to the
Ninja build system, this functionality now benefits users of both Make
and Ninja.

When `--offload-jobs=jobserver` is specified, Clang's thread pool will:
1. Parse the MAKEFLAGS environment variable to find the jobserver
details.
2. Before dispatching a task, acquire a job slot from the jobserver. If
none are available, the worker thread will block.
3. Release the job slot once the task is complete.

This ensures that the total number of active offload tasks across all
Clang processes does not exceed the limit defined by the parent build
system, leading to more efficient and controlled parallel builds.

Implementation:
- A new library, `llvm/Support/Jobserver`, is added to provide a
platform-agnostic client for the jobserver protocol, with backends for
Unix (FIFO) and Windows (semaphores).
- `llvm/Support/ThreadPool` and `llvm/Support/Parallel` are updated with
a `jobserver_concurrency` strategy to integrate this logic.
- The Clang driver and linker-wrapper are modified to recognize the
'jobserver' argument and enable the new thread pool strategy.
- New unit and integration tests are added to validate the feature.
2025-10-03 09:25:49 -04:00

318 lines
9.9 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
#include "llvm/Support/Parallel.h"
#include "llvm/ADT/ScopeExit.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ExponentialBackoff.h"
#include "llvm/Support/Jobserver.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"
#include <atomic>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
// Definition of the global thread pool strategy. It is read when the default
// executor is created, and by TaskGroup and parallelFor to decide whether work
// may run in parallel at all (ThreadsRequested != 1).
llvm::ThreadPoolStrategy llvm::parallel::strategy;
namespace llvm {
namespace parallel {
#if LLVM_ENABLE_THREADS
// Per-thread index of the current thread inside the default executor's pool.
// It keeps its initial value UINT_MAX on threads that are not pool workers;
// ThreadPoolExecutor::work() overwrites it with the worker's ID.
#ifdef _WIN32
// On Windows the variable is file-local and read through getThreadIndex().
// NOTE(review): GET_THREAD_INDEX_IMPL is a macro defined outside this file;
// presumably it returns threadIndex - confirm against the header.
static thread_local unsigned threadIndex = UINT_MAX;
unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
// Elsewhere the variable itself has external linkage.
thread_local unsigned threadIndex = UINT_MAX;
#endif
namespace detail {
namespace {
/// An abstract class that takes closures and runs them asynchronously.
///
/// Concrete executors implement add() and getThreadCount(). The single
/// process-wide instance is obtained through getDefaultExecutor().
class Executor {
public:
virtual ~Executor() = default;
/// Hand \p func to the executor so that it gets run.
virtual void add(std::function<void()> func) = 0;
/// Report the executor's thread count.
virtual size_t getThreadCount() const = 0;
/// Return the default executor, creating it on first use.
static Executor *getDefaultExecutor();
};
/// An implementation of an Executor that runs closures on a thread pool
/// in filo order.
class ThreadPoolExecutor : public Executor {
public:
explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
if (S.UseJobserver)
TheJobserver = JobserverClient::getInstance();
ThreadCount = S.compute_thread_count();
// Spawn all but one of the threads in another thread as spawning threads
// can take a while.
Threads.reserve(ThreadCount);
Threads.resize(1);
std::lock_guard<std::mutex> Lock(Mutex);
// Use operator[] before creating the thread to avoid data race in .size()
// in 'safe libc++' mode.
auto &Thread0 = Threads[0];
Thread0 = std::thread([this, S] {
for (unsigned I = 1; I < ThreadCount; ++I) {
Threads.emplace_back([this, S, I] { work(S, I); });
if (Stop)
break;
}
ThreadsCreated.set_value();
work(S, 0);
});
}
// To make sure the thread pool executor can only be created with a parallel
// strategy.
ThreadPoolExecutor() = delete;
void stop() {
{
std::lock_guard<std::mutex> Lock(Mutex);
if (Stop)
return;
Stop = true;
}
Cond.notify_all();
ThreadsCreated.get_future().wait();
}
~ThreadPoolExecutor() override {
stop();
std::thread::id CurrentThreadId = std::this_thread::get_id();
for (std::thread &T : Threads)
if (T.get_id() == CurrentThreadId)
T.detach();
else
T.join();
}
struct Creator {
static void *call() { return new ThreadPoolExecutor(strategy); }
};
struct Deleter {
static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
};
void add(std::function<void()> F) override {
{
std::lock_guard<std::mutex> Lock(Mutex);
WorkStack.push_back(std::move(F));
}
Cond.notify_one();
}
size_t getThreadCount() const override { return ThreadCount; }
private:
void work(ThreadPoolStrategy S, unsigned ThreadID) {
threadIndex = ThreadID;
S.apply_thread_strategy(ThreadID);
// Note on jobserver deadlock avoidance:
// GNU Make grants each invoked process one implicit job slot. Our
// JobserverClient models this by returning an implicit JobSlot on the
// first successful tryAcquire() in a process. This guarantees forward
// progress without requiring a dedicated "always-on" thread here.
static thread_local std::unique_ptr<ExponentialBackoff> Backoff;
while (true) {
if (TheJobserver) {
// Jobserver-mode scheduling:
// - Acquire one job slot (with exponential backoff to avoid busy-wait).
// - While holding the slot, drain and run tasks from the local queue.
// - Release the slot when the queue is empty or when shutting down.
// Rationale: Holding a slot amortizes acquire/release overhead over
// multiple tasks and avoids requeue/yield churn, while still enforcing
// the jobservers global concurrency limit. With K available slots,
// up to K workers run tasks in parallel; within each worker tasks run
// sequentially until the local queue is empty.
ExponentialBackoff Backoff(std::chrono::hours(24));
JobSlot Slot;
do {
if (Stop)
return;
Slot = TheJobserver->tryAcquire();
if (Slot.isValid())
break;
} while (Backoff.waitForNextAttempt());
auto SlotReleaser = llvm::make_scope_exit(
[&] { TheJobserver->release(std::move(Slot)); });
while (true) {
std::function<void()> Task;
{
std::unique_lock<std::mutex> Lock(Mutex);
Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
if (Stop && WorkStack.empty())
return;
if (WorkStack.empty())
break;
Task = std::move(WorkStack.back());
WorkStack.pop_back();
}
Task();
}
} else {
std::unique_lock<std::mutex> Lock(Mutex);
Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
if (Stop)
break;
auto Task = std::move(WorkStack.back());
WorkStack.pop_back();
Lock.unlock();
Task();
}
}
}
std::atomic<bool> Stop{false};
std::vector<std::function<void()>> WorkStack;
std::mutex Mutex;
std::condition_variable Cond;
std::promise<void> ThreadsCreated;
std::vector<std::thread> Threads;
unsigned ThreadCount;
JobserverClient *TheJobserver = nullptr;
};
// A global raw pointer to the executor. Lifetime is managed by the
// objects created within createExecutor(). The pointer is assigned by
// createExecutor() and handed out by Executor::getDefaultExecutor().
static Executor *TheExec = nullptr;
// Passed to std::call_once so that createExecutor() runs only once.
static std::once_flag Flag;
// This function will be called exactly once to create the executor.
// It contains the necessary platform-specific logic. Since functions
// called by std::call_once cannot return value, we have to set the
// executor as a global variable.
//
// On return, TheExec points at a ThreadPoolExecutor built from the global
// `strategy`.
void createExecutor() {
#ifdef _WIN32
// The ManagedStatic enables the ThreadPoolExecutor to be stopped via
// llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
// stops the thread pool and waits for any worker thread creation to complete
// but does not wait for the threads to finish. The wait for worker thread
// creation to complete is important as it prevents intermittent crashes on
// Windows due to a race condition between thread creation and process exit.
//
// The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
// it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
// destructor ensures it has been stopped and waits for worker threads to
// finish. The wait is important as it prevents intermittent crashes on
// Windows when the process is doing a full exit.
//
// The Windows crashes appear to only occur with the MSVC static runtimes and
// are more frequent with the debug static runtime.
//
// This also prevents intermittent deadlocks on exit with the MinGW runtime.
static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
ThreadPoolExecutor::Deleter>
ManagedExec;
// Dereferencing ManagedExec creates the executor through Creator::call().
// Deleter::call() only stops it, so this unique_ptr is the owner that
// eventually deletes the object.
static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
TheExec = Exec.get();
#else
// ManagedStatic is not desired on other platforms. When `Exec` is destroyed
// by llvm_shutdown(), worker threads will clean up and invoke TLS
// destructors. This can lead to race conditions if other threads attempt to
// access TLS objects that have already been destroyed.
static ThreadPoolExecutor Exec(strategy);
TheExec = &Exec;
#endif
}
/// Return the process-wide executor, building it on the first call.
Executor *Executor::getDefaultExecutor() {
  // std::call_once runs the initialization a single time, even when several
  // threads make the first call concurrently; afterwards TheExec is set.
  std::call_once(Flag, [] { createExecutor(); });
  return TheExec;
}
} // namespace
} // namespace detail
size_t getThreadCount() {
return detail::Executor::getDefaultExecutor()->getThreadCount();
}
#endif
// Latch::sync(), which the destructor calls, may block the calling thread. It
// is a deadlock if every thread of the default executor ends up blocked that
// way. To rule this out, only the root TaskGroup - one created on a thread
// whose threadIndex is still UINT_MAX, i.e. not a pool worker - runs its tasks
// in parallel. With nested parallel_for_each(), only the outermost level is
// parallel.
TaskGroup::TaskGroup()
#if LLVM_ENABLE_THREADS
    : Parallel(threadIndex == UINT_MAX &&
               parallel::strategy.ThreadsRequested != 1) {
}
#else
    : Parallel(false) {
}
#endif
TaskGroup::~TaskGroup() {
// Wait until every task handed to spawn() has finished. The queued closures
// refer back to this group and decrement its latch L when they are done, so
// they must not outlive the group.
L.sync();
}
/// Run \p F as part of this group.
///
/// A parallel group bumps the latch, queues the closure on the default
/// executor and returns at once; the queued closure drops the latch after
/// \p F has run. A non-parallel group (or a build without thread support) runs
/// \p F inline on the calling thread.
void TaskGroup::spawn(std::function<void()> F) {
#if LLVM_ENABLE_THREADS
  if (!Parallel) {
    F();
    return;
  }
  // Count the task before it is published so that sync() cannot miss it.
  L.inc();
  detail::Executor::getDefaultExecutor()->add([this, Task = std::move(F)] {
    Task();
    L.dec();
  });
#else
  F();
#endif
}
} // namespace parallel
} // namespace llvm
/// Call \p Fn(I) for every I in [Begin, End).
///
/// Unless exactly one thread was requested, the range is cut into consecutive
/// chunks that are spawned on a TaskGroup; the function returns after the
/// group has been destroyed, i.e. after all chunks have run. Otherwise the
/// indices are visited in order on the calling thread.
void llvm::parallelFor(size_t Begin, size_t End,
                       llvm::function_ref<void(size_t)> Fn) {
#if LLVM_ENABLE_THREADS
  if (parallel::strategy.ThreadsRequested != 1) {
    // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
    // overhead on large inputs; a chunk always holds at least one index.
    auto ChunkSize = (End - Begin) / parallel::detail::MaxTasksPerGroup;
    if (ChunkSize == 0)
      ChunkSize = 1;

    parallel::TaskGroup Group;
    size_t ChunkBegin = Begin;
    // Full-sized chunks, as long as more than one chunk's worth remains.
    while (ChunkBegin + ChunkSize < End) {
      const size_t ChunkEnd = ChunkBegin + ChunkSize;
      Group.spawn([ChunkBegin, ChunkEnd, &Fn] {
        for (size_t I = ChunkBegin; I != ChunkEnd; ++I)
          Fn(I);
      });
      ChunkBegin = ChunkEnd;
    }
    // Whatever is left over forms the final chunk.
    if (ChunkBegin != End) {
      Group.spawn([ChunkBegin, End, &Fn] {
        for (size_t I = ChunkBegin; I != End; ++I)
          Fn(I);
      });
    }
    return;
  }
#endif

  for (size_t I = Begin; I != End; ++I)
    Fn(I);
}