11 : queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
19 ASSERT(queue_data_ !=
nullptr);
25 ASSERT(queue_data_ !=
nullptr);
26 return queue_data_->
Size();
40 busy_.store(BusyState::IDLE, std::memory_order_release);
50 BusyState is_busy = busy_.load(std::memory_order_relaxed);
52 if (is_busy == BusyState::PENDING)
54 return ErrorCode::BUSY;
59 busy_.store(BusyState::IDLE, std::memory_order_release);
61 if (queue_data_ !=
nullptr)
63 auto readable_size = queue_data_->
Size();
65 if (readable_size >= data.
size_ && readable_size != 0)
70 read_size_ = data.
size_;
71 ASSERT(ans == ErrorCode::OK);
72 if (op.
type != ReadOperation::OperationType::BLOCK)
84 auto ans = read_fun_(*
this);
86 if (ans != ErrorCode::OK)
88 BusyState expected = BusyState::IDLE;
89 if (busy_.compare_exchange_weak(expected, BusyState::PENDING,
90 std::memory_order_acq_rel,
91 std::memory_order_acquire))
97 expected = BusyState::PENDING;
103 read_size_ = data.
size_;
104 if (op.
type != ReadOperation::OperationType::BLOCK)
108 return ErrorCode::OK;
112 if (op.
type == ReadOperation::OperationType::BLOCK)
114 return op.
data.sem_info.sem->
Wait(op.
data.sem_info.timeout);
118 return ErrorCode::OK;
123 return ErrorCode::NOT_SUPPORT;
129 ASSERT(queue_data_ !=
nullptr);
133 auto is_busy = busy_.load(std::memory_order_relaxed);
135 if (is_busy == BusyState::PENDING)
144 ASSERT(ans == ErrorCode::OK);
149 else if (is_busy == BusyState::IDLE)
151 busy_.store(BusyState::EVENT, std::memory_order_release);
156 if (busy_.load(std::memory_order_relaxed) == BusyState::PENDING)
165 ASSERT(ans == ErrorCode::OK);
175 ASSERT(queue_data_ !=
nullptr);
176 queue_data_->
Reset();
180 : queue_info_(new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
182 queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
190 ASSERT(queue_data_ !=
nullptr);
196 ASSERT(queue_data_ !=
nullptr);
197 return queue_data_->
Size();
211 info.op.
UpdateStatus(in_isr, std::forward<ErrorCode>(ans));
223 if (op.
type != WriteOperation::OperationType::BLOCK)
227 return ErrorCode::OK;
230 LockState expected = LockState::UNLOCKED;
231 if (!lock_.compare_exchange_strong(expected, LockState::LOCKED))
233 return ErrorCode::BUSY;
236 return CommitWrite(data, op);
240 return ErrorCode::NOT_SUPPORT;
246 if (!pushed && queue_info_->EmptySize() < 1)
248 lock_.store(LockState::UNLOCKED, std::memory_order_release);
249 return ErrorCode::FULL;
254 ErrorCode ans = ErrorCode::OK;
259 lock_.store(LockState::UNLOCKED, std::memory_order_release);
260 return ErrorCode::FULL;
263 ans = queue_data_->
PushBatch(
reinterpret_cast<const uint8_t*
>(data.
addr_),
266 ASSERT(ans == ErrorCode::OK);
269 ans = queue_info_->Push(info);
271 ASSERT(ans == ErrorCode::OK);
276 ans = write_fun_(*
this);
278 lock_.store(LockState::UNLOCKED, std::memory_order_release);
280 if (ans == ErrorCode::OK)
282 write_size_ = data.
size_;
283 if (op.
type != WriteOperation::OperationType::BLOCK)
287 return ErrorCode::OK;
290 if (op.
type == WriteOperation::OperationType::BLOCK)
292 return op.
data.sem_info.sem->
Wait(op.
data.sem_info.timeout);
295 return ErrorCode::OK;
300 auto ans = queue_info_->Push(info);
302 ASSERT(ans == ErrorCode::OK);
306 ans = write_fun_(*
this);
308 lock_.store(LockState::UNLOCKED, std::memory_order_release);
310 if (ans == ErrorCode::OK)
312 write_size_ = data.
size_;
313 if (op.
type != WriteOperation::OperationType::BLOCK)
317 return ErrorCode::OK;
320 if (op.
type == WriteOperation::OperationType::BLOCK)
322 return op.
data.sem_info.sem->
Wait(op.
data.sem_info.timeout);
326 return ErrorCode::OK;
333 ASSERT(queue_data_ !=
nullptr);
334 queue_data_->
Reset();
335 queue_info_->Reset();
339 : port_(port), op_(op)
341 if (!port->queue_data_ || !port->
Writable())
343 fallback_to_normal_write_ = true;
346 LockState expected = LockState::UNLOCKED;
347 if (
port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
349 if (port_->queue_info_->EmptySize() < 1)
352 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
362 if (locked_ && size_ > 0)
365 port_->CommitWrite({
nullptr, size_}, op_,
true);
366 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
372 if (fallback_to_normal_write_)
380 LockState expected = LockState::UNLOCKED;
381 if (port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
384 cap_ = port_->queue_data_->EmptySize();
391 if (size_ + data.
size_ <= cap_)
393 port_->queue_data_->PushBatch(
reinterpret_cast<const uint8_t*
>(data.
addr_),
404 auto ans = ErrorCode::OK;
405 if (!fallback_to_normal_write_)
407 if (locked_ && size_ > 0)
410 ans = port_->CommitWrite({
nullptr, size_}, op_,
true);
414 if (port_->queue_info_->EmptySize() < 1)
417 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
421 cap_ = port_->queue_data_->EmptySize();
431#if LIBXR_PRINTF_BUFFER_SIZE > 0
446 int len = vsnprintf(STDIO::printf_buff_, LIBXR_PRINTF_BUFFER_SIZE, fmt, args);
454 if (
static_cast<size_t>(len) >= LIBXR_PRINTF_BUFFER_SIZE)
456 len = LIBXR_PRINTF_BUFFER_SIZE - 1;
459 ConstRawData data = {
reinterpret_cast<const uint8_t*
>(STDIO::printf_buff_),
460 static_cast<size_t>(len)};
463 if (write_stream_ ==
nullptr)
465 return static_cast<int>(
STDIO::write_->operator()(data, op));
469 (*write_stream_) << data;
470 return static_cast<int>(write_stream_->Commit());
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
互斥锁的 RAII 机制封装 (RAII-style mechanism for automatic mutex management).
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
void MarkAsRunning()
标记操作为运行状态。 Marks the operation as running.
void UpdateStatus(bool in_isr, Status &&...status)
Updates operation status based on type.
union LibXR::Operation::@4 data
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
ReadPort class for handling read operations.
bool Readable()
Checks if read operations are supported.
ReadPort(size_t buffer_size=128)
Constructs a ReadPort with queue sizes.
void Finish(bool in_isr, ErrorCode ans, ReadInfoBlock &info, uint32_t size)
更新读取操作的状态。 Updates the status of the read operation.
virtual void Reset()
Resets the ReadPort.
virtual size_t EmptySize()
获取队列的剩余可用空间。 Gets the remaining available space in the queue.
ErrorCode operator()(RawData data, ReadOperation &op)
读取操作符重载,用于执行读取操作。 Overloaded function call operator to perform a read operation.
virtual void ProcessPendingReads(bool in_isr)
Processes pending reads.
ReadPort & operator=(ReadFun fun)
赋值运算符重载,用于设置读取函数。 Overloaded assignment operator to set the read function.
virtual size_t Size()
获取当前队列的已使用大小。 Gets the currently used size of the queue.
void MarkAsRunning(ReadInfoBlock &info)
标记读取操作为运行中。 Marks the read operation as running.
static int Printf(const char *fmt,...)
Prints a formatted string to the write port (like printf).
static WritePort * write_
Write port instance. 写入端口。
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待(减少)信号量 Waits (decrements) the semaphore
WritePort 的流式写入操作器,支持链式 << 操作和批量提交。 Stream-style writer for WritePort that supports chained << operations and batch commit.
bool locked_
是否持有写锁 Whether write lock is held
ErrorCode Commit()
手动提交已写入的数据到队列,并尝试续锁。 Manually commits the data written so far to the queue and tries to renew the lock.
size_t cap_
当前队列可用容量 Currently available (free) capacity of the data queue
LibXR::WritePort * port_
写端口指针 Pointer to the WritePort
~Stream()
析构时自动提交已累积的数据并释放锁。 On destruction, automatically commits the accumulated data and releases the lock.
Stream(LibXR::WritePort *port, LibXR::WriteOperation op)
构造流写入对象,并尝试锁定端口。 Constructs a stream writer object and tries to lock the port.
Stream & operator<<(const ConstRawData &data)
追加写入数据,支持链式调用。 Appends data to be written; supports chained calls.
WritePort class for handling write operations.
virtual size_t EmptySize()
获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.
virtual size_t Size()
获取当前数据队列的已使用大小。 Gets the used size of the current data queue.
ErrorCode operator()(ConstRawData data, WriteOperation &op)
执行写入操作。 Performs a write operation.
WritePort(size_t queue_size=3, size_t buffer_size=128)
构造一个新的 WritePort 对象。 Constructs a new WritePort object.
WritePort & operator=(WriteFun fun)
赋值运算符重载,用于设置写入函数。 Overloaded assignment operator to set the write function.
bool Writable()
判断端口是否可写。 Checks whether the port is writable.
void MarkAsRunning(WriteOperation &op)
标记写入操作为运行中。 Marks the write operation as running.
virtual void Reset()
Resets the WritePort.
void Finish(bool in_isr, ErrorCode ans, WriteInfoBlock &info, uint32_t size)
更新写入操作的状态。 Updates the status of the write operation.
ErrorCode(* ReadFun)(ReadPort &port)
Function pointer type for read operations.
ErrorCode(* WriteFun)(WritePort &port)
Function pointer type for write operations.
Read information block structure.
RawData data
Data buffer. 数据缓冲区。
ReadOperation op
Read operation instance. 读取操作实例。