libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
lockfree_queue.hpp
1#pragma once
2
3#include <atomic>
4
5#include "libxr_def.hpp"
6
7namespace LibXR
8{
9
24template <typename Data>
25class alignas(LIBXR_CACHE_LINE_SIZE) LockFreeQueue
26{
27 inline constexpr size_t AlignUp(size_t size, size_t align)
28 {
29 return ((size + align - 1) / align) * align;
30 }
31
32 public:
40 LockFreeQueue(size_t length)
41 : head_(0),
42 tail_(0),
43 LENGTH(AlignUp(length, LIBXR_ALIGN_SIZE) - 1),
44 queue_handle_(new Data[LENGTH + 1])
45 {
46 }
47
54 ~LockFreeQueue() { delete[] queue_handle_; }
55
61 Data *operator[](uint32_t index) { return &queue_handle_[static_cast<size_t>(index)]; }
62
70 template <typename ElementData = Data>
71 ErrorCode Push(ElementData &&item)
72 {
73 const auto CURRENT_TAIL = tail_.load(std::memory_order_relaxed);
74 const auto NEXT_TAIL = Increment(CURRENT_TAIL);
75
76 if (NEXT_TAIL == head_.load(std::memory_order_acquire))
77 {
78 return ErrorCode::FULL;
79 }
80
81 queue_handle_[CURRENT_TAIL] = std::forward<ElementData>(item);
82 tail_.store(NEXT_TAIL, std::memory_order_release);
83 return ErrorCode::OK;
84 }
85
93 template <typename ElementData = Data>
94 ErrorCode Pop(ElementData &item)
95 {
96 auto current_head = head_.load(std::memory_order_relaxed);
97
98 while (true)
99 {
100 if (current_head == tail_.load(std::memory_order_acquire))
101 {
102 return ErrorCode::EMPTY;
103 }
104
105 item = queue_handle_[current_head];
106
107 if (head_.compare_exchange_weak(current_head, Increment(current_head),
108 std::memory_order_acq_rel,
109 std::memory_order_relaxed))
110 {
111 return ErrorCode::OK;
112 }
113 }
114 }
115
134 ErrorCode Pop(Data &item)
135 {
136 auto current_head = head_.load(std::memory_order_relaxed);
137
138 while (true)
139 {
140 if (current_head == tail_.load(std::memory_order_acquire))
141 {
142 return ErrorCode::EMPTY;
143 }
144
145 item = queue_handle_[current_head];
146
147 if (head_.compare_exchange_weak(current_head, Increment(current_head),
148 std::memory_order_acq_rel,
149 std::memory_order_relaxed))
150 {
151 return ErrorCode::OK;
152 }
153 }
154 }
155
163 ErrorCode Pop()
164 {
165 auto current_head = head_.load(std::memory_order_relaxed);
166
167 while (true)
168 {
169 if (current_head == tail_.load(std::memory_order_acquire))
170 {
171 return ErrorCode::EMPTY;
172 }
173
174 if (head_.compare_exchange_weak(current_head, Increment(current_head),
175 std::memory_order_acq_rel,
176 std::memory_order_relaxed))
177 {
178 return ErrorCode::OK;
179 }
180 }
181 }
182
191 ErrorCode Peek(Data &item)
192 {
193 while (true)
194 {
195 auto current_head = head_.load(std::memory_order_relaxed);
196 if (current_head == tail_.load(std::memory_order_acquire))
197 {
198 return ErrorCode::EMPTY;
199 }
200
201 item = queue_handle_[current_head];
202
203 if (head_.load(std::memory_order_acquire) == current_head)
204 {
205 return ErrorCode::OK;
206 }
207 }
208 }
209
218 ErrorCode PushBatch(const Data *data, size_t size)
219 {
220 auto current_tail = tail_.load(std::memory_order_relaxed);
221 auto current_head = head_.load(std::memory_order_acquire);
222
223 size_t capacity = LENGTH + 1;
224 size_t free_space = (current_tail >= current_head)
225 ? (capacity - (current_tail - current_head) - 1)
226 : (current_head - current_tail - 1);
227
228 if (free_space < size)
229 {
230 return ErrorCode::FULL;
231 }
232
233 size_t first_chunk = LibXR::min(size, capacity - current_tail);
234 LibXR::Memory::FastCopy(reinterpret_cast<void *>(queue_handle_ + current_tail),
235 reinterpret_cast<const void *>(data),
236 first_chunk * sizeof(Data));
237
238 if (size > first_chunk)
239 {
240 LibXR::Memory::FastCopy(reinterpret_cast<void *>(queue_handle_),
241 reinterpret_cast<const void *>(data + first_chunk),
242 (size - first_chunk) * sizeof(Data));
243 }
244
245 tail_.store((current_tail + size) % capacity, std::memory_order_release);
246 return ErrorCode::OK;
247 }
248
257 ErrorCode PopBatch(Data *data, size_t size)
258 {
259 size_t capacity = LENGTH + 1;
260
261 while (true)
262 {
263 auto current_head = head_.load(std::memory_order_relaxed);
264 auto current_tail = tail_.load(std::memory_order_acquire);
265
266 size_t available = (current_tail >= current_head)
267 ? (current_tail - current_head)
268 : (capacity - current_head + current_tail);
269
270 if (available < size)
271 {
272 return ErrorCode::EMPTY;
273 }
274
275 if (data != nullptr)
276 {
277 size_t first_chunk = LibXR::min(size, capacity - current_head);
279 reinterpret_cast<void *>(data),
280 reinterpret_cast<const void *>(queue_handle_ + current_head),
281 first_chunk * sizeof(Data));
282
283 if (size > first_chunk)
284 {
285 LibXR::Memory::FastCopy(reinterpret_cast<void *>(data + first_chunk),
286 reinterpret_cast<const void *>(queue_handle_),
287 (size - first_chunk) * sizeof(Data));
288 }
289 }
290
291 size_t new_head = (current_head + size) % capacity;
292
293 if (head_.compare_exchange_weak(current_head, new_head, std::memory_order_acq_rel,
294 std::memory_order_relaxed))
295 {
296 return ErrorCode::OK;
297 }
298 }
299 }
300
310 ErrorCode PeekBatch(Data *data, size_t size)
311 {
312 if (size == 0)
313 {
314 return ErrorCode::OK;
315 }
316
317 const size_t CAPACITY = LENGTH + 1;
318
319 while (true)
320 {
321 auto current_head = head_.load(std::memory_order_relaxed);
322 auto current_tail = tail_.load(std::memory_order_acquire);
323
324 size_t available = (current_tail >= current_head)
325 ? (current_tail - current_head)
326 : (CAPACITY - current_head + current_tail);
327
328 if (available < size)
329 {
330 return ErrorCode::EMPTY;
331 }
332
333 if (data != nullptr)
334 {
335 size_t first_chunk = LibXR::min(size, CAPACITY - current_head);
337 reinterpret_cast<void *>(data),
338 reinterpret_cast<const void *>(queue_handle_ + current_head),
339 first_chunk * sizeof(Data));
340
341 if (size > first_chunk)
342 {
343 LibXR::Memory::FastCopy(reinterpret_cast<void *>(data + first_chunk),
344 reinterpret_cast<const void *>(queue_handle_),
345 (size - first_chunk) * sizeof(Data));
346 }
347 }
348
349 if (head_.load(std::memory_order_acquire) == current_head)
350 {
351 return ErrorCode::OK;
352 }
353 }
354 }
355
362 void Reset()
363 {
364 head_.store(0, std::memory_order_relaxed);
365 tail_.store(0, std::memory_order_relaxed);
366 }
367
372 size_t Size() const
373 {
374 const auto CURRENT_HEAD = head_.load(std::memory_order_acquire);
375 const auto CURRENT_TAIL = tail_.load(std::memory_order_acquire);
376 return (CURRENT_TAIL >= CURRENT_HEAD) ? (CURRENT_TAIL - CURRENT_HEAD)
377 : ((LENGTH + 1) - CURRENT_HEAD + CURRENT_TAIL);
378 }
379
383 size_t EmptySize() { return LENGTH - Size(); }
384
385 private:
386 alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<uint32_t> head_;
387 alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<uint32_t> tail_;
388 const size_t LENGTH;
389 Data *queue_handle_;
390
391 uint32_t Increment(uint32_t index) const { return (index + 1) % (LENGTH + 1); }
392};
393
394} // namespace LibXR
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode Pop(ElementData &item)
从队列中弹出数据 / Pops data from the queue
ErrorCode Pop()
从队列中弹出数据(不返回数据) / Pops data from the queue (without returning data)
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
ErrorCode Pop(Data &item)
从队列中移除头部元素,并获取该元素的数据 / Removes the front element from the queue and retrieves its data
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
LockFreeQueue(size_t length)
构造函数 / Constructor
ErrorCode PeekBatch(Data *data, size_t size)
批量查看队列中的数据(不移除) / Peeks multiple elements from the queue without removing them
Data * operator[](uint32_t index)
获取指定索引的数据指针 / Retrieves the data pointer at a specified index
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
~LockFreeQueue()
析构函数 / Destructor
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
ErrorCode Peek(Data &item)
获取队列头部数据但不弹出 / Retrieves the front data of the queue without popping
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
Definition libxr_mem.cpp:17
LibXR 命名空间 / LibXR namespace
constexpr auto min(T1 a, T2 b) -> typename std::common_type< T1, T2 >::type
计算两个数的最小值 / Computes the minimum of two values