libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
lockfree_queue.hpp
1#pragma once
2
3#include <atomic>
4
5#include "libxr_def.hpp"
6
7namespace LibXR
8{
9
24template <typename Data>
25class alignas(LIBXR_CACHE_LINE_SIZE) LockFreeQueue
26{
27 public:
35 LockFreeQueue(size_t length)
36 : head_(0), tail_(0), queue_handle_(new Data[length + 1]), LENGTH(length)
37 {
38 }
39
46 ~LockFreeQueue() { delete[] queue_handle_; }
47
53 Data *operator[](uint32_t index) { return &queue_handle_[static_cast<size_t>(index)]; }
54
62 template <typename ElementData = Data>
63 ErrorCode Push(ElementData &&item)
64 {
65 const auto CURRENT_TAIL = tail_.load(std::memory_order_relaxed);
66 const auto NEXT_TAIL = Increment(CURRENT_TAIL);
67
68 if (NEXT_TAIL == head_.load(std::memory_order_acquire))
69 {
70 return ErrorCode::FULL;
71 }
72
73 queue_handle_[CURRENT_TAIL] = std::forward<ElementData>(item);
74 tail_.store(NEXT_TAIL, std::memory_order_release);
75 return ErrorCode::OK;
76 }
77
85 template <typename ElementData = Data>
86 ErrorCode Pop(ElementData &item)
87 {
88 auto current_head = head_.load(std::memory_order_relaxed);
89
90 while (true)
91 {
92 if (current_head == tail_.load(std::memory_order_acquire))
93 {
94 return ErrorCode::EMPTY;
95 }
96
97 if (head_.compare_exchange_weak(current_head, Increment(current_head),
98 std::memory_order_acquire,
99 std::memory_order_relaxed))
100 {
101 item = queue_handle_[current_head];
102 return ErrorCode::OK;
103 }
104 }
105 }
106
129 ErrorCode Pop(Data &item)
130 {
131 auto current_head = head_.load(std::memory_order_relaxed);
132
133 while (true)
134 {
135 if (current_head == tail_.load(std::memory_order_acquire))
136 {
137 return ErrorCode::EMPTY;
138 }
139
140 if (head_.compare_exchange_weak(current_head, Increment(current_head),
141 std::memory_order_acquire,
142 std::memory_order_relaxed))
143 {
144 std::atomic_thread_fence(std::memory_order_acquire);
145 item = queue_handle_[current_head];
146 return ErrorCode::OK;
147 }
148 current_head = head_.load(std::memory_order_relaxed);
149 }
150 }
151
159 ErrorCode Pop()
160 {
161 auto current_head = head_.load(std::memory_order_relaxed);
162
163 while (true)
164 {
165 if (current_head == tail_.load(std::memory_order_acquire))
166 {
167 return ErrorCode::EMPTY;
168 }
169
170 if (head_.compare_exchange_weak(current_head, Increment(current_head),
171 std::memory_order_acquire,
172 std::memory_order_relaxed))
173 {
174 return ErrorCode::OK;
175 }
176 current_head = head_.load(std::memory_order_relaxed);
177 }
178 }
179
188 ErrorCode Peek(Data &item)
189 {
190 const auto CURRENT_HEAD = head_.load(std::memory_order_acquire);
191 if (CURRENT_HEAD == tail_.load(std::memory_order_acquire))
192 {
193 return ErrorCode::EMPTY;
194 }
195
196 item = queue_handle_[CURRENT_HEAD];
197 return ErrorCode::OK;
198 }
199
208 ErrorCode PushBatch(const Data *data, size_t size)
209 {
210 auto current_tail = tail_.load(std::memory_order_relaxed);
211 auto current_head = head_.load(std::memory_order_acquire);
212
213 size_t capacity = LENGTH + 1;
214 size_t free_space = (current_tail >= current_head)
215 ? (capacity - (current_tail - current_head) - 1)
216 : (current_head - current_tail - 1);
217
218 if (free_space < size)
219 {
220 return ErrorCode::FULL;
221 }
222
223 size_t first_chunk = LibXR::min(size, capacity - current_tail);
224 memcpy(reinterpret_cast<void *>(queue_handle_ + current_tail),
225 reinterpret_cast<const void *>(data), first_chunk * sizeof(Data));
226
227 if (size > first_chunk)
228 {
229 memcpy(reinterpret_cast<void *>(queue_handle_),
230 reinterpret_cast<const void *>(data + first_chunk),
231 (size - first_chunk) * sizeof(Data));
232 }
233
234 tail_.store((current_tail + size) % capacity, std::memory_order_release);
235 return ErrorCode::OK;
236 }
237
246 ErrorCode PopBatch(Data *data, size_t size)
247 {
248 size_t capacity = LENGTH + 1;
249
250 while (true)
251 {
252 auto current_head = head_.load(std::memory_order_relaxed);
253 auto current_tail = tail_.load(std::memory_order_acquire);
254
255 size_t available = (current_tail >= current_head)
256 ? (current_tail - current_head)
257 : (capacity - current_head + current_tail);
258
259 if (available < size)
260 {
261 return ErrorCode::EMPTY;
262 }
263
264 if (data != nullptr)
265 {
266 size_t first_chunk = LibXR::min(size, capacity - current_head);
267 memcpy(reinterpret_cast<void *>(data),
268 reinterpret_cast<const void *>(queue_handle_ + current_head),
269 first_chunk * sizeof(Data));
270
271 if (size > first_chunk)
272 {
273 memcpy(reinterpret_cast<void *>(data + first_chunk),
274 reinterpret_cast<const void *>(queue_handle_),
275 (size - first_chunk) * sizeof(Data));
276 }
277 }
278
279 size_t new_head = (current_head + size) % capacity;
280
281 if (head_.compare_exchange_weak(current_head, new_head, std::memory_order_acquire,
282 std::memory_order_relaxed))
283 {
284 return ErrorCode::OK;
285 }
286 }
287 }
288
298 ErrorCode PeekBatch(Data *data, size_t size)
299 {
300 size_t capacity = LENGTH + 1;
301
302 while (true)
303 {
304 auto current_head = head_.load(std::memory_order_relaxed);
305 auto current_tail = tail_.load(std::memory_order_acquire);
306
307 size_t available = (current_tail >= current_head)
308 ? (current_tail - current_head)
309 : (capacity - current_head + current_tail);
310
311 if (available < size)
312 {
313 return ErrorCode::EMPTY;
314 }
315
316 size_t first_chunk = LibXR::min(size, capacity - current_head);
317 memcpy(reinterpret_cast<void *>(data),
318 reinterpret_cast<const void *>(queue_handle_ + current_head),
319 first_chunk * sizeof(Data));
320
321 if (size > first_chunk)
322 {
323 memcpy(reinterpret_cast<void *>(data + first_chunk),
324 reinterpret_cast<const void *>(queue_handle_),
325 (size - first_chunk) * sizeof(Data));
326 }
327
328 if (head_.load(std::memory_order_acquire) == current_head)
329 {
330 return ErrorCode::OK;
331 }
332 }
333 }
334
341 void Reset()
342 {
343 head_.store(0, std::memory_order_relaxed);
344 tail_.store(0, std::memory_order_relaxed);
345 }
346
351 size_t Size() const
352 {
353 const auto CURRENT_HEAD = head_.load(std::memory_order_acquire);
354 const auto CURRENT_TAIL = tail_.load(std::memory_order_acquire);
355 return (CURRENT_TAIL >= CURRENT_HEAD) ? (CURRENT_TAIL - CURRENT_HEAD)
356 : ((LENGTH + 1) - CURRENT_HEAD + CURRENT_TAIL);
357 }
358
362 size_t EmptySize() { return LENGTH - Size(); }
363
364 private:
365 alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<uint32_t> head_;
366 alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<uint32_t> tail_;
367 Data *queue_handle_;
368 const size_t LENGTH;
369
370 uint32_t Increment(uint32_t index) const { return (index + 1) % (LENGTH + 1); }
371};
372
373} // namespace LibXR
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode Pop(ElementData &item)
从队列中弹出数据 / Pops data from the queue
ErrorCode Pop()
从队列中弹出数据(不返回数据) / Pops data from the queue (without returning data)
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
ErrorCode Pop(Data &item)
从队列中移除头部元素,并获取该元素的数据 / Removes the front element from the queue and retrieves its data
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
LockFreeQueue(size_t length)
构造函数 / Constructor
ErrorCode PeekBatch(Data *data, size_t size)
批量查看队列中的数据(不移除) / Peeks multiple elements from the queue without removing them
Data * operator[](uint32_t index)
获取指定索引的数据指针 / Retrieves the data pointer at a specified index
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
~LockFreeQueue()
析构函数 / Destructor
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
ErrorCode Peek(Data &item)
获取队列头部数据但不弹出 / Retrieves the front data of the queue without popping
LibXR 命名空间 / LibXR namespace
Definition ch32_gpio.hpp:9
constexpr auto min(T1 a, T2 b) -> typename std::common_type< T1, T2 >::type
计算两个数的最小值 / Computes the minimum of two values