[Support] Support nested parallel TaskGroup via work-stealing (#189293)
Nested TaskGroups run serially to prevent deadlock, as documented by https://reviews.llvm.org/D61115 and refined by https://reviews.llvm.org/D148984 to use threadIndex. Enable nested parallelism by having worker threads actively execute tasks from the work queue while waiting (work-stealing), instead of just blocking. Root-level TaskGroups (main thread) keep the efficient blocking Latch::sync(), so there is no overhead for the common non-nested case. In lld, https://reviews.llvm.org/D131247 worked around the limitation by passing a single root TaskGroup into OutputSection::writeTo and spawning 4MB-chunked tasks into it. However, SyntheticSection::writeTo calls with internal parallelism (e.g. GdbIndexSection, MergeNoTailSection) still ran serially on worker threads. With this change, their internal parallelFor/parallelForEach calls parallelize automatically via helpSync work-stealing. The increased parallelism can reorder error messages from parallel phases (e.g. relocation processing during section writes), so one lld test is updated to use --threads=1 for deterministic output.
This commit is contained in:
parent
dee982d6c8
commit
8daaa26efd
@ -3,8 +3,8 @@
|
||||
## Non-section symbols and offset <= section_size are accepted, matching GNU ld.
|
||||
|
||||
# RUN: llvm-mc %s -o %t.o -filetype=obj -triple=x86_64
|
||||
# RUN: not ld.lld %t.o -o /dev/null -shared 2>&1 | FileCheck %s -DPREFIX=error --implicit-check-not=error:
|
||||
# RUN: ld.lld %t.o -o /dev/null -shared --noinhibit-exec 2>&1 | FileCheck %s -DPREFIX=warning --implicit-check-not=warning:
|
||||
# RUN: not ld.lld --threads=1 %t.o -o /dev/null -shared 2>&1 | FileCheck %s -DPREFIX=error --implicit-check-not=error:
|
||||
# RUN: ld.lld --threads=1 %t.o -o /dev/null -shared --noinhibit-exec 2>&1 | FileCheck %s -DPREFIX=warning --implicit-check-not=warning:
|
||||
|
||||
## .foo is 8 bytes with entsize=8 (1 piece). .foo+8 (offset==size) is accepted.
|
||||
# CHECK: [[PREFIX]]: {{.*}}:(.foo): offset 0x9 is outside the section
|
||||
|
||||
@ -78,6 +78,8 @@ public:
|
||||
Cond.notify_all();
|
||||
}
|
||||
|
||||
// Returns the current value of Count, loaded with acquire ordering.
uint32_t getCount() const { return Count.load(std::memory_order_acquire); }
|
||||
|
||||
void sync() const {
|
||||
std::unique_lock<std::mutex> lock(Mutex);
|
||||
Cond.wait(lock, [&] { return Count.load(std::memory_order_relaxed) == 0; });
|
||||
@ -94,12 +96,8 @@ public:
|
||||
LLVM_ABI ~TaskGroup();
|
||||
|
||||
// Spawns a task but does not wait for it to finish.
|
||||
// Tasks marked with \p Sequential will be executed
|
||||
// exactly in the order which they were spawned.
|
||||
LLVM_ABI void spawn(std::function<void()> f);
|
||||
|
||||
// Waits on this group's latch by delegating to Latch::sync().
void sync() const { L.sync(); }
|
||||
|
||||
// Returns the Parallel flag that the constructor initialized.
bool isParallel() const { return Parallel; }
|
||||
};
|
||||
|
||||
|
||||
@ -113,6 +113,18 @@ public:
|
||||
Cond.notify_one();
|
||||
}
|
||||
|
||||
// Execute tasks from the work queue until the latch reaches zero.
|
||||
// Used by nested TaskGroups (on worker threads) to prevent deadlock:
|
||||
// instead of blocking in sync(), actively help drain the queue.
|
||||
// May return while the latch is still non-zero (when Stop is set or the work
// stack is empty), so callers must still wait on the latch afterwards;
// TaskGroup::~TaskGroup does so by calling L.sync() after this returns.
void helpSync(const parallel::detail::Latch &L) {
|
||||
// Re-check the latch before every task so we stop as soon as it hits zero.
while (L.getCount() != 0) {
|
||||
// Take the queue mutex afresh on each iteration; the lock is handed to
// popAndRun() below.
std::unique_lock<std::mutex> Lock(Mutex);
|
||||
// Nothing to pick up, or Stop has been set (presumably executor shutdown —
// confirm against the rest of the class): give up helping.
if (Stop || WorkStack.empty())
|
||||
return;
|
||||
// NOTE(review): popAndRun() is defined outside this hunk; presumably it pops
// one task and releases the lock before running it — confirm.
popAndRun(Lock);
|
||||
}
|
||||
}
|
||||
|
||||
size_t getThreadCount() const { return ThreadCount; }
|
||||
|
||||
private:
|
||||
@ -215,22 +227,30 @@ size_t parallel::getThreadCount() {
|
||||
}
|
||||
#endif
|
||||
|
||||
// Latch::sync() called by the dtor may cause one thread to block. It is a dead
|
||||
// lock if all threads in the default executor are blocked. To prevent the dead
|
||||
// lock, only allow the root TaskGroup to run tasks in parallel. In the scenario
|
||||
// of nested parallel_for_each(), only the outermost one runs in parallel.
|
||||
// Returns true when threadIndex differs from the UINT_MAX sentinel, which this
// file treats as "inside a nested TaskGroup". Always returns false when
// LLVM_ENABLE_THREADS is off.
static bool isNested() {
|
||||
#if LLVM_ENABLE_THREADS
|
||||
return threadIndex != UINT_MAX;
|
||||
#else
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
|
||||
TaskGroup::TaskGroup()
|
||||
: Parallel(
|
||||
#if LLVM_ENABLE_THREADS
|
||||
strategy.ThreadsRequested != 1 && threadIndex == UINT_MAX
|
||||
strategy.ThreadsRequested != 1
|
||||
#else
|
||||
false
|
||||
#endif
|
||||
) {
|
||||
}
|
||||
|
||||
TaskGroup::~TaskGroup() {
|
||||
// We must ensure that all the workloads have finished before decrementing the
|
||||
// instances count.
|
||||
#if LLVM_ENABLE_THREADS
|
||||
// In a nested TaskGroup (threadIndex != -1u, i.e. isNested() returns true),
// actively help drain the queue instead of only blocking. helpSync() can
// return while the latch is still non-zero, so the sync() below is still
// required.
|
||||
if (Parallel && isNested())
|
||||
getDefaultExecutor()->helpSync(L);
|
||||
#endif
|
||||
// Block until the latch count reaches zero.
L.sync();
|
||||
}
|
||||
|
||||
|
||||
@ -95,73 +95,30 @@ TEST(Parallel, ForEachError) {
|
||||
|
||||
#if LLVM_ENABLE_THREADS
|
||||
TEST(Parallel, NestedTaskGroup) {
|
||||
// This test checks:
|
||||
// 1. Root TaskGroup is in Parallel mode.
|
||||
// 2. Nested TaskGroup is not in Parallel mode.
|
||||
parallel::TaskGroup tg;
|
||||
|
||||
tg.spawn([&]() {
|
||||
EXPECT_TRUE(tg.isParallel() || (parallel::strategy.ThreadsRequested == 1));
|
||||
});
|
||||
EXPECT_TRUE(tg.isParallel() || (parallel::strategy.ThreadsRequested == 1));
|
||||
|
||||
tg.spawn([&]() {
|
||||
parallel::TaskGroup nestedTG;
|
||||
EXPECT_FALSE(nestedTG.isParallel());
|
||||
|
||||
nestedTG.spawn([&]() {
|
||||
// Check that root TaskGroup is in Parallel mode.
|
||||
EXPECT_TRUE(tg.isParallel() ||
|
||||
(parallel::strategy.ThreadsRequested == 1));
|
||||
|
||||
// Check that nested TaskGroup is not in Parallel mode.
|
||||
EXPECT_FALSE(nestedTG.isParallel());
|
||||
});
|
||||
EXPECT_TRUE(nestedTG.isParallel() ||
|
||||
(parallel::strategy.ThreadsRequested == 1));
|
||||
});
|
||||
}
|
||||
|
||||
TEST(Parallel, ParallelNestedTaskGroup) {
|
||||
// This test checks that it is possible to have several TaskGroups
|
||||
// run from different threads in Parallel mode.
|
||||
std::atomic<size_t> Count{0};
|
||||
|
||||
{
|
||||
std::function<void()> Fn = [&]() {
|
||||
parallel::TaskGroup tg;
|
||||
|
||||
tg.spawn([&]() {
|
||||
// Check that root TaskGroup is in Parallel mode.
|
||||
EXPECT_TRUE(tg.isParallel() ||
|
||||
(parallel::strategy.ThreadsRequested == 1));
|
||||
|
||||
// Check that nested TaskGroup is not in Parallel mode.
|
||||
parallel::TaskGroup nestedTG;
|
||||
EXPECT_FALSE(nestedTG.isParallel());
|
||||
++Count;
|
||||
|
||||
nestedTG.spawn([&]() {
|
||||
// Check that root TaskGroup is in Parallel mode.
|
||||
EXPECT_TRUE(tg.isParallel() ||
|
||||
(parallel::strategy.ThreadsRequested == 1));
|
||||
|
||||
// Check that nested TaskGroup is not in Parallel mode.
|
||||
EXPECT_FALSE(nestedTG.isParallel());
|
||||
++Count;
|
||||
});
|
||||
// Verify nested parallelFor doesn't deadlock. This is a simplified version of
|
||||
// the pattern from https://reviews.llvm.org/D61115 that originally motivated
|
||||
// serializing nested TaskGroups. With work-stealing in helpSync(), nested
|
||||
// parallelism now works without deadlock.
|
||||
TEST(Parallel, NestedParallelFor) {
|
||||
std::atomic<uint32_t> count{0};
|
||||
parallelFor(0, 8, [&](size_t i) {
|
||||
parallelFor(0, 8, [&](size_t j) {
|
||||
parallelFor(0, 8, [&](size_t k) {
|
||||
count.fetch_add(1, std::memory_order_relaxed);
|
||||
});
|
||||
};
|
||||
|
||||
DefaultThreadPool Pool;
|
||||
|
||||
Pool.async(Fn);
|
||||
Pool.async(Fn);
|
||||
Pool.async(Fn);
|
||||
Pool.async(Fn);
|
||||
Pool.async(Fn);
|
||||
Pool.async(Fn);
|
||||
|
||||
Pool.wait();
|
||||
}
|
||||
EXPECT_EQ(Count, 12ul);
|
||||
});
|
||||
});
|
||||
EXPECT_EQ(count.load(), 512u);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user