libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
lockfree_queue.hpp
1#pragma once
2
3#include <atomic>
4#include <cstddef>
5
6#include "libxr_def.hpp"
7
8namespace LibXR
9{
10
25template <typename Data>
26class alignas(LIBXR_CACHE_LINE_SIZE) LockFreeQueue
27{
28 inline constexpr size_t AlignUp(size_t size, size_t align)
29 {
30 return (size / align + 1) * align;
31 }
32
33 public:
44 LockFreeQueue(size_t length)
45 : head_(0),
46 tail_(0),
47 LENGTH(AlignUp(length, LIBXR_ALIGN_SIZE) - 1),
48 queue_handle_(new Data[LENGTH + 1])
49 {
50 }
51
58 ~LockFreeQueue() { delete[] queue_handle_; }
59
65 Data* operator[](uint32_t index) { return &queue_handle_[static_cast<size_t>(index)]; }
66
74 template <typename ElementData = Data>
75 ErrorCode Push(ElementData&& item)
76 {
77 const auto CURRENT_TAIL = tail_.load(std::memory_order_relaxed);
78 const auto NEXT_TAIL = Increment(CURRENT_TAIL);
79
80 if (NEXT_TAIL == head_.load(std::memory_order_acquire))
81 {
82 return ErrorCode::FULL;
83 }
84
85 queue_handle_[CURRENT_TAIL] = std::forward<ElementData>(item);
86 tail_.store(NEXT_TAIL, std::memory_order_release);
87 return ErrorCode::OK;
88 }
89
97 template <typename ElementData = Data>
98 ErrorCode Pop(ElementData& item)
99 {
100 auto current_head = head_.load(std::memory_order_relaxed);
101
102 while (true)
103 {
104 if (current_head == tail_.load(std::memory_order_acquire))
105 {
106 return ErrorCode::EMPTY;
107 }
108
109 item = queue_handle_[current_head];
110
111 if (head_.compare_exchange_weak(current_head, Increment(current_head),
112 std::memory_order_acq_rel,
113 std::memory_order_relaxed))
114 {
115 return ErrorCode::OK;
116 }
117 }
118 }
119
138 ErrorCode Pop(Data& item)
139 {
140 auto current_head = head_.load(std::memory_order_relaxed);
141
142 while (true)
143 {
144 if (current_head == tail_.load(std::memory_order_acquire))
145 {
146 return ErrorCode::EMPTY;
147 }
148
149 item = queue_handle_[current_head];
150
151 if (head_.compare_exchange_weak(current_head, Increment(current_head),
152 std::memory_order_acq_rel,
153 std::memory_order_relaxed))
154 {
155 return ErrorCode::OK;
156 }
157 }
158 }
159
167 ErrorCode Pop()
168 {
169 auto current_head = head_.load(std::memory_order_relaxed);
170
171 while (true)
172 {
173 if (current_head == tail_.load(std::memory_order_acquire))
174 {
175 return ErrorCode::EMPTY;
176 }
177
178 if (head_.compare_exchange_weak(current_head, Increment(current_head),
179 std::memory_order_acq_rel,
180 std::memory_order_relaxed))
181 {
182 return ErrorCode::OK;
183 }
184 }
185 }
186
195 ErrorCode Peek(Data& item)
196 {
197 while (true)
198 {
199 auto current_head = head_.load(std::memory_order_relaxed);
200 if (current_head == tail_.load(std::memory_order_acquire))
201 {
202 return ErrorCode::EMPTY;
203 }
204
205 item = queue_handle_[current_head];
206
207 if (head_.load(std::memory_order_acquire) == current_head)
208 {
209 return ErrorCode::OK;
210 }
211 }
212 }
213
222 ErrorCode PushBatch(const Data* data, size_t size)
223 {
224 auto current_tail = tail_.load(std::memory_order_relaxed);
225 auto current_head = head_.load(std::memory_order_acquire);
226
227 size_t capacity = LENGTH + 1;
228 size_t free_space = (current_tail >= current_head)
229 ? (capacity - (current_tail - current_head) - 1)
230 : (current_head - current_tail - 1);
231
232 if (free_space < size)
233 {
234 return ErrorCode::FULL;
235 }
236
237 size_t first_chunk = LibXR::min(size, capacity - current_tail);
238 LibXR::Memory::FastCopy(reinterpret_cast<void*>(queue_handle_ + current_tail),
239 reinterpret_cast<const void*>(data),
240 first_chunk * sizeof(Data));
241
242 if (size > first_chunk)
243 {
244 LibXR::Memory::FastCopy(reinterpret_cast<void*>(queue_handle_),
245 reinterpret_cast<const void*>(data + first_chunk),
246 (size - first_chunk) * sizeof(Data));
247 }
248
249 tail_.store((current_tail + size) % capacity, std::memory_order_release);
250 return ErrorCode::OK;
251 }
252
261 ErrorCode PopBatch(Data* data, size_t size)
262 {
263 size_t capacity = LENGTH + 1;
264
265 while (true)
266 {
267 auto current_head = head_.load(std::memory_order_relaxed);
268 auto current_tail = tail_.load(std::memory_order_acquire);
269
270 size_t available = (current_tail >= current_head)
271 ? (current_tail - current_head)
272 : (capacity - current_head + current_tail);
273
274 if (available < size)
275 {
276 return ErrorCode::EMPTY;
277 }
278
279 if (data != nullptr)
280 {
281 size_t first_chunk = LibXR::min(size, capacity - current_head);
283 reinterpret_cast<void*>(data),
284 reinterpret_cast<const void*>(queue_handle_ + current_head),
285 first_chunk * sizeof(Data));
286
287 if (size > first_chunk)
288 {
289 LibXR::Memory::FastCopy(reinterpret_cast<void*>(data + first_chunk),
290 reinterpret_cast<const void*>(queue_handle_),
291 (size - first_chunk) * sizeof(Data));
292 }
293 }
294
295 size_t new_head = (current_head + size) % capacity;
296
297 if (head_.compare_exchange_weak(current_head, new_head, std::memory_order_acq_rel,
298 std::memory_order_relaxed))
299 {
300 return ErrorCode::OK;
301 }
302 }
303 }
304
314 ErrorCode PeekBatch(Data* data, size_t size)
315 {
316 if (size == 0)
317 {
318 return ErrorCode::OK;
319 }
320
321 const size_t CAPACITY = LENGTH + 1;
322
323 while (true)
324 {
325 auto current_head = head_.load(std::memory_order_relaxed);
326 auto current_tail = tail_.load(std::memory_order_acquire);
327
328 size_t available = (current_tail >= current_head)
329 ? (current_tail - current_head)
330 : (CAPACITY - current_head + current_tail);
331
332 if (available < size)
333 {
334 return ErrorCode::EMPTY;
335 }
336
337 if (data != nullptr)
338 {
339 size_t first_chunk = LibXR::min(size, CAPACITY - current_head);
341 reinterpret_cast<void*>(data),
342 reinterpret_cast<const void*>(queue_handle_ + current_head),
343 first_chunk * sizeof(Data));
344
345 if (size > first_chunk)
346 {
347 LibXR::Memory::FastCopy(reinterpret_cast<void*>(data + first_chunk),
348 reinterpret_cast<const void*>(queue_handle_),
349 (size - first_chunk) * sizeof(Data));
350 }
351 }
352
353 if (head_.load(std::memory_order_acquire) == current_head)
354 {
355 return ErrorCode::OK;
356 }
357 }
358 }
359
366 void Reset()
367 {
368 head_.store(0, std::memory_order_relaxed);
369 tail_.store(0, std::memory_order_relaxed);
370 }
371
376 size_t Size() const
377 {
378 const auto CURRENT_HEAD = head_.load(std::memory_order_acquire);
379 const auto CURRENT_TAIL = tail_.load(std::memory_order_acquire);
380 return (CURRENT_TAIL >= CURRENT_HEAD) ? (CURRENT_TAIL - CURRENT_HEAD)
381 : ((LENGTH + 1) - CURRENT_HEAD + CURRENT_TAIL);
382 }
383
387 size_t EmptySize() { return LENGTH - Size(); }
388
392 size_t MaxSize() const { return LENGTH; }
393
394 private:
395 alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<uint32_t> head_;
396 alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<uint32_t> tail_;
397 const size_t LENGTH;
398 Data* queue_handle_;
399
400 uint32_t Increment(uint32_t index) const { return (index + 1) % (LENGTH + 1); }
401};
402
403} // namespace LibXR
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
size_t MaxSize() const
获取队列的最大容量 / Returns the maximum capacity of the queue
ErrorCode Pop(ElementData &item)
从队列中弹出数据 / Pops data from the queue
ErrorCode Pop()
从队列中弹出数据(不返回数据) / Pops data from the queue (without returning data)
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
ErrorCode Pop(Data &item)
从队列中移除头部元素,并获取该元素的数据 (Remove the front element from the queue and retrieve its data).
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
LockFreeQueue(size_t length)
构造函数 / Constructor
ErrorCode PeekBatch(Data *data, size_t size)
批量查看队列中的数据(不移除) / Peeks multiple elements from the queue without removing them
Data * operator[](uint32_t index)
获取指定索引的数据指针 / Retrieves the data pointer at a specified index
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
~LockFreeQueue()
析构函数 / Destructor
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
ErrorCode Peek(Data &item)
获取队列头部数据但不弹出 / Retrieves the front data of the queue without popping
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
Definition libxr_mem.cpp:3
LibXR 命名空间 / The LibXR namespace
Definition ch32_can.hpp:14
constexpr auto min(T1 a, T2 b) -> typename std::common_type< T1, T2 >::type
计算两个数的最小值 / Computes the minimum of two values