libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
subscriber.hpp
1#pragma once
2
3#include "message.hpp"
4
5namespace LibXR
6{
/**
 * @brief Subscriber type.
 */
11enum class Topic::SuberType : uint8_t
12{
13 SYNC,      ///< Synchronous subscriber.
14 ASYNC,     ///< Asynchronous subscriber.
15 QUEUE,     ///< Queued subscriber.
16 CALLBACK,  ///< Callback subscriber.
17};
18
27
33{
34 // WAIT_CLAIMED keeps a wakeup owned by the timed-out waiter until it consumes the
35 // semaphore post; otherwise a new Wait() could steal that post.
36 // WAIT_CLAIMED means this wakeup already belongs to the current waiter until it has consumed the semaphore post.
37 enum WaitState : uint32_t
38 {
39 WAIT_IDLE = 0,     // No Wait() call is in progress.
40 WAITING = 1,       // A Wait() call has registered itself and is blocking on the semaphore.
41 WAIT_CLAIMED = 2   // The pending wakeup is reserved for the current waiter (see above).
42 };
43
46 std::atomic<uint32_t> wait_state = WAIT_IDLE;
48};
49
56template <typename Data>
58{
59 public:
/**
 * @brief Constructs a synchronous subscriber by topic name.
 *
 * Obtains the topic handle with WaitTopic(name, UINT32_MAX, domain) and then
 * delegates to the Topic-based constructor.
 *
 * @param name   Topic name handed to WaitTopic.
 * @param data   Object forwarded to the Topic-based constructor as receive buffer.
 * @param domain Domain handed to WaitTopic; defaults to nullptr.
 */
71 SyncSubscriber(const char* name, Data& data, Domain* domain = nullptr)
72 : SyncSubscriber(Topic(WaitTopic(name, UINT32_MAX, domain)), data)
73 {
74 }
75
86 SyncSubscriber(Topic topic, Data& data)
87 {
89
92 block_->data_.timestamp = MicrosecondTimestamp();
93 block_->data_.wait_state.store(SyncBlock::WAIT_IDLE, std::memory_order_relaxed);
94 block_->data_.buff = RawData(data);
95 topic.block_->data_.subers.Add(*block_);
96 }
97
98 SyncSubscriber(const SyncSubscriber&) = delete;
99 SyncSubscriber& operator=(const SyncSubscriber&) = delete;
100
101 SyncSubscriber(SyncSubscriber&& other) noexcept : block_(other.block_)
102 {
103 other.block_ = nullptr;
104 }
105
106 SyncSubscriber& operator=(SyncSubscriber&& other) noexcept
107 {
108 if (this != &other)
109 {
110 block_ = other.block_;
111 other.block_ = nullptr;
112 }
113 return *this;
114 }
115
/**
 * @brief Waits for data reception.
 *
 * Registers this call as the single active waiter, blocks on the block's
 * semaphore, and resets the wait state before returning.
 *
 * @param timeout Timeout value passed unchanged to the semaphore wait;
 *                defaults to UINT32_MAX.
 * @return ErrorCode::BUSY if the wait state is not WAIT_IDLE on entry;
 *         ErrorCode::OK if the semaphore was obtained (either within the
 *         timeout or after a claimed wakeup was consumed);
 *         otherwise the error code returned by the semaphore wait.
 */
121 ErrorCode Wait(uint32_t timeout = UINT32_MAX)
122 {
123 ASSERT(block_ != nullptr);
124
125 auto& data = block_->data_;
126 uint32_t expected = SyncBlock::WAIT_IDLE;
// Become the active waiter (WAIT_IDLE -> WAITING); any other state means a wait is
// already registered or still finishing, so report BUSY.
127 if (!data.wait_state.compare_exchange_strong(expected, SyncBlock::WAITING,
128 std::memory_order_acq_rel,
129 std::memory_order_acquire))
130 {
131 return ErrorCode::BUSY;
132 }
133
134 auto wait_ans = data.sem.Wait(timeout);
135 if (wait_ans == ErrorCode::OK)
136 {
// Semaphore obtained: this wait is finished, allow the next Wait().
137 data.wait_state.store(SyncBlock::WAIT_IDLE, std::memory_order_release);
138 return ErrorCode::OK;
139 }
140
// The semaphore wait failed. Try to withdraw (WAITING -> WAIT_IDLE); if that succeeds
// nobody claimed this waiter and the semaphore's error code is returned as-is.
141 expected = SyncBlock::WAITING;
142 if (data.wait_state.compare_exchange_strong(expected, SyncBlock::WAIT_IDLE,
143 std::memory_order_acq_rel,
144 std::memory_order_acquire))
145 {
146 return wait_ans;
147 }
148
// Withdrawal failed, so the state must have been moved to WAIT_CLAIMED in the
// meantime (presumably by the publishing side, which is not part of this file).
149 ASSERT(data.wait_state.load(std::memory_order_acquire) == SyncBlock::WAIT_CLAIMED);
150
// A claimed wakeup belongs to this waiter: consume its semaphore post before going
// back to WAIT_IDLE so that a later Wait() cannot pick it up.
151 auto finish_wait_ans = data.sem.Wait(UINT32_MAX);
152 UNUSED(finish_wait_ans);
153 ASSERT(finish_wait_ans == ErrorCode::OK);
154 data.wait_state.store(SyncBlock::WAIT_IDLE, std::memory_order_release);
155 return ErrorCode::OK;
156 }
157
158 MicrosecondTimestamp GetTimestamp() const { return block_->data_.timestamp; }
159
161};
162
/**
 * @brief State of an asynchronous subscriber.
 */
163enum class Topic::ASyncSubscriberState : uint32_t
164{
165 IDLE = 0,                // Not waiting; StartWaiting() moves IDLE to WAITING.
166 WAITING = 1,             // Waiting for a data update.
167 DATA_READY = UINT32_MAX  // Data is available; GetData() resets this to IDLE.
168};
169
176{
179 std::atomic<ASyncSubscriberState> state =
180 ASyncSubscriberState::IDLE;
181};
182
189template <typename Data>
191{
192 public:
/**
 * @brief Constructs an asynchronous subscriber by topic name.
 *
 * Obtains the topic handle with WaitTopic(name, UINT32_MAX, domain) and then
 * delegates to the Topic-based constructor.
 *
 * @param name   Topic name handed to WaitTopic.
 * @param domain Domain handed to WaitTopic; defaults to nullptr.
 */
203 ASyncSubscriber(const char* name, Domain* domain = nullptr)
204 : ASyncSubscriber(Topic(WaitTopic(name, UINT32_MAX, domain)))
205 {
206 }
207
218 {
220
223 block_->data_.timestamp = MicrosecondTimestamp();
224 block_->data_.buff = NewSubscriberBuffer<Data>();
225 topic.block_->data_.subers.Add(*block_);
226 }
227
228 ASyncSubscriber(const ASyncSubscriber&) = delete;
229 ASyncSubscriber& operator=(const ASyncSubscriber&) = delete;
230
231 ASyncSubscriber(ASyncSubscriber&& other) noexcept : block_(other.block_)
232 {
233 other.block_ = nullptr;
234 }
235
236 ASyncSubscriber& operator=(ASyncSubscriber&& other) noexcept
237 {
238 if (this != &other)
239 {
240 block_ = other.block_;
241 other.block_ = nullptr;
242 }
243 return *this;
244 }
245
255 {
256 return block_->data_.state.load(std::memory_order_acquire) ==
257 ASyncSubscriberState::DATA_READY;
258 }
259
266 Data& GetData()
267 {
268 if (block_->data_.state.load(std::memory_order_acquire) ==
269 ASyncSubscriberState::DATA_READY)
270 {
271 block_->data_.state.store(ASyncSubscriberState::IDLE, std::memory_order_release);
272 }
273 return *reinterpret_cast<Data*>(block_->data_.buff.addr_);
274 }
275
276 MicrosecondTimestamp GetTimestamp() const { return block_->data_.timestamp; }
277
288 {
289 if (block_->data_.state.load(std::memory_order_acquire) ==
290 ASyncSubscriberState::IDLE)
291 {
292 block_->data_.state.store(ASyncSubscriberState::WAITING,
293 std::memory_order_release);
294 }
295 }
296
298};
299
311
313{
314 public:
/**
 * @brief Constructs a queued subscriber by topic name.
 *
 * Obtains the topic handle with WaitTopic(name, UINT32_MAX, domain) and then
 * delegates to the Topic-based constructor. The queue is supplied by the
 * caller; this constructor does not create one.
 *
 * @tparam Data  Element type of the queue.
 * @param name   Topic name handed to WaitTopic.
 * @param queue  Caller-provided lock-free queue forwarded to the Topic-based constructor.
 * @param domain Domain handed to WaitTopic; defaults to nullptr.
 */
327 template <typename Data>
328 QueuedSubscriber(const char* name, LockFreeQueue<Data>& queue, Domain* domain = nullptr)
329 : QueuedSubscriber(Topic(WaitTopic(name, UINT32_MAX, domain)), queue)
330 {
331 }
332
341 template <typename Data>
343 Domain* domain = nullptr)
344 : QueuedSubscriber(Topic(WaitTopic(name, UINT32_MAX, domain)), queue)
345 {
346 }
347
359 template <typename Data>
361 {
363
365 block_->data_.type = SuberType::QUEUE;
366 block_->data_.queue = &queue;
367 block_->data_.fun = [](MicrosecondTimestamp, RawData data, QueueBlock& block)
368 {
369 LockFreeQueue<Data>* queue = reinterpret_cast<LockFreeQueue<Data>*>(block.queue);
370 (void)queue->Push(*reinterpret_cast<Data*>(data.addr_));
371 };
372
373 topic.block_->data_.subers.Add(*block_);
374 }
375
380 template <typename Data>
382 {
384
386 block_->data_.type = SuberType::QUEUE;
387 block_->data_.queue = &queue;
388 block_->data_.fun =
389 [](MicrosecondTimestamp timestamp, RawData data, QueueBlock& block)
390 {
392 reinterpret_cast<LockFreeQueue<Message<Data>>*>(block.queue);
393 (void)queue->Push(Message<Data>{timestamp, *reinterpret_cast<Data*>(data.addr_)});
394 };
395
396 topic.block_->data_.subers.Add(*block_);
397 }
398
399 QueuedSubscriber(const QueuedSubscriber&) = delete;
400 QueuedSubscriber& operator=(const QueuedSubscriber&) = delete;
401
402 QueuedSubscriber(QueuedSubscriber&& other) noexcept : block_(other.block_)
403 {
404 other.block_ = nullptr;
405 }
406
407 QueuedSubscriber& operator=(QueuedSubscriber&& other) noexcept
408 {
409 if (this != &other)
410 {
411 block_ = other.block_;
412 other.block_ = nullptr;
413 }
414 return *this;
415 }
416
417 private:
418 LockFreeList::Node<QueueBlock>* block_ = nullptr;
419};
420
422{
423 template <typename Function>
425
426 template <typename Return, typename... Args>
427 struct FunctionTraits<Return (*)(Args...)>
428 {
429 using ReturnType = Return;
430 static constexpr size_t ARITY = sizeof...(Args);
431
432 template <size_t Index>
433 using Arg = std::tuple_element_t<Index, std::tuple<Args...>>;
434 };
435
436 template <typename T>
438 {
439 static constexpr bool VALUE = false;
440 };
441
442 template <typename Data>
444 {
445 static constexpr bool VALUE = true;
446 using DataType = Data;
447 };
448
// T with any reference and top-level const/volatile qualifiers removed.
449 template <typename T>
450 using RemoveCVRef = std::remove_cv_t<std::remove_reference_t<T>>;
451
// True when T (ignoring cv/ref) is RawData or ConstRawData.
452 template <typename T>
453 static constexpr bool IS_RAW_DATA =
454 std::same_as<RemoveCVRef<T>, RawData> || std::same_as<RemoveCVRef<T>, ConstRawData>;
455
// True when MessageViewTraits reports T (ignoring cv/ref) as a message view.
456 template <typename T>
457 static constexpr bool IS_MESSAGE_VIEW = MessageViewTraits<RemoveCVRef<T>>::VALUE;
458
// True when T is neither raw data nor a message view and its cv/ref-stripped
// type satisfies TopicPayload.
459 template <typename T>
460 static constexpr bool IS_TYPED_DATA =
461 !IS_RAW_DATA<T> && !IS_MESSAGE_VIEW<T> && TopicPayload<RemoveCVRef<T>>;
462
// True when T falls into any of the three payload categories above.
463 template <typename T>
464 static constexpr bool IS_CALLBACK_PAYLOAD =
465 IS_RAW_DATA<T> || IS_MESSAGE_VIEW<T> || IS_TYPED_DATA<T>;
466
468 {
469 using RunFun = void (*)(const BlockHeader*, bool, MicrosecondTimestamp, RawData&);
470
471 RunFun run = nullptr;
472 uint32_t payload_size = 0;
473 };
474
475 template <typename PayloadArg>
476 static consteval uint32_t PayloadSize()
477 {
478 if constexpr (IS_RAW_DATA<PayloadArg>)
479 {
480 return 0;
481 }
482 else if constexpr (IS_MESSAGE_VIEW<PayloadArg>)
483 {
484 using View = MessageViewTraits<RemoveCVRef<PayloadArg>>;
485 return sizeof(typename View::DataType);
486 }
487 else
488 {
489 static_assert(IS_TYPED_DATA<PayloadArg>,
490 "LibXR::Topic::Callback payload must be RawData, ConstRawData, "
491 "Topic::MessageView<T>, T, T&, or const T&.");
492 return sizeof(RemoveCVRef<PayloadArg>);
493 }
494 }
495
496 template <typename Function, typename BoundArg, typename PayloadArg>
497 static void InvokePayload(Function fun, BoundArg& arg, bool in_isr,
498 MicrosecondTimestamp timestamp, RawData& data)
499 {
500 if constexpr (std::same_as<RemoveCVRef<PayloadArg>, RawData>)
501 {
502 fun(in_isr, arg, data);
503 }
504 else if constexpr (std::same_as<RemoveCVRef<PayloadArg>, ConstRawData>)
505 {
506 ConstRawData const_data(data);
507 fun(in_isr, arg, const_data);
508 }
509 else if constexpr (IS_MESSAGE_VIEW<PayloadArg>)
510 {
511 using View = MessageViewTraits<RemoveCVRef<PayloadArg>>;
512 using Data = typename View::DataType;
513 MessageView<Data> message{timestamp, *reinterpret_cast<Data*>(data.addr_)};
514 fun(in_isr, arg, message);
515 }
516 else
517 {
518 using Data = RemoveCVRef<PayloadArg>;
519 fun(in_isr, arg, *reinterpret_cast<Data*>(data.addr_));
520 }
521 }
522
523 template <typename Function, typename BoundArg, typename PayloadArg>
525 {
526 PayloadOnlyBlock(Function fun, BoundArg&& arg)
527 : BlockHeader{&Run, PayloadSize<PayloadArg>()}, fun_(fun), arg_(std::move(arg))
528 {
529 }
530
531 static void Run(const BlockHeader* header, bool in_isr,
532 MicrosecondTimestamp timestamp, RawData& data)
533 {
534 auto* block = static_cast<const PayloadOnlyBlock*>(header);
535 InvokePayload<Function, BoundArg, PayloadArg>(
536 block->fun_, const_cast<BoundArg&>(block->arg_), in_isr, timestamp, data);
537 }
538
539 Function fun_;
540 BoundArg arg_;
541 };
542
543 template <typename Function, typename BoundArg, typename PayloadArg>
545 {
546 TimestampPayloadBlock(Function fun, BoundArg&& arg)
547 : BlockHeader{&Run, PayloadSize<PayloadArg>()}, fun_(fun), arg_(std::move(arg))
548 {
549 }
550
551 static void Run(const BlockHeader* header, bool in_isr,
552 MicrosecondTimestamp timestamp, RawData& data)
553 {
554 auto* block = static_cast<const TimestampPayloadBlock*>(header);
555 if constexpr (std::same_as<RemoveCVRef<PayloadArg>, RawData>)
556 {
557 block->fun_(in_isr, block->arg_, timestamp, data);
558 }
559 else if constexpr (std::same_as<RemoveCVRef<PayloadArg>, ConstRawData>)
560 {
561 ConstRawData const_data(data);
562 block->fun_(in_isr, block->arg_, timestamp, const_data);
563 }
564 else if constexpr (IS_MESSAGE_VIEW<PayloadArg>)
565 {
567 using Data = typename View::DataType;
568 MessageView<Data> message{timestamp, *reinterpret_cast<Data*>(data.addr_)};
569 block->fun_(in_isr, block->arg_, timestamp, message);
570 }
571 else
572 {
573 using Data = RemoveCVRef<PayloadArg>;
574 block->fun_(in_isr, block->arg_, timestamp, *reinterpret_cast<Data*>(data.addr_));
575 }
576 }
577
578 Function fun_;
579 BoundArg arg_;
580 };
581
// Primary Factory template. Arity 3 and 4 have their own specializations, so this
// template is only instantiated for other arities and then fails the static_assert
// below with a message describing the two accepted callback signatures.
582 template <typename Function, typename BoundArg, size_t Arity>
583 struct Factory
584 {
585 static_assert(Arity == 3 || Arity == 4,
586 "LibXR::Topic::Callback function must be void(bool, Arg, Payload) "
587 "or void(bool, Arg, MicrosecondTimestamp, Payload).");
588 };
589
590 template <typename Function, typename BoundArg>
591 struct Factory<Function, BoundArg, 3>
592 {
594 using PayloadArg = typename Traits::template Arg<2>;
595
596 static BlockHeader* Create(Function fun, BoundArg&& arg)
597 {
598 static_assert(std::same_as<typename Traits::ReturnType, void>);
599 static_assert(std::same_as<typename Traits::template Arg<0>, bool>);
600 static_assert(std::same_as<typename Traits::template Arg<1>, BoundArg>);
601 static_assert(IS_CALLBACK_PAYLOAD<PayloadArg>);
602 return new PayloadOnlyBlock<Function, BoundArg, PayloadArg>(fun, std::move(arg));
603 }
604 };
605
606 template <typename Function, typename BoundArg>
607 struct Factory<Function, BoundArg, 4>
608 {
610 using TimestampArg = typename Traits::template Arg<2>;
611 using PayloadArg = typename Traits::template Arg<3>;
612
613 static BlockHeader* Create(Function fun, BoundArg&& arg)
614 {
615 static_assert(std::same_as<typename Traits::ReturnType, void>);
616 static_assert(std::same_as<typename Traits::template Arg<0>, bool>);
617 static_assert(std::same_as<typename Traits::template Arg<1>, BoundArg>);
618 static_assert(std::same_as<RemoveCVRef<TimestampArg>, MicrosecondTimestamp>);
619 static_assert(IS_CALLBACK_PAYLOAD<PayloadArg>);
621 std::move(arg));
622 }
623 };
624
// No-op run function: accepts the standard run arguments and does nothing.
625 static void EmptyRun(const BlockHeader*, bool, MicrosecondTimestamp, RawData&) {}
626
// Shared block used when a Callback has no real block: runs EmptyRun and reports a
// payload size of 0.
627 inline static BlockHeader empty_block_{&EmptyRun, 0};
628
629 public:
630 Callback() = default;
631 Callback(const Callback&) = default;
632 Callback& operator=(const Callback&) = default;
633
/**
 * @brief Creates a Callback from a capture-free callable and a bound argument.
 *
 * The callable is converted to a function pointer with unary `+`; its arity
 * (3 or 4 parameters) selects the Factory specialization that builds the
 * callback block. The visible 3-parameter Factory allocates that block with
 * `new`.
 * NOTE(review): no matching release of the block is visible in this file —
 * confirm the intended lifetime.
 *
 * @tparam BoundArg Type of the bound argument.
 * @tparam Callable Callable type convertible to a function pointer via unary `+`.
 * @param fun Callable to wrap.
 * @param arg Bound argument moved into the created block.
 * @return Callback referring to the created block.
 */
634 template <typename BoundArg, typename Callable>
635 [[nodiscard]] static Callback Create(Callable fun, BoundArg arg)
636 {
// Function is the function-pointer type the callable converts to.
637 using Function = decltype(+std::declval<Callable>());
638 using Traits = FunctionTraits<Function>;
639 static_assert(Traits::ARITY == 3 || Traits::ARITY == 4,
640 "LibXR::Topic::Callback::Create expects a capture-free callable "
641 "with bool and bound-argument parameters.");
642 return Callback(
643 Factory<Function, BoundArg, Traits::ARITY>::Create(+fun, std::move(arg)));
644 }
645
/**
 * @brief Invokes the callback through the run function stored in its block.
 *
 * @param in_isr    Flag forwarded to the block's run function.
 * @param timestamp Timestamp forwarded to the block's run function.
 * @param data      Raw payload forwarded to the block's run function.
 */
646 void Run(bool in_isr, MicrosecondTimestamp timestamp, RawData& data) const
647 {
648 block_->run(block_, in_isr, timestamp, data);
649 }
650
/**
 * @brief Returns the payload size recorded in the callback block.
 *
 * @return The block's `payload_size` field (0 for the empty block).
 */
651 uint32_t PayloadSize() const { return block_->payload_size; }
652
653 private:
654 explicit Callback(BlockHeader* block) : block_(block ? block : &empty_block_) {}
655
656 BlockHeader* block_ = &empty_block_;
657};
658
665{
666 explicit CallbackBlock(Callback& callback) : cb(callback)
667 {
669 }
670
672};
673} // namespace LibXR
通用回调包装,支持动态参数传递 / Generic callback wrapper supporting dynamic argument passing
Definition libxr_cb.hpp:142
只读原始数据视图 / Immutable raw data view
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data...
Data data_
存储的数据。 The stored data.
无锁队列实现 / Lock-free queue implementation
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
微秒时间戳 / Microsecond timestamp
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:99
可写原始数据视图 / Mutable raw data view
void * addr_
数据起始地址 / Data start address
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
Definition semaphore.hpp:23
异步订阅者类,用于订阅异步数据 Asynchronous subscriber class for subscribing to asynchronous data
ASyncSubscriber(Topic topic)
构造函数,使用 Topic 进行初始化 Constructor using a Topic for initialization
Data & GetData()
获取当前数据 Retrieves the current data
LockFreeList::Node< ASyncBlock > * block_
订阅者数据块。Subscriber data block.
bool Available()
检查数据是否可用 Checks if data is available
ASyncSubscriber(const char *name, Domain *domain=nullptr)
构造函数,通过主题名称创建订阅者 Constructor to create a subscriber by topic name
void StartWaiting()
开始等待数据更新 Starts waiting for data update
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:199
QueuedSubscriber(Topic topic, LockFreeQueue< Data > &queue)
构造函数,使用 Topic 和无锁队列进行初始化 Constructor using a Topic and a lock-free queue
QueuedSubscriber(const char *name, LockFreeQueue< Data > &queue, Domain *domain=nullptr)
构造函数,通过主题名称和已有的无锁队列创建订阅者 Constructor using a topic name and an existing lock-free queue
QueuedSubscriber(Topic topic, LockFreeQueue< Message< Data > > &queue)
构造函数,使用 Topic 和带时间戳消息队列进行初始化 Constructor using a Topic and a timestamped message queue
QueuedSubscriber(const char *name, LockFreeQueue< Message< Data > > &queue, Domain *domain=nullptr)
构造函数,通过主题名称和带时间戳消息队列进行初始化 Constructor using a topic name and a timestamped message queue
同步订阅者类,允许同步方式接收数据。Synchronous subscriber class allowing data reception in a synchronous manner.
SyncSubscriber(Topic topic, Data &data)
通过 Topic 句柄构造同步订阅者。Constructs a synchronous subscriber using a Topic handle.
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待接收数据。Waits for data reception.
LockFreeList::Node< SyncBlock > * block_
订阅者数据块。Subscriber data block.
SyncSubscriber(const char *name, Data &data, Domain *domain=nullptr)
通过主题名称构造同步订阅者。Constructs a synchronous subscriber by topic name.
主题(Topic)管理类 / Topic management class
Definition message.hpp:61
SuberType
订阅者类型。Subscriber type.
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
static RawData NewSubscriberBuffer()
为异步订阅者分配长期存在的接收缓冲区。 Allocates a long-lived receive buffer for an async subscriber.
Definition message.hpp:591
static void CheckSubscriberDataSize(Topic topic)
校验订阅者数据类型和主题最大长度是否兼容。 Checks whether subscriber data type is compatible with topic max length.
Definition message.hpp:573
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition topic.cpp:223
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:547
LibXR 命名空间
Definition ch32_can.hpp:14
ErrorCode
定义错误码枚举
@ BUSY
忙碌 | Busy
@ OK
操作成功 | Operation successful
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
std::atomic< ASyncSubscriberState > state
订阅者状态 Subscriber state
RawData buff
缓冲区数据 Buffer data
MicrosecondTimestamp timestamp
最近接收的消息时间戳 Latest timestamp
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Callback cb
订阅的回调函数 Subscribed callback function
带时间戳的类型化消息。Timestamped typed message.
Definition message.hpp:156
带时间戳的类型化消息视图。Timestamped typed message view.
Definition message.hpp:143
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
void * queue
指向订阅队列的指针 Pointer to the subscribed queue
void(* fun)(MicrosecondTimestamp, RawData, QueueBlock &)
处理消息的回调函数 Callback function to handle message
订阅者信息存储结构。Structure storing subscriber information.
SuberType type
订阅者类型。Type of subscriber.
同步订阅者存储结构。Structure storing synchronous subscriber data.
std::atomic< uint32_t > wait_state
挂起等待状态。Pending wait state.
Semaphore sem
信号量,用于同步等待数据。Semaphore for data synchronization.
MicrosecondTimestamp timestamp
最近接收的消息时间戳。Latest received timestamp.
RawData buff
存储的数据缓冲区。Data buffer.