From 02d200878d1236858144e5337faac2acb52ef8df Mon Sep 17 00:00:00 2001 From: Bartosz Taudul Date: Sun, 23 Feb 2020 14:50:50 +0100 Subject: [PATCH] Process queue data in-place. --- client/TracyProfiler.cpp | 440 +++++++++++++++++---------------- client/TracyProfiler.hpp | 1 - client/tracy_concurrentqueue.h | 90 ++----- 3 files changed, 242 insertions(+), 289 deletions(-) diff --git a/client/TracyProfiler.cpp b/client/TracyProfiler.cpp index e9c0f818..309ef549 100644 --- a/client/TracyProfiler.cpp +++ b/client/TracyProfiler.cpp @@ -1013,8 +1013,6 @@ TRACY_API LuaZoneState& GetLuaZoneState() { return s_luaZoneState; } # endif #endif -enum { BulkSize = TargetFrameSize / QueueItemSize }; - Profiler::Profiler() : m_timeBegin( 0 ) , m_mainThread( detail::GetThreadHandleImpl() ) @@ -1030,7 +1028,6 @@ Profiler::Profiler() , m_buffer( (char*)tracy_malloc( TargetFrameSize*3 ) ) , m_bufferOffset( 0 ) , m_bufferStart( 0 ) - , m_itemBuf( (QueueItem*)tracy_malloc( sizeof( QueueItem ) * BulkSize ) ) , m_lz4Buf( (char*)tracy_malloc( LZ4Size + sizeof( lz4sz_t ) ) ) , m_serialQueue( 1024*1024 ) , m_serialDequeue( 1024*1024 ) @@ -1128,7 +1125,6 @@ Profiler::~Profiler() tracy_free( s_thread ); tracy_free( m_lz4Buf ); - tracy_free( m_itemBuf ); tracy_free( m_buffer ); LZ4_freeStream( (LZ4_stream_t*)m_stream ); @@ -1640,9 +1636,8 @@ void Profiler::ClearQueues( moodycamel::ConsumerToken& token ) { for(;;) { - const auto sz = GetQueue().try_dequeue_bulk( token, m_itemBuf, BulkSize ); + const auto sz = GetQueue().try_dequeue_bulk_single( token, [](auto){}, []( QueueItem* item, size_t sz ) { assert( sz > 0 ); while( sz-- > 0 ) FreeAssociatedMemory( *item++ ); } ); if( sz == 0 ) break; - for( size_t i=0; i 0 ) - { - if( threadId != m_threadCtx ) + bool connectionLost = false; + const auto sz = GetQueue().try_dequeue_bulk_single( token, + [this, &connectionLost] ( const uint64_t& threadId ) { - QueueItem item; - MemWrite( &item.hdr.type, QueueType::ThreadContext ); - MemWrite( &item.threadCtx.thread, threadId ); - if( !AppendData( &item, QueueDataSize[(int)QueueType::ThreadContext] ) ) return DequeueStatus::ConnectionLost; - m_threadCtx = threadId; - m_refTimeThread = 0; - } - - int64_t refThread = m_refTimeThread; - int64_t refCtx = m_refTimeCtx; - int64_t refGpu = m_refTimeGpu; - auto end = m_itemBuf + sz; - auto item = m_itemBuf; - while( item != end ) - { - uint64_t ptr; - const auto idx = MemRead( &item->hdr.idx ); - if( idx < (int)QueueType::Terminate ) + if( threadId != m_threadCtx ) { - switch( (QueueType)idx ) + QueueItem item; + MemWrite( &item.hdr.type, QueueType::ThreadContext ); + MemWrite( &item.threadCtx.thread, threadId ); + if( !AppendData( &item, QueueDataSize[(int)QueueType::ThreadContext] ) ) connectionLost = true; + m_threadCtx = threadId; + m_refTimeThread = 0; + } + }, + [this, &connectionLost] ( QueueItem* item, size_t sz ) + { + if( connectionLost ) return; + assert( sz > 0 ); + int64_t refThread = m_refTimeThread; + int64_t refCtx = m_refTimeCtx; + int64_t refGpu = m_refTimeGpu; + while( sz-- > 0 ) + { + uint64_t ptr; + const auto idx = MemRead( &item->hdr.idx ); + if( idx < (int)QueueType::Terminate ) { - case QueueType::ZoneText: - case QueueType::ZoneName: - ptr = MemRead( &item->zoneText.text ); - SendString( ptr, (const char*)ptr, QueueType::CustomStringData ); - tracy_free( (void*)ptr ); - break; - case QueueType::Message: - case QueueType::MessageColor: - case QueueType::MessageCallstack: - case QueueType::MessageColorCallstack: - ptr = MemRead( &item->message.text ); - SendString( 
ptr, (const char*)ptr, QueueType::CustomStringData ); - tracy_free( (void*)ptr ); - break; - case QueueType::MessageAppInfo: - ptr = MemRead( &item->message.text ); - SendString( ptr, (const char*)ptr, QueueType::CustomStringData ); -#ifndef TRACY_ON_DEMAND - tracy_free( (void*)ptr ); -#endif - break; - case QueueType::ZoneBeginAllocSrcLoc: - case QueueType::ZoneBeginAllocSrcLocCallstack: - { - int64_t t = MemRead( &item->zoneBegin.time ); - int64_t dt = t - refThread; - refThread = t; - MemWrite( &item->zoneBegin.time, dt ); - ptr = MemRead( &item->zoneBegin.srcloc ); - SendSourceLocationPayload( ptr ); - tracy_free( (void*)ptr ); - break; - } - case QueueType::Callstack: - ptr = MemRead( &item->callstack.ptr ); - SendCallstackPayload( ptr ); - tracy_free( (void*)ptr ); - break; - case QueueType::CallstackAlloc: - ptr = MemRead( &item->callstackAlloc.nativePtr ); - if( ptr != 0 ) + switch( (QueueType)idx ) { - CutCallstack( (void*)ptr, "lua_pcall" ); + case QueueType::ZoneText: + case QueueType::ZoneName: + ptr = MemRead( &item->zoneText.text ); + SendString( ptr, (const char*)ptr, QueueType::CustomStringData ); + tracy_free( (void*)ptr ); + break; + case QueueType::Message: + case QueueType::MessageColor: + case QueueType::MessageCallstack: + case QueueType::MessageColorCallstack: + ptr = MemRead( &item->message.text ); + SendString( ptr, (const char*)ptr, QueueType::CustomStringData ); + tracy_free( (void*)ptr ); + break; + case QueueType::MessageAppInfo: + ptr = MemRead( &item->message.text ); + SendString( ptr, (const char*)ptr, QueueType::CustomStringData ); +#ifndef TRACY_ON_DEMAND + tracy_free( (void*)ptr ); +#endif + break; + case QueueType::ZoneBeginAllocSrcLoc: + case QueueType::ZoneBeginAllocSrcLocCallstack: + { + int64_t t = MemRead( &item->zoneBegin.time ); + int64_t dt = t - refThread; + refThread = t; + MemWrite( &item->zoneBegin.time, dt ); + ptr = MemRead( &item->zoneBegin.srcloc ); + SendSourceLocationPayload( ptr ); + tracy_free( (void*)ptr ); + break; + } + case QueueType::Callstack: + ptr = MemRead( &item->callstack.ptr ); SendCallstackPayload( ptr ); tracy_free( (void*)ptr ); + break; + case QueueType::CallstackAlloc: + ptr = MemRead( &item->callstackAlloc.nativePtr ); + if( ptr != 0 ) + { + CutCallstack( (void*)ptr, "lua_pcall" ); + SendCallstackPayload( ptr ); + tracy_free( (void*)ptr ); + } + ptr = MemRead( &item->callstackAlloc.ptr ); + SendCallstackAlloc( ptr ); + tracy_free( (void*)ptr ); + break; + case QueueType::CallstackSample: + { + ptr = MemRead( &item->callstackSample.ptr ); + SendCallstackPayload64( ptr ); + tracy_free( (void*)ptr ); + int64_t t = MemRead( &item->callstackSample.time ); + int64_t dt = t - refCtx; + refCtx = t; + MemWrite( &item->callstackSample.time, dt ); + break; + } + case QueueType::FrameImage: + { + ptr = MemRead( &item->frameImage.image ); + const auto w = MemRead( &item->frameImage.w ); + const auto h = MemRead( &item->frameImage.h ); + const auto csz = size_t( w * h / 2 ); + SendLongString( ptr, (const char*)ptr, csz, QueueType::FrameImageData ); + tracy_free( (void*)ptr ); + break; + } + case QueueType::ZoneBegin: + case QueueType::ZoneBeginCallstack: + { + int64_t t = MemRead( &item->zoneBegin.time ); + int64_t dt = t - refThread; + refThread = t; + MemWrite( &item->zoneBegin.time, dt ); + break; + } + case QueueType::ZoneEnd: + { + int64_t t = MemRead( &item->zoneEnd.time ); + int64_t dt = t - refThread; + refThread = t; + MemWrite( &item->zoneEnd.time, dt ); + break; + } + case QueueType::GpuZoneBegin: + case 
QueueType::GpuZoneBeginCallstack: + { + int64_t t = MemRead( &item->gpuZoneBegin.cpuTime ); + int64_t dt = t - refThread; + refThread = t; + MemWrite( &item->gpuZoneBegin.cpuTime, dt ); + break; + } + case QueueType::GpuZoneEnd: + { + int64_t t = MemRead( &item->gpuZoneEnd.cpuTime ); + int64_t dt = t - refThread; + refThread = t; + MemWrite( &item->gpuZoneEnd.cpuTime, dt ); + break; + } + case QueueType::PlotData: + { + int64_t t = MemRead( &item->plotData.time ); + int64_t dt = t - refThread; + refThread = t; + MemWrite( &item->plotData.time, dt ); + break; + } + case QueueType::ContextSwitch: + { + int64_t t = MemRead( &item->contextSwitch.time ); + int64_t dt = t - refCtx; + refCtx = t; + MemWrite( &item->contextSwitch.time, dt ); + break; + } + case QueueType::ThreadWakeup: + { + int64_t t = MemRead( &item->threadWakeup.time ); + int64_t dt = t - refCtx; + refCtx = t; + MemWrite( &item->threadWakeup.time, dt ); + break; + } + case QueueType::GpuTime: + { + int64_t t = MemRead( &item->gpuTime.gpuTime ); + int64_t dt = t - refGpu; + refGpu = t; + MemWrite( &item->gpuTime.gpuTime, dt ); + break; + } + default: + assert( false ); + break; } - ptr = MemRead( &item->callstackAlloc.ptr ); - SendCallstackAlloc( ptr ); - tracy_free( (void*)ptr ); - break; - case QueueType::CallstackSample: - { - ptr = MemRead( &item->callstackSample.ptr ); - SendCallstackPayload64( ptr ); - tracy_free( (void*)ptr ); - int64_t t = MemRead( &item->callstackSample.time ); - int64_t dt = t - refCtx; - refCtx = t; - MemWrite( &item->callstackSample.time, dt ); - break; } - case QueueType::FrameImage: + if( !AppendData( item++, QueueDataSize[idx] ) ) { - ptr = MemRead( &item->frameImage.image ); - const auto w = MemRead( &item->frameImage.w ); - const auto h = MemRead( &item->frameImage.h ); - const auto csz = size_t( w * h / 2 ); - SendLongString( ptr, (const char*)ptr, csz, QueueType::FrameImageData ); - tracy_free( (void*)ptr ); - break; - } - case QueueType::ZoneBegin: - case QueueType::ZoneBeginCallstack: - { - int64_t t = MemRead( &item->zoneBegin.time ); - int64_t dt = t - refThread; - refThread = t; - MemWrite( &item->zoneBegin.time, dt ); - break; - } - case QueueType::ZoneEnd: - { - int64_t t = MemRead( &item->zoneEnd.time ); - int64_t dt = t - refThread; - refThread = t; - MemWrite( &item->zoneEnd.time, dt ); - break; - } - case QueueType::GpuZoneBegin: - case QueueType::GpuZoneBeginCallstack: - { - int64_t t = MemRead( &item->gpuZoneBegin.cpuTime ); - int64_t dt = t - refThread; - refThread = t; - MemWrite( &item->gpuZoneBegin.cpuTime, dt ); - break; - } - case QueueType::GpuZoneEnd: - { - int64_t t = MemRead( &item->gpuZoneEnd.cpuTime ); - int64_t dt = t - refThread; - refThread = t; - MemWrite( &item->gpuZoneEnd.cpuTime, dt ); - break; - } - case QueueType::PlotData: - { - int64_t t = MemRead( &item->plotData.time ); - int64_t dt = t - refThread; - refThread = t; - MemWrite( &item->plotData.time, dt ); - break; - } - case QueueType::ContextSwitch: - { - int64_t t = MemRead( &item->contextSwitch.time ); - int64_t dt = t - refCtx; - refCtx = t; - MemWrite( &item->contextSwitch.time, dt ); - break; - } - case QueueType::ThreadWakeup: - { - int64_t t = MemRead( &item->threadWakeup.time ); - int64_t dt = t - refCtx; - refCtx = t; - MemWrite( &item->threadWakeup.time, dt ); - break; - } - case QueueType::GpuTime: - { - int64_t t = MemRead( &item->gpuTime.gpuTime ); - int64_t dt = t - refGpu; - refGpu = t; - MemWrite( &item->gpuTime.gpuTime, dt ); - break; - } - default: - assert( false ); - break; + 
connectionLost = true; + m_refTimeThread = refThread; + m_refTimeCtx = refCtx; + m_refTimeGpu = refGpu; + return; } } - if( !AppendData( item, QueueDataSize[idx] ) ) return DequeueStatus::ConnectionLost; - item++; + m_refTimeThread = refThread; + m_refTimeCtx = refCtx; + m_refTimeGpu = refGpu; } - m_refTimeThread = refThread; - m_refTimeCtx = refCtx; - m_refTimeGpu = refGpu; - } - else - { - return DequeueStatus::QueueEmpty; - } - return DequeueStatus::DataDequeued; + ); + if( connectionLost ) return DequeueStatus::ConnectionLost; + return sz > 0 ? DequeueStatus::DataDequeued : DequeueStatus::QueueEmpty; } Profiler::DequeueStatus Profiler::DequeueContextSwitches( tracy::moodycamel::ConsumerToken& token, int64_t& timeStop ) { - const auto sz = GetQueue().try_dequeue_bulk( token, m_itemBuf, BulkSize ); - if( sz > 0 ) - { - int64_t refCtx = m_refTimeCtx; - auto end = m_itemBuf + sz; - auto item = m_itemBuf; - while( item != end ) + const auto sz = GetQueue().try_dequeue_bulk_single( token, [] ( const uint64_t& ) {}, + [this, &timeStop] ( QueueItem* item, size_t sz ) { - FreeAssociatedMemory( *item ); - const auto idx = MemRead( &item->hdr.idx ); - if( idx == (uint8_t)QueueType::ContextSwitch ) + assert( sz > 0 ); + int64_t refCtx = m_refTimeCtx; + while( sz-- > 0 ) { - const auto csTime = MemRead( &item->contextSwitch.time ); - if( csTime > timeStop ) + FreeAssociatedMemory( *item ); + if( timeStop < 0 ) return; + const auto idx = MemRead( &item->hdr.idx ); + if( idx == (uint8_t)QueueType::ContextSwitch ) { - timeStop = -1; - return DequeueStatus::DataDequeued; + const auto csTime = MemRead( &item->contextSwitch.time ); + if( csTime > timeStop ) + { + timeStop = -1; + m_refTimeCtx = refCtx; + return; + } + int64_t dt = csTime - refCtx; + refCtx = csTime; + MemWrite( &item->contextSwitch.time, dt ); + if( !AppendData( item, QueueDataSize[(int)QueueType::ContextSwitch] ) ) + { + timeStop = -2; + m_refTimeCtx = refCtx; + return; + } } - int64_t dt = csTime - refCtx; - refCtx = csTime; - MemWrite( &item->contextSwitch.time, dt ); - if( !AppendData( item, QueueDataSize[(int)QueueType::ContextSwitch] ) ) return DequeueStatus::ConnectionLost; - } - else if( idx == (uint8_t)QueueType::ThreadWakeup ) - { - const auto csTime = MemRead( &item->threadWakeup.time ); - if( csTime > timeStop ) + else if( idx == (uint8_t)QueueType::ThreadWakeup ) { - timeStop = -1; - return DequeueStatus::DataDequeued; + const auto csTime = MemRead( &item->threadWakeup.time ); + if( csTime > timeStop ) + { + timeStop = -1; + m_refTimeCtx = refCtx; + return; + } + int64_t dt = csTime - refCtx; + refCtx = csTime; + MemWrite( &item->threadWakeup.time, dt ); + if( !AppendData( item, QueueDataSize[(int)QueueType::ThreadWakeup] ) ) + { + timeStop = -2; + m_refTimeCtx = refCtx; + return; + } } - int64_t dt = csTime - refCtx; - refCtx = csTime; - MemWrite( &item->threadWakeup.time, dt ); - if( !AppendData( item, QueueDataSize[(int)QueueType::ThreadWakeup] ) ) return DequeueStatus::ConnectionLost; + item++; } - item++; + m_refTimeCtx = refCtx; } - m_refTimeCtx = refCtx; - } - else - { - return DequeueStatus::QueueEmpty; - } - return DequeueStatus::DataDequeued; + ); + + if( timeStop == -2 ) return DequeueStatus::ConnectionLost; + return ( timeStop == -1 || sz > 0 ) ? 
DequeueStatus::DataDequeued : DequeueStatus::QueueEmpty; } Profiler::DequeueStatus Profiler::DequeueSerial() @@ -2439,13 +2451,11 @@ void Profiler::CalibrateDelay() const auto dt = t1 - t0; m_delay = dt / Events; - enum { Bulk = 1000 }; moodycamel::ConsumerToken token( GetQueue() ); int left = Events; - QueueItem item[Bulk]; while( left != 0 ) { - const auto sz = GetQueue().try_dequeue_bulk( token, item, std::min( left, (int)Bulk ) ); + const auto sz = GetQueue().try_dequeue_bulk_single( token, [](auto){}, [](auto, auto){} ); assert( sz > 0 ); left -= (int)sz; } diff --git a/client/TracyProfiler.hpp b/client/TracyProfiler.hpp index ce82e9ff..9aaa43a8 100644 --- a/client/TracyProfiler.hpp +++ b/client/TracyProfiler.hpp @@ -631,7 +631,6 @@ private: int m_bufferOffset; int m_bufferStart; - QueueItem* m_itemBuf; char* m_lz4Buf; FastVector m_serialQueue, m_serialDequeue; diff --git a/client/tracy_concurrentqueue.h b/client/tracy_concurrentqueue.h index 10ba541a..9e62d7c2 100644 --- a/client/tracy_concurrentqueue.h +++ b/client/tracy_concurrentqueue.h @@ -568,57 +568,9 @@ public: { return static_cast(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue_begin(currentTailIndex); } - - // Attempts to dequeue several elements from the queue using an explicit consumer token. - // Returns the number of items actually dequeued. - // Returns 0 if all producer streams appeared empty at the time they - // were checked (so, the queue is likely but not guaranteed to be empty). - // Never allocates. Thread-safe. - template - size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max) - { - if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) { - if (!update_current_producer_after_rotation(token)) { - return 0; - } - } - - size_t count = static_cast(token.currentProducer)->dequeue_bulk(itemFirst, max); - if (count == max) { - if ((token.itemsConsumedFromCurrent += static_cast(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) { - globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed); - } - return max; - } - token.itemsConsumedFromCurrent += static_cast(count); - max -= count; - - auto tail = producerListTail.load(std::memory_order_acquire); - auto ptr = static_cast(token.currentProducer)->next_prod(); - if (ptr == nullptr) { - ptr = tail; - } - while (ptr != static_cast(token.currentProducer)) { - auto dequeued = ptr->dequeue_bulk(itemFirst, max); - count += dequeued; - if (dequeued != 0) { - token.currentProducer = ptr; - token.itemsConsumedFromCurrent = static_cast(dequeued); - } - if (dequeued == max) { - break; - } - max -= dequeued; - ptr = ptr->next_prod(); - if (ptr == nullptr) { - ptr = tail; - } - } - return count; - } - - template - size_t try_dequeue_bulk_single(consumer_token_t& token, It itemFirst, size_t max, uint64_t& threadId ) + + template + size_t try_dequeue_bulk_single(consumer_token_t& token, NotifyThread notifyThread, ProcessData processData ) { if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) { if (!update_current_producer_after_rotation(token)) { @@ -626,14 +578,7 @@ public: } } - size_t count = static_cast(token.currentProducer)->dequeue_bulk(itemFirst, max); - if (count == max) { - if ((token.itemsConsumedFromCurrent += static_cast(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) { - globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed); - } - threadId 
= token.currentProducer->threadId; - return max; - } + size_t count = static_cast(token.currentProducer)->dequeue_bulk(notifyThread, processData); token.itemsConsumedFromCurrent += static_cast(count); auto tail = producerListTail.load(std::memory_order_acquire); @@ -644,9 +589,8 @@ public: if( count == 0 ) { while (ptr != static_cast(token.currentProducer)) { - auto dequeued = ptr->dequeue_bulk(itemFirst, max); + auto dequeued = ptr->dequeue_bulk(notifyThread, processData); if (dequeued != 0) { - threadId = ptr->threadId; token.currentProducer = ptr; token.itemsConsumedFromCurrent = static_cast(dequeued); return dequeued; @@ -660,7 +604,6 @@ public: } else { - threadId = token.currentProducer->threadId; token.currentProducer = ptr; token.itemsConsumedFromCurrent = 0; return count; @@ -1011,10 +954,10 @@ private: virtual ~ProducerBase() { }; - template - inline size_t dequeue_bulk(It& itemFirst, size_t max) + template + inline size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData) { - return static_cast(this)->dequeue_bulk(itemFirst, max); + return static_cast(this)->dequeue_bulk(notifyThread, processData); } inline ProducerBase* next_prod() const { return static_cast(next); } @@ -1188,14 +1131,14 @@ private: return this->tailIndex; } - template - size_t dequeue_bulk(It& itemFirst, size_t max) + template + size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData) { auto tail = this->tailIndex.load(std::memory_order_relaxed); auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed); auto desiredCount = static_cast(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit)); if (details::circular_less_than(0, desiredCount)) { - desiredCount = desiredCount < max ? desiredCount : max; + desiredCount = desiredCount < 8192 ? desiredCount : 8192; std::atomic_thread_fence(std::memory_order_acquire); auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed); @@ -1221,7 +1164,9 @@ private: auto firstBlockBaseIndex = firstIndex & ~static_cast(BLOCK_SIZE - 1); auto offset = static_cast(static_cast::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE); auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1); - + + notifyThread( this->threadId ); + // Iterate the blocks and dequeue auto index = firstIndex; do { @@ -1230,10 +1175,9 @@ private: endIndex = details::circular_less_than(firstIndex + static_cast(actualCount), endIndex) ? firstIndex + static_cast(actualCount) : endIndex; auto block = localBlockIndex->entries[indexIndex].block; - const auto sz = endIndex - index; - memcpy( itemFirst, (*block)[index], sizeof( T ) * sz ); - index += sz; - itemFirst += sz; + const auto sz = endIndex - index; + processData( (*block)[index], sz ); + index += sz; block->ConcurrentQueue::Block::set_many_empty(firstIndexInBlock, static_cast(endIndex - firstIndexInBlock)); indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
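
Note (editor's illustration, not part of the patch): the change replaces the copy-out path try_dequeue_bulk( token, m_itemBuf, BulkSize ) with try_dequeue_bulk_single( token, notifyThread, processData ), where the queue first invokes notifyThread with the owning producer's thread id and then invokes processData with a pointer directly into the producer's block plus the item count, so the consumer serializes QueueItems in place instead of memcpy-ing them into an intermediate buffer. The sketch below models only that calling shape with a toy single-producer container; QueueItem, ToyProducer, dequeue_bulk_copy and everything else in it are simplified stand-ins, not Tracy's or moodycamel's real types.

// Illustrative sketch only, assuming a drastically simplified producer model.
#include <cstdio>
#include <cstdint>
#include <cstddef>
#include <vector>

namespace sketch
{

using QueueItem = int;   // stand-in for tracy::QueueItem

struct ToyProducer
{
    uint64_t threadId = 42;          // hypothetical producer thread id
    std::vector<QueueItem> block;    // one contiguous block of queued items

    // Old style: copy items out into a caller-provided buffer.
    size_t dequeue_bulk_copy( QueueItem* itemFirst, size_t max )
    {
        const size_t sz = block.size() < max ? block.size() : max;
        for( size_t i=0; i<sz; i++ ) itemFirst[i] = block[i];
        block.erase( block.begin(), block.begin() + sz );
        return sz;
    }

    // New style: hand the caller a pointer into the block so the items are
    // processed in place, without the intermediate copy.
    template<class NotifyThread, class ProcessData>
    size_t dequeue_bulk( NotifyThread notifyThread, ProcessData processData )
    {
        if( block.empty() ) return 0;
        notifyThread( threadId );                 // lets the consumer emit a thread-context switch
        processData( block.data(), block.size() );
        const size_t sz = block.size();
        block.clear();
        return sz;
    }
};

}

int main()
{
    sketch::ToyProducer prod;
    prod.block = { 1, 2, 3, 4 };

    uint64_t currentThread = 0;
    const auto count = prod.dequeue_bulk(
        [&currentThread]( uint64_t threadId )
        {
            // mirrors the ThreadContext handling in Profiler::Dequeue
            if( threadId != currentThread ) currentThread = threadId;
        },
        []( sketch::QueueItem* item, size_t sz )
        {
            // mirrors the per-item loop that serializes each QueueItem in place
            while( sz-- > 0 ) printf( "item %d\n", *item++ );
        } );
    printf( "dequeued %zu items from thread %llu\n", count, (unsigned long long)currentThread );
}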