// Review notes for the patch below (unified diff against server/TracyFileRead.hpp).
// The patch text itself is kept verbatim, including its collapsed line breaks.
// In this copy the #include lines are bare and some template arguments are
// missing (e.g. `std::make_unique( type )`, `std::vector> m_streams`).
//
// What the added ('+') lines do:
//  * ReadStream owns two heap buffers of FileBufSize bytes plus one decoder,
//    selected by the `type` byte: 0 creates an LZ4 decode stream, 1 creates a
//    ZSTD decode stream. Decompress() first swaps the two buffers and then
//    decodes into the new m_buf, so the buffer returned by the previous
//    GetBuffer() call is not overwritten by the next Decompress().
//  * FileRead::StreamHandle groups one ReadStream with its pending input
//    (src/size), the inputReady/outputReady/exit flags, a mutex, a condition
//    variable and the worker thread.
//  * The FileRead constructor maps legacy Lz4Header/ZstdHeader files to type
//    0/1, creates up to `streams` handles, hands each its first compressed
//    block, starts one Worker thread per handle and then calls
//    GetNextDataBlock().
//  * Worker() waits for inputReady or exit, decompresses with the lock
//    released, then sets outputReady under the lock and notifies.
//  * GetNextDataBlock() waits for the current stream's output, exposes its
//    buffer through m_buf, resets m_offset, queues the next compressed block
//    for that same stream while m_dataOffset < m_dataSize, and advances
//    m_streamId round-robin.
//  * ~FileRead() sets `exit` on every handle under its lock, notifies, joins
//    all worker threads and only then unmaps m_data.
//
// NOTE(review): m_streams can be empty when GetNextDataBlock() runs. The
//   creation loop breaks when m_dataOffset == m_dataSize and runs zero times
//   if the `streams` byte read from the file is 0. GetNextDataBlock() then
//   indexes m_streams[m_streamId] and computes `% m_streams.size()`.
//   Confirm that earlier validation rules this out.
// NOTE(review): Decompress() drops decoder errors. The LZ4 return value is
//   cast straight to size_t and the ZSTD_decompressStream() return value is
//   discarded, whereas the removed ReadBlock() asserted `ret > 0`.
// NOTE(review): ReadStream::m_size has no initializer, so GetSize() before
//   the first Decompress() would read an indeterminate value.
// NOTE(review): ReadStream frees raw resources in its destructor but declares
//   no copy operations in the visible lines. It is only constructed in place
//   inside StreamHandle here; copying it would double-free.
// NOTE(review): Worker() clears inputReady and GetNextDataBlock() clears
//   outputReady while signalLock is not held. This looks safe only because
//   producer and consumer strictly alternate on each handle - TODO confirm.
// NOTE(review): no visible line checks that a block of `sz` bytes fits inside
//   the mapped data before it is handed to a worker; ReadBlockSize() is not
//   fully shown - verify.
// NOTE(review): GetSize() is not called in the visible hunks, while the
//   removed code tracked m_lastBlock. Presumably the size of a short final
//   block is no longer consulted - verify.
diff --git a/server/TracyFileRead.hpp b/server/TracyFileRead.hpp index 22f49bb5..ae301214 100644 --- a/server/TracyFileRead.hpp +++ b/server/TracyFileRead.hpp @@ -4,12 +4,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include @@ -34,8 +36,87 @@ namespace tracy struct NotTracyDump : public std::exception {}; struct FileReadError : public std::exception {}; +class ReadStream +{ +public: + ReadStream( uint8_t type ) + : m_stream( nullptr ) + , m_streamZstd( nullptr ) + , m_buf( new char[FileBufSize] ) + , m_second( new char[FileBufSize] ) + { + switch( type ) + { + case 0: + m_stream = LZ4_createStreamDecode(); + break; + case 1: + m_streamZstd = ZSTD_createDStream(); + break; + default: + assert( false ); + break; + } + } + + ~ReadStream() + { + delete[] m_buf; + delete[] m_second; + + if( m_stream ) LZ4_freeStreamDecode( m_stream ); + if( m_streamZstd ) ZSTD_freeDStream( m_streamZstd ); + } + + void Decompress( const char* src, uint32_t size ) + { + std::swap( m_buf, m_second ); + if( m_stream ) + { + m_size = (size_t)LZ4_decompress_safe_continue( m_stream, src, m_buf, size, FileBufSize ); + } + else + { + ZSTD_outBuffer out = { m_buf, FileBufSize, 0 }; + ZSTD_inBuffer in = { src, size, 0 }; + ZSTD_decompressStream( m_streamZstd, &out, &in ); + m_size = out.pos; + } + } + + const char* GetBuffer() const { return m_buf; } + size_t GetSize() const { return m_size; } + +private: + LZ4_streamDecode_t* m_stream; + ZSTD_DStream* m_streamZstd; + + char* m_buf; + char* m_second; + + size_t m_size; +}; + class FileRead { + struct StreamHandle + { + StreamHandle( uint8_t type ) : stream( type ) {} + + ReadStream stream; + const char* src; + uint32_t size; + + bool inputReady = false; + bool outputReady = false; + bool exit = false; + + std::mutex signalLock; + std::condition_variable signal; + + std::thread thread; + }; + public: static FileRead* Open( const char* fn ) { @@ -45,12 +126,15 @@ public: ~FileRead() { - 
m_exit.store( true, std::memory_order_relaxed ); - m_decThread.join(); - + for( auto& v : m_streams ) + { + std::lock_guard lock( v->signalLock ); + v->exit = true; + v->signal.notify_one(); + } + for( auto& v : m_streams ) v->thread.join(); + m_streams.clear(); if( m_data ) munmap( m_data, m_dataSize ); - if( m_stream ) LZ4_freeStreamDecode( m_stream ); - if( m_streamZstd ) ZSTD_freeDStream( m_streamZstd ); } tracy_force_inline void Read( void* ptr, size_t size ) @@ -331,16 +415,9 @@ public: private: FileRead( FILE* f, const char* fn ) - : m_stream( nullptr ) - , m_streamZstd( nullptr ) - , m_data( nullptr ) - , m_buf( m_bufData[1] ) - , m_second( m_bufData[0] ) + : m_data( nullptr ) , m_offset( 0 ) - , m_lastBlock( 0 ) - , m_signalSwitch( false ) - , m_signalAvailable( false ) - , m_exit( false ) + , m_streamId( 0 ) , m_filename( fn ) { char hdr[4]; @@ -351,29 +428,16 @@ private: } uint8_t streams = 1; + uint8_t type; m_dataOffset = sizeof( hdr ); if( memcmp( hdr, TracyHeader, sizeof( hdr ) ) == 0 ) { - uint8_t type; - if( fread( &type, 1, 1, f ) != 1 ) + if( fread( &type, 1, 1, f ) != 1 || type > 1 ) { fclose( f ); throw NotTracyDump(); } - switch( type ) - { - case 0: - m_stream = LZ4_createStreamDecode(); - break; - case 1: - m_streamZstd = ZSTD_createDStream(); - break; - default: - fclose( f ); - throw NotTracyDump(); - break; - } if( fread( &streams, 1, 1, f ) != 1 ) { fclose( f ); @@ -383,11 +447,11 @@ private: } else if( memcmp( hdr, Lz4Header, sizeof( hdr ) ) == 0 ) { - m_stream = LZ4_createStreamDecode(); + type = 0; } else if( memcmp( hdr, ZstdHeader, sizeof( hdr ) ) == 0 ) { - m_streamZstd = ZSTD_createDStream(); + type = 1; } else { @@ -413,9 +477,21 @@ private: throw FileReadError(); } - ReadBlock( ReadBlockSize() ); - std::swap( m_buf, m_second ); - m_decThread = std::thread( [this] { Worker(); } ); + for( int i=0; i<(int)streams; i++ ) + { + if( m_dataOffset == m_dataSize ) break; + + const auto sz = ReadBlockSize(); + auto uptr = 
std::make_unique( type ); + uptr->src = m_data + m_dataOffset; + uptr->size = sz; + uptr->inputReady = true; + uptr->thread = std::thread( [ptr = uptr.get()] { Worker( ptr ); } ); + m_streams.emplace_back( std::move( uptr ) ); + m_dataOffset += sz; + } + + GetNextDataBlock(); } tracy_force_inline uint32_t ReadBlockSize() @@ -426,24 +502,21 @@ private: return sz; } - void Worker() + static void Worker( StreamHandle* hnd ) { - uint32_t blockSz = ReadBlockSize(); + std::unique_lock lock( hnd->signalLock ); for(;;) { - ReadBlock( blockSz ); - if( m_lastBlock == FileBufSize ) blockSz = ReadBlockSize(); - for(;;) - { - if( m_exit.load( std::memory_order_relaxed ) == true ) return; - if( m_signalSwitch.load( std::memory_order_relaxed ) == true ) break; - YieldThread(); - } - m_signalSwitch.store( false, std::memory_order_relaxed ); - std::swap( m_buf, m_second ); - m_offset = 0; - m_signalAvailable.store( true, std::memory_order_release ); - if( m_lastBlock != FileBufSize ) return; + hnd->signal.wait( lock, [&] { return hnd->inputReady || hnd->exit; } ); + if( hnd->exit ) return; + lock.unlock(); + + hnd->stream.Decompress( hnd->src, hnd->size ); + hnd->inputReady = false; + + lock.lock(); + hnd->outputReady = true; + hnd->signal.notify_one(); } } @@ -463,12 +536,7 @@ private: if( m_offset == FileBufSize ) { sz = std::min( size, FileBufSize ); - - m_signalSwitch.store( true, std::memory_order_relaxed ); - while( m_signalAvailable.load( std::memory_order_acquire ) == false ) { YieldThread(); } - m_signalAvailable.store( false, std::memory_order_relaxed ); - assert( m_offset == 0 ); - + GetNextDataBlock(); memcpy( dst, m_buf, sz ); m_offset = sz; } @@ -489,55 +557,49 @@ private: { while( size > 0 ) { - if( m_offset == FileBufSize ) - { - m_signalSwitch.store( true, std::memory_order_relaxed ); - while( m_signalAvailable.load( std::memory_order_acquire ) == false ) { YieldThread(); } - m_signalAvailable.store( false, std::memory_order_relaxed ); - } - + if( m_offset == 
FileBufSize ) GetNextDataBlock(); const auto sz = std::min( size, FileBufSize - m_offset ); m_offset += sz; size -= sz; } } - void ReadBlock( uint32_t sz ) + void GetNextDataBlock() { - if( m_stream ) + auto& hnd = *m_streams[m_streamId]; + std::unique_lock lock( hnd.signalLock ); + hnd.signal.wait( lock, [&hnd]{ return hnd.outputReady; } ); + lock.unlock(); + + hnd.outputReady = false; + m_buf = hnd.stream.GetBuffer(); + m_offset = 0; + + if( m_dataOffset < m_dataSize ) { - m_lastBlock = (size_t)LZ4_decompress_safe_continue( m_stream, m_data + m_dataOffset, m_second, sz, FileBufSize ); + const auto sz = ReadBlockSize(); + lock.lock(); + hnd.src = m_data + m_dataOffset; + hnd.size = sz; + hnd.inputReady = true; + hnd.signal.notify_one(); + lock.unlock(); m_dataOffset += sz; } - else - { - ZSTD_outBuffer out = { m_second, FileBufSize, 0 }; - ZSTD_inBuffer in = { m_data + m_dataOffset, sz, 0 }; - m_dataOffset += sz; - const auto ret = ZSTD_decompressStream( m_streamZstd, &out, &in ); - assert( ret > 0 ); - m_lastBlock = out.pos; - } + + m_streamId = ( m_streamId + 1 ) % m_streams.size(); } - LZ4_streamDecode_t* m_stream; - ZSTD_DStream* m_streamZstd; char* m_data; + const char* m_buf; uint64_t m_dataSize; uint64_t m_dataOffset; - char* m_buf; - char* m_second; size_t m_offset; - size_t m_lastBlock; - - alignas(64) std::atomic m_signalSwitch; - alignas(64) std::atomic m_signalAvailable; - alignas(64) std::atomic m_exit; - - std::thread m_decThread; + int m_streamId; std::string m_filename; - char m_bufData[2][FileBufSize]; + + std::vector> m_streams; }; }