3#include "libxr_def.hpp"
12 : queue_data_(buffer_size > 0 ? new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
20 ASSERT(queue_data_ !=
nullptr);
26 ASSERT(queue_data_ !=
nullptr);
27 return queue_data_->
Size();
40 busy_.store(BusyState::IDLE, std::memory_order_release);
50 BusyState is_busy = busy_.load(std::memory_order_acquire);
52 if (is_busy == BusyState::PENDING)
54 return ErrorCode::BUSY;
59 busy_.store(BusyState::IDLE, std::memory_order_release);
61 auto readable_size = queue_data_->
Size();
63 if (readable_size >= data.
size_ && readable_size != 0)
69 ASSERT(ans == ErrorCode::OK);
74 if (op.
type != ReadOperation::OperationType::BLOCK)
85 auto ans = read_fun_(*
this, in_isr);
87 if (ans == ErrorCode::PENDING)
89 BusyState expected = BusyState::IDLE;
90 if (busy_.compare_exchange_weak(expected, BusyState::PENDING,
91 std::memory_order_acq_rel,
92 std::memory_order_acquire))
103 if (op.
type != ReadOperation::OperationType::BLOCK)
107 return ErrorCode::OK;
111 if (op.
type == ReadOperation::OperationType::BLOCK)
114 return op.
data.sem_info.sem->
Wait(op.
data.sem_info.timeout);
118 return ErrorCode::OK;
123 return ErrorCode::NOT_SUPPORT;
129 ASSERT(queue_data_ !=
nullptr);
131 auto is_busy = busy_.load(std::memory_order_relaxed);
133 if (is_busy == BusyState::PENDING)
135 auto size = queue_data_->
Size();
136 if (size > 0 && size >= info_.
data.
size_)
143 ASSERT(ans == ErrorCode::OK);
146 Finish(in_isr, ErrorCode::OK, info_);
150 else if (is_busy == BusyState::IDLE)
152 busy_.store(BusyState::EVENT, std::memory_order_release);
158 ASSERT(queue_data_ !=
nullptr);
159 queue_data_->
Reset();
163 : queue_info_(new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
165 queue_data_(buffer_size > 0 ? new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
173 ASSERT(queue_data_ !=
nullptr);
179 ASSERT(queue_data_ !=
nullptr);
180 return queue_data_->
Size();
204 if (op.
type != WriteOperation::OperationType::BLOCK)
208 return ErrorCode::OK;
211 LockState expected = LockState::UNLOCKED;
212 if (!lock_.compare_exchange_strong(expected, LockState::LOCKED))
214 return ErrorCode::BUSY;
221 return ErrorCode::NOT_SUPPORT;
228 if (!meta_pushed && queue_info_->EmptySize() < 1)
230 lock_.store(LockState::UNLOCKED, std::memory_order_release);
231 return ErrorCode::FULL;
234 ErrorCode ans = ErrorCode::OK;
239 lock_.store(LockState::UNLOCKED, std::memory_order_release);
240 return ErrorCode::FULL;
246 ASSERT(ans == ErrorCode::OK);
249 ans = queue_info_->Push(info);
251 ASSERT(ans == ErrorCode::OK);
256 ans = write_fun_(*
this, in_isr);
260 lock_.store(LockState::UNLOCKED, std::memory_order_release);
263 if (ans != ErrorCode::PENDING)
265 if (op.
type != WriteOperation::OperationType::BLOCK)
269 return ErrorCode::OK;
272 if (op.
type == WriteOperation::OperationType::BLOCK)
275 return op.
data.sem_info.sem->
Wait(op.
data.sem_info.timeout);
278 return ErrorCode::OK;
283 ASSERT(queue_data_ !=
nullptr);
284 queue_data_->
Reset();
285 queue_info_->Reset();
289 : port_(port), op_(op)
291 LockState expected = LockState::UNLOCKED;
292 if (
port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
294 if (port_->queue_info_->EmptySize() < 1)
297 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
307 if (locked_ && size_ > 0)
310 port_->CommitWrite({
nullptr, size_}, op_,
true);
315 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
323 LockState expected = LockState::UNLOCKED;
324 if (port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
326 if (port_->queue_info_->EmptySize() < 1)
329 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
335 cap_ = port_->queue_data_->EmptySize();
343 if (size_ + data.
size_ <= cap_)
345 port_->queue_data_->PushBatch(
reinterpret_cast<const uint8_t*
>(data.
addr_),
355 auto ans = ErrorCode::OK;
357 if (locked_ && size_ > 0)
360 ASSERT(ans == ErrorCode::OK);
361 ans = port_->CommitWrite({
nullptr, size_}, op_,
true);
362 ASSERT(ans == ErrorCode::OK);
366 if (port_->queue_info_->EmptySize() < 1)
369 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
373 cap_ = port_->queue_data_->EmptySize();
382#if LIBXR_PRINTF_BUFFER_SIZE > 0
397 int len = vsnprintf(STDIO::printf_buff_, LIBXR_PRINTF_BUFFER_SIZE, fmt, args);
405 if (
static_cast<size_t>(len) >= LIBXR_PRINTF_BUFFER_SIZE)
407 len = LIBXR_PRINTF_BUFFER_SIZE - 1;
410 ConstRawData data = {
reinterpret_cast<const uint8_t*
>(STDIO::printf_buff_),
411 static_cast<size_t>(len)};
414 auto ans = ErrorCode::OK;
415 if (write_stream_ ==
nullptr)
421 (*write_stream_) << data;
422 ans = write_stream_->Commit();
425 if (ans == ErrorCode::OK)
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
互斥锁的 RAII 机制封装 (RAII-style mechanism for automatic mutex management).
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
union LibXR::Operation::@5 data
void UpdateStatus(bool in_isr, Status &&status)
Updates operation status based on type.
void MarkAsRunning()
标记操作为运行状态。 Marks the operation as running.
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
ReadPort class for handling read operations.
bool Readable()
Checks if read operations are supported.
void Finish(bool in_isr, ErrorCode ans, ReadInfoBlock &info)
更新读取操作的状态。 Updates the status of the read operation.
ReadPort(size_t buffer_size=128)
Constructs a ReadPort with queue sizes.
void Reset()
Resets the ReadPort.
size_t EmptySize()
获取队列的剩余可用空间。 Gets the remaining available space in the queue.
ErrorCode operator()(RawData data, ReadOperation &op, bool in_isr=false)
读取操作符重载,用于执行读取操作。 Overloaded function call operator to perform a read operation.
void ProcessPendingReads(bool in_isr)
Processes pending reads.
ReadPort & operator=(ReadFun fun)
赋值运算符重载,用于设置读取函数。 Overloaded assignment operator to set the read function.
size_t Size()
获取当前队列的已使用大小。 Gets the currently used size of the queue.
virtual void OnRxDequeue(bool)
RX 数据从软件队列成功出队后的通知。 Notification after bytes are popped from RX data queue.
void MarkAsRunning(ReadInfoBlock &info)
标记读取操作为运行中。 Marks the read operation as running.
static int Printf(const char *fmt,...)
Prints a formatted string to the write port (like printf).
static WritePort * write_
Write port instance. 写入端口。
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待(减少)信号量 Waits (decrements) the semaphore
WritePort 的流式写入操作器,支持链式 << 操作和批量提交。
bool locked_
是否持有写锁 Whether write lock is held
ErrorCode Commit()
手动提交已写入的数据到队列,并尝试续锁。
size_t cap_
当前队列容量 Current queue capacity
LibXR::WritePort * port_
写端口指针 Pointer to the WritePort
~Stream()
析构时自动提交已累积的数据并释放锁。
Stream(LibXR::WritePort *port, LibXR::WriteOperation op)
构造流写入对象,并尝试锁定端口。
Stream & operator<<(const ConstRawData &data)
追加写入数据,支持链式调用。
WritePort class for handling write operations.
size_t EmptySize()
获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.
size_t Size()
获取当前数据队列的已使用大小。 Gets the used size of the current data queue.
void Finish(bool in_isr, ErrorCode ans, WriteInfoBlock &info)
更新写入操作的状态。 Updates the status of the write operation.
WritePort(size_t queue_size=3, size_t buffer_size=128)
构造一个新的 WritePort 对象。 Constructs a new WritePort object.
WritePort & operator=(WriteFun fun)
赋值运算符重载,用于设置写入函数。 Overloaded assignment operator to set the write function.
bool Writable()
判断端口是否可写。 Checks whether the port is writable.
ErrorCode CommitWrite(ConstRawData data, WriteOperation &op, bool pushed=false, bool in_isr=false)
提交写入操作。 Commits a write operation.
void MarkAsRunning(WriteOperation &op)
标记写入操作为运行中。 Marks the write operation as running.
void Reset()
Resets the WritePort.
ErrorCode operator()(ConstRawData data, WriteOperation &op, bool in_isr=false)
执行写入操作。 Performs a write operation.
ErrorCode(* ReadFun)(ReadPort &port, bool in_isr)
Function pointer type for read operations.
ErrorCode(* WriteFun)(WritePort &port, bool in_isr)
Function pointer type for write operations.
Read information block structure.
RawData data
Data buffer. 数据缓冲区。
ReadOperation op
Read operation instance. 读取操作实例。
WriteOperation op
Write operation instance. 写入操作实例。