libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
message.hpp
1#pragma once
2
3#include <atomic>
4#include <cstdint>
5
6#include "crc.hpp"
7#include "libxr_cb.hpp"
8#include "libxr_def.hpp"
9#include "libxr_type.hpp"
10#include "lock_queue.hpp"
11#include "lockfree_list.hpp"
12#include "lockfree_queue.hpp"
13#include "mutex.hpp"
14#include "queue.hpp"
15#include "rbt.hpp"
16#include "semaphore.hpp"
17#include "thread.hpp"
18
19namespace LibXR
20{
30class Topic
31{
32 enum class LockState : uint32_t
33 {
34 UNLOCKED = 0,
35 LOCKED = 1,
36 USE_MUTEX = UINT32_MAX,
37 };
38
39 public:
44 struct Block
45 {
46 std::atomic<LockState> busy;
48 uint32_t max_length;
49 uint32_t crc32;
52 bool cache;
54 };
55#ifndef __DOXYGEN__
56
57#pragma pack(push, 1)
62 struct PackedDataHeader
63 {
64 uint8_t prefix;
65 uint32_t
66 topic_name_crc32;
67 uint8_t data_len_raw[3];
68 uint8_t pack_header_crc8;
69
70 void SetDataLen(uint32_t len);
71
72 uint32_t GetDataLen() const;
73 };
74#pragma pack(pop)
75#pragma pack(push, 1)
84 template <typename Data>
85 class PackedData
86 {
87 public:
88#pragma pack(push, 1)
94 struct
95 {
96 PackedDataHeader header_;
97 uint8_t data_[sizeof(Data)];
98 } raw;
99
100 uint8_t crc8_;
101
102#pragma pack(pop)
103
110 PackedData &operator=(const Data &data)
111 {
112 LibXR::Memory::FastCopy(raw.data_, &data, sizeof(Data));
113 crc8_ = CRC8::Calculate(&raw, sizeof(raw));
114 return *this;
115 }
116
121 operator Data() { return *reinterpret_cast<Data *>(raw.data_); }
122
126 Data *operator->() { return reinterpret_cast<Data *>(raw.data_); }
127
132 const Data *operator->() const { return reinterpret_cast<const Data *>(raw.data_); }
133 };
134#pragma pack(pop)
135
136 static constexpr size_t PACK_BASE_SIZE = sizeof(PackedData<uint8_t>) - 1;
137
138#endif
139
146
153 static void Lock(TopicHandle topic);
154
160 static void Unlock(TopicHandle topic);
161
168 static void LockFromCallback(TopicHandle topic);
169
175 static void UnlockFromCallback(TopicHandle topic);
176
182 class Domain
183 {
184 public:
190 Domain(const char *name);
191
196 };
197
202 enum class SuberType : uint8_t
203 {
204 SYNC,
205 ASYNC,
206 QUEUE,
207 CALLBACK,
208 };
209
215 {
217 };
218
223 struct SyncBlock : public SuberBlock
224 {
227 };
228
235 template <typename Data>
237 {
238 public:
246 SyncSubscriber(const char *name, Data &data, Domain *domain = nullptr)
247 {
248 *this = SyncSubscriber<Data>(WaitTopic(name, UINT32_MAX, domain), data);
249 }
250
257 SyncSubscriber(Topic topic, Data &data)
258 {
259 if (topic.block_->data_.check_length)
260 {
261 ASSERT(topic.block_->data_.max_length == sizeof(Data));
262 }
263 else
264 {
265 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
266 }
267
270 block_->data_.buff = RawData(data);
271 topic.block_->data_.subers.Add(*block_);
272 }
273
279 ErrorCode Wait(uint32_t timeout = UINT32_MAX)
280 {
281 // TODO: Reset sem
282 return block_->data_.sem.Wait(timeout);
283 }
284
286 };
287
288 enum class ASyncSubscriberState : uint32_t
289 {
290 IDLE = 0,
291 WAITING = 1,
292 DATA_READY = UINT32_MAX
293 };
294
300 typedef struct ASyncBlock : public SuberBlock
301 {
303 std::atomic<ASyncSubscriberState> state;
304 } ASyncBlock;
305
312 template <typename Data>
314 {
315 public:
322 ASyncSubscriber(const char *name, Domain *domain = nullptr)
323 {
324 *this = ASyncSubscriber(WaitTopic(name, UINT32_MAX, domain));
325 }
326
333 {
334 if (topic.block_->data_.check_length)
335 {
336 ASSERT(topic.block_->data_.max_length == sizeof(Data));
337 }
338 else
339 {
340 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
341 }
342
345 block_->data_.buff = *(new Data);
346 topic.block_->data_.subers.Add(*block_);
347 }
348
358 {
359 return block_->data_.state.load(std::memory_order_acquire) ==
360 ASyncSubscriberState::DATA_READY;
361 }
362
369 Data &GetData()
370 {
371 block_->data_.state.store(ASyncSubscriberState::IDLE, std::memory_order_release);
372 return *reinterpret_cast<Data *>(block_->data_.buff.addr_);
373 }
374
380 {
381 block_->data_.state.store(ASyncSubscriberState::WAITING, std::memory_order_release);
382 }
383
385 };
386
392 typedef struct QueueBlock : public SuberBlock
393 {
394 void *queue;
395 void (*fun)(RawData &, void *,
396 bool);
397 } QueueBlock;
398
409 {
410 public:
411 template <typename Data, uint32_t Length>
412 QueuedSubscriber(const char *name, LockFreeQueue<Data> &queue,
413 Domain *domain = nullptr)
414 {
415 *this = QueuedSubscriber(WaitTopic(name, UINT32_MAX, domain), queue);
416 }
417
425 template <typename Data>
427 {
428 if (topic.block_->data_.check_length)
429 {
430 ASSERT(topic.block_->data_.max_length == sizeof(Data));
431 }
432 else
433 {
434 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
435 }
436
437 auto block = new LockFreeList::Node<QueueBlock>;
438 block->data_.type = SuberType::QUEUE;
439 block->data_.queue = &queue;
440 block->data_.fun = [](RawData &data, void *arg, bool in_isr)
441 {
442 UNUSED(in_isr);
443 LockFreeQueue<Data> *queue = reinterpret_cast<LockFreeQueue<Data> *>(arg);
444 queue->Push(*reinterpret_cast<Data *>(data.addr_));
445 };
446
447 topic.block_->data_.subers.Add(*block);
448 }
449 };
450
452
458 typedef struct CallbackBlock : public SuberBlock
459 {
462
468 void RegisterCallback(Callback &cb);
469
474 Topic();
475
490 Topic(const char *name, uint32_t max_length, Domain *domain = nullptr,
491 bool multi_publisher = false, bool cache = false, bool check_length = false);
492
507 template <typename Data>
508 static Topic CreateTopic(const char *name, Domain *domain = nullptr,
509 bool multi_publisher = false, bool cache = false,
510 bool check_length = false)
511 {
512 return Topic(name, sizeof(Data), domain, multi_publisher, cache, check_length);
513 }
514
520 Topic(TopicHandle topic);
521
530 static TopicHandle Find(const char *name, Domain *domain = nullptr);
531
545 template <typename Data>
546 static TopicHandle FindOrCreate(const char *name, Domain *domain = nullptr,
547 bool multi_publisher = false, bool cache = false,
548 bool check_length = false)
549 {
550 auto topic = Find(name, domain);
551 if (topic == nullptr)
552 {
553 topic =
554 CreateTopic<Data>(name, domain, multi_publisher, cache, check_length).block_;
555 }
556 return topic;
557 }
558
563 void EnableCache();
564
571 template <typename Data>
572 void Publish(Data &data)
573 {
574 Publish(&data, sizeof(Data));
575 }
576
583 void Publish(void *addr, uint32_t size);
584
592 template <typename Data>
593 void PublishFromCallback(Data &data, bool in_isr)
594 {
595 PublishFromCallback(&data, sizeof(Data), in_isr);
596 }
597
605 void PublishFromCallback(void *addr, uint32_t size, bool in_isr);
606
616 template <SizeLimitMode Mode = SizeLimitMode::MORE>
617 ErrorCode DumpData(RawData data, bool pack = false)
618 {
619 if (block_->data_.data.addr_ == nullptr)
620 {
621 return ErrorCode::EMPTY;
622 }
623
624 if (!pack)
625 {
627 Lock(block_);
628 LibXR::Memory::FastCopy(data.addr_, block_->data_.data.addr_,
629 block_->data_.data.size_);
630 Unlock(block_);
631 }
632 else
633 {
634 Assert::SizeLimitCheck<Mode>(PACK_BASE_SIZE + block_->data_.data.size_, data.size_);
635
636 Lock(block_);
637 PackData(block_->data_.crc32, data, block_->data_.data);
638 Unlock(block_);
639 }
640
641 return ErrorCode::OK;
642 }
643
651 static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source);
652
659 template <typename Data>
660 ErrorCode DumpData(PackedData<Data> &data)
661 {
662 if (block_->data_.data.addr_ == nullptr)
663 {
664 return ErrorCode::EMPTY;
665 }
666
667 ASSERT(sizeof(Data) == block_->data_.data.size_);
668
669 return DumpData<SizeLimitMode::NONE>(RawData(data), true);
670 }
671
678 template <typename Data>
679 ErrorCode DumpData(Data &data)
680 {
681 if (block_->data_.data.addr_ == nullptr)
682 {
683 return ErrorCode::EMPTY;
684 }
685
686 ASSERT(sizeof(Data) == block_->data_.data.size_);
687
688 return DumpData<SizeLimitMode::NONE>(data, false);
689 }
690
702 static TopicHandle WaitTopic(const char *name, uint32_t timeout = UINT32_MAX,
703 Domain *domain = nullptr);
704
710 operator TopicHandle() { return block_; }
711
718 uint32_t GetKey() const;
719
726 class Server
727 {
728 public:
734 enum class Status : uint8_t
735 {
736 WAIT_START,
737 WAIT_TOPIC,
739 };
740
746 Server(size_t buffer_length);
747
753 void Register(TopicHandle topic);
754
761 size_t ParseData(ConstRawData data);
762
763 private:
766 uint32_t data_len_ = 0;
771 };
772
773 private:
779
784 static inline RBTree<uint32_t> *domain_ = nullptr;
785
790 static inline Domain *def_domain_ = nullptr;
791};
792} // namespace LibXR
static void SizeLimitCheck(size_t limit, size_t size)
在非调试模式下的占位大小检查函数(无实际作用)。 Dummy size limit check for non-debug builds.
基础队列类,提供固定大小的循环缓冲区 (Base queue class providing a fixed-size circular buffer).
Definition queue.hpp:20
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
Definition libxr_cb.hpp:124
常量原始数据封装类。 A class for encapsulating constant raw data.
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data...
Data data_
存储的数据。 The stored data.
链表实现,用于存储和管理数据节点。 A linked list implementation for storing and managing data nodes.
无锁队列实现 / Lock-free queue implementation
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
Definition libxr_mem.cpp:17
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:64
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:99
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread...
Definition rbt.hpp:24
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
Definition semaphore.hpp:23
异步订阅者类,用于订阅异步数据 Asynchronous subscriber class for subscribing to asynchronous data
Definition message.hpp:314
ASyncSubscriber(Topic topic)
构造函数,使用 Topic 进行初始化 Constructor using a Topic for initialization
Definition message.hpp:332
Data & GetData()
获取当前数据 Retrieves the current data
Definition message.hpp:369
LockFreeList::Node< ASyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:384
bool Available()
检查数据是否可用 Checks if data is available
Definition message.hpp:357
ASyncSubscriber(const char *name, Domain *domain=nullptr)
构造函数,通过名称和数据创建订阅者 Constructor to create a subscriber with a name and data
Definition message.hpp:322
void StartWaiting()
开始等待数据更新 Starts waiting for data update
Definition message.hpp:379
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:183
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
Definition message.cpp:88
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
Definition message.hpp:195
构造函数,使用名称和无锁队列进行初始化 Constructor using a name and a lock-free queue
Definition message.hpp:409
QueuedSubscriber(Topic topic, LockFreeQueue< Data > &queue)
构造函数,使用 Topic 和无锁队列进行初始化 Constructor using a Topic and a lock-free queue
Definition message.hpp:426
服务器类,负责解析数据并将其分发到相应的主题 Server class responsible for parsing data and distributing it to corresponding...
Definition message.hpp:727
RBTree< uint32_t > topic_map_
主题映射表 Topic mapping table
Definition message.hpp:767
void Register(TopicHandle topic)
注册一个主题 Registers a topic
Definition message.cpp:410
BaseQueue queue_
数据队列 Data queue
Definition message.hpp:768
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
Definition message.cpp:416
TopicHandle current_topic_
当前主题句柄 Current topic handle
Definition message.hpp:770
uint32_t data_len_
当前数据长度 Current data length
Definition message.hpp:766
Status
服务器解析状态枚举 Enumeration of server parsing states
Definition message.hpp:735
@ WAIT_DATA_CRC
等待数据校验 Waiting for data CRC validation
@ WAIT_START
等待起始标志 Waiting for start flag
@ WAIT_TOPIC
等待主题信息 Waiting for topic information
RawData parse_buff_
解析数据缓冲区 Data buffer for parsing
Definition message.hpp:769
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
Definition message.cpp:399
Status status_
服务器的当前解析状态 Current parsing state of the server
Definition message.hpp:764
同步订阅者类,允许同步方式接收数据。Synchronous subscriber class allowing data reception in a synchronous manner.
Definition message.hpp:237
SyncSubscriber(Topic topic, Data &data)
通过 Topic 句柄构造同步订阅者。Constructs a synchronous subscriber using a Topic handle.
Definition message.hpp:257
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待接收数据。Waits for data reception.
Definition message.hpp:279
LockFreeList::Node< SyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:285
SyncSubscriber(const char *name, Data &data, Domain *domain=nullptr)
通过主题名称构造同步订阅者。Constructs a synchronous subscriber by topic name.
Definition message.hpp:246
主题(Topic)管理类 / Topic management class
Definition message.hpp:31
SuberType
订阅者类型。Subscriber type.
Definition message.hpp:203
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
Definition message.hpp:572
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
Definition message.hpp:790
static void Unlock(TopicHandle topic)
解锁主题。Unlock the topic.
Definition message.cpp:46
RBTree< uint32_t >::Node< Block > * TopicHandle
主题句柄,指向存储数据的红黑树节点。Handle pointing to a red-black tree node storing data.
Definition message.hpp:145
ErrorCode DumpData(Data &data)
转储数据到普通数据结构 Dumps data into a normal data structure
Definition message.hpp:679
ErrorCode DumpData(PackedData< Data > &data)
转储数据到 PackedData Dumps data into PackedData format
Definition message.hpp:660
static void Lock(TopicHandle topic)
锁定主题,防止其被多个订阅者同时访问。Lock the topic to prevent it from being accessed by multiple subscribers at the sa...
Definition message.cpp:28
static void UnlockFromCallback(TopicHandle topic)
从回调中解锁主题。Unlock the topic from a callback.
Definition message.cpp:76
static TopicHandle FindOrCreate(const char *name, Domain *domain=nullptr, bool multi_publisher=false, bool cache=false, bool check_length=false)
在指定域中查找或创建主题 Finds or creates a topic in the specified domain
Definition message.hpp:546
static void LockFromCallback(TopicHandle topic)
从回调中锁定主题,防止其被多个订阅者同时访问。Lock the topic from a callback to prevent it from being accessed by multiple s...
Definition message.cpp:58
ErrorCode DumpData(RawData data, bool pack=false)
转储数据 Dump data
Definition message.hpp:617
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
Definition message.cpp:204
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition message.cpp:368
void PublishFromCallback(Data &data, bool in_isr)
在回调函数中发布数据 Publishes data in a callback
Definition message.hpp:593
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
Definition message.cpp:387
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:778
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
Definition message.cpp:117
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Definition message.cpp:192
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Definition message.hpp:784
static Topic CreateTopic(const char *name, Domain *domain=nullptr, bool multi_publisher=false, bool cache=false, bool check_length=false)
创建一个新的主题 Creates a new topic
Definition message.hpp:508
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
Definition message.cpp:127
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据
Definition message.cpp:351
LibXR 命名空间
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
Definition message.hpp:301
std::atomic< ASyncSubscriberState > state
订阅者状态 Subscriber state
Definition message.hpp:303
RawData buff
缓冲区数据 Buffer data
Definition message.hpp:302
存储主题(Topic)数据的结构体。Structure storing topic data.
Definition message.hpp:45
bool cache
是否启用数据缓存。Indicates whether data caching is enabled.
Definition message.hpp:52
bool check_length
是否检查数据长度。Indicates whether data length is checked.
Definition message.hpp:53
std::atomic< LockState > busy
是否忙碌。Indicates whether it is busy.
Definition message.hpp:46
uint32_t max_length
数据的最大长度。Maximum length of data.
Definition message.hpp:48
Mutex * mutex
线程同步互斥锁。Mutex for thread synchronization.
Definition message.hpp:50
uint32_t crc32
主题名称的 CRC32 校验码。CRC32 checksum of the topic name.
Definition message.hpp:49
RawData data
存储的数据。Stored data.
Definition message.hpp:51
LockFreeList subers
订阅者列表。List of subscribers.
Definition message.hpp:47
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Definition message.hpp:459
Callback cb
订阅的回调函数 Subscribed callback function
Definition message.hpp:460
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
Definition message.hpp:393
void * queue
指向订阅队列的指针 Pointer to the subscribed queue
Definition message.hpp:394
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
Definition message.hpp:395
订阅者信息存储结构。Structure storing subscriber information.
Definition message.hpp:215
SuberType type
订阅者类型。Type of subscriber.
Definition message.hpp:216
同步订阅者存储结构。Structure storing synchronous subscriber data.
Definition message.hpp:224
Semaphore sem
信号量,用于同步等待数据。Semaphore for data synchronization.
Definition message.hpp:226
RawData buff
存储的数据缓冲区。Data buffer.
Definition message.hpp:225