libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
libxr_rw.cpp
1#include "libxr_rw.hpp"
2
3#include "mutex.hpp"
4
5using namespace LibXR;
6
9
10ReadPort::ReadPort(size_t buffer_size)
11 : queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
12 LockFreeQueue<uint8_t>(buffer_size)
13 : nullptr)
14{
15}
16
18{
19 ASSERT(queue_data_ != nullptr);
20 return queue_data_->EmptySize();
21}
22
24{
25 ASSERT(queue_data_ != nullptr);
26 return queue_data_->Size();
27}
28
29bool ReadPort::Readable() { return read_fun_ != nullptr; }
30
32{
33 read_fun_ = fun;
34 return *this;
35}
36
37void ReadPort::Finish(bool in_isr, ErrorCode ans, ReadInfoBlock& info, uint32_t size)
38{
39 read_size_ = size;
40 busy_.store(BusyState::IDLE, std::memory_order_release);
41 info.op.UpdateStatus(in_isr, std::forward<ErrorCode>(ans));
42}
43
45
47{
48 if (Readable())
49 {
50 BusyState is_busy = busy_.load(std::memory_order_relaxed);
51
52 if (is_busy == BusyState::PENDING)
53 {
54 return ErrorCode::BUSY;
55 }
56
57 while (true)
58 {
59 busy_.store(BusyState::IDLE, std::memory_order_release);
60
61 if (queue_data_ != nullptr)
62 {
63 auto readable_size = queue_data_->Size();
64
65 if (readable_size >= data.size_ && readable_size != 0)
66 {
67 auto ans =
68 queue_data_->PopBatch(reinterpret_cast<uint8_t*>(data.addr_), data.size_);
69 UNUSED(ans);
70 read_size_ = data.size_;
71 ASSERT(ans == ErrorCode::OK);
72 if (op.type != ReadOperation::OperationType::BLOCK)
73 {
74 op.UpdateStatus(false, ErrorCode::OK);
75 }
76 return ErrorCode::OK;
77 }
78 }
79
80 info_ = ReadInfoBlock{data, op};
81
82 op.MarkAsRunning();
83
84 auto ans = read_fun_(*this);
85
86 if (ans != ErrorCode::OK)
87 {
88 BusyState expected = BusyState::IDLE;
89 if (busy_.compare_exchange_weak(expected, BusyState::PENDING,
90 std::memory_order_acq_rel,
91 std::memory_order_acquire))
92 {
93 break;
94 }
95 else
96 {
97 continue;
98 }
99 }
100 else
101 {
102 read_size_ = data.size_;
103 if (op.type != ReadOperation::OperationType::BLOCK)
104 {
105 op.UpdateStatus(false, ErrorCode::OK);
106 }
107 return ErrorCode::OK;
108 }
109 }
110
111 if (op.type == ReadOperation::OperationType::BLOCK)
112 {
113 return op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
114 }
115 else
116 {
117 return ErrorCode::OK;
118 }
119 }
120 else
121 {
122 return ErrorCode::NOT_SUPPORT;
123 }
124}
125
127{
128 ASSERT(queue_data_ != nullptr);
129
130 if (in_isr)
131 {
132 auto is_busy = busy_.load(std::memory_order_relaxed);
133
134 if (is_busy == BusyState::PENDING)
135 {
136 if (queue_data_->Size() >= info_.data.size_)
137 {
138 if (info_.data.size_ > 0)
139 {
140 auto ans = queue_data_->PopBatch(reinterpret_cast<uint8_t*>(info_.data.addr_),
141 info_.data.size_);
142 UNUSED(ans);
143 ASSERT(ans == ErrorCode::OK);
144 }
145 Finish(in_isr, ErrorCode::OK, info_, info_.data.size_);
146 }
147 }
148 else if (is_busy == BusyState::IDLE)
149 {
150 busy_.store(BusyState::EVENT, std::memory_order_release);
151 }
152 }
153 else
154 {
155 if (busy_.load(std::memory_order_relaxed) == BusyState::PENDING)
156 {
157 if (queue_data_->Size() >= info_.data.size_)
158 {
159 if (info_.data.size_ > 0)
160 {
161 auto ans = queue_data_->PopBatch(reinterpret_cast<uint8_t*>(info_.data.addr_),
162 info_.data.size_);
163 UNUSED(ans);
164 ASSERT(ans == ErrorCode::OK);
165 }
166 Finish(in_isr, ErrorCode::OK, info_, info_.data.size_);
167 }
168 }
169 }
170}
171
173{
174 ASSERT(queue_data_ != nullptr);
175 queue_data_->Reset();
176}
177
178WritePort::WritePort(size_t queue_size, size_t buffer_size)
179 : queue_info_(new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
180 LockFreeQueue<WriteInfoBlock>(queue_size)),
181 queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
182 LockFreeQueue<uint8_t>(buffer_size)
183 : nullptr)
184{
185}
186
188{
189 ASSERT(queue_data_ != nullptr);
190 return queue_data_->EmptySize();
191}
192
194{
195 ASSERT(queue_data_ != nullptr);
196 return queue_data_->Size();
197}
198
199bool WritePort::Writable() { return write_fun_ != nullptr; }
200
202{
203 write_fun_ = fun;
204 return *this;
205}
206
207void WritePort::Finish(bool in_isr, ErrorCode ans, WriteInfoBlock& info, uint32_t size)
208{
209 write_size_ = size;
210 info.op.UpdateStatus(in_isr, std::forward<ErrorCode>(ans));
211}
212
214
216{
217 if (Writable())
218 {
219 if (data.size_ == 0)
220 {
221 write_size_ = 0;
222 if (op.type != WriteOperation::OperationType::BLOCK)
223 {
224 op.UpdateStatus(false, ErrorCode::OK);
225 }
226 return ErrorCode::OK;
227 }
228
229 LockState expected = LockState::UNLOCKED;
230 if (!lock_.compare_exchange_strong(expected, LockState::LOCKED))
231 {
232 return ErrorCode::BUSY;
233 }
234
235 return CommitWrite(data, op);
236 }
237 else
238 {
239 return ErrorCode::NOT_SUPPORT;
240 }
241}
242
243ErrorCode WritePort::CommitWrite(ConstRawData data, WriteOperation& op, bool meta_pushed)
244{
245 if (!meta_pushed && queue_info_->EmptySize() < 1)
246 {
247 lock_.store(LockState::UNLOCKED, std::memory_order_release);
248 return ErrorCode::FULL;
249 }
250
251 if (queue_data_)
252 {
253 ErrorCode ans = ErrorCode::OK;
254 if (!meta_pushed)
255 {
256 if (queue_data_->EmptySize() < data.size_)
257 {
258 lock_.store(LockState::UNLOCKED, std::memory_order_release);
259 return ErrorCode::FULL;
260 }
261
262 ans = queue_data_->PushBatch(reinterpret_cast<const uint8_t*>(data.addr_),
263 data.size_);
264 UNUSED(ans);
265 ASSERT(ans == ErrorCode::OK);
266
267 WriteInfoBlock info{data, op};
268 ans = queue_info_->Push(info);
269
270 ASSERT(ans == ErrorCode::OK);
271 }
272
273 op.MarkAsRunning();
274
275 ans = write_fun_(*this);
276
277 if (!meta_pushed)
278 {
279 lock_.store(LockState::UNLOCKED, std::memory_order_release);
280 }
281
282 if (ans == ErrorCode::OK)
283 {
284 write_size_ = data.size_;
285 if (op.type != WriteOperation::OperationType::BLOCK)
286 {
287 op.UpdateStatus(false, ErrorCode::OK);
288 }
289 return ErrorCode::OK;
290 }
291
292 if (op.type == WriteOperation::OperationType::BLOCK)
293 {
294 return op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
295 }
296
297 return ErrorCode::OK;
298 }
299 else
300 {
301 WriteInfoBlock info{data, op};
302 auto ans = queue_info_->Push(info);
303
304 ASSERT(ans == ErrorCode::OK);
305
306 op.MarkAsRunning();
307
308 ans = write_fun_(*this);
309
310 lock_.store(LockState::UNLOCKED, std::memory_order_release);
311
312 if (ans == ErrorCode::OK)
313 {
314 write_size_ = data.size_;
315 if (op.type != WriteOperation::OperationType::BLOCK)
316 {
317 op.UpdateStatus(false, ErrorCode::OK);
318 }
319 return ErrorCode::OK;
320 }
321
322 if (op.type == WriteOperation::OperationType::BLOCK)
323 {
324 return op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
325 }
326 else
327 {
328 return ErrorCode::OK;
329 }
330 }
331}
332
334{
335 ASSERT(queue_data_ != nullptr);
336 queue_data_->Reset();
337 queue_info_->Reset();
338}
339
341 : port_(port), op_(op)
342{
343 if (!port->queue_data_ || !port->Writable())
344 {
345 fallback_to_normal_write_ = true;
346 return;
347 }
348 LockState expected = LockState::UNLOCKED;
349 if (port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
350 {
351 if (port_->queue_info_->EmptySize() < 1)
352 {
353 locked_ = false;
354 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
355 return;
356 }
357 locked_ = true;
358 cap_ = port_->queue_data_->EmptySize();
359 }
360}
361
363{
364 if (locked_ && size_ > 0)
365 {
366 port_->queue_info_->Push(WriteInfoBlock{RawData{nullptr, size_}, op_});
367 port_->CommitWrite({nullptr, size_}, op_, true);
368 }
369
370 if (locked_)
371 {
372 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
373 }
374}
375
377{
378 if (fallback_to_normal_write_)
379 {
380 (*port_)(data, op_);
381 }
382 else
383 {
384 if (!locked_)
385 {
386 LockState expected = LockState::UNLOCKED;
387 if (port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
388 {
389 if (port_->queue_info_->EmptySize() < 1)
390 {
391 locked_ = false;
392 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
393 return *this;
394 }
395 else
396 {
397 locked_ = true;
398 cap_ = port_->queue_data_->EmptySize();
399 }
400 }
401 else
402 {
403 return *this;
404 }
405 }
406 if (size_ + data.size_ <= cap_)
407 {
408 port_->queue_data_->PushBatch(reinterpret_cast<const uint8_t*>(data.addr_),
409 data.size_);
410 size_ += data.size_;
411 }
412 }
413
414 return *this;
415}
416
418{
419 auto ans = ErrorCode::OK;
420 if (!fallback_to_normal_write_)
421 {
422 if (locked_ && size_ > 0)
423 {
424 ans = port_->queue_info_->Push(WriteInfoBlock{RawData{nullptr, size_}, op_});
425 ASSERT(ans == ErrorCode::OK);
426 ans = port_->CommitWrite({nullptr, size_}, op_, true);
427 ASSERT(ans == ErrorCode::OK);
428 size_ = 0;
429 }
430
431 if (port_->queue_info_->EmptySize() < 1)
432 {
433 locked_ = false;
434 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
435 }
436 else
437 {
438 cap_ = port_->queue_data_->EmptySize();
439 }
440 }
441
442 return ans;
443}
444
445// NOLINTNEXTLINE
446int STDIO::Printf(const char* fmt, ...)
447{
448#if LIBXR_PRINTF_BUFFER_SIZE > 0
450 {
451 return -1;
452 }
453
454 if (!write_mutex_)
455 {
456 write_mutex_ = new LibXR::Mutex();
457 }
458
459 LibXR::Mutex::LockGuard lock_guard(*write_mutex_);
460
461 va_list args;
462 va_start(args, fmt);
463 int len = vsnprintf(STDIO::printf_buff_, LIBXR_PRINTF_BUFFER_SIZE, fmt, args);
464 va_end(args);
465
466 // Check result and limit length
467 if (len < 0)
468 {
469 return -1;
470 }
471 if (static_cast<size_t>(len) >= LIBXR_PRINTF_BUFFER_SIZE)
472 {
473 len = LIBXR_PRINTF_BUFFER_SIZE - 1;
474 }
475
476 ConstRawData data = {reinterpret_cast<const uint8_t*>(STDIO::printf_buff_),
477 static_cast<size_t>(len)};
478
479 static WriteOperation op; // NOLINT
480 if (write_stream_ == nullptr)
481 {
482 return static_cast<int>(STDIO::write_->operator()(data, op));
483 }
484 else
485 {
486 (*write_stream_) << data;
487 return static_cast<int>(write_stream_->Commit());
488 }
489#else
490 UNUSED(fmt);
491 return 0;
492#endif
493}
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
互斥锁的 RAII 机制封装 (RAII-style mechanism for automatic mutex management).
Definition mutex.hpp:65
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
void MarkAsRunning()
标记操作为运行状态。 Marks the operation as running.
Definition libxr_rw.hpp:202
void UpdateStatus(bool in_isr, Status &&...status)
Updates operation status based on type.
Definition libxr_rw.hpp:171
union LibXR::Operation::@4 data
OperationType type
Definition libxr_rw.hpp:226
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
ReadPort class for handling read operations.
Definition libxr_rw.hpp:269
bool Readable()
Checks if read operations are supported.
Definition libxr_rw.cpp:29
ReadPort(size_t buffer_size=128)
Constructs a ReadPort with queue sizes.
Definition libxr_rw.cpp:10
void Finish(bool in_isr, ErrorCode ans, ReadInfoBlock &info, uint32_t size)
更新读取操作的状态。 Updates the status of the read operation.
Definition libxr_rw.cpp:37
virtual void Reset()
Resets the ReadPort.
Definition libxr_rw.cpp:172
virtual size_t EmptySize()
获取队列的剩余可用空间。 Gets the remaining available space in the queue.
Definition libxr_rw.cpp:17
ErrorCode operator()(RawData data, ReadOperation &op)
读取操作符重载,用于执行读取操作。 Overloaded function call operator to perform a read operation.
Definition libxr_rw.cpp:46
virtual void ProcessPendingReads(bool in_isr)
Processes pending reads.
Definition libxr_rw.cpp:126
ReadPort & operator=(ReadFun fun)
赋值运算符重载,用于设置读取函数。 Overloaded assignment operator to set the read function.
Definition libxr_rw.cpp:31
virtual size_t Size()
获取当前队列的已使用大小。 Gets the currently used size of the queue.
Definition libxr_rw.cpp:23
void MarkAsRunning(ReadInfoBlock &info)
标记读取操作为运行中。 Marks the read operation as running.
Definition libxr_rw.cpp:44
static int Printf(const char *fmt,...)
Prints a formatted string to the write port (like printf).
Definition libxr_rw.cpp:446
static WritePort * write_
Write port instance. 写入端口。
Definition libxr_rw.hpp:600
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待(减少)信号量 Waits (decrements) the semaphore
Definition semaphore.cpp:25
WritePort 的流式写入操作器,支持链式 << 操作和批量提交。
Definition libxr_rw.hpp:429
bool locked_
是否持有写锁 Whether write lock is held
Definition libxr_rw.hpp:472
ErrorCode Commit()
手动提交已写入的数据到队列,并尝试续锁。
Definition libxr_rw.cpp:417
size_t cap_
当前队列容量 Current queue capacity
Definition libxr_rw.hpp:470
LibXR::WritePort * port_
写端口指针 Pointer to the WritePort
Definition libxr_rw.hpp:468
~Stream()
析构时自动提交已累积的数据并释放锁。
Definition libxr_rw.cpp:362
Stream(LibXR::WritePort *port, LibXR::WriteOperation op)
构造流写入对象,并尝试锁定端口。
Definition libxr_rw.cpp:340
Stream & operator<<(const ConstRawData &data)
追加写入数据,支持链式调用。
Definition libxr_rw.cpp:376
WritePort class for handling write operations.
Definition libxr_rw.hpp:403
virtual size_t EmptySize()
获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.
Definition libxr_rw.cpp:187
virtual size_t Size()
获取当前数据队列的已使用大小。 Gets the used size of the current data queue.
Definition libxr_rw.cpp:193
ErrorCode operator()(ConstRawData data, WriteOperation &op)
执行写入操作。 Performs a write operation.
Definition libxr_rw.cpp:215
WritePort(size_t queue_size=3, size_t buffer_size=128)
构造一个新的 WritePort 对象。 Constructs a new WritePort object.
Definition libxr_rw.cpp:178
WritePort & operator=(WriteFun fun)
赋值运算符重载,用于设置写入函数。 Overloaded assignment operator to set the write function.
Definition libxr_rw.cpp:201
bool Writable()
判断端口是否可写。 Checks whether the port is writable.
Definition libxr_rw.cpp:199
void MarkAsRunning(WriteOperation &op)
标记写入操作为运行中。 Marks the write operation as running.
Definition libxr_rw.cpp:213
virtual void Reset()
Resets the WritePort.
Definition libxr_rw.cpp:333
void Finish(bool in_isr, ErrorCode ans, WriteInfoBlock &info, uint32_t size)
更新写入操作的状态。 Updates the status of the write operation.
Definition libxr_rw.cpp:207
LibXR 命名空间
Definition ch32_gpio.hpp:9
ErrorCode(* ReadFun)(ReadPort &port)
Function pointer type for read operations.
Definition libxr_rw.hpp:246
ErrorCode(* WriteFun)(WritePort &port)
Function pointer type for write operations.
Definition libxr_rw.hpp:242
Read information block structure.
Definition libxr_rw.hpp:253
RawData data
Data buffer. 数据缓冲区。
Definition libxr_rw.hpp:254
ReadOperation op
Read operation instance. 读取操作实例。
Definition libxr_rw.hpp:255
WriteOperation op
Write operation instance. 写入操作实例。
Definition libxr_rw.hpp:261