[libc] Replace RPC 'close()' mechanism with RAII handler (#181690)
Summary: Closing ports was previously done manually, This makes the protocol more error prone as unclosed ports will leak and eventually the locks will run out. I believe the original fear was that the RAII portion would negatively impact code generation but I have not noticed anything significant.
This commit is contained in:
parent
7c3c9c45f8
commit
d85576d368
@ -117,7 +117,6 @@ done. It can be omitted if asynchronous execution is desired.
|
||||
buffer->data[0] = reinterpret_cast<uintptr_t>(fn);
|
||||
});
|
||||
port.recv([](rpc::Buffer *, uint32_t) {});
|
||||
port.close();
|
||||
}
|
||||
|
||||
Server Example
|
||||
@ -162,7 +161,6 @@ data.
|
||||
port->recv([](rpc::Buffer *) {});
|
||||
break;
|
||||
}
|
||||
port->close();
|
||||
}
|
||||
|
||||
Function Dispatch
|
||||
@ -199,7 +197,6 @@ than submitting asynchronously.
|
||||
port->recv([](rpc::Buffer *) {});
|
||||
break;
|
||||
}
|
||||
port->close();
|
||||
}
|
||||
|
||||
|
||||
@ -275,7 +272,6 @@ but the following example shows how it can be used by a standard user.
|
||||
|
||||
// Only available in-tree from the 'libc' sources.
|
||||
handle_libc_opcodes(*port, warp_size);
|
||||
port->close();
|
||||
} while (cudaStreamQuery(stream) == cudaErrorNotReady);
|
||||
}
|
||||
|
||||
|
||||
@ -297,7 +297,7 @@ template <bool T> struct Port {
|
||||
uint32_t index, uint32_t out)
|
||||
: process(process), lane_mask(lane_mask), lane_size(lane_size),
|
||||
index(index), out(out), receive(false), owns_buffer(true) {}
|
||||
RPC_ATTRS ~Port() = default;
|
||||
RPC_ATTRS ~Port() { close(); }
|
||||
|
||||
private:
|
||||
RPC_ATTRS Port(const Port &) = delete;
|
||||
@ -332,6 +332,7 @@ public:
|
||||
return lane_mask;
|
||||
}
|
||||
|
||||
private:
|
||||
RPC_ATTRS void close() {
|
||||
// Wait for all lanes to finish using the port.
|
||||
rpc::sync_lane(lane_mask);
|
||||
@ -343,7 +344,6 @@ public:
|
||||
process.unlock(lane_mask, index);
|
||||
}
|
||||
|
||||
private:
|
||||
Process<T> &process;
|
||||
uint64_t lane_mask;
|
||||
uint32_t lane_size;
|
||||
|
||||
@ -208,7 +208,6 @@ dispatch(rpc::Client &client, FnTy, CallArgs... args) {
|
||||
using BufferTy = rpc::conditional_t<rpc::is_void_v<RetTy>, uint8_t, RetTy>;
|
||||
BufferTy ret{};
|
||||
port.recv_n(&ret);
|
||||
port.close();
|
||||
|
||||
if constexpr (!rpc::is_void_v<RetTy>)
|
||||
return ret;
|
||||
|
||||
@ -61,7 +61,6 @@ static void *rpc_allocate(uint64_t size) {
|
||||
[&](rpc::Buffer *buffer, uint32_t) {
|
||||
ptr = reinterpret_cast<void *>(buffer->data[0]);
|
||||
});
|
||||
port.close();
|
||||
return ptr;
|
||||
}
|
||||
|
||||
@ -71,7 +70,6 @@ static void rpc_free(void *ptr) {
|
||||
port.send([=](rpc::Buffer *buffer, uint32_t) {
|
||||
buffer->data[0] = reinterpret_cast<uintptr_t>(ptr);
|
||||
});
|
||||
port.close();
|
||||
}
|
||||
|
||||
// Convert a potentially disjoint bitmask into an increasing integer per-lane
|
||||
|
||||
@ -24,7 +24,6 @@ namespace internal {
|
||||
port.send([&](rpc::Buffer *buffer, uint32_t) {
|
||||
reinterpret_cast<uint32_t *>(buffer->data)[0] = status;
|
||||
});
|
||||
port.close();
|
||||
|
||||
gpu::end_program();
|
||||
}
|
||||
|
||||
@ -18,7 +18,6 @@ void write_to_stderr(cpp::string_view msg) {
|
||||
rpc::Client::Port port = rpc::client.open<LIBC_WRITE_TO_STDERR>();
|
||||
port.send_n(msg.data(), msg.size());
|
||||
port.recv([](rpc::Buffer *, uint32_t) { /* void */ });
|
||||
port.close();
|
||||
}
|
||||
|
||||
} // namespace LIBC_NAMESPACE_DECL
|
||||
|
||||
@ -21,7 +21,6 @@ LLVM_LIBC_FUNCTION(void, clearerr, (::FILE * stream)) {
|
||||
buffer->data[0] = file::from_stream(stream);
|
||||
},
|
||||
[&](rpc::Buffer *, uint32_t) {});
|
||||
port.close();
|
||||
}
|
||||
|
||||
} // namespace LIBC_NAMESPACE_DECL
|
||||
|
||||
@ -22,7 +22,6 @@ LLVM_LIBC_FUNCTION(int, fclose, (::FILE * stream)) {
|
||||
port.send_and_recv(
|
||||
[=](rpc::Buffer *buffer, uint32_t) { buffer->data[0] = file; },
|
||||
[&](rpc::Buffer *buffer, uint32_t) { ret = buffer->data[0]; });
|
||||
port.close();
|
||||
|
||||
if (ret != 0)
|
||||
return EOF;
|
||||
|
||||
@ -24,7 +24,6 @@ LLVM_LIBC_FUNCTION(int, feof, (::FILE * stream)) {
|
||||
[&](rpc::Buffer *buffer, uint32_t) {
|
||||
ret = static_cast<int>(buffer->data[0]);
|
||||
});
|
||||
port.close();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -24,7 +24,6 @@ LLVM_LIBC_FUNCTION(int, ferror, (::FILE * stream)) {
|
||||
[&](rpc::Buffer *buffer, uint32_t) {
|
||||
ret = static_cast<int>(buffer->data[0]);
|
||||
});
|
||||
port.close();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -24,7 +24,6 @@ LLVM_LIBC_FUNCTION(int, fflush, (::FILE * stream)) {
|
||||
[&](rpc::Buffer *buffer, uint32_t) {
|
||||
ret = static_cast<int>(buffer->data[0]);
|
||||
});
|
||||
port.close();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -31,7 +31,6 @@ LLVM_LIBC_FUNCTION(char *, fgets,
|
||||
});
|
||||
port.recv_n(&buf, &recv_size,
|
||||
[&](uint64_t) { return reinterpret_cast<void *>(str); });
|
||||
port.close();
|
||||
|
||||
if (recv_size == 0)
|
||||
return nullptr;
|
||||
|
||||
@ -63,7 +63,6 @@ LIBC_INLINE uint64_t write_impl(::FILE *file, const void *data, size_t size) {
|
||||
port.recv([&](rpc::Buffer *buffer, uint32_t) {
|
||||
ret = reinterpret_cast<uint64_t *>(buffer->data)[0];
|
||||
});
|
||||
port.close();
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -86,7 +85,6 @@ LIBC_INLINE uint64_t read_from_stream(::FILE *file, void *buf, size_t size) {
|
||||
});
|
||||
port.recv_n(&buf, &recv_size, [&](uint64_t) { return buf; });
|
||||
port.recv([&](rpc::Buffer *buffer, uint32_t) { ret = buffer->data[0]; });
|
||||
port.close();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -26,7 +26,6 @@ LLVM_LIBC_FUNCTION(::FILE *, fopen,
|
||||
inline_memcpy(buffer->data, mode, internal::string_length(mode) + 1);
|
||||
},
|
||||
[&](rpc::Buffer *buffer, uint32_t) { file = buffer->data[0]; });
|
||||
port.close();
|
||||
|
||||
return reinterpret_cast<FILE *>(file);
|
||||
}
|
||||
|
||||
@ -26,7 +26,6 @@ LLVM_LIBC_FUNCTION(int, fseek, (::FILE * stream, long offset, int whence)) {
|
||||
[&](rpc::Buffer *buffer, uint32_t) {
|
||||
ret = static_cast<int>(buffer->data[0]);
|
||||
});
|
||||
port.close();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -24,7 +24,6 @@ LLVM_LIBC_FUNCTION(long, ftell, (::FILE * stream)) {
|
||||
[&](rpc::Buffer *buffer, uint32_t) {
|
||||
ret = static_cast<long>(buffer->data[0]);
|
||||
});
|
||||
port.close();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -22,7 +22,6 @@ LLVM_LIBC_FUNCTION(int, remove, (const char *path)) {
|
||||
port.recv([&](rpc::Buffer *buffer, uint32_t) {
|
||||
ret = static_cast<int>(buffer->data[0]);
|
||||
});
|
||||
port.close();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -23,7 +23,6 @@ LLVM_LIBC_FUNCTION(int, rename, (const char *oldpath, const char *newpath)) {
|
||||
port.recv([&](rpc::Buffer *buffer, uint32_t) {
|
||||
ret = static_cast<int>(buffer->data[0]);
|
||||
});
|
||||
port.close();
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -25,7 +25,6 @@ LLVM_LIBC_FUNCTION(int, ungetc, (int c, ::FILE *stream)) {
|
||||
[&](rpc::Buffer *buffer, uint32_t) {
|
||||
ret = static_cast<int>(buffer->data[0]);
|
||||
});
|
||||
port.close();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -51,7 +51,6 @@ LIBC_INLINE int vfprintf_impl(::FILE *__restrict file,
|
||||
port.send_n(str, size);
|
||||
}
|
||||
|
||||
port.close();
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -21,7 +21,6 @@ LLVM_LIBC_FUNCTION(void, abort, ()) {
|
||||
port.send_and_recv([](rpc::Buffer *, uint32_t) {},
|
||||
[](rpc::Buffer *, uint32_t) {});
|
||||
port.send([&](rpc::Buffer *, uint32_t) {});
|
||||
port.close();
|
||||
|
||||
gpu::end_program();
|
||||
}
|
||||
|
||||
@ -22,7 +22,6 @@ LLVM_LIBC_FUNCTION(int, system, (const char *command)) {
|
||||
port.recv([&](rpc::Buffer *buffer, uint32_t) {
|
||||
ret = static_cast<int>(buffer->data[0]);
|
||||
});
|
||||
port.close();
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -53,7 +53,6 @@ static void test_interface(bool end_with_send) {
|
||||
port.recv([&](LIBC_NAMESPACE::rpc::Buffer *buffer, uint32_t) {
|
||||
cnt = buffer->data[0];
|
||||
});
|
||||
port.close();
|
||||
|
||||
ASSERT_TRUE(cnt == 9 && "Invalid number of increments");
|
||||
}
|
||||
|
||||
@ -23,7 +23,6 @@ static void test_add() {
|
||||
[&](LIBC_NAMESPACE::rpc::Buffer *buffer, uint32_t) {
|
||||
cnt = reinterpret_cast<uint64_t *>(buffer->data)[0];
|
||||
});
|
||||
port.close();
|
||||
EXPECT_EQ(cnt, gpu::get_lane_id() + 1);
|
||||
EXPECT_EQ(gpu::get_thread_id(), gpu::get_lane_id());
|
||||
}
|
||||
|
||||
@ -39,7 +39,6 @@ static void test_stream() {
|
||||
port.send_n(send_ptr, send_size);
|
||||
port.recv_n(&recv_ptr, &recv_size,
|
||||
[](uint64_t size) { return malloc(size); });
|
||||
port.close();
|
||||
ASSERT_TRUE(inline_memcmp(recv_ptr, str, recv_size) == 0 && "Data mismatch");
|
||||
ASSERT_TRUE(recv_size == send_size && "Data size mismatch");
|
||||
|
||||
@ -83,7 +82,6 @@ static void test_divergent() {
|
||||
port.send_n(buffer, offset);
|
||||
inline_memset(buffer, 0, offset);
|
||||
port.recv_n(&recv_ptr, &recv_size, [&](uint64_t) { return buffer; });
|
||||
port.close();
|
||||
|
||||
ASSERT_TRUE(inline_memcmp(recv_ptr, &data[offset], recv_size) == 0 &&
|
||||
"Data mismatch");
|
||||
|
||||
@ -26,7 +26,6 @@ static void test_add_simple() {
|
||||
[&](LIBC_NAMESPACE::rpc::Buffer *buffer, uint32_t) {
|
||||
cnt = reinterpret_cast<uint64_t *>(buffer->data)[0];
|
||||
});
|
||||
port.close();
|
||||
}
|
||||
ASSERT_TRUE(cnt == num_additions && "Incorrect sum");
|
||||
}
|
||||
@ -38,7 +37,6 @@ static void test_noop(uint8_t data) {
|
||||
port.send([=](LIBC_NAMESPACE::rpc::Buffer *buffer, uint32_t) {
|
||||
buffer->data[0] = data;
|
||||
});
|
||||
port.close();
|
||||
}
|
||||
|
||||
TEST_MAIN(int argc, char **argv, char **envp) {
|
||||
|
||||
@ -47,8 +47,6 @@ inline uint32_t handle_server(rpc::Server &server, uint32_t index,
|
||||
if (status != rpc::RPC_SUCCESS)
|
||||
handle_error("Error handling RPC server");
|
||||
|
||||
port->close();
|
||||
|
||||
return index;
|
||||
}
|
||||
|
||||
|
||||
@ -111,7 +111,6 @@ runServer(plugin::GenericDeviceTy &Device, void *Buffer,
|
||||
if (Status == rpc::RPC_UNHANDLED_OPCODE)
|
||||
Status = LIBC_NAMESPACE::shared::handle_libc_opcodes(*Port, NumLanes);
|
||||
|
||||
Port->close();
|
||||
return Status;
|
||||
}
|
||||
|
||||
|
||||
@ -131,7 +131,6 @@ unsigned long long __llvm_omp_host_call(void *fn, void *data, size_t size) {
|
||||
Port.recv([&](rpc::Buffer *Buffer, uint32_t) {
|
||||
Ret = static_cast<unsigned long long>(Buffer->data[0]);
|
||||
});
|
||||
Port.close();
|
||||
return Ret;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user