5#include "libxr_def.hpp"
24template <
typename Data>
// Constructor initializer list (the constructor signature is not visible in
// this excerpt). Both indices start at 0 and the backing array is allocated
// with length + 1 elements: Push reports FULL as soon as advancing tail_ would
// make it equal to head_, so one slot always stays unused.
36 : head_(0), tail_(0), queue_handle_(new Data[length + 1]), LENGTH(length)
53 Data *
operator[](uint32_t index) {
return &queue_handle_[
static_cast<size_t>(index)]; }
// Pushes one element into the queue.
// ElementData defaults to Data; `item` is a forwarding reference and is
// forwarded into the slot at the current tail index.
// Returns ErrorCode::FULL when the index following tail_ equals head_.
// NOTE(review): the braces and the success return are not visible in this
// excerpt.
62 template <
typename ElementData = Data>
63 ErrorCode
Push(ElementData &&item)
// tail_ is read relaxed; the next index wraps around through Increment().
65 const auto CURRENT_TAIL = tail_.load(std::memory_order_relaxed);
66 const auto NEXT_TAIL = Increment(CURRENT_TAIL);
// Full check: one slot is always left unused so that full and empty differ.
68 if (NEXT_TAIL == head_.load(std::memory_order_acquire))
70 return ErrorCode::FULL;
// Write the element first, then publish it with a release store of tail_ so a
// consumer that acquires the new tail_ also observes the element.
73 queue_handle_[CURRENT_TAIL] = std::forward<ElementData>(item);
74 tail_.store(NEXT_TAIL, std::memory_order_release);
// Pops one element from the queue into `item`.
// Returns ErrorCode::EMPTY when head_ equals tail_, and ErrorCode::OK once
// head_ has been advanced and the element has been copied out.
// NOTE(review): the braces and the enclosing retry loop are not visible in
// this excerpt.
85 template <
typename ElementData = Data>
86 ErrorCode
Pop(ElementData &item)
88 auto current_head = head_.load(std::memory_order_relaxed);
// Empty check; the acquire load of tail_ pairs with the release store of
// tail_ performed by the push paths.
92 if (current_head == tail_.load(std::memory_order_acquire))
94 return ErrorCode::EMPTY;
// Claim the slot by advancing head_. On failure compare_exchange_weak writes
// the currently observed head_ back into current_head.
97 if (head_.compare_exchange_weak(current_head, Increment(current_head),
98 std::memory_order_acquire,
99 std::memory_order_relaxed))
// NOTE(review): the slot is copied only after head_ has already been advanced.
// Once head_ moves, the full check in Push no longer protects this slot -
// confirm it cannot be overwritten before this copy completes.
101 item = queue_handle_[current_head];
102 return ErrorCode::OK;
// Body fragment of a pop operation that copies the head element into `item`
// (presumably Pop(Data &item); the signature, braces and retry loop are not
// visible in this excerpt - TODO confirm).
131 auto current_head = head_.load(std::memory_order_relaxed);
// Empty when head_ has caught up with tail_.
135 if (current_head == tail_.load(std::memory_order_acquire))
137 return ErrorCode::EMPTY;
// Try to claim the head slot by advancing head_ by one position.
140 if (head_.compare_exchange_weak(current_head, Increment(current_head),
141 std::memory_order_acquire,
142 std::memory_order_relaxed))
// An additional acquire fence is issued before the element is copied out.
// NOTE(review): as in the templated Pop, the copy happens after head_ has
// been advanced - confirm the slot cannot be reused before the copy finishes.
144 std::atomic_thread_fence(std::memory_order_acquire);
145 item = queue_handle_[current_head];
146 return ErrorCode::OK;
// CAS failed: reload head_ explicitly before the next attempt.
148 current_head = head_.load(std::memory_order_relaxed);
// Body fragment of a pop operation that discards the head element: head_ is
// advanced but nothing is copied out (presumably Pop(); the signature, braces
// and retry loop are not visible in this excerpt - TODO confirm).
161 auto current_head = head_.load(std::memory_order_relaxed);
// Empty when head_ has caught up with tail_.
165 if (current_head == tail_.load(std::memory_order_acquire))
167 return ErrorCode::EMPTY;
// Advancing head_ by one position is the whole operation.
170 if (head_.compare_exchange_weak(current_head, Increment(current_head),
171 std::memory_order_acquire,
172 std::memory_order_relaxed))
174 return ErrorCode::OK;
// CAS failed: reload head_ explicitly before the next attempt.
176 current_head = head_.load(std::memory_order_relaxed);
// Body fragment that copies the head element into `item` without modifying
// head_ (presumably Peek(Data &item); the signature and braces are not
// visible in this excerpt - TODO confirm).
// Returns ErrorCode::EMPTY when head_ equals tail_, otherwise ErrorCode::OK.
190 const auto CURRENT_HEAD = head_.load(std::memory_order_acquire);
191 if (CURRENT_HEAD == tail_.load(std::memory_order_acquire))
193 return ErrorCode::EMPTY;
// NOTE(review): head_ is not re-checked after the copy, so the element may
// already have been popped elsewhere by the time it is read - confirm whether
// callers rely on a stable head here.
196 item = queue_handle_[CURRENT_HEAD];
197 return ErrorCode::OK;
// Body fragment that appends `size` elements from `data` in one step
// (presumably PushBatch(const Data *data, size_t size); the signature and
// braces are not visible in this excerpt - TODO confirm).
// Returns ErrorCode::FULL when fewer than `size` slots are free, otherwise
// ErrorCode::OK after tail_ has been advanced by `size`.
210 auto current_tail = tail_.load(std::memory_order_relaxed);
211 auto current_head = head_.load(std::memory_order_acquire);
// The backing array holds LENGTH + 1 slots; one of them is never counted as
// free, so the usable free space is reduced by one in both branches.
213 size_t capacity = LENGTH + 1;
214 size_t free_space = (current_tail >= current_head)
215 ? (capacity - (current_tail - current_head) - 1)
216 : (current_head - current_tail - 1);
218 if (free_space < size)
220 return ErrorCode::FULL;
// First chunk: from the tail index up to the end of the array at most.
// NOTE(review): memcpy copies raw bytes, so this presumably expects Data to
// be trivially copyable - confirm.
223 size_t first_chunk =
LibXR::min(size, capacity - current_tail);
224 memcpy(queue_handle_ + current_tail, data, first_chunk *
sizeof(Data));
// Second chunk: whatever did not fit wraps around to the start of the array.
226 if (size > first_chunk)
228 memcpy(queue_handle_, data + first_chunk, (size - first_chunk) *
sizeof(Data));
// Publish all copied elements at once with a release store of the new tail.
231 tail_.store((current_tail + size) % capacity, std::memory_order_release);
232 return ErrorCode::OK;
// Body fragment that removes `size` elements and copies them into `data`
// (presumably PopBatch(Data *data, size_t size); the signature, braces and any
// enclosing retry loop or guard are not visible in this excerpt - TODO
// confirm).
// Returns ErrorCode::EMPTY when fewer than `size` elements are stored, and
// ErrorCode::OK once head_ has been advanced by `size`.
245 size_t capacity = LENGTH + 1;
249 auto current_head = head_.load(std::memory_order_relaxed);
250 auto current_tail = tail_.load(std::memory_order_acquire);
// Number of stored elements, accounting for tail having wrapped around.
252 size_t available = (current_tail >= current_head)
253 ? (current_tail - current_head)
254 : (capacity - current_head + current_tail);
256 if (available < size)
258 return ErrorCode::EMPTY;
// Copy out before claiming: first the run up to the end of the array ...
263 size_t first_chunk =
LibXR::min(size, capacity - current_head);
264 memcpy(data, queue_handle_ + current_head, first_chunk *
sizeof(Data));
// ... then the wrapped remainder from the start of the array.
266 if (size > first_chunk)
268 memcpy(data + first_chunk, queue_handle_, (size - first_chunk) *
sizeof(Data));
// The copied range is only consumed if head_ is still where it was when the
// copy started; the CAS moves it forward by `size` in that case.
// NOTE(review): the success ordering is acquire only, while the push paths
// read head_ with acquire before reusing slots - confirm whether a release
// (acq_rel) is needed here to order the memcpy reads before the slot reuse.
272 size_t new_head = (current_head + size) % capacity;
274 if (head_.compare_exchange_weak(current_head, new_head, std::memory_order_acquire,
275 std::memory_order_relaxed))
277 return ErrorCode::OK;
// Body fragment that copies `size` elements starting at the head into `data`
// without advancing head_ (presumably PeekBatch(Data *data, size_t size); the
// signature, braces and any enclosing retry loop are not visible in this
// excerpt - TODO confirm).
// Returns ErrorCode::EMPTY when fewer than `size` elements are stored.
293 size_t capacity = LENGTH + 1;
297 auto current_head = head_.load(std::memory_order_relaxed);
298 auto current_tail = tail_.load(std::memory_order_acquire);
// Number of stored elements, accounting for tail having wrapped around.
300 size_t available = (current_tail >= current_head)
301 ? (current_tail - current_head)
302 : (capacity - current_head + current_tail);
304 if (available < size)
306 return ErrorCode::EMPTY;
// Copy the run up to the end of the array, then the wrapped remainder.
309 size_t first_chunk =
LibXR::min(size, capacity - current_head);
310 memcpy(data, queue_handle_ + current_head, first_chunk *
sizeof(Data));
312 if (size > first_chunk)
314 memcpy(data + first_chunk, queue_handle_, (size - first_chunk) *
sizeof(Data));
// The snapshot is reported as OK only if head_ still equals the value read
// before the copy.
317 if (head_.load(std::memory_order_acquire) == current_head)
319 return ErrorCode::OK;
// Body fragment that sets both indices back to 0, which makes the queue
// report empty (presumably Reset(); the signature is not visible in this
// excerpt - TODO confirm). Stored elements are not touched.
// NOTE(review): both stores are relaxed and independent; presumably this is
// not meant to run concurrently with push/pop - confirm.
332 head_.store(0, std::memory_order_relaxed);
333 tail_.store(0, std::memory_order_relaxed);
// Body fragment that returns the number of stored elements (presumably
// Size() const; the signature is not visible in this excerpt - TODO confirm).
// head_ and tail_ are loaded separately, so the result is a snapshot that may
// already be outdated when it is returned.
342 const auto CURRENT_HEAD = head_.load(std::memory_order_acquire);
343 const auto CURRENT_TAIL = tail_.load(std::memory_order_acquire);
// When tail has wrapped around behind head, add the array size (LENGTH + 1).
344 return (CURRENT_TAIL >= CURRENT_HEAD) ? (CURRENT_TAIL - CURRENT_HEAD)
345 : ((LENGTH + 1) - CURRENT_HEAD + CURRENT_TAIL);
// Read index (head_) and write index (tail_) into the backing array.
// Each is aligned to LIBXR_CACHE_LINE_SIZE, presumably so the two indices do
// not share a cache line - confirm against the definition of that macro.
354 alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<uint32_t> head_;
355 alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<uint32_t> tail_;
359 uint32_t Increment(uint32_t index)
const {
return (index + 1) % (LENGTH + 1); }
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode Pop(ElementData &item)
从队列中弹出数据 / Pops data from the queue
ErrorCode Pop()
从队列中弹出数据(不返回数据) / Pops data from the queue (without returning data)
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
ErrorCode Pop(Data &item)
从队列中移除头部元素,并获取该元素的数据 / Removes the front element from the queue and retrieves its data
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
LockFreeQueue(size_t length)
构造函数 / Constructor
ErrorCode PeekBatch(Data *data, size_t size)
批量查看队列中的数据(不移除) / Peeks multiple elements from the queue without removing them
Data * operator[](uint32_t index)
获取指定索引的数据指针 / Retrieves the data pointer at a specified index
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
~LockFreeQueue()
析构函数 / Destructor
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
ErrorCode Peek(Data &item)
获取队列头部数据但不弹出 / Retrieves the front data of the queue without popping
constexpr auto min(T1 a, T2 b) -> typename std::common_type< T1, T2 >::type
计算两个数的最小值 / Computes the minimum of two values