libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
message.hpp
1#pragma once
2
3#include "crc.hpp"
4#include "libxr_cb.hpp"
5#include "libxr_def.hpp"
6#include "libxr_type.hpp"
7#include "lock_queue.hpp"
8#include "lockfree_list.hpp"
9#include "lockfree_queue.hpp"
10#include "mutex.hpp"
11#include "queue.hpp"
12#include "rbt.hpp"
13#include "semaphore.hpp"
14#include "thread.hpp"
15
16namespace LibXR
17{
27class Topic
28{
33 public:
44#ifndef __DOXYGEN__
45
46#pragma pack(push, 1)
51 struct PackedDataHeader
52 {
53 uint8_t prefix;
54 uint32_t
55 topic_name_crc32;
56 uint8_t data_len_raw[3];
57 uint8_t pack_header_crc8;
58
59 void SetDataLen(uint32_t len);
60
61 uint32_t GetDataLen() const;
62 };
63#pragma pack(pop)
64#pragma pack(push, 1)
73 template <typename Data>
74 class PackedData
75 {
76 public:
77#pragma pack(push, 1)
83 struct
84 {
85 PackedDataHeader header_;
86 uint8_t data_[sizeof(Data)];
87 } raw;
88
89 uint8_t crc8_;
90
91#pragma pack(pop)
92
99 PackedData &operator=(const Data &data)
100 {
101 memcpy(raw.data_, &data, sizeof(Data));
102 crc8_ = CRC8::Calculate(&raw, sizeof(raw));
103 return *this;
104 }
105
110 operator Data() { return *reinterpret_cast<Data *>(raw.data_); }
111
115 Data *operator->() { return reinterpret_cast<Data *>(raw.data_); }
116
121 const Data *operator->() const { return reinterpret_cast<const Data *>(raw.data_); }
122 };
123#pragma pack(pop)
124
125 static constexpr size_t PACK_BASE_SIZE = sizeof(PackedData<uint8_t>) - 1;
126
127#endif
128
135
141 class Domain
142 {
143 public:
149 Domain(const char *name);
150
155 };
156
161 enum class SuberType : uint8_t
162 {
163 SYNC,
164 ASYNC,
165 QUEUE,
166 CALLBACK,
167 };
168
174 {
176 };
177
182 struct SyncBlock : public SuberBlock
183 {
186 };
187
194 template <typename Data>
196 {
197 public:
205 SyncSubscriber(const char *name, Data &data, Domain *domain = nullptr)
206 {
207 *this = SyncSubscriber<Data>(WaitTopic(name, UINT32_MAX, domain), data);
208 }
209
216 SyncSubscriber(Topic topic, Data &data)
217 {
218 if (topic.block_->data_.check_length)
219 {
220 ASSERT(topic.block_->data_.max_length == sizeof(Data));
221 }
222 else
223 {
224 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
225 }
226
229 block_->data_.buff = RawData(data);
230 topic.block_->data_.subers.Add(*block_);
231 }
232
238 ErrorCode Wait(uint32_t timeout = UINT32_MAX)
239 {
240 return block_->data_.sem.Wait(timeout);
241 }
242
244 };
245
251 typedef struct ASyncBlock : public SuberBlock
252 {
255 bool waiting;
256 } ASyncBlock;
257
264 template <typename Data>
266 {
267 public:
274 ASyncSubscriber(const char *name, Domain *domain = nullptr)
275 {
276 *this = ASyncSubscriber(WaitTopic(name, UINT32_MAX, domain));
277 }
278
285 {
286 if (topic.block_->data_.check_length)
287 {
288 ASSERT(topic.block_->data_.max_length == sizeof(Data));
289 }
290 else
291 {
292 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
293 }
294
297 block_->data_.buff = *(new Data);
298 topic.block_->data_.subers.Add(*block_);
299 }
300
309 bool Available() { return block_->data_.data_ready; }
310
317 Data &GetData()
318 {
319 block_->data_.data_ready = false;
320 return *reinterpret_cast<Data *>(block_->data_.buff.addr_);
321 }
322
327 void StartWaiting() { block_->data_.waiting = true; }
328
330 };
331
337 typedef struct QueueBlock : public SuberBlock
338 {
339 void *queue;
340 void (*fun)(RawData &, void *,
341 bool);
342 } QueueBlock;
343
354 {
355 public:
356 template <typename Data, uint32_t Length>
357 QueuedSubscriber(const char *name, LockFreeQueue<Data> &queue,
358 Domain *domain = nullptr)
359 {
360 *this = QueuedSubscriber(WaitTopic(name, UINT32_MAX, domain), queue);
361 }
362
370 template <typename Data>
372 {
373 if (topic.block_->data_.check_length)
374 {
375 ASSERT(topic.block_->data_.max_length == sizeof(Data));
376 }
377 else
378 {
379 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
380 }
381
382 auto block = new LockFreeList::Node<QueueBlock>;
383 block->data_.type = SuberType::QUEUE;
384 block->data_.queue = &queue;
385 block->data_.fun = [](RawData &data, void *arg, bool in_isr)
386 {
387 UNUSED(in_isr);
388 LockFreeQueue<Data> *queue = reinterpret_cast<LockFreeQueue<Data>>(arg);
389 queue->Push(reinterpret_cast<Data>(data.addr_));
390 };
391
392 topic.block_->data_.subers.Add(*block);
393 }
394
403 template <typename Data>
404 QueuedSubscriber(const char *name, LockQueue<Data> &queue, Domain *domain = nullptr)
405 {
406 *this = QueuedSubscriber(WaitTopic(name, UINT32_MAX, domain), queue);
407 }
408
416 template <typename Data>
418 {
419 if (topic.block_->data_.check_length)
420 {
421 ASSERT(topic.block_->data_.max_length == sizeof(Data));
422 }
423 else
424 {
425 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
426 }
427
428 auto block = new LockFreeList::Node<QueueBlock>;
429 block->data_.type = SuberType::QUEUE;
430 block->data_.queue = &queue;
431 block->data_.fun = [](RawData &data, void *arg, bool in_isr)
432 {
433 LockQueue<Data> *queue = reinterpret_cast<LockQueue<Data> *>(arg);
434 queue->PushFromCallback(*reinterpret_cast<const Data *>(data.addr_), in_isr);
435 };
436
437 topic.block_->data_.subers.Add(*block);
438 }
439 };
440
442
448 typedef struct CallbackBlock : public SuberBlock
449 {
452
458 void RegisterCallback(Callback &cb);
459
464 Topic();
465
478 Topic(const char *name, uint32_t max_length, Domain *domain = nullptr,
479 bool cache = false, bool check_length = false);
480
493 template <typename Data>
494 static Topic CreateTopic(const char *name, Domain *domain = nullptr, bool cache = false,
495 bool check_length = true)
496 {
497 return Topic(name, sizeof(Data), domain, cache, check_length);
498 }
499
505 Topic(TopicHandle topic);
506
515 static TopicHandle Find(const char *name, Domain *domain = nullptr);
516
529 template <typename Data>
530 static TopicHandle FindOrCreate(const char *name, Domain *domain = nullptr,
531 bool cache = false, bool check_length = true)
532 {
533 auto topic = Find(name, domain);
534 if (topic == nullptr)
535 {
536 topic = CreateTopic<Data>(name, domain, cache, check_length).block_;
537 }
538 return topic;
539 }
540
545 void EnableCache();
546
553 template <typename Data>
554 void Publish(Data &data)
555 {
556 Publish(&data, sizeof(Data));
557 }
558
565 void Publish(void *addr, uint32_t size);
566
576 template <SizeLimitMode Mode = SizeLimitMode::MORE>
577 ErrorCode DumpData(RawData data, bool pack = false)
578 {
579 if (block_->data_.data.addr_ == nullptr)
580 {
581 return ErrorCode::EMPTY;
582 }
583
584 if (!pack)
585 {
587 block_->data_.mutex.Lock();
588 memcpy(data.addr_, block_->data_.data.addr_, block_->data_.data.size_);
589 block_->data_.mutex.Unlock();
590 }
591 else
592 {
593 Assert::SizeLimitCheck<Mode>(PACK_BASE_SIZE + block_->data_.data.size_, data.size_);
594
595 block_->data_.mutex.Lock();
596 PackData(block_->data_.crc32, data, block_->data_.data);
597 block_->data_.mutex.Unlock();
598 }
599
600 return ErrorCode::OK;
601 }
602
610 static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source);
611
618 template <typename Data>
619 ErrorCode DumpData(PackedData<Data> &data)
620 {
621 if (block_->data_.data.addr_ == nullptr)
622 {
623 return ErrorCode::EMPTY;
624 }
625
626 ASSERT(sizeof(Data) == block_->data_.data.size_);
627
628 return DumpData<SizeLimitMode::NONE>(RawData(data), true);
629 }
630
637 template <typename Data>
638 ErrorCode DumpData(Data &data)
639 {
640 if (block_->data_.data.addr_ == nullptr)
641 {
642 return ErrorCode::EMPTY;
643 }
644
645 ASSERT(sizeof(Data) == block_->data_.data.size_);
646
647 return DumpData<SizeLimitMode::NONE>(data, false);
648 }
649
661 static TopicHandle WaitTopic(const char *name, uint32_t timeout = UINT32_MAX,
662 Domain *domain = nullptr);
663
669 operator TopicHandle() { return block_; }
670
677 uint32_t GetKey() const;
678
685 class Server
686 {
687 public:
693 enum class Status : uint8_t
694 {
695 WAIT_START,
696 WAIT_TOPIC,
698 };
699
705 Server(size_t buffer_length);
706
712 void Register(TopicHandle topic);
713
720 size_t ParseData(ConstRawData data);
721
722 private:
725 uint32_t data_len_ = 0;
730 };
731
732 private:
738
743 static inline RBTree<uint32_t> *domain_ = nullptr;
744
749 static inline Domain *def_domain_ = nullptr;
750};
751} // namespace LibXR
static void SizeLimitCheck(size_t limit, size_t size)
在非调试模式下的占位大小检查函数(无实际作用)。 Dummy size limit check for non-debug builds.
基础队列类,提供固定大小的循环缓冲区 (Base queue class providing a fixed-size circular buffer).
Definition queue.hpp:20
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
Definition libxr_cb.hpp:124
常量原始数据封装类。 A class for encapsulating constant raw data.
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data...
Data data_
存储的数据。 The stored data.
链表实现,用于存储和管理数据节点。 A linked list implementation for storing and managing data nodes.
无锁队列实现 / Lock-free queue implementation
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
线程安全的锁队列类,提供同步和异步操作支持 Thread-safe lock queue class with synchronous and asynchronous operation suppor...
ErrorCode PushFromCallback(const Data &data, bool in_isr)
从回调函数中推送数据 Pushes data into the queue from a callback function
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:64
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:99
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread...
Definition rbt.hpp:24
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
Definition semaphore.hpp:23
异步订阅者类,用于订阅异步数据 Asynchronous subscriber class for subscribing to asynchronous data
Definition message.hpp:266
ASyncSubscriber(Topic topic)
构造函数,使用 Topic 进行初始化 Constructor using a Topic for initialization
Definition message.hpp:284
Data & GetData()
获取当前数据 Retrieves the current data
Definition message.hpp:317
LockFreeList::Node< ASyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:329
bool Available()
检查数据是否可用 Checks if data is available
Definition message.hpp:309
ASyncSubscriber(const char *name, Domain *domain=nullptr)
构造函数,通过名称和数据创建订阅者 Constructor to create a subscriber with a name and data
Definition message.hpp:274
void StartWaiting()
开始等待数据更新 Starts waiting for data update
Definition message.hpp:327
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:142
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
Definition message.cpp:22
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
Definition message.hpp:154
构造函数,使用名称和无锁队列进行初始化 Constructor using a name and a lock-free queue
Definition message.hpp:354
QueuedSubscriber(Topic topic, LockFreeQueue< Data > &queue)
构造函数,使用 Topic 和无锁队列进行初始化 Constructor using a Topic and a lock-free queue
Definition message.hpp:371
QueuedSubscriber(const char *name, LockQueue< Data > &queue, Domain *domain=nullptr)
构造函数,使用名称和带锁队列进行初始化 Constructor using a name and a locked queue
Definition message.hpp:404
QueuedSubscriber(Topic topic, LockQueue< Data > &queue)
构造函数,使用 Topic 和带锁队列进行初始化 Constructor using a Topic and a locked queue
Definition message.hpp:417
服务器类,负责解析数据并将其分发到相应的主题 Server class responsible for parsing data and distributing it to corresponding...
Definition message.hpp:686
RBTree< uint32_t > topic_map_
主题映射表 Topic mapping table
Definition message.hpp:726
void Register(TopicHandle topic)
注册一个主题 Registers a topic
Definition message.cpp:259
BaseQueue queue_
数据队列 Data queue
Definition message.hpp:727
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
Definition message.cpp:265
TopicHandle current_topic_
当前主题句柄 Current topic handle
Definition message.hpp:729
uint32_t data_len_
当前数据长度 Current data length
Definition message.hpp:725
Status
服务器解析状态枚举 Enumeration of server parsing states
Definition message.hpp:694
@ WAIT_DATA_CRC
等待数据校验 Waiting for data CRC validation
@ WAIT_START
等待起始标志 Waiting for start flag
@ WAIT_TOPIC
等待主题信息 Waiting for topic information
RawData parse_buff_
解析数据缓冲区 Data buffer for parsing
Definition message.hpp:728
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
Definition message.cpp:248
Status status_
服务器的当前解析状态 Current parsing state of the server
Definition message.hpp:723
同步订阅者类,允许同步方式接收数据。Synchronous subscriber class allowing data reception in a synchronous manner.
Definition message.hpp:196
SyncSubscriber(Topic topic, Data &data)
通过 Topic 句柄构造同步订阅者。Constructs a synchronous subscriber using a Topic handle.
Definition message.hpp:216
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待接收数据。Waits for data reception.
Definition message.hpp:238
LockFreeList::Node< SyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:243
SyncSubscriber(const char *name, Data &data, Domain *domain=nullptr)
通过主题名称构造同步订阅者。Constructs a synchronous subscriber by topic name.
Definition message.hpp:205
主题(Topic)管理类 / Topic management class
Definition message.hpp:28
SuberType
订阅者类型。Subscriber type.
Definition message.hpp:162
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
Definition message.hpp:554
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
Definition message.hpp:749
RBTree< uint32_t >::Node< Block > * TopicHandle
主题句柄,指向存储数据的红黑树节点。Handle pointing to a red-black tree node storing data.
Definition message.hpp:134
ErrorCode DumpData(Data &data)
转储数据到普通数据结构 Dumps data into a normal data structure
Definition message.hpp:638
static Topic CreateTopic(const char *name, Domain *domain=nullptr, bool cache=false, bool check_length=true)
创建一个新的主题 Creates a new topic
Definition message.hpp:494
static TopicHandle FindOrCreate(const char *name, Domain *domain=nullptr, bool cache=false, bool check_length=true)
在指定域中查找或创建主题 Finds or creates a topic in the specified domain
Definition message.hpp:530
ErrorCode DumpData(PackedData< Data > &data)
转储数据到 PackedData Dumps data into PackedData format
Definition message.hpp:619
ErrorCode DumpData(RawData data, bool pack=false)
转储数据 Dump data
Definition message.hpp:577
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
Definition message.cpp:122
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition message.cpp:217
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
Definition message.cpp:236
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:737
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
Definition message.cpp:51
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Definition message.cpp:110
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Definition message.hpp:743
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
Definition message.cpp:61
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据
Definition message.cpp:200
LibXR 命名空间
Definition ch32_gpio.hpp:9
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
Definition message.hpp:252
RawData buff
缓冲区数据 Buffer data
Definition message.hpp:253
bool waiting
等待中标志 Waiting flag
Definition message.hpp:255
bool data_ready
数据就绪标志 Data ready flag
Definition message.hpp:254
存储主题(Topic)数据的结构体。Structure storing topic data.
Definition message.hpp:35
bool cache
是否启用数据缓存。Indicates whether data caching is enabled.
Definition message.hpp:40
Mutex mutex
线程同步互斥锁。Mutex for thread synchronization.
Definition message.hpp:38
bool check_length
是否检查数据长度。Indicates whether data length is checked.
Definition message.hpp:41
uint32_t max_length
数据的最大长度。Maximum length of data.
Definition message.hpp:36
uint32_t crc32
主题名称的 CRC32 校验码。CRC32 checksum of the topic name.
Definition message.hpp:37
RawData data
存储的数据。Stored data.
Definition message.hpp:39
LockFreeList subers
订阅者列表。List of subscribers.
Definition message.hpp:42
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Definition message.hpp:449
Callback cb
订阅的回调函数 Subscribed callback function
Definition message.hpp:450
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
Definition message.hpp:338
void * queue
指向订阅队列的指针 Pointer to the subscribed queue
Definition message.hpp:339
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
Definition message.hpp:340
订阅者信息存储结构。Structure storing subscriber information.
Definition message.hpp:174
SuberType type
订阅者类型。Type of subscriber.
Definition message.hpp:175
同步订阅者存储结构。Structure storing synchronous subscriber data.
Definition message.hpp:183
Semaphore sem
信号量,用于同步等待数据。Semaphore for data synchronization.
Definition message.hpp:185
RawData buff
存储的数据缓冲区。Data buffer.
Definition message.hpp:184