libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
message.hpp
1#pragma once
2
3#include <atomic>
4#include <cstdint>
5
6#include "crc.hpp"
7#include "libxr_cb.hpp"
8#include "libxr_def.hpp"
9#include "libxr_type.hpp"
10#include "lock_queue.hpp"
11#include "lockfree_list.hpp"
12#include "lockfree_queue.hpp"
13#include "mutex.hpp"
14#include "queue.hpp"
15#include "rbt.hpp"
16#include "semaphore.hpp"
17#include "thread.hpp"
18
19namespace LibXR
20{
30class Topic
31{
/**
 * @brief Lock state stored in a topic block's atomic `busy` field.
 *
 * The underlying type is fixed to uint32_t so the value fits a
 * std::atomic<LockState>.
 */
enum class LockState : uint32_t
{
  UNLOCKED = 0U,           ///< The topic is not locked.
  LOCKED = 1U,             ///< The topic is currently locked.
  USE_MUTEX = UINT32_MAX,  ///< NOTE(review): presumably means the topic is guarded
                           ///< by a mutex rather than this flag - confirm in message.cpp.
};
38
39 public:
44 struct Block
45 {
46 std::atomic<LockState> busy;
48 uint32_t max_length;
49 uint32_t crc32;
52 bool cache;
54 };
55#ifndef __DOXYGEN__
56
57#pragma pack(push, 1)
/**
 * @brief Header placed in front of the payload of a packed topic message.
 *
 * Declared inside a `#pragma pack(push, 1)` region, so the fields are laid out
 * back to back: 1 + 4 + 3 + 1 bytes.
 */
struct PackedDataHeader
{
  uint8_t prefix;             ///< Leading byte of a packet.
  uint32_t topic_name_crc32;  ///< CRC32 of the topic name.
  uint8_t data_len_raw[3];    ///< Payload length stored in three raw bytes.
  uint8_t pack_header_crc8;   ///< CRC8 of the header.

  /**
   * @brief Stores a payload length in this header.
   * @param len Payload length. NOTE(review): only three bytes are available in
   *            data_len_raw, so the usable range is presumably 24 bits - confirm
   *            against the definition.
   */
  void SetDataLen(uint32_t len);

  /**
   * @brief Reads the payload length stored in this header.
   * @return The payload length.
   */
  uint32_t GetDataLen() const;
};
74#pragma pack(pop)
75#pragma pack(push, 1)
84 template <typename Data>
85 class PackedData
86 {
87 public:
88#pragma pack(push, 1)
94 struct
95 {
96 PackedDataHeader header_;
97 uint8_t data_[sizeof(Data)];
98 } raw;
99
100 uint8_t crc8_;
101
102#pragma pack(pop)
103
110 PackedData &operator=(const Data &data)
111 {
112 memcpy(raw.data_, &data, sizeof(Data));
113 crc8_ = CRC8::Calculate(&raw, sizeof(raw));
114 return *this;
115 }
116
121 operator Data() { return *reinterpret_cast<Data *>(raw.data_); }
122
126 Data *operator->() { return reinterpret_cast<Data *>(raw.data_); }
127
132 const Data *operator->() const { return reinterpret_cast<const Data *>(raw.data_); }
133 };
134#pragma pack(pop)
135
136 static constexpr size_t PACK_BASE_SIZE = sizeof(PackedData<uint8_t>) - 1;
137
138#endif
139
146
153 static void Lock(TopicHandle topic);
154
160 static void Unlock(TopicHandle topic);
161
168 static void LockFromCallback(TopicHandle topic);
169
175 static void UnlockFromCallback(TopicHandle topic);
176
182 class Domain
183 {
184 public:
190 Domain(const char *name);
191
196 };
197
/**
 * @brief Kind of subscriber attached to a topic.
 *
 * Stored in one byte; the enumerators are numbered consecutively from zero.
 */
enum class SuberType : uint8_t
{
  SYNC = 0,      ///< Synchronous subscriber.
  ASYNC = 1,     ///< Asynchronous subscriber.
  QUEUE = 2,     ///< Queued subscriber.
  CALLBACK = 3,  ///< Callback subscriber.
};
209
215 {
217 };
218
223 struct SyncBlock : public SuberBlock
224 {
227 };
228
235 template <typename Data>
237 {
238 public:
246 SyncSubscriber(const char *name, Data &data, Domain *domain = nullptr)
247 {
248 *this = SyncSubscriber<Data>(WaitTopic(name, UINT32_MAX, domain), data);
249 }
250
257 SyncSubscriber(Topic topic, Data &data)
258 {
259 if (topic.block_->data_.check_length)
260 {
261 ASSERT(topic.block_->data_.max_length == sizeof(Data));
262 }
263 else
264 {
265 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
266 }
267
270 block_->data_.buff = RawData(data);
271 topic.block_->data_.subers.Add(*block_);
272 }
273
279 ErrorCode Wait(uint32_t timeout = UINT32_MAX)
280 {
281 // TODO: Reset sem
282 return block_->data_.sem.Wait(timeout);
283 }
284
286 };
287
/**
 * @brief State of an asynchronous subscriber, held in an atomic inside its block.
 */
enum class ASyncSubscriberState : uint32_t
{
  IDLE = 0U,               ///< Stored by GetData() once the data has been handed out.
  WAITING = 1U,            ///< Stored when the subscriber starts waiting for an update.
  DATA_READY = UINT32_MAX  ///< Value Available() compares against to report new data.
};
294
300 typedef struct ASyncBlock : public SuberBlock
301 {
303 std::atomic<ASyncSubscriberState> state;
304 } ASyncBlock;
305
312 template <typename Data>
314 {
315 public:
322 ASyncSubscriber(const char *name, Domain *domain = nullptr)
323 {
324 *this = ASyncSubscriber(WaitTopic(name, UINT32_MAX, domain));
325 }
326
333 {
334 if (topic.block_->data_.check_length)
335 {
336 ASSERT(topic.block_->data_.max_length == sizeof(Data));
337 }
338 else
339 {
340 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
341 }
342
345 block_->data_.buff = *(new Data);
346 topic.block_->data_.subers.Add(*block_);
347 }
348
358 {
359 return block_->data_.state.load(std::memory_order_acquire) ==
360 ASyncSubscriberState::DATA_READY;
361 }
362
369 Data &GetData()
370 {
371 block_->data_.state.store(ASyncSubscriberState::IDLE, std::memory_order_release);
372 return *reinterpret_cast<Data *>(block_->data_.buff.addr_);
373 }
374
380 {
381 block_->data_.state.store(ASyncSubscriberState::WAITING, std::memory_order_release);
382 }
383
385 };
386
392 typedef struct QueueBlock : public SuberBlock
393 {
394 void *queue;
395 void (*fun)(RawData &, void *,
396 bool);
397 } QueueBlock;
398
409 {
410 public:
411 template <typename Data, uint32_t Length>
412 QueuedSubscriber(const char *name, LockFreeQueue<Data> &queue,
413 Domain *domain = nullptr)
414 {
415 *this = QueuedSubscriber(WaitTopic(name, UINT32_MAX, domain), queue);
416 }
417
425 template <typename Data>
427 {
428 if (topic.block_->data_.check_length)
429 {
430 ASSERT(topic.block_->data_.max_length == sizeof(Data));
431 }
432 else
433 {
434 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
435 }
436
437 auto block = new LockFreeList::Node<QueueBlock>;
438 block->data_.type = SuberType::QUEUE;
439 block->data_.queue = &queue;
440 block->data_.fun = [](RawData &data, void *arg, bool in_isr)
441 {
442 UNUSED(in_isr);
443 LockFreeQueue<Data>* queue = reinterpret_cast<LockFreeQueue<Data>*>(arg);
444 queue->Push(*reinterpret_cast<Data*>(data.addr_));
445 };
446
447 topic.block_->data_.subers.Add(*block);
448 }
449 };
450
452
458 typedef struct CallbackBlock : public SuberBlock
459 {
462
468 void RegisterCallback(Callback &cb);
469
474 Topic();
475
490 Topic(const char *name, uint32_t max_length, Domain *domain = nullptr,
491 bool multi_publisher = false, bool cache = false, bool check_length = false);
492
507 template <typename Data>
508 static Topic CreateTopic(const char *name, Domain *domain = nullptr,
509 bool multi_publisher = false, bool cache = false,
510 bool check_length = true)
511 {
512 return Topic(name, sizeof(Data), domain, multi_publisher, cache, check_length);
513 }
514
520 Topic(TopicHandle topic);
521
530 static TopicHandle Find(const char *name, Domain *domain = nullptr);
531
544 template <typename Data>
545 static TopicHandle FindOrCreate(const char *name, Domain *domain = nullptr,
546 bool cache = false, bool check_length = true)
547 {
548 auto topic = Find(name, domain);
549 if (topic == nullptr)
550 {
551 topic = CreateTopic<Data>(name, domain, cache, check_length).block_;
552 }
553 return topic;
554 }
555
560 void EnableCache();
561
568 template <typename Data>
569 void Publish(Data &data)
570 {
571 Publish(&data, sizeof(Data));
572 }
573
580 void Publish(void *addr, uint32_t size);
581
589 template <typename Data>
590 void PublishFromCallback(Data &data, bool in_isr)
591 {
592 PublishFromCallback(&data, sizeof(Data), in_isr);
593 }
594
602 void PublishFromCallback(void *addr, uint32_t size, bool in_isr);
603
613 template <SizeLimitMode Mode = SizeLimitMode::MORE>
614 ErrorCode DumpData(RawData data, bool pack = false)
615 {
616 if (block_->data_.data.addr_ == nullptr)
617 {
618 return ErrorCode::EMPTY;
619 }
620
621 if (!pack)
622 {
624 Lock(block_);
625 memcpy(data.addr_, block_->data_.data.addr_, block_->data_.data.size_);
626 Unlock(block_);
627 }
628 else
629 {
630 Assert::SizeLimitCheck<Mode>(PACK_BASE_SIZE + block_->data_.data.size_, data.size_);
631
632 Lock(block_);
633 PackData(block_->data_.crc32, data, block_->data_.data);
634 Unlock(block_);
635 }
636
637 return ErrorCode::OK;
638 }
639
647 static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source);
648
655 template <typename Data>
656 ErrorCode DumpData(PackedData<Data> &data)
657 {
658 if (block_->data_.data.addr_ == nullptr)
659 {
660 return ErrorCode::EMPTY;
661 }
662
663 ASSERT(sizeof(Data) == block_->data_.data.size_);
664
665 return DumpData<SizeLimitMode::NONE>(RawData(data), true);
666 }
667
674 template <typename Data>
675 ErrorCode DumpData(Data &data)
676 {
677 if (block_->data_.data.addr_ == nullptr)
678 {
679 return ErrorCode::EMPTY;
680 }
681
682 ASSERT(sizeof(Data) == block_->data_.data.size_);
683
684 return DumpData<SizeLimitMode::NONE>(data, false);
685 }
686
698 static TopicHandle WaitTopic(const char *name, uint32_t timeout = UINT32_MAX,
699 Domain *domain = nullptr);
700
706 operator TopicHandle() { return block_; }
707
714 uint32_t GetKey() const;
715
722 class Server
723 {
724 public:
730 enum class Status : uint8_t
731 {
732 WAIT_START,
733 WAIT_TOPIC,
735 };
736
742 Server(size_t buffer_length);
743
749 void Register(TopicHandle topic);
750
757 size_t ParseData(ConstRawData data);
758
759 private:
762 uint32_t data_len_ = 0;
767 };
768
769 private:
775
780 static inline RBTree<uint32_t> *domain_ = nullptr;
781
786 static inline Domain *def_domain_ = nullptr;
787};
788} // namespace LibXR
static void SizeLimitCheck(size_t limit, size_t size)
在非调试模式下的占位大小检查函数(无实际作用)。 Dummy size limit check for non-debug builds.
基础队列类,提供固定大小的循环缓冲区 (Base queue class providing a fixed-size circular buffer).
Definition queue.hpp:20
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
Definition libxr_cb.hpp:124
常量原始数据封装类。 A class for encapsulating constant raw data.
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data types.
Data data_
存储的数据。 The stored data.
链表实现,用于存储和管理数据节点。 A linked list implementation for storing and managing data nodes.
无锁队列实现 / Lock-free queue implementation
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:64
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:99
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread-safe operations).
Definition rbt.hpp:24
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
Definition semaphore.hpp:23
异步订阅者类,用于订阅异步数据 Asynchronous subscriber class for subscribing to asynchronous data
Definition message.hpp:314
ASyncSubscriber(Topic topic)
构造函数,使用 Topic 进行初始化 Constructor using a Topic for initialization
Definition message.hpp:332
Data & GetData()
获取当前数据 Retrieves the current data
Definition message.hpp:369
LockFreeList::Node< ASyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:384
bool Available()
检查数据是否可用 Checks if data is available
Definition message.hpp:357
ASyncSubscriber(const char *name, Domain *domain=nullptr)
构造函数,通过主题名称创建订阅者 Constructor to create a subscriber by topic name
Definition message.hpp:322
void StartWaiting()
开始等待数据更新 Starts waiting for data update
Definition message.hpp:379
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:183
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
Definition message.cpp:88
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
Definition message.hpp:195
构造函数,使用名称和无锁队列进行初始化 Constructor using a name and a lock-free queue
Definition message.hpp:409
QueuedSubscriber(Topic topic, LockFreeQueue< Data > &queue)
构造函数,使用 Topic 和无锁队列进行初始化 Constructor using a Topic and a lock-free queue
Definition message.hpp:426
服务器类,负责解析数据并将其分发到相应的主题 Server class responsible for parsing data and distributing it to corresponding topics
Definition message.hpp:723
RBTree< uint32_t > topic_map_
主题映射表 Topic mapping table
Definition message.hpp:763
void Register(TopicHandle topic)
注册一个主题 Registers a topic
Definition message.cpp:410
BaseQueue queue_
数据队列 Data queue
Definition message.hpp:764
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
Definition message.cpp:416
TopicHandle current_topic_
当前主题句柄 Current topic handle
Definition message.hpp:766
uint32_t data_len_
当前数据长度 Current data length
Definition message.hpp:762
Status
服务器解析状态枚举 Enumeration of server parsing states
Definition message.hpp:731
@ WAIT_DATA_CRC
等待数据校验 Waiting for data CRC validation
@ WAIT_START
等待起始标志 Waiting for start flag
@ WAIT_TOPIC
等待主题信息 Waiting for topic information
RawData parse_buff_
解析数据缓冲区 Data buffer for parsing
Definition message.hpp:765
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
Definition message.cpp:399
Status status_
服务器的当前解析状态 Current parsing state of the server
Definition message.hpp:760
同步订阅者类,允许同步方式接收数据。Synchronous subscriber class allowing data reception in a synchronous manner.
Definition message.hpp:237
SyncSubscriber(Topic topic, Data &data)
通过 Topic 句柄构造同步订阅者。Constructs a synchronous subscriber using a Topic handle.
Definition message.hpp:257
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待接收数据。Waits for data reception.
Definition message.hpp:279
LockFreeList::Node< SyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:285
SyncSubscriber(const char *name, Data &data, Domain *domain=nullptr)
通过主题名称构造同步订阅者。Constructs a synchronous subscriber by topic name.
Definition message.hpp:246
主题(Topic)管理类 / Topic management class
Definition message.hpp:31
SuberType
订阅者类型。Subscriber type.
Definition message.hpp:203
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
Definition message.hpp:569
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
Definition message.hpp:786
static void Unlock(TopicHandle topic)
解锁主题。Unlock the topic.
Definition message.cpp:46
RBTree< uint32_t >::Node< Block > * TopicHandle
主题句柄,指向存储数据的红黑树节点。Handle pointing to a red-black tree node storing data.
Definition message.hpp:145
ErrorCode DumpData(Data &data)
转储数据到普通数据结构 Dumps data into a normal data structure
Definition message.hpp:675
static TopicHandle FindOrCreate(const char *name, Domain *domain=nullptr, bool cache=false, bool check_length=true)
在指定域中查找或创建主题 Finds or creates a topic in the specified domain
Definition message.hpp:545
ErrorCode DumpData(PackedData< Data > &data)
转储数据到 PackedData Dumps data into PackedData format
Definition message.hpp:656
static void Lock(TopicHandle topic)
锁定主题,防止其被多个订阅者同时访问。Lock the topic to prevent it from being accessed by multiple subscribers at the same time.
Definition message.cpp:28
static void UnlockFromCallback(TopicHandle topic)
从回调中解锁主题。Unlock the topic from a callback.
Definition message.cpp:76
static void LockFromCallback(TopicHandle topic)
从回调中锁定主题,防止其被多个订阅者同时访问。Lock the topic from a callback to prevent it from being accessed by multiple subscribers at the same time.
Definition message.cpp:58
ErrorCode DumpData(RawData data, bool pack=false)
转储数据 Dump data
Definition message.hpp:614
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
Definition message.cpp:204
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition message.cpp:368
void PublishFromCallback(Data &data, bool in_isr)
在回调函数中发布数据 Publishes data in a callback
Definition message.hpp:590
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
Definition message.cpp:387
static Topic CreateTopic(const char *name, Domain *domain=nullptr, bool multi_publisher=false, bool cache=false, bool check_length=true)
创建一个新的主题 Creates a new topic
Definition message.hpp:508
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:774
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
Definition message.cpp:117
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Definition message.cpp:192
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Definition message.hpp:780
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
Definition message.cpp:127
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据 Packs data
Definition message.cpp:351
LibXR 命名空间 LibXR namespace
Definition ch32_gpio.hpp:9
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
Definition message.hpp:301
std::atomic< ASyncSubscriberState > state
订阅者状态 Subscriber state
Definition message.hpp:303
RawData buff
缓冲区数据 Buffer data
Definition message.hpp:302
存储主题(Topic)数据的结构体。Structure storing topic data.
Definition message.hpp:45
bool cache
是否启用数据缓存。Indicates whether data caching is enabled.
Definition message.hpp:52
bool check_length
是否检查数据长度。Indicates whether data length is checked.
Definition message.hpp:53
std::atomic< LockState > busy
是否忙碌。Indicates whether it is busy.
Definition message.hpp:46
uint32_t max_length
数据的最大长度。Maximum length of data.
Definition message.hpp:48
Mutex * mutex
线程同步互斥锁。Mutex for thread synchronization.
Definition message.hpp:50
uint32_t crc32
主题名称的 CRC32 校验码。CRC32 checksum of the topic name.
Definition message.hpp:49
RawData data
存储的数据。Stored data.
Definition message.hpp:51
LockFreeList subers
订阅者列表。List of subscribers.
Definition message.hpp:47
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Definition message.hpp:459
Callback cb
订阅的回调函数 Subscribed callback function
Definition message.hpp:460
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
Definition message.hpp:393
void * queue
指向订阅队列的指针 Pointer to the subscribed queue
Definition message.hpp:394
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
Definition message.hpp:395
订阅者信息存储结构。Structure storing subscriber information.
Definition message.hpp:215
SuberType type
订阅者类型。Type of subscriber.
Definition message.hpp:216
同步订阅者存储结构。Structure storing synchronous subscriber data.
Definition message.hpp:224
Semaphore sem
信号量,用于同步等待数据。Semaphore for data synchronization.
Definition message.hpp:226
RawData buff
存储的数据缓冲区。Data buffer.
Definition message.hpp:225