libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
spsc_queue_base.hpp
1#pragma once
2
3#include <algorithm>
4#include <atomic>
5#include <cstddef>
6#include <limits>
7#include <new>
8
9#include "libxr_def.hpp"
10#include "libxr_mem.hpp"
11
12namespace LibXR
13{
27class alignas(LibXR::CONCURRENCY_ALIGNMENT) SPSCQueueBase
28{
29 public:
30 using IndexType = size_t;
31
37 SPSCQueueBase(size_t element_size, size_t capacity)
38 : element_size_(element_size),
39 capacity_(capacity),
40 payload_alloc_align_(alignof(std::max_align_t)),
41 payload_stride_(ComputeStride(element_size, alignof(std::byte))),
42 payloads_(nullptr),
43 head_(0),
44 tail_(0)
45 {
46 InitStorage();
47 }
48
56 SPSCQueueBase(size_t element_size, size_t element_align, size_t capacity)
57 : element_size_(element_size),
58 capacity_(capacity),
59 payload_alloc_align_(alignof(std::max_align_t)),
60 payload_stride_(ComputeStride(element_size, element_align)),
61 payloads_(nullptr),
62 head_(0),
63 tail_(0)
64 {
65 InitStorage();
66 }
67
68 private:
69 void InitStorage()
70 {
71 ASSERT(element_size_ > 0);
72 ASSERT(payload_alloc_align_ > 0);
73 ASSERT(capacity_ > 0);
74 ASSERT(capacity_ <= std::numeric_limits<size_t>::max() - 1);
75 ASSERT((payload_alloc_align_ & (payload_alloc_align_ - 1)) == 0);
76
77 const size_t payload_bytes = MultiplyChecked(payload_stride_, RingCapacity());
78 payloads_ = static_cast<std::byte*>(::operator new[](
79 payload_bytes, std::align_val_t(payload_alloc_align_)));
80 }
81
82 public:
83
88 {
89 ::operator delete[](payloads_, std::align_val_t(payload_alloc_align_));
90 }
91
100 ErrorCode PushBytes(const void* value)
101 {
102 if (value == nullptr)
103 {
104 return ErrorCode::PTR_NULL;
105 }
106
107 const auto current_tail = tail_.load(std::memory_order_relaxed);
108 const auto next_tail = Increment(current_tail);
109
110 if (next_tail == head_.load(std::memory_order_acquire))
111 {
112 return ErrorCode::FULL;
113 }
114
115 LibXR::Memory::FastCopy(PayloadPtr(current_tail), value, element_size_);
116 tail_.store(next_tail, std::memory_order_release);
117 return ErrorCode::OK;
118 }
119
129 ErrorCode PopBytes(void* value = nullptr)
130 {
131 const auto current_head = head_.load(std::memory_order_relaxed);
132
133 if (current_head == tail_.load(std::memory_order_acquire))
134 {
135 return ErrorCode::EMPTY;
136 }
137
138 if (value != nullptr)
139 {
140 LibXR::Memory::FastCopy(value, PayloadPtr(current_head), element_size_);
141 }
142
143 head_.store(Increment(current_head), std::memory_order_release);
144 return ErrorCode::OK;
145 }
146
155 ErrorCode PeekBytes(void* value)
156 {
157 if (value == nullptr)
158 {
159 return ErrorCode::PTR_NULL;
160 }
161
162 const auto current_head = head_.load(std::memory_order_relaxed);
163 if (current_head == tail_.load(std::memory_order_acquire))
164 {
165 return ErrorCode::EMPTY;
166 }
167
168 LibXR::Memory::FastCopy(value, PayloadPtr(current_head), element_size_);
169 return ErrorCode::OK;
170 }
171
180 ErrorCode PushBatchBytes(const void* data, size_t count)
181 {
182 if (count == 0U)
183 {
184 return ErrorCode::OK;
185 }
186 if (data == nullptr)
187 {
188 return ErrorCode::PTR_NULL;
189 }
190
191 const auto current_tail = tail_.load(std::memory_order_relaxed);
192 const auto current_head = head_.load(std::memory_order_acquire);
193 const size_t capacity = RingCapacity();
194 const size_t free_space =
195 (current_tail >= current_head) ? (capacity - (current_tail - current_head) - 1)
196 : (current_head - current_tail - 1);
197
198 if (free_space < count)
199 {
200 return ErrorCode::FULL;
201 }
202
203 const auto* src = static_cast<const std::byte*>(data);
204 for (size_t index = 0; index < count; ++index)
205 {
206 LibXR::Memory::FastCopy(PayloadPtr((current_tail + index) % capacity),
207 src + index * element_size_, element_size_);
208 }
209
210 tail_.store((current_tail + count) % capacity, std::memory_order_release);
211 return ErrorCode::OK;
212 }
213
228 template <typename Writer>
229 ErrorCode PushBytesWithWriter(size_t count, Writer&& writer)
230 {
231 if (count == 0U)
232 {
233 return ErrorCode::OK;
234 }
235
236 const auto current_tail = tail_.load(std::memory_order_relaxed);
237 const auto current_head = head_.load(std::memory_order_acquire);
238 const size_t capacity = RingCapacity();
239 const size_t free_space =
240 (current_tail >= current_head) ? (capacity - (current_tail - current_head) - 1)
241 : (current_head - current_tail - 1);
242
243 if (free_space < count)
244 {
245 return ErrorCode::FULL;
246 }
247
248 const size_t first_chunk = std::min(count, capacity - current_tail);
249 Writer& writer_ref = writer;
250 const ErrorCode first_ec = writer_ref(PayloadPtr(current_tail), first_chunk);
251 if (first_ec != ErrorCode::OK)
252 {
253 return first_ec;
254 }
255
256 if (count > first_chunk)
257 {
258 const ErrorCode second_ec = writer_ref(PayloadPtr(0), count - first_chunk);
259 if (second_ec != ErrorCode::OK)
260 {
261 return second_ec;
262 }
263 }
264
265 tail_.store((current_tail + count) % capacity, std::memory_order_release);
266 return ErrorCode::OK;
267 }
268
278 ErrorCode PopBatchBytes(void* data, size_t count)
279 {
280 if (count == 0U)
281 {
282 return ErrorCode::OK;
283 }
284
285 const auto current_head = head_.load(std::memory_order_relaxed);
286 const auto current_tail = tail_.load(std::memory_order_acquire);
287 const size_t capacity = RingCapacity();
288 const size_t available = (current_tail >= current_head)
289 ? (current_tail - current_head)
290 : (capacity - current_head + current_tail);
291
292 if (available < count)
293 {
294 return ErrorCode::EMPTY;
295 }
296
297 auto* dst = static_cast<std::byte*>(data);
298 if (dst != nullptr)
299 {
300 for (size_t index = 0; index < count; ++index)
301 {
303 PayloadPtr((current_head + index) % capacity),
305 }
306 }
307
308 head_.store((current_head + count) % capacity, std::memory_order_release);
309 return ErrorCode::OK;
310 }
311
326 template <typename Reader>
327 ErrorCode PopBytesWithReader(size_t count, Reader&& reader)
328 {
329 if (count == 0U)
330 {
331 return ErrorCode::OK;
332 }
333
334 const auto current_head = head_.load(std::memory_order_relaxed);
335 const auto current_tail = tail_.load(std::memory_order_acquire);
336 const size_t capacity = RingCapacity();
337 const size_t available = (current_tail >= current_head)
338 ? (current_tail - current_head)
339 : (capacity - current_head + current_tail);
340
341 if (available < count)
342 {
343 return ErrorCode::EMPTY;
344 }
345
346 const size_t first_chunk = std::min(count, capacity - current_head);
347 Reader& reader_ref = reader;
348 const ErrorCode first_ec = reader_ref(PayloadPtr(current_head), first_chunk);
349 if (first_ec != ErrorCode::OK)
350 {
351 return first_ec;
352 }
353
354 if (count > first_chunk)
355 {
356 const ErrorCode second_ec = reader_ref(PayloadPtr(0), count - first_chunk);
357 if (second_ec != ErrorCode::OK)
358 {
359 return second_ec;
360 }
361 }
362
363 head_.store((current_head + count) % capacity, std::memory_order_release);
364 return ErrorCode::OK;
365 }
366
376 ErrorCode PeekBatchBytes(void* data, size_t count)
377 {
378 if (count == 0U)
379 {
380 return ErrorCode::OK;
381 }
382 if (data == nullptr)
383 {
384 return ErrorCode::PTR_NULL;
385 }
386
387 const auto current_head = head_.load(std::memory_order_relaxed);
388 const auto current_tail = tail_.load(std::memory_order_acquire);
389 const size_t capacity = RingCapacity();
390 const size_t available = (current_tail >= current_head)
391 ? (current_tail - current_head)
392 : (capacity - current_head + current_tail);
393
394 if (available < count)
395 {
396 return ErrorCode::EMPTY;
397 }
398
399 auto* dst = static_cast<std::byte*>(data);
400 for (size_t index = 0; index < count; ++index)
401 {
403 PayloadPtr((current_head + index) % capacity),
405 }
406 return ErrorCode::OK;
407 }
408
412 void Reset()
413 {
414 head_.store(0, std::memory_order_relaxed);
415 tail_.store(0, std::memory_order_relaxed);
416 }
417
422 size_t Size() const
423 {
424 const auto current_head = head_.load(std::memory_order_acquire);
425 const auto current_tail = tail_.load(std::memory_order_acquire);
426 return (current_tail >= current_head) ? (current_tail - current_head)
427 : (RingCapacity() - current_head + current_tail);
428 }
429
434 size_t EmptySize() const { return capacity_ - Size(); }
435
440 size_t MaxSize() const { return capacity_; }
441
442 private:
448 std::byte* PayloadPtr(IndexType index)
449 {
450 return payloads_ + index * payload_stride_;
451 }
452
460 const std::byte* PayloadPtr(IndexType index) const
461 {
462 return payloads_ + index * payload_stride_;
463 }
464
470 size_t RingCapacity() const { return capacity_ + 1; }
471
478 {
479 return (index + 1) % RingCapacity();
480 }
481
490
497 static size_t AlignUpChecked(size_t size, size_t align)
498 {
499 ASSERT(align > 0);
500 ASSERT((align & (align - 1)) == 0);
501 ASSERT(size <= std::numeric_limits<size_t>::max() - (align - 1));
502 return ((size + align - 1) / align) * align;
503 }
504
505 static size_t ComputeStride(size_t element_size, size_t element_align)
506 {
507 ASSERT(element_size > 0);
508 ASSERT(element_align > 0);
509 ASSERT((element_align & (element_align - 1)) == 0);
510 return AlignUpChecked(element_size, element_align);
511 }
512
519 static size_t MultiplyChecked(size_t lhs, size_t rhs)
520 {
521 if (lhs == 0 || rhs == 0)
522 {
523 return 0;
524 }
525
526 ASSERT(lhs <= std::numeric_limits<size_t>::max() / rhs);
527 return lhs * rhs;
528 }
529
530 const size_t element_size_;
531 const size_t capacity_;
532 const size_t payload_alloc_align_;
533 const size_t payload_stride_;
534 std::byte* payloads_;
535
536 alignas(LibXR::CONCURRENCY_ALIGNMENT) std::atomic<IndexType>
538 alignas(LibXR::CONCURRENCY_ALIGNMENT) std::atomic<IndexType>
540};
541} // namespace LibXR
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
单生产者单消费者字节队列内核 / Single-producer single-consumer byte-queue core
size_t MaxSize() const
获取队列最大容量 / Get the maximum queue capacity
void Reset()
重置队列状态 / Reset the queue state
SPSCQueueBase(SPSCQueueBase &&)
禁止移动构造。 Non-movable.
std::atomic< IndexType > head_
下一个待出队的环形下标。 Next ring index to dequeue.
~SPSCQueueBase()
析构 SPSC 字节队列内核 / Destroy the SPSC byte-queue core
SPSCQueueBase(size_t element_size, size_t element_align, size_t capacity)
构造 SPSC 字节队列内核并指定元素对齐 / Construct the SPSC byte-queue core with explicit element alignment
IndexType Increment(IndexType index) const
沿环形缓冲区推进一个槽位 / Advance one slot along the ring
static size_t MultiplyChecked(size_t lhs, size_t rhs)
安全地计算两个字节数的乘积 / Safely multiply two byte counts
size_t IndexType
环形缓冲区索引类型 / Ring-buffer index type.
std::atomic< IndexType > tail_
下一个待入队的环形下标。 Next ring index to enqueue.
const size_t payload_alloc_align_
整体分配对齐。 Allocation alignment for the payload buffer.
SPSCQueueBase(const SPSCQueueBase &)
禁止拷贝构造。 Non-copyable.
ErrorCode PeekBytes(void *value)
按字节查看一个队头 payload 但不出队 / Peek one front payload by bytes without dequeuing it
std::byte * payloads_
payload 字节缓冲区。 Byte buffer storing payloads.
ErrorCode PeekBatchBytes(void *data, size_t count)
按字节批量查看多个 payload 但不出队 / Peek multiple payloads by bytes without dequeuing them
const size_t payload_stride_
相邻 payload 槽位之间的步长。 Byte stride between adjacent payload slots.
size_t Size() const
获取当前已用元素数 / Get the current element count
static size_t AlignUpChecked(size_t size, size_t align)
安全地向上对齐字节数 / Safely align one byte count upward
const size_t capacity_
队列容量。 Queue capacity.
std::byte * PayloadPtr(IndexType index)
获取指定槽位 payload 起始地址 / Get the payload base address of one slot
SPSCQueueBase(size_t element_size, size_t capacity)
构造 SPSC 字节队列内核 / Construct the SPSC byte-queue core
ErrorCode PushBytes(const void *value)
按字节入队一个 payload / Enqueue one payload by bytes
SPSCQueueBase & operator=(const SPSCQueueBase &)
禁止拷贝赋值。 Non-copy-assignable.
SPSCQueueBase & operator=(SPSCQueueBase &&)
禁止移动赋值。 Non-move-assignable.
size_t EmptySize() const
获取剩余空槽数 / Get the current free-slot count
size_t RingCapacity() const
获取环形缓冲区的物理槽位总数 / Get the physical ring-slot count
ErrorCode PopBatchBytes(void *data, size_t count)
按字节批量出队多个 payload / Dequeue multiple payloads by bytes
ErrorCode PushBatchBytes(const void *data, size_t count)
按字节批量入队多个 payload / Enqueue multiple payloads by bytes
ErrorCode PopBytesWithReader(size_t count, Reader &&reader)
通过读取器回调批量出队 payload / Dequeue payloads through a reader callback
const std::byte * PayloadPtr(IndexType index) const
获取指定槽位 payload 起始地址(只读) / Get the payload base address of one slot (const)
const size_t element_size_
单个 payload 的字节数。 Byte size of one payload.
ErrorCode PopBytes(void *value=nullptr)
按字节出队一个 payload;传空指针时仅丢弃队头元素 / Dequeue one payload by bytes; pass null to discard the front item only
ErrorCode PushBytesWithWriter(size_t count, Writer &&writer)
通过写入器回调批量入队 payload / Enqueue payloads through a writer callback
LibXR 命名空间
Definition ch32_can.hpp:14
ErrorCode
定义错误码枚举
@ PTR_NULL
空指针 | Null pointer
@ EMPTY
为空 | Empty
@ FULL
已满 | Full
@ OK
操作成功 | Operation successful
constexpr size_t CONCURRENCY_ALIGNMENT
并发结构对齐粒度(用于降低多核伪共享) / Alignment policy used by concurrency-oriented structures
Definition libxr_def.hpp:60