9#include "libxr_def.hpp"
10#include "libxr_mem.hpp"
56 SPSCQueueBase(
size_t element_size,
size_t element_align,
size_t capacity)
74 ASSERT(
capacity_ <= std::numeric_limits<size_t>::max() - 1);
78 payloads_ =
static_cast<std::byte*
>(::operator
new[](
102 if (value ==
nullptr)
107 const auto current_tail =
tail_.load(std::memory_order_relaxed);
108 const auto next_tail =
Increment(current_tail);
110 if (next_tail ==
head_.load(std::memory_order_acquire))
116 tail_.store(next_tail, std::memory_order_release);
131 const auto current_head =
head_.load(std::memory_order_relaxed);
133 if (current_head ==
tail_.load(std::memory_order_acquire))
138 if (value !=
nullptr)
157 if (value ==
nullptr)
162 const auto current_head =
head_.load(std::memory_order_relaxed);
163 if (current_head ==
tail_.load(std::memory_order_acquire))
191 const auto current_tail =
tail_.load(std::memory_order_relaxed);
192 const auto current_head =
head_.load(std::memory_order_acquire);
194 const size_t free_space =
195 (current_tail >= current_head) ? (capacity - (current_tail - current_head) - 1)
196 : (current_head - current_tail - 1);
198 if (free_space < count)
203 const auto* src =
static_cast<const std::byte*
>(data);
204 for (
size_t index = 0; index < count; ++index)
210 tail_.store((current_tail + count) % capacity, std::memory_order_release);
228 template <
typename Writer>
236 const auto current_tail =
tail_.load(std::memory_order_relaxed);
237 const auto current_head =
head_.load(std::memory_order_acquire);
239 const size_t free_space =
240 (current_tail >= current_head) ? (capacity - (current_tail - current_head) - 1)
241 : (current_head - current_tail - 1);
243 if (free_space < count)
248 const size_t first_chunk = std::min(count, capacity - current_tail);
249 Writer& writer_ref = writer;
256 if (count > first_chunk)
265 tail_.store((current_tail + count) % capacity, std::memory_order_release);
285 const auto current_head =
head_.load(std::memory_order_relaxed);
286 const auto current_tail =
tail_.load(std::memory_order_acquire);
288 const size_t available = (current_tail >= current_head)
289 ? (current_tail - current_head)
290 : (capacity - current_head + current_tail);
292 if (available < count)
297 auto* dst =
static_cast<std::byte*
>(data);
300 for (
size_t index = 0; index < count; ++index)
303 PayloadPtr((current_head + index) % capacity),
308 head_.store((current_head + count) % capacity, std::memory_order_release);
326 template <
typename Reader>
334 const auto current_head =
head_.load(std::memory_order_relaxed);
335 const auto current_tail =
tail_.load(std::memory_order_acquire);
337 const size_t available = (current_tail >= current_head)
338 ? (current_tail - current_head)
339 : (capacity - current_head + current_tail);
341 if (available < count)
346 const size_t first_chunk = std::min(count, capacity - current_head);
347 Reader& reader_ref = reader;
354 if (count > first_chunk)
363 head_.store((current_head + count) % capacity, std::memory_order_release);
387 const auto current_head =
head_.load(std::memory_order_relaxed);
388 const auto current_tail =
tail_.load(std::memory_order_acquire);
390 const size_t available = (current_tail >= current_head)
391 ? (current_tail - current_head)
392 : (capacity - current_head + current_tail);
394 if (available < count)
399 auto* dst =
static_cast<std::byte*
>(data);
400 for (
size_t index = 0; index < count; ++index)
403 PayloadPtr((current_head + index) % capacity),
414 head_.store(0, std::memory_order_relaxed);
415 tail_.store(0, std::memory_order_relaxed);
424 const auto current_head =
head_.load(std::memory_order_acquire);
425 const auto current_tail =
tail_.load(std::memory_order_acquire);
426 return (current_tail >= current_head) ? (current_tail - current_head)
500 ASSERT((align & (align - 1)) == 0);
501 ASSERT(size <= std::numeric_limits<size_t>::max() - (align - 1));
502 return ((size + align - 1) / align) * align;
505 static size_t ComputeStride(
size_t element_size,
size_t element_align)
507 ASSERT(element_size > 0);
508 ASSERT(element_align > 0);
509 ASSERT((element_align & (element_align - 1)) == 0);
521 if (lhs == 0 || rhs == 0)
526 ASSERT(lhs <= std::numeric_limits<size_t>::max() / rhs);
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
单生产者单消费者字节队列内核 / Single-producer single-consumer byte-queue core
size_t MaxSize() const
获取队列最大容量 / Get the maximum queue capacity
void Reset()
重置队列状态 / Reset the queue state
SPSCQueueBase(SPSCQueueBase &&)
禁止移动构造。 Non-move-constructible.
std::atomic< IndexType > head_
下一个待出队的环形下标。 Next ring index to dequeue.
~SPSCQueueBase()
析构 SPSC 字节队列内核 / Destroy the SPSC byte-queue core
SPSCQueueBase(size_t element_size, size_t element_align, size_t capacity)
构造 SPSC 字节队列内核并指定元素对齐 / Construct the SPSC byte-queue core with explicit element alignment
IndexType Increment(IndexType index) const
沿环形缓冲区推进一个槽位 / Advance one slot along the ring
static size_t MultiplyChecked(size_t lhs, size_t rhs)
安全地计算两个字节数的乘积 / Safely multiply two byte counts
size_t IndexType
环形缓冲区索引类型 / Ring-buffer index type.
std::atomic< IndexType > tail_
下一个待入队的环形下标。 Next ring index to enqueue.
const size_t payload_alloc_align_
整体分配对齐。 Allocation alignment for the payload buffer.
SPSCQueueBase(const SPSCQueueBase &)
禁止拷贝构造。 Non-copy-constructible.
ErrorCode PeekBytes(void *value)
按字节查看一个队头 payload 但不出队 / Peek one front payload by bytes without dequeuing it
std::byte * payloads_
payload 字节缓冲区。 Byte buffer storing payloads.
ErrorCode PeekBatchBytes(void *data, size_t count)
按字节批量查看多个 payload 但不出队 / Peek multiple payloads by bytes without dequeuing them
const size_t payload_stride_
相邻 payload 槽位之间的步长。 Byte stride between adjacent payload slots.
size_t Size() const
获取当前已用元素数 / Get the current element count
static size_t AlignUpChecked(size_t size, size_t align)
安全地向上对齐字节数 / Round a byte count up to a multiple of the alignment, with overflow checking
const size_t capacity_
队列容量。 Queue capacity.
std::byte * PayloadPtr(IndexType index)
获取指定槽位 payload 起始地址 / Get the payload base address of the given slot
SPSCQueueBase(size_t element_size, size_t capacity)
构造 SPSC 字节队列内核 / Construct the SPSC byte-queue core
ErrorCode PushBytes(const void *value)
按字节入队一个 payload / Enqueue one payload by bytes
SPSCQueueBase & operator=(const SPSCQueueBase &)
禁止拷贝赋值。 Non-copy-assignable.
SPSCQueueBase & operator=(SPSCQueueBase &&)
禁止移动赋值。 Non-move-assignable.
size_t EmptySize() const
获取剩余空槽数 / Get the current free-slot count
size_t RingCapacity() const
获取环形缓冲区的物理槽位总数 / Get the physical ring-slot count
ErrorCode PopBatchBytes(void *data, size_t count)
按字节批量出队多个 payload / Dequeue multiple payloads by bytes
ErrorCode PushBatchBytes(const void *data, size_t count)
按字节批量入队多个 payload / Enqueue multiple payloads by bytes
ErrorCode PopBytesWithReader(size_t count, Reader &&reader)
通过读取器回调批量出队 payload / Dequeue payloads through a reader callback
const std::byte * PayloadPtr(IndexType index) const
获取指定槽位 payload 起始地址(只读) / Get the payload base address of the given slot (const)
const size_t element_size_
单个 payload 的字节数。 Byte size of one payload.
ErrorCode PopBytes(void *value=nullptr)
按字节出队一个 payload;传空指针时仅丢弃队头元素 / Dequeue one payload by bytes; pass null to discard the front item only
ErrorCode PushBytesWithWriter(size_t count, Writer &&writer)
通过写入器回调批量入队 payload / Enqueue payloads through a writer callback
@ PTR_NULL
空指针 | Null pointer
@ OK
操作成功 | Operation successful
constexpr size_t CONCURRENCY_ALIGNMENT
并发结构对齐粒度(用于降低多核伪共享) / Alignment granularity for concurrency-oriented structures (used to reduce false sharing across cores)