[lldb] Refactor JSONTransport own MainLoop read handle. (#179564)

When working with a MainLoop, if the file reaches the EOF it will
immediately fire the read handle callback. We cannot readily determine
if the file is at EOF or if the file is pointing to a socket/pipe that
has consumed all the current data in the buffer but the remote end has
not yet hung up. This is causing JSONTransport to continuously fire the
OnRead callback trigging repeated calls to the
`MessageHandler::OnClose`.

Since MainLoop does not perform the actual read, we need to adjust the
behavior of JSONTransport to fully own the read handle.

This change moves the ownership of the `MainLoop::ReadHandleUP` and
additionally own a reference to the `MainLoop` itself to ensure the loop
outlives the JSONTransport object.

This allows us to remove the handle immediately when we detect an EOF /
hang up has occurred.
This commit is contained in:
John Harrison 2026-02-06 10:07:09 -08:00 committed by GitHub
parent 5efb69f7c3
commit 69878f9fa1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 77 additions and 111 deletions

View File

@ -166,8 +166,7 @@ public:
///
/// If an unexpected error occurs, the MainLoop will be terminated and a log
/// message will include additional information about the termination reason.
virtual llvm::Expected<MainLoop::ReadHandleUP>
RegisterMessageHandler(MainLoop &loop, MessageHandler &handler) = 0;
virtual llvm::Error RegisterMessageHandler(MessageHandler &handler) = 0;
protected:
template <typename... Ts> inline auto Logv(const char *Fmt, Ts &&...Vals) {
@ -182,29 +181,27 @@ public:
using Message = typename JSONTransport<Proto>::Message;
using MessageHandler = typename JSONTransport<Proto>::MessageHandler;
IOTransport(lldb::IOObjectSP in, lldb::IOObjectSP out)
: m_in(in), m_out(out) {}
IOTransport(MainLoop &loop, lldb::IOObjectSP in, lldb::IOObjectSP out)
: m_loop(loop), m_in(in), m_out(out) {}
llvm::Error Send(const typename Proto::Evt &evt) override {
return Write(evt);
}
llvm::Error Send(const typename Proto::Req &req) override {
return Write(req);
}
llvm::Error Send(const typename Proto::Resp &resp) override {
return Write(resp);
}
llvm::Expected<MainLoop::ReadHandleUP>
RegisterMessageHandler(MainLoop &loop, MessageHandler &handler) override {
llvm::Error RegisterMessageHandler(MessageHandler &handler) override {
Status status;
MainLoop::ReadHandleUP read_handle = loop.RegisterReadObject(
m_read_handle = m_loop.RegisterReadObject(
m_in, [this, &handler](MainLoopBase &base) { OnRead(base, handler); },
status);
if (status.Fail()) {
return status.takeError();
}
return read_handle;
return status.takeError();
}
/// Public for testing purposes, otherwise this should be an implementation
@ -263,11 +260,15 @@ private:
handler.OnError(llvm::make_error<TransportUnhandledContentsError>(
std::string(m_buffer.str())));
handler.OnClosed();
// On EOF, remove the read handle from the MainLoop.
m_read_handle.reset();
}
}
MainLoop &m_loop;
lldb::IOObjectSP m_in;
lldb::IOObjectSP m_out;
MainLoop::ReadHandleUP m_read_handle;
};
/// A transport class for JSON with a HTTP header.

View File

@ -21,6 +21,7 @@
#include "llvm/Support/FormatVariadic.h"
#include "llvm/Support/JSON.h"
#include "llvm/Support/Signals.h"
#include <map>
#include <memory>
#include <string>
#include <vector>
@ -40,7 +41,7 @@ public:
void AddTool(std::unique_ptr<Tool> tool);
void AddResourceProvider(std::unique_ptr<ResourceProvider> resource_provider);
llvm::Error Accept(lldb_private::MainLoop &, MCPTransportUP);
llvm::Error Accept(MCPTransportUP);
protected:
MCPBinderUP Bind(MCPTransport &);
@ -70,7 +71,6 @@ private:
LogCallback m_log_callback;
struct Client {
ReadHandleUP handle;
MCPTransportUP transport;
MCPBinderUP binder;
};

View File

@ -83,8 +83,8 @@ using LogCallback = llvm::unique_function<void(llvm::StringRef message)>;
class Transport final
: public lldb_private::transport::JSONRPCTransport<ProtocolDescriptor> {
public:
Transport(lldb::IOObjectSP in, lldb::IOObjectSP out,
LogCallback log_callback = {});
Transport(lldb_private::MainLoop &loop, lldb::IOObjectSP in,
lldb::IOObjectSP out, LogCallback log_callback = {});
virtual ~Transport() = default;
/// Transport is not copyable.

View File

@ -66,11 +66,11 @@ void ProtocolServerMCP::AcceptCallback(std::unique_ptr<Socket> socket) {
lldb::IOObjectSP io_sp = std::move(socket);
auto transport_up = std::make_unique<lldb_protocol::mcp::Transport>(
io_sp, io_sp, [client_name](llvm::StringRef message) {
m_loop, io_sp, io_sp, [client_name](llvm::StringRef message) {
LLDB_LOG(GetLog(LLDBLog::Host), "{0}: {1}", client_name, message);
});
if (auto error = m_server->Accept(m_loop, std::move(transport_up)))
if (auto error = m_server->Accept(std::move(transport_up)))
LLDB_LOG_ERROR(log, std::move(error), "{0}:");
}

View File

@ -13,11 +13,12 @@
#include "lldb/Host/MainLoop.h"
#include "lldb/Host/Socket.h"
#include "lldb/Protocol/MCP/Server.h"
#include "lldb/Protocol/MCP/Transport.h"
#include <map>
#include "llvm/ADT/StringRef.h"
#include "llvm/Support/Error.h"
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <vector>
namespace lldb_private::mcp {

View File

@ -144,7 +144,7 @@ MCPBinderUP Server::Bind(MCPTransport &transport) {
return binder_up;
}
llvm::Error Server::Accept(MainLoop &loop, MCPTransportUP transport) {
llvm::Error Server::Accept(MCPTransportUP transport) {
MCPBinderUP binder = Bind(*transport);
MCPTransport *transport_ptr = transport.get();
binder->OnDisconnect([this, transport_ptr]() {
@ -156,12 +156,10 @@ llvm::Error Server::Accept(MainLoop &loop, MCPTransportUP transport) {
Logv("Transport error: {0}", llvm::toString(std::move(err)));
});
auto handle = transport->RegisterMessageHandler(loop, *binder);
if (!handle)
return handle.takeError();
if (llvm::Error err = transport->RegisterMessageHandler(*binder))
return err;
m_instances[transport_ptr] =
Client{std::move(*handle), std::move(transport), std::move(binder)};
m_instances[transport_ptr] = Client{std::move(transport), std::move(binder)};
return llvm::Error::success();
}

View File

@ -13,9 +13,10 @@
using namespace lldb_protocol::mcp;
using namespace llvm;
Transport::Transport(lldb::IOObjectSP in, lldb::IOObjectSP out,
LogCallback log_callback)
: JSONRPCTransport(in, out), m_log_callback(std::move(log_callback)) {}
Transport::Transport(lldb_private::MainLoop &loop, lldb::IOObjectSP in,
lldb::IOObjectSP out, LogCallback log_callback)
: JSONRPCTransport(loop, in, out), m_log_callback(std::move(log_callback)) {
}
void Transport::Log(StringRef message) {
if (m_log_callback)

View File

@ -1051,9 +1051,8 @@ void DAP::TransportHandler() {
m_queue_cv.notify_all();
});
auto handle = transport.RegisterMessageHandler(m_loop, *this);
if (!handle) {
DAP_LOG_ERROR(log, handle.takeError(),
if (llvm::Error err = transport.RegisterMessageHandler(*this)) {
DAP_LOG_ERROR(log, std::move(err),
"registering message handler failed: {0}");
std::lock_guard<std::mutex> guard(m_queue_mutex);
m_error_occurred = true;

View File

@ -17,9 +17,9 @@ using namespace lldb_private;
namespace lldb_dap {
Transport::Transport(lldb_dap::Log &log, lldb::IOObjectSP input,
lldb::IOObjectSP output)
: HTTPDelimitedJSONTransport(input, output), m_log(log) {}
Transport::Transport(lldb_dap::Log &log, lldb_private::MainLoop &loop,
lldb::IOObjectSP input, lldb::IOObjectSP output)
: HTTPDelimitedJSONTransport(loop, input, output), m_log(log) {}
void Transport::Log(llvm::StringRef message) {
// Emit the message directly, since this log was forwarded.

View File

@ -35,8 +35,8 @@ class Transport final
: public lldb_private::transport::HTTPDelimitedJSONTransport<
ProtocolDescriptor> {
public:
Transport(lldb_dap::Log &log, lldb::IOObjectSP input,
lldb::IOObjectSP output);
Transport(lldb_dap::Log &log, lldb_private::MainLoop &loop,
lldb::IOObjectSP input, lldb::IOObjectSP output);
virtual ~Transport() = default;
void Log(llvm::StringRef message) override;

View File

@ -47,7 +47,6 @@
#include "llvm/Support/Threading.h"
#include "llvm/Support/WithColor.h"
#include "llvm/Support/raw_ostream.h"
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
@ -463,7 +462,7 @@ static llvm::Error serveConnection(
DAP_LOG(client_log, "client connected");
MainLoop loop;
Transport transport(client_log, io, io);
Transport transport(client_log, loop, io, io);
DAP dap(client_log, default_repl_mode, pre_init_commands, no_lldbinit,
client_name, transport, loop);
@ -738,7 +737,7 @@ int main(int argc, char *argv[]) {
constexpr llvm::StringLiteral client_name = "stdio";
MainLoop loop;
Log client_log = log.WithPrefix("(stdio)");
Transport transport(client_log, input, output);
Transport transport(client_log, loop, input, output);
DAP dap(client_log, default_repl_mode, pre_init_commands, no_lldbinit,
client_name, transport, loop);

View File

@ -35,7 +35,7 @@ using lldb_private::MainLoop;
using lldb_private::Pipe;
void TransportBase::SetUp() {
std::tie(to_client, to_server) = TestDAPTransport::createPair();
std::tie(to_client, to_server) = TestDAPTransport::createPair(loop);
log = std::make_unique<Log>(llvm::outs(), log_mutex);
dap = std::make_unique<DAP>(
@ -46,13 +46,8 @@ void TransportBase::SetUp() {
/*client_name=*/"test_client",
/*transport=*/*to_client, /*loop=*/loop);
auto server_handle = to_server->RegisterMessageHandler(loop, *dap);
EXPECT_THAT_EXPECTED(server_handle, Succeeded());
handles[0] = std::move(*server_handle);
auto client_handle = to_client->RegisterMessageHandler(loop, client);
EXPECT_THAT_EXPECTED(client_handle, Succeeded());
handles[1] = std::move(*client_handle);
EXPECT_THAT_ERROR(to_server->RegisterMessageHandler(*dap), Succeeded());
EXPECT_THAT_ERROR(to_client->RegisterMessageHandler(client), Succeeded());
}
void TransportBase::Run() {

View File

@ -59,7 +59,6 @@ protected:
lldb_private::SubsystemRAII<lldb_private::FileSystem, lldb_private::HostInfo>
subsystems;
lldb_private::MainLoop loop;
lldb_private::MainLoop::ReadHandleUP handles[2];
std::unique_ptr<lldb_dap::Log> log;
lldb_dap::Log::Mutex log_mutex;

View File

@ -247,19 +247,22 @@ template <typename T> class JSONTransportTest : public PipePairTest {
protected:
SubsystemRAII<FileSystem> subsystems;
MainLoop loop;
test_protocol::MessageHandler message_handler;
std::unique_ptr<T> transport;
MainLoop loop;
void SetUp() override {
PipePairTest::SetUp();
transport = std::make_unique<T>(
std::make_shared<NativeFile>(input.GetReadFileDescriptor(),
loop,
std::make_shared<NativeFile>(input.ReleaseReadFileDescriptor(),
File::eOpenOptionReadOnly,
NativeFile::Unowned),
std::make_shared<NativeFile>(output.GetWriteFileDescriptor(),
NativeFile::Owned),
std::make_shared<NativeFile>(output.ReleaseWriteFileDescriptor(),
File::eOpenOptionWriteOnly,
NativeFile::Unowned));
NativeFile::Owned));
EXPECT_THAT_ERROR(transport->RegisterMessageHandler(message_handler),
Succeeded());
}
/// Run the transport MainLoop and return any messages received.
@ -272,17 +275,13 @@ protected:
loop.RequestTermination();
});
}
bool addition_succeeded = loop.AddCallback(
bool registered_timeout = loop.AddCallback(
[](MainLoopBase &loop) {
loop.RequestTermination();
FAIL() << "timeout";
},
timeout);
EXPECT_TRUE(addition_succeeded);
auto handle = transport->RegisterMessageHandler(loop, message_handler);
if (!handle)
return handle.takeError();
EXPECT_TRUE(registered_timeout);
return loop.Run().takeError();
}
@ -360,14 +359,13 @@ protected:
MainLoop loop;
void SetUp() override {
std::tie(to_remote, from_remote) = test_protocol::Transport::createPair();
std::tie(to_remote, from_remote) =
test_protocol::Transport::createPair(loop);
binder = std::make_unique<test_protocol::Binder>(*to_remote);
auto binder_handle = to_remote->RegisterMessageHandler(loop, remote);
EXPECT_THAT_EXPECTED(binder_handle, Succeeded());
auto remote_handle = from_remote->RegisterMessageHandler(loop, *binder);
EXPECT_THAT_EXPECTED(remote_handle, Succeeded());
EXPECT_THAT_ERROR(to_remote->RegisterMessageHandler(remote), Succeeded());
EXPECT_THAT_ERROR(from_remote->RegisterMessageHandler(*binder),
Succeeded());
}
void Run() {
@ -502,8 +500,8 @@ TEST_F(HTTPDelimitedJSONTransportTest, ReaderWithUnhandledData) {
TEST_F(HTTPDelimitedJSONTransportTest, InvalidTransport) {
transport =
std::make_unique<TestHTTPDelimitedJSONTransport>(nullptr, nullptr);
ASSERT_THAT_ERROR(Run(/*close_input=*/false),
std::make_unique<TestHTTPDelimitedJSONTransport>(loop, nullptr, nullptr);
ASSERT_THAT_ERROR(transport->RegisterMessageHandler(message_handler),
FailedWithMessage("IO object is not valid."));
}
@ -624,8 +622,8 @@ TEST_F(JSONRPCTransportTest, Write) {
}
TEST_F(JSONRPCTransportTest, InvalidTransport) {
transport = std::make_unique<TestJSONRPCTransport>(nullptr, nullptr);
ASSERT_THAT_ERROR(Run(/*close_input=*/false),
transport = std::make_unique<TestJSONRPCTransport>(loop, nullptr, nullptr);
ASSERT_THAT_ERROR(transport->RegisterMessageHandler(message_handler),
FailedWithMessage("IO object is not valid."));
}

View File

@ -157,22 +157,18 @@ public:
}
void SetUp() override {
std::tie(to_client, to_server) = Transport::createPair();
std::tie(to_client, to_server) = Transport::createPair(loop);
server_up = std::make_unique<TestServer>(
"lldb-mcp", "0.1.0",
[this](StringRef msg) { logged_messages.push_back(msg.str()); });
binder = server_up->Bind(*to_client);
auto server_handle = to_server->RegisterMessageHandler(loop, *binder);
EXPECT_THAT_EXPECTED(server_handle, Succeeded());
binder->OnError([](llvm::Error error) {
llvm::errs() << formatv("Server transport error: {0}", error);
});
handles[0] = std::move(*server_handle);
auto client_handle = to_client->RegisterMessageHandler(loop, client);
EXPECT_THAT_EXPECTED(client_handle, Succeeded());
handles[1] = std::move(*client_handle);
EXPECT_THAT_ERROR(to_server->RegisterMessageHandler(*binder), Succeeded());
EXPECT_THAT_ERROR(to_client->RegisterMessageHandler(client), Succeeded());
}
template <typename Result, typename Params>

View File

@ -30,72 +30,51 @@ public:
static std::pair<std::unique_ptr<TestTransport<Proto>>,
std::unique_ptr<TestTransport<Proto>>>
createPair() {
createPair(lldb_private::MainLoop &loop) {
std::unique_ptr<TestTransport<Proto>> transports[2] = {
std::make_unique<TestTransport<Proto>>(),
std::make_unique<TestTransport<Proto>>()};
std::make_unique<TestTransport<Proto>>(loop),
std::make_unique<TestTransport<Proto>>(loop)};
return std::make_pair(std::move(transports[0]), std::move(transports[1]));
}
explicit TestTransport() {
llvm::Expected<lldb::FileUP> dummy_file =
lldb_private::FileSystem::Instance().Open(
lldb_private::FileSpec(lldb_private::FileSystem::DEV_NULL),
lldb_private::File::eOpenOptionReadWrite);
EXPECT_THAT_EXPECTED(dummy_file, llvm::Succeeded());
m_dummy_file = std::move(*dummy_file);
}
explicit TestTransport(lldb_private::MainLoop &loop) : m_loop(loop) {}
llvm::Error Send(const typename Proto::Evt &evt) override {
EXPECT_TRUE(m_loop && m_handler)
<< "Send called before RegisterMessageHandler";
m_loop->AddPendingCallback([this, evt](lldb_private::MainLoopBase &) {
EXPECT_TRUE(m_handler) << "Send called before RegisterMessageHandler";
m_loop.AddPendingCallback([this, evt](lldb_private::MainLoopBase &) {
m_handler->Received(evt);
});
return llvm::Error::success();
}
llvm::Error Send(const typename Proto::Req &req) override {
EXPECT_TRUE(m_loop && m_handler)
<< "Send called before RegisterMessageHandler";
m_loop->AddPendingCallback([this, req](lldb_private::MainLoopBase &) {
EXPECT_TRUE(m_handler) << "Send called before RegisterMessageHandler";
m_loop.AddPendingCallback([this, req](lldb_private::MainLoopBase &) {
m_handler->Received(req);
});
return llvm::Error::success();
}
llvm::Error Send(const typename Proto::Resp &resp) override {
EXPECT_TRUE(m_loop && m_handler)
<< "Send called before RegisterMessageHandler";
m_loop->AddPendingCallback([this, resp](lldb_private::MainLoopBase &) {
EXPECT_TRUE(m_handler) << "Send called before RegisterMessageHandler";
m_loop.AddPendingCallback([this, resp](lldb_private::MainLoopBase &) {
m_handler->Received(resp);
});
return llvm::Error::success();
}
llvm::Expected<lldb_private::MainLoop::ReadHandleUP>
RegisterMessageHandler(lldb_private::MainLoop &loop,
MessageHandler &handler) override {
if (!m_loop)
m_loop = &loop;
llvm::Error RegisterMessageHandler(MessageHandler &handler) override {
if (!m_handler)
m_handler = &handler;
lldb_private::Status status;
auto handle = loop.RegisterReadObject(
m_dummy_file, [](lldb_private::MainLoopBase &) {}, status);
if (status.Fail())
return status.takeError();
return handle;
return llvm::Error::success();
}
protected:
void Log(llvm::StringRef message) override {};
private:
lldb_private::MainLoop *m_loop = nullptr;
lldb_private::MainLoop &m_loop;
MessageHandler *m_handler = nullptr;
// Dummy file for registering with the MainLoop.
lldb::FileSP m_dummy_file = nullptr;
};
template <typename Proto>