libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
message.hpp
1#pragma once
2
3#include "crc.hpp"
4#include "libxr_cb.hpp"
5#include "libxr_def.hpp"
6#include "libxr_type.hpp"
7#include "lock_queue.hpp"
8#include "lockfree_list.hpp"
9#include "lockfree_queue.hpp"
10#include "mutex.hpp"
11#include "queue.hpp"
12#include "rbt.hpp"
13#include "semaphore.hpp"
14#include "thread.hpp"
15
16namespace LibXR
17{
27class Topic
28{
33 public:
44#ifndef __DOXYGEN__
45
46#pragma pack(push, 1)
51 struct PackedDataHeader
52 {
53 uint8_t prefix;
54 uint32_t
55 topic_name_crc32;
56 uint8_t data_len_raw[3];
57 uint8_t pack_header_crc8;
58
59 void SetDataLen(uint32_t len)
60 {
61 data_len_raw[0] = static_cast<uint8_t>(len >> 16);
62 data_len_raw[1] = static_cast<uint8_t>(len >> 8);
63 data_len_raw[2] = static_cast<uint8_t>(len);
64 }
65
66 uint32_t GetDataLen() const
67 {
68 return static_cast<uint32_t>(data_len_raw[0]) << 16 |
69 static_cast<uint32_t>(data_len_raw[1]) << 8 |
70 static_cast<uint32_t>(data_len_raw[2]);
71 }
72 };
73#pragma pack(pop)
74#pragma pack(push, 1)
83 template <typename Data>
84 class PackedData
85 {
86 public:
87#pragma pack(push, 1)
93 struct
94 {
95 PackedDataHeader header_;
96 uint8_t data_[sizeof(Data)];
97 } raw;
98
99 uint8_t crc8_;
100
101#pragma pack(pop)
102
109 PackedData &operator=(const Data &data)
110 {
111 memcpy(raw.data_, &data, sizeof(Data));
112 crc8_ = CRC8::Calculate(&raw, sizeof(raw));
113 return *this;
114 }
115
120 operator Data() { return *reinterpret_cast<Data *>(raw.data_); }
121
125 Data *operator->() { return reinterpret_cast<Data *>(raw.data_); }
126
131 const Data *operator->() const { return reinterpret_cast<const Data *>(raw.data_); }
132 };
133#pragma pack(pop)
134
135 static constexpr size_t PACK_BASE_SIZE = sizeof(PackedData<uint8_t>) - 1;
136
137#endif
138
145
151 class Domain
152 {
153 public:
159 Domain(const char *name)
160 {
161 if (!domain_)
162 {
163 if (!domain_)
164 {
165 domain_ =
166 new RBTree<uint32_t>([](const uint32_t &a, const uint32_t &b)
167 { return static_cast<int>(a) - static_cast<int>(b); });
168 }
169 }
170
171 auto crc32 = CRC32::Calculate(name, strlen(name));
172
173 auto domain = domain_->Search<RBTree<uint32_t>>(crc32);
174
175 if (domain != nullptr)
176 {
177 node_ = domain;
178 return;
179 }
180
182 [](const uint32_t &a, const uint32_t &b)
183 { return static_cast<int>(a) - static_cast<int>(b); });
184
185 domain_->Insert(*node_, crc32);
186 }
187
192 };
193
198 enum class SuberType : uint8_t
199 {
200 SYNC,
201 ASYNC,
202 QUEUE,
203 CALLBACK,
204 };
205
211 {
213 };
214
219 struct SyncBlock : public SuberBlock
220 {
223 };
224
231 template <typename Data>
233 {
234 public:
242 SyncSubscriber(const char *name, Data &data, Domain *domain = nullptr)
243 {
244 *this = SyncSubscriber<Data>(WaitTopic(name, UINT32_MAX, domain), data);
245 }
246
253 SyncSubscriber(Topic topic, Data &data)
254 {
255 if (topic.block_->data_.check_length)
256 {
257 ASSERT(topic.block_->data_.max_length == sizeof(Data));
258 }
259 else
260 {
261 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
262 }
263
266 block_->data_.buff = RawData(data);
267 topic.block_->data_.subers.Add(*block_);
268 }
269
275 ErrorCode Wait(uint32_t timeout = UINT32_MAX)
276 {
277 return block_->data_.sem.Wait(timeout);
278 }
279
281 };
282
288 typedef struct ASyncBlock : public SuberBlock
289 {
292 bool waiting;
293 } ASyncBlock;
294
301 template <typename Data>
303 {
304 public:
311 ASyncSubscriber(const char *name, Domain *domain = nullptr)
312 {
313 *this = ASyncSubscriber(WaitTopic(name, UINT32_MAX, domain));
314 }
315
322 {
323 if (topic.block_->data_.check_length)
324 {
325 ASSERT(topic.block_->data_.max_length == sizeof(Data));
326 }
327 else
328 {
329 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
330 }
331
334 block_->data_.buff = *(new Data);
335 topic.block_->data_.subers.Add(*block_);
336 }
337
346 bool Available() { return block_->data_.data_ready; }
347
354 Data &GetData()
355 {
356 block_->data_.data_ready = false;
357 return *reinterpret_cast<Data *>(block_->data_.buff.addr_);
358 }
359
364 void StartWaiting() { block_->data_.waiting = true; }
365
367 };
368
374 typedef struct QueueBlock : public SuberBlock
375 {
376 void *queue;
377 void (*fun)(RawData &, void *,
378 bool);
379 } QueueBlock;
380
391 {
392 public:
393 template <typename Data, uint32_t Length>
394 QueuedSubscriber(const char *name, LockFreeQueue<Data> &queue,
395 Domain *domain = nullptr)
396 {
397 *this = QueuedSubscriber(WaitTopic(name, UINT32_MAX, domain), queue);
398 }
399
407 template <typename Data>
409 {
410 if (topic.block_->data_.check_length)
411 {
412 ASSERT(topic.block_->data_.max_length == sizeof(Data));
413 }
414 else
415 {
416 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
417 }
418
419 auto block = new LockFreeList::Node<QueueBlock>;
420 block->data_.type = SuberType::QUEUE;
421 block->data_.queue = &queue;
422 block->data_.fun = [](RawData &data, void *arg, bool in_isr)
423 {
424 UNUSED(in_isr);
425 LockFreeQueue<Data> *queue = reinterpret_cast<LockFreeQueue<Data>>(arg);
426 queue->Push(reinterpret_cast<Data>(data.addr_));
427 };
428
429 topic.block_->data_.subers.Add(*block);
430 }
431
440 template <typename Data>
441 QueuedSubscriber(const char *name, LockQueue<Data> &queue, Domain *domain = nullptr)
442 {
443 *this = QueuedSubscriber(WaitTopic(name, UINT32_MAX, domain), queue);
444 }
445
453 template <typename Data>
455 {
456 if (topic.block_->data_.check_length)
457 {
458 ASSERT(topic.block_->data_.max_length == sizeof(Data));
459 }
460 else
461 {
462 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
463 }
464
465 auto block = new LockFreeList::Node<QueueBlock>;
466 block->data_.type = SuberType::QUEUE;
467 block->data_.queue = &queue;
468 block->data_.fun = [](RawData &data, void *arg, bool in_isr)
469 {
470 LockQueue<Data> *queue = reinterpret_cast<LockQueue<Data> *>(arg);
471 queue->PushFromCallback(*reinterpret_cast<const Data *>(data.addr_), in_isr);
472 };
473
474 topic.block_->data_.subers.Add(*block);
475 }
476 };
477
479
485 typedef struct CallbackBlock : public SuberBlock
486 {
489
496 {
497 CallbackBlock block;
498 block.cb = cb;
500 auto node = new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
502 block_->data_.subers.Add(*node);
503 }
504
509 Topic() {}
510
523 Topic(const char *name, uint32_t max_length, Domain *domain = nullptr,
524 bool cache = false, bool check_length = false)
525 {
526 if (!def_domain_)
527 {
528 if (!def_domain_)
529 {
530 def_domain_ = new Domain("libxr_def_domain");
531 }
532 }
533
534 if (domain == nullptr)
535 {
536 domain = def_domain_;
537 }
538
539 auto crc32 = CRC32::Calculate(name, strlen(name));
540
541 auto topic = domain->node_->data_.Search<Block>(crc32);
542
543 if (topic)
544 {
545 ASSERT(topic->data_.max_length == max_length);
546 ASSERT(topic->data_.check_length == check_length);
547
548 block_ = topic;
549 }
550 else
551 {
553 block_->data_.max_length = max_length;
554 block_->data_.crc32 = crc32;
555 block_->data_.data.addr_ = nullptr;
556 block_->data_.cache = false;
557 block_->data_.check_length = check_length;
558
559 domain->node_->data_.Insert(*block_, crc32);
560 }
561
562 if (cache && !block_->data_.cache)
563 {
564 EnableCache();
565 }
566 }
567
580 template <typename Data>
581 static Topic CreateTopic(const char *name, Domain *domain = nullptr, bool cache = false,
582 bool check_length = true)
583 {
584 return Topic(name, sizeof(Data), domain, cache, check_length);
585 }
586
592 Topic(TopicHandle topic) : block_(topic) {}
593
602 static TopicHandle Find(const char *name, Domain *domain = nullptr)
603 {
604 if (domain == nullptr)
605 {
606 domain = def_domain_;
607 }
608
609 auto crc32 = CRC32::Calculate(name, strlen(name));
610
611 return domain->node_->data_.Search<Block>(crc32);
612 }
613
626 template <typename Data>
627 static TopicHandle FindOrCreate(const char *name, Domain *domain = nullptr,
628 bool cache = false, bool check_length = true)
629 {
630 auto topic = Find(name, domain);
631 if (topic == nullptr)
632 {
633 topic = CreateTopic<Data>(name, domain, cache, check_length).block_;
634 }
635 return topic;
636 }
637
643 {
644 block_->data_.mutex.Lock();
645 if (!block_->data_.cache)
646 {
647 block_->data_.cache = true;
648 block_->data_.data.addr_ = new uint8_t[block_->data_.max_length];
649 }
650 block_->data_.mutex.Unlock();
651 }
652
659 template <typename Data>
660 void Publish(Data &data)
661 {
662 Publish(&data, sizeof(Data));
663 }
664
671 void Publish(void *addr, uint32_t size)
672 {
673 block_->data_.mutex.Lock();
674 if (block_->data_.check_length)
675 {
676 ASSERT(size == block_->data_.max_length);
677 }
678 else
679 {
680 ASSERT(size <= block_->data_.max_length);
681 }
682
683 if (block_->data_.cache)
684 {
685 memcpy(block_->data_.data.addr_, addr, size);
686 block_->data_.data.size_ = size;
687 }
688 else
689 {
690 block_->data_.data.addr_ = addr;
691 block_->data_.data.size_ = size;
692 }
693
694 RawData data = block_->data_.data;
695
696 auto foreach_fun = [&](SuberBlock &block)
697 {
698 switch (block.type)
699 {
700 case SuberType::SYNC:
701 {
702 auto sync = reinterpret_cast<SyncBlock *>(&block);
703 memcpy(sync->buff.addr_, data.addr_, data.size_);
704 sync->sem.Post();
705 break;
706 }
707 case SuberType::ASYNC:
708 {
709 auto async = reinterpret_cast<ASyncBlock *>(&block);
710 if (async->waiting)
711 {
712 memcpy(async->buff.addr_, data.addr_, data.size_);
713 async->data_ready = true;
714 }
715 break;
716 }
717 case SuberType::QUEUE:
718 {
719 auto queue_block = reinterpret_cast<QueueBlock *>(&block);
720 queue_block->fun(data, queue_block->queue, false);
721 break;
722 }
724 {
725 auto cb_block = reinterpret_cast<CallbackBlock *>(&block);
726 cb_block->cb.Run(false, data);
727 break;
728 }
729 }
730 return ErrorCode::OK;
731 };
732
733 block_->data_.subers.Foreach<SuberBlock>(foreach_fun);
734
735 block_->data_.mutex.Unlock();
736 }
737
747 template <SizeLimitMode Mode = SizeLimitMode::MORE>
748 ErrorCode DumpData(RawData data, bool pack = false)
749 {
750 if (block_->data_.data.addr_ == nullptr)
751 {
752 return ErrorCode::EMPTY;
753 }
754
755 if (!pack)
756 {
758 block_->data_.mutex.Lock();
759 memcpy(data.addr_, block_->data_.data.addr_, block_->data_.data.size_);
760 block_->data_.mutex.Unlock();
761 }
762 else
763 {
764 Assert::SizeLimitCheck<Mode>(PACK_BASE_SIZE + block_->data_.data.size_, data.size_);
765
766 block_->data_.mutex.Lock();
767 PackData(block_->data_.crc32, data, block_->data_.data);
768 block_->data_.mutex.Unlock();
769 }
770
771 return ErrorCode::OK;
772 }
773
781 static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
782 {
783 PackedData<uint8_t> *pack = reinterpret_cast<PackedData<uint8_t> *>(buffer.addr_);
784
785 memcpy(&pack->raw.data_, source.addr_, source.size_);
786
787 pack->raw.header_.prefix = 0xa5;
788 pack->raw.header_.topic_name_crc32 = topic_name_crc32;
789 pack->raw.header_.SetDataLen(source.size_);
790 pack->raw.header_.pack_header_crc8 =
791 CRC8::Calculate(&pack->raw, sizeof(PackedDataHeader) - sizeof(uint8_t));
792 uint8_t *crc8_pack =
793 reinterpret_cast<uint8_t *>(reinterpret_cast<uint8_t *>(pack) + PACK_BASE_SIZE +
794 source.size_ - sizeof(uint8_t));
795 *crc8_pack = CRC8::Calculate(pack, PACK_BASE_SIZE - sizeof(uint8_t) + source.size_);
796 }
797
804 template <typename Data>
805 ErrorCode DumpData(PackedData<Data> &data)
806 {
807 if (block_->data_.data.addr_ == nullptr)
808 {
809 return ErrorCode::EMPTY;
810 }
811
812 ASSERT(sizeof(Data) == block_->data_.data.size_);
813
814 return DumpData<SizeLimitMode::NONE>(RawData(data), true);
815 }
816
823 template <typename Data>
824 ErrorCode DumpData(Data &data)
825 {
826 if (block_->data_.data.addr_ == nullptr)
827 {
828 return ErrorCode::EMPTY;
829 }
830
831 ASSERT(sizeof(Data) == block_->data_.data.size_);
832
833 return DumpData<SizeLimitMode::NONE>(data, false);
834 }
835
847 static TopicHandle WaitTopic(const char *name, uint32_t timeout = UINT32_MAX,
848 Domain *domain = nullptr)
849 {
850 TopicHandle topic = nullptr;
851 do
852 {
853 topic = Find(name, domain);
854 if (topic == nullptr)
855 {
856 if (timeout <= Thread::GetTime())
857 {
858 return nullptr;
859 }
860 Thread::Sleep(1);
861 }
862 } while (topic == nullptr);
863
864 return topic;
865 }
866
872 operator TopicHandle() { return block_; }
873
880 uint32_t GetKey() const
881 {
882 if (block_)
883 {
884 return block_->key;
885 }
886 else
887 {
888 return 0;
889 }
890 }
891
898 class Server
899 {
900 public:
906 enum class Status : uint8_t
907 {
908 WAIT_START,
909 WAIT_TOPIC,
911 };
912
918 Server(size_t buffer_length)
919 : topic_map_([](const uint32_t &a, const uint32_t &b)
920 { return static_cast<int>(a) - static_cast<int>(b); }),
921 queue_(1, buffer_length)
922 {
923 /* Minimum size: header8 + crc32 + length24 + crc8 + data + crc8 = 10 */
924 ASSERT(buffer_length > PACK_BASE_SIZE);
925 parse_buff_.size_ = buffer_length;
926 parse_buff_.addr_ = new uint8_t[buffer_length];
927 }
928
935 {
936 auto node = new RBTree<uint32_t>::Node<TopicHandle>(topic);
937 topic_map_.Insert(*node, topic->key);
938 }
939
947 {
948 size_t count = 0;
949
950 queue_.PushBatch(data.addr_, data.size_);
951
952 while (true)
953 { /* 1. Check prefix */
955 {
956 /* Check start frame */
957 auto queue_size = queue_.Size();
958 for (uint32_t i = 0; i < queue_size; i++)
959 {
960 uint8_t prefix = 0;
961 queue_.Peek(&prefix);
962 if (prefix == 0xa5)
963 {
965 break;
966 }
967 queue_.Pop();
968 }
969 /* Not found */
971 {
972 return count;
973 }
974 }
975
976 /* 2. Get topic info */
978 {
979 /* Check size&crc */
980 if (queue_.Size() >= sizeof(PackedDataHeader))
981 {
982 queue_.PopBatch(parse_buff_.addr_, sizeof(PackedDataHeader));
983 if (CRC8::Verify(parse_buff_.addr_, sizeof(PackedDataHeader)))
984 {
985 auto header = reinterpret_cast<PackedDataHeader *>(parse_buff_.addr_);
986 /* Find topic */
987 auto node = topic_map_.Search<TopicHandle>(header->topic_name_crc32);
988 if (node)
989 {
990 data_len_ = header->GetDataLen();
991 current_topic_ = *node;
992 if (data_len_ + PACK_BASE_SIZE >= queue_.length_)
993 {
995 continue;
996 }
998 }
999 else
1000 {
1002 continue;
1003 }
1004 }
1005 else
1006 {
1008 continue;
1009 }
1010 }
1011 else
1012 {
1013 return count;
1014 }
1015 }
1016
1017 /* 3. Get data */
1019 {
1020 /* Check size&crc */
1021 if (queue_.Size() >= data_len_ + sizeof(uint8_t))
1022 {
1023 uint8_t *data =
1024 reinterpret_cast<uint8_t *>(parse_buff_.addr_) + sizeof(PackedDataHeader);
1025 queue_.PopBatch(data, data_len_ + sizeof(uint8_t));
1027 data_len_ + sizeof(PackedDataHeader) + sizeof(uint8_t)))
1028 {
1030 auto data = reinterpret_cast<uint8_t *>(parse_buff_.addr_) +
1031 sizeof(PackedDataHeader);
1032 if (data_len_ > current_topic_->data_.max_length)
1033 {
1034 data_len_ = current_topic_->data_.max_length;
1035 }
1036 Topic(current_topic_).Publish(data, data_len_);
1037
1038 count++;
1039
1040 continue;
1041 }
1042 else
1043 {
1045 continue;
1046 }
1047 }
1048 else
1049 {
1050 return count;
1051 }
1052 }
1053 }
1054 return count;
1055 }
1056
1057 private:
1060 uint32_t data_len_ = 0;
1065 };
1066
1067 private:
1073
1078 static inline RBTree<uint32_t> *domain_ = nullptr;
1079
1084 static inline Domain *def_domain_ = nullptr;
1085};
1086} // namespace LibXR
static void SizeLimitCheck(size_t limit, size_t size)
在非调试模式下的占位大小检查函数(无实际作用)。 Dummy size limit check for non-debug builds.
基础队列类,提供固定大小的循环缓冲区 (Base queue class providing a fixed-size circular buffer).
Definition queue.hpp:20
ErrorCode Pop(void *data=nullptr)
移除队列头部的元素 (Pop the front element from the queue).
Definition queue.hpp:124
size_t length_
队列最大容量 (Maximum queue capacity).
Definition queue.hpp:345
ErrorCode PopBatch(void *data, size_t size)
批量移除多个元素 (Pop multiple elements from the queue).
Definition queue.hpp:214
size_t Size() const
获取队列中的元素个数 (Get the number of elements in the queue).
Definition queue.hpp:312
ErrorCode Peek(void *data)
获取队列头部的元素但不移除 (Peek at the front element without removing it).
Definition queue.hpp:102
ErrorCode PushBatch(const void *data, size_t size)
批量推入多个元素 (Push multiple elements into the queue).
Definition queue.hpp:177
static uint32_t Calculate(const void *raw, size_t len)
计算数据的 CRC32 校验码 / Computes the CRC32 checksum for the given data
Definition crc.hpp:251
static uint8_t Calculate(const void *raw, size_t len)
计算数据的 CRC8 校验码 / Computes the CRC8 checksum for the given data
Definition crc.hpp:66
static bool Verify(const void *raw, size_t len)
验证数据的 CRC8 校验码 / Verifies the CRC8 checksum of the given data
Definition crc.hpp:91
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
Definition libxr_cb.hpp:125
void Run(bool in_isr, PassArgs &&...args) const
执行回调函数,并传递参数。 Executes the callback function, passing the arguments.
Definition libxr_cb.hpp:208
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data...
Data data_
存储的数据。 The stored data.
链表实现,用于存储和管理数据节点。 A linked list implementation for storing and managing data nodes.
无锁队列实现 / Lock-free queue implementation
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
线程安全的锁队列类,提供同步和异步操作支持 Thread-safe lock queue class with synchronous and asynchronous operation suppor...
ErrorCode PushFromCallback(const Data &data, bool in_isr)
从回调函数中推送数据 Pushes data into the queue from a callback function
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
Key key
节点键值 (Key associated with the node).
Definition rbt.hpp:41
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:64
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:99
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread...
Definition rbt.hpp:24
Node< Data > * Search(const Key &key)
搜索红黑树中的节点 (Search for a node in the Red-Black Tree).
Definition rbt.hpp:122
void Insert(BaseNode &node, KeyType &&key)
在树中插入新节点 (Insert a new node into the tree).
Definition rbt.hpp:237
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
Definition semaphore.hpp:23
static uint32_t GetTime()
获取当前系统时间(毫秒) Gets the current system time in milliseconds
Definition thread.cpp:39
static void Sleep(uint32_t milliseconds)
让线程进入休眠状态 Puts the thread to sleep
Definition thread.cpp:16
异步订阅者类,用于订阅异步数据 Asynchronous subscriber class for subscribing to asynchronous data
Definition message.hpp:303
ASyncSubscriber(Topic topic)
构造函数,使用 Topic 进行初始化 Constructor using a Topic for initialization
Definition message.hpp:321
Data & GetData()
获取当前数据 Retrieves the current data
Definition message.hpp:354
LockFreeList::Node< ASyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:366
bool Available()
检查数据是否可用 Checks if data is available
Definition message.hpp:346
ASyncSubscriber(const char *name, Domain *domain=nullptr)
构造函数,通过名称和数据创建订阅者 Constructor to create a subscriber with a name and data
Definition message.hpp:311
void StartWaiting()
开始等待数据更新 Starts waiting for data update
Definition message.hpp:364
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:152
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
Definition message.hpp:159
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
Definition message.hpp:191
构造函数,使用名称和无锁队列进行初始化 Constructor using a name and a lock-free queue
Definition message.hpp:391
QueuedSubscriber(Topic topic, LockFreeQueue< Data > &queue)
构造函数,使用 Topic 和无锁队列进行初始化 Constructor using a Topic and a lock-free queue
Definition message.hpp:408
QueuedSubscriber(const char *name, LockQueue< Data > &queue, Domain *domain=nullptr)
构造函数,使用名称和带锁队列进行初始化 Constructor using a name and a locked queue
Definition message.hpp:441
QueuedSubscriber(Topic topic, LockQueue< Data > &queue)
构造函数,使用 Topic 和带锁队列进行初始化 Constructor using a Topic and a locked queue
Definition message.hpp:454
服务器类,负责解析数据并将其分发到相应的主题 Server class responsible for parsing data and distributing it to corresponding...
Definition message.hpp:899
RBTree< uint32_t > topic_map_
主题映射表 Topic mapping table
Definition message.hpp:1061
void Register(TopicHandle topic)
注册一个主题 Registers a topic
Definition message.hpp:934
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
Definition message.hpp:918
BaseQueue queue_
数据队列 Data queue
Definition message.hpp:1062
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
Definition message.hpp:946
TopicHandle current_topic_
当前主题句柄 Current topic handle
Definition message.hpp:1064
uint32_t data_len_
当前数据长度 Current data length
Definition message.hpp:1060
Status
服务器解析状态枚举 Enumeration of server parsing states
Definition message.hpp:907
@ WAIT_DATA_CRC
等待数据校验 Waiting for data CRC validation
@ WAIT_START
等待起始标志 Waiting for start flag
@ WAIT_TOPIC
等待主题信息 Waiting for topic information
RawData parse_buff_
解析数据缓冲区 Data buffer for parsing
Definition message.hpp:1063
Status status_
服务器的当前解析状态 Current parsing state of the server
Definition message.hpp:1058
同步订阅者类,允许同步方式接收数据。Synchronous subscriber class allowing data reception in a synchronous manner.
Definition message.hpp:233
SyncSubscriber(Topic topic, Data &data)
通过 Topic 句柄构造同步订阅者。Constructs a synchronous subscriber using a Topic handle.
Definition message.hpp:253
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待接收数据。Waits for data reception.
Definition message.hpp:275
LockFreeList::Node< SyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:280
SyncSubscriber(const char *name, Data &data, Domain *domain=nullptr)
通过主题名称构造同步订阅者。Constructs a synchronous subscriber by topic name.
Definition message.hpp:242
主题(Topic)管理类 / Topic management class
Definition message.hpp:28
void Publish(void *addr, uint32_t size)
以原始地址和大小发布数据 Publishes data using raw address and size
Definition message.hpp:671
uint32_t GetKey() const
获取主题的键值 Gets the key value of the topic
Definition message.hpp:880
SuberType
订阅者类型。Subscriber type.
Definition message.hpp:199
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
void Publish(Data &data)
发布数据 Publishes data
Definition message.hpp:660
Topic(const char *name, uint32_t max_length, Domain *domain=nullptr, bool cache=false, bool check_length=false)
构造函数,使用指定名称、最大长度、域及其他选项初始化主题 Constructor to initialize a topic with the specified name,...
Definition message.hpp:523
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
Definition message.hpp:1084
static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
打包数据
Definition message.hpp:781
RBTree< uint32_t >::Node< Block > * TopicHandle
主题句柄,指向存储数据的红黑树节点。Handle pointing to a red-black tree node storing data.
Definition message.hpp:144
ErrorCode DumpData(Data &data)
转储数据到普通数据结构 Dumps data into a normal data structure
Definition message.hpp:824
static Topic CreateTopic(const char *name, Domain *domain=nullptr, bool cache=false, bool check_length=true)
创建一个新的主题 Creates a new topic
Definition message.hpp:581
static TopicHandle FindOrCreate(const char *name, Domain *domain=nullptr, bool cache=false, bool check_length=true)
在指定域中查找或创建主题 Finds or creates a topic in the specified domain
Definition message.hpp:627
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
Definition message.hpp:509
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
Definition message.hpp:642
ErrorCode DumpData(PackedData< Data > &data)
转储数据到 PackedData Dumps data into PackedData format
Definition message.hpp:805
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Definition message.hpp:602
Topic(TopicHandle topic)
通过句柄构造主题 Constructs a topic from a topic handle
Definition message.hpp:592
ErrorCode DumpData(RawData data, bool pack=false)
转储数据 Dump data
Definition message.hpp:748
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition message.hpp:847
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:1072
void RegisterCallback(Callback &cb)
注册回调函数 Registers a callback function
Definition message.hpp:495
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Definition message.hpp:1078
LibXR 命名空间
Definition ch32_gpio.hpp:9
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
Definition message.hpp:289
RawData buff
缓冲区数据 Buffer data
Definition message.hpp:290
bool waiting
等待中标志 Waiting flag
Definition message.hpp:292
bool data_ready
数据就绪标志 Data ready flag
Definition message.hpp:291
存储主题(Topic)数据的结构体。Structure storing topic data.
Definition message.hpp:35
bool cache
是否启用数据缓存。Indicates whether data caching is enabled.
Definition message.hpp:40
Mutex mutex
线程同步互斥锁。Mutex for thread synchronization.
Definition message.hpp:38
bool check_length
是否检查数据长度。Indicates whether data length is checked.
Definition message.hpp:41
uint32_t max_length
数据的最大长度。Maximum length of data.
Definition message.hpp:36
uint32_t crc32
主题名称的 CRC32 校验码。CRC32 checksum of the topic name.
Definition message.hpp:37
RawData data
存储的数据。Stored data.
Definition message.hpp:39
LockFreeList subers
订阅者列表。List of subscribers.
Definition message.hpp:42
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Definition message.hpp:486
Callback cb
订阅的回调函数 Subscribed callback function
Definition message.hpp:487
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
Definition message.hpp:375
void * queue
指向订阅队列的指针 Pointer to the subscribed queue
Definition message.hpp:376
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
Definition message.hpp:377
订阅者信息存储结构。Structure storing subscriber information.
Definition message.hpp:211
SuberType type
订阅者类型。Type of subscriber.
Definition message.hpp:212
同步订阅者存储结构。Structure storing synchronous subscriber data.
Definition message.hpp:220
Semaphore sem
信号量,用于同步等待数据。Semaphore for data synchronization.
Definition message.hpp:222
RawData buff
存储的数据缓冲区。Data buffer.
Definition message.hpp:221