From 806c8de46382910604b244d03a3139e2c6fda892 Mon Sep 17 00:00:00 2001 From: Bartosz Taudul Date: Sun, 9 Sep 2018 17:47:20 +0200 Subject: [PATCH] Only one outgoing server connection is supported. --- server/TracyWorker.cpp | 220 ++++++++++++++++++++--------------------- 1 file changed, 110 insertions(+), 110 deletions(-) diff --git a/server/TracyWorker.cpp b/server/TracyWorker.cpp index 88d94435..48f2ef9b 100644 --- a/server/TracyWorker.cpp +++ b/server/TracyWorker.cpp @@ -1273,141 +1273,141 @@ void Worker::Exec() return m_shutdown.load( std::memory_order_relaxed ); }; - auto lz4buf = std::make_unique( LZ4Size ); for(;;) { if( m_shutdown.load( std::memory_order_relaxed ) ) return; - if( !m_sock.Connect( m_addr.c_str(), "8086" ) ) continue; + if( m_sock.Connect( m_addr.c_str(), "8086" ) ) break; + } - std::chrono::time_point t0; + auto lz4buf = std::make_unique( LZ4Size ); + std::chrono::time_point t0; - uint64_t bytes = 0; - uint64_t decBytes = 0; + uint64_t bytes = 0; + uint64_t decBytes = 0; - m_data.framesBase = m_data.frames.Retrieve( 0, [this] ( uint64_t name ) { - auto fd = m_slab.AllocInit(); - fd->name = name; - fd->continuous = 1; - return fd; - }, [this] ( uint64_t name ) { - assert( name == 0 ); - char tmp[6] = "Frame"; - HandleFrameName( name, tmp, 5 ); - } ); + m_data.framesBase = m_data.frames.Retrieve( 0, [this] ( uint64_t name ) { + auto fd = m_slab.AllocInit(); + fd->name = name; + fd->continuous = 1; + return fd; + }, [this] ( uint64_t name ) { + assert( name == 0 ); + char tmp[6] = "Frame"; + HandleFrameName( name, tmp, 5 ); + } ); + + { + WelcomeMessage welcome; + if( !m_sock.Read( &welcome, sizeof( welcome ), &tv, ShouldExit ) ) goto close; + m_timerMul = welcome.timerMul; + const auto initEnd = TscTime( welcome.initEnd ); + m_data.framesBase->frames.push_back( FrameEvent{ TscTime( welcome.initBegin ), -1 } ); + m_data.framesBase->frames.push_back( FrameEvent{ initEnd, -1 } ); + m_data.lastTime = initEnd; + m_delay = TscTime( welcome.delay ); + m_resolution = TscTime( welcome.resolution ); + m_onDemand = welcome.onDemand; + m_captureProgram = welcome.programName; + m_captureTime = welcome.epoch; + + char dtmp[64]; + time_t date = welcome.epoch; + auto lt = localtime( &date ); + strftime( dtmp, 64, "%F %T", lt ); + char tmp[1024]; + sprintf( tmp, "%s @ %s", welcome.programName, dtmp ); + m_captureName = tmp; + + m_hostInfo = welcome.hostInfo; + + if( welcome.onDemand != 0 ) + { + OnDemandPayloadMessage onDemand; + if( !m_sock.Read( &onDemand, sizeof( onDemand ), &tv, ShouldExit ) ) goto close; + m_data.frameOffset = onDemand.frames; + } + } + + m_hasData.store( true, std::memory_order_release ); + + LZ4_setStreamDecode( m_stream, nullptr, 0 ); + m_connected.store( true, std::memory_order_relaxed ); + + t0 = std::chrono::high_resolution_clock::now(); + + for(;;) + { + if( m_shutdown.load( std::memory_order_relaxed ) ) return; + + auto buf = m_buffer + m_bufferOffset; + lz4sz_t lz4sz; + if( !m_sock.Read( &lz4sz, sizeof( lz4sz ), &tv, ShouldExit ) ) goto close; + if( !m_sock.Read( lz4buf.get(), lz4sz, &tv, ShouldExit ) ) goto close; + bytes += sizeof( lz4sz ) + lz4sz; + + auto sz = LZ4_decompress_safe_continue( m_stream, lz4buf.get(), buf, lz4sz, TargetFrameSize ); + assert( sz >= 0 ); + decBytes += sz; + + char* ptr = buf; + const char* end = buf + sz; { - WelcomeMessage welcome; - if( !m_sock.Read( &welcome, sizeof( welcome ), &tv, ShouldExit ) ) goto close; - m_timerMul = welcome.timerMul; - const auto initEnd = TscTime( welcome.initEnd ); - m_data.framesBase->frames.push_back( FrameEvent{ TscTime( welcome.initBegin ), -1 } ); - m_data.framesBase->frames.push_back( FrameEvent{ initEnd, -1 } ); - m_data.lastTime = initEnd; - m_delay = TscTime( welcome.delay ); - m_resolution = TscTime( welcome.resolution ); - m_onDemand = welcome.onDemand; - m_captureProgram = welcome.programName; - m_captureTime = welcome.epoch; - - char dtmp[64]; - time_t date = welcome.epoch; - auto lt = localtime( &date ); - strftime( dtmp, 64, "%F %T", lt ); - char tmp[1024]; - sprintf( tmp, "%s @ %s", welcome.programName, dtmp ); - m_captureName = tmp; - - m_hostInfo = welcome.hostInfo; - - if( welcome.onDemand != 0 ) + std::lock_guard lock( m_data.lock ); + while( ptr < end ) { - OnDemandPayloadMessage onDemand; - if( !m_sock.Read( &onDemand, sizeof( onDemand ), &tv, ShouldExit ) ) goto close; - m_data.frameOffset = onDemand.frames; + auto ev = (const QueueItem*)ptr; + DispatchProcess( *ev, ptr ); } + + m_bufferOffset += sz; + if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0; + + HandlePostponedPlots(); } - m_hasData.store( true, std::memory_order_release ); - - LZ4_setStreamDecode( m_stream, nullptr, 0 ); - m_connected.store( true, std::memory_order_relaxed ); - - t0 = std::chrono::high_resolution_clock::now(); - - for(;;) + auto t1 = std::chrono::high_resolution_clock::now(); + auto td = std::chrono::duration_cast( t1 - t0 ).count(); + enum { MbpsUpdateTime = 200 }; + if( td > MbpsUpdateTime ) { - if( m_shutdown.load( std::memory_order_relaxed ) ) return; - - auto buf = m_buffer + m_bufferOffset; - lz4sz_t lz4sz; - if( !m_sock.Read( &lz4sz, sizeof( lz4sz ), &tv, ShouldExit ) ) goto close; - if( !m_sock.Read( lz4buf.get(), lz4sz, &tv, ShouldExit ) ) goto close; - bytes += sizeof( lz4sz ) + lz4sz; - - auto sz = LZ4_decompress_safe_continue( m_stream, lz4buf.get(), buf, lz4sz, TargetFrameSize ); - assert( sz >= 0 ); - decBytes += sz; - - char* ptr = buf; - const char* end = buf + sz; + std::lock_guard lock( m_mbpsData.lock ); + m_mbpsData.mbps.erase( m_mbpsData.mbps.begin() ); + m_mbpsData.mbps.emplace_back( bytes / ( td * 125.f ) ); + m_mbpsData.compRatio = float( bytes ) / decBytes; + t0 = t1; + bytes = 0; + decBytes = 0; + } + if( m_terminate ) + { + if( m_pendingStrings != 0 || m_pendingThreads != 0 || m_pendingSourceLocation != 0 || m_pendingCallstackFrames != 0 || + !m_pendingCustomStrings.empty() || m_data.plots.IsPending() || !m_pendingCallstacks.empty() ) { - std::lock_guard lock( m_data.lock ); - while( ptr < end ) - { - auto ev = (const QueueItem*)ptr; - DispatchProcess( *ev, ptr ); - } - - m_bufferOffset += sz; - if( m_bufferOffset > TargetFrameSize * 2 ) m_bufferOffset = 0; - - HandlePostponedPlots(); + continue; } - - auto t1 = std::chrono::high_resolution_clock::now(); - auto td = std::chrono::duration_cast( t1 - t0 ).count(); - enum { MbpsUpdateTime = 200 }; - if( td > MbpsUpdateTime ) + if( !m_crashed ) { - std::lock_guard lock( m_mbpsData.lock ); - m_mbpsData.mbps.erase( m_mbpsData.mbps.begin() ); - m_mbpsData.mbps.emplace_back( bytes / ( td * 125.f ) ); - m_mbpsData.compRatio = float( bytes ) / decBytes; - t0 = t1; - bytes = 0; - decBytes = 0; - } - - if( m_terminate ) - { - if( m_pendingStrings != 0 || m_pendingThreads != 0 || m_pendingSourceLocation != 0 || m_pendingCallstackFrames != 0 || - !m_pendingCustomStrings.empty() || m_data.plots.IsPending() || !m_pendingCallstacks.empty() ) + bool done = true; + for( auto& v : m_data.threads ) { - continue; - } - if( !m_crashed ) - { - bool done = true; - for( auto& v : m_data.threads ) + if( !v->stack.empty() ) { - if( !v->stack.empty() ) - { - done = false; - break; - } + done = false; + break; } - if( !done ) continue; } - ServerQuery( ServerQueryTerminate, 0 ); - break; + if( !done ) continue; } + ServerQuery( ServerQueryTerminate, 0 ); + break; } + } close: - m_sock.Close(); - m_connected.store( false, std::memory_order_relaxed ); - } + m_sock.Close(); + m_connected.store( false, std::memory_order_relaxed ); } void Worker::ServerQuery( uint8_t type, uint64_t data )