5#include "libxr_def.hpp"
6#include "libxr_type.hpp"
7#include "lock_queue.hpp"
8#include "lockfree_list.hpp"
9#include "lockfree_queue.hpp"
13#include "semaphore.hpp"
51 struct PackedDataHeader
56 uint8_t data_len_raw[3];
57 uint8_t pack_header_crc8;
59 void SetDataLen(uint32_t len)
61 data_len_raw[0] =
static_cast<uint8_t
>(len >> 16);
62 data_len_raw[1] =
static_cast<uint8_t
>(len >> 8);
63 data_len_raw[2] =
static_cast<uint8_t
>(len);
66 uint32_t GetDataLen()
const
68 return static_cast<uint32_t
>(data_len_raw[0]) << 16 |
69 static_cast<uint32_t
>(data_len_raw[1]) << 8 |
70 static_cast<uint32_t
>(data_len_raw[2]);
83 template <
typename Data>
95 PackedDataHeader header_;
96 uint8_t data_[
sizeof(Data)];
109 PackedData &operator=(
const Data &data)
111 memcpy(raw.data_, &data,
sizeof(Data));
112 crc8_ = CRC8::Calculate(&raw,
sizeof(raw));
120 operator Data() {
return *
reinterpret_cast<Data *
>(raw.data_); }
125 Data *operator->() {
return reinterpret_cast<Data *
>(raw.data_); }
131 const Data *operator->()
const {
return reinterpret_cast<const Data *
>(raw.data_); }
135 static constexpr size_t PACK_BASE_SIZE =
sizeof(PackedData<uint8_t>) - 1;
167 {
return static_cast<int>(a) -
static_cast<int>(b); });
175 if (domain !=
nullptr)
182 [](
const uint32_t &a,
const uint32_t &b)
183 {
return static_cast<int>(a) -
static_cast<int>(b); });
231 template <
typename Data>
257 ASSERT(topic.
block_->
data_.max_length ==
sizeof(Data));
261 ASSERT(topic.
block_->
data_.max_length <=
sizeof(Data));
275 ErrorCode
Wait(uint32_t timeout = UINT32_MAX)
277 return block_->data_.sem.Wait(timeout);
301 template <
typename Data>
325 ASSERT(topic.
block_->
data_.max_length ==
sizeof(Data));
329 ASSERT(topic.
block_->
data_.max_length <=
sizeof(Data));
334 block_->data_.buff = *(
new Data);
356 block_->data_.data_ready =
false;
357 return *
reinterpret_cast<Data *
>(
block_->data_.buff.addr_);
393 template <
typename Data, u
int32_t Length>
407 template <
typename Data>
412 ASSERT(topic.
block_->
data_.max_length ==
sizeof(Data));
416 ASSERT(topic.
block_->
data_.max_length <=
sizeof(Data));
421 block->data_.queue = &queue;
422 block->data_.fun = [](
RawData &data,
void *arg,
bool in_isr)
426 queue->
Push(
reinterpret_cast<Data
>(data.
addr_));
440 template <
typename Data>
453 template <
typename Data>
458 ASSERT(topic.
block_->
data_.max_length ==
sizeof(Data));
462 ASSERT(topic.
block_->
data_.max_length <=
sizeof(Data));
467 block->data_.queue = &queue;
468 block->data_.fun = [](
RawData &data,
void *arg,
bool in_isr)
500 auto node =
new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
523 Topic(
const char *name, uint32_t max_length,
Domain *domain =
nullptr,
524 bool cache =
false,
bool check_length =
false)
534 if (domain ==
nullptr)
541 auto topic = domain->node_->data_.Search<
Block>(crc32);
545 ASSERT(topic->data_.max_length == max_length);
546 ASSERT(topic->data_.check_length == check_length);
559 domain->node_->data_.Insert(*
block_, crc32);
580 template <
typename Data>
582 bool check_length =
true)
584 return Topic(name,
sizeof(Data), domain, cache, check_length);
604 if (domain ==
nullptr)
611 return domain->node_->data_.Search<
Block>(crc32);
626 template <
typename Data>
628 bool cache =
false,
bool check_length =
true)
630 auto topic =
Find(name, domain);
631 if (topic ==
nullptr)
659 template <
typename Data>
680 ASSERT(size <= block_->data_.max_length);
702 auto sync =
reinterpret_cast<SyncBlock *
>(&block);
703 memcpy(sync->buff.addr_, data.
addr_, data.
size_);
709 auto async =
reinterpret_cast<ASyncBlock *
>(&block);
712 memcpy(async->buff.addr_, data.
addr_, data.
size_);
713 async->data_ready =
true;
719 auto queue_block =
reinterpret_cast<QueueBlock *
>(&block);
720 queue_block->
fun(data, queue_block->queue,
false);
726 cb_block->
cb.
Run(
false, data);
730 return ErrorCode::OK;
747 template <SizeLimitMode Mode = SizeLimitMode::MORE>
752 return ErrorCode::EMPTY;
771 return ErrorCode::OK;
783 PackedData<uint8_t> *pack =
reinterpret_cast<PackedData<uint8_t> *
>(buffer.
addr_);
785 memcpy(&pack->raw.data_, source.
addr_, source.
size_);
787 pack->raw.header_.prefix = 0xa5;
788 pack->raw.header_.topic_name_crc32 = topic_name_crc32;
789 pack->raw.header_.SetDataLen(source.
size_);
790 pack->raw.header_.pack_header_crc8 =
791 CRC8::Calculate(&pack->raw,
sizeof(PackedDataHeader) -
sizeof(uint8_t));
793 reinterpret_cast<uint8_t *
>(
reinterpret_cast<uint8_t *
>(pack) + PACK_BASE_SIZE +
794 source.
size_ -
sizeof(uint8_t));
804 template <
typename Data>
809 return ErrorCode::EMPTY;
823 template <
typename Data>
828 return ErrorCode::EMPTY;
853 topic =
Find(name, domain);
854 if (topic ==
nullptr)
862 }
while (topic ==
nullptr);
919 :
topic_map_([](const uint32_t &a, const uint32_t &b)
920 {
return static_cast<int>(a) -
static_cast<int>(b); }),
924 ASSERT(buffer_length > PACK_BASE_SIZE);
958 for (uint32_t i = 0; i < queue_size; i++)
980 if (
queue_.
Size() >=
sizeof(PackedDataHeader))
1027 data_len_ +
sizeof(PackedDataHeader) +
sizeof(uint8_t)))
1031 sizeof(PackedDataHeader);
static void SizeLimitCheck(size_t limit, size_t size)
在非调试模式下的占位大小检查函数(无实际作用)。 Dummy size limit check for non-debug builds.
基础队列类,提供固定大小的循环缓冲区 (Base queue class providing a fixed-size circular buffer).
ErrorCode Pop(void *data=nullptr)
移除队列头部的元素 (Pop the front element from the queue).
size_t length_
队列最大容量 (Maximum queue capacity).
ErrorCode PopBatch(void *data, size_t size)
批量移除多个元素 (Pop multiple elements from the queue).
size_t Size() const
获取队列中的元素个数 (Get the number of elements in the queue).
ErrorCode Peek(void *data)
获取队列头部的元素但不移除 (Peek at the front element without removing it).
ErrorCode PushBatch(const void *data, size_t size)
批量推入多个元素 (Push multiple elements into the queue).
static uint32_t Calculate(const void *raw, size_t len)
计算数据的 CRC32 校验码 / Computes the CRC32 checksum for the given data
static uint8_t Calculate(const void *raw, size_t len)
计算数据的 CRC8 校验码 / Computes the CRC8 checksum for the given data
static bool Verify(const void *raw, size_t len)
验证数据的 CRC8 校验码 / Verifies the CRC8 checksum of the given data
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
void Run(bool in_isr, PassArgs &&...args) const
执行回调函数,并传递参数。 Executes the callback function, passing the arguments.
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data...
Data data_
存储的数据。 The stored data.
链表实现,用于存储和管理数据节点。 A linked list implementation for storing and managing data nodes.
无锁队列实现 / Lock-free queue implementation
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
线程安全的锁队列类,提供同步和异步操作支持 Thread-safe lock queue class with synchronous and asynchronous operation suppor...
ErrorCode PushFromCallback(const Data &data, bool in_isr)
从回调函数中推送数据 Pushes data into the queue from a callback function
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Key key
节点键值 (Key associated with the node).
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Data data_
存储的数据 (Stored data).
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread...
Node< Data > * Search(const Key &key)
搜索红黑树中的节点 (Search for a node in the Red-Black Tree).
void Insert(BaseNode &node, KeyType &&key)
在树中插入新节点 (Insert a new node into the tree).
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
static uint32_t GetTime()
获取当前系统时间(毫秒) Gets the current system time in milliseconds
static void Sleep(uint32_t milliseconds)
让线程进入休眠状态 Puts the thread to sleep
异步订阅者类,用于订阅异步数据 Asynchronous subscriber class for subscribing to asynchronous data
ASyncSubscriber(Topic topic)
构造函数,使用 Topic 进行初始化 Constructor using a Topic for initialization
Data & GetData()
获取当前数据 Retrieves the current data
LockFreeList::Node< ASyncBlock > * block_
订阅者数据块。Subscriber data block.
bool Available()
检查数据是否可用 Checks if data is available
ASyncSubscriber(const char *name, Domain *domain=nullptr)
构造函数,通过名称和数据创建订阅者 Constructor to create a subscriber with a name and data
void StartWaiting()
开始等待数据更新 Starts waiting for data update
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
构造函数,使用名称和无锁队列进行初始化 Constructor using a name and a lock-free queue
QueuedSubscriber(Topic topic, LockFreeQueue< Data > &queue)
构造函数,使用 Topic 和无锁队列进行初始化 Constructor using a Topic and a lock-free queue
QueuedSubscriber(const char *name, LockQueue< Data > &queue, Domain *domain=nullptr)
构造函数,使用名称和带锁队列进行初始化 Constructor using a name and a locked queue
QueuedSubscriber(Topic topic, LockQueue< Data > &queue)
构造函数,使用 Topic 和带锁队列进行初始化 Constructor using a Topic and a locked queue
服务器类,负责解析数据并将其分发到相应的主题 Server class responsible for parsing data and distributing it to corresponding...
RBTree< uint32_t > topic_map_
主题映射表 Topic mapping table
void Register(TopicHandle topic)
注册一个主题 Registers a topic
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
BaseQueue queue_
数据队列 Data queue
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
TopicHandle current_topic_
当前主题句柄 Current topic handle
uint32_t data_len_
当前数据长度 Current data length
Status
服务器解析状态枚举 Enumeration of server parsing states
@ WAIT_DATA_CRC
等待数据校验 Waiting for data CRC validation
@ WAIT_START
等待起始标志 Waiting for start flag
@ WAIT_TOPIC
等待主题信息 Waiting for topic information
RawData parse_buff_
解析数据缓冲区 Data buffer for parsing
Status status_
服务器的当前解析状态 Current parsing state of the server
同步订阅者类,允许同步方式接收数据。Synchronous subscriber class allowing data reception in a synchronous manner.
SyncSubscriber(Topic topic, Data &data)
通过 Topic 句柄构造同步订阅者。Constructs a synchronous subscriber using a Topic handle.
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待接收数据。Waits for data reception.
LockFreeList::Node< SyncBlock > * block_
订阅者数据块。Subscriber data block.
SyncSubscriber(const char *name, Data &data, Domain *domain=nullptr)
通过主题名称构造同步订阅者。Constructs a synchronous subscriber by topic name.
主题(Topic)管理类 / Topic management class
void Publish(void *addr, uint32_t size)
以原始地址和大小发布数据 Publishes data using raw address and size
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
SuberType
订阅者类型。Subscriber type.
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
Topic(const char *name, uint32_t max_length, Domain *domain=nullptr, bool cache=false, bool check_length=false)
构造函数,使用指定名称、最大长度、域及其他选项初始化主题 Constructor to initialize a topic with the specified name,...
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据
RBTree< uint32_t >::Node< Block > * TopicHandle
主题句柄,指向存储数据的红黑树节点。Handle pointing to a red-black tree node storing data.
ErrorCode DumpData(Data &data)
转储数据到普通数据结构 Dumps data into a normal data structure
static Topic CreateTopic(const char *name, Domain *domain=nullptr, bool cache=false, bool check_length=true)
创建一个新的主题 Creates a new topic
static TopicHandle FindOrCreate(const char *name, Domain *domain=nullptr, bool cache=false, bool check_length=true)
在指定域中查找或创建主题 Finds or creates a topic in the specified domain
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
ErrorCode DumpData(PackedData< Data > &data)
转储数据到 PackedData Dumps data into PackedData format
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Topic(TopicHandle topic)
通过句柄构造主题 Constructs a topic from a topic handle
ErrorCode DumpData(RawData data, bool pack=false)
转储数据 Dump data
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
RawData buff
缓冲区数据 Buffer data
bool waiting
等待中标志 Waiting flag
bool data_ready
数据就绪标志 Data ready flag
存储主题(Topic)数据的结构体。Structure storing topic data.
bool cache
是否启用数据缓存。Indicates whether data caching is enabled.
Mutex mutex
线程同步互斥锁。Mutex for thread synchronization.
bool check_length
是否检查数据长度。Indicates whether data length is checked.
uint32_t max_length
数据的最大长度。Maximum length of data.
uint32_t crc32
主题名称的 CRC32 校验码。CRC32 checksum of the topic name.
RawData data
存储的数据。Stored data.
LockFreeList subers
订阅者列表。List of subscribers.
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Callback cb
订阅的回调函数 Subscribed callback function
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
void * queue
指向订阅队列的指针 Pointer to the subscribed queue
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
订阅者信息存储结构。Structure storing subscriber information.
SuberType type
订阅者类型。Type of subscriber.
同步订阅者存储结构。Structure storing synchronous subscriber data.
Semaphore sem
信号量,用于同步等待数据。Semaphore for data synchronization.
RawData buff
存储的数据缓冲区。Data buffer.