diff --git a/client/concurrentqueue.h b/client/concurrentqueue.h
index f587b45c..007326ac 100755
--- a/client/concurrentqueue.h
+++ b/client/concurrentqueue.h
@@ -942,7 +942,23 @@ public:
 	{
 		return inner_enqueue<CanAlloc>(token, std::move(item));
 	}
-	
+	
+	// Begins a zero-copy enqueue with an explicit producer token: returns a
+	// pointer to uninitialized storage for one T (nullptr if a needed block
+	// cannot be allocated). Construct the element in that storage, then call
+	// enqueue_finish() with the same token to publish it to consumers.
+	inline T* enqueue_begin(producer_token_t const& token)
+	{
+		return inner_enqueue_begin<CanAlloc>(token);
+	}
+	
+	// Completes an enqueue started with enqueue_begin(), making the
+	// constructed element visible to consumers.
+	inline void enqueue_finish(producer_token_t const& token)
+	{
+		inner_enqueue_finish(token);
+	}
+	
 	// Enqueues several items.
 	// Allocates memory if required. Only fails if memory allocation fails (or
 	// implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
@@ -1281,7 +1297,18 @@ private:
 	{
 		return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<allocMode>(std::forward<U>(element));
 	}
-	
+	
+	template<AllocationMode allocMode>
+	inline T* inner_enqueue_begin(producer_token_t const& token)
+	{
+		return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_begin<allocMode>();
+	}
+	
+	inline void inner_enqueue_finish(producer_token_t const& token)
+	{
+		return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::enqueue_finish();
+	}
+	
 	template<typename U>
 	inline bool inner_enqueue(U&& element)
 	{
@@ -1878,7 +1905,96 @@ private:
 			this->tailIndex.store(newTailIndex, std::memory_order_release);
 			return true;
 		}
-		
+		
+		// Reserves the next slot for in-place construction, reusing or
+		// allocating a block if this index crosses a block boundary. The new
+		// tail index is only recorded in pr_newTailIndex; enqueue_finish()
+		// publishes it, so the slot stays invisible to consumers until then.
+		template<AllocationMode allocMode>
+		inline T* enqueue_begin()
+		{
+			index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
+			pr_newTailIndex = 1 + currentTailIndex;
+			if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
+				// We reached the end of a block, start a new one
+				auto startBlock = this->tailBlock;
+				auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
+				if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
+					// We can re-use the block ahead of us, it's empty!
+					this->tailBlock = this->tailBlock->next;
+					this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
+					
+					// We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
+					// last block from it first -- except instead of removing then adding, we can just overwrite).
+					// Note that there must be a valid block index here, since even if allocation failed in the ctor,
+					// it would have been re-attempted when adding the first block to the queue; since there is such
+					// a block, a block index must have been successfully allocated.
+				}
+				else {
+					// Whatever head value we see here is >= the last value we saw here (relatively),
+					// and <= its current value. Since we have the most recent tail, the head must be
+					// <= to it.
+					auto head = this->headIndex.load(std::memory_order_relaxed);
+					assert(!details::circular_less_than<index_t>(currentTailIndex, head));
+					if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
+						|| (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
+						// We can't enqueue in another block because there's not enough leeway -- the
+						// tail could surpass the head by the time the block fills up! (Or we'll exceed
+						// the size limit, if the second part of the condition was true.)
+						return nullptr;
+					}
+					// We're going to need a new block; check that the block index has room
+					if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
+						// Hmm, the circular block index is already full -- we'll need
+						// to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
+						// the initial allocation failed in the constructor.
+						
+						if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
+							return nullptr;
+						}
+					}
+					
+					// Insert a new block in the circular linked list
+					auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
+					if (newBlock == nullptr) {
+						return nullptr;
+					}
+#if MCDBGQ_TRACKMEM
+					newBlock->owner = this;
+#endif
+					newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
+					if (this->tailBlock == nullptr) {
+						newBlock->next = newBlock;
+					}
+					else {
+						newBlock->next = this->tailBlock->next;
+						this->tailBlock->next = newBlock;
+					}
+					this->tailBlock = newBlock;
+					++pr_blockIndexSlotsUsed;
+				}
+				
+				(void)startBlock;
+				(void)originalBlockIndexSlotsUsed;
+				
+				// Add block to block index
+				auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
+				entry.base = currentTailIndex;
+				entry.block = this->tailBlock;
+				blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
+				pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
+			}
+			
+			// Enqueue
+			return (*this->tailBlock)[currentTailIndex];
+		}
+		
+		// Publishes the slot reserved by enqueue_begin(): the release store of
+		// the saved tail index makes the constructed element visible to consumers.
+		inline void enqueue_finish()
+		{
+			this->tailIndex.store(pr_newTailIndex, std::memory_order_release);
+		}
+		
 		template<typename U>
 		bool dequeue(U& element)
 		{
@@ -2316,6 +2432,7 @@ private:
 		std::atomic<BlockIndexHeader*> blockIndex;
 		
 		// To be used by producer only -- consumer must use the ones in referenced by blockIndex
+		index_t pr_newTailIndex;	// tail index to publish in enqueue_finish()
 		size_t pr_blockIndexSlotsUsed;
 		size_t pr_blockIndexSize;
 		size_t pr_blockIndexFront;		// Next slot (not current)