libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
libxr_rw.cpp
1#include "libxr_rw.hpp"
2
3#include "libxr_def.hpp"
4#include "mutex.hpp"
5
6using namespace LibXR;
7
10
11ReadPort::ReadPort(size_t buffer_size)
12 : queue_data_(buffer_size > 0 ? new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
13 LockFreeQueue<uint8_t>(buffer_size)
14 : nullptr)
15{
16}
17
19{
20 ASSERT(queue_data_ != nullptr);
21 return queue_data_->EmptySize();
22}
23
25{
26 ASSERT(queue_data_ != nullptr);
27 return queue_data_->Size();
28}
29
30bool ReadPort::Readable() { return read_fun_ != nullptr; }
31
33{
34 read_fun_ = fun;
35 return *this;
36}
37
38void ReadPort::Finish(bool in_isr, ErrorCode ans, ReadInfoBlock& info)
39{
40 busy_.store(BusyState::IDLE, std::memory_order_release);
41 info.op.UpdateStatus(in_isr, ans);
42}
43
45
46ErrorCode ReadPort::operator()(RawData data, ReadOperation& op, bool in_isr)
47{
48 if (Readable())
49 {
50 BusyState is_busy = busy_.load(std::memory_order_acquire);
51
52 if (is_busy == BusyState::PENDING)
53 {
54 return ErrorCode::BUSY;
55 }
56
57 while (true)
58 {
59 busy_.store(BusyState::IDLE, std::memory_order_release);
60
61 auto readable_size = queue_data_->Size();
62
63 if (readable_size >= data.size_ && readable_size != 0)
64 {
65 if (data.size_ > 0)
66 {
67 auto ans =
68 queue_data_->PopBatch(reinterpret_cast<uint8_t*>(data.addr_), data.size_);
69 ASSERT(ans == ErrorCode::OK);
70 }
71
72 OnRxDequeue(in_isr);
73
74 if (op.type != ReadOperation::OperationType::BLOCK)
75 {
76 op.UpdateStatus(in_isr, ErrorCode::OK);
77 }
78 return ErrorCode::OK;
79 }
80
81 info_ = ReadInfoBlock{data, op};
82
83 op.MarkAsRunning();
84
85 auto ans = read_fun_(*this, in_isr);
86
87 if (ans == ErrorCode::PENDING)
88 {
89 BusyState expected = BusyState::IDLE;
90 if (busy_.compare_exchange_weak(expected, BusyState::PENDING,
91 std::memory_order_acq_rel,
92 std::memory_order_acquire))
93 {
94 break;
95 }
96 else
97 {
98 continue;
99 }
100 }
101 else
102 {
103 if (op.type != ReadOperation::OperationType::BLOCK)
104 {
105 op.UpdateStatus(in_isr, ans);
106 }
107 return ErrorCode::OK;
108 }
109 }
110
111 if (op.type == ReadOperation::OperationType::BLOCK)
112 {
113 ASSERT(!in_isr);
114 return op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
115 }
116 else
117 {
118 return ErrorCode::OK;
119 }
120 }
121 else
122 {
123 return ErrorCode::NOT_SUPPORT;
124 }
125}
126
128{
129 ASSERT(queue_data_ != nullptr);
130
131 auto is_busy = busy_.load(std::memory_order_relaxed);
132
133 if (is_busy == BusyState::PENDING)
134 {
135 auto size = queue_data_->Size();
136 if (size > 0 && size >= info_.data.size_)
137 {
138 if (info_.data.size_ > 0)
139 {
140 auto ans = queue_data_->PopBatch(reinterpret_cast<uint8_t*>(info_.data.addr_),
141 info_.data.size_);
142 UNUSED(ans);
143 ASSERT(ans == ErrorCode::OK);
144 }
145
146 Finish(in_isr, ErrorCode::OK, info_);
147 OnRxDequeue(in_isr);
148 }
149 }
150 else if (is_busy == BusyState::IDLE)
151 {
152 busy_.store(BusyState::EVENT, std::memory_order_release);
153 }
154}
155
157{
158 ASSERT(queue_data_ != nullptr);
159 queue_data_->Reset();
160}
161
162WritePort::WritePort(size_t queue_size, size_t buffer_size)
163 : queue_info_(new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
164 LockFreeQueue<WriteInfoBlock>(queue_size)),
165 queue_data_(buffer_size > 0 ? new (std::align_val_t(LIBXR_CACHE_LINE_SIZE))
166 LockFreeQueue<uint8_t>(buffer_size)
167 : nullptr)
168{
169}
170
172{
173 ASSERT(queue_data_ != nullptr);
174 return queue_data_->EmptySize();
175}
176
178{
179 ASSERT(queue_data_ != nullptr);
180 return queue_data_->Size();
181}
182
183bool WritePort::Writable() { return write_fun_ != nullptr; }
184
186{
187 write_fun_ = fun;
188 return *this;
189}
190
191void WritePort::Finish(bool in_isr, ErrorCode ans, WriteInfoBlock& info)
192{
193 info.op.UpdateStatus(in_isr, ans);
194}
195
197
198ErrorCode WritePort::operator()(ConstRawData data, WriteOperation& op, bool in_isr)
199{
200 if (Writable())
201 {
202 if (data.size_ == 0)
203 {
204 if (op.type != WriteOperation::OperationType::BLOCK)
205 {
206 op.UpdateStatus(in_isr, ErrorCode::OK);
207 }
208 return ErrorCode::OK;
209 }
210
211 LockState expected = LockState::UNLOCKED;
212 if (!lock_.compare_exchange_strong(expected, LockState::LOCKED))
213 {
214 return ErrorCode::BUSY;
215 }
216
217 return CommitWrite(data, op, false, in_isr);
218 }
219 else
220 {
221 return ErrorCode::NOT_SUPPORT;
222 }
223}
224
225ErrorCode WritePort::CommitWrite(ConstRawData data, WriteOperation& op, bool meta_pushed,
226 bool in_isr)
227{
228 if (!meta_pushed && queue_info_->EmptySize() < 1)
229 {
230 lock_.store(LockState::UNLOCKED, std::memory_order_release);
231 return ErrorCode::FULL;
232 }
233
234 ErrorCode ans = ErrorCode::OK;
235 if (!meta_pushed)
236 {
237 if (queue_data_->EmptySize() < data.size_)
238 {
239 lock_.store(LockState::UNLOCKED, std::memory_order_release);
240 return ErrorCode::FULL;
241 }
242
243 ans =
244 queue_data_->PushBatch(reinterpret_cast<const uint8_t*>(data.addr_), data.size_);
245 UNUSED(ans);
246 ASSERT(ans == ErrorCode::OK);
247
248 WriteInfoBlock info{data, op};
249 ans = queue_info_->Push(info);
250
251 ASSERT(ans == ErrorCode::OK);
252 }
253
254 op.MarkAsRunning();
255
256 ans = write_fun_(*this, in_isr);
257
258 if (!meta_pushed)
259 {
260 lock_.store(LockState::UNLOCKED, std::memory_order_release);
261 }
262
263 if (ans != ErrorCode::PENDING)
264 {
265 if (op.type != WriteOperation::OperationType::BLOCK)
266 {
267 op.UpdateStatus(in_isr, ans);
268 }
269 return ErrorCode::OK;
270 }
271
272 if (op.type == WriteOperation::OperationType::BLOCK)
273 {
274 ASSERT(!in_isr);
275 return op.data.sem_info.sem->Wait(op.data.sem_info.timeout);
276 }
277
278 return ErrorCode::OK;
279}
280
282{
283 ASSERT(queue_data_ != nullptr);
284 queue_data_->Reset();
285 queue_info_->Reset();
286}
287
289 : port_(port), op_(op)
290{
291 LockState expected = LockState::UNLOCKED;
292 if (port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
293 {
294 if (port_->queue_info_->EmptySize() < 1)
295 {
296 locked_ = false;
297 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
298 return;
299 }
300 locked_ = true;
301 cap_ = port_->queue_data_->EmptySize();
302 }
303}
304
306{
307 if (locked_ && size_ > 0)
308 {
309 port_->queue_info_->Push(WriteInfoBlock{ConstRawData{nullptr, size_}, op_});
310 port_->CommitWrite({nullptr, size_}, op_, true);
311 }
312
313 if (locked_)
314 {
315 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
316 }
317}
318
320{
321 if (!locked_)
322 {
323 LockState expected = LockState::UNLOCKED;
324 if (port_->lock_.compare_exchange_strong(expected, LockState::LOCKED))
325 {
326 if (port_->queue_info_->EmptySize() < 1)
327 {
328 locked_ = false;
329 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
330 return *this;
331 }
332 else
333 {
334 locked_ = true;
335 cap_ = port_->queue_data_->EmptySize();
336 }
337 }
338 else
339 {
340 return *this;
341 }
342 }
343 if (size_ + data.size_ <= cap_)
344 {
345 port_->queue_data_->PushBatch(reinterpret_cast<const uint8_t*>(data.addr_),
346 data.size_);
347 size_ += data.size_;
348 }
349
350 return *this;
351}
352
354{
355 auto ans = ErrorCode::OK;
356
357 if (locked_ && size_ > 0)
358 {
359 ans = port_->queue_info_->Push(WriteInfoBlock{ConstRawData{nullptr, size_}, op_});
360 ASSERT(ans == ErrorCode::OK);
361 ans = port_->CommitWrite({nullptr, size_}, op_, true);
362 ASSERT(ans == ErrorCode::OK);
363 size_ = 0;
364 }
365
366 if (port_->queue_info_->EmptySize() < 1)
367 {
368 locked_ = false;
369 port_->lock_.store(LockState::UNLOCKED, std::memory_order_release);
370 }
371 else
372 {
373 cap_ = port_->queue_data_->EmptySize();
374 }
375
376 return ans;
377}
378
379// NOLINTNEXTLINE
380int STDIO::Printf(const char* fmt, ...)
381{
382#if LIBXR_PRINTF_BUFFER_SIZE > 0
384 {
385 return -1;
386 }
387
388 if (!write_mutex_)
389 {
390 write_mutex_ = new LibXR::Mutex();
391 }
392
393 LibXR::Mutex::LockGuard lock_guard(*write_mutex_);
394
395 va_list args;
396 va_start(args, fmt);
397 int len = vsnprintf(STDIO::printf_buff_, LIBXR_PRINTF_BUFFER_SIZE, fmt, args);
398 va_end(args);
399
400 // Check result and limit length
401 if (len < 0)
402 {
403 return -1;
404 }
405 if (static_cast<size_t>(len) >= LIBXR_PRINTF_BUFFER_SIZE)
406 {
407 len = LIBXR_PRINTF_BUFFER_SIZE - 1;
408 }
409
410 ConstRawData data = {reinterpret_cast<const uint8_t*>(STDIO::printf_buff_),
411 static_cast<size_t>(len)};
412
413 static WriteOperation op; // NOLINT
414 auto ans = ErrorCode::OK;
415 if (write_stream_ == nullptr)
416 {
417 ans = STDIO::write_->operator()(data, op);
418 }
419 else
420 {
421 (*write_stream_) << data;
422 ans = write_stream_->Commit();
423 }
424
425 if (ans == ErrorCode::OK)
426 {
427 return len;
428 }
429 else
430 {
431 return -1;
432 }
433
434#else
435 UNUSED(fmt);
436 return 0;
437#endif
438}
常量原始数据封装类。 A class for encapsulating constant raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
const void * addr_
数据存储地址(常量)。 The storage address of the data (constant).
无锁队列实现 / Lock-free queue implementation
void Reset()
重置队列 / Resets the queue
ErrorCode PushBatch(const Data *data, size_t size)
批量推入数据 / Pushes multiple elements into the queue
size_t EmptySize()
计算队列剩余可用空间 / Calculates the remaining available space in the queue
size_t Size() const
获取当前队列中的元素数量 / Returns the number of elements currently in the queue
ErrorCode PopBatch(Data *data, size_t size)
批量弹出数据 / Pops multiple elements from the queue
互斥锁的 RAII 机制封装 (RAII-style mechanism for automatic mutex management).
Definition mutex.hpp:65
互斥锁类,提供线程同步机制 (Mutex class providing thread synchronization mechanisms).
Definition mutex.hpp:18
union LibXR::Operation::@5 data
void UpdateStatus(bool in_isr, Status &&status)
Updates operation status based on type.
Definition libxr_rw.hpp:172
void MarkAsRunning()
标记操作为运行状态。 Marks the operation as running.
Definition libxr_rw.hpp:205
OperationType type
Definition libxr_rw.hpp:229
原始数据封装类。 A class for encapsulating raw data.
size_t size_
数据大小(字节)。 The size of the data (in bytes).
void * addr_
数据存储地址。 The storage address of the data.
ReadPort class for handling read operations.
Definition libxr_rw.hpp:272
bool Readable()
Checks if read operations are supported.
Definition libxr_rw.cpp:30
void Finish(bool in_isr, ErrorCode ans, ReadInfoBlock &info)
更新读取操作的状态。 Updates the status of the read operation.
Definition libxr_rw.cpp:38
ReadPort(size_t buffer_size=128)
Constructs a ReadPort with queue sizes.
Definition libxr_rw.cpp:11
void Reset()
Resets the ReadPort.
Definition libxr_rw.cpp:156
size_t EmptySize()
获取队列的剩余可用空间。 Gets the remaining available space in the queue.
Definition libxr_rw.cpp:18
ErrorCode operator()(RawData data, ReadOperation &op, bool in_isr=false)
读取操作符重载,用于执行读取操作。 Overloaded function call operator to perform a read operation.
Definition libxr_rw.cpp:46
void ProcessPendingReads(bool in_isr)
Processes pending reads.
Definition libxr_rw.cpp:127
ReadPort & operator=(ReadFun fun)
赋值运算符重载,用于设置读取函数。 Overloaded assignment operator to set the read function.
Definition libxr_rw.cpp:32
size_t Size()
获取当前队列的已使用大小。 Gets the currently used size of the queue.
Definition libxr_rw.cpp:24
virtual void OnRxDequeue(bool)
RX 数据从软件队列成功出队后的通知。 Notification after bytes are popped from RX data queue.
Definition libxr_rw.hpp:392
void MarkAsRunning(ReadInfoBlock &info)
标记读取操作为运行中。 Marks the read operation as running.
Definition libxr_rw.cpp:44
static int Printf(const char *fmt,...)
Prints a formatted string to the write port (like printf).
Definition libxr_rw.cpp:380
static WritePort * write_
Write port instance. 写入端口。
Definition libxr_rw.hpp:624
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待(减少)信号量 Waits (decrements) the semaphore
Definition semaphore.cpp:25
WritePort 的流式写入操作器,支持链式 << 操作和批量提交。
Definition libxr_rw.hpp:438
bool locked_
是否持有写锁 Whether write lock is held
Definition libxr_rw.hpp:481
ErrorCode Commit()
手动提交已写入的数据到队列,并尝试续锁。
Definition libxr_rw.cpp:353
size_t cap_
当前队列容量 Current queue capacity
Definition libxr_rw.hpp:479
LibXR::WritePort * port_
写端口指针 Pointer to the WritePort
Definition libxr_rw.hpp:477
~Stream()
析构时自动提交已累积的数据并释放锁。
Definition libxr_rw.cpp:305
Stream(LibXR::WritePort *port, LibXR::WriteOperation op)
构造流写入对象,并尝试锁定端口。
Definition libxr_rw.cpp:288
Stream & operator<<(const ConstRawData &data)
追加写入数据,支持链式调用。
Definition libxr_rw.cpp:319
WritePort class for handling write operations.
Definition libxr_rw.hpp:413
size_t EmptySize()
获取数据队列的剩余可用空间。 Gets the remaining available space in the data queue.
Definition libxr_rw.cpp:171
size_t Size()
获取当前数据队列的已使用大小。 Gets the used size of the current data queue.
Definition libxr_rw.cpp:177
void Finish(bool in_isr, ErrorCode ans, WriteInfoBlock &info)
更新写入操作的状态。 Updates the status of the write operation.
Definition libxr_rw.cpp:191
WritePort(size_t queue_size=3, size_t buffer_size=128)
构造一个新的 WritePort 对象。 Constructs a new WritePort object.
Definition libxr_rw.cpp:162
WritePort & operator=(WriteFun fun)
赋值运算符重载,用于设置写入函数。 Overloaded assignment operator to set the write function.
Definition libxr_rw.cpp:185
bool Writable()
判断端口是否可写。 Checks whether the port is writable.
Definition libxr_rw.cpp:183
ErrorCode CommitWrite(ConstRawData data, WriteOperation &op, bool pushed=false, bool in_isr=false)
提交写入操作。 Commits a write operation.
Definition libxr_rw.cpp:225
void MarkAsRunning(WriteOperation &op)
标记写入操作为运行中。 Marks the write operation as running.
Definition libxr_rw.cpp:196
void Reset()
Resets the WritePort.
Definition libxr_rw.cpp:281
ErrorCode operator()(ConstRawData data, WriteOperation &op, bool in_isr=false)
执行写入操作。 Performs a write operation.
Definition libxr_rw.cpp:198
LibXR 命名空间
Definition ch32_can.hpp:14
ErrorCode(* ReadFun)(ReadPort &port, bool in_isr)
Function pointer type for read operations.
Definition libxr_rw.hpp:249
ErrorCode(* WriteFun)(WritePort &port, bool in_isr)
Function pointer type for write operations.
Definition libxr_rw.hpp:245
Read information block structure.
Definition libxr_rw.hpp:256
RawData data
Data buffer. 数据缓冲区。
Definition libxr_rw.hpp:257
ReadOperation op
Read operation instance. 读取操作实例。
Definition libxr_rw.hpp:258
WriteOperation op
Write operation instance. 写入操作实例。
Definition libxr_rw.hpp:264