libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
message.hpp
1#pragma once
2
3#include <atomic>
4#include <concepts>
5#include <cstddef>
6#include <cstdint>
7#include <tuple>
8#include <type_traits>
9#include <utility>
10
11#include "crc.hpp"
12#include "libxr_cb.hpp"
13#include "libxr_def.hpp"
14#include "libxr_mem.hpp"
15#include "libxr_time.hpp"
16#include "libxr_type.hpp"
17#include "lock_queue.hpp"
18#include "lockfree_list.hpp"
19#include "lockfree_queue.hpp"
20#include "mutex.hpp"
21#include "queue.hpp"
22#include "rbt.hpp"
23#include "semaphore.hpp"
24#include "thread.hpp"
25
26namespace LibXR
27{
28template <typename Data>
29concept TopicPayload =
30 !std::is_reference_v<Data> && !std::is_const_v<Data> && !std::is_volatile_v<Data> &&
31 std::is_object_v<Data> && std::is_default_constructible_v<Data> &&
32 std::is_copy_assignable_v<Data> && std::is_trivially_destructible_v<Data>;
33
34template <typename Data>
35constexpr void CheckTopicPayload()
36{
37 static_assert(TopicPayload<Data>,
38 "LibXR::Topic typed payload must be a non-cv/ref object type that is "
39 "default-constructible, copy-assignable, and trivially destructible.");
40}
41
60class Topic
61{
/**
 * @brief Values stored in Block::busy.
 *
 * NOTE(review): USE_MUTEX presumably marks topics that are guarded by
 * Block::mutex instead of the atomic flag - confirm against topic.cpp.
 */
enum class LockState : uint32_t
{
  UNLOCKED = 0U,
  LOCKED = 1U,
  USE_MUTEX = UINT32_MAX,
};
68
69 public:
79 struct Block
80 {
81 std::atomic<LockState> busy;
83 uint32_t max_length;
84 uint32_t crc32;
88 bool cache;
90 };
91
92#ifndef __DOXYGEN__
93#pragma pack(push, 1)
98 struct PackedDataHeader
99 {
100 uint8_t prefix;
101 uint8_t data_len_raw[3];
102 uint32_t
103 topic_name_crc32;
104 uint8_t timestamp_us_raw[8];
105 uint8_t pack_header_crc8;
106
107 void SetDataLen(uint32_t len);
108
109 uint32_t GetDataLen() const;
110
111 void SetTimestamp(MicrosecondTimestamp timestamp);
112
113 MicrosecondTimestamp GetTimestamp() const;
114 };
115#pragma pack(pop)
116
117 template <typename Data>
118 class PackedData;
119
120 static constexpr uint8_t PACKET_PREFIX = 0x5A;
121 static constexpr size_t PACK_BASE_SIZE = sizeof(PackedDataHeader) + sizeof(uint8_t);
122 static_assert(sizeof(PackedDataHeader) == 17);
123 static_assert(offsetof(PackedDataHeader, prefix) == 0);
124 static_assert(offsetof(PackedDataHeader, data_len_raw) == 1);
125 static_assert(offsetof(PackedDataHeader, topic_name_crc32) == 4);
126 static_assert(offsetof(PackedDataHeader, timestamp_us_raw) == 8);
127 static_assert(offsetof(PackedDataHeader, pack_header_crc8) == 16);
128#endif
129
136
141 template <typename Data>
143 {
144 static_assert(TopicPayload<Data>);
145
147 Data& data;
148 };
149
154 template <typename Data>
155 struct Message
156 {
157 static_assert(TopicPayload<Data>);
158
160 Data data;
161 };
162
169 static void Lock(TopicHandle topic);
170
176 static void Unlock(TopicHandle topic);
177
184 static void LockFromCallback(TopicHandle topic);
185
191 static void UnlockFromCallback(TopicHandle topic);
192
198 class Domain
199 {
200 public:
210 Domain(const char* name);
211
216 };
217
// Forward declarations of the subscriber machinery; this listing only
// declares them.

// Common subscriber pieces.
enum class SuberType : uint8_t;  // Subscriber type.
struct SuberBlock;               // Structure storing subscriber information.

// Synchronous subscriber.
struct SyncBlock;
template <typename Data>
class SyncSubscriber;

// Asynchronous subscriber.
enum class ASyncSubscriberState : uint32_t;
struct ASyncBlock;
template <typename Data>
class ASyncSubscriber;

// Queued subscriber.
struct QueueBlock;
class QueuedSubscriber;

// Callback subscriber.
class Callback;
struct CallbackBlock;
231
244 void RegisterCallback(Callback& cb);
245
251
256 Topic();
257
281 Topic(const char* name, uint32_t max_length, Domain* domain = nullptr,
282 bool multi_publisher = false, bool cache = false, bool check_length = false);
283
302 template <typename Data>
303 static Topic CreateTopic(const char* name, Domain* domain = nullptr,
304 bool multi_publisher = false, bool cache = false,
305 bool check_length = false)
306 {
307 CheckTopicPayload<Data>();
308 return Topic(name, sizeof(Data), domain, multi_publisher, cache, check_length);
309 }
310
316 Topic(TopicHandle topic);
317
330 static TopicHandle Find(const char* name, Domain* domain = nullptr);
331
349 template <typename Data>
350 static TopicHandle FindOrCreate(const char* name, Domain* domain = nullptr,
351 bool multi_publisher = false, bool cache = false,
352 bool check_length = false)
353 {
354 CheckTopicPayload<Data>();
355 auto topic = Find(name, domain);
356 if (topic == nullptr)
357 {
358 topic =
359 CreateTopic<Data>(name, domain, multi_publisher, cache, check_length).block_;
360 }
361 return topic;
362 }
363
/**
 * @brief Enables caching for the topic.
 */
void EnableCache();
373
380 template <typename Data>
381 void Publish(Data& data)
382 {
383 CheckTopicPayload<Data>();
384 Publish(static_cast<void*>(&data), static_cast<uint32_t>(sizeof(Data)));
385 }
386
387 template <typename Data>
388 void Publish(Data& data, MicrosecondTimestamp timestamp)
389 {
390 CheckTopicPayload<Data>();
391 Publish(static_cast<void*>(&data), static_cast<uint32_t>(sizeof(Data)), timestamp);
392 }
393
400 void Publish(void* addr, uint32_t size);
401
402 void Publish(void* addr, uint32_t size, MicrosecondTimestamp timestamp);
403
411 template <typename Data>
412 void PublishFromCallback(Data& data, bool in_isr)
413 {
414 CheckTopicPayload<Data>();
415 PublishFromCallback(static_cast<void*>(&data), static_cast<uint32_t>(sizeof(Data)),
416 in_isr);
417 }
418
419 template <typename Data>
420 void PublishFromCallback(Data& data, MicrosecondTimestamp timestamp, bool in_isr)
421 {
422 CheckTopicPayload<Data>();
423 PublishFromCallback(static_cast<void*>(&data), static_cast<uint32_t>(sizeof(Data)),
424 timestamp, in_isr);
425 }
426
441 void PublishFromCallback(void* addr, uint32_t size, bool in_isr);
442 void PublishFromCallback(void* addr, uint32_t size, MicrosecondTimestamp timestamp,
443 bool in_isr);
444
453 static void PackData(uint32_t topic_name_crc32, RawData buffer,
454 MicrosecondTimestamp timestamp, ConstRawData data);
455
462 template <typename Data>
463 ErrorCode DumpData(PackedData<Data>& data);
464
468 template <SizeLimitMode Mode = SizeLimitMode::MORE>
470 {
471 MicrosecondTimestamp timestamp;
472 return DumpPayload<Mode>(data, timestamp);
473 }
474
475 template <SizeLimitMode Mode = SizeLimitMode::MORE>
477 {
478 return DumpPayload<Mode>(data, timestamp);
479 }
480
493 template <typename Data>
494 requires(!std::same_as<std::remove_cv_t<Data>, RawData>)
495 ErrorCode DumpData(Data& data)
496 {
497 MicrosecondTimestamp timestamp;
498 return DumpData(data, timestamp);
499 }
500
501 template <typename Data>
502 requires(!std::same_as<std::remove_cv_t<Data>, RawData>)
503 ErrorCode DumpData(Data& data, MicrosecondTimestamp& timestamp)
504 {
505 CheckTopicPayload<Data>();
506 return DumpPayload<SizeLimitMode::LESS>(RawData(data), timestamp);
507 }
508
509 MicrosecondTimestamp GetTimestamp() const;
510
522 static TopicHandle WaitTopic(const char* name, uint32_t timeout = UINT32_MAX,
523 Domain* domain = nullptr);
524
530 operator TopicHandle() { return block_; }
531
538 uint32_t GetKey() const;
539
540 class Server;
541
542 private:
548
557 static inline RBTree<uint32_t>* domain_ = nullptr;
558
563 static inline Domain* def_domain_ = nullptr;
564
565 static void EnsureDomainRegistry();
566 static Domain* EnsureDefaultDomain();
567
572 template <typename Data>
574 {
575 CheckTopicPayload<Data>();
576 if (topic.block_->data_.check_length)
577 {
578 ASSERT(topic.block_->data_.max_length == sizeof(Data));
579 }
580 else
581 {
582 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
583 }
584 }
585
590 template <typename Data>
592 {
593 CheckTopicPayload<Data>();
594 auto* data = new Data;
595 return RawData(*data);
596 }
597
598 template <SizeLimitMode Mode = SizeLimitMode::MORE>
599 ErrorCode DumpPayload(RawData buffer, MicrosecondTimestamp& timestamp)
600 {
601 if (block_->data_.data.addr_ == nullptr)
602 {
603 return ErrorCode::EMPTY;
604 }
605
606 const size_t payload_size = block_->data_.data.size_;
607 size_t copy_size = payload_size;
608
609 if constexpr (Mode == SizeLimitMode::EQUAL)
610 {
611 if (payload_size != buffer.size_)
612 {
613 return ErrorCode::SIZE_ERR;
614 }
615 }
616 else if constexpr (Mode == SizeLimitMode::LESS)
617 {
618 if (payload_size < buffer.size_)
619 {
620 return ErrorCode::SIZE_ERR;
621 }
622 copy_size = buffer.size_;
623 }
624 else if constexpr (Mode == SizeLimitMode::MORE)
625 {
626 if (payload_size > buffer.size_)
627 {
628 return ErrorCode::SIZE_ERR;
629 }
630 }
631
632 Lock(block_);
633 timestamp = block_->data_.timestamp;
634 LibXR::Memory::FastCopy(buffer.addr_, block_->data_.data.addr_, copy_size);
635 Unlock(block_);
636
637 return ErrorCode::OK;
638 }
639
640 template <SizeLimitMode Mode = SizeLimitMode::MORE>
641 ErrorCode DumpPacket(RawData buffer)
642 {
643 if (block_->data_.data.addr_ == nullptr)
644 {
645 return ErrorCode::EMPTY;
646 }
647
648 Assert::SizeLimitCheck<Mode>(PACK_BASE_SIZE + block_->data_.data.size_, buffer.size_);
649 Lock(block_);
650 PackData(block_->data_.crc32, buffer, block_->data_.timestamp, block_->data_.data);
651 Unlock(block_);
652
653 return ErrorCode::OK;
654 }
655
656 void PublishRaw(void* addr, uint32_t size, MicrosecondTimestamp timestamp,
657 bool from_callback, bool in_isr);
658
659 static void CheckPublishSize(TopicHandle topic, uint32_t size);
660 static RawData StorePublishedData(TopicHandle topic, void* addr, uint32_t size,
661 MicrosecondTimestamp timestamp);
662 static void DispatchSubscriber(SuberBlock& block, MicrosecondTimestamp timestamp,
663 RawData data, bool from_callback, bool in_isr);
664 static void DispatchSubscribers(TopicHandle topic, MicrosecondTimestamp timestamp,
665 RawData data, bool from_callback, bool in_isr);
666};
667} // namespace LibXR
668
669#include "message/packet.hpp"
670#include "message/server.hpp"
671#include "message/subscriber.hpp"
static void SizeLimitCheck(size_t limit, size_t size)
在非调试模式下的占位大小检查函数(无实际作用)。 Dummy size limit check for non-debug builds.
链表实现,用于存储和管理数据节点。 A linked list implementation for storing and managing data nodes.
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
Definition libxr_mem.cpp:5
微秒时间戳 / Microsecond timestamp
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:64
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:99
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread-safe operations).
Definition rbt.hpp:24
可写原始数据视图 / Mutable raw data view
size_t size_
数据字节数 / Data size in bytes
void * addr_
数据起始地址 / Data start address
异步订阅者类,用于订阅异步数据 Asynchronous subscriber class for subscribing to asynchronous data
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:199
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
Definition topic.cpp:90
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
Definition message.hpp:215
同步订阅者类,允许同步方式接收数据。Synchronous subscriber class allowing data reception in a synchronous manner.
主题(Topic)管理类 / Topic management class
Definition message.hpp:61
SuberType
订阅者类型。Subscriber type.
void Publish(Data &data)
发布数据 Publishes data
Definition message.hpp:381
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
Definition message.hpp:563
static void Unlock(TopicHandle topic)
解锁主题。Unlock the topic.
Definition topic.cpp:48
RBTree< uint32_t >::Node< Block > * TopicHandle
主题句柄,指向存储数据的红黑树节点。Handle pointing to a red-black tree node storing data.
Definition message.hpp:135
static RawData NewSubscriberBuffer()
为异步订阅者分配长期存在的接收缓冲区。 Allocates a long-lived receive buffer for an async subscriber.
Definition message.hpp:591
ErrorCode DumpData(Data &data)
转储数据到普通数据结构 Dumps data into a normal data structure
Definition message.hpp:495
static MicrosecondTimestamp NowTimestamp()
生成默认发布时刻。Create the default publish timestamp.
Definition publish.cpp:9
ErrorCode DumpData(PackedData< Data > &data)
转储数据到 PackedData Dumps data into PackedData format
Definition packet.hpp:67
static void CheckSubscriberDataSize(Topic topic)
校验订阅者数据类型和主题最大长度是否兼容。 Checks whether subscriber data type is compatible with topic max length.
Definition message.hpp:573
static void Lock(TopicHandle topic)
锁定主题,防止其被多个订阅者同时访问。Lock the topic to prevent it from being accessed by multiple subscribers at the same time.
Definition topic.cpp:30
static void UnlockFromCallback(TopicHandle topic)
从回调中解锁主题。Unlock the topic from a callback.
Definition topic.cpp:78
static TopicHandle FindOrCreate(const char *name, Domain *domain=nullptr, bool multi_publisher=false, bool cache=false, bool check_length=false)
在指定域中查找或创建主题 Finds or creates a topic in the specified domain
Definition message.hpp:350
static void LockFromCallback(TopicHandle topic)
从回调中锁定主题,防止其被多个订阅者同时访问。Lock the topic from a callback to prevent it from being accessed by multiple subscribers at the same time.
Definition topic.cpp:60
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
Definition topic.cpp:210
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition topic.cpp:223
static void PackData(uint32_t topic_name_crc32, RawData buffer, MicrosecondTimestamp timestamp, ConstRawData data)
打包数据
Definition packet.cpp:40
void PublishFromCallback(Data &data, bool in_isr)
在回调函数中发布数据 Publishes data in a callback
Definition message.hpp:412
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
Definition topic.cpp:244
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:547
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
Definition topic.cpp:113
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Definition topic.cpp:192
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Definition message.hpp:557
static Topic CreateTopic(const char *name, Domain *domain=nullptr, bool multi_publisher=false, bool cache=false, bool check_length=false)
创建一个新的主题 Creates a new topic
Definition message.hpp:303
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
Definition topic.cpp:132
ErrorCode DumpData(RawData data)
转储数据到原始缓冲区。Dumps data into a raw buffer.
Definition message.hpp:469
LibXR 命名空间
Definition ch32_can.hpp:14
ErrorCode
定义错误码枚举
@ SIZE_ERR
尺寸错误 | Size error
@ EMPTY
为空 | Empty
@ OK
操作成功 | Operation successful
@ LESS
尺寸必须小于 | Size must be less
@ EQUAL
尺寸必须相等 | Size must be equal
@ MORE
尺寸必须大于 | Size must be more
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
存储主题运行状态和静态配置。Stores topic runtime state and static configuration.
Definition message.hpp:80
bool cache
是否启用数据缓存。Indicates whether data caching is enabled.
Definition message.hpp:88
bool check_length
是否检查数据长度。Indicates whether data length is checked.
Definition message.hpp:89
std::atomic< LockState > busy
是否忙碌。Indicates whether it is busy.
Definition message.hpp:81
uint32_t max_length
数据的最大长度。Maximum length of data.
Definition message.hpp:83
Mutex * mutex
线程同步互斥锁。Mutex for thread synchronization.
Definition message.hpp:85
uint32_t crc32
主题名称的 CRC32 校验码。CRC32 checksum of the topic name.
Definition message.hpp:84
RawData data
最近一次发布的数据视图。Latest published data view.
Definition message.hpp:86
LockFreeList subers
订阅者列表。List of subscribers.
Definition message.hpp:82
MicrosecondTimestamp timestamp
最近一次消息时间戳。Latest message timestamp.
Definition message.hpp:87
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
带时间戳的类型化消息。Timestamped typed message.
Definition message.hpp:156
Data data
消息数据。Message payload.
Definition message.hpp:160
MicrosecondTimestamp timestamp
消息时间戳。Message timestamp.
Definition message.hpp:159
带时间戳的类型化消息视图。Timestamped typed message view.
Definition message.hpp:143
Data & data
消息数据引用。Message data reference.
Definition message.hpp:147
MicrosecondTimestamp timestamp
消息时间戳。Message timestamp.
Definition message.hpp:146
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
订阅者信息存储结构。Structure storing subscriber information.
同步订阅者存储结构。Structure storing synchronous subscriber data.