libxr  1.0
Want to be the best embedded framework
LibXR::MPMCQueueBase Class Reference

Bounded MPMC byte-queue core.

#include <mpmc_queue_base.hpp>


Data Structures

struct  SequenceCell
 Sequence cell for one logical slot.
 

Public Types

using SequenceType = size_t
 Monotonically increasing logical sequence type.
 
using SequenceDiffType = std::make_signed_t<SequenceType>
 Signed type used for sequence-delta checks.
 

Public Member Functions

 MPMCQueueBase (size_t element_size, size_t capacity)
 Construct a byte-queue core.
 
 ~MPMCQueueBase ()
 Destroy the byte-queue core.
 
ErrorCode PushBytes (const void *value)
 Enqueue one payload by bytes.
 
ErrorCode PopBytes (void *value=nullptr)
 Dequeue one payload by bytes; pass a null pointer to discard the front item only.
 
size_t MaxSize () const
 Get the maximum queue capacity.
 
size_t Size () const
 Get the current element count as an approximate concurrent snapshot.
 
size_t EmptySize () const
 Get the current free-slot count.
 
size_t ElementSize () const
 Get the byte size of one payload.
 

Private Member Functions

void * PayloadPtr (size_t index)
 Get the payload base address of one slot.
 
const void * PayloadPtr (size_t index) const
 Get the payload base address of one slot (const).
 
 MPMCQueueBase (const MPMCQueueBase &)
 Non-copyable.
 
MPMCQueueBase & operator= (const MPMCQueueBase &)
 Non-copy-assignable.
 
 MPMCQueueBase (MPMCQueueBase &&)
 Non-movable.
 
MPMCQueueBase & operator= (MPMCQueueBase &&)
 Non-move-assignable.
 

Static Private Member Functions

static size_t AlignUpChecked (size_t value, size_t align)
 Safely align a byte count upward.
 
static size_t MultiplyChecked (size_t lhs, size_t rhs)
 Safely multiply two size values.
 

Private Attributes

const size_t element_size_
 Byte size of one payload.
 
const size_t capacity_
 Queue capacity.
 
const size_t payload_stride_
 Byte stride between adjacent payload slots.
 
SequenceCell * sequences_
 Array of per-slot sequence cells.
 
std::byte * payloads_
 Byte buffer storing payloads.
 
std::atomic< SequenceType > head_
 Next logical dequeue position.
 
std::atomic< SequenceType > tail_
 Next logical enqueue position.
 

Static Private Attributes

static constexpr size_t PAYLOAD_ALLOC_ALIGN
 Allocation alignment used for the whole payload buffer.
 

Detailed Description

Bounded MPMC byte-queue core.

This core keeps the concurrency protocol and the byte-copying logic in a single non-template implementation, so that different payload types do not each instantiate a full copy of the lock-free protocol and inflate flash usage. It only moves fixed-size byte payloads aligned to the default word width; type semantics are handled by thin wrappers above it.
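
The layering described above can be sketched as follows. This is a minimal illustration, not the LibXR wrapper: ByteCoreStub and TypedQueueSketch are hypothetical names, the stub is single-threaded and only mimics the PushBytes/PopBytes shape (returning bool instead of ErrorCode) so the example stays self-contained, and the trivially-copyable restriction is an assumption that follows from moving payloads as raw bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical stand-in for the byte core: single-threaded and NOT lock-free.
// It exists only so the wrapper below has something to delegate to.
class ByteCoreStub
{
 public:
  ByteCoreStub(std::size_t element_size, std::size_t capacity)
      : element_size_(element_size), capacity_(capacity), bytes_(element_size * capacity)
  {
    assert(element_size > 0 && capacity > 0);
  }

  bool PushBytes(const void* value)
  {
    if (count_ == capacity_) return false;  // full
    const std::size_t slot = (head_ + count_) % capacity_;
    std::memcpy(&bytes_[slot * element_size_], value, element_size_);
    ++count_;
    return true;
  }

  bool PopBytes(void* out)
  {
    if (count_ == 0) return false;  // empty
    std::memcpy(out, &bytes_[head_ * element_size_], element_size_);
    head_ = (head_ + 1) % capacity_;
    --count_;
    return true;
  }

 private:
  std::size_t element_size_;
  std::size_t capacity_;
  std::vector<unsigned char> bytes_;
  std::size_t head_ = 0;
  std::size_t count_ = 0;
};

// Thin typed wrapper: the only template code is a pair of one-line forwards,
// so each payload type adds very little code on top of the shared core.
template <typename T>
class TypedQueueSketch
{
  static_assert(std::is_trivially_copyable_v<T>, "payload is moved as raw bytes");

 public:
  explicit TypedQueueSketch(std::size_t capacity) : core_(sizeof(T), capacity) {}
  bool Push(const T& value) { return core_.PushBytes(&value); }
  bool Pop(T& out) { return core_.PopBytes(&out); }

 private:
  ByteCoreStub core_;
};

// Example payload type used for illustration only.
struct Sample
{
  int id;
  float reading;
};
```

With this split, instantiating TypedQueueSketch for a second payload type duplicates only the two forwarding functions, which is the flash-size argument the description makes.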

Definition at line 28 of file mpmc_queue_base.hpp.

Member Typedef Documentation

◆ SequenceDiffType

Initial value:
std::make_signed_t<SequenceType>

Signed type used for sequence-delta checks.

Definition at line 32 of file mpmc_queue_base.hpp.
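
As a small illustration of why a signed delta type is useful, the sketch below casts the unsigned, wrapping difference of two sequence numbers to the signed counterpart, so "behind", "equal" and "ahead" can be told apart even across counter wraparound. SequenceDelta is a hypothetical helper name, and the sketch assumes two's-complement conversion (guaranteed since C++20, and the behavior of mainstream compilers before that).

```cpp
#include <cassert>
#include <cstddef>
#include <limits>
#include <type_traits>

using SequenceType = std::size_t;
using SequenceDiffType = std::make_signed_t<SequenceType>;

// Hypothetical helper: signed distance from `expected` to `sequence`.
// The unsigned subtraction wraps modulo 2^N; the cast reinterprets the
// result as a signed distance, which is meaningful while the true
// distance fits in SequenceDiffType.
inline SequenceDiffType SequenceDelta(SequenceType sequence, SequenceType expected)
{
  return static_cast<SequenceDiffType>(sequence - expected);
}
```

For example, SequenceDelta(4, 5) is -1, and SequenceDelta(1, max) is 2 when max is the largest SequenceType value, because the counter has wrapped past zero.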

◆ SequenceType

Monotonically increasing logical sequence type.

Definition at line 31 of file mpmc_queue_base.hpp.

Constructor & Destructor Documentation

◆ MPMCQueueBase()

LibXR::MPMCQueueBase::MPMCQueueBase ( size_t element_size,
size_t capacity )

Construct the byte-queue core.

Parameters
element_size  Byte size of one payload; must be greater than 0
capacity      Queue capacity; must be greater than 1 and no larger than the maximum value of SequenceDiffType

Definition at line 15 of file mpmc_queue_base.cpp.

    : element_size_(element_size),
      capacity_(capacity),
      // The payload_stride_ initializer (source line 18) is missing from this listing.
      sequences_(nullptr),
      payloads_(nullptr),
      head_(0),
      tail_(0)
{
  REQUIRE(element_size_ > 0);
  REQUIRE(capacity_ > 1);
  REQUIRE(capacity_ <= static_cast<size_t>(std::numeric_limits<SequenceDiffType>::max()));

  const size_t payload_bytes = MultiplyChecked(payload_stride_, capacity_);
  sequences_ = new (std::align_val_t(alignof(SequenceCell))) SequenceCell[capacity_];

  payloads_ = static_cast<std::byte*>(
      ::operator new[](payload_bytes, std::align_val_t(PAYLOAD_ALLOC_ALIGN)));

  // Seed each slot with its own index so the first lap of producers sees it as free.
  for (size_t index = 0; index < capacity_; ++index)
  {
    sequences_[index].value.store(static_cast<SequenceType>(index),
                                  std::memory_order_relaxed);
  }
}
SequenceCell::value (std::atomic< SequenceType >): current logical sequence of the slot.

◆ ~MPMCQueueBase()

LibXR::MPMCQueueBase::~MPMCQueueBase ( )

Destroy the byte-queue core.

Definition at line 44 of file mpmc_queue_base.cpp.

{
  ::operator delete[](payloads_, std::align_val_t(PAYLOAD_ALLOC_ALIGN));
  delete[] sequences_;
}

Member Function Documentation

◆ AlignUpChecked()

size_t LibXR::MPMCQueueBase::AlignUpChecked ( size_t value,
size_t align )
static, nodiscard, private

Safely align a byte count upward to the target granularity.

Parameters
value  Byte count to align
align  Target alignment granularity; must be greater than 0

Definition at line 171 of file mpmc_queue_base.cpp.

{
  REQUIRE(align > 0);
  // Guard against wraparound in value + align - 1.
  REQUIRE(value <= std::numeric_limits<size_t>::max() - (align - 1));
  return ((value + align - 1) / align) * align;
}
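
The arithmetic above can be tried in isolation with the sketch below. AlignUpSketch is a hypothetical free-function copy of the private helper, and assert() stands in for the library's REQUIRE macro, whose exact behavior is not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>

// Hypothetical stand-alone version of the helper; assert() replaces REQUIRE.
inline std::size_t AlignUpSketch(std::size_t value, std::size_t align)
{
  assert(align > 0);
  // value + (align - 1) must not wrap around the size_t range.
  assert(value <= std::numeric_limits<std::size_t>::max() - (align - 1));
  // Round up to the next multiple of align (align need not be a power of two).
  return ((value + align - 1) / align) * align;
}
```

For instance, a 13-byte payload aligned to 8 yields a 16-byte result, while a 16-byte payload is left unchanged. Because the formula uses division rather than bit masking, it also works for non-power-of-two granularities such as 3.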

◆ ElementSize()

size_t LibXR::MPMCQueueBase::ElementSize ( ) const
inline, nodiscard

Get the byte size of one payload.

Returns
Byte size of one payload

Definition at line 94 of file mpmc_queue_base.hpp.

{ return element_size_; }

◆ EmptySize()

size_t LibXR::MPMCQueueBase::EmptySize ( ) const
inline, nodiscard

Get the current free-slot count. Because it is derived from Size(), it is an approximate snapshot as well.

Returns
Current number of free slots

Definition at line 89 of file mpmc_queue_base.hpp.

{ return capacity_ - Size(); }

◆ MaxSize()

size_t LibXR::MPMCQueueBase::MaxSize ( ) const
inline, nodiscard

Get the maximum queue capacity.

Returns
Queue capacity

Definition at line 70 of file mpmc_queue_base.hpp.

{ return capacity_; }

◆ MultiplyChecked()

size_t LibXR::MPMCQueueBase::MultiplyChecked ( size_t lhs,
size_t rhs )
static, nodiscard, private

Safely multiply two byte counts.

Parameters
lhs  Left operand
rhs  Right operand
Returns
The product

Definition at line 184 of file mpmc_queue_base.cpp.

{
  if (lhs == 0 || rhs == 0)
  {
    return 0;
  }

  // lhs * rhs fits in size_t exactly when lhs <= max / rhs.
  REQUIRE(lhs <= std::numeric_limits<size_t>::max() / rhs);
  return lhs * rhs;
}
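
The overflow test can be separated from the multiplication to make it easier to see what the REQUIRE guards against. In the sketch below, ProductFits and MultiplyCheckedSketch are hypothetical names, and assert() again stands in for REQUIRE.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>

// Hypothetical predicate: true when lhs * rhs is representable in size_t.
inline bool ProductFits(std::size_t lhs, std::size_t rhs)
{
  if (lhs == 0 || rhs == 0)
  {
    return true;  // the product is 0
  }
  // Dividing the maximum by rhs avoids performing the overflowing multiply.
  return lhs <= std::numeric_limits<std::size_t>::max() / rhs;
}

// Hypothetical stand-alone version of the helper; assert() replaces REQUIRE.
inline std::size_t MultiplyCheckedSketch(std::size_t lhs, std::size_t rhs)
{
  assert(ProductFits(lhs, rhs));
  return lhs * rhs;
}
```

The check is done by division because unsigned multiplication wraps silently in C++, so the overflow cannot be detected from the product itself.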

◆ PayloadPtr() [1/2]

void * LibXR::MPMCQueueBase::PayloadPtr ( size_t index)
nodiscard, private

Get the payload base address of one slot.

Parameters
index  Slot index

Definition at line 151 of file mpmc_queue_base.cpp.

{
  return payloads_ + index * payload_stride_;
}

◆ PayloadPtr() [2/2]

const void * LibXR::MPMCQueueBase::PayloadPtr ( size_t index) const
nodiscard, private

Get the payload base address of one slot (const).

Parameters
index  Slot index

Definition at line 161 of file mpmc_queue_base.cpp.

{
  return payloads_ + index * payload_stride_;
}

◆ PopBytes()

ErrorCode LibXR::MPMCQueueBase::PopBytes ( void * value = nullptr)

Dequeue one payload by bytes; pass a null pointer to discard the front item only.

Parameters
value  Buffer that receives the payload; pass nullptr to discard the front element only
Returns
ErrorCode::OK on success; ErrorCode::EMPTY when the queue is empty

Definition at line 96 of file mpmc_queue_base.cpp.

{
  SequenceType position = head_.load(std::memory_order_relaxed);

  while (true)
  {
    SequenceCell& slot = sequences_[position % capacity_];
    const SequenceType sequence = slot.value.load(std::memory_order_acquire);
    const SequenceType expected_ready = position + 1;
    const SequenceDiffType diff =
        static_cast<SequenceDiffType>(sequence - expected_ready);

    if (diff == 0)
    {
      // The slot holds a published payload for this position: try to claim it.
      if (head_.compare_exchange_weak(position, position + 1, std::memory_order_relaxed,
                                      std::memory_order_relaxed))
      {
        if (value != nullptr)
        {
          // Source line 115 is missing from this listing; it copies the slot's
          // payload bytes into value.
        }
        // Hand the slot back to producers for the next lap.
        slot.value.store(position + static_cast<SequenceType>(capacity_),
                         std::memory_order_release);
        return ErrorCode::OK;
      }
      continue;
    }

    if (diff < 0)
    {
      return ErrorCode::EMPTY;
    }

    position = head_.load(std::memory_order_relaxed);
  }
}
◆ PushBytes()

ErrorCode LibXR::MPMCQueueBase::PushBytes ( const void * value)

Enqueue one payload by bytes.

Parameters
value  Pointer to the payload to enqueue
Returns
ErrorCode::OK on success; ErrorCode::FULL when the queue is full; ErrorCode::PTR_NULL when value is null

Definition at line 54 of file mpmc_queue_base.cpp.

{
  if (value == nullptr)
  {
    return ErrorCode::PTR_NULL;
  }

  SequenceType position = tail_.load(std::memory_order_relaxed);

  while (true)
  {
    SequenceCell& slot = sequences_[position % capacity_];
    const SequenceType sequence = slot.value.load(std::memory_order_acquire);
    const SequenceDiffType diff = static_cast<SequenceDiffType>(sequence - position);

    if (diff == 0)
    {
      // The slot is free for this position: try to claim it.
      if (tail_.compare_exchange_weak(position, position + 1, std::memory_order_relaxed,
                                      std::memory_order_relaxed))
      {
        // Source line 74 is missing from this listing; it copies the payload
        // bytes from value into the slot.
        slot.value.store(position + 1, std::memory_order_release);
        return ErrorCode::OK;
      }
      continue;
    }

    if (diff < 0)
    {
      return ErrorCode::FULL;
    }

    position = tail_.load(std::memory_order_relaxed);
  }
}
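
For readers who want to experiment with the sequence-cell protocol used by PushBytes and PopBytes, the following is a self-contained sketch of the same idea. It is not the LibXR implementation: BoundedByteQueueSketch is a hypothetical class, it returns bool instead of ErrorCode, uses std::memcpy and std::vector in place of the library's copy routine and aligned allocation, and ignores payload stride and alignment.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical sketch of a bounded queue driven by per-slot sequence numbers.
class BoundedByteQueueSketch
{
  using Seq = std::size_t;
  using Diff = std::make_signed_t<Seq>;

 public:
  BoundedByteQueueSketch(std::size_t element_size, std::size_t capacity)
      : element_size_(element_size),
        capacity_(capacity),
        sequences_(capacity),
        payloads_(element_size * capacity)
  {
    assert(element_size > 0 && capacity > 1);
    // Slot i starts with sequence i, meaning "free for enqueue position i".
    for (std::size_t i = 0; i < capacity_; ++i)
    {
      sequences_[i].store(i, std::memory_order_relaxed);
    }
  }

  bool Push(const void* value)
  {
    Seq pos = tail_.load(std::memory_order_relaxed);
    while (true)
    {
      std::atomic<Seq>& seq = sequences_[pos % capacity_];
      const Diff diff = static_cast<Diff>(seq.load(std::memory_order_acquire) - pos);
      if (diff == 0)
      {
        if (tail_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
        {
          std::memcpy(Slot(pos), value, element_size_);
          seq.store(pos + 1, std::memory_order_release);  // publish to consumers
          return true;
        }
        continue;  // a failed CAS already refreshed pos
      }
      if (diff < 0) return false;  // slot not yet consumed: queue is full
      pos = tail_.load(std::memory_order_relaxed);
    }
  }

  bool Pop(void* out)
  {
    Seq pos = head_.load(std::memory_order_relaxed);
    while (true)
    {
      std::atomic<Seq>& seq = sequences_[pos % capacity_];
      const Diff diff = static_cast<Diff>(seq.load(std::memory_order_acquire) - (pos + 1));
      if (diff == 0)
      {
        if (head_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
        {
          std::memcpy(out, Slot(pos), element_size_);
          seq.store(pos + capacity_, std::memory_order_release);  // free for next lap
          return true;
        }
        continue;
      }
      if (diff < 0) return false;  // slot not yet published: queue is empty
      pos = head_.load(std::memory_order_relaxed);
    }
  }

 private:
  unsigned char* Slot(Seq pos) { return &payloads_[(pos % capacity_) * element_size_]; }

  std::size_t element_size_;
  std::size_t capacity_;
  std::vector<std::atomic<Seq>> sequences_;
  std::vector<unsigned char> payloads_;
  std::atomic<Seq> head_{0};
  std::atomic<Seq> tail_{0};
};
```

Each slot's sequence number acts as a ticket: a producer may write only when the sequence equals its enqueue position, and a consumer may read only when it equals its dequeue position plus one. The release store on the sequence pairs with the acquire load on the other side, which is what makes the copied bytes visible to the thread that later claims the slot.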

◆ Size()

size_t LibXR::MPMCQueueBase::Size ( ) const
nodiscard

Get the current element count as an approximate concurrent snapshot.

Note
This value is an approximate snapshot:
  • it may already be stale while producers and consumers are making progress;
  • it is computed from the modular SequenceType difference and then clamped to [0, MaxSize()], so it remains an approximation rather than an exact count, even after the sequence counters wrap around in a very long-lived queue.
Returns
Approximate element count from a concurrent snapshot, clamped to [0, MaxSize()]

Definition at line 139 of file mpmc_queue_base.cpp.

{
  const SequenceType head_snapshot = head_.load(std::memory_order_acquire);
  const SequenceType tail_snapshot = tail_.load(std::memory_order_acquire);
  // Unsigned subtraction wraps, so this is the modular distance from head to tail.
  const SequenceType used = tail_snapshot - head_snapshot;
  return (used <= capacity_) ? used : capacity_;
}
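
The modular difference and clamp can be checked on plain integers. ApproxUsed below is a hypothetical helper that takes the two snapshots as arguments instead of reading atomics.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>

// Hypothetical helper mirroring the arithmetic of Size() on plain values.
inline std::size_t ApproxUsed(std::size_t head, std::size_t tail, std::size_t capacity)
{
  assert(capacity > 0);
  // Unsigned subtraction wraps modulo 2^N, so this stays correct when tail
  // has wrapped past zero and head has not.
  const std::size_t used = tail - head;
  // Snapshots taken at different instants can disagree; clamp to capacity.
  return (used <= capacity) ? used : capacity;
}
```

With head = 3 and tail = 7 the result is 4. With head one below the maximum size_t value and tail = 2, the wrapped difference is still 4. If the two snapshots are inconsistent, for example head = 10 and tail = 8, the huge wrapped difference is clamped to the capacity.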

Field Documentation

◆ capacity_

const size_t LibXR::MPMCQueueBase::capacity_
private

Queue capacity.

Definition at line 127 of file mpmc_queue_base.hpp.

◆ element_size_

const size_t LibXR::MPMCQueueBase::element_size_
private

Byte size of one payload.

Definition at line 126 of file mpmc_queue_base.hpp.

◆ head_

std::atomic<SequenceType> LibXR::MPMCQueueBase::head_
private

Next logical dequeue position.

Definition at line 133 of file mpmc_queue_base.hpp.

◆ PAYLOAD_ALLOC_ALIGN

size_t LibXR::MPMCQueueBase::PAYLOAD_ALLOC_ALIGN
static, constexpr, private
Initial value:
= std::max(alignof(size_t), alignof(std::max_align_t))

Allocation alignment used for the whole payload buffer.

Definition at line 114 of file mpmc_queue_base.hpp.
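
As an illustration of how such an alignment constant is typically used, the sketch below pairs the aligned form of operator new[] with the matching aligned operator delete[], as the constructor and destructor of this class do for the payload buffer. AlignedByteBuffer, kPayloadAllocAlign and IsAligned are hypothetical names; C++17 is assumed.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <new>

// Same expression as the documented initial value, under a hypothetical name.
constexpr std::size_t kPayloadAllocAlign =
    std::max(alignof(std::size_t), alignof(std::max_align_t));

// Hypothetical RAII holder for one aligned byte buffer.
class AlignedByteBuffer
{
 public:
  explicit AlignedByteBuffer(std::size_t size)
      : data_(static_cast<std::byte*>(
            ::operator new[](size, std::align_val_t(kPayloadAllocAlign))))
  {
    assert(size > 0);
  }

  // Memory from the aligned operator new[] must be released with the
  // aligned operator delete[] using the same alignment value.
  ~AlignedByteBuffer() { ::operator delete[](data_, std::align_val_t(kPayloadAllocAlign)); }

  AlignedByteBuffer(const AlignedByteBuffer&) = delete;
  AlignedByteBuffer& operator=(const AlignedByteBuffer&) = delete;

  std::byte* data() const { return data_; }

 private:
  std::byte* data_;
};

// True when ptr is a multiple of align.
inline bool IsAligned(const void* ptr, std::size_t align)
{
  return reinterpret_cast<std::uintptr_t>(ptr) % align == 0;
}
```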

◆ payload_stride_

const size_t LibXR::MPMCQueueBase::payload_stride_
private

Byte stride between adjacent payload slots.

Definition at line 128 of file mpmc_queue_base.hpp.

◆ payloads_

std::byte* LibXR::MPMCQueueBase::payloads_
private

Byte buffer storing payloads.

Definition at line 130 of file mpmc_queue_base.hpp.

◆ sequences_

SequenceCell* LibXR::MPMCQueueBase::sequences_
private

Array of per-slot sequence cells.

Definition at line 129 of file mpmc_queue_base.hpp.

◆ tail_

std::atomic<SequenceType> LibXR::MPMCQueueBase::tail_
private

Next logical enqueue position.

Definition at line 135 of file mpmc_queue_base.hpp.


The documentation for this class was generated from the following files: