libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
libxr_rw.cpp
1#include "libxr_rw.hpp"
2
3#include "mutex.hpp"
4
5using namespace LibXR;
6
9
10ReadPort::ReadPort(size_t buffer_size)
11 : queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
12 LockFreeQueue<uint8_t>(buffer_size)
13 : nullptr)
14{
15}
16
18{
19 ASSERT(queue_data_ != nullptr);
20 return queue_data_->EmptySize();
21}
22
24{
25 ASSERT(queue_data_ != nullptr);
26 return queue_data_->Size();
27}
28
29bool ReadPort::Readable() { return read_fun_ != nullptr; }
30
32{
33 read_fun_ = fun;
34 return *this;
35}
36
37void ReadPort::Finish(bool in_isr, ErrorCode ans, ReadInfoBlock& info, uint32_t size)
38{
39 read_size_ = size;
40 busy_.store(BusyState::IDLE, std::memory_order_release);
41 info.op.UpdateStatus(in_isr, std::forward<ErrorCode>(ans));
42}
43
45
47{
48 if (Readable())
49 {
50 BusyState is_busy = busy_.load(std::memory_order_relaxed);
51
52 if (is_busy == BusyState::PENDING)
53 {
54 return ErrorCode::BUSY;
55 }
56
57 while (true)
58 {
59 busy_.store(BusyState::IDLE, std::memory_order_release);
60
61 if (queue_data_ != nullptr)
62 {
63 auto readable_size = queue_data_->Size();
64
65 if (readable_size >= data.size_ && readable_size != 0)
66 {
67 auto ans =
68 queue_data_->PopBatch(reinterpret_cast<uint8_t*>(data.addr_), data.size_);
69 UNUSED(ans);
70 read_size_ = data.size_;
71 ASSERT(ans == ErrorCode::OK);
72 if (op.type != ReadOperation::OperationType::BLOCK)
73 {
74 op.UpdateStatus(false, ErrorCode::OK);
75 }
76 return ErrorCode::OK;
77 }
78 }
79
80 info_ = ReadInfoBlock{data, op};
81
82 op.MarkAsRunning();
83
84 auto ans = read_fun_(*this);
85
86 if (ans != ErrorCode::OK)
87 {
88 BusyState expected = BusyState::IDLE;
89 if (busy_.compare_exchange_weak(expected, BusyState::PENDING,
90 std::memory_order_acq_rel,
91 std::memory_order_acquire))
92 {
93 break;
94 }
95 else
96 {
97 expected = BusyState::PENDING;
98 continue;
99 }
100 }
101 else
102 {
103 read_size_ = data.size_;
104 if (op.type != ReadOperation::OperationType::BLOCK)
105 {
106 op.UpdateStatus(false, ErrorCode::OK);
107 }
108 return ErrorCode::OK;
109 }
110 }
111
112 if (op.type == ReadOperation::OperationType::BLOCK)
113 {
114 return op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
115 }
116 else
117 {
118 return ErrorCode::OK;
119 }
120 }
121 else
122 {
123 return ErrorCode::NOT_SUPPORT;
124 }
125}
126
128{
129 ASSERT(queue_data_ != nullptr);
130
131 if (in_isr)
132 {
133 auto is_busy = busy_.load(std::memory_order_relaxed);
134
135 if (is_busy == BusyState::PENDING)
136 {
137 if (queue_data_->Size() >= info_.data.size_)
138 {
139 if (info_.data.size_ > 0)
140 {
141 auto ans = queue_data_->PopBatch(reinterpret_cast<uint8_t*>(info_.data.addr_),
142 info_.data.size_);
143 UNUSED(ans);
144 ASSERT(ans == ErrorCode::OK);
145 }
146 Finish(in_isr, ErrorCode::OK, info_, info_.data.size_);
147 }
148 }
149 else if (is_busy == BusyState::IDLE)
150 {
151 busy_.store(BusyState::EVENT, std::memory_order_release);
152 }
153 }
154 else
155 {
156 if (busy_.load(std::memory_order_relaxed) == BusyState::PENDING)
157 {
158 if (queue_data_->Size() >= info_.data.size_)
159 {
160 if (info_.data.size_ > 0)
161 {
162 auto ans = queue_data_->PopBatch(reinterpret_cast<uint8_t*>(info_.data.addr_),
163 info_.data.size_);
164 UNUSED(ans);
165 ASSERT(ans == ErrorCode::OK);
166 }
167 Finish(in_isr, ErrorCode::OK, info_, info_.data.size_);
168 }
169 }
170 }
171}
172
174{
175 ASSERT(queue_data_ != nullptr);
176 queue_data_->Reset();
177}
178
179WritePort::WritePort(size_t queue_size, size_t buffer_size)
180 : queue_info_(new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
181 LockFreeQueue<WriteInfoBlock>(queue_size)),
182 queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
183 LockFreeQueue<uint8_t>(buffer_size)
184 : nullptr)
185{
186}
187
189{
190 ASSERT(queue_data_ != nullptr);
191 return queue_data_->EmptySize();
192}
193
195{
196 ASSERT(queue_data_ != nullptr);
197 return queue_data_->Size();
198}
199
200bool WritePort::Writable() { return write_fun_ != nullptr; }
201
203{
204 write_fun_ = fun;
205 return *this;
206}
207
208void WritePort::Finish(bool in_isr, ErrorCode ans, WriteInfoBlock& info, uint32_t size)
209{
210 write_size_ = size;
211 info.op.UpdateStatus(in_isr, std::forward<ErrorCode>(ans));
212}
213
215
217{
218 if (Writable())
219 {
220 if (data.size_ == 0)
221 {
222 write_size_ = 0;
223 if (op.type != WriteOperation::OperationType::BLOCK)
224 {
225 op.UpdateStatus(false, ErrorCode::OK);
226 }
227 return ErrorCode::OK;
228 }
229
230 LockState expected = LockState::UNLOCKED;
231 if (!lock_.compare_exchange_strong(expected, LockState::LOCKED))
232 {
233 return ErrorCode::BUSY;
234 }
235
236 return CommitWrite(data, op);
237 }
238 else
239 {
240 return ErrorCode::NOT_SUPPORT;
241 }
242}
243
244ErrorCode WritePort::CommitWrite(ConstRawData data, WriteOperation& op, bool pushed)
245{
246 if (!pushed && queue_info_->EmptySize() < 1)
247 {
248 lock_.store(LockState::UNLOCKED, std::memory_order_release);
249 return ErrorCode::FULL;
250 }
251
252 if (queue_data_)
253 {
254 ErrorCode ans = ErrorCode::OK;
255 if (!pushed)
256 {
257 if (queue_data_->EmptySize() < data.size_)
258 {
259 lock_.store(LockState::UNLOCKED, std::memory_order_release);
260 return ErrorCode::FULL;
261 }
262
263 ans = queue_data_->PushBatch(reinterpret_cast<const uint8_t*>(data.addr_),
264 data.size_);
265 UNUSED(ans);
266 ASSERT(ans == ErrorCode::OK);
267
268 WriteInfoBlock info{data, op};
269 ans = queue_info_->Push(info);
270
271 ASSERT(ans == ErrorCode::OK);
272 }
273
274 op.MarkAsRunning();
275
276 ans = write_fun_(*this);
277
278 lock_.store(LockState::UNLOCKED, std::memory_order_release);
279
280 if (ans == ErrorCode::OK)
281 {
282 write_size_ = data.size_;
283 if (op.type != WriteOperation::OperationType::BLOCK)
284 {
285 op.UpdateStatus(false, ErrorCode::OK);
286 }
287 return ErrorCode::OK;
288 }
289
290 if (op.type == WriteOperation::OperationType::BLOCK)
291 {
292 return op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
293 }
294
295 return ErrorCode::OK;
296 }
297 else
298 {
299 WriteInfoBlock info{data, op};
300 auto ans = queue_info_->Push(info);
301
302 ASSERT(ans == ErrorCode::OK);
303
304 op.MarkAsRunning();
305
306 ans = write_fun_(*this);
307
308 lock_.store(LockState::UNLOCKED, std::memory_order_release);
309
310 if (ans == ErrorCode::OK)
311 {
312 write_size_ = data.size_;
313 if (op.type != WriteOperation::OperationType::BLOCK)
314 {
315 op.UpdateStatus(false, ErrorCode::OK);
316 }
317 return ErrorCode::OK;
318 }
319
320 if (op.type == WriteOperation::OperationType::BLOCK)
321 {
322 return op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
323 }
324 else
325 {
326 return ErrorCode::OK;
327 }
328 }
329}
330
333 ASSERT(queue_data_ != nullptr);
334 queue_data_->Reset();
335 queue_info_->Reset();
336}
337
339 : port_(port), op_(op)
340{
341 if (!port->queue_data_ || !port->Writable())
342 {
343 fallback_to_normal_write_ = true;
344 return;
345 }
346 LockState expected = LockState::UNLOCKED;
347 if (port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
348 {
349 if (port_->queue_info_->EmptySize() < 1)
350 {
351 locked_ = false;
352 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
353 return;
354 }
355 locked_ = true;
356 cap_ = port_->queue_data_->EmptySize();
357 }
358}
359
361{
362 if (locked_ && size_ > 0)
363 {
364 port_->queue_info_->Push(WriteInfoBlock{RawData{nullptr, size_}, op_});
365 port_->CommitWrite({nullptr, size_}, op_, true);
366 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
367 }
368}
369
371{
372 if (fallback_to_normal_write_)
373 {
374 (*port_)(data, op_);
375 }
376 else
377 {
378 if (!locked_)
379 {
380 LockState expected = LockState::UNLOCKED;
381 if (port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
382 {
383 locked_ = true;
384 cap_ = port_->queue_data_->EmptySize();
385 }
386 else
387 {
388 return *this;
389 }
390 }
391 if (size_ + data.size_ <= cap_)
392 {
393 port_->queue_data_->PushBatch(reinterpret_cast<const uint8_t*>(data.addr_),
394 data.size_);
395 size_ += data.size_;
396 }
397 }
398
399 return *this;
400}
401
403{
404 auto ans = ErrorCode::OK;
405 if (!fallback_to_normal_write_)
406 {
407 if (locked_ && size_ > 0)
408 {
409 port_->queue_info_->Push(WriteInfoBlock{RawData{nullptr, size_}, op_});
410 ans = port_->CommitWrite({nullptr, size_}, op_, true);
411 size_ = 0;
412 }
413
414 if (port_->queue_info_->EmptySize() < 1)
415 {
416 locked_ = false;
417 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
418 }
419 else
420 {
421 cap_ = port_->queue_data_->EmptySize();
422 }
423 }
424
425 return ans;
426}
427
428// NOLINTNEXTLINE
429int STDIO::Printf(const char* fmt, ...)
430{
431#if LIBXR_PRINTF_BUFFER_SIZE > 0
433 {
434 return -1;
435 }
436
437 if (!write_mutex_)
438 {
439 write_mutex_ = new LibXR::Mutex();
440 }
441
442 LibXR::Mutex::LockGuard lock_guard(*write_mutex_);
443
444 va_list args;
445 va_start(args, fmt);
446 int len = vsnprintf(STDIO::printf_buff_, LIBXR_PRINTF_BUFFER_SIZE, fmt, args);
447 va_end(args);
448
449 // Check result and limit length
450 if (len < 0)
451 {
452 return -1;
453 }
454 if (static_cast<size_t>(len) >= LIBXR_PRINTF_BUFFER_SIZE)
455 {
456 len = LIBXR_PRINTF_BUFFER_SIZE - 1;
457 }
458
459 ConstRawData data = {reinterpret_cast<const uint8_t*>(STDIO::printf_buff_),
460 static_cast<size_t>(len)};
461
462 static WriteOperation op; // NOLINT
463 if (write_stream_ == nullptr)
464 {
465 return static_cast<int>(STDIO::write_->operator()(data, op));
466 }
467 else
468 {
469 (*write_stream_) << data;
470 return static_cast<int>(write_stream_->Commit());
471 }
472#else
473 UNUSED(fmt);
474 return 0;
475#endif
476}
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
互斥锁的 RAII 机制封装 (RAII-style mechanism for automatic mutex management).
Definition mutex.hpp:65
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
void MarkAsRunning()
标记操作为运行状态。 Marks the operation as running.
Definition libxr_rw.hpp:202
void UpdateStatus(bool in_isr, Status &&...status)
Updates operation status based on type.
Definition libxr_rw.hpp:171
union LibXR::Operation::@4 data
OperationType type
Definition libxr_rw.hpp:225
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
ReadPort class for handling read operations.
Definition libxr_rw.hpp:268
bool Readable()
Checks if read operations are supported.
Definition libxr_rw.cpp:29
ReadPort(size_t buffer_size=128)
Constructs a ReadPort with queue sizes.
Definition libxr_rw.cpp:10
void Finish(bool in_isr, ErrorCode ans, ReadInfoBlock &info, uint32_t size)
更新读取操作的状态。 Updates the status of the read operation.
Definition libxr_rw.cpp:37
virtual void Reset()
Resets the ReadPort.
Definition libxr_rw.cpp:173
virtual size_t EmptySize()
获取队列的剩余可用空间。 Gets the remaining available space in the queue.
Definition libxr_rw.cpp:17
ErrorCode operator()(RawData data, ReadOperation &op)
读取操作符重载,用于执行读取操作。 Overloaded function call operator to perform a read operation.
Definition libxr_rw.cpp:46
virtual void ProcessPendingReads(bool in_isr)
Processes pending reads.
Definition libxr_rw.cpp:127
ReadPort & operator=(ReadFun fun)
赋值运算符重载,用于设置读取函数。 Overloaded assignment operator to set the read function.
Definition libxr_rw.cpp:31
virtual size_t Size()
获取当前队列的已使用大小。 Gets the currently used size of the queue.
Definition libxr_rw.cpp:23
void MarkAsRunning(ReadInfoBlock &info)
标记读取操作为运行中。 Marks the read operation as running.
Definition libxr_rw.cpp:44
static int Printf(const char *fmt,...)
Prints a formatted string to the write port (like printf).
Definition libxr_rw.cpp:429
static WritePort * write_
Write port instance. 写入端口。
Definition libxr_rw.hpp:599
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待(减少)信号量 Waits (decrements) the semaphore
Definition semaphore.cpp:25
WritePort 的流式写入操作器,支持链式 << 操作和批量提交。 Stream-style writer for WritePort that supports chained << operations and batched commits.
Definition libxr_rw.hpp:428
bool locked_
是否持有写锁 Whether write lock is held
Definition libxr_rw.hpp:471
ErrorCode Commit()
手动提交已写入的数据到队列,并尝试续锁。 Manually commits the data written so far to the queue and tries to renew the lock.
Definition libxr_rw.cpp:402
size_t cap_
当前队列容量 Current queue capacity
Definition libxr_rw.hpp:469
LibXR::WritePort * port_
写端口指针 Pointer to the WritePort
Definition libxr_rw.hpp:467
~Stream()
析构时自动提交已累积的数据并释放锁。 On destruction, automatically commits any accumulated data and releases the lock.
Definition libxr_rw.cpp:360
Stream(LibXR::WritePort *port, LibXR::WriteOperation op)
构造流写入对象,并尝试锁定端口。 Constructs a stream writer object and tries to lock the port.
Definition libxr_rw.cpp:338
Stream & operator<<(const ConstRawData &data)
追加写入数据,支持链式调用。 Appends data to be written; supports chained calls.
Definition libxr_rw.cpp:370
WritePort class for handling write operations.
Definition libxr_rw.hpp:402
virtual size_t EmptySize()
获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.
Definition libxr_rw.cpp:188
virtual size_t Size()
获取当前数据队列的已使用大小。 Gets the used size of the current data queue.
Definition libxr_rw.cpp:194
ErrorCode operator()(ConstRawData data, WriteOperation &op)
执行写入操作。 Performs a write operation.
Definition libxr_rw.cpp:216
WritePort(size_t queue_size=3, size_t buffer_size=128)
构造一个新的 WritePort 对象。 Constructs a new WritePort object.
Definition libxr_rw.cpp:179
WritePort & operator=(WriteFun fun)
赋值运算符重载,用于设置写入函数。 Overloaded assignment operator to set the write function.
Definition libxr_rw.cpp:202
bool Writable()
判断端口是否可写。 Checks whether the port is writable.
Definition libxr_rw.cpp:200
void MarkAsRunning(WriteOperation &op)
标记写入操作为运行中。 Marks the write operation as running.
Definition libxr_rw.cpp:214
virtual void Reset()
Resets the WritePort.
Definition libxr_rw.cpp:331
void Finish(bool in_isr, ErrorCode ans, WriteInfoBlock &info, uint32_t size)
更新写入操作的状态。 Updates the status of the write operation.
Definition libxr_rw.cpp:208
LibXR 命名空间 / LibXR namespace
Definition ch32_gpio.hpp:9
ErrorCode(* ReadFun)(ReadPort &port)
Function pointer type for read operations.
Definition libxr_rw.hpp:245
ErrorCode(* WriteFun)(WritePort &port)
Function pointer type for write operations.
Definition libxr_rw.hpp:241
Read information block structure.
Definition libxr_rw.hpp:252
RawData data
Data buffer. 数据缓冲区。
Definition libxr_rw.hpp:253
ReadOperation op
Read operation instance. 读取操作实例。
Definition libxr_rw.hpp:254