llvm-project/llvm/lib/Support/ThreadPool.cpp
River Riddle 6569cf2a44 [mlir] Add a ThreadPool to MLIRContext and refactor MLIR threading usage
This revision refactors the usage of multithreaded utilities in MLIR to use a common
thread pool within the MLIR context, in addition to a new utility that makes writing
multi-threaded code in MLIR less error prone. Using a unified thread pool brings about
several advantages:

* Better thread usage and more control
We currently use the static llvm threading utilities, which do not allow multiple
levels of asynchronous scheduling (even if there are open threads). This is due to
how the current TaskGroup structure works, which only allows one truly multithreaded
instance at a time. By having our own ThreadPool we gain more control and flexibility
over our job/thread scheduling, and in a followup can enable threading more parts of
the compiler.

* The static nature of TaskGroup causes issues in certain configurations
Due to the static nature of TaskGroup, there have been quite a few problems related to
destruction that have caused several downstream projects to disable threading. See
D104207 for discussion on some related fallout. By having a ThreadPool scoped to
the context, we don't have to worry about destruction and can ensure that any
additional MLIR thread usage ends when the context is destroyed.

Differential Revision: https://reviews.llvm.org/D104516
2021-06-23 01:29:24 +00:00

144 lines
4.6 KiB
C++

//==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file implements a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//
#include "llvm/Support/ThreadPool.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/raw_ostream.h"
using namespace llvm;
#if LLVM_ENABLE_THREADS
/// Construct the pool and eagerly start its worker threads.
///
/// The number of workers is taken from \p S via compute_thread_count(). Each
/// worker first applies the strategy for its own index and then loops:
/// block until a task is queued (or the pool is being shut down), run one
/// task, and report completion to anyone blocked in ThreadPool::wait().
ThreadPool::ThreadPool(ThreadPoolStrategy S)
: ThreadCount(S.compute_thread_count()) {
// Create ThreadCount threads that will loop forever, wait on QueueCondition
// for tasks to be queued or the Pool to be destroyed.
Threads.reserve(ThreadCount);
for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
// `S` and `ThreadID` are captured by copy: the worker keeps running after
// this constructor (and its by-value parameter `S`) has gone away.
Threads.emplace_back([S, ThreadID, this] {
S.apply_thread_strategy(ThreadID);
while (true) {
PackagedTaskTy Task;
{
std::unique_lock<std::mutex> LockGuard(QueueLock);
// Wait for tasks to be pushed in the queue, or for the pool to be
// disabled (EnableFlag cleared by the destructor).
QueueCondition.wait(LockGuard,
[&] { return !EnableFlag || !Tasks.empty(); });
// Exit condition: the pool is disabled AND nothing is left to run.
// A disabled pool with a non-empty queue keeps draining tasks.
if (!EnableFlag && Tasks.empty())
return;
// We have a task: grab it, then release the lock on the queue.
// We first need to signal that we are active before popping the queue
// in order for wait() to properly detect that even if the queue is
// empty, there is still a task in flight.
++ActiveThreads;
Task = std::move(Tasks.front());
Tasks.pop();
}
// Run the task we just grabbed. QueueLock is not held here, so other
// workers can dequeue and callers can enqueue concurrently.
Task();
bool Notify;
{
// Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
std::lock_guard<std::mutex> LockGuard(QueueLock);
--ActiveThreads;
// Sample the completion state while still holding the lock; the result
// is used after the lock is released.
Notify = workCompletedUnlocked();
}
// Notify task completion if this is the last active thread, in case
// someone waits on ThreadPool::wait(). The notification is issued after
// QueueLock has been released.
if (Notify)
CompletionCondition.notify_all();
}
});
}
}
/// Block the caller until the pool reports that all work is done, i.e. until
/// workCompletedUnlocked() holds (queue drained and no task still in flight).
void ThreadPool::wait() {
  std::unique_lock<std::mutex> Lock(QueueLock);
  // Re-check the completion state after every wake-up; QueueLock is held
  // whenever the check runs and is released while blocked.
  while (!workCompletedUnlocked())
    CompletionCondition.wait(Lock);
}
bool ThreadPool::isWorkerThread() const {
std::thread::id CurrentThreadId = std::this_thread::get_id();
for (const std::thread &Thread : Threads)
if (CurrentThreadId == Thread.get_id())
return true;
return false;
}
/// Enqueue \p Task for execution on a worker thread.
///
/// \returns a shared future associated with the queued task.
std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
  // Wrapping the callable in a packaged_task is what provides the future.
  PackagedTaskTy Packaged(std::move(Task));
  std::shared_future<void> Result = Packaged.get_future().share();
  {
    // The queue is shared with the workers: only touch it under QueueLock.
    std::lock_guard<std::mutex> Lock(QueueLock);
    // Enqueueing once the pool has been disabled is a caller bug.
    assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
    Tasks.push(std::move(Packaged));
  }
  // Wake a single worker, after the lock has been dropped.
  QueueCondition.notify_one();
  return Result;
}
/// Shut the pool down: clear EnableFlag under the queue lock, wake every
/// worker so it can observe the change, then join all worker threads.
ThreadPool::~ThreadPool() {
  {
    std::lock_guard<std::mutex> Lock(QueueLock);
    EnableFlag = false;
  }
  // Workers may be blocked waiting for tasks; wake them all so each one can
  // re-evaluate its exit condition.
  QueueCondition.notify_all();
  for (auto It = Threads.begin(); It != Threads.end(); ++It)
    It->join();
}
#else // LLVM_ENABLE_THREADS Disabled
/// Threads-disabled constructor: no worker thread is launched. A warning is
/// printed to errs() when the strategy asks for a thread count other than 1.
ThreadPool::ThreadPool(ThreadPoolStrategy S)
    : ThreadCount(S.compute_thread_count()) {
  // A request for exactly one thread is the only one that needs no warning.
  if (ThreadCount == 1)
    return;
  errs() << "Warning: request a ThreadPool with " << ThreadCount
         << " threads, but LLVM_ENABLE_THREADS has been turned off\n";
}
/// Threads-disabled wait(): runs every queued task on the calling thread, in
/// queue order, until the queue is empty.
void ThreadPool::wait() {
  while (true) {
    if (Tasks.empty())
      return;
    // Remove the task from the queue before invoking it, so the queue never
    // holds a moved-from entry while the task body runs.
    auto Next = std::move(Tasks.front());
    Tasks.pop();
    Next();
  }
}
/// Threads-disabled asyncImpl(): nothing runs here. \p Task is wrapped in a
/// deferred std::async future, and a small task that calls get() on that
/// future is queued for ThreadPool::wait() to run.
///
/// \returns the shared deferred future.
std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) {
  std::shared_future<void> Deferred =
      std::async(std::launch::deferred, std::move(Task)).share();
  // The queued wrapper holds its own copy of the shared future, so both
  // ThreadPool::wait() and the caller can synchronize on the same state.
  Tasks.push(PackagedTaskTy([Deferred]() { Deferred.get(); }));
  return Deferred;
}
/// Threads-disabled destructor: calls wait(), which runs any tasks that are
/// still queued.
ThreadPool::~ThreadPool() {
  wait();
}
#endif