From 3bf4a674bc083cfe804f2c358195e2f49f2719ac Mon Sep 17 00:00:00 2001 From: Bartosz Taudul Date: Sun, 2 Jun 2024 12:27:17 +0200 Subject: [PATCH] Use multiple compression streams when writing trace files. --- server/TracyFileHeader.hpp | 5 +- server/TracyFileWrite.hpp | 155 +++++++++++++++++++++++++++++-------- 2 files changed, 127 insertions(+), 33 deletions(-) diff --git a/server/TracyFileHeader.hpp b/server/TracyFileHeader.hpp index efb2bb20..650de534 100644 --- a/server/TracyFileHeader.hpp +++ b/server/TracyFileHeader.hpp @@ -8,8 +8,9 @@ namespace tracy { -static const char Lz4Header[4] = { 't', 'l', 'Z', 4 }; -static const char ZstdHeader[4] = { 't', 'Z', 's', 't' }; +static const uint8_t TracyHeader[4] = { 't', 'r', 253, 'P' }; +static const uint8_t Lz4Header[4] = { 't', 'l', 'Z', 4 }; +static const uint8_t ZstdHeader[4] = { 't', 'Z', 's', 't' }; static constexpr tracy_force_inline int FileVersion( uint8_t h5, uint8_t h6, uint8_t h7 ) { diff --git a/server/TracyFileWrite.hpp b/server/TracyFileWrite.hpp index 51d8a665..6e4edb63 100644 --- a/server/TracyFileWrite.hpp +++ b/server/TracyFileWrite.hpp @@ -7,9 +7,13 @@ #include #include +#include +#include #include #include +#include #include +#include #include "TracyFileHeader.hpp" #include "../public/common/tracy_lz4.hpp" @@ -38,8 +42,9 @@ public: : m_stream( nullptr ) , m_streamHC( nullptr ) , m_streamZstd( nullptr ) - , m_buf( m_bufData[0] ) - , m_second( m_bufData[1] ) + , m_buf( new char[FileBufSize] ) + , m_second( new char[FileBufSize] ) + , m_compressed( new char[FileBoundSize] ) { switch( comp ) { @@ -66,18 +71,24 @@ public: ~WriteStream() { + delete[] m_buf; + delete[] m_second; + delete[] m_compressed; + if( m_stream ) LZ4_freeStream( m_stream ); if( m_streamHC ) LZ4_freeStreamHC( m_streamHC ); if( m_streamZstd ) ZSTD_freeCStream( m_streamZstd ); } - char* GetBuffer() { return m_buf; } + char* GetInputBuffer() { return m_buf; } + const char* GetCompressedData() const { return m_compressed; } + uint32_t GetSize() const { return m_size; } - const char* Compress( uint32_t& sz ) + void Compress( uint32_t sz ) { if( m_stream ) { - sz = LZ4_compress_fast_continue( m_stream, m_buf, m_compressed, sz, FileBoundSize, 1 ); + m_size = LZ4_compress_fast_continue( m_stream, m_buf, m_compressed, sz, FileBoundSize, 1 ); } else if( m_streamZstd ) { @@ -85,15 +96,14 @@ public: ZSTD_inBuffer in = { m_buf, sz, 0 }; const auto ret = ZSTD_compressStream2( m_streamZstd, &out, &in, ZSTD_e_flush ); assert( ret == 0 ); - sz = out.pos; + m_size = out.pos; } else { - sz = LZ4_compress_HC_continue( m_streamHC, m_buf, m_compressed, sz, FileBoundSize ); + m_size = LZ4_compress_HC_continue( m_streamHC, m_buf, m_compressed, sz, FileBoundSize ); } std::swap( m_buf, m_second ); - return m_compressed; } private: @@ -101,20 +111,39 @@ private: LZ4_streamHC_t* m_streamHC; ZSTD_CStream* m_streamZstd; - char m_bufData[2][FileBufSize]; char* m_buf; char* m_second; - - char m_compressed[FileBoundSize]; + char* m_compressed; + uint32_t m_size; }; class FileWrite { + struct StreamHandle + { + StreamHandle( FileCompression comp, int level ) : stream( comp, level ) {} + + WriteStream stream; + uint32_t size; + + bool inputReady = false; + bool outputReady = false; + bool exit = false; + + std::mutex signalLock; + std::condition_variable signal; + + std::thread thread; + }; + public: - static FileWrite* Open( const char* fn, FileCompression comp = FileCompression::Fast, int level = 1 ) + static FileWrite* Open( const char* fn, FileCompression comp = FileCompression::Fast, int level = 1, int streams = -1 ) { auto f = fopen( fn, "wb" ); - return f ? new FileWrite( f, comp, level ) : nullptr; + if( !f ) return nullptr; + if( streams <= 0 ) streams = std::max( 1, std::thread::hardware_concurrency() ); + if( streams > 255 ) streams = 255; + return new FileWrite( f, comp, level, streams ); } ~FileWrite() @@ -126,6 +155,15 @@ public: void Finish() { if( m_offset > 0 ) WriteBlock(); + while( m_streamPending > 0 ) ProcessPending(); + for( auto& v : m_streams ) + { + std::lock_guard lock( v->signalLock ); + v->exit = true; + v->signal.notify_one(); + } + for( auto& v : m_streams ) v->thread.join(); + m_streams.clear(); } tracy_force_inline void Write( const void* ptr, size_t size ) @@ -143,22 +181,30 @@ public: std::pair GetCompressionStatistics() const { return std::make_pair( m_srcBytes, m_dstBytes ); } private: - FileWrite( FILE* f, FileCompression comp, int level ) - : m_stream( comp, level ) + FileWrite( FILE* f, FileCompression comp, int level, int streams ) + : m_offset( 0 ) , m_file( f ) - , m_buf( m_stream.GetBuffer() ) - , m_offset( 0 ) , m_srcBytes( 0 ) , m_dstBytes( 0 ) { - if( comp == FileCompression::Zstd ) + assert( streams > 0 ); + assert( streams < 256 ); + + fwrite( TracyHeader, 1, sizeof( TracyHeader ), m_file ); + uint8_t u8 = comp == FileCompression::Zstd ? 1 : 0; + fwrite( &u8, 1, 1, m_file ); + u8 = streams; + fwrite( &u8, 1, 1, m_file ); + + m_streams.reserve( streams ); + for( int i=0; i( comp, level ); + uptr->thread = std::thread( [ptr = uptr.get()]{ Worker( ptr ); } ); + m_streams.emplace_back( std::move( uptr ) ); } + + m_buf = m_streams[m_streamId]->stream.GetInputBuffer(); } tracy_force_inline void WriteSmall( const void* ptr, size_t size ) @@ -187,22 +233,69 @@ private: void WriteBlock() { - uint32_t sz = m_offset; m_srcBytes += m_offset; - auto block = m_stream.Compress( sz ); - m_dstBytes += sz; - fwrite( &sz, 1, sizeof( sz ), m_file ); - fwrite( block, 1, sz, m_file ); + auto& hnd = *m_streams[m_streamId]; + assert( hnd.stream.GetInputBuffer() == m_buf ); + + std::unique_lock lock( hnd.signalLock ); + hnd.inputReady = true; + hnd.size = m_offset; + hnd.signal.notify_one(); + lock.unlock(); + + m_streamPending++; + m_streamId = ( m_streamId + 1 ) % m_streams.size(); + if( m_streamPending == m_streams.size() ) ProcessPending(); m_offset = 0; - m_buf = m_stream.GetBuffer(); + m_buf = m_streams[m_streamId]->stream.GetInputBuffer(); + } + + void ProcessPending() + { + assert( m_streamPending > 0 ); + int id = ( m_streamId + m_streams.size() - m_streamPending ) % m_streams.size(); + m_streamPending--; + auto& hnd = *m_streams[id]; + + std::unique_lock lock( hnd.signalLock ); + hnd.signal.wait( lock, [&hnd]{ return hnd.outputReady; } ); + lock.unlock(); + + hnd.outputReady = false; + const uint32_t size = hnd.stream.GetSize(); + m_dstBytes += size; + fwrite( &size, 1, sizeof( size ), m_file ); + fwrite( hnd.stream.GetCompressedData(), 1, size, m_file ); + } + + static void Worker( StreamHandle* hnd ) + { + std::unique_lock lock( hnd->signalLock ); + for(;;) + { + hnd->signal.wait( lock, [&hnd]{ return hnd->inputReady || hnd->exit; } ); + if( hnd->exit ) return; + lock.unlock(); + + hnd->stream.Compress( hnd->size ); + hnd->inputReady = false; + + lock.lock(); + hnd->outputReady = true; + hnd->signal.notify_one(); + } } - WriteStream m_stream; - FILE* m_file; char* m_buf; size_t m_offset; + + int m_streamId = 0; + int m_streamPending = 0; + std::vector> m_streams; + FILE* m_file; + size_t m_srcBytes; size_t m_dstBytes; };