37 enum WaitState : uint32_t
56template <
typename Data>
93 block_->data_.wait_state.store(SyncBlock::WAIT_IDLE, std::memory_order_relaxed);
111 other.block_ =
nullptr;
123 ASSERT(
block_ !=
nullptr);
125 auto& data =
block_->data_;
126 uint32_t expected = SyncBlock::WAIT_IDLE;
127 if (!data.wait_state.compare_exchange_strong(expected, SyncBlock::WAITING,
128 std::memory_order_acq_rel,
129 std::memory_order_acquire))
134 auto wait_ans = data.sem.Wait(timeout);
137 data.wait_state.store(SyncBlock::WAIT_IDLE, std::memory_order_release);
141 expected = SyncBlock::WAITING;
142 if (data.wait_state.compare_exchange_strong(expected, SyncBlock::WAIT_IDLE,
143 std::memory_order_acq_rel,
144 std::memory_order_acquire))
149 ASSERT(data.wait_state.load(std::memory_order_acquire) == SyncBlock::WAIT_CLAIMED);
151 auto finish_wait_ans = data.sem.Wait(UINT32_MAX);
152 UNUSED(finish_wait_ans);
154 data.wait_state.store(SyncBlock::WAIT_IDLE, std::memory_order_release);
163enum class Topic::ASyncSubscriberState : uint32_t
167 DATA_READY = UINT32_MAX
179 std::atomic<ASyncSubscriberState>
state =
180 ASyncSubscriberState::IDLE;
189template <
typename Data>
241 other.block_ =
nullptr;
256 return block_->data_.state.load(std::memory_order_acquire) ==
257 ASyncSubscriberState::DATA_READY;
268 if (
block_->data_.state.load(std::memory_order_acquire) ==
269 ASyncSubscriberState::DATA_READY)
271 block_->data_.state.store(ASyncSubscriberState::IDLE, std::memory_order_release);
273 return *
reinterpret_cast<Data*
>(
block_->data_.buff.addr_);
289 if (
block_->data_.state.load(std::memory_order_acquire) ==
290 ASyncSubscriberState::IDLE)
292 block_->data_.state.store(ASyncSubscriberState::WAITING,
293 std::memory_order_release);
327 template <
typename Data>
341 template <
typename Data>
359 template <
typename Data>
366 block_->data_.queue = &queue;
370 (void)queue->
Push(*
reinterpret_cast<Data*
>(data.addr_));
380 template <
typename Data>
387 block_->data_.queue = &queue;
393 (void)queue->
Push(
Message<Data>{timestamp, *reinterpret_cast<Data*>(data.addr_)});
404 other.block_ =
nullptr;
411 block_ = other.block_;
412 other.block_ =
nullptr;
418 LockFreeList::Node<QueueBlock>* block_ =
nullptr;
423 template <
typename Function>
426 template <
typename Return,
typename... Args>
429 using ReturnType = Return;
430 static constexpr size_t ARITY =
sizeof...(Args);
432 template <
size_t Index>
433 using Arg = std::tuple_element_t<Index, std::tuple<Args...>>;
436 template <
typename T>
439 static constexpr bool VALUE =
false;
442 template <
typename Data>
445 static constexpr bool VALUE =
true;
446 using DataType = Data;
449 template <
typename T>
450 using RemoveCVRef = std::remove_cv_t<std::remove_reference_t<T>>;
452 template <
typename T>
453 static constexpr bool IS_RAW_DATA =
456 template <
typename T>
459 template <
typename T>
460 static constexpr bool IS_TYPED_DATA =
463 template <
typename T>
464 static constexpr bool IS_CALLBACK_PAYLOAD =
465 IS_RAW_DATA<T> || IS_MESSAGE_VIEW<T> || IS_TYPED_DATA<T>;
471 RunFun run =
nullptr;
472 uint32_t payload_size = 0;
475 template <
typename PayloadArg>
476 static consteval uint32_t PayloadSize()
478 if constexpr (IS_RAW_DATA<PayloadArg>)
482 else if constexpr (IS_MESSAGE_VIEW<PayloadArg>)
484 using View = MessageViewTraits<RemoveCVRef<PayloadArg>>;
485 return sizeof(
typename View::DataType);
489 static_assert(IS_TYPED_DATA<PayloadArg>,
490 "LibXR::Topic::Callback payload must be RawData, ConstRawData, "
491 "Topic::MessageView<T>, T, T&, or const T&.");
492 return sizeof(RemoveCVRef<PayloadArg>);
496 template <
typename Function,
typename BoundArg,
typename PayloadArg>
497 static void InvokePayload(Function fun, BoundArg& arg,
bool in_isr,
498 MicrosecondTimestamp timestamp, RawData& data)
500 if constexpr (std::same_as<RemoveCVRef<PayloadArg>, RawData>)
502 fun(in_isr, arg, data);
504 else if constexpr (std::same_as<RemoveCVRef<PayloadArg>, ConstRawData>)
506 ConstRawData const_data(data);
507 fun(in_isr, arg, const_data);
509 else if constexpr (IS_MESSAGE_VIEW<PayloadArg>)
511 using View = MessageViewTraits<RemoveCVRef<PayloadArg>>;
512 using Data =
typename View::DataType;
513 MessageView<Data> message{timestamp, *
reinterpret_cast<Data*
>(data.addr_)};
514 fun(in_isr, arg, message);
518 using Data = RemoveCVRef<PayloadArg>;
519 fun(in_isr, arg, *
reinterpret_cast<Data*
>(data.addr_));
523 template <
typename Function,
typename BoundArg,
typename PayloadArg>
527 :
BlockHeader{&Run, PayloadSize<PayloadArg>()}, fun_(fun), arg_(std::move(arg))
531 static void Run(
const BlockHeader* header,
bool in_isr,
535 InvokePayload<Function, BoundArg, PayloadArg>(
536 block->fun_,
const_cast<BoundArg&
>(block->arg_), in_isr, timestamp, data);
543 template <
typename Function,
typename BoundArg,
typename PayloadArg>
547 :
BlockHeader{&Run, PayloadSize<PayloadArg>()}, fun_(fun), arg_(std::move(arg))
551 static void Run(
const BlockHeader* header,
bool in_isr,
555 if constexpr (std::same_as<RemoveCVRef<PayloadArg>,
RawData>)
557 block->fun_(in_isr, block->arg_, timestamp, data);
559 else if constexpr (std::same_as<RemoveCVRef<PayloadArg>,
ConstRawData>)
562 block->fun_(in_isr, block->arg_, timestamp, const_data);
564 else if constexpr (IS_MESSAGE_VIEW<PayloadArg>)
567 using Data =
typename View::DataType;
569 block->fun_(in_isr, block->arg_, timestamp, message);
573 using Data = RemoveCVRef<PayloadArg>;
574 block->fun_(in_isr, block->arg_, timestamp, *
reinterpret_cast<Data*
>(data.
addr_));
582 template <
typename Function,
typename BoundArg,
size_t Arity>
585 static_assert(Arity == 3 || Arity == 4,
586 "LibXR::Topic::Callback function must be void(bool, Arg, Payload) "
587 "or void(bool, Arg, MicrosecondTimestamp, Payload).");
590 template <
typename Function,
typename BoundArg>
594 using PayloadArg =
typename Traits::template Arg<2>;
596 static BlockHeader* Create(Function fun, BoundArg&& arg)
598 static_assert(std::same_as<typename Traits::ReturnType, void>);
599 static_assert(std::same_as<typename Traits::template Arg<0>,
bool>);
600 static_assert(std::same_as<typename Traits::template Arg<1>, BoundArg>);
601 static_assert(IS_CALLBACK_PAYLOAD<PayloadArg>);
606 template <
typename Function,
typename BoundArg>
610 using TimestampArg =
typename Traits::template Arg<2>;
611 using PayloadArg =
typename Traits::template Arg<3>;
613 static BlockHeader* Create(Function fun, BoundArg&& arg)
615 static_assert(std::same_as<typename Traits::ReturnType, void>);
616 static_assert(std::same_as<typename Traits::template Arg<0>,
bool>);
617 static_assert(std::same_as<typename Traits::template Arg<1>, BoundArg>);
619 static_assert(IS_CALLBACK_PAYLOAD<PayloadArg>);
627 inline static BlockHeader empty_block_{&EmptyRun, 0};
634 template <
typename BoundArg,
typename Callable>
635 [[nodiscard]]
static Callback Create(Callable fun, BoundArg arg)
637 using Function =
decltype(+std::declval<Callable>());
638 using Traits = FunctionTraits<Function>;
639 static_assert(Traits::ARITY == 3 || Traits::ARITY == 4,
640 "LibXR::Topic::Callback::Create expects a capture-free callable "
641 "with bool and bound-argument parameters.");
643 Factory<Function, BoundArg, Traits::ARITY>::Create(+fun, std::move(arg)));
646 void Run(
bool in_isr, MicrosecondTimestamp timestamp, RawData& data)
const
648 block_->run(block_, in_isr, timestamp, data);
651 uint32_t PayloadSize()
const {
return block_->payload_size; }
654 explicit Callback(BlockHeader* block) : block_(block ? block : &empty_block_) {}
656 BlockHeader* block_ = &empty_block_;
通用回调包装,支持动态参数传递 / Generic callback wrapper supporting dynamic argument passing
只读原始数据视图 / Immutable raw data view
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data...
Data data_
存储的数据。 The stored data.
无锁队列实现 / Lock-free queue implementation
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
微秒时间戳 / Microsecond timestamp
Data data_
存储的数据 (Stored data).
可写原始数据视图 / Mutable raw data view
void * addr_
数据起始地址 / Data start address
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
异步订阅者类,用于订阅异步数据 Asynchronous subscriber class for subscribing to asynchronous data
ASyncSubscriber(Topic topic)
构造函数,使用 Topic 进行初始化 Constructor using a Topic for initialization
Data & GetData()
获取当前数据 Retrieves the current data
LockFreeList::Node< ASyncBlock > * block_
订阅者数据块。Subscriber data block.
bool Available()
检查数据是否可用 Checks if data is available
ASyncSubscriber(const char *name, Domain *domain=nullptr)
构造函数,通过名称和数据创建订阅者 Constructor to create a subscriber with a name and data
void StartWaiting()
开始等待数据更新 Starts waiting for data update
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
QueuedSubscriber(Topic topic, LockFreeQueue< Data > &queue)
构造函数,使用 Topic 和无锁队列进行初始化 Constructor using a Topic and a lock-free queue
QueuedSubscriber(const char *name, LockFreeQueue< Data > &queue, Domain *domain=nullptr)
构造函数,自动创建队列
QueuedSubscriber(Topic topic, LockFreeQueue< Message< Data > > &queue)
构造函数,使用 Topic 和带时间戳消息队列进行初始化 Constructor using a Topic and a timestamped message queue
QueuedSubscriber(const char *name, LockFreeQueue< Message< Data > > &queue, Domain *domain=nullptr)
构造函数,使用 Topic 和无锁队列进行初始化 Constructor using a Topic and a lock-free queue
同步订阅者类,允许同步方式接收数据。Synchronous subscriber class allowing data reception in a synchronous manner.
SyncSubscriber(Topic topic, Data &data)
通过 Topic 句柄构造同步订阅者。Constructs a synchronous subscriber using a Topic handle.
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待接收数据。Waits for data reception.
LockFreeList::Node< SyncBlock > * block_
订阅者数据块。Subscriber data block.
SyncSubscriber(const char *name, Data &data, Domain *domain=nullptr)
通过主题名称构造同步订阅者。Constructs a synchronous subscriber by topic name.
主题(Topic)管理类 / Topic management class
SuberType
订阅者类型。Subscriber type.
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
static RawData NewSubscriberBuffer()
为异步订阅者分配长期存在的接收缓冲区。 Allocates a long-lived receive buffer for an async subscriber.
static void CheckSubscriberDataSize(Topic topic)
校验订阅者数据类型和主题最大长度是否兼容。 Checks whether subscriber data type is compatible with topic max length.
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
@ OK
操作成功 | Operation successful
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
std::atomic< ASyncSubscriberState > state
订阅者状态 Subscriber state
RawData buff
缓冲区数据 Buffer data
MicrosecondTimestamp timestamp
最近接收的消息时间戳 Latest timestamp
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Callback cb
订阅的回调函数 Subscribed callback function
带时间戳的类型化消息。Timestamped typed message.
带时间戳的类型化消息视图。Timestamped typed message view.
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
void * queue
指向订阅队列的指针 Pointer to the subscribed queue
void(* fun)(MicrosecondTimestamp, RawData, QueueBlock &)
处理消息的回调函数 Callback function to handle message
订阅者信息存储结构。Structure storing subscriber information.
SuberType type
订阅者类型。Type of subscriber.
同步订阅者存储结构。Structure storing synchronous subscriber data.
std::atomic< uint32_t > wait_state
挂起等待状态。Pending wait state.
Semaphore sem
信号量,用于同步等待数据。Semaphore for data synchronization.
MicrosecondTimestamp timestamp
最近接收的消息时间戳。Latest received timestamp.
RawData buff
存储的数据缓冲区。Data buffer.