Separate enqueue allocation functionality.

This commit is contained in:
Bartosz Taudul 2017-10-03 14:13:46 +02:00
parent 16a49356a0
commit 439a23049d

View File

@ -1900,77 +1900,84 @@ private:
return true; return true;
} }
template<AllocationMode allocMode>
inline void enqueue_begin_alloc()
{
// We reached the end of a block, start a new one
if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
// We can re-use the block ahead of us, it's empty!
this->tailBlock = this->tailBlock->next;
this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
// We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
// last block from it first -- except instead of removing then adding, we can just overwrite).
// Note that there must be a valid block index here, since even if allocation failed in the ctor,
// it would have been re-attempted when adding the first block to the queue; since there is such
// a block, a block index must have been successfully allocated.
}
else {
// Whatever head value we see here is >= the last value we saw here (relatively),
// and <= its current value. Since we have the most recent tail, the head must be
// <= to it.
auto head = this->headIndex.load(std::memory_order_relaxed);
assert(!details::circular_less_than<index_t>(pr_currentTailIndex, head));
if (!details::circular_less_than<index_t>(head, pr_currentTailIndex + BLOCK_SIZE)
|| (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < pr_currentTailIndex - head))) {
// We can't enqueue in another block because there's not enough leeway -- the
// tail could surpass the head by the time the block fills up! (Or we'll exceed
// the size limit, if the second part of the condition was true.)
return;
}
// We're going to need a new block; check that the block index has room
if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
// Hmm, the circular block index is already full -- we'll need
// to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
// the initial allocation failed in the constructor.
if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
return;
}
}
// Insert a new block in the circular linked list
auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
if (newBlock == nullptr) {
return;
}
#if MCDBGQ_TRACKMEM
newBlock->owner = this;
#endif
newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
if (this->tailBlock == nullptr) {
newBlock->next = newBlock;
}
else {
newBlock->next = this->tailBlock->next;
this->tailBlock->next = newBlock;
}
this->tailBlock = newBlock;
++pr_blockIndexSlotsUsed;
}
// Add block to block index
auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
entry.base = pr_currentTailIndex;
entry.block = this->tailBlock;
blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
}
template<AllocationMode allocMode> template<AllocationMode allocMode>
inline T* enqueue_begin() inline T* enqueue_begin()
{ {
pr_currentTailIndex = this->tailIndex.load(std::memory_order_relaxed); pr_currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
if ((pr_currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) { if ((pr_currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
// We reached the end of a block, start a new one return (*this->tailBlock)[pr_currentTailIndex];
if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) { }
// We can re-use the block ahead of us, it's empty! else {
this->tailBlock = this->tailBlock->next; this->enqueue_begin_alloc<allocMode>();
this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>(); return (*this->tailBlock)[pr_currentTailIndex];
// We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
// last block from it first -- except instead of removing then adding, we can just overwrite).
// Note that there must be a valid block index here, since even if allocation failed in the ctor,
// it would have been re-attempted when adding the first block to the queue; since there is such
// a block, a block index must have been successfully allocated.
}
else {
// Whatever head value we see here is >= the last value we saw here (relatively),
// and <= its current value. Since we have the most recent tail, the head must be
// <= to it.
auto head = this->headIndex.load(std::memory_order_relaxed);
assert(!details::circular_less_than<index_t>(pr_currentTailIndex, head));
if (!details::circular_less_than<index_t>(head, pr_currentTailIndex + BLOCK_SIZE)
|| (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < pr_currentTailIndex - head))) {
// We can't enqueue in another block because there's not enough leeway -- the
// tail could surpass the head by the time the block fills up! (Or we'll exceed
// the size limit, if the second part of the condition was true.)
return nullptr;
}
// We're going to need a new block; check that the block index has room
if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
// Hmm, the circular block index is already full -- we'll need
// to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
// the initial allocation failed in the constructor.
if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
return nullptr;
}
}
// Insert a new block in the circular linked list
auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
if (newBlock == nullptr) {
return nullptr;
}
#if MCDBGQ_TRACKMEM
newBlock->owner = this;
#endif
newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
if (this->tailBlock == nullptr) {
newBlock->next = newBlock;
}
else {
newBlock->next = this->tailBlock->next;
this->tailBlock->next = newBlock;
}
this->tailBlock = newBlock;
++pr_blockIndexSlotsUsed;
}
// Add block to block index
auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
entry.base = pr_currentTailIndex;
entry.block = this->tailBlock;
blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
} }
// Enqueue
return (*this->tailBlock)[pr_currentTailIndex];
} }
inline void enqueue_finish() inline void enqueue_finish()