libxr 1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
lockfree_queue.hpp
1#pragma once
2
3#include <atomic>
4
5#include "libxr_def.hpp"
6
// Alignment used to pad the head/tail cursors onto separate cache lines
// (avoids false sharing between producer and consumer).
// Heuristic: 64 bytes when std::atomic<size_t> is wider than 32 bits
// (i.e. a 64-bit target), 32 bytes otherwise.
// NOTE(review): many 32-bit MCUs/SoCs also use 64-byte cache lines —
// confirm per target before relying on this for contention-heavy use.
static constexpr size_t LIBXR_CACHE_LINE_SIZE =
    (sizeof(std::atomic<size_t>) * 8 > 32) ? 64 : 32;
9
10namespace LibXR
11{
12
25template <typename Data>
27{
28 public:
37 : head_(0), tail_(0), queue_handle_(new Data[length + 1]), length_(length)
38 {
39 }
40
  /// @brief 析构函数 / Destructor — releases the ring-buffer storage.
  /// NOTE(review): frees a raw owning pointer; confirm copy construction and
  /// assignment are disabled for this class, otherwise a copy double-deletes.
  ~LockFreeQueue() { delete[] queue_handle_; }
48
54 Data *operator[](uint32_t index) { return &queue_handle_[static_cast<size_t>(index)]; }
55
63 template <typename ElementData = Data>
64 ErrorCode Push(ElementData &&item)
65 {
66 const auto CURRENT_TAIL = tail_.load(std::memory_order_relaxed);
67 const auto NEXT_TAIL = Increment(CURRENT_TAIL);
68
69 if (NEXT_TAIL == head_.load(std::memory_order_acquire))
70 {
71 return ErrorCode::FULL;
72 }
73
74 queue_handle_[CURRENT_TAIL] = std::forward<ElementData>(item);
75 tail_.store(NEXT_TAIL, std::memory_order_release);
76 return ErrorCode::OK;
77 }
78
  /**
   * @brief 从队列中弹出数据 / Pops data from the queue
   * @tparam ElementData Destination type; defaults to Data.
   * @param item Receives the popped element on success.
   * @return ErrorCode::OK on success, ErrorCode::EMPTY if the queue is empty.
   */
  template <typename ElementData = Data>
  ErrorCode Pop(ElementData &item)
  {
    auto current_head = head_.load(std::memory_order_relaxed);

    while (true)
    {
      // Empty when head has caught up with tail (acquire pairs with the
      // producer's release store in Push).
      if (current_head == tail_.load(std::memory_order_acquire))
      {
        return ErrorCode::EMPTY;
      }

      // Claim the slot by advancing head; on failure compare_exchange_weak
      // refreshes current_head, so the loop retries with the new value.
      if (head_.compare_exchange_weak(current_head, Increment(current_head),
                                      std::memory_order_acquire,
                                      std::memory_order_relaxed))
      {
        // NOTE(review): the slot is read *after* head has been advanced, so a
        // fast producer wrapping the ring could overwrite it before this copy
        // completes — confirm the intended producer/consumer discipline.
        item = queue_handle_[current_head];
        return ErrorCode::OK;
      }
    }
  }
107
  /**
   * @brief 从队列中移除头部元素,并获取该元素的数据 / Removes the front element
   *        from the queue and retrieves its data.
   * @param item Receives the popped element on success.
   * @return ErrorCode::OK on success, ErrorCode::EMPTY if the queue is empty.
   */
  ErrorCode Pop(Data &item)
  {
    auto current_head = head_.load(std::memory_order_relaxed);

    while (true)
    {
      // Empty when head has caught up with tail.
      if (current_head == tail_.load(std::memory_order_acquire))
      {
        return ErrorCode::EMPTY;
      }

      if (head_.compare_exchange_weak(current_head, Increment(current_head),
                                      std::memory_order_acquire,
                                      std::memory_order_relaxed))
      {
        // Extra fence before reading the claimed slot.
        std::atomic_thread_fence(std::memory_order_acquire);
        // NOTE(review): like the templated Pop, the slot is read after head
        // was advanced — a wrapping producer could overwrite it first.
        item = queue_handle_[current_head];
        return ErrorCode::OK;
      }
      // NOTE(review): redundant — compare_exchange_weak already updated
      // current_head with the observed value on failure.
      current_head = head_.load(std::memory_order_relaxed);
    }
  }
152
  /**
   * @brief 从队列中弹出数据(不返回数据) / Pops data from the queue
   *        (without returning data)
   * @return ErrorCode::OK on success, ErrorCode::EMPTY if the queue is empty.
   */
  ErrorCode Pop()
  {
    auto current_head = head_.load(std::memory_order_relaxed);

    while (true)
    {
      // Empty when head has caught up with tail.
      if (current_head == tail_.load(std::memory_order_acquire))
      {
        return ErrorCode::EMPTY;
      }

      // Advance head past the front slot; the element is simply discarded.
      if (head_.compare_exchange_weak(current_head, Increment(current_head),
                                      std::memory_order_acquire,
                                      std::memory_order_relaxed))
      {
        return ErrorCode::OK;
      }
      // NOTE(review): redundant — compare_exchange_weak already refreshed
      // current_head on failure.
      current_head = head_.load(std::memory_order_relaxed);
    }
  }
180
189 ErrorCode Peek(Data &item)
190 {
191 const auto CURRENT_HEAD = head_.load(std::memory_order_acquire);
192 if (CURRENT_HEAD == tail_.load(std::memory_order_acquire))
193 {
194 return ErrorCode::EMPTY;
195 }
196
197 item = queue_handle_[CURRENT_HEAD];
198 return ErrorCode::OK;
199 }
200
209 ErrorCode PushBatch(const Data *data, size_t size)
210 {
211 auto current_tail = tail_.load(std::memory_order_relaxed);
212 auto current_head = head_.load(std::memory_order_acquire);
213
214 size_t capacity = length_ + 1;
217 : (current_head - current_tail - 1);
218
219 if (free_space < size)
220 {
221 return ErrorCode::FULL;
222 }
223
224 for (size_t i = 0; i < size; ++i)
225 {
226 queue_handle_[(current_tail + i) % capacity] = data[i];
227 }
228
229 tail_.store((current_tail + size) % capacity, std::memory_order_release);
230 return ErrorCode::OK;
231 }
232
241 ErrorCode PopBatch(Data *data, size_t batch_size)
242 {
243 size_t capacity = length_ + 1;
244
245 while (true)
246 {
247 auto current_head = head_.load(std::memory_order_relaxed);
248 auto current_tail = tail_.load(std::memory_order_acquire);
249
253
254 if (available < batch_size)
255 {
256 return ErrorCode::EMPTY;
257 }
258
259 for (size_t i = 0; i < batch_size; ++i)
260 {
261 data[i] = queue_handle_[(current_head + i) % capacity];
262 }
263
265
266 if (head_.compare_exchange_weak(current_head, next_head, std::memory_order_acquire,
267 std::memory_order_relaxed))
268 {
269 return ErrorCode::OK;
270 }
271 }
272 }
273
280 void Reset()
281 {
282 head_.store(0, std::memory_order_relaxed);
283 tail_.store(0, std::memory_order_relaxed);
284 }
285
290 size_t Size() const
291 {
292 const auto CURRENT_HEAD = head_.load(std::memory_order_acquire);
293 const auto CURRENT_TAIL = tail_.load(std::memory_order_acquire);
295 : ((length_ + 1) - CURRENT_HEAD + CURRENT_TAIL);
296 }
297
301 size_t EmptySize() { return length_ - Size(); }
302
 private:
  // Cursors are cache-line aligned so producer (tail_) and consumer (head_)
  // updates do not false-share a line.
  alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<unsigned int> head_;  // next slot to read
  alignas(LIBXR_CACHE_LINE_SIZE) std::atomic<unsigned int> tail_;  // next slot to write
  Data *queue_handle_;  // owned ring buffer of length_ + 1 slots
  size_t length_;       // usable capacity (buffer holds one extra slot)
308
309 unsigned int Increment(unsigned int index) const { return (index + 1) % (length_ + 1); }
310};
311
312} // namespace LibXR
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode Pop(ElementData &item)
从队列中弹出数据 / Pops data from the queue
ErrorCode Pop()
从队列中弹出数据(不返回数据) / Pops data from the queue (without returning data)
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
ErrorCode PopBatch(Data *data, size_t batch_size)
批量弹出数据 / Pops multiple elements from the queue
ErrorCode Pop(Data &item)
从队列中移除头部元素并获取该元素的数据 / Removes the front element from the queue and retrieves its data
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
LockFreeQueue(size_t length)
构造函数 / Constructor
Data * operator[](uint32_t index)
获取指定索引的数据指针 / Retrieves the data pointer at a specified index
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
~LockFreeQueue()
析构函数 / Destructor
ErrorCode Peek(Data &item)
获取队列头部数据但不弹出 / Retrieves the front data of the queue without popping
LibXR Color Control Library / LibXR终端颜色控制库
constexpr auto min(T1 a, T2 b) -> typename std::common_type< T1, T2 >::type
计算两个数的最小值 / Computes the minimum of two values