libxr 1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
message.hpp
1#pragma once
2
3#include "crc.hpp"
4#include "libxr_cb.hpp"
5#include "libxr_def.hpp"
6#include "libxr_type.hpp"
7#include "list.hpp"
8#include "lock_queue.hpp"
9#include "lockfree_queue.hpp"
10#include "mutex.hpp"
11#include "queue.hpp"
12#include "rbt.hpp"
13#include "semaphore.hpp"
14#include "spin_lock.hpp"
15#include "thread.hpp"
16
17namespace LibXR
18{
28class Topic
29{
34 public:
45#ifndef __DOXYGEN__
// Header that precedes the payload inside a PackedData frame.
// The struct is declared packed; PackData() in this file fills every field.
50 struct __attribute__((packed)) PackedDataHeader
51 {
// Frame prefix byte; PackData() writes 0xa5 and Server::ParseData() scans for it.
52 uint8_t prefix;
// CRC32 of the topic name, used to identify which topic the frame belongs to.
53 uint32_t
54 topic_name_crc32;
// Payload length in bytes, stored in a 24-bit bit-field.
55 uint32_t data_len : 24;
// CRC8 that PackData() computes over the header bytes preceding this field.
56 uint8_t pack_header_crc8;
57 };
58
67 template <typename Data>
68 class __attribute__((packed)) PackedData
69 {
70 public:
77 struct __attribute__((packed))
78 {
79 PackedDataHeader header;
80 Data data_;
81 } raw;
82
83 uint8_t crc8_;
84
91 const Data &operator=(const Data &data)
92 {
93 raw.data_ = data;
94 crc8_ = CRC8::Calculate(&raw, sizeof(raw));
95 return data;
96 }
97
102 operator Data() { return raw.data_; }
103
107 Data *operator->() { return &(raw.data_); }
108
113 const Data *operator->() const { return &(raw.data_); }
114 };
115
116 static constexpr size_t PACK_BASE_SIZE = sizeof(PackedData<uint8_t>) - 1;
117
118#endif
119
126
132 class Domain
133 {
134 public:
140 Domain(const char *name)
141 {
142 if (!domain_)
143 {
145 if (!domain_)
146 {
147 domain_ =
148 new RBTree<uint32_t>([](const uint32_t &a, const uint32_t &b)
149 { return static_cast<int>(a) - static_cast<int>(b); });
150 }
152 }
153
154 auto crc32 = CRC32::Calculate(name, strlen(name));
155
156 auto domain = domain_->Search<RBTree<uint32_t>>(crc32);
157
158 if (domain != nullptr)
159 {
160 node_ = domain;
161 return;
162 }
163
165 [](const uint32_t &a, const uint32_t &b)
166 { return static_cast<int>(a) - static_cast<int>(b); });
167
168 domain_->Insert(*node_, crc32);
169 }
170
175 };
176
// Subscriber kind tag. Publish()/PublishFromCallback() switch on this value to
// decide how a published payload is delivered to each subscriber block.
181 enum class SuberType : uint8_t
182 {
// Synchronous subscriber: payload is copied into its buffer and its semaphore is posted.
183 SYNC,
// Asynchronous subscriber: payload is copied into its buffer and data_ready is set.
184 ASYNC,
// Queued subscriber: the block's stored `fun` is called with the payload and the queue pointer.
185 QUEUE,
// Callback subscriber.
186 CALLBACK,
187 };
188
194 {
196 };
197
202 struct SyncBlock : public SuberBlock
203 {
206 };
207
214 template <typename Data>
216 {
217 public:
225 SyncSubscriber(const char *name, Data &data, Domain *domain = nullptr)
226 {
227 *this = SyncSubscriber(WaitTopic(name, UINT32_MAX, domain), data);
228 }
229
237 {
238 if (topic.block_->data_.check_length)
239 {
240 ASSERT(topic.block_->data_.max_length == sizeof(Data));
241 }
242 else
243 {
244 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
245 }
246
249 block_->data_.buff = RawData(data);
250 topic.block_->data_.subers.Add(*block_);
251 }
252
258 ErrorCode Wait(uint32_t timeout = UINT32_MAX)
259 {
260 return block_->data_.sem.Wait(timeout);
261 }
262
264 };
265
271 typedef struct ASyncBlock : public SuberBlock
272 {
275 bool waiting;
276 } ASyncBlock;
277
284 template <typename Data>
286 {
287 public:
294 ASyncSubscriber(const char *name, Domain *domain = nullptr)
295 {
297 }
298
305 {
306 if (topic.block_->data_.check_length)
307 {
308 ASSERT(topic.block_->data_.max_length == sizeof(Data));
309 }
310 else
311 {
312 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
313 }
314
317 block_->data_.buff = *(new Data);
318 topic.block_->data_.subers.Add(*block_);
319 }
320
329 bool Available() { return block_->data_.data_ready; }
330
338 {
339 block_->data_.data_ready = false;
340 return *reinterpret_cast<Data *>(block_->data_.buff.addr_);
341 }
342
347 void StartWaiting() { block_->data_.waiting = true; }
348
350 };
351
357 typedef struct QueueBlock : public SuberBlock
358 {
359 void *queue;
360 void (*fun)(RawData &, void *,
361 bool);
362 } QueueBlock;
363
374 {
375 public:
376 template <typename Data, uint32_t Length>
377 QueuedSubscriber(const char *name, LockFreeQueue<Data> &queue,
378 Domain *domain = nullptr)
379 {
380 *this = QueuedSubscriber(WaitTopic(name, UINT32_MAX, domain), queue);
381 }
382
390 template <typename Data>
392 {
393 if (topic.block_->data_.check_length)
394 {
395 ASSERT(topic.block_->data_.max_length == sizeof(Data));
396 }
397 else
398 {
399 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
400 }
401
402 auto block = new List::Node<QueueBlock>;
404 block->data_.queue = &queue;
405 block->data_.fun = [](RawData &data, void *arg, bool in_isr)
406 {
407 UNUSED(in_isr);
408 LockFreeQueue<Data> *queue = reinterpret_cast<LockFreeQueue<Data>>(arg);
409 queue->Push(reinterpret_cast<Data>(data.addr_));
410 };
411
412 topic.block_->data_.subers.Add(*block);
413 }
414
423 template <typename Data>
424 QueuedSubscriber(const char *name, LockQueue<Data> &queue, Domain *domain = nullptr)
425 {
426 *this = QueuedSubscriber(WaitTopic(name, UINT32_MAX, domain), queue);
427 }
428
436 template <typename Data>
438 {
439 if (topic.block_->data_.check_length)
440 {
441 ASSERT(topic.block_->data_.max_length == sizeof(Data));
442 }
443 else
444 {
445 ASSERT(topic.block_->data_.max_length <= sizeof(Data));
446 }
447
448 auto block = new List::Node<QueueBlock>;
450 block->data_.queue = &queue;
451 block->data_.fun = [](RawData &data, void *arg, bool in_isr)
452 {
453 LockQueue<Data> *queue = reinterpret_cast<LockQueue<Data> *>(arg);
454 queue->PushFromCallback(*reinterpret_cast<const Data *>(data.addr_), in_isr);
455 };
456
457 topic.block_->data_.subers.Add(*block);
458 }
459 };
460
466 typedef struct CallbackBlock : public SuberBlock
467 {
470
477 {
479 block.cb = cb;
482 block_->data_.subers.Add(*node);
483 }
484
489 Topic() {}
490
503 Topic(const char *name, uint32_t max_length, Domain *domain = nullptr,
504 bool cache = false, bool check_length = false)
505 {
506 if (!def_domain_)
507 {
509 if (!domain_)
510 {
511 domain_ =
512 new RBTree<uint32_t>([](const uint32_t &a, const uint32_t &b)
513 { return static_cast<int>(a) - static_cast<int>(b); });
514 }
515 if (!def_domain_)
516 {
517 def_domain_ = new Domain("libxr_def_domain");
518 }
520 }
521
522 if (domain == nullptr)
523 {
525 }
526
527 auto crc32 = CRC32::Calculate(name, strlen(name));
528
529 auto topic = domain->node_->data_.Search<Block>(crc32);
530
531 if (topic)
532 {
533 ASSERT(topic->data_.max_length == max_length);
534 ASSERT(topic->data_.check_length == check_length);
535
536 block_ = topic;
537 }
538 else
539 {
541 block_->data_.max_length = max_length;
542 block_->data_.crc32 = crc32;
543 block_->data_.data.addr_ = nullptr;
544 block_->data_.cache = false;
545 block_->data_.check_length = check_length;
546
547 domain->node_->data_.Insert(*block_, crc32);
548 }
549
550 if (cache && !block_->data_.cache)
551 {
552 EnableCache();
553 }
554 }
555
568 template <typename Data>
569 static Topic CreateTopic(const char *name, Domain *domain = nullptr, bool cache = false,
570 bool check_length = true)
571 {
572 return Topic(name, sizeof(Data), domain, cache, check_length);
573 }
574
581
590 static TopicHandle Find(const char *name, Domain *domain = nullptr)
591 {
592 if (domain == nullptr)
593 {
595 }
596
597 auto crc32 = CRC32::Calculate(name, strlen(name));
598
599 return domain->node_->data_.Search<Block>(crc32);
600 }
601
614 template <typename Data>
615 static TopicHandle FindOrCreate(const char *name, Domain *domain = nullptr,
616 bool cache = false, bool check_length = true)
617 {
618 auto topic = Find(name, domain);
619 if (topic == nullptr)
620 {
621 topic = CreateTopic<Data>(name, domain, cache, check_length).block_;
622 }
623 return topic;
624 }
625
631 {
632 block_->data_.mutex.Lock();
633 if (!block_->data_.cache)
634 {
635 block_->data_.cache = true;
636 block_->data_.data.addr_ = new uint8_t[block_->data_.max_length];
637 }
638 block_->data_.mutex.Unlock();
639 }
640
647 template <typename Data>
648 void Publish(Data &data)
649 {
650 Publish(&data, sizeof(Data));
651 }
652
660 template <typename Data>
662 {
663 PublishFromCallback(&data, sizeof(Data), in_isr);
664 }
665
672 void Publish(void *addr, uint32_t size)
673 {
674 block_->data_.mutex.Lock();
675 if (block_->data_.check_length)
676 {
677 ASSERT(size == block_->data_.max_length);
678 }
679 else
680 {
681 ASSERT(size <= block_->data_.max_length);
682 }
683
684 if (block_->data_.cache)
685 {
686 memcpy(block_->data_.data.addr_, addr, size);
687 block_->data_.data.size_ = size;
688 }
689 else
690 {
691 block_->data_.data.addr_ = addr;
692 block_->data_.data.size_ = size;
693 }
694
695 RawData data = block_->data_.data;
696
697 auto foreach_fun = [&](SuberBlock &block)
698 {
699 switch (block.type)
700 {
701 case SuberType::SYNC:
702 {
703 auto sync = reinterpret_cast<SyncBlock *>(&block);
704 memcpy(sync->buff.addr_, data.addr_, data.size_);
705 sync->sem.Post();
706 break;
707 }
708 case SuberType::ASYNC:
709 {
710 auto async = reinterpret_cast<ASyncBlock *>(&block);
711 if (async->waiting)
712 {
713 memcpy(async->buff.addr_, data.addr_, data.size_);
714 async->data_ready = true;
715 }
716 break;
717 }
718 case SuberType::QUEUE:
719 {
720 auto queue_block = reinterpret_cast<QueueBlock *>(&block);
721 queue_block->fun(data, queue_block->queue, false);
722 break;
723 }
725 {
726 auto cb_block = reinterpret_cast<CallbackBlock *>(&block);
727 cb_block->cb.Run(false, data);
728 break;
729 }
730 }
731 return ErrorCode::OK;
732 };
733
734 block_->data_.subers.Foreach<SuberBlock>(foreach_fun);
735
736 block_->data_.mutex.Unlock();
737 }
738
746 void PublishFromCallback(void *addr, uint32_t size, bool in_isr)
747 {
748 if (block_->data_.mutex.TryLockInCallback(in_isr) != ErrorCode::OK)
749 {
750 return;
751 }
752
753 if (block_->data_.check_length)
754 {
755 ASSERT(size == block_->data_.max_length);
756 }
757 else
758 {
759 ASSERT(size <= block_->data_.max_length);
760 }
761
762 if (block_->data_.cache)
763 {
764 memcpy(block_->data_.data.addr_, addr, size);
765 block_->data_.data.size_ = size;
766 }
767 else
768 {
769 block_->data_.data.addr_ = addr;
770 block_->data_.data.size_ = size;
771 }
772
773 RawData data = block_->data_.data;
774
775 auto foreach_fun = [&](SuberBlock &block)
776 {
777 switch (block.type)
778 {
779 case SuberType::SYNC:
780 {
781 auto sync = reinterpret_cast<SyncBlock *>(&block);
782 memcpy(sync->buff.addr_, data.addr_, data.size_);
783 sync->sem.PostFromCallback(in_isr);
784 break;
785 }
786 case SuberType::ASYNC:
787 {
788 auto async = reinterpret_cast<ASyncBlock *>(&block);
789 memcpy(async->buff.addr_, data.addr_, data.size_);
790 async->data_ready = true;
791 break;
792 }
793 case SuberType::QUEUE:
794 {
795 auto queue_block = reinterpret_cast<QueueBlock *>(&block);
796 queue_block->fun(data, queue_block->queue, in_isr);
797 break;
798 }
800 {
801 auto cb_block = reinterpret_cast<CallbackBlock *>(&block);
802 cb_block->cb.Run(in_isr, data);
803 break;
804 }
805 }
806 return ErrorCode::OK;
807 };
808
809 block_->data_.subers.ForeachFromCallback<SuberBlock>(foreach_fun, in_isr);
810
811 block_->data_.mutex.UnlockFromCallback(in_isr);
812 }
813
823 template <SizeLimitMode Mode = SizeLimitMode::MORE>
824 ErrorCode DumpData(RawData data, bool pack = false)
825 {
826 if (block_->data_.data.addr_ == nullptr)
827 {
828 return ErrorCode::EMPTY;
829 }
830
831 if (!pack)
832 {
833 Assert::SizeLimitCheck<Mode>(block_->data_.data.size_, data.size_);
834 block_->data_.mutex.Lock();
835 memcpy(data.addr_, block_->data_.data.addr_, block_->data_.data.size_);
836 block_->data_.mutex.Unlock();
837 }
838 else
839 {
840 Assert::SizeLimitCheck<Mode>(PACK_BASE_SIZE + block_->data_.data.size_, data.size_);
841
842 block_->data_.mutex.Lock();
843 PackData(block_->data_.crc32, data, block_->data_.data);
844 block_->data_.mutex.Unlock();
845 }
846
847 return ErrorCode::OK;
848 }
849
850 static void PackData(uint32_t topic_name_crc32, RawData buffer, RawData source)
851 {
852 PackedData<uint8_t> *pack = reinterpret_cast<PackedData<uint8_t> *>(buffer.addr_);
853
854 memcpy(&pack->raw.data_, source.addr_, source.size_);
855
856 pack->raw.header.prefix = 0xa5;
857 pack->raw.header.topic_name_crc32 = topic_name_crc32;
858 pack->raw.header.data_len = source.size_;
859 pack->raw.header.pack_header_crc8 =
860 CRC8::Calculate(&pack->raw, sizeof(PackedDataHeader) - sizeof(uint8_t));
862 reinterpret_cast<uint8_t *>(reinterpret_cast<uint8_t *>(pack) + PACK_BASE_SIZE +
863 source.size_ - sizeof(uint8_t));
864 *crc8_pack = CRC8::Calculate(pack, PACK_BASE_SIZE - sizeof(uint8_t) + source.size_);
865 }
866
873 template <typename Data>
874 ErrorCode DumpData(PackedData<Data> &data)
875 {
876 if (block_->data_.data.addr_ == nullptr)
877 {
878 return ErrorCode::EMPTY;
879 }
880
881 ASSERT(sizeof(Data) == block_->data_.data.size_);
882
883 return DumpData<SizeLimitMode::NONE>(RawData(data), true);
884 }
885
892 template <typename Data>
893 ErrorCode DumpData(Data &data)
894 {
895 if (block_->data_.data.addr_ == nullptr)
896 {
897 return ErrorCode::EMPTY;
898 }
899
900 ASSERT(sizeof(Data) == block_->data_.data.size_);
901
902 return DumpData<SizeLimitMode::NONE>(data, false);
903 }
904
916 static TopicHandle WaitTopic(const char *name, uint32_t timeout = UINT32_MAX,
917 Domain *domain = nullptr)
918 {
919 TopicHandle topic = nullptr;
920 do
921 {
922 topic = Find(name, domain);
923 if (topic == nullptr)
924 {
925 if (timeout <= Thread::GetTime())
926 {
927 return nullptr;
928 }
929 Thread::Sleep(1);
930 }
931 } while (topic == nullptr);
932
933 return topic;
934 }
935
941 operator TopicHandle() { return block_; }
942
949 class Server
950 {
951 public:
957 enum class Status : uint8_t
958 {
959 WAIT_START,
960 WAIT_TOPIC,
962 };
963
971 { return static_cast<int>(a) - static_cast<int>(b); }),
973 {
974 /* Minimum size: header8 + crc32 + length24 + crc8 + data + crc8 = 10 */
975 ASSERT(buffer_length >= PACK_BASE_SIZE);
978 }
979
990
998 {
999 size_t count = 0;
1000
1001 queue_.PushBatch(data.addr_, data.size_);
1002
1004 /* 1. Check prefix */
1006 {
1007 /* Check start frame */
1008 auto queue_size = queue_.Size();
1009 for (uint32_t i = 0; i < queue_size; i++)
1010 {
1011 uint8_t prefix = 0;
1012 queue_.Peek(&prefix);
1013 if (prefix == 0xa5)
1014 {
1016 break;
1017 }
1018 queue_.Pop();
1019 }
1020 /* Not found */
1022 {
1023 return count;
1024 }
1025 }
1026
1027 /* 2. Get topic info */
1029 {
1030 /* Check size&crc */
1031 if (queue_.Size() >= sizeof(PackedDataHeader))
1032 {
1035 {
1036 auto header = reinterpret_cast<PackedDataHeader *>(parse_buff_.addr_);
1037 /* Find topic */
1038 auto node = topic_map_.Search<TopicHandle>(header->topic_name_crc32);
1039 if (node)
1040 {
1041 data_len_ = header->data_len;
1044 }
1045 else
1046 {
1048 goto check_start; // NOLINT
1049 }
1050 }
1051 else
1052 {
1054 goto check_start; // NOLINT
1055 }
1056 }
1057 else
1058 {
1059 return count;
1060 }
1061 }
1062
1064 {
1065 /* Check size&crc */
1066 if (queue_.Size() >= data_len_ + sizeof(uint8_t))
1067 {
1068 uint8_t *data =
1069 reinterpret_cast<uint8_t *>(parse_buff_.addr_) + sizeof(PackedDataHeader);
1070 queue_.PopBatch(data, data_len_ + sizeof(uint8_t));
1072 data_len_ + sizeof(PackedDataHeader) + sizeof(uint8_t)))
1073 {
1075 auto data =
1076 reinterpret_cast<uint8_t *>(parse_buff_.addr_) + sizeof(PackedDataHeader);
1077
1078 Topic(current_topic_).Publish(data, data_len_);
1079
1080 count++;
1081
1082 goto check_start; // NOLINT
1083 }
1084 else
1085 {
1086 goto check_start; // NOLINT
1087 }
1088 }
1089 else
1090 {
1091 return count;
1092 }
1093 }
1094
1095 return count;
1096 }
1097
1098 private:
1106 };
1107
1108 private:
1114
1119 static inline RBTree<uint32_t> *domain_ = nullptr;
1120
1125 static inline SpinLock domain_lock_;
1126
1131 static inline Domain *def_domain_ = nullptr;
1132};
1133} // namespace LibXR
基础队列类,提供固定大小的循环缓冲区 (Base queue class providing a fixed-size circular buffer).
Definition queue.hpp:20
ErrorCode Pop(void *data=nullptr)
移除队列头部的元素 (Pop the front element from the queue).
Definition queue.hpp:114
size_t Size()
获取队列中的元素个数 (Get the number of elements in the queue).
Definition queue.hpp:305
ErrorCode PopBatch(void *data, size_t size)
批量移除多个元素 (Pop multiple elements from the queue).
Definition queue.hpp:208
ErrorCode Peek(void *data)
获取队列头部的元素但不移除 (Peek at the front element without removing it).
Definition queue.hpp:92
ErrorCode PushBatch(const void *data, size_t size)
批量推入多个元素 (Push multiple elements into the queue).
Definition queue.hpp:175
static uint32_t Calculate(const void *raw, size_t len)
计算数据的 CRC32 校验码 / Computes the CRC32 checksum for the given data
Definition crc.hpp:251
static uint8_t Calculate(const void *raw, size_t len)
计算数据的 CRC8 校验码 / Computes the CRC8 checksum for the given data
Definition crc.hpp:66
static bool Verify(const void *raw, size_t len)
验证数据的 CRC8 校验码 / Verifies the CRC8 checksum of the given data
Definition crc.hpp:91
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
Definition libxr_cb.hpp:124
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store specific data...
Definition list.hpp:60
Data data_
存储的数据。 The stored data.
Definition list.hpp:111
链表实现,用于存储和管理数据节点。 A linked list implementation for storing and managing data nodes.
Definition list.hpp:23
无锁队列实现 / Lock-free queue implementation
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
线程安全的队列实现,基于 FreeRTOS 消息队列 Thread-safe queue implementation based on FreeRTOS message queue
ErrorCode PushFromCallback(const Data &data, bool in_isr)
从 ISR(中断服务例程)推入数据 Pushes data into the queue from an ISR (Interrupt Service Routine)
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
红黑树的泛型数据节点,继承自 BaseNode (Generic data node for Red-Black Tree, inheriting from BaseNode).
Definition rbt.hpp:64
红黑树实现,支持泛型键和值,并提供线程安全操作 (Red-Black Tree implementation supporting generic keys and values with thread...
Definition rbt.hpp:24
Node< Data > * Search(const Key &key)
搜索红黑树中的节点 (Search for a node in the Red-Black Tree).
Definition rbt.hpp:122
void Insert(BaseNode &node, KeyType &&key)
在树中插入新节点 (Insert a new node into the tree).
Definition rbt.hpp:237
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
Definition semaphore.hpp:23
轻量级自旋锁实现 / Lightweight spinlock implementation
Definition spin_lock.hpp:17
void Lock() noexcept
阻塞直到获取锁 / Blocks until the lock is acquired
Definition spin_lock.hpp:44
void Unlock() noexcept
释放锁 / Releases the lock
Definition spin_lock.hpp:57
static uint32_t GetTime()
获取当前系统时间(毫秒) Gets the current system time in milliseconds
Definition thread.cpp:16
static void Sleep(uint32_t milliseconds)
让线程进入休眠状态 Puts the thread to sleep
Definition thread.cpp:9
异步订阅者类,用于订阅异步数据 Asynchronous subscriber class for subscribing to asynchronous data
Definition message.hpp:286
ASyncSubscriber(Topic topic)
构造函数,使用 Topic 进行初始化 Constructor using a Topic for initialization
Definition message.hpp:304
Data & GetData()
获取当前数据 Retrieves the current data
Definition message.hpp:337
bool Available()
检查数据是否可用 Checks if data is available
Definition message.hpp:329
List::Node< ASyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:349
ASyncSubscriber(const char *name, Domain *domain=nullptr)
构造函数,通过主题名称(及可选的域)创建订阅者 Constructor that creates a subscriber from a topic name (and an optional domain)
Definition message.hpp:294
void StartWaiting()
开始等待数据更新 Starts waiting for data update
Definition message.hpp:347
主题域(Domain)管理器,用于组织多个主题。Domain manager for organizing multiple topics.
Definition message.hpp:133
Domain(const char *name)
构造函数,初始化或查找指定名称的主题域。Constructor initializing or looking up a domain by name.
Definition message.hpp:140
RBTree< uint32_t >::Node< RBTree< uint32_t > > * node_
指向该域的根节点。Pointer to the root node of the domain.
Definition message.hpp:174
构造函数,使用名称和无锁队列进行初始化 Constructor using a name and a lock-free queue
Definition message.hpp:374
QueuedSubscriber(Topic topic, LockFreeQueue< Data > &queue)
构造函数,使用 Topic 和无锁队列进行初始化 Constructor using a Topic and a lock-free queue
Definition message.hpp:391
QueuedSubscriber(const char *name, LockQueue< Data > &queue, Domain *domain=nullptr)
构造函数,使用名称和带锁队列进行初始化 Constructor using a name and a locked queue
Definition message.hpp:424
QueuedSubscriber(Topic topic, LockQueue< Data > &queue)
构造函数,使用 Topic 和带锁队列进行初始化 Constructor using a Topic and a locked queue
Definition message.hpp:437
服务器类,负责解析数据并将其分发到相应的主题 Server class responsible for parsing data and distributing it to corresponding...
Definition message.hpp:950
RBTree< uint32_t > topic_map_
主题映射表 Topic mapping table
Definition message.hpp:1102
void Register(TopicHandle topic)
注册一个主题 Registers a topic
Definition message.hpp:985
Server(size_t buffer_length)
构造函数,初始化服务器并分配缓冲区 Constructor to initialize the server and allocate buffer
Definition message.hpp:969
BaseQueue queue_
数据队列 Data queue
Definition message.hpp:1103
size_t ParseData(ConstRawData data)
解析接收到的数据 Parses received data
Definition message.hpp:997
TopicHandle current_topic_
当前主题句柄 Current topic handle
Definition message.hpp:1105
uint32_t data_len_
当前数据长度 Current data length
Definition message.hpp:1101
Status
服务器解析状态枚举 Enumeration of server parsing states
Definition message.hpp:958
@ WAIT_DATA_CRC
等待数据校验 Waiting for data CRC validation
@ WAIT_START
等待起始标志 Waiting for start flag
@ WAIT_TOPIC
等待主题信息 Waiting for topic information
RawData parse_buff_
解析数据缓冲区 Data buffer for parsing
Definition message.hpp:1104
Status status_
服务器的当前解析状态 Current parsing state of the server
Definition message.hpp:1099
同步订阅者类,允许同步方式接收数据。Synchronous subscriber class allowing data reception in a synchronous manner.
Definition message.hpp:216
SyncSubscriber(Topic topic, Data &data)
通过 Topic 句柄构造同步订阅者。Constructs a synchronous subscriber using a Topic handle.
Definition message.hpp:236
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待接收数据。Waits for data reception.
Definition message.hpp:258
List::Node< SyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition message.hpp:263
SyncSubscriber(const char *name, Data &data, Domain *domain=nullptr)
通过主题名称构造同步订阅者。Constructs a synchronous subscriber by topic name.
Definition message.hpp:225
主题(Topic)管理类 / Topic management class
Definition message.hpp:29
void Publish(void *addr, uint32_t size)
以原始地址和大小发布数据 Publishes data using raw address and size
Definition message.hpp:672
SuberType
订阅者类型。Subscriber type.
Definition message.hpp:182
@ SYNC
同步订阅者。Synchronous subscriber.
@ ASYNC
异步订阅者。Asynchronous subscriber.
@ QUEUE
队列订阅者。Queued subscriber.
@ CALLBACK
回调订阅者。Callback subscriber.
RBTree< uint32_t >::Node< Block > * TopicHandle
主题句柄,指向存储数据的红黑树节点。Handle pointing to a red-black tree node storing data.
Definition message.hpp:125
void Publish(Data &data)
发布数据 Publishes data
Definition message.hpp:648
Topic(const char *name, uint32_t max_length, Domain *domain=nullptr, bool cache=false, bool check_length=false)
构造函数,使用指定名称、最大长度、域及其他选项初始化主题 Constructor to initialize a topic with the specified name,...
Definition message.hpp:503
static Domain * def_domain_
默认的主题域,所有未指定域的主题都会归入此域 Default domain where all topics without a specified domain are assigned
Definition message.hpp:1131
ErrorCode DumpData(Data &data)
转储数据到普通数据结构 Dumps data into a normal data structure
Definition message.hpp:893
void PublishFromCallback(void *addr, uint32_t size, bool in_isr)
从回调函数发布数据 Publishes data from a callback function
Definition message.hpp:746
static Topic CreateTopic(const char *name, Domain *domain=nullptr, bool cache=false, bool check_length=true)
创建一个新的主题 Creates a new topic
Definition message.hpp:569
static TopicHandle FindOrCreate(const char *name, Domain *domain=nullptr, bool cache=false, bool check_length=true)
在指定域中查找或创建主题 Finds or creates a topic in the specified domain
Definition message.hpp:615
Topic()
默认构造函数,创建一个空的 Topic 实例 Default constructor, creates an empty Topic instance
Definition message.hpp:489
void EnableCache()
启用主题的缓存功能 Enables caching for the topic
Definition message.hpp:630
void RegisterCallback(Callback< RawData & > &cb)
注册回调函数 Registers a callback function
Definition message.hpp:476
ErrorCode DumpData(PackedData< Data > &data)
转储数据到 PackedData Dumps data into PackedData format
Definition message.hpp:874
static TopicHandle Find(const char *name, Domain *domain=nullptr)
在指定域中查找主题 Finds a topic in the specified domain
Definition message.hpp:590
Topic(TopicHandle topic)
通过句柄构造主题 Constructs a topic from a topic handle
Definition message.hpp:580
ErrorCode DumpData(RawData data, bool pack=false)
转储数据 Dump data
Definition message.hpp:824
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待主题的创建并返回其句柄 Waits for a topic to be created and returns its handle
Definition message.hpp:916
void PublishFromCallback(Data &data, bool in_isr)
从回调函数发布数据 Publishes data from a callback function
Definition message.hpp:661
static SpinLock domain_lock_
主题域访问的自旋锁,确保多线程安全 SpinLock for domain access to ensure thread safety
Definition message.hpp:1125
TopicHandle block_
主题句柄,指向当前主题的内存块 Topic handle pointing to the memory block of the current topic
Definition message.hpp:1113
static RBTree< uint32_t > * domain_
主题域的红黑树结构,存储不同的主题 Red-Black Tree structure for storing different topics in the domain
Definition message.hpp:1119
LibXR Color Control Library / LibXR终端颜色控制库
constexpr auto min(T1 a, T2 b) -> typename std::common_type< T1, T2 >::type
计算两个数的最小值
异步订阅块,继承自 SuberBlock Asynchronous subscription block, inheriting from SuberBlock
Definition message.hpp:272
RawData buff
缓冲区数据 Buffer data
Definition message.hpp:273
bool waiting
等待中标志 Waiting flag
Definition message.hpp:275
bool data_ready
数据就绪标志 Data ready flag
Definition message.hpp:274
存储主题(Topic)数据的结构体。Structure storing topic data.
Definition message.hpp:36
bool cache
是否启用数据缓存。Indicates whether data caching is enabled.
Definition message.hpp:41
Mutex mutex
线程同步互斥锁。Mutex for thread synchronization.
Definition message.hpp:39
List subers
订阅者列表。List of subscribers.
Definition message.hpp:43
bool check_length
是否检查数据长度。Indicates whether data length is checked.
Definition message.hpp:42
uint32_t max_length
数据的最大长度。Maximum length of data.
Definition message.hpp:37
uint32_t crc32
主题名称的 CRC32 校验码。CRC32 checksum of the topic name.
Definition message.hpp:38
RawData data
存储的数据。Stored data.
Definition message.hpp:40
回调订阅块,继承自 SuberBlock Callback subscription block, inheriting from SuberBlock
Definition message.hpp:467
Callback< RawData & > cb
订阅的回调函数 Subscribed callback function
Definition message.hpp:468
队列订阅块,继承自 SuberBlock Queue subscription block, inheriting from SuberBlock
Definition message.hpp:358
void * queue
指向订阅队列的指针 Pointer to the subscribed queue
Definition message.hpp:359
void(* fun)(RawData &, void *, bool)
处理数据的回调函数 Callback function to handle data
Definition message.hpp:360
订阅者信息存储结构。Structure storing subscriber information.
Definition message.hpp:194
SuberType type
订阅者类型。Type of subscriber.
Definition message.hpp:195
同步订阅者存储结构。Structure storing synchronous subscriber data.
Definition message.hpp:203
Semaphore sem
信号量,用于同步等待数据。Semaphore for data synchronization.
Definition message.hpp:205
RawData buff
存储的数据缓冲区。Data buffer.
Definition message.hpp:204