Parallelize file reading.

Use spin-locks for synchronization.

IsEOF() is now buggy, but the bug chance is fairly low (1/65536) - it
can happen when the last compressed block has exactly max decompressed
block size. Don't care about it much, as it's only used to open old
archives.
This commit is contained in:
Bartosz Taudul 2018-05-03 17:56:43 +02:00
parent 3d13ea09e8
commit a46d27f312

View File

@ -1,10 +1,12 @@
#ifndef __TRACYFILEREAD_HPP__
#define __TRACYFILEREAD_HPP__
#include <atomic>
#include <algorithm>
#include <stdexcept>
#include <stdio.h>
#include <string.h>
#include <thread>
#include "TracyFileHeader.hpp"
#include "../common/tracy_lz4.hpp"
@ -26,6 +28,9 @@ public:
~FileRead()
{
m_exit.store( true, std::memory_order_relaxed );
m_decThread.join();
fclose( m_file );
LZ4_freeStreamDecode( m_stream );
}
@ -104,10 +109,13 @@ private:
FileRead( FILE* f )
: m_stream( LZ4_createStreamDecode() )
, m_file( f )
, m_buf( m_bufData[0] )
, m_second( m_bufData[1] )
, m_offset( BufSize )
, m_buf( m_bufData[1] )
, m_second( m_bufData[0] )
, m_offset( 0 )
, m_lastBlock( 0 )
, m_signalSwitch( false )
, m_signalAvailable( false )
, m_exit( false )
{
char hdr[4];
if( fread( hdr, 1, sizeof( hdr ), m_file ) != sizeof( hdr ) ) throw NotTracyDump();
@ -119,6 +127,28 @@ private:
memcpy( &sz, hdr, sizeof( sz ) );
if( sz > LZ4Size ) throw NotTracyDump();
}
ReadBlock();
std::swap( m_buf, m_second );
m_decThread = std::thread( [this] { Worker(); } );
}
void Worker()
{
for(;;)
{
ReadBlock();
for(;;)
{
if( m_exit.load( std::memory_order_relaxed ) == true ) return;
if( m_signalSwitch.load( std::memory_order_relaxed ) == true ) break;
}
m_signalSwitch.store( false, std::memory_order_relaxed );
std::swap( m_buf, m_second );
m_offset = 0;
m_signalAvailable.store( true, std::memory_order_release );
if( m_lastBlock != BufSize ) return;
}
}
tracy_force_inline void ReadSmall( void* ptr, size_t size )
@ -134,8 +164,9 @@ private:
{
if( m_offset == BufSize )
{
std::swap( m_buf, m_second );
ReadBlock();
m_signalSwitch.store( true, std::memory_order_relaxed );
while( m_signalAvailable.load( std::memory_order_acquire ) == false ) {}
m_signalAvailable.store( false, std::memory_order_relaxed );
}
const auto sz = std::min( size, BufSize - m_offset );
@ -152,8 +183,9 @@ private:
{
if( m_offset == BufSize )
{
std::swap( m_buf, m_second );
ReadBlock();
m_signalSwitch.store( true, std::memory_order_relaxed );
while( m_signalAvailable.load( std::memory_order_acquire ) == false ) {}
m_signalAvailable.store( false, std::memory_order_relaxed );
}
const auto sz = std::min( size, BufSize - m_offset );
@ -165,11 +197,16 @@ private:
void ReadBlock()
{
char m_lz4buf[LZ4Size];
m_offset = 0;
uint32_t sz;
fread( &sz, 1, sizeof( sz ), m_file );
fread( m_lz4buf, 1, sz, m_file );
m_lastBlock = LZ4_decompress_safe_continue( m_stream, m_lz4buf, m_buf, sz, BufSize );
if( fread( &sz, 1, sizeof( sz ), m_file ) == sizeof( sz ) )
{
fread( m_lz4buf, 1, sz, m_file );
m_lastBlock = LZ4_decompress_safe_continue( m_stream, m_lz4buf, m_second, sz, BufSize );
}
else
{
m_lastBlock = 0;
}
}
enum { BufSize = 64 * 1024 };
@ -182,6 +219,12 @@ private:
char* m_second;
size_t m_offset;
int m_lastBlock;
std::atomic<bool> m_signalSwitch;
std::atomic<bool> m_signalAvailable;
std::atomic<bool> m_exit;
std::thread m_decThread;
};
}