Wait for send buffer to be full before sending data.

This commit is contained in:
Bartosz Taudul 2017-11-11 14:16:37 +01:00
parent 35391d08f1
commit fc4e31bb8f
2 changed files with 44 additions and 34 deletions

View File

@ -118,6 +118,7 @@ Profiler::Profiler()
, m_stream( LZ4_createStream() ) , m_stream( LZ4_createStream() )
, m_buffer( (char*)tracy_malloc( TargetFrameSize*3 ) ) , m_buffer( (char*)tracy_malloc( TargetFrameSize*3 ) )
, m_bufferOffset( 0 ) , m_bufferOffset( 0 )
, m_bufferStart( 0 )
, m_itemBuf( (QueueItem*)tracy_malloc( sizeof( QueueItem ) * BulkSize ) ) , m_itemBuf( (QueueItem*)tracy_malloc( sizeof( QueueItem ) * BulkSize ) )
, m_lz4Buf( (char*)tracy_malloc( LZ4Size + sizeof( lz4sz_t ) ) ) , m_lz4Buf( (char*)tracy_malloc( LZ4Size + sizeof( lz4sz_t ) ) )
{ {
@ -214,6 +215,7 @@ void Profiler::Worker()
else if( status == QueueEmpty ) else if( status == QueueEmpty )
{ {
if( ShouldExit() ) break; if( ShouldExit() ) break;
if( m_bufferOffset != m_bufferStart ) CommitData();
std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) ); std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
} }
@ -232,8 +234,19 @@ void Profiler::Worker()
{ {
if( m_sock->HasData() ) if( m_sock->HasData() )
{ {
while( m_sock->HasData() ) if( !HandleServerQuery() ) return; while( m_sock->HasData() )
{
if( !HandleServerQuery() )
{
if( m_bufferOffset != m_bufferStart ) CommitData();
return;
}
}
while( Dequeue( token ) == Success ) {} while( Dequeue( token ) == Success ) {}
if( m_bufferOffset != m_bufferStart )
{
if( !CommitData() ) return;
}
} }
else else
{ {
@ -247,17 +260,10 @@ Profiler::DequeueStatus Profiler::Dequeue( moodycamel::ConsumerToken& token )
const auto sz = s_queue.try_dequeue_bulk( token, m_itemBuf, BulkSize ); const auto sz = s_queue.try_dequeue_bulk( token, m_itemBuf, BulkSize );
if( sz > 0 ) if( sz > 0 )
{ {
auto buf = m_buffer + m_bufferOffset;
auto ptr = buf;
for( size_t i=0; i<sz; i++ ) for( size_t i=0; i<sz; i++ )
{ {
const auto dsz = QueueDataSize[m_itemBuf[i].hdr.idx]; if( !AppendData( m_itemBuf+i, QueueDataSize[m_itemBuf[i].hdr.idx] ) ) return ConnectionLost;
memcpy( ptr, m_itemBuf+i, dsz );
ptr += dsz;
} }
if( !SendData( buf, ptr - buf ) ) return ConnectionLost;
m_bufferOffset += int( ptr - buf );
if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0;
} }
else else
{ {
@ -266,6 +272,23 @@ Profiler::DequeueStatus Profiler::Dequeue( moodycamel::ConsumerToken& token )
return Success; return Success;
} }
bool Profiler::AppendData( const void* data, size_t len )
{
auto ret = true;
if( m_bufferOffset - m_bufferStart + len > TargetFrameSize ) ret = CommitData();
memcpy( m_buffer + m_bufferOffset, data, len );
m_bufferOffset += len;
return ret;
}
bool Profiler::CommitData()
{
bool ret = SendData( m_buffer + m_bufferStart, m_bufferOffset - m_bufferStart );
if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0;
m_bufferStart = m_bufferOffset;
return ret;
}
bool Profiler::SendData( const char* data, size_t len ) bool Profiler::SendData( const char* data, size_t len )
{ {
const lz4sz_t lz4sz = LZ4_compress_fast_continue( m_stream, data, m_lz4Buf + sizeof( lz4sz_t ), (int)len, LZ4Size, 1 ); const lz4sz_t lz4sz = LZ4_compress_fast_continue( m_stream, data, m_lz4Buf + sizeof( lz4sz_t ), (int)len, LZ4Size, 1 );
@ -280,23 +303,15 @@ bool Profiler::SendString( uint64_t str, const char* ptr, QueueType type )
QueueItem item; QueueItem item;
item.hdr.type = type; item.hdr.type = type;
item.stringTransfer.ptr = str; item.stringTransfer.ptr = str;
AppendData( &item, QueueDataSize[item.hdr.idx] );
const auto isz = QueueDataSize[item.hdr.idx];
auto buf = m_buffer + m_bufferOffset;
memcpy( buf, &item, isz );
auto len = strlen( ptr ); auto len = strlen( ptr );
assert( len < TargetFrameSize - isz - sizeof( uint16_t ) );
assert( len <= std::numeric_limits<uint16_t>::max() ); assert( len <= std::numeric_limits<uint16_t>::max() );
auto l16 = uint16_t( len ); auto l16 = uint16_t( len );
memcpy( buf + isz, &l16, sizeof( l16 ) ); AppendData( &l16, sizeof( l16 ) );
memcpy( buf + isz + sizeof( l16 ), ptr, l16 ); AppendData( ptr, l16 );
m_bufferOffset += int( isz + sizeof( l16 ) + l16 ); return true;
if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0;
return SendData( buf, isz + sizeof( l16 ) + l16 );
} }
void Profiler::SendSourceLocation( uint64_t ptr ) void Profiler::SendSourceLocation( uint64_t ptr )
@ -321,24 +336,16 @@ bool Profiler::SendSourceLocationPayload( uint64_t _ptr )
QueueItem item; QueueItem item;
item.hdr.type = QueueType::SourceLocationPayload; item.hdr.type = QueueType::SourceLocationPayload;
item.stringTransfer.ptr = _ptr; item.stringTransfer.ptr = _ptr;
AppendData( &item, QueueDataSize[item.hdr.idx] );
const auto isz = QueueDataSize[item.hdr.idx];
auto buf = m_buffer + m_bufferOffset;
memcpy( buf, &item, isz );
const auto len = *((uint32_t*)ptr); const auto len = *((uint32_t*)ptr);
assert( len < TargetFrameSize - isz );
assert( len <= std::numeric_limits<uint16_t>::max() ); assert( len <= std::numeric_limits<uint16_t>::max() );
assert( len > 4 ); assert( len > 4 );
const auto l16 = (uint16_t)len - 4; const auto l16 = uint16_t( len - 4 );
memcpy( buf + isz, &l16, 2 ); AppendData( &l16, sizeof( l16 ) );
memcpy( buf + isz + 2, ptr + 4, len - 4 ); AppendData( ptr + 4, l16 );
m_bufferOffset += int( isz + len - 2 ); return true;
if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0;
return SendData( buf, isz + len - 2 );
} }
static bool DontExit() { return false; } static bool DontExit() { return false; }

View File

@ -186,6 +186,8 @@ private:
void Worker(); void Worker();
DequeueStatus Dequeue( moodycamel::ConsumerToken& token ); DequeueStatus Dequeue( moodycamel::ConsumerToken& token );
bool AppendData( const void* data, size_t len );
bool CommitData();
bool SendData( const char* data, size_t len ); bool SendData( const char* data, size_t len );
bool SendString( uint64_t ptr, const char* str, QueueType type ); bool SendString( uint64_t ptr, const char* str, QueueType type );
@ -209,6 +211,7 @@ private:
LZ4_stream_t* m_stream; LZ4_stream_t* m_stream;
char* m_buffer; char* m_buffer;
int m_bufferOffset; int m_bufferOffset;
int m_bufferStart;
QueueItem* m_itemBuf; QueueItem* m_itemBuf;
char* m_lz4Buf; char* m_lz4Buf;