libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
mpmc_queue_base.cpp
1#include "mpmc_queue_base.hpp"
2
3#include <algorithm>
4#include <new>
5
6#include "libxr_mem.hpp"
7
8namespace LibXR
9{
15MPMCQueueBase::MPMCQueueBase(size_t element_size, size_t capacity)
16 : element_size_(element_size),
17 capacity_(capacity),
18 payload_stride_(AlignUpChecked(element_size_, alignof(size_t))),
19 sequences_(nullptr),
20 payloads_(nullptr),
21 head_(0),
22 tail_(0)
23{
24 REQUIRE(element_size_ > 0);
25 REQUIRE(capacity_ > 1);
26 REQUIRE(capacity_ <= static_cast<size_t>(std::numeric_limits<SequenceDiffType>::max()));
27
28 const size_t payload_bytes = MultiplyChecked(payload_stride_, capacity_);
29 sequences_ = new (std::align_val_t(alignof(SequenceCell))) SequenceCell[capacity_];
30
31 payloads_ = static_cast<std::byte*>(
32 ::operator new[](payload_bytes, std::align_val_t(PAYLOAD_ALLOC_ALIGN)));
33
34 for (size_t index = 0; index < capacity_; ++index)
35 {
36 sequences_[index].value.store(static_cast<SequenceType>(index),
37 std::memory_order_relaxed);
38 }
39}
40
45{
46 ::operator delete[](payloads_, std::align_val_t(PAYLOAD_ALLOC_ALIGN));
47 delete[] sequences_;
48}
49
55{
56 if (value == nullptr)
57 {
59 }
60
61 SequenceType position = tail_.load(std::memory_order_relaxed);
62
63 while (true)
64 {
65 SequenceCell& slot = sequences_[position % capacity_];
66 const SequenceType sequence = slot.value.load(std::memory_order_acquire);
67 const SequenceDiffType diff = static_cast<SequenceDiffType>(sequence - position);
68
69 if (diff == 0)
70 {
71 if (tail_.compare_exchange_weak(position, position + 1, std::memory_order_relaxed,
72 std::memory_order_relaxed))
73 {
75 slot.value.store(position + 1, std::memory_order_release);
76 return ErrorCode::OK;
77 }
78 continue;
79 }
80
81 if (diff < 0)
82 {
83 return ErrorCode::FULL;
84 }
85
86 position = tail_.load(std::memory_order_relaxed);
87 }
88}
89
97{
98 SequenceType position = head_.load(std::memory_order_relaxed);
99
100 while (true)
101 {
102 SequenceCell& slot = sequences_[position % capacity_];
103 const SequenceType sequence = slot.value.load(std::memory_order_acquire);
104 const SequenceType expected_ready = position + 1;
105 const SequenceDiffType diff =
106 static_cast<SequenceDiffType>(sequence - expected_ready);
107
108 if (diff == 0)
109 {
110 if (head_.compare_exchange_weak(position, position + 1, std::memory_order_relaxed,
111 std::memory_order_relaxed))
112 {
113 if (value != nullptr)
114 {
116 }
117 slot.value.store(position + static_cast<SequenceType>(capacity_),
118 std::memory_order_release);
119 return ErrorCode::OK;
120 }
121 continue;
122 }
123
124 if (diff < 0)
125 {
126 return ErrorCode::EMPTY;
127 }
128
129 position = head_.load(std::memory_order_relaxed);
130 }
131}
132
140{
141 const SequenceType head_snapshot = head_.load(std::memory_order_acquire);
142 const SequenceType tail_snapshot = tail_.load(std::memory_order_acquire);
143 const SequenceType used = tail_snapshot - head_snapshot;
144 return (used <= capacity_) ? used : capacity_;
145}
146
151void* MPMCQueueBase::PayloadPtr(size_t index)
152{
153 return payloads_ + index * payload_stride_;
154}
155
161const void* MPMCQueueBase::PayloadPtr(size_t index) const
162{
163 return payloads_ + index * payload_stride_;
164}
165
171size_t MPMCQueueBase::AlignUpChecked(size_t value, size_t align)
172{
173 REQUIRE(align > 0);
174 REQUIRE(value <= std::numeric_limits<size_t>::max() - (align - 1));
175 return ((value + align - 1) / align) * align;
176}
177
184size_t MPMCQueueBase::MultiplyChecked(size_t lhs, size_t rhs)
185{
186 if (lhs == 0 || rhs == 0)
187 {
188 return 0;
189 }
190
191 REQUIRE(lhs <= std::numeric_limits<size_t>::max() / rhs);
192 return lhs * rhs;
193}
194
195} // namespace LibXR
size_t SequenceType
单调递增的逻辑序号类型 / Monotonic logical sequence type.
MPMCQueueBase(size_t element_size, size_t capacity)
构造一个字节队列内核 / Construct one byte-queue core
static size_t MultiplyChecked(size_t lhs, size_t rhs)
安全地计算乘积。 Safely multiply two size values.
std::make_signed_t< SequenceType > SequenceDiffType
序号差值判定类型 / Signed type used for sequence-delta checks.
static size_t AlignUpChecked(size_t value, size_t align)
安全地向上对齐字节数。 Safely align one byte count upward.
size_t Size() const
获取并发快照下的当前元素数 / Get the current approximate element count
std::byte * payloads_
payload 字节缓冲区。 Byte buffer storing payloads.
const size_t payload_stride_
相邻 payload 槽位之间的步长。 Byte stride between adjacent payload slots.
std::atomic< SequenceType > head_
下一个待出队的逻辑位置。 Next logical dequeue position.
const size_t capacity_
队列容量。 Queue capacity.
ErrorCode PopBytes(void *value=nullptr)
按字节出队一个 payload;传空指针时只丢弃队头元素 / Dequeue one payload by bytes; pass null to discard the front item only
static constexpr size_t PAYLOAD_ALLOC_ALIGN
payload 缓冲区整体分配对齐 / Allocation alignment used for the whole payload buffer
void * PayloadPtr(size_t index)
获取指定槽位 payload 起始地址。 Get the payload base address of one slot.
~MPMCQueueBase()
析构字节队列内核 / Destroy the byte-queue core
std::atomic< SequenceType > tail_
下一个待入队的逻辑位置。 Next logical enqueue position.
SequenceCell * sequences_
槽序号数组。 Array of per-slot sequence cells.
ErrorCode PushBytes(const void *value)
按字节入队一个 payload / Enqueue one payload by bytes
const size_t element_size_
单个 payload 的字节数。 Byte size of one payload.
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
LibXR 命名空间
Definition ch32_can.hpp:14
ErrorCode
定义错误码枚举
@ PTR_NULL
空指针 | Null pointer
@ EMPTY
为空 | Empty
@ FULL
已满 | Full
@ OK
操作成功 | Operation successful
每个逻辑槽对应的序号单元。 Sequence cell for one logical slot.
std::atomic< SequenceType > value
当前槽的逻辑序号。 Current logical sequence of the slot.