From eea62159e853b59a4e4e69da22175222ccd8c663 Mon Sep 17 00:00:00 2001 From: Joseph Huber Date: Wed, 19 Nov 2025 15:56:25 -0600 Subject: [PATCH] [Offload] Make the RPC thread sleep briefly when idle (#168596) Summary: We start this thread if the RPC client symbol is detected in the loaded binary. We should make this sleep if there's no work to avoid the thread running at high priority when the (scarecely used) RPC call is actually required. So, right now after 25 microseconds we will assume the server is inactive and begin sleeping. This resets once we do find work. AMD supports a more intelligent way to do this. HSA signals can wake a sleeping thread from the kernel, and signals can be sent from the GPU side. This would be nice to have and I'm planning on working with it in the future to make this infrastructure more usable with existing AMD workloads. --- offload/plugins-nextgen/common/src/RPC.cpp | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/offload/plugins-nextgen/common/src/RPC.cpp b/offload/plugins-nextgen/common/src/RPC.cpp index e19f2ef94de6..8d6704733970 100644 --- a/offload/plugins-nextgen/common/src/RPC.cpp +++ b/offload/plugins-nextgen/common/src/RPC.cpp @@ -83,7 +83,8 @@ static rpc::Status handleOffloadOpcodes(plugin::GenericDeviceTy &Device, return rpc::RPC_ERROR; } -static rpc::Status runServer(plugin::GenericDeviceTy &Device, void *Buffer) { +static rpc::Status runServer(plugin::GenericDeviceTy &Device, void *Buffer, + bool &ClientInUse) { uint64_t NumPorts = std::min(Device.requestedRPCPortCount(), rpc::MAX_PORT_COUNT); rpc::Server Server(NumPorts, Buffer); @@ -92,6 +93,7 @@ static rpc::Status runServer(plugin::GenericDeviceTy &Device, void *Buffer) { if (!Port) return rpc::RPC_SUCCESS; + ClientInUse = true; rpc::Status Status = handleOffloadOpcodes(Device, *Port, Device.getWarpSize()); @@ -99,7 +101,6 @@ static rpc::Status runServer(plugin::GenericDeviceTy &Device, void *Buffer) { if (Status == rpc::RPC_UNHANDLED_OPCODE) Status = LIBC_NAMESPACE::shared::handle_libc_opcodes(*Port, Device.getWarpSize()); - Port->close(); return Status; @@ -122,7 +123,11 @@ void RPCServerTy::ServerThread::shutDown() { } void RPCServerTy::ServerThread::run() { + static constexpr auto IdleTime = std::chrono::microseconds(25); + static constexpr auto IdleSleep = std::chrono::microseconds(250); std::unique_lock Lock(Mutex); + + auto LastUse = std::chrono::steady_clock::now(); for (;;) { CV.wait(Lock, [&]() { return NumUsers.load(std::memory_order_acquire) > 0 || @@ -133,15 +138,25 @@ void RPCServerTy::ServerThread::run() { return; Lock.unlock(); + bool ClientInUse = false; while (NumUsers.load(std::memory_order_relaxed) > 0 && Running.load(std::memory_order_relaxed)) { + + // Suspend this thread briefly if there is no current work. + auto Now = std::chrono::steady_clock::now(); + if (!ClientInUse && Now - LastUse >= IdleTime) + std::this_thread::sleep_for(IdleSleep); + else if (ClientInUse) + LastUse = Now; + + ClientInUse = false; std::lock_guard Lock(BufferMutex); for (const auto &[Buffer, Device] : llvm::zip_equal(Buffers, Devices)) { if (!Buffer || !Device) continue; // If running the server failed, print a message but keep running. - if (runServer(*Device, Buffer) != rpc::RPC_SUCCESS) + if (runServer(*Device, Buffer, ClientInUse) != rpc::RPC_SUCCESS) FAILURE_MESSAGE("Unhandled or invalid RPC opcode!"); } }