libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
message.cpp
1#include "message.hpp"
2
3#include <atomic>
4
5#include "libxr_def.hpp"
6#include "logger.hpp"
7#include "mutex.hpp"
8
9using namespace LibXR;
10
11template class LibXR::RBTree<uint32_t>;
13
14void Topic::PackedDataHeader::SetDataLen(uint32_t len)
15{
16 data_len_raw[0] = static_cast<uint8_t>(len >> 16);
17 data_len_raw[1] = static_cast<uint8_t>(len >> 8);
18 data_len_raw[2] = static_cast<uint8_t>(len);
19}
20
21uint32_t Topic::PackedDataHeader::GetDataLen() const
22{
23 return static_cast<uint32_t>(data_len_raw[0]) << 16 |
24 static_cast<uint32_t>(data_len_raw[1]) << 8 |
25 static_cast<uint32_t>(data_len_raw[2]);
26}
27
29{
30 if (topic->data_.mutex)
31 {
32 topic->data_.mutex->Lock();
33 }
34 else
35 {
36 LockState expected = LockState::UNLOCKED;
37 if (!topic->data_.busy.compare_exchange_strong(expected, LockState::LOCKED))
38 {
39 /* Multiple threads are trying to lock the same topic */
40 ASSERT(false);
41 return;
42 }
43 }
44}
45
47{
48 if (topic->data_.mutex)
49 {
50 topic->data_.mutex->Unlock();
51 }
52 else
53 {
54 topic->data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
55 }
56}
57
59{
60 if (topic->data_.mutex)
61 {
62 ASSERT(false);
63 }
64 else
65 {
66 LockState expected = LockState::UNLOCKED;
67 if (!topic->data_.busy.compare_exchange_strong(expected, LockState::LOCKED))
68 {
69 /* Multiple threads are trying to lock the same topic */
70 ASSERT(false);
71 return;
72 }
73 }
74}
75
77{
78 if (topic->data_.mutex)
79 {
80 ASSERT(false);
81 }
82 else
83 {
84 topic->data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
85 }
86}
87
88Topic::Domain::Domain(const char *name)
89{
90 if (!domain_)
91 {
92 if (!domain_)
93 {
94 domain_ =
95 new RBTree<uint32_t>([](const uint32_t &a, const uint32_t &b)
96 { return static_cast<int>(a) - static_cast<int>(b); });
97 }
98 }
99
100 auto crc32 = CRC32::Calculate(name, strlen(name));
101
102 auto domain = domain_->Search<RBTree<uint32_t>>(crc32);
103
104 if (domain != nullptr)
105 {
106 node_ = domain;
107 return;
108 }
109
111 [](const uint32_t &a, const uint32_t &b)
112 { return static_cast<int>(a) - static_cast<int>(b); });
113
114 domain_->Insert(*node_, crc32);
115}
116
118{
119 CallbackBlock block;
120 block.cb = cb;
122 auto node = new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
124 block_->data_.subers.Add(*node);
125}
126
128
129Topic::Topic(const char *name, uint32_t max_length, Domain *domain, bool multi_publisher,
130 bool cache, bool check_length)
131{
132 if (!def_domain_)
133 {
134 if (!def_domain_)
135 {
136 def_domain_ = new Domain("libxr_def_domain");
137 }
138 }
139
140 if (domain == nullptr)
141 {
142 domain = def_domain_;
143 }
144
145 auto crc32 = CRC32::Calculate(name, strlen(name));
146
147 auto topic = domain->node_->data_.Search<Block>(crc32);
148
149 if (topic)
150 {
151 ASSERT(topic->data_.max_length == max_length);
152 ASSERT(topic->data_.check_length == check_length);
153
154 if (multi_publisher && !topic->data_.mutex)
155 {
156 ASSERT(false);
157 }
158
159 block_ = topic;
160 }
161 else
162 {
164 block_->data_.max_length = max_length;
165 block_->data_.crc32 = crc32;
166 block_->data_.data.addr_ = nullptr;
167 block_->data_.cache = false;
168 block_->data_.check_length = check_length;
169
170 if (multi_publisher)
171 {
172 block_->data_.mutex = new Mutex();
173 block_->data_.busy.store(LockState::USE_MUTEX, std::memory_order_release);
174 }
175 else
176 {
177 block_->data_.mutex = nullptr;
178 block_->data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
179 }
180
181 domain->node_->data_.Insert(*block_, crc32);
182 }
183
184 if (cache && !block_->data_.cache)
185 {
186 EnableCache();
187 }
188}
189
191
192Topic::TopicHandle Topic::Find(const char *name, Domain *domain)
193{
194 if (domain == nullptr)
195 {
196 domain = def_domain_;
197 }
198
199 auto crc32 = CRC32::Calculate(name, strlen(name));
200
201 return domain->node_->data_.Search<Block>(crc32);
202}
203
205{
206 Lock(block_);
207
208 if (!block_->data_.cache)
209 {
210 block_->data_.cache = true;
211 block_->data_.data.addr_ = new uint8_t[block_->data_.max_length];
212 }
213
214 Unlock(block_);
215}
216
217void Topic::Publish(void *addr, uint32_t size)
218{
219 Lock(block_);
220 if (block_->data_.check_length)
221 {
222 ASSERT(size == block_->data_.max_length);
223 }
224 else
225 {
226 ASSERT(size <= block_->data_.max_length);
227 }
228
229 if (block_->data_.cache)
230 {
231 LibXR::Memory::FastCopy(block_->data_.data.addr_, addr, size);
232 block_->data_.data.size_ = size;
233 }
234 else
235 {
236 block_->data_.data.addr_ = addr;
237 block_->data_.data.size_ = size;
238 }
239
240 RawData data = block_->data_.data;
241
242 auto foreach_fun = [&](SuberBlock &block)
243 {
244 switch (block.type)
245 {
246 case SuberType::SYNC:
247 {
248 auto sync = reinterpret_cast<SyncBlock *>(&block);
249 LibXR::Memory::FastCopy(sync->buff.addr_, data.addr_, data.size_);
250 sync->sem.Post();
251 break;
252 }
253 case SuberType::ASYNC:
254 {
255 auto async = reinterpret_cast<ASyncBlock *>(&block);
256 if (async->state.load(std::memory_order_acquire) == ASyncSubscriberState::WAITING)
257 {
258 LibXR::Memory::FastCopy(async->buff.addr_, data.addr_, data.size_);
259 async->state.store(ASyncSubscriberState::DATA_READY, std::memory_order_release);
260 }
261 break;
262 }
263 case SuberType::QUEUE:
264 {
265 auto queue_block = reinterpret_cast<QueueBlock *>(&block);
266 queue_block->fun(data, queue_block->queue, false);
267 break;
268 }
270 {
271 auto cb_block = reinterpret_cast<CallbackBlock *>(&block);
272 cb_block->cb.Run(false, data);
273 break;
274 }
275 }
276 return ErrorCode::OK;
277 };
278
279 block_->data_.subers.Foreach<SuberBlock>(foreach_fun);
280
281 Unlock(block_);
282}
283
284void Topic::PublishFromCallback(void *addr, uint32_t size, bool in_isr)
285{
287 if (block_->data_.check_length)
288 {
289 ASSERT(size == block_->data_.max_length);
290 }
291 else
292 {
293 ASSERT(size <= block_->data_.max_length);
294 }
295
296 if (block_->data_.cache)
297 {
298 LibXR::Memory::FastCopy(block_->data_.data.addr_, addr, size);
299 block_->data_.data.size_ = size;
300 }
301 else
302 {
303 block_->data_.data.addr_ = addr;
304 block_->data_.data.size_ = size;
305 }
306
307 RawData data = block_->data_.data;
308
309 auto foreach_fun = [&](SuberBlock &block)
310 {
311 switch (block.type)
312 {
313 case SuberType::SYNC:
314 {
315 auto sync = reinterpret_cast<SyncBlock *>(&block);
316 LibXR::Memory::FastCopy(sync->buff.addr_, data.addr_, data.size_);
317 sync->sem.PostFromCallback(in_isr);
318 break;
319 }
320 case SuberType::ASYNC:
321 {
322 auto async = reinterpret_cast<ASyncBlock *>(&block);
323 if (async->state.load(std::memory_order_acquire) == ASyncSubscriberState::WAITING)
324 {
325 LibXR::Memory::FastCopy(async->buff.addr_, data.addr_, data.size_);
326 async->state.store(ASyncSubscriberState::DATA_READY, std::memory_order_release);
327 }
328 break;
329 }
330 case SuberType::QUEUE:
331 {
332 auto queue_block = reinterpret_cast<QueueBlock *>(&block);
333 queue_block->fun(data, queue_block->queue, false);
334 break;
335 }
337 {
338 auto cb_block = reinterpret_cast<CallbackBlock *>(&block);
339 cb_block->cb.Run(in_isr, data);
340 break;
341 }
342 }
343 return ErrorCode::OK;
344 };
345
346 block_->data_.subers.Foreach<SuberBlock>(foreach_fun);
347
349}
350
351void Topic::PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
352{
353 PackedData<uint8_t> *pack = reinterpret_cast<PackedData<uint8_t> *>(buffer.addr_);
354
355 LibXR::Memory::FastCopy(&pack->raw.data_, source.addr_, source.size_);
356
357 pack->raw.header_.prefix = 0xa5;
358 pack->raw.header_.topic_name_crc32 = topic_name_crc32;
359 pack->raw.header_.SetDataLen(source.size_);
360 pack->raw.header_.pack_header_crc8 =
361 CRC8::Calculate(&pack->raw, sizeof(PackedDataHeader) - sizeof(uint8_t));
362 uint8_t *crc8_pack =
363 reinterpret_cast<uint8_t *>(reinterpret_cast<uint8_t *>(pack) + PACK_BASE_SIZE +
364 source.size_ - sizeof(uint8_t));
365 *crc8_pack = CRC8::Calculate(pack, PACK_BASE_SIZE - sizeof(uint8_t) + source.size_);
366}
367
368Topic::TopicHandle Topic::WaitTopic(const char *name, uint32_t timeout, Domain *domain)
369{
370 TopicHandle topic = nullptr;
371 do
372 {
373 topic = Find(name, domain);
374 if (topic == nullptr)
375 {
376 if (timeout <= Thread::GetTime())
377 {
378 return nullptr;
379 }
380 Thread::Sleep(1);
381 }
382 } while (topic == nullptr);
383
384 return topic;
385}
386
387uint32_t Topic::GetKey() const
388{
389 if (block_)
390 {
391 return block_->key;
392 }
393 else
394 {
395 return 0;
396 }
397}
398
399Topic::Server::Server(size_t buffer_length)
400 : topic_map_([](const uint32_t &a, const uint32_t &b)
401 { return static_cast<int>(a) - static_cast<int>(b); }),
402 queue_(1, buffer_length)
403{
404 /* Minimum size: header8 + crc32 + length24 + crc8 + data + crc8 = 10 */
405 ASSERT(buffer_length > PACK_BASE_SIZE);
406 parse_buff_.size_ = buffer_length;
407 parse_buff_.addr_ = new uint8_t[buffer_length];
408}
409
411{
412 auto node = new RBTree<uint32_t>::Node<TopicHandle>(topic);
413 topic_map_.Insert(*node, topic->key);
414}
415
417{
418 size_t count = 0;
419
420 queue_.PushBatch(data.addr_, data.size_);
421
422 while (true)
423 { /* 1. Check prefix */
424 if (status_ == Status::WAIT_START)
425 {
426 /* Check start frame */
427 auto queue_size = queue_.Size();
428 for (uint32_t i = 0; i < queue_size; i++)
429 {
430 uint8_t prefix = 0;
431 queue_.Peek(&prefix);
432 if (prefix == 0xa5)
433 {
434 status_ = Status::WAIT_TOPIC;
435 break;
436 }
437 queue_.Pop();
438 }
439 /* Not found */
440 if (status_ == Status::WAIT_START)
441 {
442 return count;
443 }
444 }
445
446 /* 2. Get topic info */
447 if (status_ == Status::WAIT_TOPIC)
448 {
449 /* Check size&crc */
450 if (queue_.Size() >= sizeof(PackedDataHeader))
451 {
452 queue_.PopBatch(parse_buff_.addr_, sizeof(PackedDataHeader));
453 if (CRC8::Verify(parse_buff_.addr_, sizeof(PackedDataHeader)))
454 {
455 auto header = reinterpret_cast<PackedDataHeader *>(parse_buff_.addr_);
456 /* Find topic */
457 auto node = topic_map_.Search<TopicHandle>(header->topic_name_crc32);
458 if (node)
459 {
460 data_len_ = header->GetDataLen();
461 current_topic_ = *node;
462 if (data_len_ + PACK_BASE_SIZE >= queue_.length_)
463 {
464 status_ = Status::WAIT_START;
465 continue;
466 }
467 status_ = Status::WAIT_DATA_CRC;
468 }
469 else
470 {
471 status_ = Status::WAIT_START;
472 continue;
473 }
474 }
475 else
476 {
477 status_ = Status::WAIT_START;
478 continue;
479 }
480 }
481 else
482 {
483 return count;
484 }
485 }
486
487 /* 3. Get data */
488 if (status_ == Status::WAIT_DATA_CRC)
489 {
490 /* Check size&crc */
491 if (queue_.Size() >= data_len_ + sizeof(uint8_t))
492 {
493 uint8_t *data =
494 reinterpret_cast<uint8_t *>(parse_buff_.addr_) + sizeof(PackedDataHeader);
495 queue_.PopBatch(data, data_len_ + sizeof(uint8_t));
496 if (CRC8::Verify(parse_buff_.addr_,
497 data_len_ + sizeof(PackedDataHeader) + sizeof(uint8_t)))
498 {
499 status_ = Status::WAIT_START;
500 auto data =
501 reinterpret_cast<uint8_t *>(parse_buff_.addr_) + sizeof(PackedDataHeader);
502 if (data_len_ > current_topic_->data_.max_length)
503 {
504 data_len_ = current_topic_->data_.max_length;
505 }
506 Topic(current_topic_).Publish(data, data_len_);
507
508 count++;
509
510 continue;
511 }
512 else
513 {
514 status_ = Status::WAIT_START;
515 continue;
516 }
517 }
518 else
519 {
520 return count;
521 }
522 }
523 }
524 return count;
525}
static uint32_t Calculate(const void *raw, size_t len)
计算数据的 CRC32 校验码 / Computes the CRC32 checksum for the given data
Definition crc.hpp:251
static uint8_t Calculate(const void *raw, size_t len)
计算数据的 CRC8 校验码 / Computes the CRC8 checksum for the given data
Definition crc.hpp:66
static bool Verify(const void *raw, size_t len)
验证数据的 CRC8 校验码 / Verifies the CRC8 checksum of the given data
Definition crc.hpp:91
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
Definition libxr_cb.hpp:124
void Run(bool in_isr, PassArgs &&...args) const
执行回调函数,并传递参数。 Executes the callback function, passing the arguments.
Definition libxr_cb.hpp:207
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data...
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
Definition libxr_mem.cpp:17
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
Key key
节点键值 (Key associated with the node).
Definition rbt.hpp:41
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:64
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:99
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread...
Definition rbt.hpp:24
Node< Data > * Search(const Key &key)
搜索红黑树中的节点 (Search for a node in the Red-Black Tree).
Definition rbt.hpp:122
void Insert(BaseNode &node, KeyType &&key)
在树中插入新节点 (Insert a new node into the tree).
Definition rbt.hpp:237
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
static uint32_t GetTime()
获取当前系统时间(毫秒) Gets the current system time in milliseconds
Definition thread.cpp:41
static void Sleep(uint32_t milliseconds)
让线程进入休眠状态 Puts the thread to sleep
Definition thread.cpp:16
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:183
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
Definition message.cpp:88
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
Definition message.hpp:195
void Register(TopicHandle topic)
注册一个主题 Registers a topic
Definition message.cpp:410
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
Definition message.cpp:416
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
Definition message.cpp:399
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
Definition message.hpp:572
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
Definition message.hpp:790
static void Unlock(TopicHandle topic)
解锁主题。Unlock the topic.
Definition message.cpp:46
static void Lock(TopicHandle topic)
锁定主题,防止其被多个订阅者同时访问。Lock the topic to prevent it from being accessed by multiple subscribers at the sa...
Definition message.cpp:28
static void UnlockFromCallback(TopicHandle topic)
从回调中解锁主题。Unlock the topic from a callback.
Definition message.cpp:76
static void LockFromCallback(TopicHandle topic)
从回调中锁定主题,防止其被多个订阅者同时访问。Lock the topic from a callback to prevent it from being accessed by multiple s...
Definition message.cpp:58
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
Definition message.cpp:204
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition message.cpp:368
void PublishFromCallback(Data &data, bool in_isr)
在回调函数中发布数据 Publishes data in a callback
Definition message.hpp:593
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
Definition message.cpp:387
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:778
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
Definition message.cpp:117
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Definition message.cpp:192
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Definition message.hpp:784
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
Definition message.cpp:127
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据
Definition message.cpp:351
LibXR 命名空间
Definition ch32_gpio.hpp:9
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
Definition message.hpp:301
存储主题(Topic)数据的结构体。Structure storing topic data.
Definition message.hpp:45
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Definition message.hpp:459
Callback cb
订阅的回调函数 Subscribed callback function
Definition message.hpp:460
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
Definition message.hpp:393
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
Definition message.hpp:395
订阅者信息存储结构。Structure storing subscriber information.
Definition message.hpp:215
SuberType type
订阅者类型。Type of subscriber.
Definition message.hpp:216
同步订阅者存储结构。Structure storing synchronous subscriber data.
Definition message.hpp:224