libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
write_port.cpp
1#include "write_port.hpp"
2
3#include <new>
4
5using namespace LibXR;
6
9
// Constructs a WritePort and allocates its queues.
//   queue_size  - capacity argument passed to the metadata queue
//                 (LockFreeQueue<WriteInfoBlock>).
//   buffer_size - capacity argument passed to the payload queue
//                 (LockFreeQueue<uint8_t>); when it is 0 no payload queue is
//                 allocated and queue_data_ is initialised to nullptr.
// Both queues are created with aligned new, using LibXR::CACHE_LINE_SIZE as
// the requested alignment.
// NOTE(review): no matching delete is visible in this listing - confirm the
// intended lifetime of the two queues.
10WritePort::WritePort(size_t queue_size, size_t buffer_size)
11 : queue_info_(new (std::align_val_t(LibXR::CACHE_LINE_SIZE))
12 LockFreeQueue<WriteInfoBlock>(queue_size)),
13 queue_data_(buffer_size > 0 ? new (std::align_val_t(LibXR::CACHE_LINE_SIZE))
14 LockFreeQueue<uint8_t>(buffer_size)
15 : nullptr)
16{
17}
18
// NOTE(review): the signature line (listing line 19) is missing from this
// listing; presumably this is the body of WritePort::EmptySize() - confirm
// against the original source.
// Returns whatever the payload queue reports as EmptySize(). Asserts that the
// payload queue exists (the constructor leaves queue_data_ null when
// buffer_size is 0).
20{
21 ASSERT(queue_data_ != nullptr);
22 return queue_data_->EmptySize();
23}
24
// NOTE(review): the signature line (listing line 25) is missing from this
// listing; presumably this is the body of WritePort::Size() - confirm against
// the original source.
// Returns whatever the payload queue reports as Size(). Asserts that the
// payload queue exists (the constructor leaves queue_data_ null when
// buffer_size is 0).
26{
27 ASSERT(queue_data_ != nullptr);
28 return queue_data_->Size();
29}
30
// Reports whether a backend write function is installed, i.e. whether
// write_fun_ is non-null.
31bool WritePort::Writable() { return write_fun_ != nullptr; }
32
// NOTE(review): the signature line (listing line 33) is missing from this
// listing; presumably this is the body of WritePort::operator=(WriteFun fun) -
// confirm against the original source.
// Stores `fun` in write_fun_ and returns *this.
34{
35 write_fun_ = fun;
36 return *this;
37}
38
// Reports the final result `ans` for one write request described by `info`.
//   in_isr - forwarded to PostFromCallback() / UpdateStatus().
//   ans    - result to report.
//   info   - the request; info.op selects the reporting path.
// BLOCK operations: `ans` is stored in block_result_, then busy_ is
// compare-exchanged to BLOCK_CLAIMED; on success the operation's semaphore is
// posted and the function returns. If the claim fails and the observed state is
// BLOCK_DETACHED, busy_ is compare-exchanged back to IDLE and nothing is posted.
// All other operation types: the result is forwarded to info.op.UpdateStatus().
39void WritePort::Finish(bool in_isr, ErrorCode ans, WriteInfoBlock& info)
40{
41 if (info.op.type == WriteOperation::OperationType::BLOCK)
42 {
43 block_result_ = ans;
44
45 // Write completion claims the active BLOCK waiter
46 // and hands the wakeup to it.
// NOTE(review): listing line 47 is missing here; it presumably declares and
// initialises `expected`, which the compare-exchange below uses - confirm.
48 if (busy_.compare_exchange_strong(expected, BusyState::BLOCK_CLAIMED,
49 std::memory_order_acq_rel,
50 std::memory_order_acquire))
51 {
52 info.op.data.sem_info.sem->PostFromCallback(in_isr);
53 return;
54 }
55
56 // The waiter may have timed out and detached
57 // before this late completion is
58 // reported.
// On failure compare_exchange_strong stores the observed state in `expected`,
// so the checks below act on the state that was actually seen.
59 if (expected == BusyState::BLOCK_PUBLISHING)
60 {
// NOTE(review): listing line 61 is missing here - its content is not visible.
// Second claim attempt, made when the first one observed BLOCK_PUBLISHING.
62 if (busy_.compare_exchange_strong(expected, BusyState::BLOCK_CLAIMED,
63 std::memory_order_acq_rel,
64 std::memory_order_acquire))
65 {
66 info.op.data.sem_info.sem->PostFromCallback(in_isr);
67 return;
68 }
69 }
70
71 ASSERT(expected == BusyState::BLOCK_DETACHED || expected == BusyState::IDLE ||
72 expected == BusyState::LOCKED || expected == BusyState::RESETTING);
73 if (expected == BusyState::BLOCK_DETACHED)
74 {
// NOTE(review): listing line 75 is missing here - its content is not visible.
// The result of this compare-exchange is deliberately discarded.
76 (void)busy_.compare_exchange_strong(expected, BusyState::IDLE,
77 std::memory_order_acq_rel,
78 std::memory_order_acquire);
79 }
80 return;
81 }
82
// Non-BLOCK operations are completed through the operation's own status update.
83 info.op.UpdateStatus(in_isr, ans);
84}
85
87
// NOTE(review): the signature line is missing from this listing; the body uses
// `data`, `op` and `in_isr`, so this is presumably
// WritePort::operator()(ConstRawData data, WriteOperation& op, bool in_isr) -
// confirm against the original source.
// Submits one write request:
//   - zero-length data returns OK immediately (non-BLOCK operations are also
//     notified through UpdateStatus);
//   - otherwise busy_ must move from IDLE to LOCKED, else BUSY is returned;
//   - on success the request is handed to CommitWrite() with its third
//     argument set to false.
89{
90 if (Writable())
91 {
92 if (data.size_ == 0)
93 {
// Nothing to send: BLOCK operations get no status update here and simply
// receive the return value.
94 if (op.type != WriteOperation::OperationType::BLOCK)
95 {
96 op.UpdateStatus(in_isr, ErrorCode::OK);
97 }
98 return ErrorCode::OK;
99 }
100
// Take the submission lock; any state other than IDLE is reported as BUSY.
101 BusyState expected = BusyState::IDLE;
102 if (!busy_.compare_exchange_strong(expected, BusyState::LOCKED,
103 std::memory_order_acq_rel,
104 std::memory_order_acquire))
105 {
106 return ErrorCode::BUSY;
107 }
108
109 return CommitWrite(data, op, false, in_isr);
110 }
111 else
112 {
// NOTE(review): listing line 113 (the body of this else branch, reached when
// no write function is installed) is missing from this listing.
114 }
115}
116
// NOTE(review): the first signature line (listing line 117) is missing from
// this listing; this is presumably WritePort::CommitWrite. The names used in
// the body are `data`, `op`, `meta_pushed` and `in_isr` - confirm the full
// parameter list against the original source.
// Queues a write (unless `meta_pushed` says it was queued already), calls
// write_fun_ and then resolves the result:
//   - FULL if, with !meta_pushed, the metadata queue has no free slot or the
//     payload queue has less free space than data.size_ (busy_ is set to IDLE);
//   - a result other than PENDING from write_fun_ is resolved immediately;
//   - PENDING makes a BLOCK operation wait on its semaphore with the
//     operation's timeout, while other operation types return OK.
118 bool in_isr)
119{
120 if (!meta_pushed && queue_info_->EmptySize() < 1)
121 {
122 busy_.store(BusyState::IDLE, std::memory_order_release);
123 return ErrorCode::FULL;
124 }
125
// NOTE(review): listing line 126 is missing here; it presumably declares
// `ans`, which is assigned below - confirm.
127 if (!meta_pushed)
128 {
129 if (queue_data_->EmptySize() < data.size_)
130 {
131 busy_.store(BusyState::IDLE, std::memory_order_release);
132 return ErrorCode::FULL;
133 }
134
// The free-space check above is why the push result is only asserted.
135 ans =
136 queue_data_->PushBatch(reinterpret_cast<const uint8_t*>(data.addr_), data.size_);
137 UNUSED(ans);
138 ASSERT(ans == ErrorCode::OK);
139
140 if (op.type == WriteOperation::OperationType::BLOCK)
141 {
142 // The BLOCK waiter must be armed before
143 // queue_info_ becomes visible to a
144 // backend/completion thread.
145 op.MarkAsRunning();
146 busy_.store(BusyState::BLOCK_PUBLISHING, std::memory_order_release);
147 }
148
149 WriteInfoBlock info{data, op};
150 ans = queue_info_->Push(info);
151
152 ASSERT(ans == ErrorCode::OK);
153 }
154
// BLOCK operations published above were already marked running before the
// metadata push, so only the remaining cases are marked here.
155 if (op.type != WriteOperation::OperationType::BLOCK)
156 {
157 op.MarkAsRunning();
158 }
159 else if (meta_pushed)
160 {
161 op.MarkAsRunning();
162 }
163
164 if (op.type == WriteOperation::OperationType::BLOCK)
165 {
// NOTE(review): listing line 166 is missing here; it presumably declares and
// initialises `expected` for the compare-exchange below - confirm.
// If the move to BLOCK_WAITING fails, the only state accepted by the assert
// is BLOCK_CLAIMED.
167 if (!busy_.compare_exchange_strong(expected, BusyState::BLOCK_WAITING,
168 std::memory_order_acq_rel,
169 std::memory_order_acquire))
170 {
171 ASSERT(expected == BusyState::BLOCK_CLAIMED);
172 }
173 }
174
175 ans = write_fun_(*this, in_isr);
176
// Any result other than PENDING is resolved here without a timed wait.
177 if (ans != ErrorCode::PENDING)
178 {
179 if (op.type == WriteOperation::OperationType::BLOCK)
180 {
// Spin until busy_ is no longer RESETTING.
181 auto state = busy_.load(std::memory_order_acquire);
182 while (state == BusyState::RESETTING)
183 {
184 state = busy_.load(std::memory_order_acquire);
185 }
186
// BLOCK_CLAIMED: wait without timeout on the operation's semaphore, then
// return the result stored in block_result_.
187 if (state == BusyState::BLOCK_CLAIMED)
188 {
189 auto finish_wait_ans = op.data.sem_info.sem->Wait(UINT32_MAX);
190 UNUSED(finish_wait_ans);
191 ASSERT(finish_wait_ans == ErrorCode::OK);
192 busy_.store(BusyState::IDLE, std::memory_order_release);
193 return block_result_;
194 }
195
196 if (state == BusyState::BLOCK_DETACHED)
197 {
198 busy_.store(BusyState::IDLE, std::memory_order_release);
199 return ErrorCode::TIMEOUT;
200 }
201
// No completion was observed: return the write function's result unchanged.
202 ASSERT(state == BusyState::BLOCK_WAITING || state == BusyState::IDLE ||
203 state == BusyState::LOCKED);
204 busy_.store(BusyState::IDLE, std::memory_order_release);
205 return ans;
206 }
207
208 if (!meta_pushed)
209 {
210 busy_.store(BusyState::IDLE, std::memory_order_release);
211 }
212
// Always true at this point: every path of the BLOCK branch above returns.
213 if (op.type != WriteOperation::OperationType::BLOCK)
214 {
215 op.UpdateStatus(in_isr, ans);
216 }
// Results whose int8_t value is negative are returned as-is; everything else
// is reported as OK.
217 return (static_cast<int8_t>(ans) < 0) ? ans : ErrorCode::OK;
218 }
219
// From here on the write function returned PENDING.
220 if (op.type == WriteOperation::OperationType::BLOCK)
221 {
222 ASSERT(!in_isr);
223 auto wait_ans = op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
224 if (wait_ans == ErrorCode::OK)
225 {
226 // BLOCK_CLAIMED is always released
227 // by the waiter itself.
228#ifdef LIBXR_DEBUG_BUILD
229 auto state = busy_.load(std::memory_order_acquire);
230 ASSERT(state == BusyState::BLOCK_CLAIMED);
231#endif
232 busy_.store(BusyState::IDLE, std::memory_order_release);
233 return block_result_;
234 }
235
236 // Timeout won before completion
237 // claimed the waiter.
// NOTE(review): listing line 238 is missing here; it presumably declares and
// initialises `expected` for the compare-exchange below - confirm.
239 if (busy_.compare_exchange_strong(expected, BusyState::BLOCK_DETACHED,
240 std::memory_order_acq_rel,
241 std::memory_order_acquire))
242 {
243 return ErrorCode::TIMEOUT;
244 }
245
246 if (expected != BusyState::BLOCK_CLAIMED)
247 {
// Spin until busy_ is no longer RESETTING.
248 while (expected == BusyState::RESETTING)
249 {
250 expected = busy_.load(std::memory_order_acquire);
251 }
252
253 // A detached late completion may
254 // already have cleared BLOCK_DETACHED
255 // back to IDLE before this waiter
256 // wakes from timeout.
257 ASSERT(expected == BusyState::BLOCK_DETACHED || expected == BusyState::IDLE ||
258 expected == BusyState::LOCKED);
259 if (expected == BusyState::BLOCK_DETACHED)
260 {
261 busy_.store(BusyState::IDLE, std::memory_order_release);
262 }
263 return ErrorCode::TIMEOUT;
264 }
265
266 // Timeout lost after completion had
267 // already claimed the waiter.
268 auto finish_wait_ans = op.data.sem_info.sem->Wait(UINT32_MAX);
269 UNUSED(finish_wait_ans);
270 ASSERT(finish_wait_ans == ErrorCode::OK);
271 busy_.store(BusyState::IDLE, std::memory_order_release);
272
273 return block_result_;
274 }
275
// Non-BLOCK operation with a PENDING write: busy_ is released only when this
// call queued the request itself.
276 if (!meta_pushed)
277 {
278 busy_.store(BusyState::IDLE, std::memory_order_release);
279 }
280
281 return ErrorCode::OK;
282}
283
// Pops every entry from queue_info_ and completes it with `reason` through
// Finish(). What happens depends on the busy_ state that is observed:
//   LOCKED / BLOCK_PUBLISHING / RESETTING - DEV_ASSERT_FROM_CALLBACK(false,
//       in_isr) is invoked and the function returns without draining;
//   IDLE           - busy_ is moved to RESETTING (the state is re-read if that
//                    compare-exchange fails), entries are drained, then busy_
//                    is set back to IDLE;
//   BLOCK_WAITING  - entries are drained; busy_ is not written by this branch;
//   BLOCK_DETACHED - entries are drained, then busy_ is set to IDLE;
//   BLOCK_CLAIMED  - returns without draining.
//   reason - result passed to Finish() for every drained entry.
//   in_isr - forwarded to Finish() and DEV_ASSERT_FROM_CALLBACK().
// NOTE(review): listing lines 321, 326, 338, 351 and 359 are missing from this
// listing, so each drain branch may contain statements that are not visible
// here (queue_data_ is asserted non-null but not otherwise used in the visible
// lines) - confirm against the original source.
284void WritePort::FailAndClearAll(ErrorCode reason, bool in_isr)
285{
286 ASSERT(queue_data_ != nullptr);
287 WriteInfoBlock info{};
288
289 while (true)
290 {
291 auto state = busy_.load(std::memory_order_acquire);
292
293 if (state == BusyState::LOCKED)
294 {
295 DEV_ASSERT_FROM_CALLBACK(false, in_isr);
296 return;
297 }
298
299 if (state == BusyState::BLOCK_PUBLISHING)
300 {
301 DEV_ASSERT_FROM_CALLBACK(false, in_isr);
302 return;
303 }
304
305 if (state == BusyState::RESETTING)
306 {
307 DEV_ASSERT_FROM_CALLBACK(false, in_isr);
308 return;
309 }
310
311 if (state == BusyState::IDLE)
312 {
313 BusyState expected = BusyState::IDLE;
314 if (!busy_.compare_exchange_strong(expected, BusyState::RESETTING,
315 std::memory_order_acq_rel,
316 std::memory_order_acquire))
317 {
// busy_ changed between the load and the compare-exchange: re-read it.
318 continue;
319 }
320
322 while (queue_info_->Pop(info) == ErrorCode::OK)
323 {
324 Finish(in_isr, reason, info);
325 }
327 busy_.store(BusyState::IDLE, std::memory_order_release);
328 return;
329 }
330
331 if (state == BusyState::BLOCK_WAITING)
332 {
333 // Keep BLOCK_WAITING visible until Finish()
334 // hands the terminal wakeup to the blocked
335 // caller. Switching to RESETTING here
336 // would break that existing waiter
337 // handoff.
339 while (queue_info_->Pop(info) == ErrorCode::OK)
340 {
341 Finish(in_isr, reason, info);
342 }
343 return;
344 }
345
346 if (state == BusyState::BLOCK_DETACHED)
347 {
348 // The waiter is already gone, but
349 // BLOCK_DETACHED still blocks reentrant
350 // submissions while old queue entries are drained.
352 while (queue_info_->Pop(info) == ErrorCode::OK)
353 {
354 // The waiter has already detached.
355 // Finish() will clear the local state
356 // without re-posting that waiter.
357 Finish(in_isr, reason, info);
358 }
360 busy_.store(BusyState::IDLE, std::memory_order_release);
361 return;
362 }
363
364 if (state == BusyState::BLOCK_CLAIMED)
365 {
366 return;
367 }
368 }
369}
只读原始数据视图 / Immutable raw data view
size_t size_
数据字节数 / Data size in bytes
const void * addr_
数据起始地址 / Data start address
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
union LibXR::Operation::@5 data
void UpdateStatus(bool in_isr, Status &&status)
Updates operation status based on type.
void MarkAsRunning()
标记操作为运行状态。 Marks the operation as running.
OperationType type
void PostFromCallback(bool in_isr)
从中断回调中释放(增加)信号量 Releases (increments) the semaphore from an ISR (Interrupt Service Routine)
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待(减少)信号量 Waits (decrements) the semaphore
Definition semaphore.cpp:53
WritePort class for handling write operations.
size_t EmptySize()
获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.
void FailAndClearAll(ErrorCode reason, bool in_isr)
失败完成并清空当前所有挂起写操作。 Completes all currently pending write operations with a failure result and clears them.
size_t Size()
获取当前数据队列的已使用大小。 Gets the used size of the current data queue.
void Finish(bool in_isr, ErrorCode ans, WriteInfoBlock &info)
更新写入操作的状态。 Updates the status of the write operation.
ErrorCode block_result_
Final status for the current BLOCK write. 当前 BLOCK 写入的最终结果。
WritePort(size_t queue_size=3, size_t buffer_size=128)
构造一个新的 WritePort 对象。 Constructs a new WritePort object.
WritePort & operator=(WriteFun fun)
赋值运算符重载,用于设置写入函数。 Overloaded assignment operator to set the write function.
std::atomic< BusyState > busy_
Shared submit/wait handoff state. 共享的提交/等待交接状态。
@ BLOCK_CLAIMED
Final wakeup belongs to the current waiter. 最终唤醒已归当前等待者所有。
@ LOCKED
Submission path owns queue mutation. 提交路径占有写队列/元数据修改权。
bool Writable()
判断端口是否可写。 Checks whether the port is writable.
WriteFun write_fun_
Driver/backend write entry. 底层驱动或后端写入入口。
ErrorCode CommitWrite(ConstRawData data, WriteOperation &op, bool pushed=false, bool in_isr=false)
提交写入操作。 Commits a write operation.
void MarkAsRunning(WriteOperation &op)
标记写入操作为运行中。 Marks the write operation as running.
LockFreeQueue< uint8_t > * queue_data_
Payload queue for pending write bytes. 挂起写入字节的数据队列。
LockFreeQueue< WriteInfoBlock > * queue_info_
Metadata queue for pending write batches. 挂起写批次的元数据队列。
ErrorCode operator()(ConstRawData data, WriteOperation &op, bool in_isr=false)
执行写入操作。 Performs a write operation.
LibXR 命名空间 / LibXR namespace
Definition ch32_can.hpp:14
ErrorCode
定义错误码枚举 / Defines the error code enumeration
@ TIMEOUT
超时 | Timeout
@ BUSY
忙碌 | Busy
@ NOT_SUPPORT
不支持 | Not supported
@ FULL
已满 | Full
@ PENDING
等待中 | Pending
@ OK
操作成功 | Operation successful
ErrorCode(* WriteFun)(WritePort &port, bool in_isr)
Function pointer type for write operations.
constexpr size_t CACHE_LINE_SIZE
缓存行大小 / Cache line size
Definition libxr_def.hpp:32
WriteOperation op
Write operation instance. 写入操作实例。