10#include "libxr_def.hpp"
11#include "libxr_time.hpp"
12#include "libxr_type.hpp"
13#include "lockfree_list.hpp"
14#include "lockfree_queue.hpp"
17#include "semaphore.hpp"
27template <
typename Data>
29 !std::is_reference_v<Data> && !std::is_const_v<Data> && !std::is_volatile_v<Data> &&
30 std::is_object_v<Data> && std::is_default_constructible_v<Data> &&
31 std::is_copy_assignable_v<Data> && std::is_trivially_destructible_v<Data>;
38template <
typename Data>
42 "LibXR::Topic typed payload must be a non-cv/ref object type that is "
43 "default-constructible, copy-assignable, and trivially destructible.");
82 std::atomic<LockState>
busy;
96 struct PackedDataHeader;
104 template <
typename Data>
106 static constexpr uint8_t PACKET_PREFIX = 0x5A;
107 static constexpr uint8_t PACKET_VERSION = 0x01;
108 static constexpr size_t PACK_BASE_SIZE = 17;
124 template <
typename Data>
138 template <
typename Data>
224 template <
typename Data>
246 template <
typename Data>
312 Topic(
const char* name, TypeID::ID payload_type_id,
size_t payload_size,
313 size_t payload_alignment,
Domain* domain =
nullptr,
314 bool multi_publisher =
false);
326 template <
typename Data>
328 bool multi_publisher =
false)
360 template <
typename Data>
362 bool multi_publisher =
false)
365 auto topic =
Find(name, domain);
366 if (topic !=
nullptr)
369 if (multi_publisher && !topic->data_.mutex)
387 ASSERT(
block_ !=
nullptr);
398 ASSERT(
block_ !=
nullptr);
408 template <
typename Data>
421 template <
typename Data>
434 template <
typename Data>
448 template <
typename Data>
490 template <
typename Data>
505 template <
typename Data>
517 Domain* domain =
nullptr);
559 ASSERT(topic !=
nullptr);
560 ASSERT(payload_addr !=
nullptr);
561 ASSERT(payload_size == topic->
data_.payload_size);
562 ASSERT(topic->
data_.payload_alignment != 0);
563 ASSERT(
reinterpret_cast<uintptr_t
>(payload_addr) % topic->
data_.payload_alignment ==
573 template <
typename Data>
577 ASSERT(topic.
block_ !=
nullptr);
579 ASSERT(topic.
block_->
data_.payload_size ==
sizeof(Data));
580 ASSERT(topic.
block_->
data_.payload_alignment ==
alignof(Data));
589 template <
typename Data>
604 template <
typename Data>
608 ASSERT(dst !=
nullptr);
609 ASSERT(payload_addr !=
nullptr);
610 *
reinterpret_cast<Data*
>(dst) = *
reinterpret_cast<Data*
>(payload_addr);
623 template <
typename Data>
663 size_t payload_size,
size_t payload_alignment);
687 void* payload_addr,
bool from_callback,
bool in_isr);
701 void* payload_addr,
bool from_callback,
bool in_isr);
741#include "packet/packet.hpp"
742#include "server/server.hpp"
743#include "subscriber/async.hpp"
744#include "subscriber/callback.hpp"
745#include "subscriber/queue.hpp"
746#include "subscriber/sync.hpp"
只读原始数据视图 / Immutable raw data view
链表实现,用于存储和管理数据节点。 A linked list implementation for storing and managing data nodes.
微秒时间戳 / Microsecond timestamp
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Data data_
存储的数据 (Stored data).
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread...
可写原始数据视图 / Mutable raw data view
先 StartWaiting(),再自己来取数据的订阅者 / Subscriber that first calls StartWaiting() and later pulls the data it...
每次发布时直接执行函数的订阅句柄 / Subscription handle that runs a function on each publish
topic 所属的命名域 / Naming domain that groups topics
Domain(const char *name)
构造一个 topic 域 / Construct one topic domain
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
该域在全局域表里的节点。This domain's node inside the global domain tree.
每次发布都往队列里塞一份数据的订阅者 / Subscriber that pushes one entry into a queue on each publish
将字节流解析成 packet 并发布到已注册 topic 的状态机 / State machine that parses byte streams into packets and publishes...
调用 Wait() 收消息的订阅者 / Subscriber that receives messages by calling Wait()
发布订阅主题 / Publish-subscribe topic
SuberType
topic 支持的订阅者种类 / Subscriber kinds supported by a topic
@ SYNC
同步等待型订阅者。Synchronous wait-based subscriber.
@ ASYNC
异步本地缓冲型订阅者。Asynchronous local-buffer subscriber.
@ QUEUE
队列转发型订阅者。Queue-forwarding subscriber.
@ CALLBACK
回调执行型订阅者。Callback-executing subscriber.
static void PackBytes(uint32_t topic_name_crc32, RawData buffer, MicrosecondTimestamp timestamp, ConstRawData data)
将一段 payload 字节和 topic 元数据拼成 packet / Pack one payload byte range together with topic metadata into on...
void Publish(Data &data)
在普通上下文里发布一条消息,并自动取当前时间戳 / Publish one message in normal context and stamp it with the current time
void PublishFromCallback(Data &data, MicrosecondTimestamp timestamp, bool in_isr)
在回调或 ISR 路径里按指定时间戳发布一条消息 / Publish one message from callback or ISR context with an explicit timestam...
static Domain * def_domain_
缺省 topic 域。Default topic domain.
static void Unlock(TopicHandle topic)
在普通上下文里释放一个 topic 发布路径 / Unlock one topic publish path in normal context
static TopicHandle FindOrCreate(const char *name, Domain *domain=nullptr, bool multi_publisher=false)
按精确类型查找或创建一个 topic / Find or create one topic with an exact payload type
static void * AllocateSubscriberBuffer()
为订阅者分配一个长期存在的本地接收对象 / Allocate one long-lived local receive object for a subscriber
void PublishServerBytes(void *payload_addr, size_t payload_size, MicrosecondTimestamp timestamp, bool from_callback, bool in_isr)
PublishBytesFromServer*() 的共享实现 / Shared implementation behind PublishBytesFromServer*()
RBTree< uint32_t >::Node< Block > * TopicHandle
指向一个 topic 运行时状态块的句柄 / Handle pointing to one topic runtime state block
size_t PayloadSize() const
获取该 topic 固定 payload 字节数 / Get the fixed payload size of this topic
ASyncSubscriberState
异步订阅者本地缓冲区的状态 / State of the async subscriber's local buffer
void PublishTyped(Data &data, MicrosecondTimestamp timestamp, bool from_callback, bool in_isr)
强类型发布入口的共享实现 / Shared implementation of typed publish entry points
static Domain * EnsureDefaultDomain()
确保默认域已创建 / Ensure the default domain exists
static MicrosecondTimestamp NowTimestamp()
读取当前时间戳 / Read the current timestamp
static Topic CreateTopic(const char *name, Domain *domain=nullptr, bool multi_publisher=false)
用精确类型创建或查找一个 topic / Create or look up one topic using one exact payload type
static void CopyPayload(void *dst, void *payload_addr)
按精确类型把一份 payload 拷到订阅者缓冲区 / Copy one payload into a subscriber buffer using the exact type
static void CheckSubscriberType(Topic topic)
断言订阅者看到的精确 payload 类型与 topic 契约一致 / Assert that the exact payload type seen by a subscriber matches t...
static void Lock(TopicHandle topic)
在普通上下文里锁住一个 topic 发布路径 / Lock one topic publish path in normal context
static void UnlockFromCallback(TopicHandle topic)
在回调或 ISR 路径里释放一个 topic 发布路径 / Unlock one topic publish path from callback or ISR context
static void LockFromCallback(TopicHandle topic)
在回调或 ISR 路径里锁住一个 topic 发布路径 / Lock one topic publish path from callback or ISR context
void PublishBytesFromServerCallback(void *payload_addr, size_t payload_size, MicrosecondTimestamp timestamp, bool in_isr)
供回调/ISR 上下文里的 packet/server 路径按字节发布一条消息 / Publish one packet/server message from callback/ISR context...
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待指定名称的 topic 出现 / Wait until a topic with the given name exists
size_t PayloadAlignment() const
获取该 topic payload 的对齐要求 / Get the payload alignment requirement of this topic
static void CheckServerPublishContract(TopicHandle topic, void *payload_addr, size_t payload_size)
校验 server 侧字节发布前提 / Check the preconditions of one server-side byte publish
static void CheckPublishContract(TopicHandle topic, TypeID::ID payload_type_id, size_t payload_size, size_t payload_alignment)
校验一次强类型发布的运行时契约 / Check the runtime contract of one typed publish
void PublishFromCallback(Data &data, bool in_isr)
在回调或 ISR 路径里发布一条消息,并自动取当前时间戳 / Publish one message from callback or ISR context and stamp it with the...
static void DispatchSubscriber(SuberBlock &block, MicrosecondTimestamp timestamp, void *payload_addr, bool from_callback, bool in_isr)
将一条消息分发给一个订阅块 / Dispatch one message to one subscriber block
uint32_t GetKey() const
读取 topic 键值 / Read the key value of this topic
TopicHandle block_
当前 topic 视图绑定的状态块。Runtime state block bound to the current topic view.
LockState
topic 发布路径的内部锁状态 / Internal lock state of the topic publish path
@ UNLOCKED
当前未持有发布锁。The publish path is currently unlocked.
@ USE_MUTEX
当前主题改走互斥量串行化。This topic currently serializes publishers through a mutex.
@ LOCKED
当前通过原子快路径持有发布权。The publish path is locked through the atomic fast path.
void RegisterCallback(Callback &cb)
注册一个回调订阅者 / Register one callback subscriber
void Publish(Data &data, MicrosecondTimestamp timestamp)
在普通上下文里按指定时间戳发布一条消息 / Publish one message in normal context with an explicit timestamp
static TopicHandle Find(const char *name, Domain *domain=nullptr)
按名称查找一个已存在 topic / Find one existing topic by name
static RBTree< uint32_t > * domain_
全局 topic 域注册表。Global registry of topic domains.
static void DispatchSubscribers(TopicHandle topic, MicrosecondTimestamp timestamp, void *payload_addr, bool from_callback, bool in_isr)
将一条消息分发给一个 topic 上的全部订阅者 / Dispatch one message to all subscribers attached to one topic
Topic()
构造一个空 topic 视图 / Construct one empty topic view
void PublishBytesFromServer(void *payload_addr, size_t payload_size, MicrosecondTimestamp timestamp)
供 packet/server 路径按字节发布一条消息 / Publish one message from the packet/server path using bytes already arr...
ErrorCode PackData(const Data &data, PackedData< Data > &packet)
将一个精确类型消息打包成 packet / Pack one exact-typed message into one packet using the topic's runtime contract...
static void EnsureDomainRegistry()
确保全局域注册表已创建 / Ensure the global domain registry exists
static ID GetID()
获取类型的唯一标识符 / Get a unique identifier for type T
topic 可承载 payload 的类型约束 / Type constraint for payloads carried by one topic
constexpr void CheckTopicPayload()
在模板上下文里断言 payload 类型满足 topic 契约 / Assert in template context that one payload type satisfies the topi...
异步订阅者自己挂的数据块 / Data block owned by one asynchronous subscriber
topic 运行时状态块 / Runtime state block of one topic
std::atomic< LockState > busy
发布路径串行化状态。Publish-path serialization state.
Mutex * mutex
多发布者主题使用的互斥量。Mutex used by multi-publisher topics.
TypeID::ID payload_type_id
精确 payload 类型标识。Exact payload type identifier.
uint32_t payload_alignment
该 topic payload 所需对齐。Required payload alignment of this topic.
uint32_t payload_size
该 topic 固定 payload 字节数。Fixed payload size in bytes of this topic.
uint32_t crc32
主题名 CRC32 键。CRC32 key of the topic name.
LockFreeList subers
已挂接订阅者链表。List of attached subscribers.
挂在 topic 订阅链表里的回调记录 / Callback record stored in the topic subscriber list
带时间戳和 payload 副本的消息对象 / Message object carrying a timestamp and a payload copy
Data data
payload 对象副本。Copied payload object.
MicrosecondTimestamp timestamp
消息时间戳。Message timestamp.
带时间戳和 payload 指针的只读消息视图 / Read-only message view carrying a timestamp and a payload pointer
Data * data
指向本次发布 payload 对象的指针。Pointer to the payload object of this publish.
MicrosecondTimestamp timestamp
消息时间戳。Message timestamp.
队列订阅者自己挂的数据块 / Data block owned by one queued subscriber
所有订阅块共用的公共头 / Common header shared by all subscriber blocks
SuberType type
订阅块的具体种类。Concrete kind of this subscriber block.
同步订阅者自己挂的数据块 / Data block owned by one synchronous subscriber