libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
message.cpp
1#include "message.hpp"
2
3#include <atomic>
4
5#include "libxr_def.hpp"
6#include "logger.hpp"
7#include "mutex.hpp"
8
9using namespace LibXR;
10
11template class LibXR::RBTree<uint32_t>;
13
14void Topic::PackedDataHeader::SetDataLen(uint32_t len)
15{
16 data_len_raw[0] = static_cast<uint8_t>(len >> 16);
17 data_len_raw[1] = static_cast<uint8_t>(len >> 8);
18 data_len_raw[2] = static_cast<uint8_t>(len);
19}
20
21uint32_t Topic::PackedDataHeader::GetDataLen() const
22{
23 return static_cast<uint32_t>(data_len_raw[0]) << 16 |
24 static_cast<uint32_t>(data_len_raw[1]) << 8 |
25 static_cast<uint32_t>(data_len_raw[2]);
26}
27
29{
30 if (topic->data_.mutex)
31 {
32 topic->data_.mutex->Lock();
33 }
34 else
35 {
36 LockState expected = LockState::UNLOCKED;
37 if (!topic->data_.busy.compare_exchange_strong(expected, LockState::LOCKED))
38 {
39 /* Multiple threads are trying to lock the same topic */
40 ASSERT(false);
41 return;
42 }
43 }
44}
45
47{
48 if (topic->data_.mutex)
49 {
50 topic->data_.mutex->Unlock();
51 }
52 else
53 {
54 topic->data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
55 }
56}
57
59{
60 if (topic->data_.mutex)
61 {
62 ASSERT(false);
63 }
64 else
65 {
66 LockState expected = LockState::UNLOCKED;
67 if (!topic->data_.busy.compare_exchange_strong(expected, LockState::LOCKED))
68 {
69 /* Multiple threads are trying to lock the same topic */
70 ASSERT(false);
71 return;
72 }
73 }
74}
75
77{
78 if (topic->data_.mutex)
79 {
80 ASSERT(false);
81 }
82 else
83 {
84 topic->data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
85 }
86}
87
88Topic::Domain::Domain(const char *name)
89{
90 if (!domain_)
91 {
92 if (!domain_)
93 {
94 domain_ =
95 new RBTree<uint32_t>([](const uint32_t &a, const uint32_t &b)
96 { return static_cast<int>(a) - static_cast<int>(b); });
97 }
98 }
99
100 auto crc32 = CRC32::Calculate(name, strlen(name));
101
102 auto domain = domain_->Search<RBTree<uint32_t>>(crc32);
103
104 if (domain != nullptr)
105 {
106 node_ = domain;
107 return;
108 }
109
111 [](const uint32_t &a, const uint32_t &b)
112 { return static_cast<int>(a) - static_cast<int>(b); });
113
114 domain_->Insert(*node_, crc32);
115}
116
118{
119 CallbackBlock block;
120 block.cb = cb;
122 auto node = new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
124 block_->data_.subers.Add(*node);
125}
126
128
129Topic::Topic(const char *name, uint32_t max_length, Domain *domain, bool multi_publisher,
130 bool cache, bool check_length)
131{
132 if (!def_domain_)
133 {
134 if (!def_domain_)
135 {
136 def_domain_ = new Domain("libxr_def_domain");
137 }
138 }
139
140 if (domain == nullptr)
141 {
142 domain = def_domain_;
143 }
144
145 auto crc32 = CRC32::Calculate(name, strlen(name));
146
147 auto topic = domain->node_->data_.Search<Block>(crc32);
148
149 if (topic)
150 {
151 ASSERT(topic->data_.max_length == max_length);
152 ASSERT(topic->data_.check_length == check_length);
153
154 if (multi_publisher && !topic->data_.mutex)
155 {
156 ASSERT(false);
157 }
158
159 block_ = topic;
160 }
161 else
162 {
164 block_->data_.max_length = max_length;
165 block_->data_.crc32 = crc32;
166 block_->data_.data.addr_ = nullptr;
167 block_->data_.cache = false;
168 block_->data_.check_length = check_length;
169
170 if (multi_publisher)
171 {
172 block_->data_.mutex = new Mutex();
173 block_->data_.busy.store(LockState::USE_MUTEX, std::memory_order_release);
174 }
175 else
176 {
177 block_->data_.mutex = nullptr;
178 block_->data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
179 }
180
181 domain->node_->data_.Insert(*block_, crc32);
182 }
183
184 if (cache && !block_->data_.cache)
185 {
186 EnableCache();
187 }
188}
189
191
192Topic::TopicHandle Topic::Find(const char *name, Domain *domain)
193{
194 if (domain == nullptr)
195 {
196 domain = def_domain_;
197 }
198
199 auto crc32 = CRC32::Calculate(name, strlen(name));
200
201 return domain->node_->data_.Search<Block>(crc32);
202}
203
205{
206 Lock(block_);
207
208 if (!block_->data_.cache)
209 {
210 block_->data_.cache = true;
211 block_->data_.data.addr_ = new uint8_t[block_->data_.max_length];
212 }
213
214 Unlock(block_);
215}
216
217void Topic::Publish(void *addr, uint32_t size)
218{
219 Lock(block_);
220 if (block_->data_.check_length)
221 {
222 ASSERT(size == block_->data_.max_length);
223 }
224 else
225 {
226 ASSERT(size <= block_->data_.max_length);
227 }
228
229 if (block_->data_.cache)
230 {
231 memcpy(block_->data_.data.addr_, addr, size);
232 block_->data_.data.size_ = size;
233 }
234 else
235 {
236 block_->data_.data.addr_ = addr;
237 block_->data_.data.size_ = size;
238 }
239
240 RawData data = block_->data_.data;
241
242 auto foreach_fun = [&](SuberBlock &block)
243 {
244 switch (block.type)
245 {
246 case SuberType::SYNC:
247 {
248 auto sync = reinterpret_cast<SyncBlock *>(&block);
249 memcpy(sync->buff.addr_, data.addr_, data.size_);
250 sync->sem.Post();
251 break;
252 }
253 case SuberType::ASYNC:
254 {
255 auto async = reinterpret_cast<ASyncBlock *>(&block);
256 if (async->state.load(std::memory_order_acquire) == ASyncSubscriberState::WAITING)
257 {
258 memcpy(async->buff.addr_, data.addr_, data.size_);
259 async->state.store(ASyncSubscriberState::DATA_READY, std::memory_order_release);
260 }
261 break;
262 }
263 case SuberType::QUEUE:
264 {
265 auto queue_block = reinterpret_cast<QueueBlock *>(&block);
266 queue_block->fun(data, queue_block->queue, false);
267 break;
268 }
270 {
271 auto cb_block = reinterpret_cast<CallbackBlock *>(&block);
272 cb_block->cb.Run(false, data);
273 break;
274 }
275 }
276 return ErrorCode::OK;
277 };
278
279 block_->data_.subers.Foreach<SuberBlock>(foreach_fun);
280
281 Unlock(block_);
282}
283
284void Topic::PublishFromCallback(void *addr, uint32_t size, bool in_isr)
285{
287 if (block_->data_.check_length)
288 {
289 ASSERT(size == block_->data_.max_length);
290 }
291 else
292 {
293 ASSERT(size <= block_->data_.max_length);
294 }
295
296 if (block_->data_.cache)
297 {
298 memcpy(block_->data_.data.addr_, addr, size);
299 block_->data_.data.size_ = size;
300 }
301 else
302 {
303 block_->data_.data.addr_ = addr;
304 block_->data_.data.size_ = size;
305 }
306
307 RawData data = block_->data_.data;
308
309 auto foreach_fun = [&](SuberBlock &block)
310 {
311 switch (block.type)
312 {
313 case SuberType::SYNC:
314 {
315 auto sync = reinterpret_cast<SyncBlock *>(&block);
316 memcpy(sync->buff.addr_, data.addr_, data.size_);
317 sync->sem.PostFromCallback(in_isr);
318 break;
319 }
320 case SuberType::ASYNC:
321 {
322 auto async = reinterpret_cast<ASyncBlock *>(&block);
323 if (async->state.load(std::memory_order_acquire) == ASyncSubscriberState::WAITING)
324 {
325 memcpy(async->buff.addr_, data.addr_, data.size_);
326 async->state.store(ASyncSubscriberState::DATA_READY, std::memory_order_release);
327 }
328 break;
329 }
330 case SuberType::QUEUE:
331 {
332 auto queue_block = reinterpret_cast<QueueBlock *>(&block);
333 queue_block->fun(data, queue_block->queue, false);
334 break;
335 }
337 {
338 auto cb_block = reinterpret_cast<CallbackBlock *>(&block);
339 cb_block->cb.Run(in_isr, data);
340 break;
341 }
342 }
343 return ErrorCode::OK;
344 };
345
346 block_->data_.subers.Foreach<SuberBlock>(foreach_fun);
347
349}
350
351void Topic::PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
352{
353 PackedData<uint8_t> *pack = reinterpret_cast<PackedData<uint8_t> *>(buffer.addr_);
354
355 memcpy(&pack->raw.data_, source.addr_, source.size_);
356
357 pack->raw.header_.prefix = 0xa5;
358 pack->raw.header_.topic_name_crc32 = topic_name_crc32;
359 pack->raw.header_.SetDataLen(source.size_);
360 pack->raw.header_.pack_header_crc8 =
361 CRC8::Calculate(&pack->raw, sizeof(PackedDataHeader) - sizeof(uint8_t));
362 uint8_t *crc8_pack =
363 reinterpret_cast<uint8_t *>(reinterpret_cast<uint8_t *>(pack) + PACK_BASE_SIZE +
364 source.size_ - sizeof(uint8_t));
365 *crc8_pack = CRC8::Calculate(pack, PACK_BASE_SIZE - sizeof(uint8_t) + source.size_);
366}
367
368Topic::TopicHandle Topic::WaitTopic(const char *name, uint32_t timeout, Domain *domain)
369{
370 TopicHandle topic = nullptr;
371 do
372 {
373 topic = Find(name, domain);
374 if (topic == nullptr)
375 {
376 if (timeout <= Thread::GetTime())
377 {
378 return nullptr;
379 }
380 Thread::Sleep(1);
381 }
382 } while (topic == nullptr);
383
384 return topic;
385}
386
387uint32_t Topic::GetKey() const
388{
389 if (block_)
390 {
391 return block_->key;
392 }
393 else
394 {
395 return 0;
396 }
397}
398
399Topic::Server::Server(size_t buffer_length)
400 : topic_map_([](const uint32_t &a, const uint32_t &b)
401 { return static_cast<int>(a) - static_cast<int>(b); }),
402 queue_(1, buffer_length)
403{
404 /* Minimum size: header8 + crc32 + length24 + crc8 + data + crc8 = 10 */
405 ASSERT(buffer_length > PACK_BASE_SIZE);
406 parse_buff_.size_ = buffer_length;
407 parse_buff_.addr_ = new uint8_t[buffer_length];
408}
409
411{
412 auto node = new RBTree<uint32_t>::Node<TopicHandle>(topic);
413 topic_map_.Insert(*node, topic->key);
414}
415
417{
418 size_t count = 0;
419
420 queue_.PushBatch(data.addr_, data.size_);
421
422 while (true)
423 { /* 1. Check prefix */
424 if (status_ == Status::WAIT_START)
425 {
426 /* Check start frame */
427 auto queue_size = queue_.Size();
428 for (uint32_t i = 0; i < queue_size; i++)
429 {
430 uint8_t prefix = 0;
431 queue_.Peek(&prefix);
432 if (prefix == 0xa5)
433 {
434 status_ = Status::WAIT_TOPIC;
435 break;
436 }
437 queue_.Pop();
438 }
439 /* Not found */
440 if (status_ == Status::WAIT_START)
441 {
442 return count;
443 }
444 }
445
446 /* 2. Get topic info */
447 if (status_ == Status::WAIT_TOPIC)
448 {
449 /* Check size&crc */
450 if (queue_.Size() >= sizeof(PackedDataHeader))
451 {
452 queue_.PopBatch(parse_buff_.addr_, sizeof(PackedDataHeader));
453 if (CRC8::Verify(parse_buff_.addr_, sizeof(PackedDataHeader)))
454 {
455 auto header = reinterpret_cast<PackedDataHeader *>(parse_buff_.addr_);
456 /* Find topic */
457 auto node = topic_map_.Search<TopicHandle>(header->topic_name_crc32);
458 if (node)
459 {
460 data_len_ = header->GetDataLen();
461 current_topic_ = *node;
462 if (data_len_ + PACK_BASE_SIZE >= queue_.length_)
463 {
464 status_ = Status::WAIT_START;
465 continue;
466 }
467 status_ = Status::WAIT_DATA_CRC;
468 }
469 else
470 {
471 status_ = Status::WAIT_START;
472 continue;
473 }
474 }
475 else
476 {
477 status_ = Status::WAIT_START;
478 continue;
479 }
480 }
481 else
482 {
483 return count;
484 }
485 }
486
487 /* 3. Get data */
488 if (status_ == Status::WAIT_DATA_CRC)
489 {
490 /* Check size&crc */
491 if (queue_.Size() >= data_len_ + sizeof(uint8_t))
492 {
493 uint8_t *data =
494 reinterpret_cast<uint8_t *>(parse_buff_.addr_) + sizeof(PackedDataHeader);
495 queue_.PopBatch(data, data_len_ + sizeof(uint8_t));
496 if (CRC8::Verify(parse_buff_.addr_,
497 data_len_ + sizeof(PackedDataHeader) + sizeof(uint8_t)))
498 {
499 status_ = Status::WAIT_START;
500 auto data =
501 reinterpret_cast<uint8_t *>(parse_buff_.addr_) + sizeof(PackedDataHeader);
502 if (data_len_ > current_topic_->data_.max_length)
503 {
504 data_len_ = current_topic_->data_.max_length;
505 }
506 Topic(current_topic_).Publish(data, data_len_);
507
508 count++;
509
510 continue;
511 }
512 else
513 {
514 status_ = Status::WAIT_START;
515 continue;
516 }
517 }
518 else
519 {
520 return count;
521 }
522 }
523 }
524 return count;
525}
static uint32_t Calculate(const void *raw, size_t len)
计算数据的 CRC32 校验码 / Computes the CRC32 checksum for the given data
Definition crc.hpp:251
static uint8_t Calculate(const void *raw, size_t len)
计算数据的 CRC8 校验码 / Computes the CRC8 checksum for the given data
Definition crc.hpp:66
static bool Verify(const void *raw, size_t len)
验证数据的 CRC8 校验码 / Verifies the CRC8 checksum of the given data
Definition crc.hpp:91
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
Definition libxr_cb.hpp:124
void Run(bool in_isr, PassArgs &&...args) const
执行回调函数,并传递参数。 Executes the callback function, passing the arguments.
Definition libxr_cb.hpp:207
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data...
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
Key key
节点键值 (Key associated with the node).
Definition rbt.hpp:41
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:64
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:99
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread...
Definition rbt.hpp:24
Node< Data > * Search(const Key &key)
搜索红黑树中的节点 (Search for a node in the Red-Black Tree).
Definition rbt.hpp:122
void Insert(BaseNode &node, KeyType &&key)
在树中插入新节点 (Insert a new node into the tree).
Definition rbt.hpp:237
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
static uint32_t GetTime()
获取当前系统时间(毫秒) Gets the current system time in milliseconds
Definition thread.cpp:41
static void Sleep(uint32_t milliseconds)
让线程进入休眠状态 Puts the thread to sleep
Definition thread.cpp:16
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:183
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
Definition message.cpp:88
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
Definition message.hpp:195
void Register(TopicHandle topic)
注册一个主题 Registers a topic
Definition message.cpp:410
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
Definition message.cpp:416
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
Definition message.cpp:399
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
Definition message.hpp:569
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
Definition message.hpp:786
static void Unlock(TopicHandle topic)
解锁主题。Unlock the topic.
Definition message.cpp:46
static void Lock(TopicHandle topic)
锁定主题,防止其被多个订阅者同时访问。Lock the topic to prevent it from being accessed by multiple subscribers at the sa...
Definition message.cpp:28
static void UnlockFromCallback(TopicHandle topic)
从回调中解锁主题。Unlock the topic from a callback.
Definition message.cpp:76
static void LockFromCallback(TopicHandle topic)
从回调中锁定主题,防止其被多个订阅者同时访问。Lock the topic from a callback to prevent it from being accessed by multiple s...
Definition message.cpp:58
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
Definition message.cpp:204
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition message.cpp:368
void PublishFromCallback(Data &data, bool in_isr)
在回调函数中发布数据 Publishes data in a callback
Definition message.hpp:590
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
Definition message.cpp:387
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:774
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
Definition message.cpp:117
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Definition message.cpp:192
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Definition message.hpp:780
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
Definition message.cpp:127
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据 Packs data
Definition message.cpp:351
LibXR 命名空间 LibXR namespace
Definition ch32_gpio.hpp:9
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
Definition message.hpp:301
存储主题(Topic)数据的结构体。Structure storing topic data.
Definition message.hpp:45
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Definition message.hpp:459
Callback cb
订阅的回调函数 Subscribed callback function
Definition message.hpp:460
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
Definition message.hpp:393
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
Definition message.hpp:395
订阅者信息存储结构。Structure storing subscriber information.
Definition message.hpp:215
SuberType type
订阅者类型。Type of subscriber.
Definition message.hpp:216
同步订阅者存储结构。Structure storing synchronous subscriber data.
Definition message.hpp:224