8#include "libxr_def.hpp"
9#include "libxr_type.hpp"
10#include "lock_queue.hpp"
11#include "lockfree_list.hpp"
12#include "lockfree_queue.hpp"
16#include "semaphore.hpp"
32 enum class LockState : uint32_t
36 USE_MUTEX = UINT32_MAX,
46 std::atomic<LockState>
busy;
62 struct PackedDataHeader
67 uint8_t data_len_raw[3];
68 uint8_t pack_header_crc8;
70 void SetDataLen(uint32_t len);
72 uint32_t GetDataLen()
const;
84 template <
typename Data>
96 PackedDataHeader header_;
97 uint8_t data_[
sizeof(Data)];
110 PackedData &operator=(
const Data &data)
112 memcpy(raw.data_, &data,
sizeof(Data));
113 crc8_ = CRC8::Calculate(&raw,
sizeof(raw));
121 operator Data() {
return *
reinterpret_cast<Data *
>(raw.data_); }
126 Data *operator->() {
return reinterpret_cast<Data *
>(raw.data_); }
132 const Data *operator->()
const {
return reinterpret_cast<const Data *
>(raw.data_); }
136 static constexpr size_t PACK_BASE_SIZE =
sizeof(PackedData<uint8_t>) - 1;
235 template <
typename Data>
261 ASSERT(topic.
block_->
data_.max_length ==
sizeof(Data));
265 ASSERT(topic.
block_->
data_.max_length <=
sizeof(Data));
279 ErrorCode
Wait(uint32_t timeout = UINT32_MAX)
282 return block_->data_.sem.Wait(timeout);
288 enum class ASyncSubscriberState : uint32_t
292 DATA_READY = UINT32_MAX
303 std::atomic<ASyncSubscriberState>
state;
312 template <
typename Data>
336 ASSERT(topic.
block_->
data_.max_length ==
sizeof(Data));
340 ASSERT(topic.
block_->
data_.max_length <=
sizeof(Data));
345 block_->data_.buff = *(
new Data);
359 return block_->data_.state.load(std::memory_order_acquire) ==
360 ASyncSubscriberState::DATA_READY;
371 block_->data_.state.store(ASyncSubscriberState::IDLE, std::memory_order_release);
372 return *
reinterpret_cast<Data *
>(
block_->data_.buff.addr_);
381 block_->data_.state.store(ASyncSubscriberState::WAITING, std::memory_order_release);
411 template <
typename Data, u
int32_t Length>
425 template <
typename Data>
430 ASSERT(topic.
block_->
data_.max_length ==
sizeof(Data));
434 ASSERT(topic.
block_->
data_.max_length <=
sizeof(Data));
439 block->data_.queue = &queue;
440 block->data_.fun = [](
RawData &data,
void *arg,
bool in_isr)
444 queue->
Push(*
reinterpret_cast<Data*
>(data.
addr_));
490 Topic(
const char *name, uint32_t max_length,
Domain *domain =
nullptr,
491 bool multi_publisher =
false,
bool cache =
false,
bool check_length =
false);
507 template <
typename Data>
509 bool multi_publisher =
false,
bool cache =
false,
510 bool check_length =
true)
512 return Topic(name,
sizeof(Data), domain, multi_publisher, cache, check_length);
544 template <
typename Data>
546 bool cache =
false,
bool check_length =
true)
548 auto topic =
Find(name, domain);
549 if (topic ==
nullptr)
568 template <
typename Data>
580 void Publish(
void *addr, uint32_t size);
589 template <
typename Data>
613 template <SizeLimitMode Mode = SizeLimitMode::MORE>
618 return ErrorCode::EMPTY;
637 return ErrorCode::OK;
655 template <
typename Data>
660 return ErrorCode::EMPTY;
674 template <
typename Data>
679 return ErrorCode::EMPTY;
699 Domain *domain =
nullptr);
742 Server(
size_t buffer_length);
static void SizeLimitCheck(size_t limit, size_t size)
在非调试模式下的占位大小检查函数(无实际作用)。 Dummy size limit check for non-debug builds.
基础队列类,提供固定大小的循环缓冲区 (Base queue class providing a fixed-size circular buffer).
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
常量原始数据封装类。 A class for encapsulating constant raw data.
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data...
Data data_
存储的数据。 The stored data.
链表实现,用于存储和管理数据节点。 A linked list implementation for storing and managing data nodes.
无锁队列实现 / Lock-free queue implementation
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Data data_
存储的数据 (Stored data).
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread...
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
异步订阅者类,用于订阅异步数据 Asynchronous subscriber class for subscribing to asynchronous data
ASyncSubscriber(Topic topic)
构造函数,使用 Topic 进行初始化 Constructor using a Topic for initialization
Data & GetData()
获取当前数据 Retrieves the current data
LockFreeList::Node< ASyncBlock > * block_
订阅者数据块。Subscriber data block.
bool Available()
检查数据是否可用 Checks if data is available
ASyncSubscriber(const char *name, Domain *domain=nullptr)
构造函数,通过名称和数据创建订阅者 Constructor to create a subscriber with a name and data
void StartWaiting()
开始等待数据更新 Starts waiting for data update
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
构造函数,使用名称和无锁队列进行初始化 Constructor using a name and a lock-free queue
QueuedSubscriber(Topic topic, LockFreeQueue< Data > &queue)
构造函数,使用 Topic 和无锁队列进行初始化 Constructor using a Topic and a lock-free queue
服务器类,负责解析数据并将其分发到相应的主题 Server class responsible for parsing data and distributing it to corresponding...
RBTree< uint32_t > topic_map_
主题映射表 Topic mapping table
void Register(TopicHandle topic)
注册一个主题 Registers a topic
BaseQueue queue_
数据队列 Data queue
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
TopicHandle current_topic_
当前主题句柄 Current topic handle
uint32_t data_len_
当前数据长度 Current data length
Status
服务器解析状态枚举 Enumeration of server parsing states
@ WAIT_DATA_CRC
等待数据校验 Waiting for data CRC validation
@ WAIT_START
等待起始标志 Waiting for start flag
@ WAIT_TOPIC
等待主题信息 Waiting for topic information
RawData parse_buff_
解析数据缓冲区 Data buffer for parsing
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
Status status_
服务器的当前解析状态 Current parsing state of the server
同步订阅者类,允许同步方式接收数据。Synchronous subscriber class allowing data reception in a synchronous manner.
SyncSubscriber(Topic topic, Data &data)
通过 Topic 句柄构造同步订阅者。Constructs a synchronous subscriber using a Topic handle.
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待接收数据。Waits for data reception.
LockFreeList::Node< SyncBlock > * block_
订阅者数据块。Subscriber data block.
SyncSubscriber(const char *name, Data &data, Domain *domain=nullptr)
通过主题名称构造同步订阅者。Constructs a synchronous subscriber by topic name.
主题(Topic)管理类 / Topic management class
SuberType
订阅者类型。Subscriber type.
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
static void Unlock(TopicHandle topic)
解锁主题。Unlock the topic.
RBTree< uint32_t >::Node< Block > * TopicHandle
主题句柄,指向存储数据的红黑树节点。Handle pointing to a red-black tree node storing data.
ErrorCode DumpData(Data &data)
转储数据到普通数据结构 Dumps data into a normal data structure
static TopicHandle FindOrCreate(const char *name, Domain *domain=nullptr, bool cache=false, bool check_length=true)
在指定域中查找或创建主题 Finds or creates a topic in the specified domain
ErrorCode DumpData(PackedData< Data > &data)
转储数据到 PackedData Dumps data into PackedData format
static void Lock(TopicHandle topic)
锁定主题,防止其被多个订阅者同时访问。Lock the topic to prevent it from being accessed by multiple subscribers at the sa...
static void UnlockFromCallback(TopicHandle topic)
从回调中解锁主题。Unlock the topic from a callback.
static void LockFromCallback(TopicHandle topic)
从回调中锁定主题,防止其被多个订阅者同时访问。Lock the topic from a callback to prevent it from being accessed by multiple s...
ErrorCode DumpData(RawData data, bool pack=false)
转储数据 Dump data
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
void PublishFromCallback(Data &data, bool in_isr)
在回调函数中发布数据 Publishes data in a callback
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
static Topic CreateTopic(const char *name, Domain *domain=nullptr, bool multi_publisher=false, bool cache=false, bool check_length=true)
创建一个新的主题 Creates a new topic
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
std::atomic< ASyncSubscriberState > state
订阅者状态 Subscriber state
RawData buff
缓冲区数据 Buffer data
存储主题(Topic)数据的结构体。Structure storing topic data.
bool cache
是否启用数据缓存。Indicates whether data caching is enabled.
bool check_length
是否检查数据长度。Indicates whether data length is checked.
std::atomic< LockState > busy
是否忙碌。Indicates whether it is busy.
uint32_t max_length
数据的最大长度。Maximum length of data.
Mutex * mutex
线程同步互斥锁。Mutex for thread synchronization.
uint32_t crc32
主题名称的 CRC32 校验码。CRC32 checksum of the topic name.
RawData data
存储的数据。Stored data.
LockFreeList subers
订阅者列表。List of subscribers.
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Callback cb
订阅的回调函数 Subscribed callback function
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
void * queue
指向订阅队列的指针 Pointer to the subscribed queue
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
订阅者信息存储结构。Structure storing subscriber information.
SuberType type
订阅者类型。Type of subscriber.
同步订阅者存储结构。Structure storing synchronous subscriber data.
Semaphore sem
信号量,用于同步等待数据。Semaphore for data synchronization.
RawData buff
存储的数据缓冲区。Data buffer.