libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
message.cpp
1#include "message.hpp"
2
3using namespace LibXR;
4
5template class LibXR::RBTree<uint32_t>;
7
8void Topic::PackedDataHeader::SetDataLen(uint32_t len)
9{
10 data_len_raw[0] = static_cast<uint8_t>(len >> 16);
11 data_len_raw[1] = static_cast<uint8_t>(len >> 8);
12 data_len_raw[2] = static_cast<uint8_t>(len);
13}
14
15uint32_t Topic::PackedDataHeader::GetDataLen() const
16{
17 return static_cast<uint32_t>(data_len_raw[0]) << 16 |
18 static_cast<uint32_t>(data_len_raw[1]) << 8 |
19 static_cast<uint32_t>(data_len_raw[2]);
20}
21
22Topic::Domain::Domain(const char *name)
23{
24 if (!domain_)
25 {
26 if (!domain_)
27 {
28 domain_ =
29 new RBTree<uint32_t>([](const uint32_t &a, const uint32_t &b)
30 { return static_cast<int>(a) - static_cast<int>(b); });
31 }
32 }
33
34 auto crc32 = CRC32::Calculate(name, strlen(name));
35
36 auto domain = domain_->Search<RBTree<uint32_t>>(crc32);
37
38 if (domain != nullptr)
39 {
40 node_ = domain;
41 return;
42 }
43
45 [](const uint32_t &a, const uint32_t &b)
46 { return static_cast<int>(a) - static_cast<int>(b); });
47
48 domain_->Insert(*node_, crc32);
49}
50
// NOTE(review): this looks like the body of Topic::RegisterCallback(Callback &cb);
// the signature line and two body lines are absent from this listing — confirm
// against the real source file before relying on it.
52{
// Local subscriber record that carries the caller's callback.
53 CallbackBlock block;
54 block.cb = cb;
// NOTE(review): the expression naming the allocated node type is missing here,
// as is whatever statement originally sat between the two lines above and below.
// Only the over-aligned placement form (LIBXR_CACHE_LINE_SIZE) is visible.
56 auto node = new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
// Appends the new node to this topic's subscriber list.
58 block_->data_.subers.Add(*node);
59}
60
62
63Topic::Topic(const char *name, uint32_t max_length, Domain *domain, bool cache,
64 bool check_length)
65{
66 if (!def_domain_)
67 {
68 if (!def_domain_)
69 {
70 def_domain_ = new Domain("libxr_def_domain");
71 }
72 }
73
74 if (domain == nullptr)
75 {
76 domain = def_domain_;
77 }
78
79 auto crc32 = CRC32::Calculate(name, strlen(name));
80
81 auto topic = domain->node_->data_.Search<Block>(crc32);
82
83 if (topic)
84 {
85 ASSERT(topic->data_.max_length == max_length);
86 ASSERT(topic->data_.check_length == check_length);
87
88 block_ = topic;
89 }
90 else
91 {
93 block_->data_.max_length = max_length;
94 block_->data_.crc32 = crc32;
95 block_->data_.data.addr_ = nullptr;
96 block_->data_.cache = false;
97 block_->data_.check_length = check_length;
98
99 domain->node_->data_.Insert(*block_, crc32);
100 }
101
102 if (cache && !block_->data_.cache)
103 {
104 EnableCache();
105 }
106}
107
109
110Topic::TopicHandle Topic::Find(const char *name, Domain *domain)
111{
112 if (domain == nullptr)
113 {
114 domain = def_domain_;
115 }
116
117 auto crc32 = CRC32::Calculate(name, strlen(name));
118
119 return domain->node_->data_.Search<Block>(crc32);
120}
121
123{
124 block_->data_.mutex.Lock();
125 if (!block_->data_.cache)
126 {
127 block_->data_.cache = true;
128 block_->data_.data.addr_ = new uint8_t[block_->data_.max_length];
129 }
130 block_->data_.mutex.Unlock();
131}
132
133void Topic::Publish(void *addr, uint32_t size)
134{
135 block_->data_.mutex.Lock();
136 if (block_->data_.check_length)
137 {
138 ASSERT(size == block_->data_.max_length);
139 }
140 else
141 {
142 ASSERT(size <= block_->data_.max_length);
143 }
144
145 if (block_->data_.cache)
146 {
147 memcpy(block_->data_.data.addr_, addr, size);
148 block_->data_.data.size_ = size;
149 }
150 else
151 {
152 block_->data_.data.addr_ = addr;
153 block_->data_.data.size_ = size;
154 }
155
156 RawData data = block_->data_.data;
157
158 auto foreach_fun = [&](SuberBlock &block)
159 {
160 switch (block.type)
161 {
162 case SuberType::SYNC:
163 {
164 auto sync = reinterpret_cast<SyncBlock *>(&block);
165 memcpy(sync->buff.addr_, data.addr_, data.size_);
166 sync->sem.Post();
167 break;
168 }
169 case SuberType::ASYNC:
170 {
171 auto async = reinterpret_cast<ASyncBlock *>(&block);
172 if (async->waiting)
173 {
174 memcpy(async->buff.addr_, data.addr_, data.size_);
175 async->data_ready = true;
176 }
177 break;
178 }
179 case SuberType::QUEUE:
180 {
181 auto queue_block = reinterpret_cast<QueueBlock *>(&block);
182 queue_block->fun(data, queue_block->queue, false);
183 break;
184 }
186 {
187 auto cb_block = reinterpret_cast<CallbackBlock *>(&block);
188 cb_block->cb.Run(false, data);
189 break;
190 }
191 }
192 return ErrorCode::OK;
193 };
194
195 block_->data_.subers.Foreach<SuberBlock>(foreach_fun);
196
197 block_->data_.mutex.Unlock();
198}
199
200void Topic::PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
201{
202 PackedData<uint8_t> *pack = reinterpret_cast<PackedData<uint8_t> *>(buffer.addr_);
203
204 memcpy(&pack->raw.data_, source.addr_, source.size_);
205
206 pack->raw.header_.prefix = 0xa5;
207 pack->raw.header_.topic_name_crc32 = topic_name_crc32;
208 pack->raw.header_.SetDataLen(source.size_);
209 pack->raw.header_.pack_header_crc8 =
210 CRC8::Calculate(&pack->raw, sizeof(PackedDataHeader) - sizeof(uint8_t));
211 uint8_t *crc8_pack =
212 reinterpret_cast<uint8_t *>(reinterpret_cast<uint8_t *>(pack) + PACK_BASE_SIZE +
213 source.size_ - sizeof(uint8_t));
214 *crc8_pack = CRC8::Calculate(pack, PACK_BASE_SIZE - sizeof(uint8_t) + source.size_);
215}
216
217Topic::TopicHandle Topic::WaitTopic(const char *name, uint32_t timeout, Domain *domain)
218{
219 TopicHandle topic = nullptr;
220 do
221 {
222 topic = Find(name, domain);
223 if (topic == nullptr)
224 {
225 if (timeout <= Thread::GetTime())
226 {
227 return nullptr;
228 }
229 Thread::Sleep(1);
230 }
231 } while (topic == nullptr);
232
233 return topic;
234}
235
236uint32_t Topic::GetKey() const
237{
238 if (block_)
239 {
240 return block_->key;
241 }
242 else
243 {
244 return 0;
245 }
246}
247
248Topic::Server::Server(size_t buffer_length)
249 : topic_map_([](const uint32_t &a, const uint32_t &b)
250 { return static_cast<int>(a) - static_cast<int>(b); }),
251 queue_(1, buffer_length)
252{
253 /* Minimum size: header8 + crc32 + length24 + crc8 + data + crc8 = 10 */
254 ASSERT(buffer_length > PACK_BASE_SIZE);
255 parse_buff_.size_ = buffer_length;
256 parse_buff_.addr_ = new uint8_t[buffer_length];
257}
258
260{
261 auto node = new RBTree<uint32_t>::Node<TopicHandle>(topic);
262 topic_map_.Insert(*node, topic->key);
263}
264
266{
267 size_t count = 0;
268
269 queue_.PushBatch(data.addr_, data.size_);
270
271 while (true)
272 { /* 1. Check prefix */
273 if (status_ == Status::WAIT_START)
274 {
275 /* Check start frame */
276 auto queue_size = queue_.Size();
277 for (uint32_t i = 0; i < queue_size; i++)
278 {
279 uint8_t prefix = 0;
280 queue_.Peek(&prefix);
281 if (prefix == 0xa5)
282 {
283 status_ = Status::WAIT_TOPIC;
284 break;
285 }
286 queue_.Pop();
287 }
288 /* Not found */
289 if (status_ == Status::WAIT_START)
290 {
291 return count;
292 }
293 }
294
295 /* 2. Get topic info */
296 if (status_ == Status::WAIT_TOPIC)
297 {
298 /* Check size&crc */
299 if (queue_.Size() >= sizeof(PackedDataHeader))
300 {
301 queue_.PopBatch(parse_buff_.addr_, sizeof(PackedDataHeader));
302 if (CRC8::Verify(parse_buff_.addr_, sizeof(PackedDataHeader)))
303 {
304 auto header = reinterpret_cast<PackedDataHeader *>(parse_buff_.addr_);
305 /* Find topic */
306 auto node = topic_map_.Search<TopicHandle>(header->topic_name_crc32);
307 if (node)
308 {
309 data_len_ = header->GetDataLen();
310 current_topic_ = *node;
311 if (data_len_ + PACK_BASE_SIZE >= queue_.length_)
312 {
313 status_ = Status::WAIT_START;
314 continue;
315 }
316 status_ = Status::WAIT_DATA_CRC;
317 }
318 else
319 {
320 status_ = Status::WAIT_START;
321 continue;
322 }
323 }
324 else
325 {
326 status_ = Status::WAIT_START;
327 continue;
328 }
329 }
330 else
331 {
332 return count;
333 }
334 }
335
336 /* 3. Get data */
337 if (status_ == Status::WAIT_DATA_CRC)
338 {
339 /* Check size&crc */
340 if (queue_.Size() >= data_len_ + sizeof(uint8_t))
341 {
342 uint8_t *data =
343 reinterpret_cast<uint8_t *>(parse_buff_.addr_) + sizeof(PackedDataHeader);
344 queue_.PopBatch(data, data_len_ + sizeof(uint8_t));
345 if (CRC8::Verify(parse_buff_.addr_,
346 data_len_ + sizeof(PackedDataHeader) + sizeof(uint8_t)))
347 {
348 status_ = Status::WAIT_START;
349 auto data =
350 reinterpret_cast<uint8_t *>(parse_buff_.addr_) + sizeof(PackedDataHeader);
351 if (data_len_ > current_topic_->data_.max_length)
352 {
353 data_len_ = current_topic_->data_.max_length;
354 }
355 Topic(current_topic_).Publish(data, data_len_);
356
357 count++;
358
359 continue;
360 }
361 else
362 {
363 status_ = Status::WAIT_START;
364 continue;
365 }
366 }
367 else
368 {
369 return count;
370 }
371 }
372 }
373 return count;
374}
static uint32_t Calculate(const void *raw, size_t len)
计算数据的 CRC32 校验码 / Computes the CRC32 checksum for the given data
Definition crc.hpp:251
static uint8_t Calculate(const void *raw, size_t len)
计算数据的 CRC8 校验码 / Computes the CRC8 checksum for the given data
Definition crc.hpp:66
static bool Verify(const void *raw, size_t len)
验证数据的 CRC8 校验码 / Verifies the CRC8 checksum of the given data
Definition crc.hpp:91
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
Definition libxr_cb.hpp:124
void Run(bool in_isr, PassArgs &&...args) const
执行回调函数,并传递参数。 Executes the callback function, passing the arguments.
Definition libxr_cb.hpp:207
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data...
Key key
节点键值 (Key associated with the node).
Definition rbt.hpp:41
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:64
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:99
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread...
Definition rbt.hpp:24
Node< Data > * Search(const Key &key)
搜索红黑树中的节点 (Search for a node in the Red-Black Tree).
Definition rbt.hpp:122
void Insert(BaseNode &node, KeyType &&key)
在树中插入新节点 (Insert a new node into the tree).
Definition rbt.hpp:237
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
static uint32_t GetTime()
获取当前系统时间(毫秒) Gets the current system time in milliseconds
Definition thread.cpp:41
static void Sleep(uint32_t milliseconds)
让线程进入休眠状态 Puts the thread to sleep
Definition thread.cpp:16
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:142
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
Definition message.cpp:22
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
Definition message.hpp:154
void Register(TopicHandle topic)
注册一个主题 Registers a topic
Definition message.cpp:259
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
Definition message.cpp:265
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
Definition message.cpp:248
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
Definition message.hpp:554
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
Definition message.hpp:749
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
Definition message.cpp:122
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition message.cpp:217
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
Definition message.cpp:236
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:737
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
Definition message.cpp:51
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Definition message.cpp:110
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Definition message.hpp:743
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
Definition message.cpp:61
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据 Packs data
Definition message.cpp:200
LibXR 命名空间 LibXR namespace
Definition ch32_gpio.hpp:9
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
Definition message.hpp:252
存储主题(Topic)数据的结构体。Structure storing topic data.
Definition message.hpp:35
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Definition message.hpp:449
Callback cb
订阅的回调函数 Subscribed callback function
Definition message.hpp:450
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
Definition message.hpp:338
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
Definition message.hpp:340
订阅者信息存储结构。Structure storing subscriber information.
Definition message.hpp:174
SuberType type
订阅者类型。Type of subscriber.
Definition message.hpp:175
同步订阅者存储结构。Structure storing synchronous subscriber data.
Definition message.hpp:183