10#include "libxr_def.hpp"
11#include "libxr_type.hpp"
12#include "lockfree_queue.hpp"
15#include "semaphore.hpp"
27template <
typename... Args>
66 data.sem_info.sem = &sem;
67 data.sem_info.timeout = timeout;
77 data.callback = &callback;
87 data.status = &status;
103 case OperationType::CALLBACK:
106 case OperationType::BLOCK:
107 data.sem_info.sem = op.
data.sem_info.sem;
108 data.sem_info.timeout = op.
data.sem_info.timeout;
110 case OperationType::POLLING:
113 case OperationType::NONE:
133 case OperationType::CALLBACK:
134 data.callback = op.data.callback;
136 case OperationType::BLOCK:
137 data.sem_info.sem = op.data.sem_info.sem;
138 data.sem_info.timeout = op.data.sem_info.timeout;
140 case OperationType::POLLING:
141 data.status = op.data.status;
143 case OperationType::NONE:
159 typename InitOperation,
160 typename = std::enable_if_t<std::is_same_v<std::decay_t<InitOperation>,
Operation>>>
163 *
this = std::forward<InitOperation>(op);
172 template <
typename... Status>
177 case OperationType::CALLBACK:
178 data.callback->Run(in_isr, std::forward<Args>(status)...);
180 case OperationType::BLOCK:
181 data.sem_info.sem->PostFromCallback(in_isr);
183 case OperationType::POLLING:
184 *
data.status = OperationPollingStatus::DONE;
186 case OperationType::NONE:
206 if (
type == OperationType::POLLING)
208 *
data.status = OperationPollingStatus::RUNNING;
272 enum class BusyState : uint32_t
281 size_t read_size_ = 0;
284 std::atomic<BusyState> busy_{BusyState::Idle};
293 : queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
311 ASSERT(queue_data_ !=
nullptr);
327 ASSERT(queue_data_ !=
nullptr);
328 return queue_data_->
Size();
374 busy_.store(BusyState::Idle, std::memory_order_release);
412 BusyState is_busy = busy_.load(std::memory_order_relaxed);
414 if (is_busy == BusyState::Pending)
417 return ErrorCode::BUSY;
422 busy_.store(BusyState::Idle, std::memory_order_release);
424 if (queue_data_ !=
nullptr)
426 auto readable_size = queue_data_->
Size();
428 if (readable_size >= data.
size_ && readable_size != 0)
430 auto ans = queue_data_->
PopBatch(
reinterpret_cast<uint8_t *
>(data.
addr_),
433 read_size_ = data.
size_;
434 ASSERT(ans == ErrorCode::OK);
435 if (op.
type != ReadOperation::OperationType::BLOCK)
440 return ErrorCode::OK;
448 auto ans = read_fun_(*
this);
450 if (ans != ErrorCode::OK)
452 BusyState expected = BusyState::Idle;
453 if (busy_.compare_exchange_strong(expected, BusyState::Pending,
454 std::memory_order_acq_rel,
455 std::memory_order_acquire))
461 expected = BusyState::Pending;
467 read_size_ = data.
size_;
468 if (op.
type != ReadOperation::OperationType::BLOCK)
473 return ErrorCode::OK;
479 if (op.
type == ReadOperation::OperationType::BLOCK)
481 return op.
data.sem_info.sem->
Wait(op.
data.sem_info.timeout);
485 return ErrorCode::OK;
490 return ErrorCode::NOT_SUPPORT;
503 ASSERT(queue_data_ !=
nullptr);
507 auto is_busy = busy_.load(std::memory_order_relaxed);
509 if (is_busy == BusyState::Pending)
518 ASSERT(ans == ErrorCode::OK);
523 else if (is_busy == BusyState::Idle)
525 busy_.store(BusyState::Event, std::memory_order_release);
531 if (busy_.load(std::memory_order_relaxed) == BusyState::Pending)
540 ASSERT(ans == ErrorCode::OK);
552 ASSERT(queue_data_ !=
nullptr);
554 queue_data_->
Reset();
570 size_t write_size_ = 0;
585 WritePort(
size_t queue_size = 3,
size_t buffer_size = 128)
586 : queue_info_(new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
588 queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
603 ASSERT(queue_data_ !=
nullptr);
616 ASSERT(queue_data_ !=
nullptr);
617 return queue_data_->
Size();
668 info.op.
UpdateStatus(in_isr, std::forward<ErrorCode>(ans));
706 if (op.
type != WriteOperation::OperationType::BLOCK)
710 return ErrorCode::OK;
718 return ErrorCode::FULL;
726 return ErrorCode::FULL;
729 auto ans = queue_data_->
PushBatch(
reinterpret_cast<const uint8_t *
>(data.
addr_),
732 ASSERT(ans == ErrorCode::OK);
735 ans = queue_info_->
Push(info);
737 ASSERT(ans == ErrorCode::OK);
741 ans = write_fun_(*
this);
745 if (ans == ErrorCode::OK)
747 write_size_ = data.
size_;
748 if (op.
type != WriteOperation::OperationType::BLOCK)
752 return ErrorCode::OK;
755 if (op.
type == WriteOperation::OperationType::BLOCK)
757 return op.
data.sem_info.sem->
Wait(op.
data.sem_info.timeout);
760 return ErrorCode::OK;
765 auto ans = queue_info_->
Push(info);
767 ASSERT(ans == ErrorCode::OK);
771 ans = write_fun_(*
this);
775 if (ans == ErrorCode::OK)
777 write_size_ = data.
size_;
778 if (op.
type != WriteOperation::OperationType::BLOCK)
782 return ErrorCode::OK;
785 if (op.
type == WriteOperation::OperationType::BLOCK)
787 return op.
data.sem_info.sem->
Wait(op.
data.sem_info.timeout);
791 return ErrorCode::OK;
797 return ErrorCode::NOT_SUPPORT;
805 ASSERT(queue_data_ !=
nullptr);
807 queue_info_->
Reset();
808 queue_data_->
Reset();
823#if LIBXR_PRINTF_BUFFER_SIZE > 0
825 printf_buff_[LIBXR_PRINTF_BUFFER_SIZE];
840#if LIBXR_PRINTF_BUFFER_SIZE > 0
852 int len = vsnprintf(STDIO::printf_buff_, LIBXR_PRINTF_BUFFER_SIZE, fmt, args);
860 if (
static_cast<size_t>(len) >= LIBXR_PRINTF_BUFFER_SIZE)
862 len = LIBXR_PRINTF_BUFFER_SIZE - 1;
865 ConstRawData data = {
reinterpret_cast<const uint8_t *
>(STDIO::printf_buff_),
866 static_cast<size_t>(len)};
869 return static_cast<int>(
STDIO::write_->operator()(data, op));
提供一个通用的回调包装,支持动态参数传递。 Provides a generic callback wrapper, supporting dynamic argument passing.
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
事件管理系统,允许基于事件 ID 注册和触发回调函数。 Event management system that allows registration and triggering of callback functions based on event IDs.
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
ErrorCode Push(ElementData &&item)
向队列中推入数据 / Pushes data into the queue
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
互斥锁的 RAII 机制封装 (RAII-style mechanism for automatic mutex management).
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
ErrorCode Lock()
加锁,如果锁已被占用,则阻塞等待 (Lock the mutex, blocking if it is already locked).
void Unlock()
解锁互斥锁 (Unlock the mutex).
Defines an operation with different execution modes.
Operation & operator=(const Operation &op)
Copy assignment operator.
Operation(Semaphore &sem, uint32_t timeout=UINT32_MAX)
Constructs a blocking operation with a semaphore and timeout.
Operation & operator=(Operation &&op) noexcept
Move assignment operator.
Operation(Callback &callback)
Constructs a callback-based operation.
Operation()
Default constructor, initializes with NONE type.
void MarkAsRunning()
标记操作为运行状态。 Marks the operation as running.
void UpdateStatus(bool in_isr, Status &&...status)
Updates operation status based on type.
union LibXR::Operation::@4 data
Operation(OperationPollingStatus &status)
Constructs a polling operation.
Operation(InitOperation &&op)
构造一个新的 Operation 对象(初始化操作)。 Constructs a new Operation object (initialization operation).
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
ReadPort class for handling read operations.
void MarkAsRunning(ReadInfoBlock &info)
标记读取操作为运行中。 Marks the read operation as running.
ReadPort & operator=(ReadFun fun)
赋值运算符重载,用于设置读取函数。 Overloaded assignment operator to set the read function.
ReadPort(size_t buffer_size=128)
Constructs a ReadPort with the given buffer size.
void Finish(bool in_isr, ErrorCode ans, ReadInfoBlock &info, uint32_t size)
更新读取操作的状态。 Updates the status of the read operation.
virtual size_t Size()
获取当前队列的已使用大小。 Gets the currently used size of the queue.
bool Readable()
Checks if read operations are supported.
virtual void ProcessPendingReads(bool in_isr)
Processes pending reads.
virtual void Reset()
Resets the ReadPort.
ErrorCode operator()(RawData data, ReadOperation &op)
读取操作符重载,用于执行读取操作。 Overloaded function call operator to perform a read operation.
virtual size_t EmptySize()
获取队列的剩余可用空间。 Gets the remaining available space in the queue.
STDIO interface for read/write ports.
static ReadPort * read_
Read port instance. 读取端口。
static WritePort * write_
Write port instance. 写入端口。
static int Printf(const char *fmt,...)
Prints a formatted string to the write port (like printf).
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待(减少)信号量 Waits (decrements) the semaphore
WritePort class for handling write operations.
void Finish(bool in_isr, ErrorCode ans, WriteInfoBlock &info, uint32_t size)
更新写入操作的状态。 Updates the status of the write operation.
ErrorCode operator()(ConstRawData data, WriteOperation &op)
执行写入操作。 Performs a write operation.
WritePort & operator=(WriteFun fun)
赋值运算符重载,用于设置写入函数。 Overloaded assignment operator to set the write function.
WritePort(size_t queue_size=3, size_t buffer_size=128)
构造一个新的 WritePort 对象。 Constructs a new WritePort object.
virtual size_t EmptySize()
获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.
bool Writable()
判断端口是否可写。 Checks whether the port is writable.
virtual size_t Size()
获取当前数据队列的已使用大小。 Gets the used size of the current data queue.
virtual void Reset()
Resets the WritePort.
void MarkAsRunning(WriteOperation &op)
标记写入操作为运行中。 Marks the write operation as running.
ErrorCode(* ReadFun)(ReadPort &port)
Function pointer type for read operations.
Operation< ErrorCode > ReadOperation
Read operation type.
ErrorCode(* WriteFun)(WritePort &port)
Function pointer type for write operations.
Operation< ErrorCode > WriteOperation
Write operation type.
Read information block structure.
RawData data
Data buffer. 数据缓冲区。
ReadOperation op
Read operation instance. 读取操作实例。