libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
topic.hpp
1#pragma once
2
3#include <atomic>
4#include <concepts>
5#include <cstddef>
6#include <cstdint>
7#include <cstddef>
8
9#include "libxr_cb.hpp"
10#include "libxr_def.hpp"
11#include "libxr_time.hpp"
12#include "libxr_type.hpp"
13#include "lockfree_list.hpp"
14#include "lockfree_queue.hpp"
15#include "mutex.hpp"
16#include "rbt.hpp"
17#include "semaphore.hpp"
18#include "thread.hpp"
19
20namespace LibXR
21{
27template <typename Data>
28concept TopicPayload =
29 !std::is_reference_v<Data> && !std::is_const_v<Data> && !std::is_volatile_v<Data> &&
30 std::is_object_v<Data> && std::is_default_constructible_v<Data> &&
31 std::is_copy_assignable_v<Data> && std::is_trivially_destructible_v<Data>;
32
38template <typename Data>
39constexpr void CheckTopicPayload()
40{
41 static_assert(TopicPayload<Data>,
42 "LibXR::Topic typed payload must be a non-cv/ref object type that is "
43 "default-constructible, copy-assignable, and trivially destructible.");
44}
45
56class Topic
57{
62 enum class LockState : uint32_t
63 {
64 UNLOCKED = 0,
65 LOCKED = 1,
66 USE_MUTEX = UINT32_MAX
67 };
68
69 public:
80 struct Block
81 {
82 std::atomic<LockState> busy;
84 TypeID::ID payload_type_id;
85 uint32_t payload_size;
87 uint32_t crc32;
89 };
90
91#ifndef __DOXYGEN__
96 struct PackedDataHeader;
97
104 template <typename Data>
105 class PackedData;
106 static constexpr uint8_t PACKET_PREFIX = 0x5A;
107 static constexpr uint8_t PACKET_VERSION = 0x01;
108 static constexpr size_t PACK_BASE_SIZE = 17;
109#endif
110
117
124 template <typename Data>
126 {
127 static_assert(TopicPayload<Data>);
129 Data* data;
130 };
131
138 template <typename Data>
139 struct Message
140 {
141 static_assert(TopicPayload<Data>);
143 Data data;
144 };
145
151 static void Lock(TopicHandle topic);
152
158 static void Unlock(TopicHandle topic);
159
165 static void LockFromCallback(TopicHandle topic);
166
172 static void UnlockFromCallback(TopicHandle topic);
173
178 class Domain
179 {
180 public:
185 Domain(const char* name);
186
188 };
189
194 enum class SuberType : uint8_t
195 {
196 SYNC,
197 ASYNC,
198 QUEUE,
199 CALLBACK,
200 };
201
207 {
209 };
210
216 struct SyncBlock;
217
224 template <typename Data>
226
231 enum class ASyncSubscriberState : uint32_t;
232
238 struct ASyncBlock;
239
246 template <typename Data>
248
254 struct QueueBlock;
255
260 class QueuedSubscriber;
261
267 class Callback;
268
274 struct CallbackBlock;
275
281 class Server;
282
287 void RegisterCallback(Callback& cb);
288
294
298 Topic();
299
312 Topic(const char* name, TypeID::ID payload_type_id, size_t payload_size,
313 size_t payload_alignment, Domain* domain = nullptr,
314 bool multi_publisher = false);
315
326 template <typename Data>
327 static Topic CreateTopic(const char* name, Domain* domain = nullptr,
328 bool multi_publisher = false)
329 {
331 return Topic(name, TypeID::GetID<Data>(), sizeof(Data), alignof(Data), domain,
332 multi_publisher);
333 }
334
340 Topic(TopicHandle topic);
341
348 static TopicHandle Find(const char* name, Domain* domain = nullptr);
349
360 template <typename Data>
361 static TopicHandle FindOrCreate(const char* name, Domain* domain = nullptr,
362 bool multi_publisher = false)
363 {
365 auto topic = Find(name, domain);
366 if (topic != nullptr)
367 {
369 if (multi_publisher && !topic->data_.mutex)
370 {
371 ASSERT(false);
372 }
373 }
374 else
375 {
376 topic = CreateTopic<Data>(name, domain, multi_publisher).block_;
377 }
378 return topic;
379 }
380
385 [[nodiscard]] size_t PayloadSize() const
386 {
387 ASSERT(block_ != nullptr);
388 return block_->data_.payload_size;
389 }
390
396 [[nodiscard]] size_t PayloadAlignment() const
397 {
398 ASSERT(block_ != nullptr);
399 return block_->data_.payload_alignment;
400 }
401
408 template <typename Data>
409 void Publish(Data& data)
410 {
411 PublishTyped(data, NowTimestamp(), false, false);
412 }
413
421 template <typename Data>
422 void Publish(Data& data, MicrosecondTimestamp timestamp)
423 {
424 PublishTyped(data, timestamp, false, false);
425 }
426
434 template <typename Data>
435 void PublishFromCallback(Data& data, bool in_isr)
436 {
437 PublishTyped(data, NowTimestamp(), true, in_isr);
438 }
439
448 template <typename Data>
449 void PublishFromCallback(Data& data, MicrosecondTimestamp timestamp, bool in_isr)
450 {
451 PublishTyped(data, timestamp, true, in_isr);
452 }
453
461 void PublishBytesFromServer(void* payload_addr, size_t payload_size,
462 MicrosecondTimestamp timestamp)
463 {
464 PublishServerBytes(payload_addr, payload_size, timestamp, false, false);
465 }
466
476 void PublishBytesFromServerCallback(void* payload_addr, size_t payload_size,
477 MicrosecondTimestamp timestamp, bool in_isr)
478 {
479 PublishServerBytes(payload_addr, payload_size, timestamp, true, in_isr);
480 }
481
490 template <typename Data>
491 ErrorCode PackData(const Data& data, PackedData<Data>& packet)
492 {
493 return PackData(data, packet, NowTimestamp());
494 }
495
505 template <typename Data>
506 ErrorCode PackData(const Data& data, PackedData<Data>& packet,
507 MicrosecondTimestamp timestamp);
508
516 static TopicHandle WaitTopic(const char* name, uint32_t timeout = UINT32_MAX,
517 Domain* domain = nullptr);
518
524 operator TopicHandle() { return block_; }
525
530 uint32_t GetKey() const;
531
532 private:
533 TopicHandle block_ = nullptr;
534
535 static inline RBTree<uint32_t>* domain_ = nullptr;
536 static inline Domain* def_domain_ = nullptr;
537
541 static void EnsureDomainRegistry();
542
547 static Domain* EnsureDefaultDomain();
548
556 static void CheckServerPublishContract(TopicHandle topic, void* payload_addr,
557 size_t payload_size)
558 {
559 ASSERT(topic != nullptr);
560 ASSERT(payload_addr != nullptr);
561 ASSERT(payload_size == topic->data_.payload_size);
562 ASSERT(topic->data_.payload_alignment != 0);
563 ASSERT(reinterpret_cast<uintptr_t>(payload_addr) % topic->data_.payload_alignment ==
564 0);
565 }
566
573 template <typename Data>
574 static void CheckSubscriberType(Topic topic)
575 {
577 ASSERT(topic.block_ != nullptr);
578 ASSERT(topic.block_->data_.payload_type_id == TypeID::GetID<Data>());
579 ASSERT(topic.block_->data_.payload_size == sizeof(Data));
580 ASSERT(topic.block_->data_.payload_alignment == alignof(Data));
581 }
582
589 template <typename Data>
591 {
593 return new Data;
594 }
595
604 template <typename Data>
605 static void CopyPayload(void* dst, void* payload_addr)
606 {
608 ASSERT(dst != nullptr);
609 ASSERT(payload_addr != nullptr);
610 *reinterpret_cast<Data*>(dst) = *reinterpret_cast<Data*>(payload_addr);
611 }
612
623 template <typename Data>
624 void PublishTyped(Data& data, MicrosecondTimestamp timestamp, bool from_callback,
625 bool in_isr)
626 {
628
629 if (from_callback)
630 {
632 }
633 else
634 {
635 Lock(block_);
636 }
637
638 CheckPublishContract(block_, TypeID::GetID<Data>(), sizeof(Data), alignof(Data));
639 DispatchSubscribers(block_, timestamp, &data, from_callback, in_isr);
640
641 if (from_callback)
642 {
644 }
645 else
646 {
647 Unlock(block_);
648 }
649 }
650
662 static void CheckPublishContract(TopicHandle topic, TypeID::ID payload_type_id,
663 size_t payload_size, size_t payload_alignment);
664
673 static void PackBytes(uint32_t topic_name_crc32, RawData buffer,
674 MicrosecondTimestamp timestamp, ConstRawData data);
675
686 static void DispatchSubscriber(SuberBlock& block, MicrosecondTimestamp timestamp,
687 void* payload_addr, bool from_callback, bool in_isr);
688
700 static void DispatchSubscribers(TopicHandle topic, MicrosecondTimestamp timestamp,
701 void* payload_addr, bool from_callback, bool in_isr);
702
712 void PublishServerBytes(void* payload_addr, size_t payload_size,
713 MicrosecondTimestamp timestamp, bool from_callback,
714 bool in_isr)
715 {
716 CheckServerPublishContract(block_, payload_addr, payload_size);
717
718 if (from_callback)
719 {
721 }
722 else
723 {
724 Lock(block_);
725 }
726
727 DispatchSubscribers(block_, timestamp, payload_addr, from_callback, in_isr);
728
729 if (from_callback)
730 {
732 }
733 else
734 {
735 Unlock(block_);
736 }
737 }
738};
739} // namespace LibXR
740
741#include "packet/packet.hpp"
742#include "server/server.hpp"
743#include "subscriber/async.hpp"
744#include "subscriber/callback.hpp"
745#include "subscriber/queue.hpp"
746#include "subscriber/sync.hpp"
只读原始数据视图 / Immutable raw data view
链表实现,用于存储和管理数据节点。 A linked list implementation for storing and managing data nodes.
微秒时间戳 / Microsecond timestamp
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:63
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:98
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread...
Definition rbt.hpp:23
可写原始数据视图 / Mutable raw data view
先 StartWaiting(),再自己来取数据的订阅者 / Subscriber that first calls StartWaiting() and later pulls the data it...
Definition topic.hpp:247
每次发布时直接执行函数的订阅句柄 / Subscription handle that runs a function on each publish
Definition callback.hpp:13
topic 所属的命名域 / Naming domain that groups topics
Definition topic.hpp:179
Domain(const char *name)
构造一个 topic 域 / Construct one topic domain
Definition topic.cpp:92
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
该域在全局域表里的节点。This domain's node inside the global domain tree.
Definition topic.hpp:187
每次发布都往队列里塞一份数据的订阅者 / Subscriber that pushes one entry into a queue on each publish
Definition queue.hpp:24
将字节流解析成 packet 并发布到已注册 topic 的状态机 / State machine that parses byte streams into packets and publishes...
Definition server.hpp:29
调用 Wait() 收消息的订阅者 / Subscriber that receives messages by calling Wait()
Definition topic.hpp:225
发布订阅主题 / Publish-subscribe topic
Definition topic.hpp:57
SuberType
topic 支持的订阅者种类 / Subscriber kinds supported by a topic
Definition topic.hpp:195
@ SYNC
同步等待型订阅者。Synchronous wait-based subscriber.
@ ASYNC
异步本地缓冲型订阅者。Asynchronous local-buffer subscriber.
@ QUEUE
队列转发型订阅者。Queue-forwarding subscriber.
@ CALLBACK
回调执行型订阅者。Callback-executing subscriber.
static void PackBytes(uint32_t topic_name_crc32, RawData buffer, MicrosecondTimestamp timestamp, ConstRawData data)
将一段 payload 字节和 topic 元数据拼成 packet / Pack one payload byte range together with topic metadata into on...
Definition packet.cpp:42
void Publish(Data &data)
在普通上下文里发布一条消息,并自动取当前时间戳 / Publish one message in normal context and stamp it with the current time
Definition topic.hpp:409
void PublishFromCallback(Data &data, MicrosecondTimestamp timestamp, bool in_isr)
在回调或 ISR 路径里按指定时间戳发布一条消息 / Publish one message from callback or ISR context with an explicit timestam...
Definition topic.hpp:449
static Domain * def_domain_
缺省 topic 域。Default topic domain.
Definition topic.hpp:536
static void Unlock(TopicHandle topic)
在普通上下文里释放一个 topic 发布路径 / Unlock one topic publish path in normal context
Definition topic.cpp:50
static TopicHandle FindOrCreate(const char *name, Domain *domain=nullptr, bool multi_publisher=false)
按精确类型查找或创建一个 topic / Find or create one topic with an exact payload type
Definition topic.hpp:361
static void * AllocateSubscriberBuffer()
为订阅者分配一个长期存在的本地接收对象 / Allocate one long-lived local receive object for a subscriber
Definition topic.hpp:590
void PublishServerBytes(void *payload_addr, size_t payload_size, MicrosecondTimestamp timestamp, bool from_callback, bool in_isr)
PublishBytesFromServer*() 的共享实现 / Shared implementation behind PublishBytesFromServer*()
Definition topic.hpp:712
RBTree< uint32_t >::Node< Block > * TopicHandle
指向一个 topic 运行时状态块的句柄 / Handle pointing to one topic runtime state block
Definition topic.hpp:116
size_t PayloadSize() const
获取该 topic 固定 payload 字节数 / Get the fixed payload size of this topic
Definition topic.hpp:385
ASyncSubscriberState
异步订阅者本地缓冲区的状态 / State of the async subscriber's local buffer
Definition async.hpp:12
void PublishTyped(Data &data, MicrosecondTimestamp timestamp, bool from_callback, bool in_isr)
强类型发布入口的共享实现 / Shared implementation of typed publish entry points
Definition topic.hpp:624
static Domain * EnsureDefaultDomain()
确保默认域已创建 / Ensure the default domain exists
Definition topic.cpp:22
static MicrosecondTimestamp NowTimestamp()
读取当前时间戳 / Read the current timestamp
Definition publish.cpp:80
static Topic CreateTopic(const char *name, Domain *domain=nullptr, bool multi_publisher=false)
用精确类型创建或查找一个 topic / Create or look up one topic using one exact payload type
Definition topic.hpp:327
static void CopyPayload(void *dst, void *payload_addr)
按精确类型把一份 payload 拷到订阅者缓冲区 / Copy one payload into a subscriber buffer using the exact type
Definition topic.hpp:605
static void CheckSubscriberType(Topic topic)
断言订阅者看到的精确 payload 类型与 topic 契约一致 / Assert that the exact payload type seen by a subscriber matches t...
Definition topic.hpp:574
static void Lock(TopicHandle topic)
在普通上下文里锁住一个 topic 发布路径 / Lock one topic publish path in normal context
Definition topic.cpp:32
static void UnlockFromCallback(TopicHandle topic)
在回调或 ISR 路径里释放一个 topic 发布路径 / Unlock one topic publish path from callback or ISR context
Definition topic.cpp:80
static void LockFromCallback(TopicHandle topic)
在回调或 ISR 路径里锁住一个 topic 发布路径 / Lock one topic publish path from callback or ISR context
Definition topic.cpp:62
void PublishBytesFromServerCallback(void *payload_addr, size_t payload_size, MicrosecondTimestamp timestamp, bool in_isr)
供回调/ISR 上下文里的 packet/server 路径按字节发布一条消息 / Publish one packet/server message from callback/ISR context...
Definition topic.hpp:476
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待指定名称的 topic 出现 / Wait until a topic with the given name exists
Definition topic.cpp:191
size_t PayloadAlignment() const
获取该 topic payload 的对齐要求 / Get the payload alignment requirement of this topic
Definition topic.hpp:396
static void CheckServerPublishContract(TopicHandle topic, void *payload_addr, size_t payload_size)
校验 server 侧字节发布前提 / Check the preconditions of one server-side byte publish
Definition topic.hpp:556
static void CheckPublishContract(TopicHandle topic, TypeID::ID payload_type_id, size_t payload_size, size_t payload_alignment)
校验一次强类型发布的运行时契约 / Check the runtime contract of one typed publish
Definition publish.cpp:86
void PublishFromCallback(Data &data, bool in_isr)
在回调或 ISR 路径里发布一条消息,并自动取当前时间戳 / Publish one message from callback or ISR context and stamp it with the...
Definition topic.hpp:435
static void DispatchSubscriber(SuberBlock &block, MicrosecondTimestamp timestamp, void *payload_addr, bool from_callback, bool in_isr)
将一条消息分发给一个订阅块 / Dispatch one message to one subscriber block
Definition publish.cpp:12
uint32_t GetKey() const
读取 topic 键值 / Read the key value of this topic
Definition topic.cpp:212
TopicHandle block_
当前 topic 视图绑定的状态块。Runtime state block bound to the current topic view.
Definition topic.hpp:533
LockState
topic 发布路径的内部锁状态 / Internal lock state of the topic publish path
Definition topic.hpp:63
@ UNLOCKED
当前未持有发布锁。The publish path is currently unlocked.
@ USE_MUTEX
当前主题改走互斥量串行化。This topic currently serializes publishers through a mutex.
@ LOCKED
当前通过原子快路径持有发布权。The publish path is locked through the atomic fast path.
void RegisterCallback(Callback &cb)
注册一个回调订阅者 / Register one callback subscriber
Definition callback.hpp:506
void Publish(Data &data, MicrosecondTimestamp timestamp)
在普通上下文里按指定时间戳发布一条消息 / Publish one message in normal context with an explicit timestamp
Definition topic.hpp:422
static TopicHandle Find(const char *name, Domain *domain=nullptr)
按名称查找一个已存在 topic / Find one existing topic by name
Definition topic.cpp:173
static RBTree< uint32_t > * domain_
全局 topic 域注册表。Global registry of topic domains.
Definition topic.hpp:535
static void DispatchSubscribers(TopicHandle topic, MicrosecondTimestamp timestamp, void *payload_addr, bool from_callback, bool in_isr)
将一条消息分发给一个 topic 上的全部订阅者 / Dispatch one message to all subscribers attached to one topic
Definition publish.cpp:69
Topic()
构造一个空 topic 视图 / Construct one empty topic view
Definition topic.cpp:115
void PublishBytesFromServer(void *payload_addr, size_t payload_size, MicrosecondTimestamp timestamp)
供 packet/server 路径按字节发布一条消息 / Publish one message from the packet/server path using bytes already arr...
Definition topic.hpp:461
ErrorCode PackData(const Data &data, PackedData< Data > &packet)
将一个精确类型消息打包成 packet / Pack one exact-typed message into one packet using the topic's runtime contract...
Definition topic.hpp:491
static void EnsureDomainRegistry()
确保全局域注册表已创建 / Ensure the global domain registry exists
Definition topic.cpp:13
static ID GetID()
获取类型的唯一标识符 / Get a unique identifier for type T
topic 可承载 payload 的类型约束 / Type constraint for payloads carried by one topic
Definition topic.hpp:28
LibXR 命名空间
Definition ch32_can.hpp:14
ErrorCode
定义错误码枚举
constexpr void CheckTopicPayload()
在模板上下文里断言 payload 类型满足 topic 契约 / Assert in template context that one payload type satisfies the topi...
Definition topic.hpp:39
异步订阅者自己挂的数据块 / Data block owned by one asynchronous subscriber
Definition async.hpp:23
topic 运行时状态块 / Runtime state block of one topic
Definition topic.hpp:81
std::atomic< LockState > busy
发布路径串行化状态。Publish-path serialization state.
Definition topic.hpp:82
Mutex * mutex
多发布者主题使用的互斥量。Mutex used by multi-publisher topics.
Definition topic.hpp:88
TypeID::ID payload_type_id
精确 payload 类型标识。Exact payload type identifier.
Definition topic.hpp:84
uint32_t payload_alignment
该 topic payload 所需对齐。Required payload alignment of this topic.
Definition topic.hpp:86
uint32_t payload_size
该 topic 固定 payload 字节数。Fixed payload size in bytes of this topic.
Definition topic.hpp:85
uint32_t crc32
主题名 CRC32 键。CRC32 key of the topic name.
Definition topic.hpp:87
LockFreeList subers
已挂接订阅者链表。List of attached subscribers.
Definition topic.hpp:83
挂在 topic 订阅链表里的回调记录 / Callback record stored in the topic subscriber list
Definition callback.hpp:486
带时间戳和 payload 副本的消息对象 / Message object carrying a timestamp and a payload copy
Definition topic.hpp:140
Data data
payload 对象副本。Copied payload object.
Definition topic.hpp:143
MicrosecondTimestamp timestamp
消息时间戳。Message timestamp.
Definition topic.hpp:142
带时间戳和 payload 指针的只读消息视图 / Read-only message view carrying a timestamp and a payload pointer
Definition topic.hpp:126
Data * data
指向本次发布 payload 对象的指针。Pointer to the payload object of this publish.
Definition topic.hpp:129
MicrosecondTimestamp timestamp
消息时间戳。Message timestamp.
Definition topic.hpp:128
队列订阅者自己挂的数据块 / Data block owned by one queued subscriber
Definition queue.hpp:12
所有订阅块共用的公共头 / Common header shared by all subscriber blocks
Definition topic.hpp:207
SuberType type
订阅块的具体种类。Concrete kind of this subscriber block.
Definition topic.hpp:208
同步订阅者自己挂的数据块 / Data block owned by one synchronous subscriber
Definition sync.hpp:12