libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
message.cpp
1#include "message.hpp"
2
3#include <atomic>
4
5#include "libxr_def.hpp"
6#include "libxr_mem.hpp"
7#include "logger.hpp"
8#include "mutex.hpp"
9
10using namespace LibXR;
11
12template class LibXR::RBTree<uint32_t>;
14
15void Topic::PackedDataHeader::SetDataLen(uint32_t len)
16{
17 data_len_raw[0] = static_cast<uint8_t>(len >> 16);
18 data_len_raw[1] = static_cast<uint8_t>(len >> 8);
19 data_len_raw[2] = static_cast<uint8_t>(len);
20}
21
22uint32_t Topic::PackedDataHeader::GetDataLen() const
23{
24 return static_cast<uint32_t>(data_len_raw[0]) << 16 |
25 static_cast<uint32_t>(data_len_raw[1]) << 8 |
26 static_cast<uint32_t>(data_len_raw[2]);
27}
28
30{
31 if (topic->data_.mutex)
32 {
33 topic->data_.mutex->Lock();
34 }
35 else
36 {
37 LockState expected = LockState::UNLOCKED;
38 if (!topic->data_.busy.compare_exchange_strong(expected, LockState::LOCKED))
39 {
40 /* Multiple threads are trying to lock the same topic */
41 ASSERT(false);
42 return;
43 }
44 }
45}
46
48{
49 if (topic->data_.mutex)
50 {
51 topic->data_.mutex->Unlock();
52 }
53 else
54 {
55 topic->data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
56 }
57}
58
60{
61 if (topic->data_.mutex)
62 {
63 ASSERT(false);
64 }
65 else
66 {
67 LockState expected = LockState::UNLOCKED;
68 if (!topic->data_.busy.compare_exchange_strong(expected, LockState::LOCKED))
69 {
70 /* Multiple threads are trying to lock the same topic */
71 ASSERT(false);
72 return;
73 }
74 }
75}
76
78{
79 if (topic->data_.mutex)
80 {
81 ASSERT(false);
82 }
83 else
84 {
85 topic->data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
86 }
87}
88
89Topic::Domain::Domain(const char* name)
90{
91 ASSERT(name != nullptr);
92
93 if (!domain_)
94 {
95 if (!domain_)
96 {
97 domain_ =
98 new RBTree<uint32_t>([](const uint32_t& a, const uint32_t& b)
99 { return static_cast<int>(a) - static_cast<int>(b); });
100 }
101 }
102
103 auto crc32 = CRC32::Calculate(name, strlen(name));
104
105 auto domain = domain_->Search<RBTree<uint32_t>>(crc32);
106
107 if (domain != nullptr)
108 {
109 node_ = domain;
110 return;
111 }
112
114 [](const uint32_t& a, const uint32_t& b)
115 { return static_cast<int>(a) - static_cast<int>(b); });
116
117 domain_->Insert(*node_, crc32);
118}
119
121{
122 CallbackBlock block;
123 block.cb = cb;
125 auto node = new (std::align_val_t(LibXR::CACHE_LINE_SIZE))
127 block_->data_.subers.Add(*node);
128}
129
131
132Topic::Topic(const char* name, uint32_t max_length, Domain* domain, bool multi_publisher,
133 bool cache, bool check_length)
134{
135 if (!def_domain_)
136 {
137 if (!def_domain_)
138 {
139 def_domain_ = new Domain("libxr_def_domain");
140 }
141 }
142
143 if (domain == nullptr)
144 {
145 domain = def_domain_;
146 }
147
148 auto crc32 = CRC32::Calculate(name, strlen(name));
149
150 auto topic = domain->node_->data_.Search<Block>(crc32);
151
152 if (topic)
153 {
154 ASSERT(topic->data_.max_length == max_length);
155 ASSERT(topic->data_.check_length == check_length);
156
157 if (multi_publisher && !topic->data_.mutex)
158 {
159 ASSERT(false);
160 }
161
162 block_ = topic;
163 }
164 else
165 {
167 block_->data_.max_length = max_length;
168 block_->data_.crc32 = crc32;
169 block_->data_.data.addr_ = nullptr;
170 block_->data_.cache = false;
171 block_->data_.check_length = check_length;
172
173 if (multi_publisher)
174 {
175 block_->data_.mutex = new Mutex();
176 block_->data_.busy.store(LockState::USE_MUTEX, std::memory_order_release);
177 }
178 else
179 {
180 block_->data_.mutex = nullptr;
181 block_->data_.busy.store(LockState::UNLOCKED, std::memory_order_release);
182 }
183
184 domain->node_->data_.Insert(*block_, crc32);
185 }
186
187 if (cache && !block_->data_.cache)
188 {
189 EnableCache();
190 }
191}
192
194
195Topic::TopicHandle Topic::Find(const char* name, Domain* domain)
196{
197 if (domain == nullptr)
198 {
199 domain = def_domain_;
200 }
201
202 auto crc32 = CRC32::Calculate(name, strlen(name));
203
204 return domain->node_->data_.Search<Block>(crc32);
205}
206
208{
209 Lock(block_);
210
211 if (!block_->data_.cache)
212 {
213 block_->data_.cache = true;
214 block_->data_.data.addr_ = new uint8_t[block_->data_.max_length];
215 }
216
217 Unlock(block_);
218}
219
220void Topic::Publish(void* addr, uint32_t size)
221{
222 Lock(block_);
223 if (block_->data_.check_length)
224 {
225 ASSERT(size == block_->data_.max_length);
226 }
227 else
228 {
229 ASSERT(size <= block_->data_.max_length);
230 }
231
232 if (block_->data_.cache)
233 {
234 LibXR::Memory::FastCopy(block_->data_.data.addr_, addr, size);
235 block_->data_.data.size_ = size;
236 }
237 else
238 {
239 block_->data_.data.addr_ = addr;
240 block_->data_.data.size_ = size;
241 }
242
243 RawData data = block_->data_.data;
244
245 auto foreach_fun = [&](SuberBlock& block)
246 {
247 switch (block.type)
248 {
249 case SuberType::SYNC:
250 {
251 auto sync = reinterpret_cast<SyncBlock*>(&block);
252 LibXR::Memory::FastCopy(sync->buff.addr_, data.addr_, data.size_);
253 sync->sem.Post();
254 break;
255 }
256 case SuberType::ASYNC:
257 {
258 auto async = reinterpret_cast<ASyncBlock*>(&block);
259 if (async->state.load(std::memory_order_acquire) == ASyncSubscriberState::WAITING)
260 {
261 LibXR::Memory::FastCopy(async->buff.addr_, data.addr_, data.size_);
262 async->state.store(ASyncSubscriberState::DATA_READY, std::memory_order_release);
263 }
264 break;
265 }
266 case SuberType::QUEUE:
267 {
268 auto queue_block = reinterpret_cast<QueueBlock*>(&block);
269 queue_block->fun(data, queue_block->queue);
270 break;
271 }
273 {
274 auto cb_block = reinterpret_cast<CallbackBlock*>(&block);
275 cb_block->cb.Run(false, data);
276 break;
277 }
278 }
279 return ErrorCode::OK;
280 };
281
282 block_->data_.subers.Foreach<SuberBlock>(foreach_fun);
283
284 Unlock(block_);
285}
286
287void Topic::PublishFromCallback(void* addr, uint32_t size, bool in_isr)
288{
290 if (block_->data_.check_length)
291 {
292 ASSERT(size == block_->data_.max_length);
293 }
294 else
295 {
296 ASSERT(size <= block_->data_.max_length);
297 }
298
299 if (block_->data_.cache)
300 {
301 LibXR::Memory::FastCopy(block_->data_.data.addr_, addr, size);
302 block_->data_.data.size_ = size;
303 }
304 else
305 {
306 block_->data_.data.addr_ = addr;
307 block_->data_.data.size_ = size;
308 }
309
310 RawData data = block_->data_.data;
311
312 auto foreach_fun = [&](SuberBlock& block)
313 {
314 switch (block.type)
315 {
316 case SuberType::SYNC:
317 {
318 auto sync = reinterpret_cast<SyncBlock*>(&block);
319 LibXR::Memory::FastCopy(sync->buff.addr_, data.addr_, data.size_);
320 sync->sem.PostFromCallback(in_isr);
321 break;
322 }
323 case SuberType::ASYNC:
324 {
325 auto async = reinterpret_cast<ASyncBlock*>(&block);
326 if (async->state.load(std::memory_order_acquire) == ASyncSubscriberState::WAITING)
327 {
328 LibXR::Memory::FastCopy(async->buff.addr_, data.addr_, data.size_);
329 async->state.store(ASyncSubscriberState::DATA_READY, std::memory_order_release);
330 }
331 break;
332 }
333 case SuberType::QUEUE:
334 {
335 auto queue_block = reinterpret_cast<QueueBlock*>(&block);
336 queue_block->fun(data, queue_block->queue);
337 break;
338 }
340 {
341 auto cb_block = reinterpret_cast<CallbackBlock*>(&block);
342 cb_block->cb.Run(in_isr, data);
343 break;
344 }
345 }
346 return ErrorCode::OK;
347 };
348
349 block_->data_.subers.Foreach<SuberBlock>(foreach_fun);
350
352}
353
354void Topic::PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
355{
356 PackedData<uint8_t>* pack = reinterpret_cast<PackedData<uint8_t>*>(buffer.addr_);
357
358 LibXR::Memory::FastCopy(&pack->raw.data_, source.addr_, source.size_);
359
360 pack->raw.header_.prefix = 0xa5;
361 pack->raw.header_.topic_name_crc32 = topic_name_crc32;
362 pack->raw.header_.SetDataLen(source.size_);
363 pack->raw.header_.pack_header_crc8 =
364 CRC8::Calculate(&pack->raw, sizeof(PackedDataHeader) - sizeof(uint8_t));
365 uint8_t* crc8_pack = reinterpret_cast<uint8_t*>(
366 reinterpret_cast<uint8_t*>(pack) + PACK_BASE_SIZE + source.size_ - sizeof(uint8_t));
367 *crc8_pack = CRC8::Calculate(pack, PACK_BASE_SIZE - sizeof(uint8_t) + source.size_);
368}
369
370Topic::TopicHandle Topic::WaitTopic(const char* name, uint32_t timeout, Domain* domain)
371{
372 const uint32_t start_time = Thread::GetTime();
373 TopicHandle topic = nullptr;
374 do
375 {
376 topic = Find(name, domain);
377 if (topic == nullptr)
378 {
379 if (timeout != UINT32_MAX &&
380 static_cast<uint32_t>(Thread::GetTime() - start_time) >= timeout)
381 {
382 return nullptr;
383 }
384 Thread::Sleep(1);
385 }
386 } while (topic == nullptr);
387
388 return topic;
389}
390
391uint32_t Topic::GetKey() const
392{
393 if (block_)
394 {
395 return block_->key;
396 }
397 else
398 {
399 return 0;
400 }
401}
402
403Topic::Server::Server(size_t buffer_length)
404 : topic_map_([](const uint32_t& a, const uint32_t& b)
405 { return static_cast<int>(a) - static_cast<int>(b); }),
406 queue_(1, buffer_length)
407{
408 /* Minimum size: header8 + crc32 + length24 + crc8 + data + crc8 = 10 */
409 ASSERT(buffer_length > PACK_BASE_SIZE);
410 parse_buff_.size_ = buffer_length;
411 parse_buff_.addr_ = new uint8_t[buffer_length];
412}
413
415{
416 auto node = new RBTree<uint32_t>::Node<TopicHandle>(topic);
417 topic_map_.Insert(*node, topic->key);
418}
419
421{
422 size_t count = 0;
423
424 queue_.PushBatch(data.addr_, data.size_);
425
426 while (true)
427 { /* 1. Check prefix */
428 if (status_ == Status::WAIT_START)
429 {
430 /* Check start frame */
431 auto queue_size = queue_.Size();
432 for (uint32_t i = 0; i < queue_size; i++)
433 {
434 uint8_t prefix = 0;
435 queue_.Peek(&prefix);
436 if (prefix == 0xa5)
437 {
438 status_ = Status::WAIT_TOPIC;
439 break;
440 }
441 queue_.Pop();
442 }
443 /* Not found */
444 if (status_ == Status::WAIT_START)
445 {
446 return count;
447 }
448 }
449
450 /* 2. Get topic info */
451 if (status_ == Status::WAIT_TOPIC)
452 {
453 /* Check size&crc */
454 if (queue_.Size() >= sizeof(PackedDataHeader))
455 {
456 queue_.PopBatch(parse_buff_.addr_, sizeof(PackedDataHeader));
457 if (CRC8::Verify(parse_buff_.addr_, sizeof(PackedDataHeader)))
458 {
459 auto header = reinterpret_cast<PackedDataHeader*>(parse_buff_.addr_);
460 /* Find topic */
461 auto node = topic_map_.Search<TopicHandle>(header->topic_name_crc32);
462 if (node)
463 {
464 data_len_ = header->GetDataLen();
465 current_topic_ = *node;
466 if (data_len_ + PACK_BASE_SIZE >= queue_.length_)
467 {
468 status_ = Status::WAIT_START;
469 continue;
470 }
471 status_ = Status::WAIT_DATA_CRC;
472 }
473 else
474 {
475 status_ = Status::WAIT_START;
476 continue;
477 }
478 }
479 else
480 {
481 status_ = Status::WAIT_START;
482 continue;
483 }
484 }
485 else
486 {
487 return count;
488 }
489 }
490
491 /* 3. Get data */
492 if (status_ == Status::WAIT_DATA_CRC)
493 {
494 /* Check size&crc */
495 if (queue_.Size() >= data_len_ + sizeof(uint8_t))
496 {
497 uint8_t* data =
498 reinterpret_cast<uint8_t*>(parse_buff_.addr_) + sizeof(PackedDataHeader);
499 queue_.PopBatch(data, data_len_ + sizeof(uint8_t));
500 if (CRC8::Verify(parse_buff_.addr_,
501 data_len_ + sizeof(PackedDataHeader) + sizeof(uint8_t)))
502 {
503 status_ = Status::WAIT_START;
504 auto data =
505 reinterpret_cast<uint8_t*>(parse_buff_.addr_) + sizeof(PackedDataHeader);
506 if (data_len_ > current_topic_->data_.max_length)
507 {
508 data_len_ = current_topic_->data_.max_length;
509 }
510 Topic(current_topic_).Publish(data, data_len_);
511
512 count++;
513
514 continue;
515 }
516 else
517 {
518 status_ = Status::WAIT_START;
519 continue;
520 }
521 }
522 else
523 {
524 return count;
525 }
526 }
527 }
528 return count;
529}
static uint32_t Calculate(const void *raw, size_t len)
计算数据的 CRC32 校验码 / Computes the CRC32 checksum for the given data
Definition crc.hpp:251
static uint8_t Calculate(const void *raw, size_t len)
计算数据的 CRC8 校验码 / Computes the CRC8 checksum for the given data
Definition crc.hpp:66
static bool Verify(const void *raw, size_t len)
验证数据的 CRC8 校验码 / Verifies the CRC8 checksum of the given data
Definition crc.hpp:91
通用回调包装,支持动态参数传递 / Generic callback wrapper supporting dynamic argument passing
Definition libxr_cb.hpp:142
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data types.
static void FastCopy(void *dst, const void *src, size_t size)
快速内存拷贝 / Fast memory copy
Definition libxr_mem.cpp:5
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
Key key
节点键值 (Key associated with the node).
Definition rbt.hpp:41
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:64
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:99
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread-safe operations).
Definition rbt.hpp:24
Node< Data > * Search(const Key &key)
搜索红黑树中的节点 (Search for a node in the Red-Black Tree).
Definition rbt.hpp:121
void Insert(BaseNode &node, KeyType &&key)
在树中插入新节点 (Insert a new node into the tree).
Definition rbt.hpp:236
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
static uint32_t GetTime()
获取当前系统时间(毫秒) Gets the current system time in milliseconds
Definition thread.cpp:35
static void Sleep(uint32_t milliseconds)
让线程进入休眠状态 Puts the thread to sleep
Definition thread.cpp:15
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:183
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
Definition message.cpp:89
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
Definition message.hpp:198
void Register(TopicHandle topic)
注册一个主题 Registers a topic
Definition message.cpp:414
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
Definition message.cpp:420
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
Definition message.cpp:403
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
Definition message.hpp:607
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
Definition message.hpp:831
static void Unlock(TopicHandle topic)
解锁主题。Unlock the topic.
Definition message.cpp:47
static void Lock(TopicHandle topic)
锁定主题,防止其被多个订阅者同时访问。Lock the topic to prevent it from being accessed by multiple subscribers at the same time.
Definition message.cpp:29
static void UnlockFromCallback(TopicHandle topic)
从回调中解锁主题。Unlock the topic from a callback.
Definition message.cpp:77
static void LockFromCallback(TopicHandle topic)
从回调中锁定主题,防止其被多个订阅者同时访问。Lock the topic from a callback to prevent it from being accessed by multiple subscribers at the same time.
Definition message.cpp:59
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
Definition message.cpp:207
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition message.cpp:370
void PublishFromCallback(Data &data, bool in_isr)
在回调函数中发布数据 Publishes data in a callback
Definition message.hpp:628
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
Definition message.cpp:391
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:819
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
Definition message.cpp:120
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Definition message.cpp:195
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Definition message.hpp:825
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
Definition message.cpp:130
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据 Packs data
Definition message.cpp:354
LibXR 命名空间
Definition ch32_can.hpp:14
@ OK
操作成功 | Operation successful
constexpr size_t CACHE_LINE_SIZE
缓存行大小 / Cache line size
Definition libxr_def.hpp:32
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
Definition message.hpp:310
存储主题(Topic)数据的结构体。Structure storing topic data.
Definition message.hpp:45
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Definition message.hpp:479
Callback cb
订阅的回调函数 Subscribed callback function
Definition message.hpp:480
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
Definition message.hpp:408
void(* fun)(RawData &, void *)
处理数据的回调函数 Callback function to handle data
Definition message.hpp:410
订阅者信息存储结构。Structure storing subscriber information.
Definition message.hpp:218
SuberType type
订阅者类型。Type of subscriber.
Definition message.hpp:219
同步订阅者存储结构。Structure storing synchronous subscriber data.
Definition message.hpp:227