libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
sync.hpp
1#pragma once
2
3#include "../topic.hpp"
4
5namespace LibXR
6{
12{
// Synchronous wait state. Values of this enum are stored in the atomic `wait_state`
// member and are moved between by SyncSubscriber::Wait().
// NOTE(review): WAIT_IDLE (listing line 25, "no wait is currently pending") is used by
// the code in this file, but its enumerator line is missing from this extract — confirm
// its value against the real header.
23 enum WaitState : uint32_t
24 {
// One waiter is currently pending.
26 WAITING = 1,
// One publish wakeup is reserved for a waiter that just timed out.
27 WAIT_CLAIMED = 2
28 };
29
// Data members of the block owned by one synchronous subscriber.
// NOTE(review): the listing skips lines 33 and 35; the page's symbol index places
// `MicrosecondTimestamp timestamp` and `Semaphore sem` there — confirm against the real header.

// Destination buffer: received payloads are copied here.
30 void* buff_addr;
// Adapter that copies one payload into `dst` using the subscriber's exact type.
31 void (*copy_payload)(void* dst,
32 void* payload_addr);
// Current pending state of Wait(); holds a WaitState value and starts as WAIT_IDLE.
34 std::atomic<uint32_t> wait_state = WAIT_IDLE;
36};
37
44template <typename Data>
46{
47 public:
/**
 * Construct a synchronous subscriber by topic name.
 *
 * Resolves the topic with WaitTopic(name, UINT32_MAX, domain) and delegates to the
 * Topic-handle constructor.
 * NOTE(review): UINT32_MAX is passed as the WaitTopic timeout — presumably "wait
 * indefinitely until the topic exists"; confirm in topic.cpp.
 *
 * @param name   Topic name passed to WaitTopic().
 * @param data   Destination object forwarded to the Topic-handle constructor.
 * @param domain Naming domain passed to WaitTopic(); defaults to nullptr.
 */
60 SyncSubscriber(const char* name, Data& data, Domain* domain = nullptr)
61 : SyncSubscriber(Topic(WaitTopic(name, UINT32_MAX, domain)), data)
62 {
63 }
64
/**
 * Construct a synchronous subscriber using a Topic handle.
 *
 * Initializes the subscriber block (default-constructed timestamp, WAIT_IDLE state,
 * destination buffer address, typed copy adapter) and then adds the block to the
 * topic's subscriber list.
 *
 * NOTE(review): listing lines 78 and 80-81 are missing from this extract, and nothing
 * visible here assigns block_ before it is dereferenced — presumably the block allocation
 * (and the CheckSubscriberType() call listed in the page index) lives on those lines;
 * confirm against the real header.
 *
 * @param topic Topic handle whose subscriber list receives this block.
 * @param data  Destination object; only its address is stored, so it presumably has to
 *              stay valid for as long as the block remains subscribed.
 */
76 SyncSubscriber(Topic topic, Data& data)
77 {
79
82 block_->data_.timestamp = MicrosecondTimestamp();
// Relaxed store: at this point the block has not yet been added to the subscriber list
// (that happens in the last statement) — presumably no other thread can observe it yet.
83 block_->data_.wait_state.store(SyncBlock::WAIT_IDLE, std::memory_order_relaxed);
84 block_->data_.buff_addr = &data;
// Bind the copy adapter instantiated for this subscriber's exact Data type.
85 block_->data_.copy_payload = &Topic::CopyPayload<Data>;
// Register last, after every field above has been initialized.
86 topic.block_->data_.subers.Add(*block_);
87 }
88
// Copy construction is disabled for synchronous subscribers.
94 SyncSubscriber(const SyncSubscriber& other) = delete;
95
// Copy assignment is disabled for synchronous subscribers.
102 SyncSubscriber& operator=(const SyncSubscriber& other) = delete;
103
/**
 * Move-construct one synchronous subscriber.
 *
 * Takes over `other`'s block pointer and leaves `other.block_` null. A moved-from
 * subscriber must not call Wait(), which asserts block_ != nullptr.
 *
 * @param other Subscriber to move from; left without a block.
 */
112 SyncSubscriber(SyncSubscriber&& other) noexcept : block_(other.block_)
113 {
114 other.block_ = nullptr;
115 }
116
// Body of the move assignment operator.
// NOTE(review): the signature (listing line 127) is missing from this extract; the page
// index gives it as `SyncSubscriber& operator=(SyncSubscriber&& other) noexcept`.
128 {
// Self-assignment leaves the object untouched.
129 if (this != &other)
130 {
// NOTE(review): the previously held block pointer is overwritten with no visible
// release or unsubscribe step — confirm the old block is meant to stay registered.
131 block_ = other.block_;
132 other.block_ = nullptr;
133 }
134 return *this;
135 }
136
/**
 * Wait for data reception.
 *
 * Only one wait may be pending per subscriber: the call first moves wait_state from
 * WAIT_IDLE to WAITING and returns BUSY if the state was anything else.
 *
 * @param timeout Timeout forwarded unchanged to data.sem.Wait(); defaults to UINT32_MAX.
 *                NOTE(review): the unit is not visible in this file — confirm in semaphore.hpp.
 * @return ErrorCode::OK when the semaphore wait succeeded (including the case where a
 *         wakeup had been claimed for this waiter while it was timing out),
 *         ErrorCode::BUSY when wait_state was not WAIT_IDLE on entry, otherwise the
 *         non-OK code returned by the first sem.Wait(timeout).
 */
151 ErrorCode Wait(uint32_t timeout = UINT32_MAX)
152 {
// A moved-from subscriber has block_ == nullptr and must not wait.
153 ASSERT(block_ != nullptr);
154
155 auto& data = block_->data_;
// Claim the single waiter slot: WAIT_IDLE -> WAITING.
156 uint32_t expected = SyncBlock::WAIT_IDLE;
157 if (!data.wait_state.compare_exchange_strong(expected, SyncBlock::WAITING,
158 std::memory_order_acq_rel,
159 std::memory_order_acquire))
160 {
// The state was not idle, so this call does not touch the semaphore at all.
161 return ErrorCode::BUSY;
162 }
163
164 auto wait_ans = data.sem.Wait(timeout);
165 if (wait_ans == ErrorCode::OK)
166 {
// Normal wakeup: return the state to idle.
167 data.wait_state.store(SyncBlock::WAIT_IDLE, std::memory_order_release);
168 return ErrorCode::OK;
169 }
170
// sem.Wait() did not return OK. Try to retire the wait: WAITING -> WAIT_IDLE.
171 expected = SyncBlock::WAITING;
172 if (data.wait_state.compare_exchange_strong(expected, SyncBlock::WAIT_IDLE,
173 std::memory_order_acq_rel,
174 std::memory_order_acquire))
175 {
// The state was still WAITING, so no wakeup was reserved; report the semaphore's error as-is.
176 return wait_ans;
177 }
178
// The CAS failed, so the state changed while this waiter was pending. It is expected to be
// WAIT_CLAIMED, i.e. one publish wakeup is reserved for this waiter.
// NOTE(review): the code that sets WAIT_CLAIMED is not in this file — presumably the
// publish path; confirm it always posts the semaphore after claiming.
179 ASSERT(data.wait_state.load(std::memory_order_acquire) == SyncBlock::WAIT_CLAIMED);
180
// Take the reserved wakeup — presumably so its semaphore post is not left pending for
// the next Wait() call.
181 auto finish_wait_ans = data.sem.Wait(UINT32_MAX);
// Presumably keeps the variable referenced when ASSERT compiles to nothing.
182 UNUSED(finish_wait_ans);
183 ASSERT(finish_wait_ans == ErrorCode::OK);
184 data.wait_state.store(SyncBlock::WAIT_IDLE, std::memory_order_release);
185 return ErrorCode::OK;
186 }
187
/**
 * Get the latest received message timestamp.
 *
 * @return The timestamp field of this subscriber's block.
 * NOTE(review): unlike Wait(), block_ is dereferenced here without a null check, so
 *               calling this on a moved-from subscriber is invalid.
 */
192 MicrosecondTimestamp GetTimestamp() const { return block_->data_.timestamp; }
193
195};
196} // namespace LibXR
数据节点模板,继承自 BaseNode,用于存储具体数据类型。 Template data node that inherits from BaseNode to store a specific data type.
Data data_
存储的数据。 The stored data.
微秒时间戳 / Microsecond timestamp
Data data_
存储的数据 (Stored data).
Definition rbt.hpp:98
信号量类,实现线程同步机制 Semaphore class implementing thread synchronization
Definition semaphore.hpp:23
topic 所属的命名域 / Naming domain that groups topics
Definition topic.hpp:179
调用 Wait() 收消息的订阅者 / Subscriber that receives messages by calling Wait()
Definition topic.hpp:225
SyncSubscriber(Topic topic, Data &data)
通过 Topic 句柄构造同步订阅者 / Construct a synchronous subscriber using a Topic handle
Definition sync.hpp:76
ErrorCode Wait(uint32_t timeout=UINT32_MAX)
等待接收数据 / Wait for data reception
Definition sync.hpp:151
LockFreeList::Node< SyncBlock > * block_
订阅者数据块。Subscriber data block.
Definition sync.hpp:194
SyncSubscriber & operator=(const SyncSubscriber &other)=delete
禁止拷贝赋值同步订阅者 / Copy assignment is disabled for synchronous subscribers
MicrosecondTimestamp GetTimestamp() const
获取最近一次接收的消息时间戳 / Get the latest received message timestamp
Definition sync.hpp:192
SyncSubscriber(const char *name, Data &data, Domain *domain=nullptr)
通过主题名称构造同步订阅者 / Construct a synchronous subscriber by topic name
Definition sync.hpp:60
SyncSubscriber(SyncSubscriber &&other) noexcept
移动构造同步订阅者 / Move-construct one synchronous subscriber
Definition sync.hpp:112
SyncSubscriber & operator=(SyncSubscriber &&other) noexcept
移动赋值同步订阅者 / Move-assign one synchronous subscriber
Definition sync.hpp:127
SyncSubscriber(const SyncSubscriber &other)=delete
禁止拷贝同步订阅者 / Copy construction is disabled for synchronous subscribers
发布订阅主题 / Publish-subscribe topic
Definition topic.hpp:57
@ SYNC
同步等待型订阅者。Synchronous wait-based subscriber.
static void CopyPayload(void *dst, void *payload_addr)
按精确类型把一份 payload 拷到订阅者缓冲区 / Copy one payload into a subscriber buffer using the exact type
Definition topic.hpp:605
static void CheckSubscriberType(Topic topic)
断言订阅者看到的精确 payload 类型与 topic 契约一致 / Assert that the exact payload type seen by a subscriber matches the topic contract
Definition topic.hpp:574
static TopicHandle WaitTopic(const char *name, uint32_t timeout=UINT32_MAX, Domain *domain=nullptr)
等待指定名称的 topic 出现 / Wait until a topic with the given name exists
Definition topic.cpp:191
TopicHandle block_
当前 topic 视图绑定的状态块。Runtime state block bound to the current topic view.
Definition topic.hpp:533
LibXR 命名空间
Definition ch32_can.hpp:14
ErrorCode
定义错误码枚举
@ BUSY
忙碌 | Busy
@ OK
操作成功 | Operation successful
所有订阅块共用的公共头 / Common header shared by all subscriber blocks
Definition topic.hpp:207
同步订阅者自己挂的数据块 / Data block owned by one synchronous subscriber
Definition sync.hpp:12
void(* copy_payload)(void *dst, void *payload_addr)
按订阅精确类型执行负载拷贝的适配函数。Adapter that copies one payload using the subscriber's exact type.
Definition sync.hpp:31
void * buff_addr
收到消息后要拷到这里。Received payloads are copied here.
Definition sync.hpp:30
std::atomic< uint32_t > wait_state
当前 Wait() 的挂起状态。Current pending state of Wait().
Definition sync.hpp:34
Semaphore sem
用来唤醒 Wait() 的信号量。Semaphore used to wake Wait().
Definition sync.hpp:35
WaitState
同步等待状态 / Synchronous wait state
Definition sync.hpp:24
@ WAIT_IDLE
当前没有挂起等待。No wait is currently pending.
Definition sync.hpp:25
@ WAITING
当前有一个挂起的等待者。One waiter is currently pending.
Definition sync.hpp:26
@ WAIT_CLAIMED
某次发布已归一个刚超时的等待者所有。One publish wakeup is reserved for a waiter that just timed out.
Definition sync.hpp:27
MicrosecondTimestamp timestamp
这里对应那份数据的时间戳。Timestamp paired with the buffered data.
Definition sync.hpp:33