libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
libxr_rw.hpp
1#pragma once
2
3#include <cstdarg>
4#include <cstddef>
5#include <cstdint>
6#include <cstdio>
7#include <utility>
8
9#include "libxr_cb.hpp"
10#include "libxr_def.hpp"
11#include "libxr_type.hpp"
12#include "lockfree_queue.hpp"
13#include "mutex.hpp"
14#include "queue.hpp"
15#include "semaphore.hpp"
16
17namespace LibXR
18{
19
27template <typename... Args>
29{
30 public:
31 using Callback = LibXR::Callback<Args...>;
32
// Selects how UpdateStatus() reports completion of an operation.
35 enum class OperationType : uint8_t
36 {
37 CALLBACK,  // UpdateStatus() runs the stored Callback.
38 BLOCK,  // UpdateStatus() posts the stored Semaphore.
39 POLLING,  // UpdateStatus() writes DONE into the caller's status variable.
40 NONE  // UpdateStatus() does nothing.
41 };
42
// Progress value written through the status pointer of a POLLING operation.
45 enum class OperationPollingStatus : uint8_t
46 {
47 READY,  // Not assigned by any code in this listing; presumably the caller's initial value - confirm.
48 RUNNING,  // Written by MarkAsRunning().
49 DONE  // Written by UpdateStatus().
50 };
51
55 memset(&data, 0, sizeof(data));
56 }
57
// Builds a BLOCK operation. Only the address of `sem` is stored, so the
// semaphore must stay alive while the operation is in use. `timeout` is the
// value the read/write ports later hand to Semaphore::Wait().
64 Operation(Semaphore &sem, uint32_t timeout = UINT32_MAX) : type(OperationType::BLOCK)
65 {
66 data.sem_info.sem = &sem;
67 data.sem_info.timeout = timeout;
68 }
69
// Builds a CALLBACK operation. Only the address of `callback` is stored, so
// the referenced Callback must stay alive while the operation is in use.
75 Operation(Callback &callback) : type(OperationType::CALLBACK)
76 {
77 data.callback = &callback;
78 }
79
86 {
87 data.status = &status;
88 }
89
97 {
98 if (this != &op)
99 {
100 type = op.type;
101 switch (type)
102 {
103 case OperationType::CALLBACK:
104 data.callback = op.data.callback;
105 break;
106 case OperationType::BLOCK:
107 data.sem_info.sem = op.data.sem_info.sem;
108 data.sem_info.timeout = op.data.sem_info.timeout;
109 break;
110 case OperationType::POLLING:
111 data.status = op.data.status;
112 break;
113 case OperationType::NONE:
114 break;
115 }
116 }
117 return *this;
118 }
119
127 {
128 if (this != &op)
129 {
130 type = op.type;
131 switch (type)
132 {
133 case OperationType::CALLBACK:
134 data.callback = op.data.callback;
135 break;
136 case OperationType::BLOCK:
137 data.sem_info.sem = op.data.sem_info.sem;
138 data.sem_info.timeout = op.data.sem_info.timeout;
139 break;
140 case OperationType::POLLING:
141 data.status = op.data.status;
142 break;
143 case OperationType::NONE:
144 break;
145 }
146 }
147 return *this;
148 }
149
// Constructs an Operation from another Operation. The enable_if constraint
// admits only arguments whose decayed type is Operation itself.
158 template <
159 typename InitOperation,
160 typename = std::enable_if_t<std::is_same_v<std::decay_t<InitOperation>, Operation>>>
161 Operation(InitOperation &&op)
162 {
// No member initialisers: `type` and `data` are filled in by the assignment
// operator selected by the value category of `op`.
163 *this = std::forward<InitOperation>(op);
164 }
165
172 template <typename... Status>
173 void UpdateStatus(bool in_isr, Status &&...status)
174 {
175 switch (type)
176 {
177 case OperationType::CALLBACK:
178 data.callback->Run(in_isr, std::forward<Args>(status)...);
179 break;
180 case OperationType::BLOCK:
181 data.sem_info.sem->PostFromCallback(in_isr);
182 break;
183 case OperationType::POLLING:
184 *data.status = OperationPollingStatus::DONE;
185 break;
186 case OperationType::NONE:
187 break;
188 }
189 }
190
205 {
206 if (type == OperationType::POLLING)
207 {
208 *data.status = OperationPollingStatus::RUNNING;
209 }
210 }
211
214 union
215 {
216 Callback *callback;
217 struct
218 {
219 Semaphore *sem;
220 uint32_t timeout;
221 } sem_info;
224
228};
229
230class ReadPort;
231class WritePort;
232
236
240
243typedef ErrorCode (*WriteFun)(WritePort &port);
244
247typedef ErrorCode (*ReadFun)(ReadPort &port);
248
258
259typedef struct
260{
261 ConstRawData data;
264
270{
271 public:
// State of the single outstanding read request tracked in busy_.
272 enum class BusyState : uint32_t
273 {
274 Idle = 0,  // No request is waiting; stored by Finish() and at the start of each attempt in operator().
275 Pending = 1,  // operator() stored a request in info_ after read_fun_ returned a non-OK code.
276 Event = 2  // Stored by ProcessPendingReads(true) when it found Idle; makes operator()'s Idle->Pending exchange fail.
277 };
278
279 ReadFun read_fun_ = nullptr;
280 LockFreeQueue<uint8_t> *queue_data_ = nullptr;
281 size_t read_size_ = 0;
282 Mutex mutex_;
283 ReadInfoBlock info_;
284 std::atomic<BusyState> busy_{BusyState::Idle};
285
// Creates the port. When buffer_size > 0 a LockFreeQueue<uint8_t> of that size
// is allocated with LIBXR_CACHE_LINE_SIZE alignment; otherwise queue_data_
// stays nullptr.
// NOTE(review): no matching delete is visible in this listing - confirm ownership.
292 ReadPort(size_t buffer_size = 128)
293 : queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
294 LockFreeQueue<uint8_t>(buffer_size)
295 : nullptr)
296 {
297 }
298
// Returns queue_data_->EmptySize(). Asserts that the data queue exists, so it
// must not be called on a port constructed with buffer_size == 0.
309 virtual size_t EmptySize()
310 {
311 ASSERT(queue_data_ != nullptr);
312 return queue_data_->EmptySize();
313 }
314
// Returns queue_data_->Size(). Asserts that the data queue exists, so it must
// not be called on a port constructed with buffer_size == 0.
325 virtual size_t Size()
326 {
327 ASSERT(queue_data_ != nullptr);
328 return queue_data_->Size();
329 }
330
// True once a read function has been stored in read_fun_.
333 bool Readable() { return read_fun_ != nullptr; }
334
349 {
350 read_fun_ = fun;
351 return *this;
352 }
353
371 void Finish(bool in_isr, ErrorCode ans, ReadInfoBlock &info, uint32_t size)
372 {
373 read_size_ = size;
374 busy_.store(BusyState::Idle, std::memory_order_release);
375 info.op.UpdateStatus(in_isr, std::forward<ErrorCode>(ans));
376 }
377
389
// Submits a read of data.size_ bytes into data.addr_.
//
// Returns:
//   NOT_SUPPORT - no read function has been installed.
//   BUSY        - an earlier request is still Pending.
//   OK          - the request completed immediately, or (non-BLOCK operations)
//                 was left pending for ProcessPendingReads()/Finish().
//   For BLOCK operations that could not complete immediately, the result of
//   waiting on the operation's semaphore with its timeout.
406 ErrorCode operator()(RawData data, ReadOperation &op)
407 {
408 if (Readable())
409 {
410 mutex_.Lock();
411
412 BusyState is_busy = busy_.load(std::memory_order_relaxed);
413
// Only one request can be outstanding at a time.
414 if (is_busy == BusyState::Pending)
415 {
416 mutex_.Unlock();
417 return ErrorCode::BUSY;
418 }
419
// Each pass: reset busy_ to Idle, try to satisfy the request from the queue,
// otherwise hand it to read_fun_ and try to park it as Pending.
420 while (true)
421 {
422 busy_.store(BusyState::Idle, std::memory_order_release);
423
424 if (queue_data_ != nullptr)
425 {
426 auto readable_size = queue_data_->Size();
427
// Fast path: enough bytes are already buffered (an empty queue never qualifies).
428 if (readable_size >= data.size_ && readable_size != 0)
429 {
430 auto ans = queue_data_->PopBatch(reinterpret_cast<uint8_t *>(data.addr_),
431 data.size_);
432 UNUSED(ans);
433 read_size_ = data.size_;
434 ASSERT(ans == ErrorCode::OK);
// BLOCK operations are not notified here; the caller simply gets OK back.
435 if (op.type != ReadOperation::OperationType::BLOCK)
436 {
437 op.UpdateStatus(false, ErrorCode::OK);
438 }
439 mutex_.Unlock();
440 return ErrorCode::OK;
441 }
442 }
443
// Publish the request in info_ before invoking the read function.
444 info_ = ReadInfoBlock{data, op};
445
446 op.MarkAsRunning();
447
448 auto ans = read_fun_(*this);
449
450 if (ans != ErrorCode::OK)
451 {
// Not finished yet: park the request, but only if busy_ is still Idle.
452 BusyState expected = BusyState::Idle;
453 if (busy_.compare_exchange_strong(expected, BusyState::Pending,
454 std::memory_order_acq_rel,
455 std::memory_order_acquire))
456 {
457 break;
458 }
459 else
460 {
// busy_ changed in the meantime (ProcessPendingReads(true) stores Event when
// it sees Idle), so loop again and re-check the queue.
// NOTE(review): this assignment has no effect; `expected` is re-declared on the next pass.
461 expected = BusyState::Pending;
462 continue;
463 }
464 }
465 else
466 {
// read_fun_ reported OK: treat the request as complete now.
467 read_size_ = data.size_;
468 if (op.type != ReadOperation::OperationType::BLOCK)
469 {
470 op.UpdateStatus(false, ErrorCode::OK);
471 }
472 mutex_.Unlock();
473 return ErrorCode::OK;
474 }
475 }
476
// Reached only via `break`: the request is now Pending.
477 mutex_.Unlock();
478
479 if (op.type == ReadOperation::OperationType::BLOCK)
480 {
// The semaphore is posted by UpdateStatus() when the request is finished.
481 return op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
482 }
483 else
484 {
485 return ErrorCode::OK;
486 }
487 }
488 else
489 {
490 return ErrorCode::NOT_SUPPORT;
491 }
492 }
493
// Tries to complete the request stored in info_ from the data queue.
// Asserts that the data queue exists.
//
// in_isr == true : runs without taking mutex_.
// in_isr == false: holds mutex_ for the duration of the call.
501 virtual void ProcessPendingReads(bool in_isr)
502 {
503 ASSERT(queue_data_ != nullptr);
504
505 if (in_isr)
506 {
507 auto is_busy = busy_.load(std::memory_order_relaxed);
508
509 if (is_busy == BusyState::Pending)
510 {
// Complete only when the whole requested size is available.
511 if (queue_data_->Size() >= info_.data.size_)
512 {
// A zero-size request skips the pop but is still finished.
513 if (info_.data.size_ > 0)
514 {
515 auto ans = queue_data_->PopBatch(
516 reinterpret_cast<uint8_t *>(info_.data.addr_), info_.data.size_);
517 UNUSED(ans);
518 ASSERT(ans == ErrorCode::OK);
519 }
520 Finish(in_isr, ErrorCode::OK, info_, info_.data.size_);
521 }
522 }
523 else if (is_busy == BusyState::Idle)
524 {
// No request is parked: store Event so that an Idle->Pending exchange
// attempted by operator() fails and it re-checks the queue.
525 busy_.store(BusyState::Event, std::memory_order_release);
526 }
527 }
528 else
529 {
530 LibXR::Mutex::LockGuard lock_guard(mutex_);
// Same completion logic as the ISR path; the Idle case is not marked here.
531 if (busy_.load(std::memory_order_relaxed) == BusyState::Pending)
532 {
533 if (queue_data_->Size() >= info_.data.size_)
534 {
535 if (info_.data.size_ > 0)
536 {
537 auto ans = queue_data_->PopBatch(
538 reinterpret_cast<uint8_t *>(info_.data.addr_), info_.data.size_);
539 UNUSED(ans);
540 ASSERT(ans == ErrorCode::OK);
541 }
542 Finish(in_isr, ErrorCode::OK, info_, info_.data.size_);
543 }
544 }
545 }
546 }
547
// Resets the data queue and clears read_size_ while holding mutex_.
// Asserts that the data queue exists. busy_ and info_ are left untouched.
550 virtual void Reset()
551 {
552 ASSERT(queue_data_ != nullptr);
553 Mutex::LockGuard lock_guard(mutex_);
554 queue_data_->Reset();
555 read_size_ = 0;
556 }
557};
558
564{
565 public:
566 WriteFun write_fun_ = nullptr;
568 LockFreeQueue<uint8_t> *queue_data_;
569 Mutex mutex_;
570 size_t write_size_ = 0;
571
// Creates the port. A LockFreeQueue<WriteInfoBlock> of queue_size entries is
// always allocated; a LockFreeQueue<uint8_t> of buffer_size is allocated only
// when buffer_size > 0, otherwise queue_data_ is nullptr. Both allocations use
// LIBXR_CACHE_LINE_SIZE alignment.
// NOTE(review): no matching delete is visible in this listing - confirm ownership.
585 WritePort(size_t queue_size = 3, size_t buffer_size = 128)
586 : queue_info_(new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
587 LockFreeQueue<WriteInfoBlock>(queue_size)),
588 queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
589 LockFreeQueue<uint8_t>(buffer_size)
590 : nullptr)
591 {
592 }
593
// Returns queue_data_->EmptySize(). Asserts that the data queue exists, so it
// must not be called on a port constructed with buffer_size == 0.
601 virtual size_t EmptySize()
602 {
603 ASSERT(queue_data_ != nullptr);
604 return queue_data_->EmptySize();
605 }
606
// Returns queue_data_->Size(). Asserts that the data queue exists, so it must
// not be called on a port constructed with buffer_size == 0.
614 virtual size_t Size()
615 {
616 ASSERT(queue_data_ != nullptr);
617 return queue_data_->Size();
618 }
619
// True once a write function has been stored in write_fun_.
627 bool Writable() { return write_fun_ != nullptr; }
628
643 {
644 write_fun_ = fun;
645 return *this;
646 }
647
665 void Finish(bool in_isr, ErrorCode ans, WriteInfoBlock &info, uint32_t size)
666 {
667 write_size_ = size;
668 info.op.UpdateStatus(in_isr, std::forward<ErrorCode>(ans));
669 }
670
682
700 {
701 if (Writable())
702 {
703 if (data.size_ == 0)
704 {
705 write_size_ = 0;
706 if (op.type != WriteOperation::OperationType::BLOCK)
707 {
708 op.UpdateStatus(false, ErrorCode::OK);
709 }
710 return ErrorCode::OK;
711 }
712
713 mutex_.Lock();
714
715 if (queue_info_->EmptySize() < 1)
716 {
717 mutex_.Unlock();
718 return ErrorCode::FULL;
719 }
720
721 if (queue_data_)
722 {
723 if (queue_data_->EmptySize() < data.size_)
724 {
725 mutex_.Unlock();
726 return ErrorCode::FULL;
727 }
728
729 auto ans = queue_data_->PushBatch(reinterpret_cast<const uint8_t *>(data.addr_),
730 data.size_);
731 UNUSED(ans);
732 ASSERT(ans == ErrorCode::OK);
733
734 WriteInfoBlock info{data, op};
735 ans = queue_info_->Push(info);
736
737 ASSERT(ans == ErrorCode::OK);
738
739 op.MarkAsRunning();
740
741 ans = write_fun_(*this);
742
743 mutex_.Unlock();
744
745 if (ans == ErrorCode::OK)
746 {
747 write_size_ = data.size_;
748 if (op.type != WriteOperation::OperationType::BLOCK)
749 {
750 op.UpdateStatus(false, ErrorCode::OK);
751 }
752 return ErrorCode::OK;
753 }
754
755 if (op.type == WriteOperation::OperationType::BLOCK)
756 {
757 return op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
758 }
759
760 return ErrorCode::OK;
761 }
762 else
763 {
764 WriteInfoBlock info{data, op};
765 auto ans = queue_info_->Push(info);
766
767 ASSERT(ans == ErrorCode::OK);
768
769 op.MarkAsRunning();
770
771 ans = write_fun_(*this);
772
773 mutex_.Unlock();
774
775 if (ans == ErrorCode::OK)
776 {
777 write_size_ = data.size_;
778 if (op.type != WriteOperation::OperationType::BLOCK)
779 {
780 op.UpdateStatus(false, ErrorCode::OK);
781 }
782 return ErrorCode::OK;
783 }
784
785 if (op.type == WriteOperation::OperationType::BLOCK)
786 {
787 return op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
788 }
789 else
790 {
791 return ErrorCode::OK;
792 }
793 }
794 }
795 else
796 {
797 return ErrorCode::NOT_SUPPORT;
798 }
799 }
800
// Resets both the request queue and the data queue and clears write_size_
// while holding mutex_. Asserts that the data queue exists, so it must not be
// called on a port constructed with buffer_size == 0.
803 virtual void Reset()
804 {
805 ASSERT(queue_data_ != nullptr);
806 Mutex::LockGuard lock_guard(mutex_);
807 queue_info_->Reset();
808 queue_data_->Reset();
809 write_size_ = 0;
810 }
811};
812
817class STDIO
818{
819 public:
820 // NOLINTBEGIN
821 static inline ReadPort *read_ = nullptr;
822 static inline WritePort *write_ = nullptr;
823#if LIBXR_PRINTF_BUFFER_SIZE > 0
824 static inline char
825 printf_buff_[LIBXR_PRINTF_BUFFER_SIZE];
826#endif
827 // NOLINTEND
828
838 static int Printf(const char *fmt, ...) // NOLINT
839 {
840#if LIBXR_PRINTF_BUFFER_SIZE > 0
841 if (!STDIO::write_ || !STDIO::write_->Writable())
842 {
843 return -1;
844 }
845
846 static LibXR::Mutex mutex; // NOLINT
847
848 LibXR::Mutex::LockGuard lock_guard(mutex);
849
850 va_list args;
851 va_start(args, fmt);
852 int len = vsnprintf(STDIO::printf_buff_, LIBXR_PRINTF_BUFFER_SIZE, fmt, args);
853 va_end(args);
854
855 // Check result and limit length
856 if (len < 0)
857 {
858 return -1;
859 }
860 if (static_cast<size_t>(len) >= LIBXR_PRINTF_BUFFER_SIZE)
861 {
862 len = LIBXR_PRINTF_BUFFER_SIZE - 1;
863 }
864
865 ConstRawData data = {reinterpret_cast<const uint8_t *>(STDIO::printf_buff_),
866 static_cast<size_t>(len)};
867
868 static WriteOperation op; // NOLINT
869 return static_cast<int>(STDIO::write_->operator()(data, op));
870#else
871 UNUSED(fmt);
872 return 0;
873#endif
874 }
875};
876} // namespace LibXR
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
Definition libxr_cb.hpp:125
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
事件管理系统,允许基于事件 ID 注册和触发回调函数。 Event management system that allows registration and triggering of callback functions based on event IDs.
Definition event.hpp:18
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
互斥锁的 RAII 机制封装 (RAII-style mechanism for automatic mutex management).
Definition mutex.hpp:65
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
ErrorCode Lock()
加锁,如果锁已被占用,则阻塞等待 (Lock the mutex, blocking if it is already locked).
Definition mutex.cpp:14
void Unlock()
解锁互斥锁 (Unlock the mutex).
Definition mutex.cpp:28
Defines an operation with different execution modes.
Definition libxr_rw.hpp:29
Operation & operator=(const Operation &op)
Copy assignment operator.
Definition libxr_rw.hpp:96
Operation(Semaphore &sem, uint32_t timeout=UINT32_MAX)
Constructs a blocking operation with a semaphore and timeout.
Definition libxr_rw.hpp:64
Operation & operator=(Operation &&op) noexcept
Move assignment operator.
Definition libxr_rw.hpp:126
Operation(Callback &callback)
Constructs a callback-based operation.
Definition libxr_rw.hpp:75
Operation()
Default constructor, initializes with NONE type.
Definition libxr_rw.hpp:54
void MarkAsRunning()
标记操作为运行状态。 Marks the operation as running.
Definition libxr_rw.hpp:204
void UpdateStatus(bool in_isr, Status &&...status)
Updates operation status based on type.
Definition libxr_rw.hpp:173
union LibXR::Operation::@4 data
OperationType type
Definition libxr_rw.hpp:227
Operation(OperationPollingStatus &status)
Constructs a polling operation.
Definition libxr_rw.hpp:85
Operation(InitOperation &&op)
构造一个新的 Operation 对象(初始化操作)。 Constructs a new Operation object (initialization operation).
Definition libxr_rw.hpp:161
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
ReadPort class for handling read operations.
Definition libxr_rw.hpp:270
void MarkAsRunning(ReadInfoBlock &info)
标记读取操作为运行中。 Marks the read operation as running.
Definition libxr_rw.hpp:388
ReadPort & operator=(ReadFun fun)
赋值运算符重载,用于设置读取函数。 Overloaded assignment operator to set the read function.
Definition libxr_rw.hpp:348
ReadPort(size_t buffer_size=128)
Constructs a ReadPort with the given data buffer size.
Definition libxr_rw.hpp:292
void Finish(bool in_isr, ErrorCode ans, ReadInfoBlock &info, uint32_t size)
更新读取操作的状态。 Updates the status of the read operation.
Definition libxr_rw.hpp:371
virtual size_t Size()
获取当前队列的已使用大小。 Gets the currently used size of the queue.
Definition libxr_rw.hpp:325
bool Readable()
Checks if read operations are supported.
Definition libxr_rw.hpp:333
virtual void ProcessPendingReads(bool in_isr)
Processes pending reads.
Definition libxr_rw.hpp:501
virtual void Reset()
Resets the ReadPort.
Definition libxr_rw.hpp:550
ErrorCode operator()(RawData data, ReadOperation &op)
读取操作符重载,用于执行读取操作。 Overloaded function call operator to perform a read operation.
Definition libxr_rw.hpp:406
virtual size_t EmptySize()
获取队列的剩余可用空间。 Gets the remaining available space in the queue.
Definition libxr_rw.hpp:309
STDIO interface for read/write ports.
Definition libxr_rw.hpp:818
static ReadPort * read_
Read port instance. 读取端口。
Definition libxr_rw.hpp:821
static WritePort * write_
Write port instance. 写入端口。
Definition libxr_rw.hpp:822
static int Printf(const char *fmt,...)
Prints a formatted string to the write port (like printf).
Definition libxr_rw.hpp:838
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
Definition semaphore.hpp:23
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待(减少)信号量 Waits (decrements) the semaphore
Definition semaphore.cpp:25
WritePort class for handling write operations.
Definition libxr_rw.hpp:564
void Finish(bool in_isr, ErrorCode ans, WriteInfoBlock &info, uint32_t size)
更新写入操作的状态。 Updates the status of the write operation.
Definition libxr_rw.hpp:665
ErrorCode operator()(ConstRawData data, WriteOperation &op)
执行写入操作。 Performs a write operation.
Definition libxr_rw.hpp:699
WritePort & operator=(WriteFun fun)
赋值运算符重载,用于设置写入函数。 Overloaded assignment operator to set the write function.
Definition libxr_rw.hpp:642
WritePort(size_t queue_size=3, size_t buffer_size=128)
构造一个新的 WritePort 对象。 Constructs a new WritePort object.
Definition libxr_rw.hpp:585
virtual size_t EmptySize()
获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.
Definition libxr_rw.hpp:601
bool Writable()
判断端口是否可写。 Checks whether the port is writable.
Definition libxr_rw.hpp:627
virtual size_t Size()
获取当前数据队列的已使用大小。 Gets the used size of the current data queue.
Definition libxr_rw.hpp:614
virtual void Reset()
Resets the WritePort.
Definition libxr_rw.hpp:803
void MarkAsRunning(WriteOperation &op)
标记写入操作为运行中。 Marks the write operation as running.
Definition libxr_rw.hpp:681
LibXR 命名空间
Definition ch32_gpio.hpp:9
ErrorCode(* ReadFun)(ReadPort &port)
Function pointer type for read operations.
Definition libxr_rw.hpp:247
Operation< ErrorCode > ReadOperation
Read operation type.
Definition libxr_rw.hpp:235
ErrorCode(* WriteFun)(WritePort &port)
Function pointer type for write operations.
Definition libxr_rw.hpp:243
Operation< ErrorCode > WriteOperation
Write operation type.
Definition libxr_rw.hpp:239
Read information block structure.
Definition libxr_rw.hpp:254
RawData data
Data buffer. 数据缓冲区。
Definition libxr_rw.hpp:255
ReadOperation op
Read operation instance. 读取操作实例。
Definition libxr_rw.hpp:256