5#include "libxr_def.hpp"
14void Topic::PackedDataHeader::SetDataLen(uint32_t len)
16 data_len_raw[0] =
static_cast<uint8_t
>(len >> 16);
17 data_len_raw[1] =
static_cast<uint8_t
>(len >> 8);
18 data_len_raw[2] =
static_cast<uint8_t
>(len);
21uint32_t Topic::PackedDataHeader::GetDataLen()
const
23 return static_cast<uint32_t
>(data_len_raw[0]) << 16 |
24 static_cast<uint32_t
>(data_len_raw[1]) << 8 |
25 static_cast<uint32_t
>(data_len_raw[2]);
30 if (topic->
data_.mutex)
32 topic->
data_.mutex->Lock();
36 LockState expected = LockState::UNLOCKED;
37 if (!topic->
data_.busy.compare_exchange_strong(expected, LockState::LOCKED))
48 if (topic->
data_.mutex)
50 topic->
data_.mutex->Unlock();
54 topic->
data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
60 if (topic->
data_.mutex)
66 LockState expected = LockState::UNLOCKED;
67 if (!topic->
data_.busy.compare_exchange_strong(expected, LockState::LOCKED))
78 if (topic->
data_.mutex)
84 topic->
data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
96 {
return static_cast<int>(a) -
static_cast<int>(b); });
104 if (domain !=
nullptr)
111 [](
const uint32_t& a,
const uint32_t& b)
112 {
return static_cast<int>(a) -
static_cast<int>(b); });
122 auto node =
new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
130 bool cache,
bool check_length)
140 if (domain ==
nullptr)
151 ASSERT(topic->data_.max_length == max_length);
152 ASSERT(topic->data_.check_length == check_length);
154 if (multi_publisher && !topic->data_.mutex)
173 block_->
data_.busy.store(LockState::USE_MUTEX, std::memory_order_release);
178 block_->
data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
194 if (domain ==
nullptr)
226 ASSERT(size <= block_->data_.max_length);
248 auto sync =
reinterpret_cast<SyncBlock*
>(&block);
255 auto async =
reinterpret_cast<ASyncBlock*
>(&block);
256 if (async->state.load(std::memory_order_acquire) == ASyncSubscriberState::WAITING)
259 async->state.store(ASyncSubscriberState::DATA_READY, std::memory_order_release);
265 auto queue_block =
reinterpret_cast<QueueBlock*
>(&block);
266 queue_block->
fun(data, queue_block->queue,
false);
272 cb_block->
cb.
Run(
false, data);
276 return ErrorCode::OK;
293 ASSERT(size <= block_->data_.max_length);
315 auto sync =
reinterpret_cast<SyncBlock*
>(&block);
317 sync->sem.PostFromCallback(in_isr);
322 auto async =
reinterpret_cast<ASyncBlock*
>(&block);
323 if (async->state.load(std::memory_order_acquire) == ASyncSubscriberState::WAITING)
326 async->state.store(ASyncSubscriberState::DATA_READY, std::memory_order_release);
332 auto queue_block =
reinterpret_cast<QueueBlock*
>(&block);
333 queue_block->
fun(data, queue_block->queue,
false);
339 cb_block->
cb.
Run(in_isr, data);
343 return ErrorCode::OK;
353 PackedData<uint8_t>* pack =
reinterpret_cast<PackedData<uint8_t>*
>(buffer.
addr_);
357 pack->raw.header_.prefix = 0xa5;
358 pack->raw.header_.topic_name_crc32 = topic_name_crc32;
359 pack->raw.header_.SetDataLen(source.
size_);
360 pack->raw.header_.pack_header_crc8 =
361 CRC8::Calculate(&pack->raw,
sizeof(PackedDataHeader) -
sizeof(uint8_t));
362 uint8_t* crc8_pack =
reinterpret_cast<uint8_t*
>(
363 reinterpret_cast<uint8_t*
>(pack) + PACK_BASE_SIZE + source.
size_ -
sizeof(uint8_t));
372 topic =
Find(name, domain);
373 if (topic ==
nullptr)
381 }
while (topic ==
nullptr);
399 : topic_map_([](const uint32_t& a, const uint32_t& b)
400 {
return static_cast<int>(a) -
static_cast<int>(b); }),
401 queue_(1, buffer_length)
404 ASSERT(buffer_length > PACK_BASE_SIZE);
405 parse_buff_.size_ = buffer_length;
406 parse_buff_.addr_ =
new uint8_t[buffer_length];
412 topic_map_.Insert(*node, topic->
key);
423 if (status_ == Status::WAIT_START)
426 auto queue_size = queue_.Size();
427 for (uint32_t i = 0; i < queue_size; i++)
430 queue_.Peek(&prefix);
433 status_ = Status::WAIT_TOPIC;
439 if (status_ == Status::WAIT_START)
446 if (status_ == Status::WAIT_TOPIC)
449 if (queue_.Size() >=
sizeof(PackedDataHeader))
451 queue_.PopBatch(parse_buff_.addr_,
sizeof(PackedDataHeader));
452 if (
CRC8::Verify(parse_buff_.addr_,
sizeof(PackedDataHeader)))
454 auto header =
reinterpret_cast<PackedDataHeader*
>(parse_buff_.addr_);
456 auto node = topic_map_.Search<
TopicHandle>(header->topic_name_crc32);
459 data_len_ = header->GetDataLen();
460 current_topic_ = *node;
461 if (data_len_ + PACK_BASE_SIZE >= queue_.length_)
463 status_ = Status::WAIT_START;
466 status_ = Status::WAIT_DATA_CRC;
470 status_ = Status::WAIT_START;
476 status_ = Status::WAIT_START;
487 if (status_ == Status::WAIT_DATA_CRC)
490 if (queue_.Size() >= data_len_ +
sizeof(uint8_t))
493 reinterpret_cast<uint8_t*
>(parse_buff_.addr_) +
sizeof(PackedDataHeader);
494 queue_.PopBatch(data, data_len_ +
sizeof(uint8_t));
496 data_len_ +
sizeof(PackedDataHeader) +
sizeof(uint8_t)))
498 status_ = Status::WAIT_START;
500 reinterpret_cast<uint8_t*
>(parse_buff_.addr_) +
sizeof(PackedDataHeader);
501 if (data_len_ > current_topic_->data_.max_length)
503 data_len_ = current_topic_->data_.max_length;
505 Topic(current_topic_).Publish(data, data_len_);
513 status_ = Status::WAIT_START;
static uint32_t Calculate(const void *raw, size_t len)
计算数据的 CRC32 校验码 / Computes the CRC32 checksum for the given data
static uint8_t Calculate(const void *raw, size_t len)
计算数据的 CRC8 校验码 / Computes the CRC8 checksum for the given data
static bool Verify(const void *raw, size_t len)
验证数据的 CRC8 校验码 / Verifies the CRC8 checksum of the given data
通用回调包装,支持动态参数传递 / Generic callback wrapper supporting dynamic argument passing
void Run(bool in_isr, PassArgs &&... args) const
执行回调函数并传递参数 / Execute the callback with arguments
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data types.
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Key key
节点键值 (Key associated with the node).
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Data data_
存储的数据 (Stored data).
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread-safe operations).
Node< Data > * Search(const Key &key)
搜索红黑树中的节点 (Search for a node in the Red-Black Tree).
void Insert(BaseNode &node, KeyType &&key)
在树中插入新节点 (Insert a new node into the tree).
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
static uint32_t GetTime()
获取当前系统时间(毫秒) Gets the current system time in milliseconds
static void Sleep(uint32_t milliseconds)
让线程进入休眠状态 Puts the thread to sleep
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
void Register(TopicHandle topic)
注册一个主题 Registers a topic
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
static void Unlock(TopicHandle topic)
解锁主题。Unlock the topic.
static void Lock(TopicHandle topic)
锁定主题,防止其被多个订阅者同时访问。Lock the topic to prevent it from being accessed by multiple subscribers at the same time.
static void UnlockFromCallback(TopicHandle topic)
从回调中解锁主题。Unlock the topic from a callback.
static void LockFromCallback(TopicHandle topic)
从回调中锁定主题,防止其被多个订阅者同时访问。Lock the topic from a callback to prevent it from being accessed by multiple subscribers at the same time.
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
void PublishFromCallback(Data &data, bool in_isr)
在回调函数中发布数据 Publishes data in a callback
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据 Packs data
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
存储主题(Topic)数据的结构体。Structure storing topic data.
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Callback cb
订阅的回调函数 Subscribed callback function
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
订阅者信息存储结构。Structure storing subscriber information.
SuberType type
订阅者类型。Type of subscriber.
同步订阅者存储结构。Structure storing synchronous subscriber data.