libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
message.hpp
1#pragma once
2
3#include <atomic>
4#include <cstdint>
5
6#include "crc.hpp"
7#include "libxr_cb.hpp"
8#include "libxr_def.hpp"
9#include "libxr_type.hpp"
10#include "lock_queue.hpp"
11#include "lockfree_list.hpp"
12#include "lockfree_queue.hpp"
13#include "mutex.hpp"
14#include "queue.hpp"
15#include "rbt.hpp"
16#include "semaphore.hpp"
17#include "thread.hpp"
18
19namespace LibXR
20{
30class Topic
31{
// Lock status values for a topic; Block::busy holds one of these in a std::atomic<LockState>.
32 enum class LockState : uint32_t
33 {
// presumably "nobody holds the topic" — confirm against Lock()/Unlock() in message.cpp
34 UNLOCKED = 0,
// presumably "held by a lock-free (atomic) locker" — confirm
35 LOCKED = 1,
// NOTE(review): the name suggests locking is delegated to the block's Mutex — confirm
36 USE_MUTEX = UINT32_MAX,
37 };
38
39 public:
44 struct Block
45 {
46 std::atomic<LockState> busy;
48 uint32_t max_length;
49 uint32_t crc32;
52 bool cache;
54 };
55#ifndef __DOXYGEN__
56
#pragma pack(push, 1)
/**
 * @brief Byte-packed header that precedes the payload inside a PackedData frame.
 *
 * The surrounding pack(1) pragma removes all padding, so the fields are laid
 * out back to back in declaration order.
 */
struct PackedDataHeader
{
  /// First byte of the frame (presumably a start-of-frame marker — confirm in message.cpp).
  uint8_t prefix;
  /// CRC32 of the topic name.
  uint32_t topic_name_crc32;
  /// Payload length kept as three raw bytes; read and written through GetDataLen()/SetDataLen().
  uint8_t data_len_raw[3];
  /// CRC8 stored with the header (presumably over the preceding header bytes — confirm).
  uint8_t pack_header_crc8;

  /// Stores `len` into data_len_raw (defined outside this header).
  void SetDataLen(uint32_t len);

  /// Returns the length held in data_len_raw (defined outside this header).
  uint32_t GetDataLen() const;
};
#pragma pack(pop)
75#pragma pack(push, 1)
84 template <typename Data>
85 class PackedData
86 {
87 public:
88#pragma pack(push, 1)
94 struct
95 {
96 PackedDataHeader header_;
97 uint8_t data_[sizeof(Data)];
98 } raw;
99
100 uint8_t crc8_;
101
102#pragma pack(pop)
103
110 PackedData &operator=(const Data &data)
111 {
112 LibXR::Memory::FastCopy(raw.data_, &data, sizeof(Data));
113 crc8_ = CRC8::Calculate(&raw, sizeof(raw));
114 return *this;
115 }
116
121 operator Data() { return *reinterpret_cast<Data *>(raw.data_); }
122
126 Data *operator->() { return reinterpret_cast<Data *>(raw.data_); }
127
132 const Data *operator->() const { return reinterpret_cast<const Data *>(raw.data_); }
133 };
134#pragma pack(pop)
135
// Bytes of a packed frame that are not payload: the size of PackedData<uint8_t> minus that instantiation's single payload byte.
136 static constexpr size_t PACK_BASE_SIZE = sizeof(PackedData<uint8_t>) - 1;
137
138#endif
139
146
153 static void Lock(TopicHandle topic);
154
160 static void Unlock(TopicHandle topic);
161
168 static void LockFromCallback(TopicHandle topic);
169
175 static void UnlockFromCallback(TopicHandle topic);
176
182 class Domain
183 {
184 public:
193 Domain(const char *name);
194
199 };
200
// Kind of subscriber attached to a topic; each subscriber block records it in its `type` field.
205 enum class SuberType : uint8_t
206 {
// Synchronous subscriber.
207 SYNC,
// Asynchronous subscriber.
208 ASYNC,
// Queued subscriber.
209 QUEUE,
// Callback subscriber.
210 CALLBACK,
211 };
212
218 {
220 };
221
226 struct SyncBlock : public SuberBlock
227 {
230 };
231
238 template <typename Data>
240 {
241 public:
252 SyncSubscriber(const char *name, Data &data, Domain *domain = nullptr)
253 {
254 *this = SyncSubscriber<Data>(WaitTopic(name, UINT32_MAX, domain), data);
255 }
256
266 SyncSubscriber(Topic topic, Data &data)
267 {
268 if (topic.block_->data_.check_length)
269 {
270 ASSERT(topic.block_->data_.max_length == sizeof(Data));
271 }
272 else
273 {
274 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
275 }
276
279 block_->data_.buff = RawData(data);
280 topic.block_->data_.subers.Add(*block_);
281 }
282
288 ErrorCode Wait(uint32_t timeout = UINT32_MAX)
289 {
290 // TODO: Reset sem
291 return block_->data_.sem.Wait(timeout);
292 }
293
295 };
296
// State of an asynchronous subscriber, held in the subscriber block's atomic `state` field.
297 enum class ASyncSubscriberState : uint32_t
298 {
// Value stored by GetData().
299 IDLE = 0,
// Value stored by StartWaiting().
300 WAITING = 1,
// Value Available() compares against to report that data can be read.
301 DATA_READY = UINT32_MAX
302 };
303
309 typedef struct ASyncBlock : public SuberBlock
310 {
312 std::atomic<ASyncSubscriberState> state;
313 } ASyncBlock;
314
321 template <typename Data>
323 {
324 public:
334 ASyncSubscriber(const char *name, Domain *domain = nullptr)
335 {
336 *this = ASyncSubscriber(WaitTopic(name, UINT32_MAX, domain));
337 }
338
348 {
349 if (topic.block_->data_.check_length)
350 {
351 ASSERT(topic.block_->data_.max_length == sizeof(Data));
352 }
353 else
354 {
355 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
356 }
357
360 block_->data_.buff = *(new Data);
361 topic.block_->data_.subers.Add(*block_);
362 }
363
373 {
374 return block_->data_.state.load(std::memory_order_acquire) ==
375 ASyncSubscriberState::DATA_READY;
376 }
377
384 Data &GetData()
385 {
386 block_->data_.state.store(ASyncSubscriberState::IDLE, std::memory_order_release);
387 return *reinterpret_cast<Data *>(block_->data_.buff.addr_);
388 }
389
395 {
396 block_->data_.state.store(ASyncSubscriberState::WAITING, std::memory_order_release);
397 }
398
400 };
401
407 typedef struct QueueBlock : public SuberBlock
408 {
409 void *queue;
410 void (*fun)(RawData &, void *,
411 bool);
412 } QueueBlock;
413
415 {
416 public:
429 template <typename Data, uint32_t Length>
430 QueuedSubscriber(const char *name, LockFreeQueue<Data> &queue,
431 Domain *domain = nullptr)
432 {
433 *this = QueuedSubscriber(WaitTopic(name, UINT32_MAX, domain), queue);
434 }
435
446 template <typename Data>
448 {
449 if (topic.block_->data_.check_length)
450 {
451 ASSERT(topic.block_->data_.max_length == sizeof(Data));
452 }
453 else
454 {
455 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
456 }
457
458 auto block = new LockFreeList::Node<QueueBlock>;
459 block->data_.type = SuberType::QUEUE;
460 block->data_.queue = &queue;
461 block->data_.fun = [](RawData &data, void *arg, bool in_isr)
462 {
463 UNUSED(in_isr);
464 LockFreeQueue<Data> *queue = reinterpret_cast<LockFreeQueue<Data> *>(arg);
465 queue->Push(*reinterpret_cast<Data *>(data.addr_));
466 };
467
468 topic.block_->data_.subers.Add(*block);
469 }
470 };
471
473
479 typedef struct CallbackBlock : public SuberBlock
480 {
483
492 void RegisterCallback(Callback &cb);
493
498 Topic();
499
517 Topic(const char *name, uint32_t max_length, Domain *domain = nullptr,
518 bool multi_publisher = false, bool cache = false, bool check_length = false);
519
537 template <typename Data>
538 static Topic CreateTopic(const char *name, Domain *domain = nullptr,
539 bool multi_publisher = false, bool cache = false,
540 bool check_length = false)
541 {
542 return Topic(name, sizeof(Data), domain, multi_publisher, cache, check_length);
543 }
544
550 Topic(TopicHandle topic);
551
560 static TopicHandle Find(const char *name, Domain *domain = nullptr);
561
578 template <typename Data>
579 static TopicHandle FindOrCreate(const char *name, Domain *domain = nullptr,
580 bool multi_publisher = false, bool cache = false,
581 bool check_length = false)
582 {
583 auto topic = Find(name, domain);
584 if (topic == nullptr)
585 {
586 topic =
587 CreateTopic<Data>(name, domain, multi_publisher, cache, check_length).block_;
588 }
589 return topic;
590 }
591
599 void EnableCache();
600
607 template <typename Data>
608 void Publish(Data &data)
609 {
610 Publish(&data, sizeof(Data));
611 }
612
619 void Publish(void *addr, uint32_t size);
620
628 template <typename Data>
629 void PublishFromCallback(Data &data, bool in_isr)
630 {
631 PublishFromCallback(&data, sizeof(Data), in_isr);
632 }
633
641 void PublishFromCallback(void *addr, uint32_t size, bool in_isr);
642
// Copies the topic's stored data into `data`, either as raw bytes (pack == false)
// or as a packed frame produced by PackData() (pack == true).
// Returns ErrorCode::EMPTY when the topic's data address is null, ErrorCode::OK otherwise.
// `Mode` selects the policy used by Assert::SizeLimitCheck in the packed branch.
652 template <SizeLimitMode Mode = SizeLimitMode::MORE>
653 ErrorCode DumpData(RawData data, bool pack = false)
654 {
// Nothing is stored in the topic yet.
655 if (block_->data_.data.addr_ == nullptr)
656 {
657 return ErrorCode::EMPTY;
658 }
659
660 if (!pack)
661 {
// NOTE(review): no check of data.size_ is visible in this branch — confirm against the original header.
// The copy runs between Lock()/Unlock() on the topic.
663 Lock(block_);
664 LibXR::Memory::FastCopy(data.addr_, block_->data_.data.addr_,
665 block_->data_.data.size_);
666 Unlock(block_);
667 }
668 else
669 {
// A packed frame takes PACK_BASE_SIZE bytes on top of the payload; check that against the destination size.
670 Assert::SizeLimitCheck<Mode>(PACK_BASE_SIZE + block_->data_.data.size_, data.size_);
671
672 Lock(block_);
673 PackData(block_->data_.crc32, data, block_->data_.data);
674 Unlock(block_);
675 }
676
677 return ErrorCode::OK;
678 }
679
687 static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source);
688
695 template <typename Data>
696 ErrorCode DumpData(PackedData<Data> &data)
697 {
698 if (block_->data_.data.addr_ == nullptr)
699 {
700 return ErrorCode::EMPTY;
701 }
702
703 ASSERT(sizeof(Data) == block_->data_.data.size_);
704
705 return DumpData<SizeLimitMode::NONE>(RawData(data), true);
706 }
707
714 template <typename Data>
715 ErrorCode DumpData(Data &data)
716 {
717 if (block_->data_.data.addr_ == nullptr)
718 {
719 return ErrorCode::EMPTY;
720 }
721
722 ASSERT(sizeof(Data) == block_->data_.data.size_);
723
724 return DumpData<SizeLimitMode::NONE>(data, false);
725 }
726
738 static TopicHandle WaitTopic(const char *name, uint32_t timeout = UINT32_MAX,
739 Domain *domain = nullptr);
740
// Implicit conversion to the underlying handle: returns block_ unchanged.
746 operator TopicHandle() { return block_; }
747
754 uint32_t GetKey() const;
755
762 class Server
763 {
764 public:
770 enum class Status : uint8_t
771 {
772 WAIT_START,
773 WAIT_TOPIC,
775 };
776
785 Server(size_t buffer_length);
786
795 void Register(TopicHandle topic);
796
803 size_t ParseData(ConstRawData data);
804
805 private:
808 uint32_t data_len_ = 0;
813 };
814
815 private:
821
826 static inline RBTree<uint32_t> *domain_ = nullptr;
827
832 static inline Domain *def_domain_ = nullptr;
833};
834} // namespace LibXR
static void SizeLimitCheck(size_t limit, size_t size)
在非调试模式下的占位大小检查函数(无实际作用)。 Dummy size limit check for non-debug builds.
基础队列类,提供固定大小的循环缓冲区 (Base queue class providing a fixed-size circular buffer).
Definition queue.hpp:20
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
Definition libxr_cb.hpp:124
常量原始数据封装类。 A class for encapsulating constant raw data.
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data types.
Data data_
存储的数据。 The stored data.
链表实现,用于存储和管理数据节点。 A linked list implementation for storing and managing data nodes.
无锁队列实现 / Lock-free queue implementation
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
Definition libxr_mem.cpp:17
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:64
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:99
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread-safe operations).
Definition rbt.hpp:24
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
Definition semaphore.hpp:23
异步订阅者类,用于订阅异步数据 Asynchronous subscriber class for subscribing to asynchronous data
Definition message.hpp:323
ASyncSubscriber(Topic topic)
构造函数,使用 Topic 进行初始化 Constructor using a Topic for initialization
Definition message.hpp:347
Data & GetData()
获取当前数据 Retrieves the current data
Definition message.hpp:384
LockFreeList::Node< ASyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:399
bool Available()
检查数据是否可用 Checks if data is available
Definition message.hpp:372
ASyncSubscriber(const char *name, Domain *domain=nullptr)
构造函数,通过主题名称创建订阅者 Constructor to create a subscriber by topic name
Definition message.hpp:334
void StartWaiting()
开始等待数据更新 Starts waiting for data update
Definition message.hpp:394
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:183
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
Definition message.cpp:88
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
Definition message.hpp:198
QueuedSubscriber(Topic topic, LockFreeQueue< Data > &queue)
构造函数,使用 Topic 和无锁队列进行初始化 Constructor using a Topic and a lock-free queue
Definition message.hpp:447
QueuedSubscriber(const char *name, LockFreeQueue< Data > &queue, Domain *domain=nullptr)
构造函数,通过主题名称和无锁队列进行初始化 Constructor using a topic name and a lock-free queue
Definition message.hpp:430
服务器类,负责解析数据并将其分发到相应的主题 Server class responsible for parsing data and distributing it to the corresponding topics
Definition message.hpp:763
RBTree< uint32_t > topic_map_
主题映射表 Topic mapping table
Definition message.hpp:809
void Register(TopicHandle topic)
注册一个主题 Registers a topic
Definition message.cpp:410
BaseQueue queue_
数据队列 Data queue
Definition message.hpp:810
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
Definition message.cpp:416
TopicHandle current_topic_
当前主题句柄 Current topic handle
Definition message.hpp:812
uint32_t data_len_
当前数据长度 Current data length
Definition message.hpp:808
Status
服务器解析状态枚举 Enumeration of server parsing states
Definition message.hpp:771
@ WAIT_DATA_CRC
等待数据校验 Waiting for data CRC validation
@ WAIT_START
等待起始标志 Waiting for start flag
@ WAIT_TOPIC
等待主题信息 Waiting for topic information
RawData parse_buff_
解析数据缓冲区 Data buffer for parsing
Definition message.hpp:811
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
Definition message.cpp:399
Status status_
服务器的当前解析状态 Current parsing state of the server
Definition message.hpp:806
同步订阅者类,允许同步方式接收数据。Synchronous subscriber class allowing data reception in a synchronous manner.
Definition message.hpp:240
SyncSubscriber(Topic topic, Data &data)
通过 Topic 句柄构造同步订阅者。Constructs a synchronous subscriber using a Topic handle.
Definition message.hpp:266
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待接收数据。Waits for data reception.
Definition message.hpp:288
LockFreeList::Node< SyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:294
SyncSubscriber(const char *name, Data &data, Domain *domain=nullptr)
通过主题名称构造同步订阅者。Constructs a synchronous subscriber by topic name.
Definition message.hpp:252
主题(Topic)管理类 / Topic management class
Definition message.hpp:31
SuberType
订阅者类型。Subscriber type.
Definition message.hpp:206
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
Definition message.hpp:608
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
Definition message.hpp:832
static void Unlock(TopicHandle topic)
解锁主题。Unlock the topic.
Definition message.cpp:46
RBTree< uint32_t >::Node< Block > * TopicHandle
主题句柄,指向存储数据的红黑树节点。Handle pointing to a red-black tree node storing data.
Definition message.hpp:145
ErrorCode DumpData(Data &data)
转储数据到普通数据结构 Dumps data into a normal data structure
Definition message.hpp:715
ErrorCode DumpData(PackedData< Data > &data)
转储数据到 PackedData Dumps data into PackedData format
Definition message.hpp:696
static void Lock(TopicHandle topic)
锁定主题,防止其被多个订阅者同时访问。Lock the topic to prevent it from being accessed by multiple subscribers at the same time.
Definition message.cpp:28
static void UnlockFromCallback(TopicHandle topic)
从回调中解锁主题。Unlock the topic from a callback.
Definition message.cpp:76
static TopicHandle FindOrCreate(const char *name, Domain *domain=nullptr, bool multi_publisher=false, bool cache=false, bool check_length=false)
在指定域中查找或创建主题 Finds or creates a topic in the specified domain
Definition message.hpp:579
static void LockFromCallback(TopicHandle topic)
从回调中锁定主题,防止其被多个订阅者同时访问。Lock the topic from a callback to prevent it from being accessed by multiple subscribers at the same time.
Definition message.cpp:58
ErrorCode DumpData(RawData data, bool pack=false)
转储数据 Dump data
Definition message.hpp:653
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
Definition message.cpp:204
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition message.cpp:368
void PublishFromCallback(Data &data, bool in_isr)
在回调函数中发布数据 Publishes data in a callback
Definition message.hpp:629
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
Definition message.cpp:387
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:820
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
Definition message.cpp:117
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Definition message.cpp:192
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Definition message.hpp:826
static Topic CreateTopic(const char *name, Domain *domain=nullptr, bool multi_publisher=false, bool cache=false, bool check_length=false)
创建一个新的主题 Creates a new topic
Definition message.hpp:538
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
Definition message.cpp:127
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据 Packs data
Definition message.cpp:351
LibXR 命名空间
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
Definition message.hpp:310
std::atomic< ASyncSubscriberState > state
订阅者状态 Subscriber state
Definition message.hpp:312
RawData buff
缓冲区数据 Buffer data
Definition message.hpp:311
存储主题(Topic)数据的结构体。Structure storing topic data.
Definition message.hpp:45
bool cache
是否启用数据缓存。Indicates whether data caching is enabled.
Definition message.hpp:52
bool check_length
是否检查数据长度。Indicates whether data length is checked.
Definition message.hpp:53
std::atomic< LockState > busy
是否忙碌。Indicates whether it is busy.
Definition message.hpp:46
uint32_t max_length
数据的最大长度。Maximum length of data.
Definition message.hpp:48
Mutex * mutex
线程同步互斥锁。Mutex for thread synchronization.
Definition message.hpp:50
uint32_t crc32
主题名称的 CRC32 校验码。CRC32 checksum of the topic name.
Definition message.hpp:49
RawData data
存储的数据。Stored data.
Definition message.hpp:51
LockFreeList subers
订阅者列表。List of subscribers.
Definition message.hpp:47
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Definition message.hpp:480
Callback cb
订阅的回调函数 Subscribed callback function
Definition message.hpp:481
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
Definition message.hpp:408
void * queue
指向订阅队列的指针 Pointer to the subscribed queue
Definition message.hpp:409
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
Definition message.hpp:410
订阅者信息存储结构。Structure storing subscriber information.
Definition message.hpp:218
SuberType type
订阅者类型。Type of subscriber.
Definition message.hpp:219
同步订阅者存储结构。Structure storing synchronous subscriber data.
Definition message.hpp:227
Semaphore sem
信号量,用于同步等待数据。Semaphore for data synchronization.
Definition message.hpp:229
RawData buff
存储的数据缓冲区。Data buffer.
Definition message.hpp:228