11 : queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
19 ASSERT(queue_data_ !=
nullptr);
25 ASSERT(queue_data_ !=
nullptr);
26 return queue_data_->
Size();
40 busy_.store(BusyState::IDLE, std::memory_order_release);
50 BusyState is_busy = busy_.load(std::memory_order_relaxed);
52 if (is_busy == BusyState::PENDING)
54 return ErrorCode::BUSY;
59 busy_.store(BusyState::IDLE, std::memory_order_release);
61 if (queue_data_ !=
nullptr)
63 auto readable_size = queue_data_->
Size();
65 if (readable_size >= data.
size_ && readable_size != 0)
70 read_size_ = data.
size_;
71 ASSERT(ans == ErrorCode::OK);
72 if (op.
type != ReadOperation::OperationType::BLOCK)
84 auto ans = read_fun_(*
this);
86 if (ans != ErrorCode::OK)
88 BusyState expected = BusyState::IDLE;
89 if (busy_.compare_exchange_weak(expected, BusyState::PENDING,
90 std::memory_order_acq_rel,
91 std::memory_order_acquire))
102 read_size_ = data.
size_;
103 if (op.
type != ReadOperation::OperationType::BLOCK)
107 return ErrorCode::OK;
111 if (op.
type == ReadOperation::OperationType::BLOCK)
113 return op.
data.sem_info.sem->
Wait(op.
data.sem_info.timeout);
117 return ErrorCode::OK;
122 return ErrorCode::NOT_SUPPORT;
128 ASSERT(queue_data_ !=
nullptr);
132 auto is_busy = busy_.load(std::memory_order_relaxed);
134 if (is_busy == BusyState::PENDING)
143 ASSERT(ans == ErrorCode::OK);
148 else if (is_busy == BusyState::IDLE)
150 busy_.store(BusyState::EVENT, std::memory_order_release);
155 if (busy_.load(std::memory_order_relaxed) == BusyState::PENDING)
164 ASSERT(ans == ErrorCode::OK);
174 ASSERT(queue_data_ !=
nullptr);
175 queue_data_->
Reset();
179 : queue_info_(new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
181 queue_data_(buffer_size > 0 ? new(std::align_val_t(LIBXR_CACHE_LINE_SIZE))
189 ASSERT(queue_data_ !=
nullptr);
195 ASSERT(queue_data_ !=
nullptr);
196 return queue_data_->
Size();
222 if (op.
type != WriteOperation::OperationType::BLOCK)
226 return ErrorCode::OK;
229 LockState expected = LockState::UNLOCKED;
230 if (!lock_.compare_exchange_strong(expected, LockState::LOCKED))
232 return ErrorCode::BUSY;
235 return CommitWrite(data, op);
239 return ErrorCode::NOT_SUPPORT;
245 if (!meta_pushed && queue_info_->EmptySize() < 1)
247 lock_.store(LockState::UNLOCKED, std::memory_order_release);
248 return ErrorCode::FULL;
253 ErrorCode ans = ErrorCode::OK;
258 lock_.store(LockState::UNLOCKED, std::memory_order_release);
259 return ErrorCode::FULL;
262 ans = queue_data_->
PushBatch(
reinterpret_cast<const uint8_t*
>(data.
addr_),
265 ASSERT(ans == ErrorCode::OK);
268 ans = queue_info_->Push(info);
270 ASSERT(ans == ErrorCode::OK);
275 ans = write_fun_(*
this);
279 lock_.store(LockState::UNLOCKED, std::memory_order_release);
282 if (ans == ErrorCode::OK)
284 write_size_ = data.
size_;
285 if (op.
type != WriteOperation::OperationType::BLOCK)
289 return ErrorCode::OK;
292 if (op.
type == WriteOperation::OperationType::BLOCK)
294 return op.
data.sem_info.sem->
Wait(op.
data.sem_info.timeout);
297 return ErrorCode::OK;
302 auto ans = queue_info_->Push(info);
304 ASSERT(ans == ErrorCode::OK);
308 ans = write_fun_(*
this);
310 lock_.store(LockState::UNLOCKED, std::memory_order_release);
312 if (ans == ErrorCode::OK)
314 write_size_ = data.
size_;
315 if (op.
type != WriteOperation::OperationType::BLOCK)
319 return ErrorCode::OK;
322 if (op.
type == WriteOperation::OperationType::BLOCK)
324 return op.
data.sem_info.sem->
Wait(op.
data.sem_info.timeout);
328 return ErrorCode::OK;
335 ASSERT(queue_data_ !=
nullptr);
336 queue_data_->
Reset();
337 queue_info_->Reset();
341 : port_(port), op_(op)
343 if (!port->queue_data_ || !port->
Writable())
345 fallback_to_normal_write_ = true;
348 LockState expected = LockState::UNLOCKED;
349 if (
port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
351 if (port_->queue_info_->EmptySize() < 1)
354 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
364 if (locked_ && size_ > 0)
367 port_->CommitWrite({
nullptr, size_}, op_,
true);
372 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
378 if (fallback_to_normal_write_)
386 LockState expected = LockState::UNLOCKED;
387 if (port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
389 if (port_->queue_info_->EmptySize() < 1)
392 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
398 cap_ = port_->queue_data_->EmptySize();
406 if (size_ + data.
size_ <= cap_)
408 port_->queue_data_->PushBatch(
reinterpret_cast<const uint8_t*
>(data.
addr_),
419 auto ans = ErrorCode::OK;
420 if (!fallback_to_normal_write_)
422 if (locked_ && size_ > 0)
425 ASSERT(ans == ErrorCode::OK);
426 ans = port_->CommitWrite({
nullptr, size_}, op_,
true);
427 ASSERT(ans == ErrorCode::OK);
431 if (port_->queue_info_->EmptySize() < 1)
434 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
438 cap_ = port_->queue_data_->EmptySize();
448#if LIBXR_PRINTF_BUFFER_SIZE > 0
463 int len = vsnprintf(STDIO::printf_buff_, LIBXR_PRINTF_BUFFER_SIZE, fmt, args);
471 if (
static_cast<size_t>(len) >= LIBXR_PRINTF_BUFFER_SIZE)
473 len = LIBXR_PRINTF_BUFFER_SIZE - 1;
476 ConstRawData data = {
reinterpret_cast<const uint8_t*
>(STDIO::printf_buff_),
477 static_cast<size_t>(len)};
480 if (write_stream_ ==
nullptr)
482 return static_cast<int>(
STDIO::write_->operator()(data, op));
486 (*write_stream_) << data;
487 return static_cast<int>(write_stream_->Commit());
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
互斥锁的 RAII 机制封装 (RAII-style mechanism for automatic mutex management).
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
void MarkAsRunning()
标记操作为运行状态。 Marks the operation as running.
void UpdateStatus(bool in_isr, Status &&...status)
Updates operation status based on type.
union LibXR::Operation::@4 data
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
ReadPort class for handling read operations.
bool Readable()
Checks if read operations are supported.
ReadPort(size_t buffer_size=128)
Constructs a ReadPort with queue sizes.
void Finish(bool in_isr, ErrorCode ans, ReadInfoBlock &info, uint32_t size)
更新读取操作的状态。 Updates the status of the read operation.
virtual void Reset()
Resets the ReadPort.
virtual size_t EmptySize()
获取队列的剩余可用空间。 Gets the remaining available space in the queue.
ErrorCode operator()(RawData data, ReadOperation &op)
读取操作符重载,用于执行读取操作。 Overloaded function call operator to perform a read operation.
virtual void ProcessPendingReads(bool in_isr)
Processes pending reads.
ReadPort & operator=(ReadFun fun)
赋值运算符重载,用于设置读取函数。 Overloaded assignment operator to set the read function.
virtual size_t Size()
获取当前队列的已使用大小。 Gets the currently used size of the queue.
void MarkAsRunning(ReadInfoBlock &info)
标记读取操作为运行中。 Marks the read operation as running.
static int Printf(const char *fmt,...)
Prints a formatted string to the write port (like printf).
static WritePort * write_
Write port instance. 写入端口。
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待(减少)信号量 Waits (decrements) the semaphore
WritePort 的流式写入操作器,支持链式 << 操作和批量提交。
bool locked_
是否持有写锁 Whether write lock is held
ErrorCode Commit()
手动提交已写入的数据到队列,并尝试续锁。
size_t cap_
当前队列容量 Current queue capacity
LibXR::WritePort * port_
写端口指针 Pointer to the WritePort
~Stream()
析构时自动提交已累积的数据并释放锁。
Stream(LibXR::WritePort *port, LibXR::WriteOperation op)
构造流写入对象,并尝试锁定端口。
Stream & operator<<(const ConstRawData &data)
追加写入数据,支持链式调用。
WritePort class for handling write operations.
virtual size_t EmptySize()
获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.
virtual size_t Size()
获取当前数据队列的已使用大小。 Gets the used size of the current data queue.
ErrorCode operator()(ConstRawData data, WriteOperation &op)
执行写入操作。 Performs a write operation.
WritePort(size_t queue_size=3, size_t buffer_size=128)
构造一个新的 WritePort 对象。 Constructs a new WritePort object.
WritePort & operator=(WriteFun fun)
赋值运算符重载,用于设置写入函数。 Overloaded assignment operator to set the write function.
bool Writable()
判断端口是否可写。 Checks whether the port is writable.
void MarkAsRunning(WriteOperation &op)
标记写入操作为运行中。 Marks the write operation as running.
virtual void Reset()
Resets the WritePort.
void Finish(bool in_isr, ErrorCode ans, WriteInfoBlock &info, uint32_t size)
更新写入操作的状态。 Updates the status of the write operation.
ErrorCode(* ReadFun)(ReadPort &port)
Function pointer type for read operations.
ErrorCode(* WriteFun)(WritePort &port)
Function pointer type for write operations.
Read information block structure.
RawData data
Data buffer. 数据缓冲区。
ReadOperation op
Read operation instance. 读取操作实例。
WriteOperation op
Write operation instance. 写入操作实例。