
PerThreadBumpPtrAllocator allows separating allocations by thread id, which makes the allocations race-free. This is possible because the ThreadPoolExecutor class creates the threads, keeps them alive until its destructor runs, and assigns each of them an id. Consequently, PerThreadBumpPtrAllocator should only be used with threads created by ThreadPoolExecutor. This allocator is useful when a thread-safe BumpPtrAllocator is needed.

Reviewed By: MaskRay, dexonsmith, andrewng

Differential Revision: https://reviews.llvm.org/D142318
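As a rough illustration of the idea in the commit message (not the actual PerThreadBumpPtrAllocator API): each worker thread owns one BumpPtrAllocator and selects it by the thread index that ThreadPoolExecutor assigns, so concurrent allocations never touch shared state. The type PerThreadArenaSketch and the function exampleUse below are hypothetical names, and the sketch assumes getThreadIndex() and getThreadCount() are exposed by llvm/Support/Parallel.h, as the implementation below suggests.

#include "llvm/Support/Allocator.h"
#include "llvm/Support/Parallel.h"
#include <vector>

// Sketch only: one bump allocator per executor thread, indexed by the stable
// thread id described above. Not the real PerThreadBumpPtrAllocator.
struct PerThreadArenaSketch {
  std::vector<llvm::BumpPtrAllocator> Arenas;
  PerThreadArenaSketch() : Arenas(llvm::parallel::getThreadCount()) {}

  llvm::BumpPtrAllocator &local() {
    unsigned Idx = llvm::parallel::getThreadIndex();
    // Threads created by ThreadPoolExecutor have 0 <= Idx < size(); fall back
    // to slot 0 on the serial path, where no other thread runs concurrently.
    return Arenas[Idx < Arenas.size() ? Idx : 0];
  }
};

void exampleUse(const std::vector<int> &In) {
  PerThreadArenaSketch Arena;
  llvm::parallelFor(0, In.size(), [&](size_t I) {
    // Each task allocates from its own thread's allocator: no locks, no races.
    int *Scratch = Arena.local().Allocate<int>(4);
    Scratch[0] = In[I];
    // ... per-item work using Scratch; memory is released when Arena dies.
  });
}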
//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <deque>
#include <future>
#include <thread>
#include <vector>

llvm::ThreadPoolStrategy llvm::parallel::strategy;

namespace llvm {
namespace parallel {
#if LLVM_ENABLE_THREADS

#ifdef _WIN32
static thread_local unsigned threadIndex = UINT_MAX;

unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
thread_local unsigned threadIndex = UINT_MAX;
#endif

namespace detail {

namespace {

/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  virtual void add(std::function<void()> func, bool Sequential = false) = 0;
  virtual size_t getThreadCount() const = 0;

  static Executor *getDefaultExecutor();
};

/// An implementation of an Executor that runs closures on a thread pool
/// in FILO order.
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
    ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    // Use operator[] before creating the thread to avoid data race in .size()
    // in “safe libc++” mode.
    auto &Thread0 = Threads[0];
    Thread0 = std::thread([this, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }

  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  void add(std::function<void()> F, bool Sequential = false) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Sequential)
        WorkQueueSequential.emplace_front(std::move(F));
      else
        WorkQueue.emplace_back(std::move(F));
    }
    Cond.notify_one();
  }

  size_t getThreadCount() const override { return ThreadCount; }

private:
  bool hasSequentialTasks() const {
    return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
  }

  bool hasGeneralTasks() const { return !WorkQueue.empty(); }

  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] {
        return Stop || hasGeneralTasks() || hasSequentialTasks();
      });
      if (Stop)
        break;
      bool Sequential = hasSequentialTasks();
      if (Sequential)
        SequentialQueueIsLocked = true;
      else
        assert(hasGeneralTasks());

      auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
      auto Task = std::move(Queue.back());
      Queue.pop_back();
      Lock.unlock();
      Task();
      if (Sequential)
        SequentialQueueIsLocked = false;
    }
  }

  std::atomic<bool> Stop{false};
  std::atomic<bool> SequentialQueueIsLocked{false};
  std::deque<std::function<void()>> WorkQueue;
  std::deque<std::function<void()>> WorkQueueSequential;
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
  unsigned ThreadCount;
};

Executor *Executor::getDefaultExecutor() {
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
  // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
}
} // namespace
} // namespace detail

size_t getThreadCount() {
  return detail::Executor::getDefaultExecutor()->getThreadCount();
}
#endif

// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent the
// deadlock, only allow the root TaskGroup to run tasks in parallel. In the
// scenario of nested parallel_for_each(), only the outermost one runs in
// parallel.
TaskGroup::TaskGroup()
#if LLVM_ENABLE_THREADS
    : Parallel((parallel::strategy.ThreadsRequested != 1) &&
               (threadIndex == UINT_MAX)) {}
#else
    : Parallel(false) {}
#endif
TaskGroup::~TaskGroup() {
  // We must ensure that all the workloads have finished before decrementing the
  // instances count.
  L.sync();
}

void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
#if LLVM_ENABLE_THREADS
  if (Parallel) {
    L.inc();
    detail::Executor::getDefaultExecutor()->add(
        [&, F = std::move(F)] {
          F();
          L.dec();
        },
        Sequential);
    return;
  }
#endif
  F();
}

} // namespace parallel
} // namespace llvm

void llvm::parallelFor(size_t Begin, size_t End,
                       llvm::function_ref<void(size_t)> Fn) {
#if LLVM_ENABLE_THREADS
  if (parallel::strategy.ThreadsRequested != 1) {
    auto NumItems = End - Begin;
    // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
    // overhead on large inputs.
    auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
    if (TaskSize == 0)
      TaskSize = 1;

    parallel::TaskGroup TG;
    for (; Begin + TaskSize < End; Begin += TaskSize) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
          Fn(I);
      });
    }
    if (Begin != End) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin; I != End; ++I)
          Fn(I);
      });
    }
    return;
  }
#endif

  for (; Begin != End; ++Begin)
    Fn(Begin);
}
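For reference, a minimal sketch of how client code typically drives the parallelFor entry point defined above; the function squareAll and its data are illustrative only.

#include "llvm/Support/Parallel.h"
#include <vector>

// Squares every element in place. parallelFor splits the index range into at
// most MaxTasksPerGroup chunks and runs them on the default executor; with a
// single requested thread (or threads disabled) it falls back to the plain
// sequential loop at the end of parallelFor.
static void squareAll(std::vector<int> &V) {
  llvm::parallelFor(0, V.size(), [&](size_t I) { V[I] *= V[I]; });
}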