[orc-rt] Add an ExecutorProcessInfo field to Session. (#187155)

This will provide a central location for ORC runtime code (and ORC
runtime API clients) to find executor process information.
This commit is contained in:
Lang Hames 2026-03-18 10:42:22 +11:00 committed by GitHub
parent e732600796
commit 3482480087
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 48 additions and 20 deletions

View File

@ -15,6 +15,7 @@
#include "orc-rt/ControllerInterface.h"
#include "orc-rt/Error.h"
#include "orc-rt/ExecutorProcessInfo.h"
#include "orc-rt/LockedAccess.h"
#include "orc-rt/Service.h"
#include "orc-rt/TaskDispatcher.h"
@ -117,7 +118,7 @@ public:
///
/// Note that entry into the reporter is not synchronized: it may be
/// called from multiple threads concurrently.
Session(std::unique_ptr<TaskDispatcher> Dispatcher,
Session(ExecutorProcessInfo EPI, std::unique_ptr<TaskDispatcher> Dispatcher,
ErrorReporterFn ReportError);
// Sessions are not copyable or moveable.
@ -128,6 +129,10 @@ public:
~Session();
/// Provides information about the host process that the Session is running
/// in.
const ExecutorProcessInfo &processInfo() const noexcept { return EPI; }
/// Dispatch a task using the Session's TaskDispatcher.
void dispatch(std::unique_ptr<Task> T) { Dispatcher->dispatch(std::move(T)); }
@ -204,6 +209,7 @@ private:
static void wrapperReturn(orc_rt_SessionRef S, uint64_t CallId,
orc_rt_WrapperFunctionBuffer ResultBytes);
ExecutorProcessInfo EPI;
std::unique_ptr<TaskDispatcher> Dispatcher;
std::shared_ptr<ControllerAccess> CA;
ErrorReporterFn ReportError;

View File

@ -16,9 +16,11 @@ namespace orc_rt {
Session::ControllerAccess::~ControllerAccess() = default;
Session::Session(std::unique_ptr<TaskDispatcher> Dispatcher,
Session::Session(ExecutorProcessInfo EPI,
std::unique_ptr<TaskDispatcher> Dispatcher,
ErrorReporterFn ReportError)
: Dispatcher(std::move(Dispatcher)), ReportError(std::move(ReportError)) {
: EPI(std::move(EPI)), Dispatcher(std::move(Dispatcher)),
ReportError(std::move(ReportError)) {
std::pair<const char *, void *> InitialSymbols[] = {
{"orc_rt_SessionInstance", static_cast<void *>(this)}};

View File

@ -68,6 +68,10 @@ public:
void doMoreConfig(int) noexcept {}
};
static ExecutorProcessInfo mockExecutorProcessInfo() noexcept {
return ExecutorProcessInfo("arm64-apple-darwin", 16384);
}
class NoDispatcher : public TaskDispatcher {
public:
void dispatch(std::unique_ptr<Task> T) override {
@ -274,14 +278,15 @@ private:
static void noErrors(Error Err) { cantFail(std::move(Err)); }
TEST(SessionTest, TrivialConstructionAndDestruction) {
Session S(std::make_unique<NoDispatcher>(), noErrors);
Session S(mockExecutorProcessInfo(), std::make_unique<NoDispatcher>(),
noErrors);
}
TEST(SessionTest, ReportError) {
Error E = Error::success();
cantFail(std::move(E)); // Force error into checked state.
Session S(std::make_unique<NoDispatcher>(),
Session S(mockExecutorProcessInfo(), std::make_unique<NoDispatcher>(),
[&](Error Err) { E = std::move(Err); });
S.reportError(make_error<StringError>("foo"));
@ -294,7 +299,8 @@ TEST(SessionTest, ReportError) {
TEST(SessionTest, DispatchTask) {
int X = 0;
std::deque<std::unique_ptr<Task>> Tasks;
Session S(std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
Session S(mockExecutorProcessInfo(),
std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
EXPECT_EQ(Tasks.size(), 0U);
S.dispatch(makeGenericTask([&]() { ++X; }));
@ -311,7 +317,8 @@ TEST(SessionTest, SingleService) {
std::optional<size_t> ShutdownOpIdx;
{
Session S(std::make_unique<NoDispatcher>(), noErrors);
Session S(mockExecutorProcessInfo(), std::make_unique<NoDispatcher>(),
noErrors);
S.addService(
std::make_unique<MockService>(DetachOpIdx, ShutdownOpIdx, OpIdx));
}
@ -327,7 +334,8 @@ TEST(SessionTest, MultipleServices) {
std::optional<size_t> ShutdownOpIdx[3];
{
Session S(std::make_unique<NoDispatcher>(), noErrors);
Session S(mockExecutorProcessInfo(), std::make_unique<NoDispatcher>(),
noErrors);
for (size_t I = 0; I != 3; ++I)
S.addService(std::make_unique<MockService>(DetachOpIdx[I],
ShutdownOpIdx[I], OpIdx));
@ -354,7 +362,8 @@ TEST(SessionTest, ExpectedShutdownSequence) {
bool DispatcherShutDown = false;
bool SessionShutdownComplete = false;
std::deque<std::unique_ptr<Task>> Tasks;
Session S(std::make_unique<EnqueueingDispatcher>(
Session S(mockExecutorProcessInfo(),
std::make_unique<EnqueueingDispatcher>(
Tasks,
[&]() {
EXPECT_TRUE(ShutdownOpIdx);
@ -376,26 +385,30 @@ TEST(SessionTest, ExpectedShutdownSequence) {
}
TEST(SessionTest, AddServiceAndUseRef) {
Session S(std::make_unique<NoDispatcher>(), noErrors);
Session S(mockExecutorProcessInfo(), std::make_unique<NoDispatcher>(),
noErrors);
auto &CS = S.addService(std::make_unique<ConfigurableService>(42));
CS.doMoreConfig(1);
}
TEST(SessionTest, CreateServiceAndUseRef) {
Session S(std::make_unique<NoDispatcher>(), noErrors);
Session S(mockExecutorProcessInfo(), std::make_unique<NoDispatcher>(),
noErrors);
auto &CS = S.createService<ConfigurableService>(42);
CS.doMoreConfig(1);
}
TEST(SessionTest, ControllerInterfaceContainsSessionByDefault) {
Session S(std::make_unique<NoDispatcher>(), noErrors);
Session S(mockExecutorProcessInfo(), std::make_unique<NoDispatcher>(),
noErrors);
ASSERT_TRUE(S.controllerInterface()->count("orc_rt_SessionInstance"));
EXPECT_EQ(S.controllerInterface()->at("orc_rt_SessionInstance"),
static_cast<void *>(&S));
}
TEST(SessionTest, ControllerInterfaceWithRef) {
Session S(std::make_unique<NoDispatcher>(), noErrors);
Session S(mockExecutorProcessInfo(), std::make_unique<NoDispatcher>(),
noErrors);
int X = 0, Y = 0;
S.controllerInterface().with_ref([&](ControllerInterface &CI) {
std::pair<const char *, void *> Syms[] = {
@ -409,7 +422,8 @@ TEST(SessionTest, ControllerInterfaceWithRef) {
}
TEST(SessionTest, ControllerInterfaceConstAccess) {
Session S(std::make_unique<NoDispatcher>(), noErrors);
Session S(mockExecutorProcessInfo(), std::make_unique<NoDispatcher>(),
noErrors);
int X = 0;
std::pair<const char *, void *> Syms[] = {{"orc_rt_X", &X}};
cantFail(S.controllerInterface()->addSymbolsUnique(Syms));
@ -423,7 +437,8 @@ TEST(ControllerAccessTest, Basics) {
// Test that we can set the ControllerAccess implementation and still shut
// down as expected.
std::deque<std::unique_ptr<Task>> Tasks;
Session S(std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
Session S(mockExecutorProcessInfo(),
std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
auto CA = std::make_shared<MockControllerAccess>(S);
S.setController(CA);
@ -445,7 +460,8 @@ static void add_sps_wrapper(orc_rt_SessionRef S, uint64_t CallId,
TEST(ControllerAccessTest, ValidCallToController) {
// Simulate a call to a controller handler.
std::deque<std::unique_ptr<Task>> Tasks;
Session S(std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
Session S(mockExecutorProcessInfo(),
std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
auto CA = std::make_shared<MockControllerAccess>(S);
S.setController(CA);
@ -464,7 +480,8 @@ TEST(ControllerAccessTest, ValidCallToController) {
TEST(ControllerAccessTest, CallToControllerBeforeAttach) {
// Expect calls to the controller prior to attaching to fail.
std::deque<std::unique_ptr<Task>> Tasks;
Session S(std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
Session S(mockExecutorProcessInfo(),
std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
Error Err = Error::success();
SPSWrapperFunction<int32_t(int32_t, int32_t)>::call(
@ -483,7 +500,8 @@ TEST(ControllerAccessTest, CallToControllerBeforeAttach) {
TEST(ControllerAccessTest, CallToControllerAfterDetach) {
// Expect calls to the controller prior to attaching to fail.
std::deque<std::unique_ptr<Task>> Tasks;
Session S(std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
Session S(mockExecutorProcessInfo(),
std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
auto CA = std::make_shared<MockControllerAccess>(S);
S.setController(CA);
@ -506,7 +524,8 @@ TEST(ControllerAccessTest, CallToControllerAfterDetach) {
TEST(ControllerAccessTest, CallFromController) {
// Simulate a call from the controller.
std::deque<std::unique_ptr<Task>> Tasks;
Session S(std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
Session S(mockExecutorProcessInfo(),
std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
auto CA = std::make_shared<MockControllerAccess>(S);
S.setController(CA);
@ -525,7 +544,8 @@ TEST(ControllerAccessTest, CallFromController) {
TEST(ControllerAccessTest, RedundantAsyncShutdown) {
// Check that redundant calls to shutdown have their callbacks run.
std::deque<std::unique_ptr<Task>> Tasks;
Session S(std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
Session S(mockExecutorProcessInfo(),
std::make_unique<EnqueueingDispatcher>(Tasks), noErrors);
S.waitForShutdown();
bool RedundantCallbackRan = false;