diff --git a/client/tracy_concurrentqueue.h b/client/tracy_concurrentqueue.h
index 98d36f1e..616ed11d 100644
--- a/client/tracy_concurrentqueue.h
+++ b/client/tracy_concurrentqueue.h
@@ -123,7 +123,7 @@ struct ConcurrentQueueDefaultTraits
 {
 	// General-purpose size type. std::size_t is strongly recommended.
 	typedef std::size_t size_t;
-	
+
 	// The type used for the enqueue and dequeue indices. Must be at least as
 	// large as size_t. Should be significantly larger than the number of elements
 	// you expect to hold at once, especially if you have a high turnover rate;
@@ -135,37 +135,37 @@ struct ConcurrentQueueDefaultTraits
 	// whether the queue is lock-free with a 64-int type depends on the whether
 	// std::atomic<std::uint64_t> is lock-free, which is platform-specific.
 	typedef std::size_t index_t;
-	
+
 	// Internally, all elements are enqueued and dequeued from multi-element
 	// blocks; this is the smallest controllable unit. If you expect few elements
 	// but many producers, a smaller block size should be favoured. For few producers
 	// and/or many elements, a larger block size is preferred. A sane default
 	// is provided. Must be a power of 2.
 	static const size_t BLOCK_SIZE = 64*1024;
-	
+
 	// For explicit producers (i.e. when using a producer token), the block is
 	// checked for being empty by iterating through a list of flags, one per element.
 	// For large block sizes, this is too inefficient, and switching to an atomic
 	// counter-based approach is faster. The switch is made for block sizes strictly
 	// larger than this threshold.
 	static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
-	
+
 	// How many full blocks can be expected for a single explicit producer? This should
 	// reflect that number's maximum for optimal performance. Must be a power of 2.
 	static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
-	
+
 	// Controls the number of items that an explicit consumer (i.e. one with a token)
 	// must consume before it causes all consumers to rotate and move on to the next
 	// internal queue.
 	static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
-	
+
 	// The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
 	// Enqueue operations that would cause this limit to be surpassed will fail. Note
 	// that this limit is enforced at the block level (for performance reasons), i.e.
 	// it's rounded up to the nearest block size.
 	static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
-	
-	
+
+
 	// Memory allocation can be customized if needed.
 	// malloc should return nullptr on failure, and handle alignment like std::malloc.
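The traits above are the tuning knobs the rest of the file consumes, and the power-of-2 requirement on BLOCK_SIZE is what lets the queue split a global enqueue/dequeue index into a block base and an in-block offset with plain masking (the "index & ~(BLOCK_SIZE - 1)" pattern used further down). A minimal sketch of that arithmetic follows; the helper names are hypothetical and only illustrate the constraint stated in the comment.

// Sketch only: why BLOCK_SIZE must be a power of 2 (hypothetical helpers).
#include <cassert>
#include <cstddef>

constexpr std::size_t kBlockSize = 64 * 1024;  // the default above
static_assert((kBlockSize & (kBlockSize - 1)) == 0, "BLOCK_SIZE must be a power of 2");

constexpr std::size_t block_base(std::size_t index)   { return index & ~(kBlockSize - 1); }
constexpr std::size_t block_offset(std::size_t index) { return index & (kBlockSize - 1); }

int main()
{
    std::size_t index = 3 * kBlockSize + 17;  // 17th slot of the fourth block
    assert(block_base(index) == 3 * kBlockSize);
    assert(block_offset(index) == 17);
}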
#if defined(malloc) || defined(free) @@ -203,13 +203,13 @@ namespace details std::atomic inactive; ProducerToken* token; uint64_t threadId; - + ConcurrentQueueProducerTypelessBase() : next(nullptr), inactive(false), token(nullptr), threadId(0) { } }; - + template static inline bool circular_less_than(T a, T b) { @@ -223,7 +223,7 @@ namespace details #pragma warning(pop) #endif } - + template static inline char* align_for(char* ptr) { @@ -247,7 +247,7 @@ namespace details ++x; return x; } - + template static inline void swap_relaxed(std::atomic& left, std::atomic& right) { @@ -255,13 +255,13 @@ namespace details left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed); right.store(std::move(temp), std::memory_order_relaxed); } - + template static inline T const& nomove(T const& x) { return x; } - + template struct nomove_if { @@ -271,7 +271,7 @@ namespace details return x; } }; - + template<> struct nomove_if { @@ -282,19 +282,19 @@ namespace details return std::forward(x); } }; - + template static inline auto deref_noexcept(It& it) noexcept -> decltype(*it) { return *it; } - + #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8) template struct is_trivially_destructible : std::is_trivially_destructible { }; #else template struct is_trivially_destructible : std::has_trivial_destructor { }; #endif - + template struct static_is_lock_free_num { enum { value = 0 }; }; template<> struct static_is_lock_free_num { enum { value = ATOMIC_CHAR_LOCK_FREE }; }; template<> struct static_is_lock_free_num { enum { value = ATOMIC_SHORT_LOCK_FREE }; }; @@ -337,7 +337,7 @@ struct ProducerToken other.producer->token = &other; } } - + // A token is always valid unless: // 1) Memory allocation failed during construction // 2) It was moved via the move constructor @@ -347,7 +347,7 @@ struct ProducerToken // that the token is valid for use with a specific queue, // but not which one; that's up to the user to track. 
inline bool valid() const { return producer != nullptr; } - + ~ProducerToken() { if (producer != nullptr) { @@ -355,14 +355,14 @@ struct ProducerToken producer->inactive.store(true, std::memory_order_release); } } - + // Disable copying and assignment ProducerToken(ProducerToken const&) = delete; ProducerToken& operator=(ProducerToken const&) = delete; private: template friend class ConcurrentQueue; - + protected: details::ConcurrentQueueProducerTypelessBase* producer; }; @@ -392,14 +392,14 @@ struct ConsumerToken std::swap(currentProducer, other.currentProducer); std::swap(desiredProducer, other.desiredProducer); } - + // Disable copying and assignment ConsumerToken(ConsumerToken const&) = delete; ConsumerToken& operator=(ConsumerToken const&) = delete; private: template friend class ConcurrentQueue; - + private: // but shared with ConcurrentQueue std::uint32_t initialOffset; std::uint32_t lastKnownGlobalOffset; @@ -417,10 +417,10 @@ public: typedef moodycamel::ProducerToken producer_token_t; typedef moodycamel::ConsumerToken consumer_token_t; - + typedef typename Traits::index_t index_t; typedef typename Traits::size_t size_t; - + static const size_t BLOCK_SIZE = static_cast(Traits::BLOCK_SIZE); static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD); static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast(Traits::EXPLICIT_INITIAL_INDEX_SIZE); @@ -462,7 +462,7 @@ public: { populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1)); } - + // Computes the correct amount of pre-allocated blocks for you based // on the minimum number of elements you want available at any given // time, and the maximum concurrent number of each type of producer. @@ -476,7 +476,7 @@ public: size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers); populate_initial_block_list(blocks); } - + // Note: The queue should not be accessed concurrently while it's // being deleted. It's up to the user to synchronize this. // This method is not thread safe. @@ -492,7 +492,7 @@ public: destroy(ptr); ptr = next; } - + // Destroy global free list auto block = freeList.head_unsafe(); while (block != nullptr) { @@ -502,7 +502,7 @@ public: } block = next; } - + // Destroy initial free list destroy_array(initialBlockPool, initialBlockPoolSize); } @@ -559,8 +559,8 @@ public: return count; } } - - + + // Returns an estimate of the total number of elements currently in the queue. This // estimate is only accurate if the queue has completely stabilized before it is called // (i.e. all enqueue and dequeue operations have completed and their memory effects are @@ -575,8 +575,8 @@ public: } return size; } - - + + // Returns true if the underlying atomic variables used by // the queue are lock-free (they should be on most platforms). // Thread-safe. @@ -595,12 +595,12 @@ private: friend struct ProducerToken; friend struct ConsumerToken; friend struct ExplicitProducer; - - + + /////////////////////////////// // Queue methods /////////////////////////////// - + inline bool update_current_producer_after_rotation(consumer_token_t& token) { // Ah, there's been a rotation, figure out where we should be! 
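size_approx() above and the dequeue paths below compare enqueue/dequeue indices with details::circular_less_than() so that the ordering stays correct after index_t wraps around. Below is a minimal sketch of that wrap-aware comparison, assuming the usual "distance less than half the index space" formulation (the function body itself is elided from this hunk); the helper name and the uint8_t demo values are illustrative only.

// Sketch only: wrap-around ordering in the spirit of details::circular_less_than().
#include <cassert>
#include <climits>
#include <cstdint>
#include <type_traits>

template<typename T>
static inline bool circular_less_than_sketch(T a, T b)
{
    static_assert(std::is_unsigned<T>::value, "index types are assumed unsigned");
    // "a < b" iff b is ahead of a by less than half of the unsigned index space,
    // so the answer stays correct after the indices overflow and wrap.
    return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1));
}

int main()
{
    std::uint8_t a = 250, b = 4;  // b has wrapped past 255 yet still orders after a
    assert(circular_less_than_sketch(a, b));
    assert(!circular_less_than_sketch(b, a));
}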
@@ -623,7 +623,7 @@ private: } } } - + std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset; if (delta >= prodCount) { delta = delta % prodCount; @@ -634,27 +634,27 @@ private: token.desiredProducer = tail; } } - + token.lastKnownGlobalOffset = globalOffset; token.currentProducer = token.desiredProducer; token.itemsConsumedFromCurrent = 0; return true; } - - + + /////////////////////////// // Free list /////////////////////////// - + template struct FreeListNode { FreeListNode() : freeListRefs(0), freeListNext(nullptr) { } - + std::atomic freeListRefs; std::atomic freeListNext; }; - + // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly // speedy under low contention. @@ -678,7 +678,7 @@ private: add_knowing_refcount_is_zero(node); } } - + inline N* try_get() { auto head = freeListHead.load(std::memory_order_acquire); @@ -689,7 +689,7 @@ private: head = freeListHead.load(std::memory_order_acquire); continue; } - + // Good, reference count has been incremented (it wasn't at zero), which means we can read the // next and not worry about it changing between now and the time we do the CAS auto next = head->freeListNext.load(std::memory_order_relaxed); @@ -697,12 +697,12 @@ private: // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on). assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0); - + // Decrease refcount twice, once for our ref, and once for the list's ref head->freeListRefs.fetch_sub(2, std::memory_order_release); return head; } - + // OK, the head must have changed on us, but we still need to decrease the refcount we increased. // Note that we don't need to release any memory effects, but we do need to ensure that the reference // count decrement happens-after the CAS on the head. @@ -711,13 +711,13 @@ private: add_knowing_refcount_is_zero(prevHead); } } - + return nullptr; } - + // Useful for traversing the list when there's no contention (e.g. 
to destroy remaining nodes) N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); } - + private: inline void add_knowing_refcount_is_zero(N* node) { @@ -742,27 +742,27 @@ private: return; } } - + private: // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention) std::atomic freeListHead; - + static const std::uint32_t REFS_MASK = 0x7FFFFFFF; static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000; }; - - + + /////////////////////////// // Block /////////////////////////// - + struct Block { Block() : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true) { } - + inline bool is_empty() const { if (compile_time_condition::value) { @@ -772,7 +772,7 @@ private: return false; } } - + // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set std::atomic_thread_fence(std::memory_order_acquire); return true; @@ -787,7 +787,7 @@ private: return false; } } - + // Returns true if the block is now empty (does not apply in explicit context) inline bool set_empty(index_t i) { @@ -804,7 +804,7 @@ private: return prevVal == BLOCK_SIZE - 1; } } - + // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0). // Returns true if the block is now empty (does not apply in explicit context). inline bool set_many_empty(index_t i, size_t count) @@ -826,7 +826,7 @@ private: return prevVal + count == BLOCK_SIZE; } } - + inline void set_all_empty() { if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { @@ -840,7 +840,7 @@ private: elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed); } } - + inline void reset_empty() { if (compile_time_condition::value) { @@ -889,7 +889,7 @@ private: /////////////////////////// // Producer base /////////////////////////// - + struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase { ProducerBase(ConcurrentQueue* parent_) : @@ -901,39 +901,39 @@ private: parent(parent_) { } - + virtual ~ProducerBase() { }; - + template inline size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData) { return static_cast(this)->dequeue_bulk(notifyThread, processData); } - + inline ProducerBase* next_prod() const { return static_cast(next); } - + inline size_t size_approx() const { auto tail = tailIndex.load(std::memory_order_relaxed); auto head = headIndex.load(std::memory_order_relaxed); return details::circular_less_than(head, tail) ? static_cast(tail - head) : 0; } - + inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); } protected: std::atomic tailIndex; // Where to enqueue to next std::atomic headIndex; // Where to dequeue from next - + std::atomic dequeueOptimisticCount; std::atomic dequeueOvercommit; - + Block* tailBlock; - + public: ConcurrentQueue* parent; }; - - + + public: /////////////////////////// // Explicit queue @@ -953,10 +953,10 @@ private: if (poolBasedIndexSize > pr_blockIndexSize) { pr_blockIndexSize = poolBasedIndexSize; } - + new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE } - + ~ExplicitProducer() { // Destruct any elements not yet dequeued. 
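Block::set_many_empty() above uses per-slot flags for small blocks but switches to a single atomic counter once BLOCK_SIZE exceeds EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD: whichever dequeuer's fetch_add brings the count up to BLOCK_SIZE learns that the block just became empty. A minimal, self-contained sketch of that counter path follows; the type name and the tiny block size are hypothetical, for illustration only.

// Sketch only: the counter-based "block became empty" scheme described above.
#include <atomic>
#include <cassert>
#include <cstddef>

constexpr std::size_t kBlockSize = 8;  // stands in for Traits::BLOCK_SIZE

struct CounterBlockSketch
{
    std::atomic<std::size_t> elementsCompletelyDequeued{0};

    // Mark `count` contiguous slots as dequeued; returns true exactly once,
    // for the caller whose slots complete the block.
    bool set_many_empty(std::size_t count)
    {
        std::size_t prev = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
        assert(prev + count <= kBlockSize);
        return prev + count == kBlockSize;
    }
};

int main()
{
    CounterBlockSketch block;
    assert(!block.set_many_empty(3));
    assert(!block.set_many_empty(4));
    assert(block.set_many_empty(1));  // this caller observes the block going empty
}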
@@ -975,7 +975,7 @@ private: assert(details::circular_less_than(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed))); halfDequeuedBlock = pr_blockIndexEntries[i].block; } - + // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration) auto block = this->tailBlock; do { @@ -983,12 +983,12 @@ private: if (block->ConcurrentQueue::Block::is_empty()) { continue; } - + size_t i = 0; // Offset into block if (block == halfDequeuedBlock) { i = static_cast(this->headIndex.load(std::memory_order_relaxed) & static_cast(BLOCK_SIZE - 1)); } - + // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast(this->tailIndex.load(std::memory_order_relaxed) & static_cast(BLOCK_SIZE - 1)); while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) { @@ -996,7 +996,7 @@ private: } } while (block != this->tailBlock); } - + // Destroy all blocks that we own if (this->tailBlock != nullptr) { auto block = this->tailBlock; @@ -1011,7 +1011,7 @@ private: block = nextBlock; } while (block != this->tailBlock); } - + // Destroy the block indices auto header = static_cast(pr_blockIndexRaw); while (header != nullptr) { @@ -1021,12 +1021,12 @@ private: header = prev; } } - + inline void enqueue_begin_alloc(index_t currentTailIndex) { // We reached the end of a block, start a new one if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::is_empty()) { - // We can re-use the block ahead of us, it's empty! + // We can re-use the block ahead of us, it's empty! this->tailBlock = this->tailBlock->next; this->tailBlock->ConcurrentQueue::Block::reset_empty(); @@ -1080,7 +1080,7 @@ private: { return this->tailIndex; } - + template size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData) { @@ -1090,10 +1090,10 @@ private: if (details::circular_less_than(0, desiredCount)) { desiredCount = desiredCount < 8192 ? desiredCount : 8192; std::atomic_thread_fence(std::memory_order_acquire); - + auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed); assert(overcommit <= myDequeueCount); - + tail = this->tailIndex.load(std::memory_order_acquire); auto actualCount = static_cast(tail - (myDequeueCount - overcommit)); if (details::circular_less_than(0, actualCount)) { @@ -1101,15 +1101,15 @@ private: if (actualCount < desiredCount) { this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release); } - + // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this // will never exceed tail. 
auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel); - + // Determine which block the first element is in auto localBlockIndex = blockIndex.load(std::memory_order_acquire); auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire); - + auto headBase = localBlockIndex->entries[localBlockIndexHead].base; auto firstBlockBaseIndex = firstIndex & ~static_cast(BLOCK_SIZE - 1); auto offset = static_cast(static_cast::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE); @@ -1132,7 +1132,7 @@ private: block->ConcurrentQueue::Block::set_many_empty(firstIndexInBlock, static_cast(endIndex - firstIndexInBlock)); indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1); } while (index != firstIndex + actualCount); - + return actualCount; } else { @@ -1140,7 +1140,7 @@ private: this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release); } } - + return 0; } @@ -1150,7 +1150,7 @@ private: index_t base; Block* block; }; - + struct BlockIndexHeader { size_t size; @@ -1158,12 +1158,12 @@ private: BlockIndexEntry* entries; void* prev; }; - - + + bool new_block_index(size_t numberOfFilledSlotsToExpose) { auto prevBlockSizeMask = pr_blockIndexSize - 1; - + // Create the new block pr_blockIndexSize <<= 1; auto newRawPtr = static_cast((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize)); @@ -1171,9 +1171,9 @@ private: pr_blockIndexSize >>= 1; // Reset to allow graceful retry return false; } - + auto newBlockIndexEntries = reinterpret_cast(details::align_for(newRawPtr + sizeof(BlockIndexHeader))); - + // Copy in all the old indices, if any size_t j = 0; if (pr_blockIndexSlotsUsed != 0) { @@ -1183,25 +1183,25 @@ private: i = (i + 1) & prevBlockSizeMask; } while (i != pr_blockIndexFront); } - + // Update everything auto header = new (newRawPtr) BlockIndexHeader; header->size = pr_blockIndexSize; header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed); header->entries = newBlockIndexEntries; header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later - + pr_blockIndexFront = j; pr_blockIndexEntries = newBlockIndexEntries; pr_blockIndexRaw = newRawPtr; blockIndex.store(header, std::memory_order_release); - + return true; } - + private: std::atomic blockIndex; - + // To be used by producer only -- consumer must use the ones in referenced by blockIndex size_t pr_blockIndexSlotsUsed; size_t pr_blockIndexSize; @@ -1209,18 +1209,18 @@ private: BlockIndexEntry* pr_blockIndexEntries; void* pr_blockIndexRaw; }; - + ExplicitProducer* get_explicit_producer(producer_token_t const& token) { return static_cast(token.producer); } private: - + ////////////////////////////////// // Block pool manipulation ////////////////////////////////// - + void populate_initial_block_list(size_t blockCount) { initialBlockPoolSize = blockCount; @@ -1228,7 +1228,7 @@ private: initialBlockPool = nullptr; return; } - + initialBlockPool = create_array(blockCount); if (initialBlockPool == nullptr) { initialBlockPoolSize = 0; @@ -1237,23 +1237,23 @@ private: initialBlockPool[i].dynamicallyAllocated = false; } } - + inline Block* try_get_block_from_initial_pool() { if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) { return nullptr; } - + auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed); - + return index < initialBlockPoolSize ? 
(initialBlockPool + index) : nullptr; } - + inline void add_block_to_free_list(Block* block) { freeList.add(block); } - + inline void add_blocks_to_free_list(Block* block) { while (block != nullptr) { @@ -1262,12 +1262,12 @@ private: block = next; } } - + inline Block* try_get_block_from_free_list() { return freeList.try_get(); } - + // Gets a free block from one of the memory pools, or allocates a new one (if applicable) Block* requisition_block() { @@ -1275,26 +1275,26 @@ private: if (block != nullptr) { return block; } - + block = try_get_block_from_free_list(); if (block != nullptr) { return block; } - + return create(); } - - + + ////////////////////////////////// // Producer list manipulation - ////////////////////////////////// - + ////////////////////////////////// + ProducerBase* recycle_or_create_producer() { bool recycled; return recycle_or_create_producer(recycled); } - + ProducerBase* recycle_or_create_producer(bool& recycled) { // Try to re-use one first @@ -1315,25 +1315,25 @@ private: recycled = false; return add_producer(static_cast(create(this))); } - + ProducerBase* add_producer(ProducerBase* producer) { // Handle failed memory allocation if (producer == nullptr) { return nullptr; } - + producerCount.fetch_add(1, std::memory_order_relaxed); - + // Add it to the lock-free list auto prevTail = producerListTail.load(std::memory_order_relaxed); do { producer->next = prevTail; } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed)); - + return producer; } - + void reown_producers() { // After another instance is moved-into/swapped-with this one, all the @@ -1343,18 +1343,18 @@ private: ptr->parent = this; } } - + ////////////////////////////////// // Utility functions ////////////////////////////////// - + template static inline U* create_array(size_t count) { assert(count > 0); return static_cast((Traits::malloc)(sizeof(U) * count)); } - + template static inline void destroy_array(U* p, size_t count) { @@ -1364,21 +1364,21 @@ private: (Traits::free)(p); } } - + template static inline U* create() { auto p = (Traits::malloc)(sizeof(U)); return new (p) U; } - + template static inline U* create(A1&& a1) { auto p = (Traits::malloc)(sizeof(U)); return new (p) U(std::forward(a1)); } - + template static inline void destroy(U* p) { @@ -1391,13 +1391,13 @@ private: private: std::atomic producerListTail; std::atomic producerCount; - + std::atomic initialBlockPoolIndex; Block* initialBlockPool; size_t initialBlockPoolSize; - + FreeList freeList; - + std::atomic nextExplicitConsumerId; std::atomic globalExplicitConsumerOffset; };
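add_producer() above (and the free list's add path earlier) publishes nodes onto an intrusive lock-free list with a compare_exchange_weak loop: link the node to the currently observed tail, then try to swing the tail to the node, re-linking against the freshly reported tail on failure. A minimal sketch of that push idiom follows, with hypothetical names; the release ordering on the successful CAS is what lets a reader that acquires the tail also see the node it links to.

// Sketch only: the CAS publish loop used by add_producer()-style list pushes.
#include <atomic>
#include <cassert>

struct NodeSketch
{
    int value;
    NodeSketch* next;
};

static std::atomic<NodeSketch*> listTail{nullptr};

static void push_sketch(NodeSketch* node)
{
    auto prevTail = listTail.load(std::memory_order_relaxed);
    do {
        node->next = prevTail;  // re-link against whatever tail the failed CAS reported
    } while (!listTail.compare_exchange_weak(prevTail, node,
                                             std::memory_order_release,
                                             std::memory_order_relaxed));
}

int main()
{
    static NodeSketch a{1, nullptr}, b{2, nullptr};
    push_sketch(&a);
    push_sketch(&b);
    assert(listTail.load(std::memory_order_acquire) == &b);
    assert(b.next == &a);
}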