libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
write_stream.cpp
1#include "write_port.hpp"
2
3using namespace LibXR;
4
6 : port_(port), op_(op)
7{
8 UNUSED(Acquire());
9}
10
11// Stream batch helpers.
12// Stream 批次辅助逻辑。
14{
15 if (owns_port_ && buffered_size_ > 0)
16 {
17 UNUSED(SubmitBuffered());
18 }
19
20 if (owns_port_)
21 {
22 Release();
23 }
24}
25
27{
28 if (owns_port_)
29 {
30 return ErrorCode::OK;
31 }
32
33 if (port_ == nullptr)
34 {
36 }
37
38 DEV_ASSERT(port_->queue_info_ != nullptr);
39 DEV_ASSERT(port_->queue_data_ != nullptr);
40
41 if (!port_->Writable())
42 {
44 }
45
46 BusyState expected = BusyState::IDLE;
47 if (!port_->busy_.compare_exchange_strong(expected, BusyState::LOCKED,
48 std::memory_order_acq_rel,
49 std::memory_order_acquire))
50 {
51 return ErrorCode::BUSY;
52 }
53
54 if (port_->queue_info_->EmptySize() < 1)
55 {
56 port_->busy_.store(BusyState::IDLE, std::memory_order_release);
57 return ErrorCode::FULL;
58 }
59
60 owns_port_ = true;
61 batch_capacity_ = port_->queue_data_->EmptySize();
62 return ErrorCode::OK;
63}
64
66{
67 if (data.size_ == 0)
68 {
69 return ErrorCode::OK;
70 }
71
72 if (data.addr_ == nullptr)
73 {
75 }
76
77 auto lock_result = Acquire();
78 if (lock_result != ErrorCode::OK)
79 {
80 return lock_result;
81 }
82
83 auto ans = port_->queue_data_->PushBatch(reinterpret_cast<const uint8_t*>(data.addr_),
84 data.size_);
85 if (ans == ErrorCode::OK)
86 {
87 buffered_size_ += data.size_;
88 }
89 return ans;
90}
91
93{
94 ASSERT(owns_port_);
95 ASSERT(buffered_size_ > 0);
96
97 if (op_.type == WriteOperation::OperationType::BLOCK)
98 {
99 // Publish the wait state before the queued metadata can be consumed.
100 // 元数据可能被消费前,先发布等待状态。
101 op_.MarkAsRunning();
102 port_->busy_.store(BusyState::BLOCK_PUBLISHING, std::memory_order_release);
103 }
104
105 auto ans = port_->queue_info_->Push(
106 WriteInfoBlock{ConstRawData{nullptr, buffered_size_}, op_});
107 ASSERT(ans == ErrorCode::OK);
108
109 ans = port_->CommitWrite({nullptr, buffered_size_}, op_, true);
110 buffered_size_ = 0;
111
112 if (op_.type == WriteOperation::OperationType::BLOCK)
113 {
114 // WritePort now owns the BLOCK wait/finish state machine.
115 // BLOCK 等待/完成状态机此后由 WritePort 接管。
116 owns_port_ = false;
117 }
118
119 return ans;
120}
121
123{
124 if (owns_port_)
125 {
126 owns_port_ = false;
127 port_->busy_.store(BusyState::IDLE, std::memory_order_release);
128 }
129}
130
132{
133 if (Acquire() != ErrorCode::OK)
134 {
135 return *this;
136 }
137
138 if (EmptySize() < data.size_)
139 {
140 return *this;
141 }
142
143 UNUSED(Write(data));
144 return *this;
145}
146
148{
149 auto ans = ErrorCode::OK;
150
151 if (owns_port_ && buffered_size_ > 0)
152 {
153 ans = SubmitBuffered();
154 if (op_.type == WriteOperation::OperationType::BLOCK)
155 {
156 return ans;
157 }
158 }
159
160 if (owns_port_)
161 {
162 Release();
163 }
164
165 return ans;
166}
只读原始数据视图 / Immutable raw data view
size_t size_
数据字节数 / Data size in bytes
const void * addr_
数据起始地址 / Data start address
ErrorCode SubmitBuffered()
将当前已缓存批次提交给 WritePort。
ErrorCode Acquire()
为当前流批次获取一次可写入的端口所有权。
ErrorCode Commit()
手动提交已写入的数据到队列,并释放当前锁。
void Release()
将当前批次的端口所有权归还给 WritePort。
~Stream()
析构时自动提交已累积的数据并释放锁。
ErrorCode Write(ConstRawData data)
追加一个原始数据片段到当前流批次。
Stream(LibXR::WritePort *port, LibXR::WriteOperation op)
构造流写入对象,并尝试锁定端口。
Stream & operator<<(const ConstRawData &data)
追加写入数据的语法糖,忽略返回状态并支持链式调用。
WritePort class for handling write operations.
size_t EmptySize()
获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.
@ LOCKED
Submission path owns queue mutation. 提交路径占有写队列/元数据修改权。
LibXR 命名空间
Definition ch32_can.hpp:14
ErrorCode
定义错误码枚举
@ BUSY
忙碌 | Busy
@ PTR_NULL
空指针 | Null pointer
@ NOT_SUPPORT
不支持 | Not supported
@ FULL
已满 | Full
@ OK
操作成功 | Operation successful