[orc-rt] Add TaskGroup for tracking completion of a set of tasks. (#187205)

TaskGroup provides a mechanism for tracking execution of multiple
concurrent tasks and receiving notification when all tasks have
completed. This is useful for coordinating asynchronous operations in
the ORC runtime.

TaskGroup::Token is an RAII handle representing participation in a
group. The group cannot complete while any valid (non-default) Token
exists.

TaskGroup::addOnComplete registers callbacks to run when the group
closes and all tokens are released. (Callbacks registered after
completion run immediately).

TalkGroup::close seals the group: no new tokens can be acquired after
close is called.

All methods may be called concurrently from multiple threads.
This commit is contained in:
Lang Hames 2026-03-18 18:55:15 +11:00 committed by GitHub
parent 76d5704633
commit 0f622c507e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 583 additions and 0 deletions

View File

@ -29,6 +29,7 @@ set(ORC_RT_HEADERS
orc-rt/SPSMemoryFlags.h
orc-rt/SPSWrapperFunction.h
orc-rt/SPSWrapperFunctionBuffer.h
orc-rt/TaskGroup.h
orc-rt/TaskDispatcher.h
orc-rt/ThreadPoolTaskDispatcher.h
orc-rt/WrapperFunction.h

View File

@ -0,0 +1,203 @@
//===--- TaskGroup.h - Tracks completion of a group of tasks ---*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// TaskGroup and related APIs.
//
//===----------------------------------------------------------------------===//
#ifndef ORC_RT_TASKGROUP_H
#define ORC_RT_TASKGROUP_H
#include "move_only_function.h"
#include <cassert>
#include <memory>
#include <mutex>
#include <vector>
namespace orc_rt {
/// TaskGroup tracks execution of a set of tasks, providing notification when
/// all tasks have completed.
class TaskGroup {
public:
/// Token represents the right to proceed with a task as part of a
/// TaskGroup.
///
/// Construction (from a TaskGroup or by copy) may fail if the group is
/// closed. Always check validity with operator bool() before proceeding:
///
/// Token T(TG);
/// if (!T) return; // Group was closed
///
/// WARNING: Avoid storing Tokens in long-lived data structures. The TaskGroup
/// cannot complete while any Token exists, so stashing copies may
/// unintentionally defer completion.
class Token {
public:
/// Construct an empty Token not associated with any TaskGroup.
Token() = default;
/// Attempt to create a copy of the given Token.
/// Note that this may fail if the TaskGroup has been closed. Clients must
/// check whether the Token is valid (using operator bool()) before
/// continuing with their task.
Token(const Token &Other) {
if (Other.G && Other.G->acquireToken())
G = Other.G;
}
/// Attempt to overwrite this Token.
/// Note that this will:
/// 1. Trigger task group completion if this Token represented the last
/// running task in the TaskGroup and Other is an empty Token.
/// 2. Fail if the TaskGroup referenced by Other has been closed. Clients
/// must check whether the Token is valid (using operator bool())
/// before continuing with their task.
Token &operator=(const Token &Other) {
if (&Other == this)
return *this;
if (G)
G->releaseToken();
if (Other.G && Other.G->acquireToken())
G = Other.G;
else
G = nullptr;
return *this;
}
/// Move-construct from Other.
Token(Token &&Other) { std::swap(G, Other.G); }
/// Move-assign from Other.
///
/// Note that this will trigger task group completion if this Token
/// represented the last running task in the TaskGroup and Other is an
/// empty Token.
Token &operator=(Token &&Other) {
if (this == &Other)
return *this;
if (G) {
G->releaseToken();
G = nullptr;
}
std::swap(G, Other.G);
return *this;
}
/// Construct a Token from the given TaskGroup.
/// Note that this may fail if the TaskGroup has been closed. Clients must
/// check whether the resulting Token is valid (using operator bool())
/// before continuing with their task.
Token(std::shared_ptr<TaskGroup> G) {
if (G && G->acquireToken())
this->G = std::move(G);
}
/// Destroys this Token, potentially triggering task group completion if
/// this Token represented the last running task in the TaskGroup.
~Token() {
if (G)
G->releaseToken();
}
/// Returns true if this Token is valid and attached to a task group.
explicit operator bool() const noexcept { return !!G; }
private:
std::shared_ptr<TaskGroup> G;
};
using OnCompleteFn = move_only_function<void()>;
TaskGroup(const TaskGroup &) = delete;
TaskGroup &operator=(const TaskGroup &) = delete;
TaskGroup(TaskGroup &&) = delete;
TaskGroup &operator=(TaskGroup &&) = delete;
static std::shared_ptr<TaskGroup> Create() noexcept {
return std::shared_ptr<TaskGroup>(new TaskGroup());
}
/// Increment the number of tasks in this group if it is still open.
/// Returns true on success, false on failure.
bool acquireToken() noexcept {
std::scoped_lock<std::mutex> Lock(M);
if (Closed)
return false;
++NumTasks;
return true;
}
/// Decrement the number of tasks in this group. This will trigger any
/// OnComplete callbacks if the TaskGroup has been closed and the count
/// reaches zero.
void releaseToken() noexcept {
std::vector<OnCompleteFn> ToRun;
{
std::scoped_lock<std::mutex> Lock(M);
assert(NumTasks > 0 && "TaskCount is invalid");
--NumTasks;
if (NumTasks == 0 && Closed)
ToRun = std::move(OnCompletes);
}
if (ToRun.empty())
return;
runOnCompletes(std::move(ToRun));
}
/// Close the TaskGroup. No new Tokens will be issued. OnComplete callbacks
/// will be run once the task count reaches zero.
void close() {
std::vector<OnCompleteFn> ToRun;
{
std::scoped_lock<std::mutex> Lock(M);
Closed = true;
if (NumTasks == 0)
ToRun = std::move(OnCompletes);
}
if (ToRun.empty())
return;
runOnCompletes(std::move(ToRun));
}
/// Register an OnComplete callback. The given callback will be run once the
/// group is closed and all tasks in it have completed.
void addOnComplete(OnCompleteFn OnComplete) {
assert(OnComplete && "OnComplete cannot be null");
{
std::scoped_lock<std::mutex> Lock(M);
if (!Closed || NumTasks > 0) {
OnCompletes.push_back(std::move(OnComplete));
return;
}
}
assert(OnComplete && "OnComplete should still be present here");
OnComplete();
}
private:
TaskGroup() noexcept = default;
static void runOnCompletes(std::vector<OnCompleteFn> ToRun) {
// TODO: Exception handling
for (auto &OnComplete : ToRun)
OnComplete();
}
std::mutex M;
bool Closed = false;
size_t NumTasks = 0;
std::vector<OnCompleteFn> OnCompletes;
};
} // namespace orc_rt
#endif // ORC_RT_TASKGROUP_H

View File

@ -39,6 +39,7 @@ add_orc_rt_unittest(CoreTests
SPSMemoryFlagsTest.cpp
SPSWrapperFunctionTest.cpp
SPSWrapperFunctionBufferTest.cpp
TaskGroupTest.cpp
ThreadPoolTaskDispatcherTest.cpp
WrapperFunctionBufferTest.cpp
bind-test.cpp

View File

@ -0,0 +1,378 @@
//===- TaskGroupTest.cpp --------------------------------------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// Tests for orc-rt's TaskGroup.h APIs.
//
//===----------------------------------------------------------------------===//
#include "orc-rt/TaskGroup.h"
#include "gtest/gtest.h"
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
using namespace orc_rt;
TEST(TaskGroupTest, TrivialConstructionAndDestruction) {
auto TG = TaskGroup::Create();
}
TEST(TaskGroupTest, SingleTokenThenClose) {
bool Completed = false;
auto TG = TaskGroup::Create();
TG->addOnComplete([&]() { Completed = true; });
{
TaskGroup::Token T(TG);
EXPECT_TRUE(T);
TG->close();
EXPECT_FALSE(Completed);
}
EXPECT_TRUE(Completed);
}
TEST(TaskGroupTest, CloseWithNoTokens) {
bool Completed = false;
auto TG = TaskGroup::Create();
TG->addOnComplete([&]() { Completed = true; });
TG->close();
EXPECT_TRUE(Completed);
}
TEST(TaskGroupTest, TokenFromClosedGroup) {
auto TG = TaskGroup::Create();
TG->close();
TaskGroup::Token T(TG);
EXPECT_FALSE(T);
}
TEST(TaskGroupTest, TokenFromNullSharedPtr) {
std::shared_ptr<TaskGroup> TG;
TaskGroup::Token T(TG);
EXPECT_FALSE(T);
}
TEST(TaskGroupTest, CopyToken) {
bool Completed = false;
auto TG = TaskGroup::Create();
TG->addOnComplete([&]() { Completed = true; });
{
TaskGroup::Token T1(TG);
EXPECT_TRUE(T1);
{
TaskGroup::Token T2(T1); // Copy increments count
EXPECT_TRUE(T2);
TG->close();
EXPECT_FALSE(Completed);
}
EXPECT_FALSE(Completed); // T1 still alive
}
EXPECT_TRUE(Completed);
}
TEST(TaskGroupTest, MoveToken) {
bool Completed = false;
auto TG = TaskGroup::Create();
TG->addOnComplete([&]() { Completed = true; });
TaskGroup::Token T1(TG);
TaskGroup::Token T2(std::move(T1));
EXPECT_FALSE(T1);
EXPECT_TRUE(T2);
TG->close();
EXPECT_FALSE(Completed);
T2 = TaskGroup::Token(); // Release
EXPECT_TRUE(Completed);
}
TEST(TaskGroupTest, CopyAssignmentReleasesOld) {
bool Completed1 = false;
bool Completed2 = false;
auto TG1 = TaskGroup::Create();
auto TG2 = TaskGroup::Create();
TG1->addOnComplete([&]() { Completed1 = true; });
TG2->addOnComplete([&]() { Completed2 = true; });
TaskGroup::Token T1(TG1);
TaskGroup::Token T2(TG2);
TG1->close();
TG2->close();
EXPECT_FALSE(Completed1);
EXPECT_FALSE(Completed2);
T1 = T2; // Releases TG1, acquires TG2
EXPECT_TRUE(Completed1); // TG1 should complete
EXPECT_FALSE(Completed2); // TG2 still has T1 and T2
}
TEST(TaskGroupTest, CopyAssignmentFromClosedGroup) {
bool Completed = false;
auto TG1 = TaskGroup::Create();
auto TG2 = TaskGroup::Create();
TG1->addOnComplete([&]() { Completed = true; });
TG2->close();
TaskGroup::Token T1(TG1);
TaskGroup::Token T2(TG2);
EXPECT_TRUE(T1);
EXPECT_FALSE(T2);
TG1->close();
EXPECT_FALSE(Completed);
T1 = T2; // Assign from empty, releases TG1
EXPECT_FALSE(T1);
EXPECT_TRUE(Completed);
}
TEST(TaskGroupTest, MoveAssignmentReleasesOld) {
bool Completed1 = false;
bool Completed2 = false;
auto TG1 = TaskGroup::Create();
auto TG2 = TaskGroup::Create();
TG1->addOnComplete([&]() { Completed1 = true; });
TG2->addOnComplete([&]() { Completed2 = true; });
TaskGroup::Token T1(TG1);
TaskGroup::Token T2(TG2);
TG1->close();
TG2->close();
EXPECT_FALSE(Completed1);
EXPECT_FALSE(Completed2);
T1 = std::move(T2); // Releases TG1, takes TG2 from T2
EXPECT_TRUE(Completed1); // TG1 should complete
EXPECT_FALSE(Completed2); // TG2 now held by T1
EXPECT_FALSE(T2); // T2 is now empty
}
TEST(TaskGroupTest, SelfCopyAssignment) {
auto TG = TaskGroup::Create();
TaskGroup::Token T(TG);
EXPECT_TRUE(T);
#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wself-assign-overloaded"
#endif
T = T; // Self-assign
#if defined(__clang__)
#pragma clang diagnostic pop
#endif
EXPECT_TRUE(T); // Should still be valid
TG->close();
}
TEST(TaskGroupTest, SelfMoveAssignment) {
auto TG = TaskGroup::Create();
TaskGroup::Token T(TG);
EXPECT_TRUE(T);
#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wself-move"
#endif
T = std::move(T); // Self-move-assign
#if defined(__clang__)
#pragma clang diagnostic pop
#endif
EXPECT_TRUE(T); // Should still be valid
TG->close();
}
TEST(TaskGroupTest, AddOnCompleteAfterCompletion) {
auto TG = TaskGroup::Create();
TG->close();
bool Completed = false;
TG->addOnComplete([&]() { Completed = true; });
EXPECT_TRUE(Completed); // Runs immediately
}
TEST(TaskGroupTest, AddOnCompleteWhileTasksPending) {
auto TG = TaskGroup::Create();
TaskGroup::Token T(TG);
TG->close();
bool Completed = false;
TG->addOnComplete([&]() { Completed = true; });
EXPECT_FALSE(Completed); // Queued, not run yet
T = TaskGroup::Token(); // Release
EXPECT_TRUE(Completed);
}
TEST(TaskGroupTest, MultipleCallbacks) {
std::vector<int> Order;
auto TG = TaskGroup::Create();
TG->addOnComplete([&]() { Order.push_back(1); });
TG->addOnComplete([&]() { Order.push_back(2); });
TG->addOnComplete([&]() { Order.push_back(3); });
TG->close();
ASSERT_EQ(Order.size(), 3u);
EXPECT_EQ(Order[0], 1);
EXPECT_EQ(Order[1], 2);
EXPECT_EQ(Order[2], 3);
}
TEST(TaskGroupTest, MultipleTokens) {
int CompletionCount = 0;
auto TG = TaskGroup::Create();
TG->addOnComplete([&]() { CompletionCount++; });
{
TaskGroup::Token T1(TG);
TaskGroup::Token T2(TG);
TaskGroup::Token T3(TG);
TG->close();
EXPECT_EQ(CompletionCount, 0);
}
EXPECT_EQ(CompletionCount, 1); // Only fires once
}
TEST(TaskGroupTest, CloseIsIdempotent) {
int CompletionCount = 0;
auto TG = TaskGroup::Create();
TG->addOnComplete([&]() { CompletionCount++; });
TG->close();
TG->close();
TG->close();
EXPECT_EQ(CompletionCount, 1);
}
TEST(TaskGroupTest, AcquireAfterCloseViaDirectAPI) {
auto TG = TaskGroup::Create();
EXPECT_TRUE(TG->acquireToken());
TG->close();
EXPECT_FALSE(TG->acquireToken());
TG->releaseToken(); // Release the one we acquired
}
TEST(TaskGroupTest, DirectAPIMatchesRAII) {
bool Completed = false;
auto TG = TaskGroup::Create();
TG->addOnComplete([&]() { Completed = true; });
TG->acquireToken();
TG->acquireToken();
TG->close();
EXPECT_FALSE(Completed);
TG->releaseToken();
EXPECT_FALSE(Completed);
TG->releaseToken();
EXPECT_TRUE(Completed);
}
TEST(TaskGroupTest, TokenKeepsTaskGroupAlive) {
TaskGroup::Token T;
bool Completed = false;
{
auto TG = TaskGroup::Create();
TG->addOnComplete([&]() { Completed = true; });
T = TaskGroup::Token(TG);
TG->close();
// TG goes out of scope here, but T holds a shared_ptr
}
EXPECT_FALSE(Completed); // Still pending - T keeps TG alive
T = TaskGroup::Token(); // Release
EXPECT_TRUE(Completed);
}
TEST(TaskGroupTest, ConcurrentTokens) {
for (int Iter = 0; Iter < 100; ++Iter) {
std::atomic<int> Count{0};
auto TG = TaskGroup::Create();
TG->addOnComplete([&]() { Count++; });
std::vector<std::thread> Threads;
for (int I = 0; I < 10; ++I) {
TaskGroup::Token T(TG);
Threads.emplace_back([T = std::move(T)]() {
std::this_thread::sleep_for(std::chrono::microseconds(10));
});
}
TG->close();
for (auto &T : Threads)
T.join();
EXPECT_EQ(Count, 1);
}
}
TEST(TaskGroupTest, ConcurrentAddOnCompleteAndClose) {
for (int Iter = 0; Iter < 100; ++Iter) {
std::atomic<int> Count{0};
auto TG = TaskGroup::Create();
std::thread Closer([&]() { TG->close(); });
std::thread Registerer([&]() { TG->addOnComplete([&]() { Count++; }); });
Closer.join();
Registerer.join();
// Callback should have run exactly once regardless of order
EXPECT_EQ(Count, 1);
}
}
TEST(TaskGroupTest, ConcurrentAcquireAndClose) {
for (int Iter = 0; Iter < 100; ++Iter) {
std::atomic<int> SuccessfulAcquires{0};
std::atomic<int> CompletionCount{0};
auto TG = TaskGroup::Create();
TG->addOnComplete([&]() { CompletionCount++; });
std::vector<std::thread> Threads;
// Multiple threads trying to acquire
for (int I = 0; I < 5; ++I) {
Threads.emplace_back([&, T = TaskGroup::Token(TG)]() {
if (T)
SuccessfulAcquires++;
std::this_thread::sleep_for(std::chrono::milliseconds(5));
});
}
// One thread closing
Threads.emplace_back([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(2));
TG->close();
});
for (auto &T : Threads)
T.join();
// Should complete exactly once
EXPECT_EQ(CompletionCount, 1);
// At least some acquires should have succeeded before close
EXPECT_EQ(SuccessfulAcquires, 5);
}
}