libxr  1.0
Want to be the best embedded framework
LibXR::LockFreeQueue< Data > Class Template Reference

Lock-free queue implementation.

#include <lockfree_queue.hpp>

Public Member Functions

 LockFreeQueue (size_t length)
 Constructor
 
 ~LockFreeQueue ()
 Destructor
 
Data * operator[] (uint32_t index)
 Retrieves the data pointer at a specified index
 
template<typename ElementData = Data>
ErrorCode Push (ElementData &&item)
 Pushes data into the queue
 
template<typename ElementData = Data>
ErrorCode Pop (ElementData &item)
 Pops data from the queue
 
ErrorCode Pop (Data &item)
 Removes the front element from the queue and retrieves its data
 
ErrorCode Pop ()
 Pops data from the queue (without returning data)
 
ErrorCode Peek (Data &item)
 Retrieves the front data of the queue without popping
 
ErrorCode PushBatch (const Data *data, size_t size)
 Pushes multiple elements into the queue
 
template<typename Writer >
ErrorCode PushWithWriter (size_t size, Writer &&writer)
 Pushes fixed-size data via a writer callback (single producer)
 
template<typename Reader >
ErrorCode PopWithReader (size_t size, Reader &&reader)
 Pops fixed-size data via a reader callback (single consumer)
 
ErrorCode PopBatch (Data *data, size_t size)
 Pops multiple elements from the queue
 
ErrorCode PeekBatch (Data *data, size_t size)
 Peeks multiple elements from the queue without removing them
 
void Reset ()
 Resets the queue
 
size_t Size () const
 Returns the number of elements currently in the queue
 
size_t EmptySize ()
 Calculates the remaining available space in the queue
 
size_t MaxSize () const
 Returns the maximum capacity of the queue
 

Private Member Functions

constexpr size_t AlignUp (size_t size, size_t align)
 
uint32_t Increment (uint32_t index) const
 

Private Attributes

std::atomic< uint32_t > head_
 
std::atomic< uint32_t > tail_
 
const size_t LENGTH
 
Data * queue_handle_
 

Detailed Description

template<typename Data>
class LibXR::LockFreeQueue< Data >

Lock-free queue implementation

This class implements a single-producer, multiple-consumer (SPMC) lock-free queue that supports efficient enqueue and dequeue operations in a multi-threaded environment. It is suited to high-concurrency scenarios such as real-time systems and multi-threaded data processing.

Template Parameters
Data  The type of data stored in the queue.

Definition at line 29 of file lockfree_queue.hpp.

Constructor & Destructor Documentation

◆ LockFreeQueue()

template<typename Data >
LibXR::LockFreeQueue< Data >::LockFreeQueue ( size_t length)
inline

Constructor

Parameters
length  Maximum capacity of the queue

Creates a lock-free queue with the specified size and initializes relevant variables.

Note
Allocates memory dynamically.

Definition at line 47 of file lockfree_queue.hpp.

  : head_(0),
    tail_(0),
    LENGTH(AlignUp(length, LibXR::ALIGN_SIZE) - 1),
    queue_handle_(new Data[LENGTH + 1])
{
}
constexpr size_t ALIGN_SIZE
Native platform alignment size
Definition libxr_def.hpp:35

◆ ~LockFreeQueue()

template<typename Data >
LibXR::LockFreeQueue< Data >::~LockFreeQueue ( )
inline

Destructor

Releases the memory occupied by the queue.

Definition at line 61 of file lockfree_queue.hpp.

{ delete[] queue_handle_; }

Member Function Documentation

◆ AlignUp()

template<typename Data >
size_t LibXR::LockFreeQueue< Data >::AlignUp ( size_t size,
size_t align )
inline constexpr private

Definition at line 31 of file lockfree_queue.hpp.

{
  return (size / align + 1) * align;
}
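To make the arithmetic concrete, the sketch below re-implements the formula above as a free function and shows how the constructor derives the usable capacity from it. The names `align_up_sketch` and `usable_length_sketch` are hypothetical, and the alignment value 4 is an assumption for illustration; the real `LibXR::ALIGN_SIZE` is platform-dependent.

```cpp
#include <cassert>
#include <cstddef>

// Same formula as AlignUp above: it always moves to the NEXT multiple of
// `align`, even when `size` is already a multiple.
constexpr std::size_t align_up_sketch(std::size_t size, std::size_t align)
{
  return (size / align + 1) * align;
}

// Usable capacity as computed in the constructor: the storage holds
// align_up_sketch(length, align) slots and LENGTH is one less than that.
constexpr std::size_t usable_length_sketch(std::size_t length, std::size_t align)
{
  return align_up_sketch(length, align) - 1;
}

static_assert(align_up_sketch(5, 4) == 8, "5 rounds up to 8");
static_assert(align_up_sketch(8, 4) == 12, "8 moves to the next multiple");
static_assert(usable_length_sketch(8, 4) == 11, "12 slots, 11 usable");
```

Under these assumptions, a request for 8 elements would give a usable capacity of 11, so the value reported by MaxSize() can be larger than the requested length but never smaller.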

◆ EmptySize()

template<typename Data >
size_t LibXR::LockFreeQueue< Data >::EmptySize ( )
inline

Calculates the remaining available space in the queue

Definition at line 509 of file lockfree_queue.hpp.

{ return LENGTH - Size(); }
size_t Size() const
Returns the number of elements currently in the queue

◆ Increment()

template<typename Data >
uint32_t LibXR::LockFreeQueue< Data >::Increment ( uint32_t index) const
inline private

Definition at line 522 of file lockfree_queue.hpp.

{ return (index + 1) % (LENGTH + 1); }
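The wrap-around behaviour of the index step can be illustrated with a stand-alone function. This is a sketch only: `increment_sketch` is a hypothetical name, and `length` stands in for the private `LENGTH` member.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Advance a ring index by one slot, wrapping to 0 after the last slot.
// The storage has length + 1 slots, indexed 0..length.
inline std::uint32_t increment_sketch(std::uint32_t index, std::size_t length)
{
  return static_cast<std::uint32_t>((index + 1) % (length + 1));
}
```

With `length == 3` the indices cycle through 0, 1, 2, 3 and then return to 0.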

◆ MaxSize()

template<typename Data >
size_t LibXR::LockFreeQueue< Data >::MaxSize ( ) const
inline

Returns the maximum capacity of the queue

Definition at line 514 of file lockfree_queue.hpp.

{ return LENGTH; }

◆ operator[]()

template<typename Data >
Data * LibXR::LockFreeQueue< Data >::operator[] ( uint32_t index)
inline

Retrieves the data pointer at a specified index

Parameters
index  Data index
Returns
Pointer to the data at the given index

Definition at line 68 of file lockfree_queue.hpp.

{ return &queue_handle_[static_cast<size_t>(index)]; }

◆ Peek()

template<typename Data >
ErrorCode LibXR::LockFreeQueue< Data >::Peek ( Data & item)
inline

Retrieves the front data of the queue without popping

Parameters
item  Variable to store the retrieved data
Returns
Operation result: ErrorCode::OK on success, ErrorCode::EMPTY if the queue is empty

Definition at line 198 of file lockfree_queue.hpp.

{
  while (true)
  {
    auto current_head = head_.load(std::memory_order_relaxed);
    if (current_head == tail_.load(std::memory_order_acquire))
    {
      return ErrorCode::EMPTY;
    }

    item = queue_handle_[current_head];

    // Accept the copy only if head_ did not move while it was being made.
    if (head_.load(std::memory_order_acquire) == current_head)
    {
      return ErrorCode::OK;
    }
  }
}
@ EMPTY
Empty
@ OK
Operation successful
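The copy-then-validate pattern used by Peek can be sketched outside the class as follows. This is an illustrative sketch, not the library code: `peek_sketch` is a hypothetical free function operating on a plain `int` array, and the test-style usage is single-threaded.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Copy the front slot, then re-read `head`; if another consumer advanced
// `head` during the copy, the copy may be stale, so try again.
inline bool peek_sketch(const int* slots, const std::atomic<std::uint32_t>& head,
                        const std::atomic<std::uint32_t>& tail, int& out)
{
  while (true)
  {
    const auto current_head = head.load(std::memory_order_relaxed);
    if (current_head == tail.load(std::memory_order_acquire))
    {
      return false;  // empty
    }
    out = slots[current_head];
    if (head.load(std::memory_order_acquire) == current_head)
    {
      return true;  // head unchanged, the copy is accepted
    }
  }
}
```

Because nothing is written to `head`, repeated calls return the same element until some consumer pops it.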

◆ PeekBatch()

template<typename Data >
ErrorCode LibXR::LockFreeQueue< Data >::PeekBatch ( Data * data,
size_t size )
inline

Peeks multiple elements from the queue without removing them

Parameters
data  Pointer to the array to store peeked data
size  Number of elements to peek
Returns
Operation result: ErrorCode::OK on success, ErrorCode::EMPTY if fewer than size elements are available

Definition at line 436 of file lockfree_queue.hpp.

{
  if (size == 0)
  {
    return ErrorCode::OK;
  }

  const size_t CAPACITY = LENGTH + 1;

  while (true)
  {
    auto current_head = head_.load(std::memory_order_relaxed);
    auto current_tail = tail_.load(std::memory_order_acquire);

    size_t available = (current_tail >= current_head)
                           ? (current_tail - current_head)
                           : (CAPACITY - current_head + current_tail);

    if (available < size)
    {
      return ErrorCode::EMPTY;
    }

    if (data != nullptr)
    {
      // Copy up to the end of the storage first, then wrap to index 0.
      size_t first_chunk = LibXR::min(size, CAPACITY - current_head);
      LibXR::Memory::FastCopy(
          reinterpret_cast<void*>(data),
          reinterpret_cast<const void*>(queue_handle_ + current_head),
          first_chunk * sizeof(Data));

      if (size > first_chunk)
      {
        LibXR::Memory::FastCopy(reinterpret_cast<void*>(data + first_chunk),
                                reinterpret_cast<const void*>(queue_handle_),
                                (size - first_chunk) * sizeof(Data));
      }
    }

    if (head_.load(std::memory_order_acquire) == current_head)
    {
      return ErrorCode::OK;
    }
  }
}
static void FastCopy(void *dst, const void *src, size_t size)
Fast memory copy
Definition libxr_mem.cpp:5
constexpr auto min(LeftType a, RightType b) -> std::common_type_t< LeftType, RightType >
Computes the minimum of two values

◆ Pop() [1/3]

template<typename Data >
ErrorCode LibXR::LockFreeQueue< Data >::Pop ( )
inline

Pops data from the queue (without returning data)

Returns
Operation result: ErrorCode::OK on success, ErrorCode::EMPTY if the queue is empty

Definition at line 170 of file lockfree_queue.hpp.

{
  auto current_head = head_.load(std::memory_order_relaxed);

  while (true)
  {
    if (current_head == tail_.load(std::memory_order_acquire))
    {
      return ErrorCode::EMPTY;
    }

    if (head_.compare_exchange_weak(current_head, Increment(current_head),
                                    std::memory_order_acq_rel,
                                    std::memory_order_relaxed))
    {
      return ErrorCode::OK;
    }
  }
}
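The compare-exchange loop above can be isolated into a small stand-alone function to show how a consumer claims the front slot. This is a sketch under stated assumptions: `try_advance_head` is a hypothetical name, and `capacity` stands in for `LENGTH + 1`.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Try to claim the front slot by advancing `head` with a CAS loop.
// Returns true if a slot was claimed, false if the ring was empty.
inline bool try_advance_head(std::atomic<std::uint32_t>& head,
                             const std::atomic<std::uint32_t>& tail,
                             std::uint32_t capacity)
{
  auto current = head.load(std::memory_order_relaxed);
  while (true)
  {
    if (current == tail.load(std::memory_order_acquire))
    {
      return false;  // empty
    }
    // On failure, compare_exchange_weak writes the latest head value into
    // `current`, so the next iteration retries against fresh state.
    if (head.compare_exchange_weak(current, (current + 1) % capacity,
                                   std::memory_order_acq_rel,
                                   std::memory_order_relaxed))
    {
      return true;
    }
  }
}
```

Only one of several competing consumers can succeed for a given head value; the others observe the updated head and either retry or report empty.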

◆ Pop() [2/3]

template<typename Data >
ErrorCode LibXR::LockFreeQueue< Data >::Pop ( Data & item)
inline

Removes the front element from the queue and retrieves its data.

This function atomically updates the head_ index using compare_exchange_weak to ensure thread safety. If the queue is empty, it returns ErrorCode::EMPTY. Otherwise it copies the front element into item and then tries to advance head_; if the compare-exchange fails because head_ has changed, it retries with the new head value. On success it returns ErrorCode::OK.

Parameters
item  Reference to store the popped element
Returns
Operation result: ErrorCode::OK on success, ErrorCode::EMPTY if the queue is empty

Definition at line 141 of file lockfree_queue.hpp.

{
  auto current_head = head_.load(std::memory_order_relaxed);

  while (true)
  {
    if (current_head == tail_.load(std::memory_order_acquire))
    {
      return ErrorCode::EMPTY;
    }

    item = queue_handle_[current_head];

    if (head_.compare_exchange_weak(current_head, Increment(current_head),
                                    std::memory_order_acq_rel,
                                    std::memory_order_relaxed))
    {
      return ErrorCode::OK;
    }
  }
}

◆ Pop() [3/3]

template<typename Data >
template<typename ElementData = Data>
ErrorCode LibXR::LockFreeQueue< Data >::Pop ( ElementData & item)
inline

Pops data from the queue

Parameters
item  Variable to store the popped data
Returns
Operation result: ErrorCode::OK on success, ErrorCode::EMPTY if the queue is empty

Definition at line 101 of file lockfree_queue.hpp.

{
  auto current_head = head_.load(std::memory_order_relaxed);

  while (true)
  {
    if (current_head == tail_.load(std::memory_order_acquire))
    {
      return ErrorCode::EMPTY;
    }

    item = queue_handle_[current_head];

    if (head_.compare_exchange_weak(current_head, Increment(current_head),
                                    std::memory_order_acq_rel,
                                    std::memory_order_relaxed))
    {
      return ErrorCode::OK;
    }
  }
}

◆ PopBatch()

template<typename Data >
ErrorCode LibXR::LockFreeQueue< Data >::PopBatch ( Data * data,
size_t size )
inline

Pops multiple elements from the queue

Parameters
data  Pointer to the array to store popped data; if nullptr, the elements are removed without being copied
size  Number of elements to pop
Returns
Operation result: ErrorCode::OK on success, ErrorCode::EMPTY if fewer than size elements are available

Definition at line 383 of file lockfree_queue.hpp.

{
  size_t capacity = LENGTH + 1;

  while (true)
  {
    auto current_head = head_.load(std::memory_order_relaxed);
    auto current_tail = tail_.load(std::memory_order_acquire);

    size_t available = (current_tail >= current_head)
                           ? (current_tail - current_head)
                           : (capacity - current_head + current_tail);

    if (available < size)
    {
      return ErrorCode::EMPTY;
    }

    if (data != nullptr)
    {
      // Copy up to the end of the storage first, then wrap to index 0.
      size_t first_chunk = LibXR::min(size, capacity - current_head);
      LibXR::Memory::FastCopy(
          reinterpret_cast<void*>(data),
          reinterpret_cast<const void*>(queue_handle_ + current_head),
          first_chunk * sizeof(Data));

      if (size > first_chunk)
      {
        LibXR::Memory::FastCopy(reinterpret_cast<void*>(data + first_chunk),
                                reinterpret_cast<const void*>(queue_handle_),
                                (size - first_chunk) * sizeof(Data));
      }
    }

    size_t new_head = (current_head + size) % capacity;

    if (head_.compare_exchange_weak(current_head, new_head, std::memory_order_acq_rel,
                                    std::memory_order_relaxed))
    {
      return ErrorCode::OK;
    }
  }
}

◆ PopWithReader()

template<typename Data >
template<typename Reader >
ErrorCode LibXR::LockFreeQueue< Data >::PopWithReader ( size_t size,
Reader && reader )
inline

Pops fixed-size data via a reader callback (single consumer)

Parameters
size  Number of elements to pop
reader  Reader callback with signature ErrorCode(const Data* buffer, size_t chunk_size)
Note
Semantics align with PopBatch: returns EMPTY when data is insufficient; head is committed only after the whole range has been read successfully.

Definition at line 329 of file lockfree_queue.hpp.

{
  static_assert(std::is_invocable_v<Reader&, const Data*, size_t>,
                "PopWithReader reader must be callable as "
                "ErrorCode(const Data* buffer, size_t chunk_size)");
  using ReaderRet = std::invoke_result_t<Reader&, const Data*, size_t>;
  static_assert(std::is_convertible_v<ReaderRet, ErrorCode>,
                "PopWithReader reader return type must be convertible to ErrorCode");

  if (size == 0U)
  {
    return ErrorCode::OK;
  }

  const auto current_head = head_.load(std::memory_order_relaxed);
  const auto current_tail = tail_.load(std::memory_order_acquire);
  const size_t capacity = LENGTH + 1;
  const size_t available = (current_tail >= current_head)
                               ? (current_tail - current_head)
                               : (capacity - current_head + current_tail);

  if (available < size)
  {
    return ErrorCode::EMPTY;
  }

  // First chunk runs to the end of the storage; a second chunk wraps to 0.
  const size_t first_chunk = LibXR::min(size, capacity - static_cast<size_t>(current_head));
  Reader& reader_ref = reader;
  const ErrorCode first_ec = reader_ref(queue_handle_ + current_head, first_chunk);
  if (first_ec != ErrorCode::OK)
  {
    return first_ec;
  }

  if (size > first_chunk)
  {
    const ErrorCode second_ec = reader_ref(queue_handle_, size - first_chunk);
    if (second_ec != ErrorCode::OK)
    {
      return second_ec;
    }
  }

  head_.store((current_head + size) % capacity, std::memory_order_release);
  return ErrorCode::OK;
}
ErrorCode
Error code enumeration
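A reader may be invoked twice when the requested range wraps past the end of the storage, so it should accumulate rather than overwrite. The sketch below shows one way such a reader could look. Everything in it is hypothetical: `ReadSketchResult` stands in for `LibXR::ErrorCode`, `AppendReader` is an example callback, and `read_wrapped_sketch` only imitates the two-call pattern over a `std::vector` without touching any queue state.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for LibXR::ErrorCode (assumption: only two values are needed).
enum class ReadSketchResult { OK, FAILED };

// Example reader: appends every chunk it is handed to `out`.
struct AppendReader
{
  std::vector<int>& out;
  ReadSketchResult operator()(const int* buffer, std::size_t chunk_size)
  {
    out.insert(out.end(), buffer, buffer + chunk_size);
    return ReadSketchResult::OK;
  }
};

// Presents `size` elements starting at `head` to the reader in at most two
// chunks: up to the end of the storage, then from index 0.
template <typename Reader>
ReadSketchResult read_wrapped_sketch(const std::vector<int>& ring, std::size_t head,
                                     std::size_t size, Reader&& reader)
{
  const std::size_t first_chunk = std::min(size, ring.size() - head);
  if (reader(ring.data() + head, first_chunk) != ReadSketchResult::OK)
  {
    return ReadSketchResult::FAILED;
  }
  if (size > first_chunk &&
      reader(ring.data(), size - first_chunk) != ReadSketchResult::OK)
  {
    return ReadSketchResult::FAILED;
  }
  return ReadSketchResult::OK;
}
```

With a 4-slot ring holding `{8, 9, 0, 7}` and a head index of 3, reading 3 elements would call the reader once with `{7}` and once with `{8, 9}`.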

◆ Push()

template<typename Data >
template<typename ElementData = Data>
ErrorCode LibXR::LockFreeQueue< Data >::Push ( ElementData && item)
inline

Pushes data into the queue

Parameters
item  Element to be inserted
Returns
Operation result: ErrorCode::OK on success, ErrorCode::FULL if the queue is full

Definition at line 78 of file lockfree_queue.hpp.

{
  const auto CURRENT_TAIL = tail_.load(std::memory_order_relaxed);
  const auto NEXT_TAIL = Increment(CURRENT_TAIL);

  if (NEXT_TAIL == head_.load(std::memory_order_acquire))
  {
    return ErrorCode::FULL;
  }

  // Write the element first, then publish it by advancing tail_.
  queue_handle_[CURRENT_TAIL] = std::forward<ElementData>(item);
  tail_.store(NEXT_TAIL, std::memory_order_release);
  return ErrorCode::OK;
}
@ FULL
Full
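The write-then-publish ordering in Push can be seen in a miniature, self-contained ring. This is a simplified sketch, not the library class: `MiniRing` is hypothetical, holds `int` values in a fixed 4-slot array, and its `pop` assumes a single consumer (a plain store instead of the compare-exchange loop the real Pop uses).

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical miniature ring: 4 slots, of which 3 are usable.
struct MiniRing
{
  std::array<int, 4> slots{};
  std::atomic<std::uint32_t> head{0};
  std::atomic<std::uint32_t> tail{0};

  bool push(int value)
  {
    const auto current_tail = tail.load(std::memory_order_relaxed);
    const auto next_tail = (current_tail + 1) % 4;
    if (next_tail == head.load(std::memory_order_acquire))
    {
      return false;  // full: one slot always stays unused
    }
    slots[current_tail] = value;                       // 1. write the payload
    tail.store(next_tail, std::memory_order_release);  // 2. publish it
    return true;
  }

  // Single-consumer simplification of Pop.
  bool pop(int& out)
  {
    const auto current_head = head.load(std::memory_order_relaxed);
    if (current_head == tail.load(std::memory_order_acquire))
    {
      return false;  // empty
    }
    out = slots[current_head];
    head.store((current_head + 1) % 4, std::memory_order_release);
    return true;
  }
};
```

The release store on `tail` pairs with the acquire load in `pop`, so a consumer that observes the new tail also observes the element written before it.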

◆ PushBatch()

template<typename Data >
ErrorCode LibXR::LockFreeQueue< Data >::PushBatch ( const Data * data,
size_t size )
inline

Pushes multiple elements into the queue

Parameters
data  Pointer to the data array
size  Number of elements
Returns
Operation result: ErrorCode::OK on success, ErrorCode::FULL if fewer than size free slots are available

Definition at line 225 of file lockfree_queue.hpp.

{
  auto current_tail = tail_.load(std::memory_order_relaxed);
  auto current_head = head_.load(std::memory_order_acquire);

  size_t capacity = LENGTH + 1;
  size_t free_space = (current_tail >= current_head)
                          ? (capacity - (current_tail - current_head) - 1)
                          : (current_head - current_tail - 1);

  if (free_space < size)
  {
    return ErrorCode::FULL;
  }

  size_t first_chunk = LibXR::min(size, capacity - current_tail);
  LibXR::Memory::FastCopy(reinterpret_cast<void*>(queue_handle_ + current_tail),
                          reinterpret_cast<const void*>(data),
                          first_chunk * sizeof(Data));

  if (size > first_chunk)
  {
    LibXR::Memory::FastCopy(reinterpret_cast<void*>(queue_handle_),
                            reinterpret_cast<const void*>(data + first_chunk),
                            (size - first_chunk) * sizeof(Data));
  }

  tail_.store((current_tail + size) % capacity, std::memory_order_release);
  return ErrorCode::OK;
}
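The two-chunk copy across the wrap boundary is the core of the batch operations. The sketch below isolates it over a `std::vector<int>`. It is illustrative only: `ring_write_sketch` is a hypothetical helper, `std::memcpy` stands in for `LibXR::Memory::FastCopy`, and the free-space check is assumed to have been done by the caller.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Copy `size` ints from `src` into the ring starting at index `tail`,
// wrapping to index 0 when the end of the storage is reached.
// Returns the new tail index.
inline std::size_t ring_write_sketch(std::vector<int>& ring, std::size_t tail,
                                     const int* src, std::size_t size)
{
  const std::size_t capacity = ring.size();
  const std::size_t first_chunk = std::min(size, capacity - tail);
  std::memcpy(ring.data() + tail, src, first_chunk * sizeof(int));
  if (size > first_chunk)
  {
    std::memcpy(ring.data(), src + first_chunk,
                (size - first_chunk) * sizeof(int));
  }
  return (tail + size) % capacity;
}
```

For a 4-slot ring with the tail at index 3, writing three values places the first in slot 3 and the remaining two in slots 0 and 1, leaving the new tail at index 2.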

◆ PushWithWriter()

template<typename Data >
template<typename Writer >
ErrorCode LibXR::LockFreeQueue< Data >::PushWithWriter ( size_t size,
Writer && writer )
inline

Pushes fixed-size data via a writer callback (single producer)

Parameters
size  Number of elements to write
writer  Writer callback with signature ErrorCode(Data* buffer, size_t chunk_size)
Note
Semantics align with PushBatch: returns FULL when space is insufficient; tail is committed only after the whole range has been written successfully.

Definition at line 269 of file lockfree_queue.hpp.

{
  static_assert(std::is_invocable_v<Writer&, Data*, size_t>,
                "PushWithWriter writer must be callable as "
                "ErrorCode(Data* buffer, size_t chunk_size)");
  using WriterRet = std::invoke_result_t<Writer&, Data*, size_t>;
  static_assert(std::is_convertible_v<WriterRet, ErrorCode>,
                "PushWithWriter writer return type must be convertible to ErrorCode");

  if (size == 0U)
  {
    return ErrorCode::OK;
  }

  const auto current_tail = tail_.load(std::memory_order_relaxed);
  const auto current_head = head_.load(std::memory_order_acquire);
  const size_t capacity = LENGTH + 1;
  const size_t free_space =
      (current_tail >= current_head) ? (capacity - (current_tail - current_head) - 1)
                                     : (current_head - current_tail - 1);

  if (free_space < size)
  {
    return ErrorCode::FULL;
  }

  const size_t first_chunk = LibXR::min(size, capacity - static_cast<size_t>(current_tail));
  Writer& writer_ref = writer;
  const ErrorCode first_ec = writer_ref(queue_handle_ + current_tail, first_chunk);
  if (first_ec != ErrorCode::OK)
  {
    return first_ec;
  }

  if (size > first_chunk)
  {
    const ErrorCode second_ec = writer_ref(queue_handle_, size - first_chunk);
    if (second_ec != ErrorCode::OK)
    {
      return second_ec;
    }
  }

  tail_.store((current_tail + size) % capacity, std::memory_order_release);
  return ErrorCode::OK;
}
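Because the writer can be called twice for one request, it needs to remember how much it has already produced. The sketch below shows one possible shape for such a writer. All names are hypothetical: `WriteSketchResult` stands in for `LibXR::ErrorCode`, `SourceWriter` is an example callback, and `write_wrapped_sketch` only imitates the two-call pattern over a `std::vector`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for LibXR::ErrorCode (assumption: only two values are needed).
enum class WriteSketchResult { OK, FAILED };

// Example writer: fills each chunk from `src` and keeps an offset so that
// a second call continues where the first one stopped.
struct SourceWriter
{
  const int* src;
  std::size_t offset;

  explicit SourceWriter(const int* source) : src(source), offset(0) {}

  WriteSketchResult operator()(int* buffer, std::size_t chunk_size)
  {
    std::copy(src + offset, src + offset + chunk_size, buffer);
    offset += chunk_size;
    return WriteSketchResult::OK;
  }
};

// Hands the writer at most two destination chunks: from `tail` to the end
// of the storage, then from index 0.
template <typename Writer>
WriteSketchResult write_wrapped_sketch(std::vector<int>& ring, std::size_t tail,
                                       std::size_t size, Writer&& writer)
{
  const std::size_t first_chunk = std::min(size, ring.size() - tail);
  if (writer(ring.data() + tail, first_chunk) != WriteSketchResult::OK)
  {
    return WriteSketchResult::FAILED;
  }
  if (size > first_chunk &&
      writer(ring.data(), size - first_chunk) != WriteSketchResult::OK)
  {
    return WriteSketchResult::FAILED;
  }
  return WriteSketchResult::OK;
}
```

A writer built this way produces the same stored sequence whether or not the request happens to wrap.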

◆ Reset()

template<typename Data >
void LibXR::LockFreeQueue< Data >::Reset ( )
inline

Resets the queue

This method clears the queue and resets the head and tail pointers to 0.

Definition at line 488 of file lockfree_queue.hpp.

{
  head_.store(0, std::memory_order_relaxed);
  tail_.store(0, std::memory_order_relaxed);
}

◆ Size()

template<typename Data >
size_t LibXR::LockFreeQueue< Data >::Size ( ) const
inline

Returns the number of elements currently in the queue

Definition at line 498 of file lockfree_queue.hpp.

{
  const auto CURRENT_HEAD = head_.load(std::memory_order_acquire);
  const auto CURRENT_TAIL = tail_.load(std::memory_order_acquire);
  return (CURRENT_TAIL >= CURRENT_HEAD) ? (CURRENT_TAIL - CURRENT_HEAD)
                                        : ((LENGTH + 1) - CURRENT_HEAD + CURRENT_TAIL);
}
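The element count and the free space both follow from the two indices. The stand-alone sketch below works through the wrapped and unwrapped cases. The function names are hypothetical, and `capacity` stands in for `LENGTH + 1`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Number of stored elements given the head and tail indices.
inline std::size_t ring_size_sketch(std::uint32_t head, std::uint32_t tail,
                                    std::size_t capacity)
{
  return (tail >= head) ? static_cast<std::size_t>(tail - head)
                        : (capacity - head + tail);
}

// Free slots: one slot of the storage is never used, so the usable
// length is capacity - 1, matching EmptySize() == LENGTH - Size().
inline std::size_t ring_free_sketch(std::uint32_t head, std::uint32_t tail,
                                    std::size_t capacity)
{
  return (capacity - 1) - ring_size_sketch(head, tail, capacity);
}
```

With a capacity of 4, head 3 and tail 1 describe a wrapped range holding the elements at indices 3 and 0, so the size is 2 and one slot remains free.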

Field Documentation

◆ head_

template<typename Data >
std::atomic<uint32_t> LibXR::LockFreeQueue< Data >::head_
private

Definition at line 517 of file lockfree_queue.hpp.

◆ LENGTH

template<typename Data >
const size_t LibXR::LockFreeQueue< Data >::LENGTH
private

Definition at line 519 of file lockfree_queue.hpp.

◆ queue_handle_

template<typename Data >
Data* LibXR::LockFreeQueue< Data >::queue_handle_
private

Definition at line 520 of file lockfree_queue.hpp.

◆ tail_

template<typename Data >
std::atomic<uint32_t> LibXR::LockFreeQueue< Data >::tail_
private

Definition at line 518 of file lockfree_queue.hpp.


The documentation for this class was generated from the following file: