libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
lockfree_queue.hpp
1#pragma once
2
3#include <atomic>
4
5#include "libxr_def.hpp"
6
7namespace LibXR
8{
9
24template <typename Data>
25class alignas(LIBXR_CACHE_LINE_SIZE) LockFreeQueue
26{
27 inline constexpr size_t AlignUp(size_t size, size_t align)
28 {
29 return ((size + align - 1) / align) * align;
30 }
31
32 public:
43 LockFreeQueue(size_t length)
44 : head_(0),
45 tail_(0),
46 LENGTH(AlignUp(length, LIBXR_ALIGN_SIZE) - 1),
47 queue_handle_(new Data[LENGTH + 1])
48 {
49 }
50
57 ~LockFreeQueue() { delete[] queue_handle_; }
58
64 Data *operator[](uint32_t index) { return &queue_handle_[static_cast<size_t>(index)]; }
65
73 template <typename ElementData = Data>
74 ErrorCode Push(ElementData &&item)
75 {
76 const auto CURRENT_TAIL = tail_.load(std::memory_order_relaxed);
77 const auto NEXT_TAIL = Increment(CURRENT_TAIL);
78
79 if (NEXT_TAIL == head_.load(std::memory_order_acquire))
80 {
81 return ErrorCode::FULL;
82 }
83
84 queue_handle_[CURRENT_TAIL] = std::forward<ElementData>(item);
85 tail_.store(NEXT_TAIL, std::memory_order_release);
86 return ErrorCode::OK;
87 }
88
96 template <typename ElementData = Data>
97 ErrorCode Pop(ElementData &item)
98 {
99 auto current_head = head_.load(std::memory_order_relaxed);
100
101 while (true)
102 {
103 if (current_head == tail_.load(std::memory_order_acquire))
104 {
105 return ErrorCode::EMPTY;
106 }
107
108 item = queue_handle_[current_head];
109
110 if (head_.compare_exchange_weak(current_head, Increment(current_head),
111 std::memory_order_acq_rel,
112 std::memory_order_relaxed))
113 {
114 return ErrorCode::OK;
115 }
116 }
117 }
118
137 ErrorCode Pop(Data &item)
138 {
139 auto current_head = head_.load(std::memory_order_relaxed);
140
141 while (true)
142 {
143 if (current_head == tail_.load(std::memory_order_acquire))
144 {
145 return ErrorCode::EMPTY;
146 }
147
148 item = queue_handle_[current_head];
149
150 if (head_.compare_exchange_weak(current_head, Increment(current_head),
151 std::memory_order_acq_rel,
152 std::memory_order_relaxed))
153 {
154 return ErrorCode::OK;
155 }
156 }
157 }
158
166 ErrorCode Pop()
167 {
168 auto current_head = head_.load(std::memory_order_relaxed);
169
170 while (true)
171 {
172 if (current_head == tail_.load(std::memory_order_acquire))
173 {
174 return ErrorCode::EMPTY;
175 }
176
177 if (head_.compare_exchange_weak(current_head, Increment(current_head),
178 std::memory_order_acq_rel,
179 std::memory_order_relaxed))
180 {
181 return ErrorCode::OK;
182 }
183 }
184 }
185
194 ErrorCode Peek(Data &item)
195 {
196 while (true)
197 {
198 auto current_head = head_.load(std::memory_order_relaxed);
199 if (current_head == tail_.load(std::memory_order_acquire))
200 {
201 return ErrorCode::EMPTY;
202 }
203
204 item = queue_handle_[current_head];
205
206 if (head_.load(std::memory_order_acquire) == current_head)
207 {
208 return ErrorCode::OK;
209 }
210 }
211 }
212
221 ErrorCode PushBatch(const Data *data, size_t size)
222 {
223 auto current_tail = tail_.load(std::memory_order_relaxed);
224 auto current_head = head_.load(std::memory_order_acquire);
225
226 size_t capacity = LENGTH + 1;
227 size_t free_space = (current_tail >= current_head)
228 ? (capacity - (current_tail - current_head) - 1)
229 : (current_head - current_tail - 1);
230
231 if (free_space < size)
232 {
233 return ErrorCode::FULL;
234 }
235
236 size_t first_chunk = LibXR::min(size, capacity - current_tail);
237 LibXR::Memory::FastCopy(reinterpret_cast<void *>(queue_handle_ + current_tail),
238 reinterpret_cast<const void *>(data),
239 first_chunk * sizeof(Data));
240
241 if (size > first_chunk)
242 {
243 LibXR::Memory::FastCopy(reinterpret_cast<void *>(queue_handle_),
244 reinterpret_cast<const void *>(data + first_chunk),
245 (size - first_chunk) * sizeof(Data));
246 }
247
248 tail_.store((current_tail + size) % capacity, std::memory_order_release);
249 return ErrorCode::OK;
250 }
251
260 ErrorCode PopBatch(Data *data, size_t size)
261 {
262 size_t capacity = LENGTH + 1;
263
264 while (true)
265 {
266 auto current_head = head_.load(std::memory_order_relaxed);
267 auto current_tail = tail_.load(std::memory_order_acquire);
268
269 size_t available = (current_tail >= current_head)
270 ? (current_tail - current_head)
271 : (capacity - current_head + current_tail);
272
273 if (available < size)
274 {
275 return ErrorCode::EMPTY;
276 }
277
278 if (data != nullptr)
279 {
280 size_t first_chunk = LibXR::min(size, capacity - current_head);
282 reinterpret_cast<void *>(data),
283 reinterpret_cast<const void *>(queue_handle_ + current_head),
284 first_chunk * sizeof(Data));
285
286 if (size > first_chunk)
287 {
288 LibXR::Memory::FastCopy(reinterpret_cast<void *>(data + first_chunk),
289 reinterpret_cast<const void *>(queue_handle_),
290 (size - first_chunk) * sizeof(Data));
291 }
292 }
293
294 size_t new_head = (current_head + size) % capacity;
295
296 if (head_.compare_exchange_weak(current_head, new_head, std::memory_order_acq_rel,
297 std::memory_order_relaxed))
298 {
299 return ErrorCode::OK;
300 }
301 }
302 }
303
313 ErrorCode PeekBatch(Data *data, size_t size)
314 {
315 if (size == 0)
316 {
317 return ErrorCode::OK;
318 }
319
320 const size_t CAPACITY = LENGTH + 1;
321
322 while (true)
323 {
324 auto current_head = head_.load(std::memory_order_relaxed);
325 auto current_tail = tail_.load(std::memory_order_acquire);
326
327 size_t available = (current_tail >= current_head)
328 ? (current_tail - current_head)
329 : (CAPACITY - current_head + current_tail);
330
331 if (available < size)
332 {
333 return ErrorCode::EMPTY;
334 }
335
336 if (data != nullptr)
337 {
338 size_t first_chunk = LibXR::min(size, CAPACITY - current_head);
340 reinterpret_cast<void *>(data),
341 reinterpret_cast<const void *>(queue_handle_ + current_head),
342 first_chunk * sizeof(Data));
343
344 if (size > first_chunk)
345 {
346 LibXR::Memory::FastCopy(reinterpret_cast<void *>(data + first_chunk),
347 reinterpret_cast<const void *>(queue_handle_),
348 (size - first_chunk) * sizeof(Data));
349 }
350 }
351
352 if (head_.load(std::memory_order_acquire) == current_head)
353 {
354 return ErrorCode::OK;
355 }
356 }
357 }
358
365 void Reset()
366 {
367 head_.store(0, std::memory_order_relaxed);
368 tail_.store(0, std::memory_order_relaxed);
369 }
370
375 size_t Size() const
376 {
377 const auto CURRENT_HEAD = head_.load(std::memory_order_acquire);
378 const auto CURRENT_TAIL = tail_.load(std::memory_order_acquire);
379 return (CURRENT_TAIL >= CURRENT_HEAD) ? (CURRENT_TAIL - CURRENT_HEAD)
380 : ((LENGTH + 1) - CURRENT_HEAD + CURRENT_TAIL);
381 }
382
386 size_t EmptySize() { return LENGTH - Size(); }
387
388 private:
389 alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<uint32_t> head_;
390 alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<uint32_t> tail_;
391 const size_t LENGTH;
392 Data *queue_handle_;
393
394 uint32_t Increment(uint32_t index) const { return (index + 1) % (LENGTH + 1); }
395};
396
397} // namespace LibXR
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode Pop(ElementData &item)
从队列中弹出数据 / Pops data from the queue
ErrorCode Pop()
从队列中弹出数据(不返回数据) / Pops data from the queue (without returning data)
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
ErrorCode Pop(Data &item)
从队列中移除头部元素,并获取该元素的数据 (Remove the front element from the queue and retrieve its data).
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
LockFreeQueue(size_t length)
构造函数 / Constructor
ErrorCode PeekBatch(Data *data, size_t size)
批量查看队列中的数据(不移除) / Peeks multiple elements from the queue without removing them
Data * operator[](uint32_t index)
获取指定索引的数据指针 / Retrieves the data pointer at a specified index
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
~LockFreeQueue()
析构函数 / Destructor
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
ErrorCode Peek(Data &item)
获取队列头部数据但不弹出 / Retrieves the front data of the queue without popping
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
Definition libxr_mem.cpp:3
LibXR 命名空间
constexpr auto min(T1 a, T2 b) -> typename std::common_type< T1, T2 >::type
计算两个数的最小值