libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
LibXR::WritePort Class Reference

WritePort class for handling write operations. More...

#include <write_port.hpp>

Collaboration diagram for LibXR::WritePort:
[legend]

Data Structures

class  Stream
 

Public Types

enum class  BusyState : uint32_t {
  LOCKED , BLOCK_PUBLISHING = 1 , BLOCK_WAITING = 2 , BLOCK_CLAIMED ,
  BLOCK_DETACHED = 4 , RESETTING = 5 , IDLE = UINT32_MAX
}
 

Public Member Functions

 WritePort (size_t queue_size=3, size_t buffer_size=128)
 构造一个新的 WritePort 对象。 Constructs a new WritePort object.
 
size_t EmptySize ()
 获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.
 
size_t Size ()
 获取当前数据队列的已使用大小。 Gets the used size of the current data queue.
 
bool Writable ()
 判断端口是否可写。 Checks whether the port is writable.
 
WritePortoperator= (WriteFun fun)
 赋值运算符重载,用于设置写入函数。 Overloaded assignment operator to set the write function.
 
void Finish (bool in_isr, ErrorCode ans, WriteInfoBlock &info)
 更新写入操作的状态。 Updates the status of the write operation.
 
void MarkAsRunning (WriteOperation &op)
 标记写入操作为运行中。 Marks the write operation as running.
 
ErrorCode operator() (ConstRawData data, WriteOperation &op, bool in_isr=false)
 执行写入操作。 Performs a write operation.
 
void FailAndClearAll (ErrorCode reason, bool in_isr)
 失败完成并清空当前所有挂起写操作。
 
ErrorCode CommitWrite (ConstRawData data, WriteOperation &op, bool pushed=false, bool in_isr=false)
 提交写入操作。 Commits a write operation.
 

Data Fields

WriteFun write_fun_ = nullptr
 Driver/backend write entry. 底层驱动或后端写入入口。
 
LockFreeQueue< WriteInfoBlock > * queue_info_
 Metadata queue for pending write batches. 挂起写批次的元数据队列。
 
LockFreeQueue< uint8_t > * queue_data_
 Payload queue for pending write bytes. 挂起写入字节的数据队列。
 
std::atomic< BusyStatebusy_ {BusyState::IDLE}
 Shared submit/wait handoff state. 共享的提交/等待交接状态。
 
ErrorCode block_result_ = ErrorCode::OK
 Final status for the current BLOCK write. 当前 BLOCK 写入的最终结果。
 

Detailed Description

WritePort class for handling write operations.

处理写入操作的WritePort类。

Definition at line 18 of file write_port.hpp.

Member Enumeration Documentation

◆ BusyState

enum class LibXR::WritePort::BusyState : uint32_t
strong
Enumerator
LOCKED 

Submission path owns queue mutation. 提交路径占有写队列/元数据修改权。

BLOCK_PUBLISHING 

BLOCK submitter is publishing queue metadata. BLOCK 提交者正在发布队列元数据。

BLOCK_WAITING 

One BLOCK waiter is armed and waiting for final completion. 一个 BLOCK 等待者已经挂起,等待最终完成。

BLOCK_CLAIMED 

Final wakeup belongs to the current waiter. 最终唤醒已归当前等待者所有。

BLOCK_DETACHED 

Waiter already timed out/detached; completion must not post. 等待者已超时/被分离,完成侧不能再 Post。

RESETTING 

Fail-and-clear owns queue mutation. Fail-and-clear 路径占有写队列/元数据修改权。

IDLE 

No active submitter and no armed BLOCK waiter. 没有活动提交者,也没有挂起中的 BLOCK 等待者。

Definition at line 45 of file write_port.hpp.

46 {
47 LOCKED =
48 0,
51 BLOCK_WAITING = 2,
54 3,
55 BLOCK_DETACHED = 4,
57 RESETTING = 5,
59 IDLE = UINT32_MAX
61 };
@ BLOCK_CLAIMED
Final wakeup belongs to the current waiter. 最终唤醒已归当前等待者所有。
@ LOCKED
Submission path owns queue mutation. 提交路径占有写队列/元数据修改权。

Constructor & Destructor Documentation

◆ WritePort()

WritePort::WritePort ( size_t queue_size = 3,
size_t buffer_size = 128 )

构造一个新的 WritePort 对象。 Constructs a new WritePort object.

该构造函数初始化无锁操作队列 queue_info_ 和数据块队列 queue_data_。 This constructor initializes the lock-free operation queue queue_info_ and the data block queue queue_data_.

Parameters
queue_size队列的大小,默认为 3。 The size of the queue, default is 3.
buffer_size缓存数据字节队列的容量,默认为 128。 Capacity of the cached-byte queue, default is 128.
Note
包含动态内存分配。 Contains dynamic memory allocation.

Definition at line 10 of file write_port.cpp.

11 : queue_info_(new (std::align_val_t(LibXR::CACHE_LINE_SIZE))
13 queue_data_(buffer_size > 0 ? new (std::align_val_t(LibXR::CACHE_LINE_SIZE))
14 LockFreeQueue<uint8_t>(buffer_size)
15 : nullptr)
16{
17}
无锁队列实现 / Lock-free queue implementation
LockFreeQueue< uint8_t > * queue_data_
Payload queue for pending write bytes. 挂起写入字节的数据队列。
LockFreeQueue< WriteInfoBlock > * queue_info_
Metadata queue for pending write batches. 挂起写批次的元数据队列。
constexpr size_t CACHE_LINE_SIZE
缓存行大小 / Cache line size
Definition libxr_def.hpp:56

Member Function Documentation

◆ CommitWrite()

ErrorCode WritePort::CommitWrite ( ConstRawData data,
WriteOperation & op,
bool pushed = false,
bool in_isr = false )

提交写入操作。 Commits a write operation.

Parameters
data写入的原始数据 / Raw data to be written
op写入操作对象,包含操作类型和同步机制。 Write operation object containing the operation type and synchronization
pushed数据是否已经推送到缓冲区 / Whether the data has been pushed to the buffer
in_isr指示是否在中断上下文中执行。 Indicates whether the operation is executed in an interrupt context.
Returns
返回操作的 ErrorCode,指示操作结果。 Returns an ErrorCode indicating the result of the operation.

Definition at line 117 of file write_port.cpp.

119{
120 if (!meta_pushed && queue_info_->EmptySize() < 1)
121 {
122 busy_.store(BusyState::IDLE, std::memory_order_release);
123 return ErrorCode::FULL;
124 }
125
127 if (!meta_pushed)
128 {
129 if (queue_data_->EmptySize() < data.size_)
130 {
131 busy_.store(BusyState::IDLE, std::memory_order_release);
132 return ErrorCode::FULL;
133 }
134
135 ans =
136 queue_data_->PushBatch(reinterpret_cast<const uint8_t*>(data.addr_), data.size_);
137 UNUSED(ans);
138 ASSERT(ans == ErrorCode::OK);
139
140 if (op.type == WriteOperation::OperationType::BLOCK)
141 {
142 // The BLOCK waiter must be armed before queue_info_ becomes visible to a
143 // backend/completion thread.
144 // queue_info_ 对后端/完成线程可见前,必须先挂起 BLOCK waiter。
145 op.MarkAsRunning();
146 busy_.store(BusyState::BLOCK_PUBLISHING, std::memory_order_release);
147 }
148
149 WriteInfoBlock info{data, op};
150 ans = queue_info_->Push(info);
151
152 ASSERT(ans == ErrorCode::OK);
153 }
154
155 if (op.type != WriteOperation::OperationType::BLOCK)
156 {
157 op.MarkAsRunning();
158 }
159 else if (meta_pushed)
160 {
161 op.MarkAsRunning();
162 }
163
164 if (op.type == WriteOperation::OperationType::BLOCK)
165 {
167 if (!busy_.compare_exchange_strong(expected, BusyState::BLOCK_WAITING,
168 std::memory_order_acq_rel,
169 std::memory_order_acquire))
170 {
171 ASSERT(expected == BusyState::BLOCK_CLAIMED);
172 }
173 }
174
175 ans = write_fun_(*this, in_isr);
176
177 if (ans != ErrorCode::PENDING)
178 {
179 if (op.type == WriteOperation::OperationType::BLOCK)
180 {
181 auto state = busy_.load(std::memory_order_acquire);
182 while (state == BusyState::RESETTING)
183 {
184 state = busy_.load(std::memory_order_acquire);
185 }
186
187 if (state == BusyState::BLOCK_CLAIMED)
188 {
189 auto finish_wait_ans = op.data.sem_info.sem->Wait(UINT32_MAX);
190 UNUSED(finish_wait_ans);
191 ASSERT(finish_wait_ans == ErrorCode::OK);
192 busy_.store(BusyState::IDLE, std::memory_order_release);
193 return block_result_;
194 }
195
196 if (state == BusyState::BLOCK_DETACHED)
197 {
198 busy_.store(BusyState::IDLE, std::memory_order_release);
199 return ErrorCode::TIMEOUT;
200 }
201
202 ASSERT(state == BusyState::BLOCK_WAITING || state == BusyState::IDLE ||
203 state == BusyState::LOCKED);
204 busy_.store(BusyState::IDLE, std::memory_order_release);
205 return ans;
206 }
207
208 if (!meta_pushed)
209 {
210 busy_.store(BusyState::IDLE, std::memory_order_release);
211 }
212
213 if (op.type != WriteOperation::OperationType::BLOCK)
214 {
215 op.UpdateStatus(in_isr, ans);
216 }
217 return (static_cast<int8_t>(ans) < 0) ? ans : ErrorCode::OK;
218 }
219
220 if (op.type == WriteOperation::OperationType::BLOCK)
221 {
222 ASSERT(!in_isr);
223 auto wait_ans = op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
224 if (wait_ans == ErrorCode::OK)
225 {
226 // BLOCK_CLAIMED is always released by the waiter itself.
227 // BLOCK_CLAIMED 始终由 waiter 自己释放。
228#ifdef LIBXR_DEBUG_BUILD
229 auto state = busy_.load(std::memory_order_acquire);
230 ASSERT(state == BusyState::BLOCK_CLAIMED);
231#endif
232 busy_.store(BusyState::IDLE, std::memory_order_release);
233 return block_result_;
234 }
235
236 // Timeout won before completion claimed the waiter.
237 // 超时先赢,完成侧还没 claim 当前 waiter。
239 if (busy_.compare_exchange_strong(expected, BusyState::BLOCK_DETACHED,
240 std::memory_order_acq_rel,
241 std::memory_order_acquire))
242 {
243 return ErrorCode::TIMEOUT;
244 }
245
246 if (expected != BusyState::BLOCK_CLAIMED)
247 {
248 while (expected == BusyState::RESETTING)
249 {
250 expected = busy_.load(std::memory_order_acquire);
251 }
252
253 // A detached late completion may already have cleared BLOCK_DETACHED
254 // back to IDLE before this waiter wakes from timeout.
255 // 分离后的迟到完成可能会在当前 waiter 超时醒来前,先把
256 // BLOCK_DETACHED 清回 IDLE。
257 ASSERT(expected == BusyState::BLOCK_DETACHED || expected == BusyState::IDLE ||
258 expected == BusyState::LOCKED);
259 if (expected == BusyState::BLOCK_DETACHED)
260 {
261 busy_.store(BusyState::IDLE, std::memory_order_release);
262 }
263 return ErrorCode::TIMEOUT;
264 }
265
266 // Timeout lost after completion had already claimed the waiter.
267 // 超时发生得太晚,完成侧已经 claim 了当前 waiter。
268 auto finish_wait_ans = op.data.sem_info.sem->Wait(UINT32_MAX);
269 UNUSED(finish_wait_ans);
270 ASSERT(finish_wait_ans == ErrorCode::OK);
271 busy_.store(BusyState::IDLE, std::memory_order_release);
272
273 return block_result_;
274 }
275
276 if (!meta_pushed)
277 {
278 busy_.store(BusyState::IDLE, std::memory_order_release);
279 }
280
281 return ErrorCode::OK;
282}
size_t size_
数据字节数 / Data size in bytes
const void * addr_
数据起始地址 / Data start address
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
union LibXR::Operation::@5 data
void UpdateStatus(bool in_isr, Status &&status)
Updates operation status based on type.
void MarkAsRunning()
标记操作为运行状态。 Marks the operation as running.
OperationType type
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待(减少)信号量 Waits (decrements) the semaphore
Definition semaphore.cpp:53
ErrorCode block_result_
Final status for the current BLOCK write. 当前 BLOCK 写入的最终结果。
std::atomic< BusyState > busy_
Shared submit/wait handoff state. 共享的提交/等待交接状态。
WriteFun write_fun_
Driver/backend write entry. 底层驱动或后端写入入口。
ErrorCode
定义错误码枚举
@ TIMEOUT
超时 | Timeout
@ FULL
已满 | Full
@ PENDING
等待中 | Pending
@ OK
操作成功 | Operation successful

◆ EmptySize()

size_t WritePort::EmptySize ( )

获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.

Returns
返回数据队列的空闲大小。 Returns the empty size of the data queue.

Definition at line 19 of file write_port.cpp.

20{
21 ASSERT(queue_data_ != nullptr);
22 return queue_data_->EmptySize();
23}

◆ FailAndClearAll()

void WritePort::FailAndClearAll ( ErrorCode reason,
bool in_isr )

失败完成并清空当前所有挂起写操作。

Fail-complete and clear all currently pending write operations.

Note
Driver-only: call this only after the backend is known to be unavailable.
仅供驱动层在后端已明确不可用后调用。
The surrounding driver must already guarantee that no new front-end submissions or back-end completion/data events can still arrive for this port.
外围驱动还必须先保证:这条端口后续不会再收到新的前端提交,也不会再收到 新的后端完成或数据事件。
Seeing LOCKED / BLOCK_PUBLISHING / RESETTING here means the caller violated that driver-side precondition.
若此时仍看到 LOCKED / BLOCK_PUBLISHING / RESETTING,说明调用方违反了 上述驱动层前提。
Dev builds fire DEV_ASSERT, while release builds still back off.
开发期会触发 DEV_ASSERT,发布构建仍保持直接退回。
Parameters
reason最终失败原因 / Final failure reason
in_isr是否在 ISR 上下文 / Whether running in ISR context

Definition at line 284 of file write_port.cpp.

285{
286 ASSERT(queue_data_ != nullptr);
287 WriteInfoBlock info{};
288
289 while (true)
290 {
291 auto state = busy_.load(std::memory_order_acquire);
292
293 if (state == BusyState::LOCKED)
294 {
295 DEV_ASSERT_FROM_CALLBACK(false, in_isr);
296 return;
297 }
298
299 if (state == BusyState::BLOCK_PUBLISHING)
300 {
301 DEV_ASSERT_FROM_CALLBACK(false, in_isr);
302 return;
303 }
304
305 if (state == BusyState::RESETTING)
306 {
307 DEV_ASSERT_FROM_CALLBACK(false, in_isr);
308 return;
309 }
310
311 if (state == BusyState::IDLE)
312 {
313 BusyState expected = BusyState::IDLE;
314 if (!busy_.compare_exchange_strong(expected, BusyState::RESETTING,
315 std::memory_order_acq_rel,
316 std::memory_order_acquire))
317 {
318 continue;
319 }
320
322 while (queue_info_->Pop(info) == ErrorCode::OK)
323 {
324 Finish(in_isr, reason, info);
325 }
327 busy_.store(BusyState::IDLE, std::memory_order_release);
328 return;
329 }
330
331 if (state == BusyState::BLOCK_WAITING)
332 {
333 // Keep BLOCK_WAITING visible until Finish() hands the terminal wakeup to
334 // the blocked caller. Switching to RESETTING here would break that
335 // existing waiter handoff.
336 // 这里必须保留 BLOCK_WAITING,直到 Finish() 把最终唤醒交给当前
337 // BLOCK waiter;若先切成 RESETTING,会破坏既有 waiter 交接。
339 while (queue_info_->Pop(info) == ErrorCode::OK)
340 {
341 Finish(in_isr, reason, info);
342 }
343 return;
344 }
345
346 if (state == BusyState::BLOCK_DETACHED)
347 {
348 // The waiter is already gone, but BLOCK_DETACHED still blocks reentrant
349 // submissions while old queue entries are drained.
350 // waiter 已经离开,但 BLOCK_DETACHED 仍能在清理旧队列期间挡住重入提交。
352 while (queue_info_->Pop(info) == ErrorCode::OK)
353 {
354 // The waiter has already detached. Finish() will clear the local state
355 // without re-posting that waiter.
356 // waiter 已经分离。Finish() 会清理本地状态,但不会重新唤醒该 waiter。
357 Finish(in_isr, reason, info);
358 }
360 busy_.store(BusyState::IDLE, std::memory_order_release);
361 return;
362 }
363
364 if (state == BusyState::BLOCK_CLAIMED)
365 {
366 return;
367 }
368 }
369}
void Reset()
重置队列 / Resets the queue
void Finish(bool in_isr, ErrorCode ans, WriteInfoBlock &info)
更新写入操作的状态。 Updates the status of the write operation.

◆ Finish()

void WritePort::Finish ( bool in_isr,
ErrorCode ans,
WriteInfoBlock & info )

更新写入操作的状态。 Updates the status of the write operation.

该函数在写入操作完成时更新对应 info 的状态,并调用 UpdateStatus 更新 op。 This function updates the status stored in info and calls UpdateStatus on op when a write operation completes.

Parameters
in_isr指示是否在中断上下文中执行。 Indicates whether the operation is executed in an interrupt context.
ans错误码,用于指示操作的结果。 Error code indicating the result of the operation.
op需要更新状态的 WriteOperation 引用。 Reference to the WriteOperation whose status needs to be updated.

Definition at line 39 of file write_port.cpp.

40{
41 if (info.op.type == WriteOperation::OperationType::BLOCK)
42 {
43 block_result_ = ans;
44
45 // Write completion claims the active BLOCK waiter and hands the wakeup to it.
46 // 写完成 claim 当前 BLOCK waiter,并把唤醒交给它。
48 if (busy_.compare_exchange_strong(expected, BusyState::BLOCK_CLAIMED,
49 std::memory_order_acq_rel,
50 std::memory_order_acquire))
51 {
52 info.op.data.sem_info.sem->PostFromCallback(in_isr);
53 return;
54 }
55
56 // The waiter may have timed out and detached before this late completion is
57 // reported.
58 // waiter 可能已经先超时分离,随后迟到完成才上报。
59 if (expected == BusyState::BLOCK_PUBLISHING)
60 {
62 if (busy_.compare_exchange_strong(expected, BusyState::BLOCK_CLAIMED,
63 std::memory_order_acq_rel,
64 std::memory_order_acquire))
65 {
66 info.op.data.sem_info.sem->PostFromCallback(in_isr);
67 return;
68 }
69 }
70
71 ASSERT(expected == BusyState::BLOCK_DETACHED || expected == BusyState::IDLE ||
72 expected == BusyState::LOCKED || expected == BusyState::RESETTING);
73 if (expected == BusyState::BLOCK_DETACHED)
74 {
76 (void)busy_.compare_exchange_strong(expected, BusyState::IDLE,
77 std::memory_order_acq_rel,
78 std::memory_order_acquire);
79 }
80 return;
81 }
82
83 info.op.UpdateStatus(in_isr, ans);
84}
void PostFromCallback(bool in_isr)
从中断回调中释放(增加)信号量 Releases (increments) the semaphore from an ISR (Interrupt Service Routine)
WriteOperation op
Write operation instance. 写入操作实例。

◆ MarkAsRunning()

void WritePort::MarkAsRunning ( WriteOperation & op)

标记写入操作为运行中。 Marks the write operation as running.

该函数用于将 op 标记为运行状态,以指示当前正在进行写入操作。 This function marks op as running to indicate an ongoing write operation.

Parameters
op需要更新状态的 WriteOperation 引用。 Reference to the WriteOperation whose status needs to be updated.

Definition at line 86 of file write_port.cpp.

86{ op.MarkAsRunning(); }

◆ operator()()

ErrorCode WritePort::operator() ( ConstRawData data,
WriteOperation & op,
bool in_isr = false )

执行写入操作。 Performs a write operation.

该函数检查端口是否可写,并根据 data.size_ 和 op 的类型执行不同的操作。 This function checks if the port is writable and performs different actions based on data.size_ and the type of op.

Parameters
data需要写入的原始数据。 Raw data to be written.
op写入操作对象,包含操作类型和同步机制。 Write operation object containing the operation type and synchronization mechanism.
in_isr指示是否在中断上下文中执行。 Indicates whether the operation is executed in an interrupt context.
Returns
返回操作的 ErrorCode,指示操作结果。 Returns an ErrorCode indicating the result of the operation.

Definition at line 88 of file write_port.cpp.

89{
90 if (Writable())
91 {
92 if (data.size_ == 0)
93 {
94 if (op.type != WriteOperation::OperationType::BLOCK)
95 {
96 op.UpdateStatus(in_isr, ErrorCode::OK);
97 }
98 return ErrorCode::OK;
99 }
100
101 BusyState expected = BusyState::IDLE;
102 if (!busy_.compare_exchange_strong(expected, BusyState::LOCKED,
103 std::memory_order_acq_rel,
104 std::memory_order_acquire))
105 {
106 return ErrorCode::BUSY;
107 }
108
109 return CommitWrite(data, op, false, in_isr);
110 }
111 else
112 {
114 }
115}
bool Writable()
判断端口是否可写。 Checks whether the port is writable.
ErrorCode CommitWrite(ConstRawData data, WriteOperation &op, bool pushed=false, bool in_isr=false)
提交写入操作。 Commits a write operation.
@ BUSY
忙碌 | Busy
@ NOT_SUPPORT
不支持 | Not supported

◆ operator=()

WritePort & WritePort::operator= ( WriteFun fun)

赋值运算符重载,用于设置写入函数。 Overloaded assignment operator to set the write function.

该函数允许使用 WriteFun 类型的函数对象赋值给 WritePort,从而设置 write_fun_。 This function allows assigning a WriteFun function object to WritePort, setting write_fun_.

Parameters
fun要分配的写入函数。 The write function to be assigned.
Returns
返回自身的引用,以支持链式调用。 Returns a reference to itself for chaining.

Definition at line 33 of file write_port.cpp.

34{
35 write_fun_ = fun;
36 return *this;
37}

◆ Size()

size_t WritePort::Size ( )

获取当前数据队列的已使用大小。 Gets the used size of the current data queue.

Returns
返回数据队列的已使用大小。 Returns the size of the data queue.

Definition at line 25 of file write_port.cpp.

26{
27 ASSERT(queue_data_ != nullptr);
28 return queue_data_->Size();
29}
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue

◆ Writable()

bool WritePort::Writable ( )

判断端口是否可写。 Checks whether the port is writable.

Returns
如果 write_fun_ 不为空,则返回 true,否则返回 false。 Returns true if write_fun_ is not null, otherwise returns false.

Definition at line 31 of file write_port.cpp.

31{ return write_fun_ != nullptr; }

Field Documentation

◆ block_result_

ErrorCode LibXR::WritePort::block_result_ = ErrorCode::OK

Final status for the current BLOCK write. 当前 BLOCK 写入的最终结果。

Definition at line 69 of file write_port.hpp.

◆ busy_

std::atomic<BusyState> LibXR::WritePort::busy_ {BusyState::IDLE}

Shared submit/wait handoff state. 共享的提交/等待交接状态。

Definition at line 68 of file write_port.hpp.

◆ queue_data_

LockFreeQueue<uint8_t>* LibXR::WritePort::queue_data_
Initial value:
=
nullptr

Payload queue for pending write bytes. 挂起写入字节的数据队列。

Definition at line 66 of file write_port.hpp.

◆ queue_info_

LockFreeQueue<WriteInfoBlock>* LibXR::WritePort::queue_info_
Initial value:
=
nullptr

Metadata queue for pending write batches. 挂起写批次的元数据队列。

Definition at line 64 of file write_port.hpp.

◆ write_fun_

WriteFun LibXR::WritePort::write_fun_ = nullptr

Driver/backend write entry. 底层驱动或后端写入入口。

Definition at line 63 of file write_port.hpp.


The documentation for this class was generated from the following files: