diff --git a/libc/shared/rpc.h b/libc/shared/rpc.h index 668b6f1993d8..f162c4cfae5e 100644 --- a/libc/shared/rpc.h +++ b/libc/shared/rpc.h @@ -130,11 +130,14 @@ template struct Process { /// Equivalent to loading outbox followed by store of the inverted value /// The outbox is write only by this warp and tracking the value locally is /// cheaper than calling load_outbox to get the value to store. - RPC_ATTRS uint32_t invert_outbox(uint32_t index, uint32_t current_outbox) { + RPC_ATTRS uint32_t invert_outbox(uint64_t lane_mask, uint32_t index, + uint32_t current_outbox) { uint32_t inverted_outbox = !current_outbox; + rpc::sync_lane(lane_mask); __scoped_atomic_thread_fence(__ATOMIC_RELEASE, __MEMORY_SCOPE_SYSTEM); - __scoped_atomic_store_n(&outbox[index], inverted_outbox, __ATOMIC_RELAXED, - __MEMORY_SCOPE_SYSTEM); + if (rpc::is_first_lane(lane_mask)) + __scoped_atomic_store_n(&outbox[index], inverted_outbox, __ATOMIC_RELAXED, + __MEMORY_SCOPE_SYSTEM); return inverted_outbox; } @@ -340,7 +343,7 @@ private: // The server is passive, if it owns the buffer when it closes we need to // give ownership back to the client. if (owns_buffer && T) - out = process.invert_outbox(index, out); + out = process.invert_outbox(lane_mask, index, out); process.unlock(lane_mask, index); } @@ -403,7 +406,7 @@ template template RPC_ATTRS void Port::send(F fill) { // Apply the \p fill function to initialize the buffer and release the memory. invoke_rpc(fill, lane_size, get_lane_mask(), process.get_packet(index, lane_size)); - out = process.invert_outbox(index, out); + out = process.invert_outbox(lane_mask, index, out); owns_buffer = false; receive = false; } @@ -413,7 +416,7 @@ template template RPC_ATTRS void Port::recv(U use) { // We only exchange ownership of the buffer during a receive if we are waiting // for a previous receive to finish. if (receive) { - out = process.invert_outbox(index, out); + out = process.invert_outbox(lane_mask, index, out); owns_buffer = false; } @@ -556,8 +559,10 @@ template RPC_ATTRS Client::Port Client::open() { if (index >= process.port_count) index = 0; - // Attempt to acquire the lock on this index. + // Attempt to acquire the lock on this index. Under NVIDIA's ITS the lanes + // may reconverge with differing index values, ensure they are convergent. uint64_t lane_mask = rpc::get_lane_mask(); + index = rpc::broadcast_value(lane_mask, index); if (!process.try_lock(lane_mask, index)) continue; diff --git a/libc/test/integration/startup/gpu/CMakeLists.txt b/libc/test/integration/startup/gpu/CMakeLists.txt index 1eee7bcc3d18..06a8692801d3 100644 --- a/libc/test/integration/startup/gpu/CMakeLists.txt +++ b/libc/test/integration/startup/gpu/CMakeLists.txt @@ -54,6 +54,16 @@ add_integration_test( --blocks 8 ) +add_integration_test( + startup_rpc_divergence_test + SUITE libc-startup-tests + SRCS + rpc_divergence_test.cpp + LOADER_ARGS + --threads 256 + --blocks 16 +) + if(LIBC_TARGET_ARCHITECTURE_IS_AMDGPU) add_integration_test( startup_rpc_lane_test_w32 diff --git a/libc/test/integration/startup/gpu/rpc_divergence_test.cpp b/libc/test/integration/startup/gpu/rpc_divergence_test.cpp new file mode 100644 index 000000000000..bd1e4a5c2668 --- /dev/null +++ b/libc/test/integration/startup/gpu/rpc_divergence_test.cpp @@ -0,0 +1,63 @@ +//===-- Loader test to check the RPC interface with the loader ------------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#include "src/__support/GPU/utils.h" +#include "src/__support/RPC/rpc_client.h" + +#include "test/IntegrationTest/test.h" + +using namespace LIBC_NAMESPACE; + +static inline uint32_t entropy() { + return (static_cast(gpu::processor_clock()) ^ + (gpu::get_thread_id_x() * 0x632be59b) ^ + (gpu::get_block_id_x() * 0x85157af5)) * + 0x9e3779bb; +} + +static inline uint32_t xorshift32(uint32_t &state) { + state ^= state << 13; + state ^= state >> 17; + state ^= state << 5; + return state * 0x9e3779bb; +} + +void increment(uint64_t cnt) { + LIBC_NAMESPACE::rpc::Client::Port port = + LIBC_NAMESPACE::rpc::client.open(); + port.send_and_recv( + [=](LIBC_NAMESPACE::rpc::Buffer *buffer, uint32_t) { + reinterpret_cast(buffer->data)[0] = cnt; + }, + [&](LIBC_NAMESPACE::rpc::Buffer *buffer, uint32_t) { + ASSERT_TRUE(reinterpret_cast(buffer->data)[0] == cnt + 1); + }); +} + +TEST_MAIN(int, char **, char **) { + uint32_t state = entropy(); + + // Force a highly divergent warp state while hammering the RPC interface. + uint32_t iters = 128 + xorshift32(state) % 128; + for (uint32_t i = 0; i < iters; ++i) { + if (xorshift32(state) % 127 == 0) { + volatile int x = 0; + uint32_t delay = xorshift32(state) % 4096; + for (uint32_t j = 0; j < delay; ++j) + x++; + } + uint32_t roll = xorshift32(state); + if (roll % 2 == 0) { + uint32_t burst = roll % 64 == 0 ? 2 + xorshift32(state) % 7 : 1; + for (uint32_t b = 0; b < burst; ++b) + increment(xorshift32(state)); + } + } + + return 0; +}