Process queue data in-place.

This commit is contained in:
Bartosz Taudul 2020-02-23 14:50:50 +01:00
parent 96034bca3e
commit 02d200878d
3 changed files with 242 additions and 289 deletions

View File

@ -1013,8 +1013,6 @@ TRACY_API LuaZoneState& GetLuaZoneState() { return s_luaZoneState; }
# endif
#endif
enum { BulkSize = TargetFrameSize / QueueItemSize };
Profiler::Profiler()
: m_timeBegin( 0 )
, m_mainThread( detail::GetThreadHandleImpl() )
@ -1030,7 +1028,6 @@ Profiler::Profiler()
, m_buffer( (char*)tracy_malloc( TargetFrameSize*3 ) )
, m_bufferOffset( 0 )
, m_bufferStart( 0 )
, m_itemBuf( (QueueItem*)tracy_malloc( sizeof( QueueItem ) * BulkSize ) )
, m_lz4Buf( (char*)tracy_malloc( LZ4Size + sizeof( lz4sz_t ) ) )
, m_serialQueue( 1024*1024 )
, m_serialDequeue( 1024*1024 )
@ -1128,7 +1125,6 @@ Profiler::~Profiler()
tracy_free( s_thread );
tracy_free( m_lz4Buf );
tracy_free( m_itemBuf );
tracy_free( m_buffer );
LZ4_freeStream( (LZ4_stream_t*)m_stream );
@ -1640,9 +1636,8 @@ void Profiler::ClearQueues( moodycamel::ConsumerToken& token )
{
for(;;)
{
const auto sz = GetQueue().try_dequeue_bulk( token, m_itemBuf, BulkSize );
const auto sz = GetQueue().try_dequeue_bulk_single( token, [](auto){}, []( QueueItem* item, size_t sz ) { assert( sz > 0 ); while( sz-- > 0 ) FreeAssociatedMemory( *item++ ); } );
if( sz == 0 ) break;
for( size_t i=0; i<sz; i++ ) FreeAssociatedMemory( m_itemBuf[i] );
}
ClearSerial();
@ -1672,26 +1667,28 @@ void Profiler::ClearSerial()
Profiler::DequeueStatus Profiler::Dequeue( moodycamel::ConsumerToken& token )
{
uint64_t threadId;
const auto sz = GetQueue().try_dequeue_bulk_single( token, m_itemBuf, BulkSize, threadId );
if( sz > 0 )
bool connectionLost = false;
const auto sz = GetQueue().try_dequeue_bulk_single( token,
[this, &connectionLost] ( const uint64_t& threadId )
{
if( threadId != m_threadCtx )
{
QueueItem item;
MemWrite( &item.hdr.type, QueueType::ThreadContext );
MemWrite( &item.threadCtx.thread, threadId );
if( !AppendData( &item, QueueDataSize[(int)QueueType::ThreadContext] ) ) return DequeueStatus::ConnectionLost;
if( !AppendData( &item, QueueDataSize[(int)QueueType::ThreadContext] ) ) connectionLost = true;
m_threadCtx = threadId;
m_refTimeThread = 0;
}
},
[this, &connectionLost] ( QueueItem* item, size_t sz )
{
if( connectionLost ) return;
assert( sz > 0 );
int64_t refThread = m_refTimeThread;
int64_t refCtx = m_refTimeCtx;
int64_t refGpu = m_refTimeGpu;
auto end = m_itemBuf + sz;
auto item = m_itemBuf;
while( item != end )
while( sz-- > 0 )
{
uint64_t ptr;
const auto idx = MemRead<uint8_t>( &item->hdr.idx );
@ -1841,31 +1838,35 @@ Profiler::DequeueStatus Profiler::Dequeue( moodycamel::ConsumerToken& token )
break;
}
}
if( !AppendData( item, QueueDataSize[idx] ) ) return DequeueStatus::ConnectionLost;
item++;
if( !AppendData( item++, QueueDataSize[idx] ) )
{
connectionLost = true;
m_refTimeThread = refThread;
m_refTimeCtx = refCtx;
m_refTimeGpu = refGpu;
return;
}
}
m_refTimeThread = refThread;
m_refTimeCtx = refCtx;
m_refTimeGpu = refGpu;
}
else
{
return DequeueStatus::QueueEmpty;
}
return DequeueStatus::DataDequeued;
);
if( connectionLost ) return DequeueStatus::ConnectionLost;
return sz > 0 ? DequeueStatus::DataDequeued : DequeueStatus::QueueEmpty;
}
Profiler::DequeueStatus Profiler::DequeueContextSwitches( tracy::moodycamel::ConsumerToken& token, int64_t& timeStop )
{
const auto sz = GetQueue().try_dequeue_bulk( token, m_itemBuf, BulkSize );
if( sz > 0 )
const auto sz = GetQueue().try_dequeue_bulk_single( token, [] ( const uint64_t& ) {},
[this, &timeStop] ( QueueItem* item, size_t sz )
{
assert( sz > 0 );
int64_t refCtx = m_refTimeCtx;
auto end = m_itemBuf + sz;
auto item = m_itemBuf;
while( item != end )
while( sz-- > 0 )
{
FreeAssociatedMemory( *item );
if( timeStop < 0 ) return;
const auto idx = MemRead<uint8_t>( &item->hdr.idx );
if( idx == (uint8_t)QueueType::ContextSwitch )
{
@ -1873,12 +1874,18 @@ Profiler::DequeueStatus Profiler::DequeueContextSwitches( tracy::moodycamel::Con
if( csTime > timeStop )
{
timeStop = -1;
return DequeueStatus::DataDequeued;
m_refTimeCtx = refCtx;
return;
}
int64_t dt = csTime - refCtx;
refCtx = csTime;
MemWrite( &item->contextSwitch.time, dt );
if( !AppendData( item, QueueDataSize[(int)QueueType::ContextSwitch] ) ) return DequeueStatus::ConnectionLost;
if( !AppendData( item, QueueDataSize[(int)QueueType::ContextSwitch] ) )
{
timeStop = -2;
m_refTimeCtx = refCtx;
return;
}
}
else if( idx == (uint8_t)QueueType::ThreadWakeup )
{
@ -1886,22 +1893,27 @@ Profiler::DequeueStatus Profiler::DequeueContextSwitches( tracy::moodycamel::Con
if( csTime > timeStop )
{
timeStop = -1;
return DequeueStatus::DataDequeued;
m_refTimeCtx = refCtx;
return;
}
int64_t dt = csTime - refCtx;
refCtx = csTime;
MemWrite( &item->threadWakeup.time, dt );
if( !AppendData( item, QueueDataSize[(int)QueueType::ThreadWakeup] ) ) return DequeueStatus::ConnectionLost;
if( !AppendData( item, QueueDataSize[(int)QueueType::ThreadWakeup] ) )
{
timeStop = -2;
m_refTimeCtx = refCtx;
return;
}
}
item++;
}
m_refTimeCtx = refCtx;
}
else
{
return DequeueStatus::QueueEmpty;
}
return DequeueStatus::DataDequeued;
);
if( timeStop == -2 ) return DequeueStatus::ConnectionLost;
return ( timeStop == -1 || sz > 0 ) ? DequeueStatus::DataDequeued : DequeueStatus::QueueEmpty;
}
Profiler::DequeueStatus Profiler::DequeueSerial()
@ -2439,13 +2451,11 @@ void Profiler::CalibrateDelay()
const auto dt = t1 - t0;
m_delay = dt / Events;
enum { Bulk = 1000 };
moodycamel::ConsumerToken token( GetQueue() );
int left = Events;
QueueItem item[Bulk];
while( left != 0 )
{
const auto sz = GetQueue().try_dequeue_bulk( token, item, std::min( left, (int)Bulk ) );
const auto sz = GetQueue().try_dequeue_bulk_single( token, [](auto){}, [](auto, auto){} );
assert( sz > 0 );
left -= (int)sz;
}

View File

@ -631,7 +631,6 @@ private:
int m_bufferOffset;
int m_bufferStart;
QueueItem* m_itemBuf;
char* m_lz4Buf;
FastVector<QueueItem> m_serialQueue, m_serialDequeue;

View File

@ -569,13 +569,8 @@ public:
return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue_begin(currentTailIndex);
}
// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued.
// Returns 0 if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename It>
size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
template<class NotifyThread, class ProcessData>
size_t try_dequeue_bulk_single(consumer_token_t& token, NotifyThread notifyThread, ProcessData processData )
{
if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
if (!update_current_producer_after_rotation(token)) {
@ -583,57 +578,7 @@ public:
}
}
size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
if (count == max) {
if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
}
return max;
}
token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
max -= count;
auto tail = producerListTail.load(std::memory_order_acquire);
auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
if (ptr == nullptr) {
ptr = tail;
}
while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
auto dequeued = ptr->dequeue_bulk(itemFirst, max);
count += dequeued;
if (dequeued != 0) {
token.currentProducer = ptr;
token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
}
if (dequeued == max) {
break;
}
max -= dequeued;
ptr = ptr->next_prod();
if (ptr == nullptr) {
ptr = tail;
}
}
return count;
}
template<typename It>
size_t try_dequeue_bulk_single(consumer_token_t& token, It itemFirst, size_t max, uint64_t& threadId )
{
if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
if (!update_current_producer_after_rotation(token)) {
return 0;
}
}
size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
if (count == max) {
if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
}
threadId = token.currentProducer->threadId;
return max;
}
size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(notifyThread, processData);
token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
auto tail = producerListTail.load(std::memory_order_acquire);
@ -644,9 +589,8 @@ public:
if( count == 0 )
{
while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
auto dequeued = ptr->dequeue_bulk(itemFirst, max);
auto dequeued = ptr->dequeue_bulk(notifyThread, processData);
if (dequeued != 0) {
threadId = ptr->threadId;
token.currentProducer = ptr;
token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
return dequeued;
@ -660,7 +604,6 @@ public:
}
else
{
threadId = token.currentProducer->threadId;
token.currentProducer = ptr;
token.itemsConsumedFromCurrent = 0;
return count;
@ -1011,10 +954,10 @@ private:
virtual ~ProducerBase() { };
template<typename It>
inline size_t dequeue_bulk(It& itemFirst, size_t max)
template<class NotifyThread, class ProcessData>
inline size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData)
{
return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
return static_cast<ExplicitProducer*>(this)->dequeue_bulk(notifyThread, processData);
}
inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
@ -1188,14 +1131,14 @@ private:
return this->tailIndex;
}
template<typename It>
size_t dequeue_bulk(It& itemFirst, size_t max)
template<class NotifyThread, class ProcessData>
size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData)
{
auto tail = this->tailIndex.load(std::memory_order_relaxed);
auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
if (details::circular_less_than<size_t>(0, desiredCount)) {
desiredCount = desiredCount < max ? desiredCount : max;
desiredCount = desiredCount < 8192 ? desiredCount : 8192;
std::atomic_thread_fence(std::memory_order_acquire);
auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
@ -1222,6 +1165,8 @@ private:
auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
notifyThread( this->threadId );
// Iterate the blocks and dequeue
auto index = firstIndex;
do {
@ -1231,9 +1176,8 @@ private:
auto block = localBlockIndex->entries[indexIndex].block;
const auto sz = endIndex - index;
memcpy( itemFirst, (*block)[index], sizeof( T ) * sz );
processData( (*block)[index], sz );
index += sz;
itemFirst += sz;
block->ConcurrentQueue::Block::set_many_empty(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);