libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
read_port.cpp
1#include "read_port.hpp"
2
3#include <new>
4
5using namespace LibXR;
6
7ReadPort::ReadPort(size_t buffer_size)
8 : queue_data_(buffer_size > 0 ? new (std::align_val_t(LibXR::CACHE_LINE_SIZE))
9 LockFreeQueue<uint8_t>(buffer_size)
10 : nullptr)
11{
12}
13
15{
16 ASSERT(queue_data_ != nullptr);
17 return queue_data_->EmptySize();
18}
19
21{
22 ASSERT(queue_data_ != nullptr);
23 return queue_data_->Size();
24}
25
26bool ReadPort::Readable() { return read_fun_ != nullptr; }
27
29{
30 read_fun_ = fun;
31 return *this;
32}
33
34void ReadPort::Finish(bool in_isr, ErrorCode ans, ReadInfoBlock& info)
35{
36 if (info.op.type == ReadOperation::OperationType::BLOCK)
37 {
38 // Read completion is queue-driven. ProcessPendingReads() must claim the
39 // BLOCK waiter before data is copied and Finish() is called.
40 // 读完成只走队列路径。ProcessPendingReads() 必须先 claim BLOCK waiter,
41 // 再拷贝数据并调用 Finish()。
42 ASSERT(busy_.load(std::memory_order_acquire) == BusyState::BLOCK_CLAIMED);
43 block_result_ = ans;
44 info.op.data.sem_info.sem->PostFromCallback(in_isr);
45 return;
46 }
47
48 busy_.store(BusyState::IDLE, std::memory_order_release);
49 info.op.UpdateStatus(in_isr, ans);
50}
51
53
55{
56 if (Readable())
57 {
58 BusyState is_busy = busy_.load(std::memory_order_acquire);
59
60 if (is_busy != BusyState::IDLE && is_busy != BusyState::EVENT)
61 {
62 return ErrorCode::BUSY;
63 }
64
65 while (true)
66 {
67 busy_.store(BusyState::IDLE, std::memory_order_release);
68
69 auto readable_size = queue_data_->Size();
70
71 if (readable_size >= data.size_ && readable_size != 0)
72 {
73 if (data.size_ > 0)
74 {
75 auto ans =
76 queue_data_->PopBatch(reinterpret_cast<uint8_t*>(data.addr_), data.size_);
77 ASSERT(ans == ErrorCode::OK);
78 OnRxDequeue(in_isr);
79 }
80
81 if (op.type != ReadOperation::OperationType::BLOCK)
82 {
83 op.UpdateStatus(in_isr, ErrorCode::OK);
84 }
85 return ErrorCode::OK;
86 }
87
88 info_ = ReadInfoBlock{data, op};
89
90 op.MarkAsRunning();
91
92 BusyState expected = BusyState::IDLE;
93 if (!busy_.compare_exchange_strong(expected, BusyState::PENDING,
94 std::memory_order_acq_rel,
95 std::memory_order_acquire))
96 {
97 ASSERT(expected == BusyState::EVENT);
98 continue;
99 }
100
101 auto ans = read_fun_(*this, in_isr);
102 if (static_cast<int8_t>(ans) >= 0)
103 {
104 break;
105 }
106
107 // read_fun_ failed while arming/notifying the backend. Roll back only if no
108 // producer has completed this pending read concurrently.
109 // read_fun_ 挂起/通知底层失败;只有未被 producer 并发完成时,才回滚 pending。
110 expected = BusyState::PENDING;
111 if (busy_.compare_exchange_strong(expected, BusyState::IDLE,
112 std::memory_order_acq_rel,
113 std::memory_order_acquire))
114 {
115 if (op.type != ReadOperation::OperationType::BLOCK)
116 {
117 op.UpdateStatus(in_isr, ans);
118 }
119 return ans;
120 }
121
122 if (expected == BusyState::BLOCK_DETACHED)
123 {
124 return ErrorCode::TIMEOUT;
125 }
126 if (expected == BusyState::IDLE)
127 {
128 // A non-BLOCK read may have completed through ProcessPendingReads() before the
129 // arm failure returned.
130 // 非 BLOCK 读可能在挂起失败返回前,已经通过 ProcessPendingReads() 完成。
131 ASSERT(op.type != ReadOperation::OperationType::BLOCK);
132 return ErrorCode::OK;
133 }
134 ASSERT(expected == BusyState::BLOCK_CLAIMED);
135 break;
136 }
137
138 if (op.type == ReadOperation::OperationType::BLOCK)
139 {
140 ASSERT(!in_isr);
141 auto wait_ans = op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
142 if (wait_ans == ErrorCode::OK)
143 {
144 // BLOCK_CLAIMED is always released by the waiter itself.
145 // BLOCK_CLAIMED 始终由 waiter 自己释放。
146#ifdef LIBXR_DEBUG_BUILD
147 auto state = busy_.load(std::memory_order_acquire);
148 ASSERT(state == BusyState::BLOCK_CLAIMED);
149#endif
150 busy_.store(BusyState::IDLE, std::memory_order_release);
151 return block_result_;
152 }
153
154 // BLOCK wait timed out after the backend had accepted the read. Cancel only if
155 // completion has not claimed this waiter.
156 // 底层已接受读请求后,BLOCK 等待超时;只有完成侧尚未 claim 当前 waiter 时才取消。
157 BusyState expected = BusyState::PENDING;
158 if (busy_.compare_exchange_strong(expected, BusyState::IDLE,
159 std::memory_order_acq_rel,
160 std::memory_order_acquire))
161 {
162 return ErrorCode::TIMEOUT;
163 }
164
165 if (expected == BusyState::BLOCK_DETACHED)
166 {
167 // The waiter had already detached before the timeout-side cancel won.
168 // 当前 waiter 已经先分离;超时侧负责把端口收回 IDLE。
169 busy_.store(BusyState::IDLE, std::memory_order_release);
170 return ErrorCode::TIMEOUT;
171 }
172
173 ASSERT(expected == BusyState::BLOCK_CLAIMED);
174
175 // Timeout lost after completion had already claimed the waiter.
176 // 超时发生得太晚,完成侧已经 claim 了当前 waiter。
177 auto finish_wait_ans = op.data.sem_info.sem->Wait(UINT32_MAX);
178 UNUSED(finish_wait_ans);
179 ASSERT(finish_wait_ans == ErrorCode::OK);
180 busy_.store(BusyState::IDLE, std::memory_order_release);
181 return block_result_;
182 }
183 else
184 {
185 return ErrorCode::OK;
186 }
187 }
188 else
189 {
191 }
192}
193
195{
196 ASSERT(queue_data_ != nullptr);
197
198 while (true)
199 {
200 auto is_busy = busy_.load(std::memory_order_acquire);
201
202 if (is_busy == BusyState::PENDING)
203 {
204 auto size = queue_data_->Size();
205 if (size > 0 && size >= info_.data.size_)
206 {
207 if (info_.op.type == ReadOperation::OperationType::BLOCK)
208 {
209 // Read BLOCK completion is claimed here before copying data.
210 // BLOCK 读完成在这里先 claim,再拷数据。
211 BusyState expected = BusyState::PENDING;
212 if (!busy_.compare_exchange_strong(expected, BusyState::BLOCK_CLAIMED,
213 std::memory_order_acq_rel,
214 std::memory_order_acquire))
215 {
216 continue;
217 }
218 }
219
220 if (info_.data.size_ > 0)
221 {
222 auto ans = queue_data_->PopBatch(reinterpret_cast<uint8_t*>(info_.data.addr_),
224 UNUSED(ans);
225 ASSERT(ans == ErrorCode::OK);
226 Finish(in_isr, ErrorCode::OK, info_);
227 OnRxDequeue(in_isr);
228 }
229 else
230 {
231 Finish(in_isr, ErrorCode::OK, info_);
232 }
233 }
234 return;
235 }
236
237 if (is_busy == BusyState::IDLE)
238 {
239 // Data arrived before a waiter was armed. This must be a CAS: a reader may
240 // publish PENDING after the load above, and EVENT must not overwrite it.
241 // 数据先于 waiter 到达。这里必须用 CAS:读线程可能在上面的 load 之后发布
242 // PENDING,EVENT 不能覆盖它。
243 BusyState expected = BusyState::IDLE;
244 if (busy_.compare_exchange_strong(expected, BusyState::EVENT,
245 std::memory_order_acq_rel,
246 std::memory_order_acquire))
247 {
248 return;
249 }
250 continue;
251 }
252
253 return;
254 }
255}
256
257void ReadPort::FailAndClearAll(ErrorCode reason, bool in_isr)
258{
259 ASSERT(queue_data_ != nullptr);
261
262 auto state = busy_.load(std::memory_order_acquire);
263 if (state == BusyState::PENDING)
264 {
265 if (info_.op.type == ReadOperation::OperationType::BLOCK)
266 {
267 // Backend is already unavailable. Claim the waiter and complete it with
268 // the requested failure reason instead of leaving it to timeout.
269 // 后端已经不可用。这里直接 claim waiter,并用指定错误收口,
270 // 而不是让它继续超时等待。
271 BusyState expected = BusyState::PENDING;
272 if (busy_.compare_exchange_strong(expected, BusyState::BLOCK_CLAIMED,
273 std::memory_order_acq_rel,
274 std::memory_order_acquire))
275 {
276 Finish(in_isr, reason, info_);
277 return;
278 }
279 state = expected;
280 }
281 else
282 {
283 busy_.store(BusyState::IDLE, std::memory_order_release);
284 info_.op.UpdateStatus(in_isr, reason);
285 return;
286 }
287 }
288
290 {
291 return;
292 }
293
295 busy_.store(BusyState::IDLE, std::memory_order_release);
296}
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
union LibXR::Operation::@5 data
void UpdateStatus(bool in_isr, Status &&status)
Updates operation status based on type.
void MarkAsRunning()
标记操作为运行状态。 Marks the operation as running.
OperationType type
可写原始数据视图 / Mutable raw data view
size_t size_
数据字节数 / Data size in bytes
void * addr_
数据起始地址 / Data start address
ReadPort class for handling read operations.
Definition read_port.hpp:18
bool Readable()
Checks if read operations are supported.
Definition read_port.cpp:26
ReadFun read_fun_
Driver/backend read notification entry. 底层驱动或后端读取通知入口。
Definition read_port.hpp:49
void Finish(bool in_isr, ErrorCode ans, ReadInfoBlock &info)
完成已由队列路径认领的读取操作。 Completes a read operation already claimed by the queue path.
Definition read_port.cpp:34
ReadPort(size_t buffer_size=128)
Constructs a ReadPort with queue sizes.
Definition read_port.cpp:7
ErrorCode block_result_
Final status for the current BLOCK read.
Definition read_port.hpp:53
@ IDLE
No active waiter and no pending completion. 无等待者、无挂起完成。
std::atomic< BusyState > busy_
Shared read-progress handoff state. 共享的读进度交接状态。
Definition read_port.hpp:52
size_t EmptySize()
获取队列的剩余可用空间。 Gets the remaining available space in the queue.
Definition read_port.cpp:14
LockFreeQueue< uint8_t > * queue_data_
RX payload queue. 接收数据字节队列。
Definition read_port.hpp:50
ErrorCode operator()(RawData data, ReadOperation &op, bool in_isr=false)
读取操作符重载,用于执行读取操作。 Overloaded function call operator to perform a read operation.
Definition read_port.cpp:54
void ProcessPendingReads(bool in_isr)
Processes pending reads.
ReadPort & operator=(ReadFun fun)
赋值运算符重载,用于设置读取函数。 Overloaded assignment operator to set the read function.
Definition read_port.cpp:28
size_t Size()
获取当前队列的已使用大小。 Gets the currently used size of the queue.
Definition read_port.cpp:20
void FailAndClearAll(ErrorCode reason, bool in_isr)
失败完成并清空当前所有挂起读操作。
virtual void OnRxDequeue(bool)
RX 数据从软件队列成功出队后的通知。 Notification after bytes are popped from RX data queue.
ReadInfoBlock info_
In-flight read request metadata. 当前在途读取请求的元数据。
Definition read_port.hpp:51
void MarkAsRunning(ReadInfoBlock &info)
标记读取操作为运行中。 Marks the read operation as running.
Definition read_port.cpp:52
void PostFromCallback(bool in_isr)
从中断回调中释放(增加)信号量 Releases (increments) the semaphore from an ISR (Interrupt Service Routine)
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待(减少)信号量 Waits (decrements) the semaphore
Definition semaphore.cpp:53
LibXR 命名空间
Definition ch32_can.hpp:14
ErrorCode
定义错误码枚举
@ TIMEOUT
超时 | Timeout
@ BUSY
忙碌 | Busy
@ NOT_SUPPORT
不支持 | Not supported
@ OK
操作成功 | Operation successful
ErrorCode(* ReadFun)(ReadPort &port, bool in_isr)
Function pointer type for read notifications.
constexpr size_t CACHE_LINE_SIZE
缓存行大小 / Cache line size
Definition libxr_def.hpp:32
Read information block structure.
RawData data
Data buffer. 数据缓冲区。
ReadOperation op
Read operation instance. 读取操作实例。