8#include "libxr_def.hpp"
9#include "libxr_mem.hpp"
28template <
typename Data>
31 inline constexpr size_t AlignUp(
size_t size,
size_t align)
33 return (size / align + 1) * align;
51 queue_handle_(new Data[LENGTH + 1])
68 Data*
operator[](uint32_t index) {
return &queue_handle_[
static_cast<size_t>(index)]; }
77 template <
typename ElementData = Data>
80 const auto CURRENT_TAIL = tail_.load(std::memory_order_relaxed);
81 const auto NEXT_TAIL = Increment(CURRENT_TAIL);
83 if (NEXT_TAIL == head_.load(std::memory_order_acquire))
88 queue_handle_[CURRENT_TAIL] = std::forward<ElementData>(item);
89 tail_.store(NEXT_TAIL, std::memory_order_release);
100 template <
typename ElementData = Data>
103 auto current_head = head_.load(std::memory_order_relaxed);
107 if (current_head == tail_.load(std::memory_order_acquire))
112 item = queue_handle_[current_head];
114 if (head_.compare_exchange_weak(current_head, Increment(current_head),
115 std::memory_order_acq_rel,
116 std::memory_order_relaxed))
143 auto current_head = head_.load(std::memory_order_relaxed);
147 if (current_head == tail_.load(std::memory_order_acquire))
152 item = queue_handle_[current_head];
154 if (head_.compare_exchange_weak(current_head, Increment(current_head),
155 std::memory_order_acq_rel,
156 std::memory_order_relaxed))
172 auto current_head = head_.load(std::memory_order_relaxed);
176 if (current_head == tail_.load(std::memory_order_acquire))
181 if (head_.compare_exchange_weak(current_head, Increment(current_head),
182 std::memory_order_acq_rel,
183 std::memory_order_relaxed))
202 auto current_head = head_.load(std::memory_order_relaxed);
203 if (current_head == tail_.load(std::memory_order_acquire))
208 item = queue_handle_[current_head];
210 if (head_.load(std::memory_order_acquire) == current_head)
227 auto current_tail = tail_.load(std::memory_order_relaxed);
228 auto current_head = head_.load(std::memory_order_acquire);
230 size_t capacity = LENGTH + 1;
231 size_t free_space = (current_tail >= current_head)
232 ? (capacity - (current_tail - current_head) - 1)
233 : (current_head - current_tail - 1);
235 if (free_space < size)
240 size_t first_chunk =
LibXR::min(size, capacity - current_tail);
242 reinterpret_cast<const void*
>(data),
243 first_chunk *
sizeof(Data));
245 if (size > first_chunk)
248 reinterpret_cast<const void*
>(data + first_chunk),
249 (size - first_chunk) *
sizeof(Data));
252 tail_.store((current_tail + size) % capacity, std::memory_order_release);
268 template <
typename Writer>
271 static_assert(std::is_invocable_v<Writer&, Data*, size_t>,
272 "PushWithWriter writer must be callable as "
273 "ErrorCode(Data* buffer, size_t chunk_size)");
274 using WriterRet = std::invoke_result_t<Writer&, Data*, size_t>;
275 static_assert(std::is_convertible_v<WriterRet, ErrorCode>,
276 "PushWithWriter writer return type must be convertible to ErrorCode");
283 const auto current_tail = tail_.load(std::memory_order_relaxed);
284 const auto current_head = head_.load(std::memory_order_acquire);
285 const size_t capacity = LENGTH + 1;
286 const size_t free_space =
287 (current_tail >= current_head) ? (capacity - (current_tail - current_head) - 1)
288 : (current_head - current_tail - 1);
290 if (free_space < size)
295 const size_t first_chunk =
LibXR::min(size, capacity -
static_cast<size_t>(current_tail));
296 Writer& writer_ref = writer;
297 const ErrorCode first_ec = writer_ref(queue_handle_ + current_tail, first_chunk);
303 if (size > first_chunk)
305 const ErrorCode second_ec = writer_ref(queue_handle_, size - first_chunk);
312 tail_.store((current_tail + size) % capacity, std::memory_order_release);
328 template <
typename Reader>
331 static_assert(std::is_invocable_v<Reader&, const Data*, size_t>,
332 "PopWithReader reader must be callable as "
333 "ErrorCode(const Data* buffer, size_t chunk_size)");
334 using ReaderRet = std::invoke_result_t<Reader&, const Data*, size_t>;
335 static_assert(std::is_convertible_v<ReaderRet, ErrorCode>,
336 "PopWithReader reader return type must be convertible to ErrorCode");
343 const auto current_head = head_.load(std::memory_order_relaxed);
344 const auto current_tail = tail_.load(std::memory_order_acquire);
345 const size_t capacity = LENGTH + 1;
346 const size_t available = (current_tail >= current_head)
347 ? (current_tail - current_head)
348 : (capacity - current_head + current_tail);
350 if (available < size)
355 const size_t first_chunk =
LibXR::min(size, capacity -
static_cast<size_t>(current_head));
356 Reader& reader_ref = reader;
357 const ErrorCode first_ec = reader_ref(queue_handle_ + current_head, first_chunk);
363 if (size > first_chunk)
365 const ErrorCode second_ec = reader_ref(queue_handle_, size - first_chunk);
372 head_.store((current_head + size) % capacity, std::memory_order_release);
385 size_t capacity = LENGTH + 1;
389 auto current_head = head_.load(std::memory_order_relaxed);
390 auto current_tail = tail_.load(std::memory_order_acquire);
392 size_t available = (current_tail >= current_head)
393 ? (current_tail - current_head)
394 : (capacity - current_head + current_tail);
396 if (available < size)
403 size_t first_chunk =
LibXR::min(size, capacity - current_head);
405 reinterpret_cast<void*
>(data),
406 reinterpret_cast<const void*
>(queue_handle_ + current_head),
407 first_chunk *
sizeof(Data));
409 if (size > first_chunk)
412 reinterpret_cast<const void*
>(queue_handle_),
413 (size - first_chunk) *
sizeof(Data));
417 size_t new_head = (current_head + size) % capacity;
419 if (head_.compare_exchange_weak(current_head, new_head, std::memory_order_acq_rel,
420 std::memory_order_relaxed))
443 const size_t CAPACITY = LENGTH + 1;
447 auto current_head = head_.load(std::memory_order_relaxed);
448 auto current_tail = tail_.load(std::memory_order_acquire);
450 size_t available = (current_tail >= current_head)
451 ? (current_tail - current_head)
452 : (CAPACITY - current_head + current_tail);
454 if (available < size)
461 size_t first_chunk =
LibXR::min(size, CAPACITY - current_head);
463 reinterpret_cast<void*
>(data),
464 reinterpret_cast<const void*
>(queue_handle_ + current_head),
465 first_chunk *
sizeof(Data));
467 if (size > first_chunk)
470 reinterpret_cast<const void*
>(queue_handle_),
471 (size - first_chunk) *
sizeof(Data));
475 if (head_.load(std::memory_order_acquire) == current_head)
490 head_.store(0, std::memory_order_relaxed);
491 tail_.store(0, std::memory_order_relaxed);
500 const auto CURRENT_HEAD = head_.load(std::memory_order_acquire);
501 const auto CURRENT_TAIL = tail_.load(std::memory_order_acquire);
502 return (CURRENT_TAIL >= CURRENT_HEAD) ? (CURRENT_TAIL - CURRENT_HEAD)
503 : ((LENGTH + 1) - CURRENT_HEAD + CURRENT_TAIL);
522 uint32_t Increment(uint32_t index)
const {
return (index + 1) % (LENGTH + 1); }
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
size_t MaxSize() const
获取队列的最大容量 / Returns the maximum capacity of the queue
ErrorCode Pop(ElementData &item)
从队列中弹出数据 / Pops data from the queue
ErrorCode Pop()
从队列中弹出数据(不返回数据) / Pops data from the queue (without returning data)
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
ErrorCode Pop(Data &item)
从队列中移除头部元素,并获取该元素的数据 (Remove the front element from the queue and retrieve its data).
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
ErrorCode PushWithWriter(size_t size, Writer &&writer)
通过写入器回调写入固定长度数据(单生产者) / Push fixed-size data via writer callback (single producer)
LockFreeQueue(size_t length)
构造函数 / Constructor
ErrorCode PopWithReader(size_t size, Reader &&reader)
通过读取器回调弹出固定长度数据(单消费者) / Pop fixed-size data via reader callback (single consumer)
ErrorCode PeekBatch(Data *data, size_t size)
批量查看队列中的数据(不移除) / Peeks multiple elements from the queue without removing them
Data * operator[](uint32_t index)
获取指定索引的数据指针 / Retrieves the data pointer at a specified index
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
~LockFreeQueue()
析构函数 / Destructor
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
ErrorCode Peek(Data &item)
获取队列头部数据但不弹出 / Retrieves the front data of the queue without popping
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
@ OK
操作成功 | Operation successful
constexpr auto min(LeftType a, RightType b) -> std::common_type_t< LeftType, RightType >
计算两个数的最小值
constexpr size_t CACHE_LINE_SIZE
缓存行大小 / Cache line size
constexpr size_t ALIGN_SIZE
平台自然对齐大小 / Native platform alignment size