Read and decompress network data on a separate thread.

This commit is contained in:
Bartosz Taudul 2019-10-28 23:22:50 +01:00
parent e0356ae01e
commit 8050622b0f
2 changed files with 79 additions and 20 deletions

View File

@ -2282,6 +2282,42 @@ const Worker::SourceLocationZones& Worker::GetZonesForSourceLocation( int16_t sr
void Worker::Network() void Worker::Network()
{ {
auto ShouldExit = [this] { return m_shutdown.load( std::memory_order_relaxed ); };
auto lz4buf = std::make_unique<char[]>( LZ4Size );
for(;;)
{
{
std::unique_lock<std::mutex> lock( m_netWriteLock );
m_netWriteCv.wait( lock, [this] { return m_netWriteCnt > 0 || m_shutdown.load( std::memory_order_relaxed ); } );
if( m_shutdown.load( std::memory_order_relaxed ) ) goto close;
m_netWriteCnt--;
}
auto buf = m_buffer + m_bufferOffset;
lz4sz_t lz4sz;
if( !m_sock.Read( &lz4sz, sizeof( lz4sz ), 10, ShouldExit ) ) goto close;
if( !m_sock.Read( lz4buf.get(), lz4sz, 10, ShouldExit ) ) goto close;
m_bytes.fetch_add( sizeof( lz4sz ) + lz4sz, std::memory_order_relaxed );
auto sz = LZ4_decompress_safe_continue( m_stream, lz4buf.get(), buf, lz4sz, TargetFrameSize );
assert( sz >= 0 );
m_decBytes.fetch_add( sz, std::memory_order_relaxed );
{
std::lock_guard<std::mutex> lock( m_netReadLock );
m_netRead.push_back( NetBuffer { m_bufferOffset, sz } );
m_netReadCv.notify_one();
}
m_bufferOffset += sz;
if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0;
}
close:
std::lock_guard<std::mutex> lock( m_netReadLock );
m_netRead.push_back( NetBuffer { -1 } );
m_netReadCv.notify_one();
} }
void Worker::Exec() void Worker::Exec()
@ -2294,13 +2330,8 @@ void Worker::Exec()
if( m_sock.Connect( m_addr.c_str(), m_port ) ) break; if( m_sock.Connect( m_addr.c_str(), m_port ) ) break;
} }
auto lz4buf = std::make_unique<char[]>( LZ4Size );
std::chrono::time_point<std::chrono::high_resolution_clock> t0; std::chrono::time_point<std::chrono::high_resolution_clock> t0;
uint64_t bytes = 0;
uint64_t decBytes = 0;
m_sock.Send( HandshakeShibboleth, HandshakeShibbolethSize ); m_sock.Send( HandshakeShibboleth, HandshakeShibbolethSize );
uint32_t protocolVersion = ProtocolVersion; uint32_t protocolVersion = ProtocolVersion;
m_sock.Send( &protocolVersion, sizeof( protocolVersion ) ); m_sock.Send( &protocolVersion, sizeof( protocolVersion ) );
@ -2381,6 +2412,11 @@ void Worker::Exec()
LZ4_setStreamDecode( m_stream, nullptr, 0 ); LZ4_setStreamDecode( m_stream, nullptr, 0 );
m_connected.store( true, std::memory_order_relaxed ); m_connected.store( true, std::memory_order_relaxed );
{
std::lock_guard<std::mutex> lock( m_netWriteLock );
m_netWriteCnt = 2;
m_netWriteCv.notify_one();
}
t0 = std::chrono::high_resolution_clock::now(); t0 = std::chrono::high_resolution_clock::now();
@ -2392,18 +2428,17 @@ void Worker::Exec()
goto close; goto close;
} }
auto buf = m_buffer + m_bufferOffset; NetBuffer netbuf;
lz4sz_t lz4sz; {
if( !m_sock.Read( &lz4sz, sizeof( lz4sz ), 10, ShouldExit ) ) goto close; std::unique_lock<std::mutex> lock( m_netReadLock );
if( !m_sock.Read( lz4buf.get(), lz4sz, 10, ShouldExit ) ) goto close; m_netReadCv.wait( lock, [this] { return !m_netRead.empty(); } );
bytes += sizeof( lz4sz ) + lz4sz; netbuf = m_netRead.front();
m_netRead.erase( m_netRead.begin() );
}
if( netbuf.bufferOffset < 0 ) goto close;
auto sz = LZ4_decompress_safe_continue( m_stream, lz4buf.get(), buf, lz4sz, TargetFrameSize ); char* ptr = m_buffer + netbuf.bufferOffset;
assert( sz >= 0 ); const char* end = ptr + netbuf.size;
decBytes += sz;
char* ptr = buf;
const char* end = buf + sz;
{ {
std::lock_guard<std::shared_mutex> lock( m_data.lock ); std::lock_guard<std::shared_mutex> lock( m_data.lock );
@ -2417,8 +2452,11 @@ void Worker::Exec()
} }
} }
m_bufferOffset += sz; {
if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0; std::lock_guard<std::mutex> lock( m_netWriteLock );
m_netWriteCnt++;
m_netWriteCv.notify_one();
}
HandlePostponedPlots(); HandlePostponedPlots();
@ -2436,6 +2474,8 @@ void Worker::Exec()
enum { MbpsUpdateTime = 200 }; enum { MbpsUpdateTime = 200 };
if( td > MbpsUpdateTime ) if( td > MbpsUpdateTime )
{ {
const auto bytes = m_bytes.exchange( 0, std::memory_order_relaxed );
const auto decBytes = m_decBytes.exchange( 0, std::memory_order_relaxed );
std::lock_guard<std::shared_mutex> lock( m_mbpsData.lock ); std::lock_guard<std::shared_mutex> lock( m_mbpsData.lock );
m_mbpsData.mbps.erase( m_mbpsData.mbps.begin() ); m_mbpsData.mbps.erase( m_mbpsData.mbps.begin() );
m_mbpsData.mbps.emplace_back( bytes / ( td * 125.f ) ); m_mbpsData.mbps.emplace_back( bytes / ( td * 125.f ) );
@ -2443,8 +2483,6 @@ void Worker::Exec()
m_mbpsData.queue = m_serverQueryQueue.size(); m_mbpsData.queue = m_serverQueryQueue.size();
m_mbpsData.transferred += bytes; m_mbpsData.transferred += bytes;
t0 = t1; t0 = t1;
bytes = 0;
decBytes = 0;
} }
if( m_terminate ) if( m_terminate )
@ -2474,6 +2512,8 @@ void Worker::Exec()
} }
close: close:
Shutdown();
m_netWriteCv.notify_one();
m_sock.Close(); m_sock.Close();
m_connected.store( false, std::memory_order_relaxed ); m_connected.store( false, std::memory_order_relaxed );
} }

View File

@ -2,7 +2,9 @@
#define __TRACYWORKER_HPP__ #define __TRACYWORKER_HPP__
#include <atomic> #include <atomic>
#include <condition_variable>
#include <limits> #include <limits>
#include <mutex>
#include <shared_mutex> #include <shared_mutex>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
@ -653,6 +655,23 @@ private:
int64_t m_refTimeSerial = 0; int64_t m_refTimeSerial = 0;
int64_t m_refTimeCtx = 0; int64_t m_refTimeCtx = 0;
int64_t m_refTimeGpu = 0; int64_t m_refTimeGpu = 0;
std::atomic<uint64_t> m_bytes { 0 };
std::atomic<uint64_t> m_decBytes { 0 };
struct NetBuffer
{
int bufferOffset;
int size;
};
std::vector<NetBuffer> m_netRead;
std::mutex m_netReadLock;
std::condition_variable m_netReadCv;
int m_netWriteCnt = 0;
std::mutex m_netWriteLock;
std::condition_variable m_netWriteCv;
}; };
} }