// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
// An overview, including benchmark results, is provided here:
//     http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
// The full design is also described in excruciating detail at:
//    http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue

// Simplified BSD license:
// Copyright (c) 2013-2016, Cameron Desrochers.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification,
// are permitted provided that the following conditions are met:
//
// - Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
// - Redistributions in binary form must reproduce the above copyright notice, this list of
// conditions and the following disclaimer in the documentation and/or other materials
// provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
// OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#pragma once

#include "../common/TracyAlloc.hpp"
#include "../common/TracyForceInline.hpp"
#include "../common/TracySystem.hpp"

#if defined(__GNUC__)
// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
// upon assigning any computed values)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"
#endif

#if defined(__APPLE__)
#include "TargetConditionals.h"
#endif

#include <atomic>		// Requires C++11. Sorry VS2010.
#include <cassert>
#include <cstddef>		// for max_align_t
#include <cstdint>
#include <cstdlib>
#include <type_traits>
#include <algorithm>
#include <utility>
#include <limits>
#include <climits>		// for CHAR_BIT
#include <array>
#include <thread>		// partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading

namespace tracy
{

// Compiler-specific likely/unlikely hints
namespace moodycamel { namespace details {
#if defined(__GNUC__)
	inline bool cqLikely(bool x) { return __builtin_expect((x), true); }
	inline bool cqUnlikely(bool x) { return __builtin_expect((x), false); }
#else
	inline bool cqLikely(bool x) { return x; }
	inline bool cqUnlikely(bool x) { return x; }
#endif
} }

namespace
{
	// to avoid MSVC warning 4127: conditional expression is constant
	template <bool>
	struct compile_time_condition
	{
		static const bool value = false;
	};
	template <>
	struct compile_time_condition<true>
	{
		static const bool value = true;
	};
}

namespace moodycamel {
namespace details {
	template<typename T>
	struct const_numeric_max {
		static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
		static const T value = std::numeric_limits<T>::is_signed
			? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
			: static_cast<T>(-1);
	};

#if defined(__GLIBCXX__)
	typedef ::max_align_t std_max_align_t;      // libstdc++ forgot to add it to std:: for a while
#else
	typedef std::max_align_t std_max_align_t;   // Others (e.g. MSVC) insist it can *only* be accessed via std::
#endif

	// Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
	// 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
	typedef union {
		std_max_align_t x;
		long long y;
		void* z;
	} max_align_t;
}

// Default traits for the ConcurrentQueue. To change some of the
// traits without re-implementing all of them, inherit from this
// struct and shadow the declarations you wish to be different;
// since the traits are used as a template type parameter, the
// shadowed declarations will be used where defined, and the defaults
// otherwise.
struct ConcurrentQueueDefaultTraits
{
	// General-purpose size type. std::size_t is strongly recommended.
	typedef std::size_t size_t;

	// The type used for the enqueue and dequeue indices. Must be at least as
	// large as size_t. Should be significantly larger than the number of elements
	// you expect to hold at once, especially if you have a high turnover rate;
	// for example, on 32-bit x86, if you expect to have over a hundred million
	// elements or pump several million elements through your queue in a very
	// short space of time, using a 32-bit type *may* trigger a race condition.
	// A 64-bit int type is recommended in that case, and in practice will
	// prevent a race condition no matter the usage of the queue. Note that
	// whether the queue is lock-free with a 64-bit int type depends on whether
	// std::atomic<std::uint64_t> is lock-free, which is platform-specific.
	typedef std::size_t index_t;

	// Internally, all elements are enqueued and dequeued from multi-element
	// blocks; this is the smallest controllable unit. If you expect few elements
	// but many producers, a smaller block size should be favoured. For few producers
	// and/or many elements, a larger block size is preferred. A sane default
	// is provided. Must be a power of 2.
	static const size_t BLOCK_SIZE = 64*1024;

	// For explicit producers (i.e. when using a producer token), the block is
	// checked for being empty by iterating through a list of flags, one per element.
	// For large block sizes, this is too inefficient, and switching to an atomic
	// counter-based approach is faster. The switch is made for block sizes strictly
	// larger than this threshold.
	static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;

	// How many full blocks can be expected for a single explicit producer? This should
	// reflect that number's maximum for optimal performance. Must be a power of 2.
	static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;

	// Controls the number of items that an explicit consumer (i.e. one with a token)
	// must consume before it causes all consumers to rotate and move on to the next
	// internal queue.
	static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;

	// The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
	// Enqueue operations that would cause this limit to be surpassed will fail. Note
	// that this limit is enforced at the block level (for performance reasons), i.e.
	// it's rounded up to the nearest block size.
	static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;


	// Memory allocation can be customized if needed.
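	// For example -- an illustrative sketch only, not part of this header: a traits type
	// that overrides the allocation hooks (and, say, the block size) is written by deriving
	// from ConcurrentQueueDefaultTraits and shadowing the members you want to change. The
	// names `MyTraits`, `my_pool_alloc` and `my_pool_free` below are hypothetical placeholders.
	//
	//     struct MyTraits : public ConcurrentQueueDefaultTraits
	//     {
	//         static const size_t BLOCK_SIZE = 256;    // still must be a power of 2
	//         static inline void* malloc(size_t size) { return my_pool_alloc(size); }
	//         static inline void free(void* ptr) { my_pool_free(ptr); }
	//     };
	//
	//     // used as: ConcurrentQueue<int, MyTraits> q;   // traits are the second template argument
	//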
	// malloc should return nullptr on failure, and handle alignment like std::malloc.
#if defined(malloc) || defined(free)
	// Gah, this is 2015, stop defining macros that break standard code already!
	// Work around malloc/free being special macros:
	static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
	static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
	static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
	static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
#else
	static inline void* malloc(size_t size) { return tracy::tracy_malloc(size); }
	static inline void free(void* ptr) { return tracy::tracy_free(ptr); }
#endif
};


// When producing or consuming many elements, the most efficient way is to:
//    1) Use one of the bulk-operation methods of the queue with a token
//    2) Failing that, use the bulk-operation methods without a token
//    3) Failing that, create a token and use that with the single-item methods
//    4) Failing that, use the single-parameter methods of the queue
// Having said that, don't create tokens willy-nilly -- ideally there should be
// a maximum of one token per thread (of each kind).
struct ProducerToken;
struct ConsumerToken;

template<typename T, typename Traits> class ConcurrentQueue;


namespace details
{
	struct ConcurrentQueueProducerTypelessBase
	{
		ConcurrentQueueProducerTypelessBase* next;
		std::atomic<bool> inactive;
		ProducerToken* token;
		uint32_t threadId;

		ConcurrentQueueProducerTypelessBase()
			: next(nullptr), inactive(false), token(nullptr), threadId(0)
		{
		}
	};

	template<typename T>
	static inline bool circular_less_than(T a, T b)
	{
		static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
		return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << (static_cast<T>(sizeof(T) * CHAR_BIT - 1)));
		// Note: extra parens around rhs of operator<< is MSVC bug: https://developercommunity2.visualstudio.com/t/C4554-triggers-when-both-lhs-and-rhs-is/10034931
		//       silencing the bug requires #pragma warning(disable: 4554) around the calling code and has no effect when done here.
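		// Worked example (illustrative only): with T = std::uint8_t, circular_less_than(250, 5)
		// is true, because static_cast<std::uint8_t>(250 - 5) == 245 > 128; once the 8-bit index
		// wraps around, 5 is "ahead of" 250, so 250 compares as circularly less than 5.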
} template static inline char* align_for(char* ptr) { const std::size_t alignment = std::alignment_of::value; return ptr + (alignment - (reinterpret_cast(ptr) % alignment)) % alignment; } template static inline T ceil_to_pow_2(T x) { static_assert(std::is_integral::value && !std::numeric_limits::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types"); // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 --x; x |= x >> 1; x |= x >> 2; x |= x >> 4; for (std::size_t i = 1; i < sizeof(T); i <<= 1) { x |= x >> (i << 3); } ++x; return x; } template static inline void swap_relaxed(std::atomic& left, std::atomic& right) { T temp = std::move(left.load(std::memory_order_relaxed)); left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed); right.store(std::move(temp), std::memory_order_relaxed); } template static inline T const& nomove(T const& x) { return x; } template struct nomove_if { template static inline T const& eval(T const& x) { return x; } }; template<> struct nomove_if { template static inline auto eval(U&& x) -> decltype(std::forward(x)) { return std::forward(x); } }; template static inline auto deref_noexcept(It& it) noexcept -> decltype(*it) { return *it; } #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8) template struct is_trivially_destructible : std::is_trivially_destructible { }; #else template struct is_trivially_destructible : std::has_trivial_destructor { }; #endif template struct static_is_lock_free_num { enum { value = 0 }; }; template<> struct static_is_lock_free_num { enum { value = ATOMIC_CHAR_LOCK_FREE }; }; template<> struct static_is_lock_free_num { enum { value = ATOMIC_SHORT_LOCK_FREE }; }; template<> struct static_is_lock_free_num { enum { value = ATOMIC_INT_LOCK_FREE }; }; template<> struct static_is_lock_free_num { enum { value = ATOMIC_LONG_LOCK_FREE }; }; template<> struct static_is_lock_free_num { enum { value = ATOMIC_LLONG_LOCK_FREE }; }; template struct static_is_lock_free : static_is_lock_free_num::type> { }; template<> struct static_is_lock_free { enum { value = ATOMIC_BOOL_LOCK_FREE }; }; template struct static_is_lock_free { enum { value = ATOMIC_POINTER_LOCK_FREE }; }; } struct ProducerToken { template explicit ProducerToken(ConcurrentQueue& queue); ProducerToken(ProducerToken&& other) noexcept : producer(other.producer) { other.producer = nullptr; if (producer != nullptr) { producer->token = this; } } inline ProducerToken& operator=(ProducerToken&& other) noexcept { swap(other); return *this; } void swap(ProducerToken& other) noexcept { std::swap(producer, other.producer); if (producer != nullptr) { producer->token = this; } if (other.producer != nullptr) { other.producer->token = &other; } } // A token is always valid unless: // 1) Memory allocation failed during construction // 2) It was moved via the move constructor // (Note: assignment does a swap, leaving both potentially valid) // 3) The associated queue was destroyed // Note that if valid() returns true, that only indicates // that the token is valid for use with a specific queue, // but not which one; that's up to the user to track. 
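	// Hedged usage sketch (illustrative only; `Item` is a hypothetical placeholder element type):
	//
	//     ConcurrentQueue<Item, ConcurrentQueueDefaultTraits> q;
	//     ProducerToken ptok(q);
	//     if (!ptok.valid()) { /* producer allocation failed; fall back or report the error */ }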
inline bool valid() const { return producer != nullptr; } ~ProducerToken() { if (producer != nullptr) { producer->token = nullptr; producer->inactive.store(true, std::memory_order_release); } } // Disable copying and assignment ProducerToken(ProducerToken const&) = delete; ProducerToken& operator=(ProducerToken const&) = delete; private: template friend class ConcurrentQueue; protected: details::ConcurrentQueueProducerTypelessBase* producer; }; struct ConsumerToken { template explicit ConsumerToken(ConcurrentQueue& q); ConsumerToken(ConsumerToken&& other) noexcept : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer) { } inline ConsumerToken& operator=(ConsumerToken&& other) noexcept { swap(other); return *this; } void swap(ConsumerToken& other) noexcept { std::swap(initialOffset, other.initialOffset); std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset); std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent); std::swap(currentProducer, other.currentProducer); std::swap(desiredProducer, other.desiredProducer); } // Disable copying and assignment ConsumerToken(ConsumerToken const&) = delete; ConsumerToken& operator=(ConsumerToken const&) = delete; private: template friend class ConcurrentQueue; private: // but shared with ConcurrentQueue std::uint32_t initialOffset; std::uint32_t lastKnownGlobalOffset; std::uint32_t itemsConsumedFromCurrent; details::ConcurrentQueueProducerTypelessBase* currentProducer; details::ConcurrentQueueProducerTypelessBase* desiredProducer; }; template class ConcurrentQueue { public: struct ExplicitProducer; typedef moodycamel::ProducerToken producer_token_t; typedef moodycamel::ConsumerToken consumer_token_t; typedef typename Traits::index_t index_t; typedef typename Traits::size_t size_t; static const size_t BLOCK_SIZE = static_cast(Traits::BLOCK_SIZE); static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD); static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast(Traits::EXPLICIT_INITIAL_INDEX_SIZE); static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE); #ifdef _MSC_VER #pragma warning(push) #pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!) #pragma warning(disable: 4309) // static_cast: Truncation of constant value #endif static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max::value - static_cast(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? 
details::const_numeric_max::value : ((static_cast(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE); #ifdef _MSC_VER #pragma warning(pop) #endif static_assert(!std::numeric_limits::is_signed && std::is_integral::value, "Traits::size_t must be an unsigned integral type"); static_assert(!std::numeric_limits::is_signed && std::is_integral::value, "Traits::index_t must be an unsigned integral type"); static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t"); static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)"); static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)"); static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)"); public: // Creates a queue with at least `capacity` element slots; note that the // actual number of elements that can be inserted without additional memory // allocation depends on the number of producers and the block size (e.g. if // the block size is equal to `capacity`, only a single block will be allocated // up-front, which means only a single producer will be able to enqueue elements // without an extra allocation -- blocks aren't shared between producers). // This method is not thread safe -- it is up to the user to ensure that the // queue is fully constructed before it starts being used by other threads (this // includes making the memory effects of construction visible, possibly with a // memory barrier). explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE) : producerListTail(nullptr), producerCount(0), initialBlockPoolIndex(0), nextExplicitConsumerId(0), globalExplicitConsumerOffset(0) { populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1)); } // Computes the correct amount of pre-allocated blocks for you based // on the minimum number of elements you want available at any given // time, and the maximum concurrent number of each type of producer. ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers) : producerListTail(nullptr), producerCount(0), initialBlockPoolIndex(0), nextExplicitConsumerId(0), globalExplicitConsumerOffset(0) { size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers); populate_initial_block_list(blocks); } // Note: The queue should not be accessed concurrently while it's // being deleted. It's up to the user to synchronize this. // This method is not thread safe. 
~ConcurrentQueue() { // Destroy producers auto ptr = producerListTail.load(std::memory_order_relaxed); while (ptr != nullptr) { auto next = ptr->next_prod(); if (ptr->token != nullptr) { ptr->token->producer = nullptr; } destroy(ptr); ptr = next; } // Destroy global free list auto block = freeList.head_unsafe(); while (block != nullptr) { auto next = block->freeListNext.load(std::memory_order_relaxed); if (block->dynamicallyAllocated) { destroy(block); } block = next; } // Destroy initial free list destroy_array(initialBlockPool, initialBlockPoolSize); } // Disable copying and copy assignment ConcurrentQueue(ConcurrentQueue const&) = delete; ConcurrentQueue(ConcurrentQueue&& other) = delete; ConcurrentQueue& operator=(ConcurrentQueue const&) = delete; ConcurrentQueue& operator=(ConcurrentQueue&& other) = delete; public: tracy_force_inline T* enqueue_begin(producer_token_t const& token, index_t& currentTailIndex) { return static_cast(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue_begin(currentTailIndex); } template size_t try_dequeue_bulk_single(consumer_token_t& token, NotifyThread notifyThread, ProcessData processData ) { if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) { if (!update_current_producer_after_rotation(token)) { return 0; } } size_t count = static_cast(token.currentProducer)->dequeue_bulk(notifyThread, processData); token.itemsConsumedFromCurrent += static_cast(count); auto tail = producerListTail.load(std::memory_order_acquire); auto ptr = static_cast(token.currentProducer)->next_prod(); if (ptr == nullptr) { ptr = tail; } if( count == 0 ) { while (ptr != static_cast(token.currentProducer)) { auto dequeued = ptr->dequeue_bulk(notifyThread, processData); if (dequeued != 0) { token.currentProducer = ptr; token.itemsConsumedFromCurrent = static_cast(dequeued); return dequeued; } ptr = ptr->next_prod(); if (ptr == nullptr) { ptr = tail; } } return 0; } else { token.currentProducer = ptr; token.itemsConsumedFromCurrent = 0; return count; } } // Returns an estimate of the total number of elements currently in the queue. This // estimate is only accurate if the queue has completely stabilized before it is called // (i.e. all enqueue and dequeue operations have completed and their memory effects are // visible on the calling thread, and no further operations start while this method is // being called). // Thread-safe. size_t size_approx() const { size_t size = 0; for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) { size += ptr->size_approx(); } return size; } // Returns true if the underlying atomic variables used by // the queue are lock-free (they should be on most platforms). // Thread-safe. static bool is_lock_free() { return details::static_is_lock_free::value == 2 && details::static_is_lock_free::value == 2 && details::static_is_lock_free::value == 2 && details::static_is_lock_free::value == 2 && details::static_is_lock_free::value == 2; } private: friend struct ProducerToken; friend struct ConsumerToken; friend struct ExplicitProducer; /////////////////////////////// // Queue methods /////////////////////////////// inline bool update_current_producer_after_rotation(consumer_token_t& token) { // Ah, there's been a rotation, figure out where we should be! 
auto tail = producerListTail.load(std::memory_order_acquire); if (token.desiredProducer == nullptr && tail == nullptr) { return false; } auto prodCount = producerCount.load(std::memory_order_relaxed); auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed); if (details::cqUnlikely(token.desiredProducer == nullptr)) { // Aha, first time we're dequeueing anything. // Figure out our local position // Note: offset is from start, not end, but we're traversing from end -- subtract from count first std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount); token.desiredProducer = tail; for (std::uint32_t i = 0; i != offset; ++i) { token.desiredProducer = static_cast(token.desiredProducer)->next_prod(); if (token.desiredProducer == nullptr) { token.desiredProducer = tail; } } } std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset; if (delta >= prodCount) { delta = delta % prodCount; } for (std::uint32_t i = 0; i != delta; ++i) { token.desiredProducer = static_cast(token.desiredProducer)->next_prod(); if (token.desiredProducer == nullptr) { token.desiredProducer = tail; } } token.lastKnownGlobalOffset = globalOffset; token.currentProducer = token.desiredProducer; token.itemsConsumedFromCurrent = 0; return true; } /////////////////////////// // Free list /////////////////////////// template struct FreeListNode { FreeListNode() : freeListRefs(0), freeListNext(nullptr) { } std::atomic freeListRefs; std::atomic freeListNext; }; // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly // speedy under low contention. template // N must inherit FreeListNode or have the same fields (and initialization of them) struct FreeList { FreeList() : freeListHead(nullptr) { } FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); } void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); } FreeList(FreeList const&) = delete; FreeList& operator=(FreeList const&) = delete; inline void add(N* node) { // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to // set it using a fetch_add if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) { // Oh look! We were the last ones referencing this node, and we know // we want to add it to the free list, so let's do it! add_knowing_refcount_is_zero(node); } } inline N* try_get() { auto head = freeListHead.load(std::memory_order_acquire); while (head != nullptr) { auto prevHead = head; auto refs = head->freeListRefs.load(std::memory_order_relaxed); if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) { head = freeListHead.load(std::memory_order_acquire); continue; } // Good, reference count has been incremented (it wasn't at zero), which means we can read the // next and not worry about it changing between now and the time we do the CAS auto next = head->freeListNext.load(std::memory_order_relaxed); if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) { // Yay, got the node. 
This means it was on the list, which means shouldBeOnFreeList must be false no // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on). assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0); // Decrease refcount twice, once for our ref, and once for the list's ref head->freeListRefs.fetch_sub(2, std::memory_order_release); return head; } // OK, the head must have changed on us, but we still need to decrease the refcount we increased. // Note that we don't need to release any memory effects, but we do need to ensure that the reference // count decrement happens-after the CAS on the head. refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel); if (refs == SHOULD_BE_ON_FREELIST + 1) { add_knowing_refcount_is_zero(prevHead); } } return nullptr; } // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes) N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); } private: inline void add_knowing_refcount_is_zero(N* node) { // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run // only one copy of this method per node at a time, i.e. the single thread case), then we know // we can safely change the next pointer of the node; however, once the refcount is back above // zero, then other threads could increase it (happens under heavy contention, when the refcount // goes to zero in between a load and a refcount increment of a node in try_get, then back up to // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS // to add the node to the actual list fails, decrease the refcount and leave the add operation to // the next thread who puts the refcount back at zero (which could be us, hence the loop). 
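		// (For reference: freeListRefs packs two fields into one 32-bit word -- the low 31 bits
		// (REFS_MASK) hold the reference count, and the top bit (SHOULD_BE_ON_FREELIST) records
		// that the node must be re-added to the free list once the count drops back to zero.)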
auto head = freeListHead.load(std::memory_order_relaxed); while (true) { node->freeListNext.store(head, std::memory_order_relaxed); node->freeListRefs.store(1, std::memory_order_release); if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) { // Hmm, the add failed, but we can only try again when the refcount goes back to zero if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) { continue; } } return; } } private: // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention) std::atomic freeListHead; static const std::uint32_t REFS_MASK = 0x7FFFFFFF; static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000; }; /////////////////////////// // Block /////////////////////////// struct Block { Block() : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true) { } inline bool is_empty() const { if (compile_time_condition::value) { // Check flags for (size_t i = 0; i < BLOCK_SIZE; ++i) { if (!emptyFlags[i].load(std::memory_order_relaxed)) { return false; } } // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set std::atomic_thread_fence(std::memory_order_acquire); return true; } else { // Check counter if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) { std::atomic_thread_fence(std::memory_order_acquire); return true; } assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE); return false; } } // Returns true if the block is now empty (does not apply in explicit context) inline bool set_empty(index_t i) { if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { // Set flag assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast(i & static_cast(BLOCK_SIZE - 1))].load(std::memory_order_relaxed)); emptyFlags[BLOCK_SIZE - 1 - static_cast(i & static_cast(BLOCK_SIZE - 1))].store(true, std::memory_order_release); return false; } else { // Increment counter auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release); assert(prevVal < BLOCK_SIZE); return prevVal == BLOCK_SIZE - 1; } } // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0). // Returns true if the block is now empty (does not apply in explicit context). 
inline bool set_many_empty(index_t i, size_t count) { if (compile_time_condition::value) { // Set flags std::atomic_thread_fence(std::memory_order_release); i = BLOCK_SIZE - 1 - static_cast(i & static_cast(BLOCK_SIZE - 1)) - count + 1; for (size_t j = 0; j != count; ++j) { assert(!emptyFlags[i + j].load(std::memory_order_relaxed)); emptyFlags[i + j].store(true, std::memory_order_relaxed); } return false; } else { // Increment counter auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release); assert(prevVal + count <= BLOCK_SIZE); return prevVal + count == BLOCK_SIZE; } } inline void set_all_empty() { if (BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { // Set all flags for (size_t i = 0; i != BLOCK_SIZE; ++i) { emptyFlags[i].store(true, std::memory_order_relaxed); } } else { // Reset counter elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed); } } inline void reset_empty() { if (compile_time_condition::value) { // Reset flags for (size_t i = 0; i != BLOCK_SIZE; ++i) { emptyFlags[i].store(false, std::memory_order_relaxed); } } else { // Reset counter elementsCompletelyDequeued.store(0, std::memory_order_relaxed); } } inline T* operator[](index_t idx) noexcept { return static_cast(static_cast(elements)) + static_cast(idx & static_cast(BLOCK_SIZE - 1)); } inline T const* operator[](index_t idx) const noexcept { return static_cast(static_cast(elements)) + static_cast(idx & static_cast(BLOCK_SIZE - 1)); } private: // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of // addresses returned by malloc, that alignment will be preserved. Apparently clang actually // generates code that uses this assumption for AVX instructions in some cases. Ideally, we // should also align Block to the alignment of T in case it's higher than malloc's 16-byte // alignment, but this is hard to do in a cross-platform way. Assert for this case: static_assert(std::alignment_of::value <= std::alignment_of::value, "The queue does not support super-aligned types at this time"); // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since // otherwise the appropriate padding will not be added at the end of Block in order to make // arrays of Blocks all be properly aligned (not just the first one). We use a union to force // this. union { char elements[sizeof(T) * BLOCK_SIZE]; details::max_align_t dummy; }; public: Block* next; std::atomic elementsCompletelyDequeued; std::atomic emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? 
BLOCK_SIZE : 1]; public: std::atomic freeListRefs; std::atomic freeListNext; std::atomic shouldBeOnFreeList; bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool' }; static_assert(std::alignment_of::value >= std::alignment_of::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping"); /////////////////////////// // Producer base /////////////////////////// struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase { ProducerBase(ConcurrentQueue* parent_) : tailIndex(0), headIndex(0), dequeueOptimisticCount(0), dequeueOvercommit(0), tailBlock(nullptr), parent(parent_) { } virtual ~ProducerBase() { }; template inline size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData) { return static_cast(this)->dequeue_bulk(notifyThread, processData); } inline ProducerBase* next_prod() const { return static_cast(next); } inline size_t size_approx() const { auto tail = tailIndex.load(std::memory_order_relaxed); auto head = headIndex.load(std::memory_order_relaxed); return details::circular_less_than(head, tail) ? static_cast(tail - head) : 0; } inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); } protected: std::atomic tailIndex; // Where to enqueue to next std::atomic headIndex; // Where to dequeue from next std::atomic dequeueOptimisticCount; std::atomic dequeueOvercommit; Block* tailBlock; public: ConcurrentQueue* parent; }; public: /////////////////////////// // Explicit queue /////////////////////////// struct ExplicitProducer : public ProducerBase { explicit ExplicitProducer(ConcurrentQueue* _parent) : ProducerBase(_parent), blockIndex(nullptr), pr_blockIndexSlotsUsed(0), pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1), pr_blockIndexFront(0), pr_blockIndexEntries(nullptr), pr_blockIndexRaw(nullptr) { size_t poolBasedIndexSize = details::ceil_to_pow_2(_parent->initialBlockPoolSize) >> 1; if (poolBasedIndexSize > pr_blockIndexSize) { pr_blockIndexSize = poolBasedIndexSize; } new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE } ~ExplicitProducer() { // Destruct any elements not yet dequeued. // Since we're in the destructor, we can assume all elements // are either completely dequeued or completely not (no halfways). 
if (this->tailBlock != nullptr) { // Note this means there must be a block index too // First find the block that's partially dequeued, if any Block* halfDequeuedBlock = nullptr; if ((this->headIndex.load(std::memory_order_relaxed) & static_cast(BLOCK_SIZE - 1)) != 0) { // The head's not on a block boundary, meaning a block somewhere is partially dequeued // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary) size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1); while (details::circular_less_than(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) { i = (i + 1) & (pr_blockIndexSize - 1); } assert(details::circular_less_than(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed))); halfDequeuedBlock = pr_blockIndexEntries[i].block; } // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration) auto block = this->tailBlock; do { block = block->next; if (block->ConcurrentQueue::Block::is_empty()) { continue; } size_t i = 0; // Offset into block if (block == halfDequeuedBlock) { i = static_cast(this->headIndex.load(std::memory_order_relaxed) & static_cast(BLOCK_SIZE - 1)); } // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast(this->tailIndex.load(std::memory_order_relaxed) & static_cast(BLOCK_SIZE - 1)); while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) { (*block)[i++]->~T(); } } while (block != this->tailBlock); } // Destroy all blocks that we own if (this->tailBlock != nullptr) { auto block = this->tailBlock; do { auto nextBlock = block->next; if (block->dynamicallyAllocated) { destroy(block); } else { this->parent->add_block_to_free_list(block); } block = nextBlock; } while (block != this->tailBlock); } // Destroy the block indices auto header = static_cast(pr_blockIndexRaw); while (header != nullptr) { auto prev = static_cast(header->prev); header->~BlockIndexHeader(); (Traits::free)(header); header = prev; } } inline void enqueue_begin_alloc(index_t currentTailIndex) { // We reached the end of a block, start a new one if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::is_empty()) { // We can re-use the block ahead of us, it's empty! this->tailBlock = this->tailBlock->next; this->tailBlock->ConcurrentQueue::Block::reset_empty(); // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the // last block from it first -- except instead of removing then adding, we can just overwrite). // Note that there must be a valid block index here, since even if allocation failed in the ctor, // it would have been re-attempted when adding the first block to the queue; since there is such // a block, a block index must have been successfully allocated. } else { // We're going to need a new block; check that the block index has room if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) { // Hmm, the circular block index is already full -- we'll need // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if // the initial allocation failed in the constructor. 
new_block_index(pr_blockIndexSlotsUsed); } // Insert a new block in the circular linked list auto newBlock = this->parent->ConcurrentQueue::requisition_block(); newBlock->ConcurrentQueue::Block::reset_empty(); if (this->tailBlock == nullptr) { newBlock->next = newBlock; } else { newBlock->next = this->tailBlock->next; this->tailBlock->next = newBlock; } this->tailBlock = newBlock; ++pr_blockIndexSlotsUsed; } // Add block to block index auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront]; entry.base = currentTailIndex; entry.block = this->tailBlock; blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release); pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1); } tracy_force_inline T* enqueue_begin(index_t& currentTailIndex) { currentTailIndex = this->tailIndex.load(std::memory_order_relaxed); if (details::cqUnlikely((currentTailIndex & static_cast(BLOCK_SIZE - 1)) == 0)) { this->enqueue_begin_alloc(currentTailIndex); } return (*this->tailBlock)[currentTailIndex]; } tracy_force_inline std::atomic& get_tail_index() { return this->tailIndex; } template size_t dequeue_bulk(NotifyThread notifyThread, ProcessData processData) { auto tail = this->tailIndex.load(std::memory_order_relaxed); auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed); auto desiredCount = static_cast(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit)); if (details::circular_less_than(0, desiredCount)) { desiredCount = desiredCount < 8192 ? desiredCount : 8192; std::atomic_thread_fence(std::memory_order_acquire); auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed); assert(overcommit <= myDequeueCount); tail = this->tailIndex.load(std::memory_order_acquire); auto actualCount = static_cast(tail - (myDequeueCount - overcommit)); if (details::circular_less_than(0, actualCount)) { actualCount = desiredCount < actualCount ? desiredCount : actualCount; if (actualCount < desiredCount) { this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release); } // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this // will never exceed tail. auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel); // Determine which block the first element is in auto localBlockIndex = blockIndex.load(std::memory_order_acquire); auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire); auto headBase = localBlockIndex->entries[localBlockIndexHead].base; auto firstBlockBaseIndex = firstIndex & ~static_cast(BLOCK_SIZE - 1); auto offset = static_cast(static_cast::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE); auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1); notifyThread( this->threadId ); // Iterate the blocks and dequeue auto index = firstIndex; do { auto firstIndexInBlock = index; auto endIndex = (index & ~static_cast(BLOCK_SIZE - 1)) + static_cast(BLOCK_SIZE); endIndex = details::circular_less_than(firstIndex + static_cast(actualCount), endIndex) ? 
firstIndex + static_cast(actualCount) : endIndex; auto block = localBlockIndex->entries[indexIndex].block; const auto sz = endIndex - index; processData( (*block)[index], sz ); index += sz; block->ConcurrentQueue::Block::set_many_empty(firstIndexInBlock, static_cast(endIndex - firstIndexInBlock)); indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1); } while (index != firstIndex + actualCount); return actualCount; } else { // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release); } } return 0; } private: struct BlockIndexEntry { index_t base; Block* block; }; struct BlockIndexHeader { size_t size; std::atomic front; // Current slot (not next, like pr_blockIndexFront) BlockIndexEntry* entries; void* prev; }; bool new_block_index(size_t numberOfFilledSlotsToExpose) { auto prevBlockSizeMask = pr_blockIndexSize - 1; // Create the new block pr_blockIndexSize <<= 1; auto newRawPtr = static_cast((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize)); if (newRawPtr == nullptr) { pr_blockIndexSize >>= 1; // Reset to allow graceful retry return false; } auto newBlockIndexEntries = reinterpret_cast(details::align_for(newRawPtr + sizeof(BlockIndexHeader))); // Copy in all the old indices, if any size_t j = 0; if (pr_blockIndexSlotsUsed != 0) { auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask; do { newBlockIndexEntries[j++] = pr_blockIndexEntries[i]; i = (i + 1) & prevBlockSizeMask; } while (i != pr_blockIndexFront); } // Update everything auto header = new (newRawPtr) BlockIndexHeader; header->size = pr_blockIndexSize; header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed); header->entries = newBlockIndexEntries; header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later pr_blockIndexFront = j; pr_blockIndexEntries = newBlockIndexEntries; pr_blockIndexRaw = newRawPtr; blockIndex.store(header, std::memory_order_release); return true; } private: std::atomic blockIndex; // To be used by producer only -- consumer must use the ones in referenced by blockIndex size_t pr_blockIndexSlotsUsed; size_t pr_blockIndexSize; size_t pr_blockIndexFront; // Next slot (not current) BlockIndexEntry* pr_blockIndexEntries; void* pr_blockIndexRaw; }; ExplicitProducer* get_explicit_producer(producer_token_t const& token) { return static_cast(token.producer); } private: ////////////////////////////////// // Block pool manipulation ////////////////////////////////// void populate_initial_block_list(size_t blockCount) { initialBlockPoolSize = blockCount; if (initialBlockPoolSize == 0) { initialBlockPool = nullptr; return; } initialBlockPool = create_array(blockCount); if (initialBlockPool == nullptr) { initialBlockPoolSize = 0; } for (size_t i = 0; i < initialBlockPoolSize; ++i) { initialBlockPool[i].dynamicallyAllocated = false; } } inline Block* try_get_block_from_initial_pool() { if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) { return nullptr; } auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed); return index < initialBlockPoolSize ? 
(initialBlockPool + index) : nullptr; } inline void add_block_to_free_list(Block* block) { freeList.add(block); } inline void add_blocks_to_free_list(Block* block) { while (block != nullptr) { auto next = block->next; add_block_to_free_list(block); block = next; } } inline Block* try_get_block_from_free_list() { return freeList.try_get(); } // Gets a free block from one of the memory pools, or allocates a new one (if applicable) Block* requisition_block() { auto block = try_get_block_from_initial_pool(); if (block != nullptr) { return block; } block = try_get_block_from_free_list(); if (block != nullptr) { return block; } return create(); } ////////////////////////////////// // Producer list manipulation ////////////////////////////////// ProducerBase* recycle_or_create_producer() { bool recycled; return recycle_or_create_producer(recycled); } ProducerBase* recycle_or_create_producer(bool& recycled) { // Try to re-use one first for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) { if (ptr->inactive.load(std::memory_order_relaxed)) { if( ptr->size_approx() == 0 ) { bool expected = true; if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) { // We caught one! It's been marked as activated, the caller can have it recycled = true; return ptr; } } } } recycled = false; return add_producer(static_cast(create(this))); } ProducerBase* add_producer(ProducerBase* producer) { // Handle failed memory allocation if (producer == nullptr) { return nullptr; } producerCount.fetch_add(1, std::memory_order_relaxed); // Add it to the lock-free list auto prevTail = producerListTail.load(std::memory_order_relaxed); do { producer->next = prevTail; } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed)); return producer; } void reown_producers() { // After another instance is moved-into/swapped-with this one, all the // producers we stole still think their parents are the other queue. // So fix them up! 
for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) { ptr->parent = this; } } ////////////////////////////////// // Utility functions ////////////////////////////////// template static inline U* create_array(size_t count) { assert(count > 0); return static_cast((Traits::malloc)(sizeof(U) * count)); } template static inline void destroy_array(U* p, size_t count) { ((void)count); if (p != nullptr) { assert(count > 0); (Traits::free)(p); } } template static inline U* create() { auto p = (Traits::malloc)(sizeof(U)); return new (p) U; } template static inline U* create(A1&& a1) { auto p = (Traits::malloc)(sizeof(U)); return new (p) U(std::forward(a1)); } template static inline void destroy(U* p) { if (p != nullptr) { p->~U(); } (Traits::free)(p); } private: std::atomic producerListTail; std::atomic producerCount; std::atomic initialBlockPoolIndex; Block* initialBlockPool; size_t initialBlockPoolSize; FreeList freeList; std::atomic nextExplicitConsumerId; std::atomic globalExplicitConsumerOffset; }; template ProducerToken::ProducerToken(ConcurrentQueue& queue) : producer(queue.recycle_or_create_producer()) { if (producer != nullptr) { producer->token = this; producer->threadId = detail::GetThreadHandleImpl(); } } template ConsumerToken::ConsumerToken(ConcurrentQueue& queue) : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) { initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release); lastKnownGlobalOffset = static_cast(-1); } template inline void swap(ConcurrentQueue& a, ConcurrentQueue& b) noexcept { a.swap(b); } inline void swap(ProducerToken& a, ProducerToken& b) noexcept { a.swap(b); } inline void swap(ConsumerToken& a, ConsumerToken& b) noexcept { a.swap(b); } } } /* namespace tracy */ #if defined(__GNUC__) #pragma GCC diagnostic pop #endif
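
// Hedged usage sketch (illustrative only; not part of this header -- the element type `MyItem`
// and both lambda bodies are placeholders). The consumer-side bulk API hands each contiguous run
// of dequeued items to `processData` and reports the owning producer's thread id through
// `notifyThread`; the producer side reserves a slot with enqueue_begin(), constructs the element
// in place, and then publishes it by storing the advanced tail index. This mirrors how the Tracy
// client appears to drive the queue, so treat it as a sketch, not a contract:
//
//     using Queue = tracy::moodycamel::ConcurrentQueue<MyItem, tracy::moodycamel::ConcurrentQueueDefaultTraits>;
//     Queue q;
//
//     // Producer side:
//     tracy::moodycamel::ProducerToken ptok(q);
//     Queue::index_t idx;
//     MyItem* slot = q.enqueue_begin(ptok, idx);     // reserve the next slot
//     new (slot) MyItem{};                           // construct the element in place
//     q.get_explicit_producer(ptok)->get_tail_index().store(idx + 1, std::memory_order_release);
//
//     // Consumer side:
//     tracy::moodycamel::ConsumerToken ctok(q);
//     size_t n = q.try_dequeue_bulk_single(ctok,
//         [](uint32_t threadId) { /* the batch that follows was produced by this thread */ },
//         [](MyItem* items, size_t count) { /* consume `count` contiguous items starting at `items` */ });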