libxr  1.0
Want to be the best embedded framework
LibXR::Topic::Server Class Reference

State machine that parses byte streams into packets and publishes them to registered topics.

#include <server.hpp>


Public Types

enum class  Status : uint8_t { WAIT_START , WAIT_TOPIC , WAIT_DATA_CRC }
 Current stage of the parser
 

Public Member Functions

 Server (size_t buffer_length)
 Construct the parser and allocate its internal staging queue
 
void Register (TopicHandle topic)
 Register a topic that may receive parsed packets
 
size_t ParseData (ConstRawData data)
 Feed a new batch of bytes from normal (non-callback) context
 
size_t ParseDataFromCallback (ConstRawData data, bool in_isr)
 Feed a new batch of bytes from a callback or ISR path
 

Private Types

enum class  ParseResult : uint8_t { NEED_MORE , DROPPED , DELIVERED }
 Result of one payload-stage handling step
 

Private Member Functions

size_t ParseDataRaw (ConstRawData data, bool from_callback, bool in_isr)
 Shared implementation behind ParseData*()
 
bool SyncToPacketStart ()
 Synchronize the input stream to the start of the next packet
 
bool ReadHeader ()
 Read and validate the full header once the prefix is aligned
 
ParseResult ReadPayload (bool from_callback, bool in_isr)
 Read the payload and trailing CRC of the current packet and publish it on success
 
void ResetParser ()
 Clear the current packet's parsing context and return to the start-search state
 

Private Attributes

Status status_ = Status::WAIT_START
 Current parser stage.
 
uint32_t data_len_ = 0
 Payload length declared by the current header.
 
RBTree< uint32_t > topic_map_
 Map from topic-name CRC32 to topic handle.
 
BaseQueue queue_
 Input byte FIFO.
 
RawData parse_buff_
 Staging buffer holding the current header and payload.
 
TopicHandle current_topic_ = nullptr
 Target topic matched by the current packet.
 
MicrosecondTimestamp current_timestamp_
 Timestamp carried by the current packet header.
 

Detailed Description

State machine that parses byte streams into packets and publishes them to registered topics.

Note
The Server does not itself maintain raw topic/cache semantics; it relies only on each registered topic's payload_size, payload_alignment, and name CRC key.
Compatibility rule: when an incoming packet payload is shorter than the topic's fixed size, only the leading prefix is guaranteed valid and the remaining tail is unspecified; when it is longer, only the prefix matching the topic size is kept and the rest is truncated.
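
The compatibility rule amounts to copying the shorter of the two lengths. A minimal sketch, assuming a hypothetical helper `CopyPayloadPrefix` that is not part of LibXR and a plain `std::memcpy` in place of whatever copy routine the library uses:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical helper (not LibXR code): copies the leading bytes of an
// incoming payload into a fixed-size topic slot.
// Returns how many bytes at the start of `slot` are guaranteed valid.
inline std::size_t CopyPayloadPrefix(std::uint8_t* slot, std::size_t slot_size,
                                     const std::uint8_t* payload,
                                     std::size_t payload_len)
{
  // Short payload: copy all of it. Long payload: truncate to the slot size.
  const std::size_t valid = std::min(slot_size, payload_len);
  std::memcpy(slot, payload, valid);  // bytes past `valid` are left untouched
  return valid;
}
```

A subscriber that must tolerate short packets would need some way to learn the valid length; the page above does not describe one, so the return value here is purely illustrative.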

Definition at line 28 of file server.hpp.

Member Enumeration Documentation

◆ ParseResult

enum class LibXR::Topic::Server::ParseResult : uint8_t
strong private

Result of one payload-stage handling step

Enumerator
NEED_MORE

Current packet is still incomplete.

DROPPED

Current packet was dropped.

DELIVERED

Current packet was published.

Definition at line 87 of file server.hpp.

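enum class ParseResult : uint8_t
{
  NEED_MORE,  // current packet is still incomplete
  DROPPED,    // current packet was dropped
  DELIVERED   // current packet was published
};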

◆ Status

enum class LibXR::Topic::Server::Status : uint8_t
strong

Current stage of the parser

Enumerator
WAIT_START

Searching for the next packet prefix.

WAIT_TOPIC

Prefix received; waiting for the full header.

WAIT_DATA_CRC

Header accepted; waiting for payload plus trailing CRC.

Definition at line 35 of file server.hpp.

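enum class Status : uint8_t
{
  WAIT_START,    // searching for the next packet prefix
  WAIT_TOPIC,    // prefix received; waiting for the full header
  WAIT_DATA_CRC  // header accepted; waiting for payload plus trailing CRC
};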

Constructor & Destructor Documentation

◆ Server()

Topic::Server::Server ( size_t buffer_length)

Construct the parser and allocate its internal staging queue

Parameters
buffer_length  Size of the internal byte queue and staging buffer

Definition at line 12 of file server.cpp.

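Topic::Server::Server(size_t buffer_length)
    : topic_map_([](const uint32_t& a, const uint32_t& b)
                 { return static_cast<int>(a) - static_cast<int>(b); }),
      queue_(1, buffer_length)
{
  // The buffer must hold more than the fixed per-packet overhead.
  ASSERT(buffer_length > PACK_BASE_SIZE);
  parse_buff_.size_ = buffer_length;
  // Staging buffer, allocated once at cache-line alignment.
  parse_buff_.addr_ =
      new (std::align_val_t(LibXR::CACHE_LINE_SIZE)) uint8_t[parse_buff_.size_];
}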
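
The comparator passed to topic_map_ subtracts two keys after casting them to int. For CRC32 keys that lie far apart (for example 0x7FFFFFFF and 0x80000000) that subtraction can overflow a signed int. The sketch below shows a comparison that avoids subtraction. `CompareKeys` is a hypothetical name, and it assumes the tree only looks at the sign of the result, which this page does not confirm.

```cpp
#include <cstdint>

// Hypothetical alternative comparator (not LibXR code): returns -1, 0, or 1
// without subtracting the keys, so no signed overflow can occur.
inline int CompareKeys(const std::uint32_t& a, const std::uint32_t& b)
{
  return static_cast<int>(a > b) - static_cast<int>(a < b);
}
```

Note that this orders keys as unsigned values, whereas the listing above orders them as signed values; either order works for a lookup tree as long as it is applied consistently.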

Member Function Documentation

◆ ParseData()

size_t Topic::Server::ParseData ( ConstRawData data)

Feed a new batch of bytes from normal (non-callback) context

Parameters
data  Newly received raw bytes
Returns
Number of packets parsed and published

Definition at line 36 of file server.cpp.

size_t Topic::Server::ParseData(ConstRawData data)
{
  return ParseDataRaw(data, false, false);  // from_callback = false, in_isr = false
}

◆ ParseDataFromCallback()

size_t Topic::Server::ParseDataFromCallback ( ConstRawData data,
bool in_isr )

Feed a new batch of bytes from a callback or ISR path

Parameters
data  Newly received raw bytes
in_isr  Whether the caller is currently in ISR context
Returns
Number of packets parsed and published

Definition at line 41 of file server.cpp.

size_t Topic::Server::ParseDataFromCallback(ConstRawData data, bool in_isr)
{
  return ParseDataRaw(data, true, in_isr);  // from_callback = true
}

◆ ParseDataRaw()

size_t Topic::Server::ParseDataRaw ( ConstRawData data,
bool from_callback,
bool in_isr )
private

Shared implementation behind ParseData*()

Parameters
data  Newly received raw bytes
from_callback  Whether the call comes from a callback path
in_isr  Whether the caller is currently in ISR context
Returns
Number of packets parsed and published

Definition at line 46 of file server.cpp.

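size_t Topic::Server::ParseDataRaw(ConstRawData data, bool from_callback, bool in_isr)
{
  size_t count = 0;

  // Append the new bytes to the input FIFO; the result is deliberately ignored.
  (void)queue_.PushBatch(data.addr_, data.size_);

  while (true)
  {
    // Stage 1: discard bytes until a packet prefix is at the head of the queue.
    if (status_ == Status::WAIT_START && !SyncToPacketStart())
    {
      return count;
    }

    // Stage 2: read and validate the header; false means more bytes are needed.
    if (status_ == Status::WAIT_TOPIC && !ReadHeader())
    {
      return count;
    }

    // Stage 3: read payload plus trailing CRC and publish on success.
    if (status_ == Status::WAIT_DATA_CRC)
    {
      switch (ReadPayload(from_callback, in_isr))
      {
        case ParseResult::NEED_MORE:
          return count;
        case ParseResult::DROPPED:
          continue;
        case ParseResult::DELIVERED:
          count++;
          continue;
      }
    }
  }
}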
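
The three-stage loop can be tried in isolation with a toy frame format. The sketch below is not LibXR code: the frame layout (`0xA5`, one length byte, payload, additive checksum), the class name `ToyParser`, and the `std::deque` FIFO are all assumptions chosen to keep the example self-contained. Only the control flow mirrors the listing above: sync to a start byte, read a header, then wait for the full payload and checksum.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Toy frame (illustrative assumption, NOT the LibXR wire format):
//   [0xA5][len][payload: len bytes][sum of payload bytes mod 256]
class ToyParser
{
 public:
  // Feed bytes; returns how many complete, valid frames were delivered.
  std::size_t Feed(const std::vector<std::uint8_t>& bytes)
  {
    fifo_.insert(fifo_.end(), bytes.begin(), bytes.end());
    std::size_t count = 0;
    while (true)
    {
      if (state_ == State::WAIT_START && !SyncToStart()) return count;
      if (state_ == State::WAIT_HEADER && !ReadHeader()) return count;
      if (state_ == State::WAIT_PAYLOAD)
      {
        if (fifo_.size() < len_ + 1u) return count;  // need more bytes
        std::vector<std::uint8_t> payload(fifo_.begin(), fifo_.begin() + len_);
        std::uint8_t sum = 0;
        for (std::uint8_t b : payload) sum = static_cast<std::uint8_t>(sum + b);
        const bool ok = (sum == fifo_[len_]);
        fifo_.erase(fifo_.begin(), fifo_.begin() + len_ + 1);
        state_ = State::WAIT_START;  // always return to start search
        if (ok)
        {
          last_ = payload;
          ++count;
        }
      }
    }
  }

  const std::vector<std::uint8_t>& Last() const { return last_; }

 private:
  enum class State : std::uint8_t { WAIT_START, WAIT_HEADER, WAIT_PAYLOAD };

  // Drop bytes until the start byte is at the head of the FIFO.
  bool SyncToStart()
  {
    while (!fifo_.empty())
    {
      if (fifo_.front() == 0xA5)
      {
        state_ = State::WAIT_HEADER;
        return true;
      }
      fifo_.pop_front();
    }
    return false;
  }

  // Consume the start byte and the length byte once both are buffered.
  bool ReadHeader()
  {
    if (fifo_.size() < 2) return false;
    len_ = fifo_[1];
    fifo_.erase(fifo_.begin(), fifo_.begin() + 2);
    state_ = State::WAIT_PAYLOAD;
    return true;
  }

  State state_ = State::WAIT_START;
  std::size_t len_ = 0;
  std::deque<std::uint8_t> fifo_;
  std::vector<std::uint8_t> last_;
};
```

Because state survives between calls, a frame split across two `Feed()` calls is completed by the second call, which is the property that lets a byte-stream driver hand over whatever chunk size it happens to receive.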

◆ ReadHeader()

bool Topic::Server::ReadHeader ( )
private

Read and validate the full header once the prefix is aligned

Returns
false when more bytes are needed to complete the header; true otherwise, including when the header was rejected and the parser was reset to the start-search state, so that the caller keeps scanning

Definition at line 98 of file server.cpp.

bool Topic::Server::ReadHeader()
{
  // Wait until a complete header is buffered.
  if (queue_.Size() < sizeof(PackedDataHeader))
  {
    return false;
  }

  queue_.PopBatch(parse_buff_.addr_, sizeof(PackedDataHeader));
  // Reject headers whose CRC8 does not match.
  if (!CRC8::Verify(parse_buff_.addr_, sizeof(PackedDataHeader)))
  {
    ResetParser();
    return true;
  }

  auto* header = reinterpret_cast<PackedDataHeader*>(parse_buff_.addr_);
  if (header->version != PACKET_VERSION)
  {
    ResetParser();
    return true;
  }

  // Look up the target topic by the CRC32 of its name.
  auto* node = topic_map_.Search<TopicHandle>(header->topic_name_crc32);
  if (node == nullptr)
  {
    ResetParser();
    return true;
  }

  data_len_ = header->GetDataLen();
  current_timestamp_ = header->GetTimestamp();
  current_topic_ = *node;
  const auto target_size = current_topic_->data_.payload_size;

  // The topic's fixed payload must fit in the staging buffer.
  if (target_size + PACK_BASE_SIZE > parse_buff_.size_)
  {
    ResetParser();
    return true;
  }

  // The declared packet must fit in the input queue.
  if (data_len_ + PACK_BASE_SIZE > queue_.length_)
  {
    ResetParser();
    return true;
  }

  status_ = Status::WAIT_DATA_CRC;
  return true;
}
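
ReadHeader() relies on a `Verify(raw, len)` call that checks a buffer against a CRC8 stored with it. The sketch below illustrates that idea with a generic CRC-8 (polynomial 0x07, initial value 0x00) and the checksum in the last byte. Both the parameter set and the trailing-byte position are assumptions for illustration; LibXR's CRC8 may use different ones, and `Crc8` and `VerifyTrailingCrc8` are hypothetical names.

```cpp
#include <cstddef>
#include <cstdint>

// Generic bitwise CRC-8, polynomial 0x07, initial value 0x00 (assumed
// parameters, not confirmed to match LibXR's CRC8).
inline std::uint8_t Crc8(const std::uint8_t* data, std::size_t len)
{
  std::uint8_t crc = 0x00;
  for (std::size_t i = 0; i < len; ++i)
  {
    crc ^= data[i];
    for (int bit = 0; bit < 8; ++bit)
    {
      crc = (crc & 0x80) ? static_cast<std::uint8_t>((crc << 1) ^ 0x07)
                         : static_cast<std::uint8_t>(crc << 1);
    }
  }
  return crc;
}

// Hypothetical counterpart of a Verify(raw, len) call: the last byte of the
// buffer is treated as the checksum of everything before it.
inline bool VerifyTrailingCrc8(const std::uint8_t* raw, std::size_t len)
{
  if (len < 2) return false;  // need at least one data byte plus the checksum
  return Crc8(raw, len - 1) == raw[len - 1];
}
```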

◆ ReadPayload()

Topic::Server::ParseResult Topic::Server::ReadPayload ( bool from_callback,
bool in_isr )
private

Read the payload and trailing CRC of the current packet and publish it on success

Parameters
from_callback  Whether the call comes from a callback path
in_isr  Whether the caller is currently in ISR context
Returns
Result of the current payload stage

Definition at line 147 of file server.cpp.

Topic::Server::ParseResult Topic::Server::ReadPayload(bool from_callback, bool in_isr)
{
  // Wait for the whole payload plus the one-byte trailing CRC.
  if (queue_.Size() < data_len_ + sizeof(uint8_t))
  {
    return ParseResult::NEED_MORE;
  }

  // The payload is staged directly behind the header already in parse_buff_.
  auto* payload_addr =
      reinterpret_cast<uint8_t*>(parse_buff_.addr_) + sizeof(PackedDataHeader);
  queue_.PopBatch(payload_addr, data_len_ + sizeof(uint8_t));

  // Check the CRC over header, payload, and trailing CRC byte.
  if (!CRC8::Verify(parse_buff_.addr_,
                    data_len_ + sizeof(PackedDataHeader) + sizeof(uint8_t)))
  {
    ResetParser();
    return ParseResult::DROPPED;
  }

  const auto target_size = current_topic_->data_.payload_size;
  void* publish_addr = payload_addr;
  // If the payload is misaligned for the topic, move it to the buffer start.
  if (reinterpret_cast<uintptr_t>(payload_addr) %
          current_topic_->data_.payload_alignment !=
      0)
  {
    publish_addr = parse_buff_.addr_;
    if (data_len_ >= target_size)
    {
      LibXR::Memory::FastMove(publish_addr, payload_addr, target_size);
    }
    else
    {
      LibXR::Memory::FastMove(publish_addr, payload_addr, data_len_);
    }
  }

  auto topic = Topic(current_topic_);
  if (from_callback)
  {
    topic.PublishBytesFromServerCallback(publish_addr, target_size, current_timestamp_,
                                         in_isr);
  }
  else
  {
    topic.PublishBytesFromServer(publish_addr, target_size, current_timestamp_);
  }

  ResetParser();
  return ParseResult::DELIVERED;
}
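
The alignment fallback in ReadPayload() can be isolated as follows. This is a sketch under stated assumptions: `AlignedPublishAddr` is a hypothetical helper, `std::memmove` stands in for `LibXR::Memory::FastMove`, and the buffer start is assumed to be aligned at least as strictly as the requested alignment, which is what the cache-line-aligned staging buffer provides when alignment is a power of two no larger than CACHE_LINE_SIZE.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical helper (not LibXR code). The payload starts `header_size`
// bytes into `buffer`. Returns the address to publish from, moving
// `copy_len` payload bytes to the buffer start when the payload is misaligned.
inline void* AlignedPublishAddr(std::uint8_t* buffer, std::size_t header_size,
                                std::size_t copy_len, std::size_t alignment)
{
  assert(alignment != 0);
  std::uint8_t* payload = buffer + header_size;
  if (reinterpret_cast<std::uintptr_t>(payload) % alignment == 0)
  {
    return payload;  // already aligned, publish in place
  }
  std::memmove(buffer, payload, copy_len);  // source and destination may overlap
  return buffer;
}
```

Moving the payload over the header is safe at this point only because the header fields needed later have already been copied out, as ReadHeader() does with the length, timestamp, and topic.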

◆ Register()

void Topic::Server::Register ( TopicHandle topic)

Register a topic that may receive parsed packets

Parameters
topic  Target topic handle
Note
Registration asserts that the topic's payload_size + PACK_BASE_SIZE fits in this server's staging buffer.
The server's internal staging buffer is allocated once, aligned to CACHE_LINE_SIZE, during construction; registration also asserts that the topic's payload_alignment <= CACHE_LINE_SIZE.

Definition at line 23 of file server.cpp.

void Topic::Server::Register(TopicHandle topic)
{
  ASSERT(topic != nullptr);
  ASSERT(topic->data_.payload_size != 0);
  ASSERT(topic->data_.payload_alignment != 0);
  ASSERT(topic->data_.payload_alignment <= LibXR::CACHE_LINE_SIZE);

  // The topic's fixed payload plus packet overhead must fit the staging buffer.
  ASSERT(topic->data_.payload_size + PACK_BASE_SIZE <= parse_buff_.size_);

  // Index the topic by its key so ReadHeader() can find it.
  auto* node = new RBTree<uint32_t>::Node<TopicHandle>(topic);
  topic_map_.Insert(*node, topic->key);
}
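
Since Register() enforces its preconditions with assertions, a caller may want to check them up front when sizing the server. A minimal sketch: `CanRegister` is a hypothetical function, and `pack_base_size` and `cache_line_size` are parameters standing in for LibXR's PACK_BASE_SIZE and CACHE_LINE_SIZE, whose values are not stated on this page.

```cpp
#include <cstddef>

// Hypothetical, side-effect-free version of the checks asserted by
// Register(). All sizes are in bytes.
constexpr bool CanRegister(std::size_t payload_size, std::size_t payload_alignment,
                           std::size_t buffer_size, std::size_t pack_base_size,
                           std::size_t cache_line_size)
{
  return payload_size != 0 && payload_alignment != 0 &&
         payload_alignment <= cache_line_size &&
         payload_size + pack_base_size <= buffer_size;
}
```

With made-up numbers, a 16-byte payload and 8 bytes of overhead fit a 64-byte buffer, while a 60-byte payload does not.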

◆ ResetParser()

void Topic::Server::ResetParser ( )
private

Clear the current packet's parsing context and return to the start-search state

Definition at line 197 of file server.cpp.

void Topic::Server::ResetParser()
{
  status_ = Status::WAIT_START;
  data_len_ = 0;
  current_topic_ = nullptr;
  current_timestamp_ = MicrosecondTimestamp();
}

◆ SyncToPacketStart()

bool Topic::Server::SyncToPacketStart ( )
private

Synchronize the input stream to the start of the next packet

Returns
true when a packet start byte is at the head of the queue, otherwise false

Definition at line 80 of file server.cpp.

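bool Topic::Server::SyncToPacketStart()
{
  auto queue_size = queue_.Size();
  for (uint32_t i = 0; i < queue_size; i++)
  {
    uint8_t prefix = 0;
    queue_.Peek(&prefix);
    if (prefix == PACKET_PREFIX)
    {
      // Leave the prefix in the queue; ReadHeader() pops the whole header.
      status_ = Status::WAIT_TOPIC;
      return true;
    }
    queue_.Pop();  // discard bytes that precede the prefix
  }

  return false;
}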

Field Documentation

◆ current_timestamp_

MicrosecondTimestamp LibXR::Topic::Server::current_timestamp_
private

Timestamp carried by the current packet header.

Definition at line 142 of file server.hpp.

◆ current_topic_

TopicHandle LibXR::Topic::Server::current_topic_ = nullptr
private

Target topic matched by the current packet.

Definition at line 141 of file server.hpp.

◆ data_len_

uint32_t LibXR::Topic::Server::data_len_ = 0
private

Payload length declared by the current header.

Definition at line 137 of file server.hpp.

◆ parse_buff_

RawData LibXR::Topic::Server::parse_buff_
private

Staging buffer holding the current header and payload.

Definition at line 140 of file server.hpp.

◆ queue_

BaseQueue LibXR::Topic::Server::queue_
private

Input byte FIFO.

Definition at line 139 of file server.hpp.

◆ status_

Status LibXR::Topic::Server::status_ = Status::WAIT_START
private

Current parser stage.

Definition at line 136 of file server.hpp.

◆ topic_map_

RBTree<uint32_t> LibXR::Topic::Server::topic_map_
private

Map from topic-name CRC32 to topic handle.

Definition at line 138 of file server.hpp.


The documentation for this class was generated from the following files: