3#if defined(LIBXR_SYSTEM_POSIX_HOST)
20#include <linux/futex.h>
23#include <sys/syscall.h>
29#include "libxr_def.hpp"
38enum class LinuxSharedSubscriberMode : uint8_t
41 BROADCAST_DROP_OLD = 1,
49struct LinuxSharedTopicConfig
51 uint32_t slot_num = 64;
52 uint32_t subscriber_num = 8;
53 uint32_t queue_num = 64;
71template <
typename TopicData>
72class LinuxSharedTopic :
public Topic
74 static_assert(std::is_trivially_copyable<TopicData>::value,
75 "LinuxSharedTopic requires trivially copyable data");
76 static_assert(std::atomic<uint32_t>::is_always_lock_free,
77 "LinuxSharedTopic requires lock-free 32-bit atomics");
78 static_assert(std::atomic<uint64_t>::is_always_lock_free,
79 "LinuxSharedTopic requires lock-free 64-bit atomics");
81 enum class SharedDataState : uint8_t
90 using Data = SharedData;
91 static constexpr const char* DEFAULT_DOMAIN_NAME =
"libxr_def_domain";
101 using Data = SharedData;
106 Subscriber() =
default;
116 explicit Subscriber(
const char* name,
117 LinuxSharedSubscriberMode mode =
118 LinuxSharedSubscriberMode::BROADCAST_FULL)
119 : owned_topic_(new LinuxSharedTopic(name))
121 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
124 owned_topic_ =
nullptr;
128 Subscriber(
const char* name,
const char* domain_name,
129 LinuxSharedSubscriberMode mode = LinuxSharedSubscriberMode::BROADCAST_FULL)
130 : owned_topic_(new LinuxSharedTopic(name, domain_name))
132 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
135 owned_topic_ =
nullptr;
139 Subscriber(
const char* name, Topic::Domain& domain,
140 LinuxSharedSubscriberMode mode = LinuxSharedSubscriberMode::BROADCAST_FULL)
141 : owned_topic_(new LinuxSharedTopic(name, domain))
143 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
146 owned_topic_ =
nullptr;
156 explicit Subscriber(LinuxSharedTopic& topic,
157 LinuxSharedSubscriberMode mode =
158 LinuxSharedSubscriberMode::BROADCAST_FULL)
160 (void)Attach(topic, mode);
166 ~Subscriber() { Reset(); }
168 Subscriber(
const Subscriber&) =
delete;
169 Subscriber& operator=(
const Subscriber&) =
delete;
171 Subscriber(Subscriber&& other)
noexcept { *
this = std::move(other); }
173 Subscriber& operator=(Subscriber&& other)
noexcept
182 topic_ = other.topic_;
183 owned_topic_ = other.owned_topic_;
184 subscriber_index_ = other.subscriber_index_;
185 current_slot_index_ = other.current_slot_index_;
186 current_sequence_ = other.current_sequence_;
187 current_timestamp_ = other.current_timestamp_;
189 other.topic_ =
nullptr;
190 other.owned_topic_ =
nullptr;
191 other.subscriber_index_ = INVALID_INDEX;
192 other.current_slot_index_ = INVALID_INDEX;
193 other.current_sequence_ = 0;
194 other.current_timestamp_ = MicrosecondTimestamp();
205 return topic_ !=
nullptr && subscriber_index_ != INVALID_INDEX;
215 ErrorCode Wait(uint32_t timeout_ms = UINT32_MAX)
219 return ErrorCode::STATE_ERR;
222 const uint64_t deadline_ms =
223 (timeout_ms == UINT32_MAX) ? 0 : (NowMonotonicMs() + timeout_ms);
225 Descriptor desc = {};
228 ErrorCode pop_ans = topic_->TryPopDescriptor(subscriber_index_, desc);
229 if (pop_ans == ErrorCode::OK)
232 topic_->HoldSlot(subscriber_index_, desc.slot_index);
233 current_slot_index_ = desc.slot_index;
234 current_sequence_ = desc.sequence;
235 current_timestamp_ = topic_->SlotTimestamp(desc.slot_index);
236 return ErrorCode::OK;
239 uint32_t wait_ms = UINT32_MAX;
240 if (timeout_ms != UINT32_MAX)
242 const uint64_t now_ms = NowMonotonicMs();
243 if (now_ms >= deadline_ms)
245 return ErrorCode::TIMEOUT;
247 wait_ms =
static_cast<uint32_t
>(deadline_ms - now_ms);
250 const ErrorCode wait_ans = topic_->WaitReady(topic_->subscribers_[subscriber_index_],
252 if (wait_ans == ErrorCode::OK)
268 ErrorCode Wait(SharedData& data, uint32_t timeout_ms = UINT32_MAX)
274 return ErrorCode::STATE_ERR;
277 const uint64_t deadline_ms =
278 (timeout_ms == UINT32_MAX) ? 0 : (NowMonotonicMs() + timeout_ms);
280 Descriptor desc = {};
283 ErrorCode pop_ans = topic_->TryPopDescriptor(subscriber_index_, desc);
284 if (pop_ans == ErrorCode::OK)
286 data.topic_ = topic_;
287 data.slot_index_ = desc.slot_index;
288 data.sequence_ = desc.sequence;
289 data.state_ = SharedDataState::SUBSCRIBER;
290 data.subscriber_index_ = subscriber_index_;
291 topic_->HoldSlot(subscriber_index_, desc.slot_index);
292 return ErrorCode::OK;
295 uint32_t wait_ms = UINT32_MAX;
296 if (timeout_ms != UINT32_MAX)
298 const uint64_t now_ms = NowMonotonicMs();
299 if (now_ms >= deadline_ms)
301 return ErrorCode::TIMEOUT;
303 wait_ms =
static_cast<uint32_t
>(deadline_ms - now_ms);
306 const ErrorCode wait_ans = topic_->WaitReady(topic_->subscribers_[subscriber_index_],
308 if (wait_ans == ErrorCode::OK)
321 TopicData* GetData()
const
323 if (!Valid() || current_slot_index_ == INVALID_INDEX)
328 return &topic_->payloads_[current_slot_index_];
334 uint64_t GetSequence()
const {
return current_sequence_; }
339 MicrosecondTimestamp GetTimestamp()
const {
return current_timestamp_; }
344 uint32_t GetPendingNum()
const
351 const SubscriberControl& control = topic_->subscribers_[subscriber_index_];
352 const uint32_t head = control.queue_head.load(std::memory_order_acquire);
353 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
358 return topic_->queue_capacity_ - (head - tail);
364 uint64_t GetDropNum()
const
371 return topic_->subscribers_[subscriber_index_].dropped_messages.load(
372 std::memory_order_acquire);
380 if (!Valid() || current_slot_index_ == INVALID_INDEX)
385 topic_->ClearHeldSlot(subscriber_index_, current_slot_index_);
386 topic_->ReleaseSlot(current_slot_index_);
387 current_slot_index_ = INVALID_INDEX;
388 current_sequence_ = 0;
389 current_timestamp_ = MicrosecondTimestamp();
403 topic_->UnregisterBalancedSubscriber(subscriber_index_);
404 topic_->subscribers_[subscriber_index_].active.store(0, std::memory_order_release);
405 topic_->subscribers_[subscriber_index_].owner_pid.store(0, std::memory_order_release);
406 topic_->subscribers_[subscriber_index_].owner_starttime.store(0,
407 std::memory_order_release);
409 Descriptor desc = {};
410 while (topic_->TryPopDescriptor(subscriber_index_, desc) == ErrorCode::OK)
412 topic_->ReleaseSlot(desc.slot_index);
419 owned_topic_ =
nullptr;
420 subscriber_index_ = INVALID_INDEX;
421 current_slot_index_ = INVALID_INDEX;
422 current_sequence_ = 0;
423 current_timestamp_ = MicrosecondTimestamp();
427 ErrorCode Attach(LinuxSharedTopic& topic, LinuxSharedSubscriberMode mode)
433 return ErrorCode::STATE_ERR;
436 if (topic.self_identity_.starttime == 0)
438 return ErrorCode::STATE_ERR;
441 for (uint32_t i = 0; i < topic.subscriber_capacity_; ++i)
443 uint32_t expected = 0;
444 auto& active = topic.subscribers_[i].active;
445 if (active.compare_exchange_strong(expected, 1, std::memory_order_acq_rel,
446 std::memory_order_relaxed))
448 topic.subscribers_[i].queue_head.store(0, std::memory_order_release);
449 topic.subscribers_[i].queue_tail.store(0, std::memory_order_release);
450 topic.subscribers_[i].ready_sem_count.store(0, std::memory_order_release);
451 topic.subscribers_[i].dropped_messages.store(0, std::memory_order_release);
452 topic.subscribers_[i].owner_pid.store(topic.self_identity_.pid,
453 std::memory_order_release);
454 topic.subscribers_[i].owner_starttime.store(topic.self_identity_.starttime,
455 std::memory_order_release);
456 topic.subscribers_[i].held_slot.store(INVALID_INDEX, std::memory_order_release);
457 topic.subscribers_[i].mode.store(
static_cast<uint32_t
>(mode),
458 std::memory_order_release);
459 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
461 const ErrorCode join_ans = topic.RegisterBalancedSubscriber(i);
462 if (join_ans != ErrorCode::OK)
464 topic.subscribers_[i].active.store(0, std::memory_order_release);
465 topic.subscribers_[i].owner_pid.store(0, std::memory_order_release);
466 topic.subscribers_[i].owner_starttime.store(0,
467 std::memory_order_release);
468 topic.subscribers_[i].mode.store(
469 static_cast<uint32_t
>(LinuxSharedSubscriberMode::BROADCAST_FULL),
470 std::memory_order_release);
475 subscriber_index_ = i;
476 current_slot_index_ = INVALID_INDEX;
477 current_sequence_ = 0;
478 current_timestamp_ = MicrosecondTimestamp();
479 return ErrorCode::OK;
483 return ErrorCode::FULL;
486 LinuxSharedTopic* topic_ =
nullptr;
487 LinuxSharedTopic* owned_topic_ =
nullptr;
488 uint32_t subscriber_index_ = INVALID_INDEX;
489 uint32_t current_slot_index_ = INVALID_INDEX;
490 uint64_t current_sequence_ = 0;
491 MicrosecondTimestamp current_timestamp_;
509 SharedData() =
default;
515 ~SharedData() { Reset(); }
517 SharedData(
const SharedData&) =
delete;
518 SharedData& operator=(
const SharedData&) =
delete;
520 SharedData(SharedData&& other)
noexcept { *
this = std::move(other); }
525 SharedData& operator=(SharedData&& other)
noexcept
534 topic_ = other.topic_;
535 slot_index_ = other.slot_index_;
536 sequence_ = other.sequence_;
537 state_ = other.state_;
538 subscriber_index_ = other.subscriber_index_;
540 other.topic_ =
nullptr;
541 other.slot_index_ = INVALID_INDEX;
543 other.state_ = SharedDataState::EMPTY;
544 other.subscriber_index_ = INVALID_INDEX;
551 bool Valid()
const {
return topic_ !=
nullptr && slot_index_ != INVALID_INDEX; }
556 bool Empty()
const {
return !Valid(); }
561 uint64_t GetSequence()
const {
return sequence_; }
566 MicrosecondTimestamp GetTimestamp()
const
568 if (!Valid() || state_ != SharedDataState::SUBSCRIBER)
570 return MicrosecondTimestamp();
572 return topic_->SlotTimestamp(slot_index_);
586 return &topic_->payloads_[slot_index_];
594 TopicData* GetData()
const
600 return &topic_->payloads_[slot_index_];
613 if (state_ == SharedDataState::PUBLISHER)
615 topic_->RecycleSlot(slot_index_);
617 else if (state_ == SharedDataState::SUBSCRIBER)
619 topic_->ClearHeldSlot(subscriber_index_, slot_index_);
620 topic_->ReleaseSlot(slot_index_);
623 slot_index_ = INVALID_INDEX;
625 state_ = SharedDataState::EMPTY;
626 subscriber_index_ = INVALID_INDEX;
630 friend class LinuxSharedTopic<TopicData>;
631 friend class Subscriber;
633 LinuxSharedTopic* topic_ =
nullptr;
634 uint32_t slot_index_ = INVALID_INDEX;
635 uint64_t sequence_ = 0;
636 SharedDataState state_ = SharedDataState::EMPTY;
637 uint32_t subscriber_index_ = INVALID_INDEX;
644 explicit LinuxSharedTopic(
const char* topic_name)
645 : LinuxSharedTopic(topic_name, DEFAULT_DOMAIN_NAME)
649 LinuxSharedTopic(
const char* topic_name,
const char* domain_name)
653 domain_crc32_(ResolveDomainKey(domain_name)),
654 topic_name_(ResolveTopicName(topic_name)),
655 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
656 shm_name_(BuildShmName(name_key_))
658 (void)ReadProcessIdentity(
static_cast<uint32_t
>(getpid()), self_identity_);
662 LinuxSharedTopic(
const char* topic_name, Topic::Domain& domain)
666 domain_crc32_(domain.node_ != nullptr ? domain.node_->key : 0),
667 topic_name_(ResolveTopicName(topic_name)),
668 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
669 shm_name_(BuildShmName(name_key_))
671 (void)ReadProcessIdentity(
static_cast<uint32_t
>(getpid()), self_identity_);
681 LinuxSharedTopic(
const char* topic_name,
const LinuxSharedTopicConfig& config)
682 : LinuxSharedTopic(topic_name, DEFAULT_DOMAIN_NAME, config)
686 LinuxSharedTopic(
const char* topic_name,
const char* domain_name,
687 const LinuxSharedTopicConfig& config)
691 domain_crc32_(ResolveDomainKey(domain_name)),
692 topic_name_(ResolveTopicName(topic_name)),
693 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
694 shm_name_(BuildShmName(name_key_))
696 (void)ReadProcessIdentity(
static_cast<uint32_t
>(getpid()), self_identity_);
700 LinuxSharedTopic(
const char* topic_name, Topic::Domain& domain,
701 const LinuxSharedTopicConfig& config)
705 domain_crc32_(domain.node_ != nullptr ? domain.node_->key : 0),
706 topic_name_(ResolveTopicName(topic_name)),
707 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
708 shm_name_(BuildShmName(name_key_))
710 (void)ReadProcessIdentity(
static_cast<uint32_t
>(getpid()), self_identity_);
718 using SyncSubscriber = Subscriber;
723 ~LinuxSharedTopic() { Close(); }
725 LinuxSharedTopic(
const LinuxSharedTopic&) =
delete;
726 LinuxSharedTopic& operator=(
const LinuxSharedTopic&) =
delete;
728 LinuxSharedTopic(LinuxSharedTopic&&) =
delete;
729 LinuxSharedTopic& operator=(LinuxSharedTopic&&) =
delete;
735 bool Valid()
const {
return open_ok_; }
740 ErrorCode GetError()
const {
return open_status_; }
745 uint32_t GetSubscriberNum()
const
753 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
755 if (subscribers_[i].active.load(std::memory_order_acquire) != 0)
773 return ErrorCode::STATE_ERR;
776 if (!PublisherValid())
778 return ErrorCode::STATE_ERR;
783 uint32_t slot_index = INVALID_INDEX;
784 ErrorCode pop_ans = PopFreeSlot(slot_index);
785 if (pop_ans != ErrorCode::OK)
787 ScavengeDeadSubscribers();
788 pop_ans = PopFreeSlot(slot_index);
789 if (pop_ans != ErrorCode::OK)
795 slots_[slot_index].refcount.store(0, std::memory_order_release);
796 slots_[slot_index].sequence.store(0, std::memory_order_release);
797 slots_[slot_index].timestamp_us = 0;
800 data.slot_index_ = slot_index;
802 data.state_ = SharedDataState::PUBLISHER;
803 data.subscriber_index_ = INVALID_INDEX;
804 return ErrorCode::OK;
814 SharedData topic_data;
815 const ErrorCode acquire_ans = CreateData(topic_data);
816 if (acquire_ans != ErrorCode::OK)
821 *topic_data.GetData() = data;
822 return Publish(topic_data);
825 ErrorCode Publish(
const TopicData& data, MicrosecondTimestamp timestamp)
827 SharedData topic_data;
828 const ErrorCode acquire_ans = CreateData(topic_data);
829 if (acquire_ans != ErrorCode::OK)
834 *topic_data.GetData() = data;
835 return Publish(topic_data, timestamp);
841 ErrorCode Publish(SharedData&& data) {
return PublishData<false>(data); }
843 ErrorCode Publish(SharedData&& data, MicrosecondTimestamp timestamp)
845 return PublishData<true>(data, timestamp);
851 ErrorCode Publish(SharedData& data) {
return PublishData<false>(data); }
853 ErrorCode Publish(SharedData& data, MicrosecondTimestamp timestamp)
855 return PublishData<true>(data, timestamp);
861 uint64_t GetPublishFailedNum()
const
867 return header_->publish_failures.load(std::memory_order_acquire);
875 static ErrorCode Remove(
const char* topic_name)
877 return Remove(topic_name, DEFAULT_DOMAIN_NAME);
880 static ErrorCode Remove(
const char* topic_name,
const char* domain_name)
882 const std::string shm_name = BuildShmName(
883 BuildNameKey(ResolveDomainKey(domain_name), ResolveTopicName(topic_name)));
884 if (shm_unlink(shm_name.c_str()) == 0 || errno == ENOENT)
886 return ErrorCode::OK;
888 return ErrorCode::FAILED;
891 static ErrorCode Remove(
const char* topic_name, Topic::Domain& domain)
893 const uint32_t domain_crc32 = (domain.node_ !=
nullptr) ? domain.node_->key : 0;
894 const std::string shm_name =
895 BuildShmName(BuildNameKey(domain_crc32, ResolveTopicName(topic_name)));
896 if (shm_unlink(shm_name.c_str()) == 0 || errno == ENOENT)
898 return ErrorCode::OK;
900 return ErrorCode::FAILED;
904 struct alignas(64) SharedHeader
907 uint64_t name_key = 0;
908 uint32_t domain_crc32 = 0;
909 uint32_t version = 0;
910 uint32_t data_size = 0;
911 uint32_t slot_count = 0;
912 uint32_t subscriber_capacity = 0;
913 uint32_t queue_capacity = 0;
914 uint32_t topic_name_len = 0;
915 std::atomic<uint32_t> init_state;
916 std::atomic<uint32_t> publisher_pid;
917 std::atomic<uint64_t> publisher_starttime;
918 std::atomic<uint64_t> free_queue_head;
919 std::atomic<uint64_t> free_queue_tail;
920 std::atomic<uint64_t> next_sequence;
921 std::atomic<uint64_t> publish_failures;
924 struct alignas(64) SlotControl
926 std::atomic<uint32_t> refcount;
927 std::atomic<uint64_t> sequence;
928 uint64_t timestamp_us;
931 struct alignas(16) FreeSlotCell
933 std::atomic<uint64_t> sequence;
934 uint32_t slot_index = 0;
935 uint32_t reserved = 0;
940 uint32_t slot_index = INVALID_INDEX;
941 uint32_t reserved = 0;
942 uint64_t sequence = 0;
945 struct alignas(64) SubscriberControl
947 std::atomic<uint32_t> active;
948 std::atomic<uint32_t> mode;
949 std::atomic<uint32_t> queue_head;
950 std::atomic<uint32_t> queue_tail;
951 std::atomic<uint32_t> ready_sem_count;
952 std::atomic<uint64_t> dropped_messages;
953 std::atomic<uint32_t> owner_pid;
954 std::atomic<uint64_t> owner_starttime;
955 std::atomic<uint32_t> held_slot;
958 struct alignas(64) BalancedGroupControl
960 std::atomic<uint64_t> rr_cursor;
963 struct ProcessIdentity
966 uint64_t starttime = 0;
969 static constexpr uint64_t MAGIC = 0x4c58524950435348ULL;
970 static constexpr uint32_t VERSION = 2;
971 static constexpr uint32_t INIT_READY = 1;
972 static constexpr uint32_t INVALID_INDEX = UINT32_MAX;
974 static uint32_t ResolveDomainKey(
const char* domain_name)
976 const std::string resolved =
977 (domain_name ==
nullptr || domain_name[0] ==
'\0') ? std::string(DEFAULT_DOMAIN_NAME)
978 : std::string(domain_name);
979 return CRC32::Calculate(resolved.data(), resolved.size());
982 static std::string ResolveTopicName(
const char* topic_name)
984 return (topic_name !=
nullptr) ? std::string(topic_name) : std::string();
987 static uint64_t BuildNameKey(uint32_t domain_crc32,
const std::string& topic_name)
989 const uint32_t topic_len =
static_cast<uint32_t
>(topic_name.size());
990 std::string key_material;
991 key_material.reserve(
sizeof(domain_crc32) +
sizeof(topic_len) + topic_len);
992 key_material.append(
reinterpret_cast<const char*
>(&domain_crc32),
sizeof(domain_crc32));
993 key_material.append(
reinterpret_cast<const char*
>(&topic_len),
sizeof(topic_len));
994 key_material.append(topic_name.data(), topic_name.size());
995 return CRC64::Calculate(key_material.data(), key_material.size());
998 static std::string BuildShmName(uint64_t name_key)
1000 char buffer[64] = {};
1001 std::snprintf(buffer,
sizeof(buffer),
"/libxr_ipc_%016" PRIx64, name_key);
1002 return std::string(buffer);
1005 static size_t AlignUp(
size_t value,
size_t alignment)
1007 return (value + alignment - 1U) & ~(alignment - 1U);
1010 static uint64_t NowMonotonicMs() {
return MonotonicTime::NowMilliseconds(); }
1012 static MicrosecondTimestamp NowMessageTimestamp() {
return Topic::NowTimestamp(); }
1014 static uint64_t ToSharedTimestamp(MicrosecondTimestamp timestamp)
1016 return MonotonicTime::XrToSharedMicroseconds(
static_cast<uint64_t
>(timestamp));
1019 static MicrosecondTimestamp FromSharedTimestamp(uint64_t timestamp_us)
1021 return MicrosecondTimestamp(MonotonicTime::SharedToXrMicroseconds(timestamp_us));
1024 static bool ReadProcessIdentity(uint32_t pid, ProcessIdentity& identity)
1033 std::snprintf(path,
sizeof(path),
"/proc/%u/stat", pid);
1035 std::ifstream file(path);
1036 if (!file.is_open())
1042 std::getline(file, line);
1048 const size_t rparen = line.rfind(
')');
1049 if (rparen == std::string::npos || rparen + 2U >= line.size())
1054 std::istringstream iss(line.substr(rparen + 2U));
1056 for (
int field = 3; field <= 22; ++field)
1058 if (!(iss >> token))
1066 identity.starttime = std::strtoull(token.c_str(),
nullptr, 10);
1067 return identity.starttime != 0;
1074 static int FutexWait(std::atomic<uint32_t>* word, uint32_t expected, uint32_t timeout_ms)
1076 struct timespec timeout = {};
1077 struct timespec* timeout_ptr =
nullptr;
1078 if (timeout_ms != UINT32_MAX)
1080 timeout.tv_sec =
static_cast<time_t
>(timeout_ms / 1000U);
1081 timeout.tv_nsec =
static_cast<long>(timeout_ms % 1000U) * 1000000L;
1082 timeout_ptr = &timeout;
1085 return static_cast<int>(syscall(SYS_futex,
1086 reinterpret_cast<uint32_t*
>(word),
1094 static int FutexWake(std::atomic<uint32_t>* word)
1096 return static_cast<int>(
1097 syscall(SYS_futex,
reinterpret_cast<uint32_t*
>(word), FUTEX_WAKE, INT32_MAX,
nullptr,
1101 static size_t ComputeSharedBytes(uint32_t slot_count,
1102 uint32_t subscriber_capacity,
1103 uint32_t queue_capacity,
1104 uint32_t topic_name_len)
1107 offset = AlignUp(offset,
alignof(SharedHeader));
1108 offset +=
sizeof(SharedHeader);
1110 offset +=
static_cast<size_t>(topic_name_len) + 1U;
1112 offset = AlignUp(offset,
alignof(SlotControl));
1113 offset +=
sizeof(SlotControl) * slot_count;
1115 offset = AlignUp(offset,
alignof(SubscriberControl));
1116 offset +=
sizeof(SubscriberControl) * subscriber_capacity;
1118 offset = AlignUp(offset,
alignof(BalancedGroupControl));
1119 offset +=
sizeof(BalancedGroupControl);
1121 offset = AlignUp(offset,
alignof(std::atomic<uint32_t>));
1122 offset +=
sizeof(std::atomic<uint32_t>) * subscriber_capacity;
1124 offset = AlignUp(offset,
alignof(FreeSlotCell));
1125 offset +=
sizeof(FreeSlotCell) * slot_count;
1127 offset = AlignUp(offset,
alignof(Descriptor));
1128 offset +=
sizeof(Descriptor) * subscriber_capacity * queue_capacity;
1130 offset = AlignUp(offset,
alignof(TopicData));
1131 offset +=
sizeof(TopicData) * slot_count;
1135 void SetupPointers()
1139 offset = AlignUp(offset,
alignof(SharedHeader));
1140 header_ =
reinterpret_cast<SharedHeader*
>(base_ + offset);
1141 offset +=
sizeof(SharedHeader);
1143 topic_name_ptr_ =
reinterpret_cast<char*
>(base_ + offset);
1144 offset +=
static_cast<size_t>(header_->topic_name_len) + 1U;
1146 offset = AlignUp(offset,
alignof(SlotControl));
1147 slots_ =
reinterpret_cast<SlotControl*
>(base_ + offset);
1148 offset +=
sizeof(SlotControl) * slot_count_;
1150 offset = AlignUp(offset,
alignof(SubscriberControl));
1151 subscribers_ =
reinterpret_cast<SubscriberControl*
>(base_ + offset);
1152 offset +=
sizeof(SubscriberControl) * subscriber_capacity_;
1154 offset = AlignUp(offset,
alignof(BalancedGroupControl));
1155 balanced_group_ =
reinterpret_cast<BalancedGroupControl*
>(base_ + offset);
1156 offset +=
sizeof(BalancedGroupControl);
1158 offset = AlignUp(offset,
alignof(std::atomic<uint32_t>));
1159 balanced_members_ =
reinterpret_cast<std::atomic<uint32_t>*
>(base_ + offset);
1160 offset +=
sizeof(std::atomic<uint32_t>) * subscriber_capacity_;
1162 offset = AlignUp(offset,
alignof(FreeSlotCell));
1163 free_slots_ =
reinterpret_cast<FreeSlotCell*
>(base_ + offset);
1164 offset +=
sizeof(FreeSlotCell) * slot_count_;
1166 offset = AlignUp(offset,
alignof(Descriptor));
1167 descriptors_ =
reinterpret_cast<Descriptor*
>(base_ + offset);
1168 offset +=
sizeof(Descriptor) * subscriber_capacity_ * queue_capacity_;
1170 offset = AlignUp(offset,
alignof(TopicData));
1171 payloads_ =
reinterpret_cast<TopicData*
>(base_ + offset);
1174 bool HeaderMatchesIdentity()
const
1176 if (header_->name_key != name_key_)
1180 if (header_->domain_crc32 != domain_crc32_)
1184 if (header_->topic_name_len != topic_name_.size())
1188 if (std::memcmp(topic_name_ptr_, topic_name_.c_str(), topic_name_.size() + 1U) != 0)
1197 if (config_.slot_num == 0 || config_.subscriber_num == 0 || config_.queue_num < 2)
1199 return ErrorCode::ARG_ERR;
1202 const size_t bytes =
1203 ComputeSharedBytes(config_.slot_num, config_.subscriber_num, config_.queue_num,
1204 static_cast<uint32_t
>(topic_name_.size()));
1206 if (ftruncate(fd_,
static_cast<off_t
>(bytes)) != 0)
1208 return ErrorCode::INIT_ERR;
1211 const struct stat st = GetStat();
1212 if (st.st_size <= 0)
1214 return ErrorCode::INIT_ERR;
1217 mapping_size_ =
static_cast<size_t>(st.st_size);
1218 mapping_ = mmap(
nullptr, mapping_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
1219 if (mapping_ == MAP_FAILED)
1222 return ErrorCode::INIT_ERR;
1225 base_ =
static_cast<uint8_t*
>(mapping_);
1226 slot_count_ = config_.slot_num;
1227 subscriber_capacity_ = config_.subscriber_num;
1228 queue_capacity_ = config_.queue_num;
1229 header_ =
reinterpret_cast<SharedHeader*
>(base_ + AlignUp(0,
alignof(SharedHeader)));
1230 header_->topic_name_len =
static_cast<uint32_t
>(topic_name_.size());
1233 header_->magic = MAGIC;
1234 header_->name_key = name_key_;
1235 header_->domain_crc32 = domain_crc32_;
1236 header_->version = VERSION;
1237 header_->data_size =
sizeof(TopicData);
1238 header_->slot_count = slot_count_;
1239 header_->subscriber_capacity = subscriber_capacity_;
1240 header_->queue_capacity = queue_capacity_;
1241 std::memcpy(topic_name_ptr_, topic_name_.c_str(), topic_name_.size() + 1U);
1242 header_->publisher_pid.store(self_identity_.pid, std::memory_order_release);
1243 header_->publisher_starttime.store(self_identity_.starttime, std::memory_order_release);
1244 header_->free_queue_head.store(0, std::memory_order_release);
1245 header_->free_queue_tail.store(slot_count_, std::memory_order_release);
1246 header_->next_sequence.store(0, std::memory_order_release);
1247 header_->publish_failures.store(0, std::memory_order_release);
1249 for (uint32_t i = 0; i < slot_count_; ++i)
1251 slots_[i].refcount.store(0, std::memory_order_release);
1252 slots_[i].sequence.store(0, std::memory_order_release);
1253 slots_[i].timestamp_us = 0;
1254 std::construct_at(&payloads_[i], TopicData{});
1255 free_slots_[i].slot_index = i;
1256 free_slots_[i].sequence.store(
static_cast<uint64_t
>(i) + 1U,
1257 std::memory_order_release);
1260 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1262 subscribers_[i].active.store(0, std::memory_order_release);
1263 subscribers_[i].mode.store(
static_cast<uint32_t
>(LinuxSharedSubscriberMode::BROADCAST_FULL),
1264 std::memory_order_release);
1265 subscribers_[i].queue_head.store(0, std::memory_order_release);
1266 subscribers_[i].queue_tail.store(0, std::memory_order_release);
1267 subscribers_[i].ready_sem_count.store(0, std::memory_order_release);
1268 subscribers_[i].dropped_messages.store(0, std::memory_order_release);
1269 subscribers_[i].owner_pid.store(0, std::memory_order_release);
1270 subscribers_[i].owner_starttime.store(0, std::memory_order_release);
1271 subscribers_[i].held_slot.store(INVALID_INDEX, std::memory_order_release);
1272 balanced_members_[i].store(INVALID_INDEX, std::memory_order_release);
1275 balanced_group_->rr_cursor.store(0, std::memory_order_release);
1277 for (
size_t i = 0; i < static_cast<size_t>(subscriber_capacity_) * queue_capacity_; ++i)
1279 descriptors_[i] = Descriptor{};
1282 header_->init_state.store(INIT_READY, std::memory_order_release);
1283 return ErrorCode::OK;
1288 const struct stat st = GetStat();
1289 if (st.st_size <= 0)
1291 return ErrorCode::NOT_FOUND;
1294 mapping_size_ =
static_cast<size_t>(st.st_size);
1295 mapping_ = mmap(
nullptr, mapping_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
1296 if (mapping_ == MAP_FAILED)
1299 return ErrorCode::INIT_ERR;
1302 base_ =
static_cast<uint8_t*
>(mapping_);
1303 header_ =
reinterpret_cast<SharedHeader*
>(base_);
1305 while (header_->init_state.load(std::memory_order_acquire) != INIT_READY)
1310 if (header_->magic != MAGIC || header_->version != VERSION ||
1311 header_->data_size !=
sizeof(TopicData))
1313 return ErrorCode::CHECK_ERR;
1316 slot_count_ = header_->slot_count;
1317 subscriber_capacity_ = header_->subscriber_capacity;
1318 queue_capacity_ = header_->queue_capacity;
1320 if (!HeaderMatchesIdentity())
1322 return ErrorCode::CHECK_ERR;
1324 return ErrorCode::OK;
1327 bool TryReclaimStaleSegment()
1329 int stale_fd = shm_open(shm_name_.c_str(), O_RDWR, 0600);
1332 return errno == ENOENT;
1335 struct stat st = {};
1336 if (fstat(stale_fd, &st) != 0)
1342 bool reclaim =
false;
1343 if (st.st_size <
static_cast<off_t
>(
sizeof(SharedHeader)))
1349 void* mapping = mmap(
nullptr,
static_cast<size_t>(st.st_size), PROT_READ | PROT_WRITE,
1350 MAP_SHARED, stale_fd, 0);
1351 if (mapping != MAP_FAILED)
1353 uint8_t* base =
static_cast<uint8_t*
>(mapping);
1354 auto* header =
reinterpret_cast<SharedHeader*
>(mapping);
1355 const uint32_t init_state = header->init_state.load(std::memory_order_acquire);
1356 bool identity_match =
false;
1357 const size_t mapping_size =
static_cast<size_t>(st.st_size);
1358 const size_t topic_name_bytes = topic_name_.size() + 1U;
1359 if (header->magic == MAGIC && header->version == VERSION &&
1360 header->domain_crc32 == domain_crc32_ &&
1361 header->topic_name_len == topic_name_.size())
1363 size_t offset = AlignUp(0,
alignof(SharedHeader));
1364 offset +=
sizeof(SharedHeader);
1365 if (offset <= mapping_size && topic_name_bytes <= (mapping_size - offset))
1367 const char* mapped_topic =
reinterpret_cast<const char*
>(base + offset);
1369 (header->name_key == name_key_) &&
1370 (std::memcmp(mapped_topic, topic_name_.c_str(), topic_name_bytes) == 0);
1373 const ProcessIdentity publisher_identity = {
1374 header->publisher_pid.load(std::memory_order_acquire),
1375 header->publisher_starttime.load(std::memory_order_acquire),
1378 if (!identity_match)
1382 else if (init_state != INIT_READY)
1384 reclaim = !ProcessAlive(publisher_identity);
1386 else if (!ProcessAlive(publisher_identity))
1391 munmap(mapping,
static_cast<size_t>(st.st_size));
1402 return shm_unlink(shm_name_.c_str()) == 0 || errno == ENOENT;
1407 open_status_ = ErrorCode::STATE_ERR;
1412 for (
int attempt = 0; attempt < 2; ++attempt)
1414 fd_ = shm_open(shm_name_.c_str(), O_CREAT | O_EXCL | O_RDWR, 0600);
1420 if (errno != EEXIST || !TryReclaimStaleSegment())
1432 open_status_ = InitializeLayout();
1436 fd_ = shm_open(shm_name_.c_str(), O_RDWR, 0600);
1443 open_status_ = AttachLayout();
1452 open_ok_ = (open_status_ == ErrorCode::OK);
1457 if (mapping_ !=
nullptr)
1459 munmap(mapping_, mapping_size_);
1466 subscribers_ =
nullptr;
1467 free_slots_ =
nullptr;
1468 descriptors_ =
nullptr;
1469 payloads_ =
nullptr;
1472 open_status_ = ErrorCode::STATE_ERR;
1475 struct stat GetStat() const
1477 struct stat st = {};
1482 Descriptor* DescriptorRing(uint32_t subscriber_index)
const
1484 return descriptors_ +
static_cast<size_t>(subscriber_index) * queue_capacity_;
1487 static bool ProcessAlive(
const ProcessIdentity& identity)
1489 ProcessIdentity current = {};
1490 if (!ReadProcessIdentity(identity.pid, current))
1495 return current.starttime == identity.starttime;
1498 bool PublisherValid()
const
1500 if (!publisher_ || header_ ==
nullptr)
1505 const ProcessIdentity owner = {
1506 header_->publisher_pid.load(std::memory_order_acquire),
1507 header_->publisher_starttime.load(std::memory_order_acquire),
1509 return owner.pid == self_identity_.pid && owner.starttime == self_identity_.starttime;
1512 void HoldSlot(uint32_t subscriber_index, uint32_t slot_index)
1514 subscribers_[subscriber_index].held_slot.store(slot_index, std::memory_order_release);
1517 void ClearHeldSlot(uint32_t subscriber_index, uint32_t slot_index)
1519 uint32_t expected = slot_index;
1520 subscribers_[subscriber_index].held_slot.compare_exchange_strong(
1521 expected, INVALID_INDEX, std::memory_order_acq_rel, std::memory_order_relaxed);
1524 MicrosecondTimestamp SlotTimestamp(uint32_t slot_index)
const
1526 return FromSharedTimestamp(slots_[slot_index].timestamp_us);
1529 ErrorCode RegisterBalancedSubscriber(uint32_t subscriber_index)
1531 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1533 uint32_t expected = INVALID_INDEX;
1534 if (balanced_members_[i].compare_exchange_strong(expected, subscriber_index,
1535 std::memory_order_acq_rel,
1536 std::memory_order_relaxed))
1538 return ErrorCode::OK;
1541 return ErrorCode::FULL;
1544 void UnregisterBalancedSubscriber(uint32_t subscriber_index)
1546 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1548 uint32_t expected = subscriber_index;
1549 if (balanced_members_[i].compare_exchange_strong(expected, INVALID_INDEX,
1550 std::memory_order_acq_rel,
1551 std::memory_order_relaxed))
1558 bool SelectBalancedSubscriber(uint32_t& subscriber_index)
1560 const uint64_t base = balanced_group_->rr_cursor.fetch_add(1, std::memory_order_acq_rel);
1561 for (uint32_t offset = 0; offset < subscriber_capacity_; ++offset)
1563 const uint32_t member_index =
1564 balanced_members_[(base + offset) % subscriber_capacity_].load(std::memory_order_acquire);
1565 if (member_index == INVALID_INDEX)
1569 if (subscribers_[member_index].active.load(std::memory_order_acquire) == 0)
1573 if (subscribers_[member_index].mode.load(std::memory_order_acquire) !=
1574 static_cast<uint32_t
>(LinuxSharedSubscriberMode::BALANCE_RR))
1578 const ProcessIdentity owner_identity = {
1579 subscribers_[member_index].owner_pid.load(std::memory_order_acquire),
1580 subscribers_[member_index].owner_starttime.load(std::memory_order_acquire),
1582 if (owner_identity.pid == 0 || owner_identity.starttime == 0)
1586 if (!ProcessAlive(owner_identity))
1588 ReclaimSubscriber(member_index);
1591 if (!QueueHasSpace(member_index))
1595 subscriber_index = member_index;
1601 bool ReclaimSubscriber(uint32_t subscriber_index)
1603 uint32_t expected = 1;
1604 if (!subscribers_[subscriber_index].active.compare_exchange_strong(
1605 expected, 0, std::memory_order_acq_rel, std::memory_order_relaxed))
1610 if (subscribers_[subscriber_index].mode.load(std::memory_order_acquire) ==
1611 static_cast<uint32_t
>(LinuxSharedSubscriberMode::BALANCE_RR))
1613 UnregisterBalancedSubscriber(subscriber_index);
1616 subscribers_[subscriber_index].owner_pid.store(0, std::memory_order_release);
1617 subscribers_[subscriber_index].owner_starttime.store(0, std::memory_order_release);
1618 subscribers_[subscriber_index].mode.store(
1619 static_cast<uint32_t
>(LinuxSharedSubscriberMode::BROADCAST_FULL),
1620 std::memory_order_release);
1622 const uint32_t held_slot =
1623 subscribers_[subscriber_index].held_slot.exchange(INVALID_INDEX,
1624 std::memory_order_acq_rel);
1625 if (held_slot != INVALID_INDEX)
1627 ReleaseSlot(held_slot);
1630 Descriptor desc = {};
1631 while (TryPopDescriptor(subscriber_index, desc) == ErrorCode::OK)
1633 ReleaseSlot(desc.slot_index);
1639 static void PostReady(SubscriberControl& control)
1641 control.ready_sem_count.fetch_add(1, std::memory_order_release);
1642 FutexWake(&control.ready_sem_count);
1645 static void ConsumeReady(SubscriberControl& control)
1647 const uint32_t prev = control.ready_sem_count.fetch_sub(1, std::memory_order_acq_rel);
1651 static ErrorCode WaitReady(SubscriberControl& control, uint32_t timeout_ms)
1653 if (control.ready_sem_count.load(std::memory_order_acquire) != 0)
1655 return ErrorCode::OK;
1658 const bool infinite_wait = (timeout_ms == UINT32_MAX);
1659 const uint64_t deadline_ms = infinite_wait ? 0 : (NowMonotonicMs() + timeout_ms);
1663 if (control.ready_sem_count.load(std::memory_order_acquire) != 0)
1665 return ErrorCode::OK;
1668 uint32_t wait_ms = UINT32_MAX;
1671 wait_ms = MonotonicTime::RemainingMilliseconds(deadline_ms);
1674 return ErrorCode::TIMEOUT;
1678 wait_ms = MonotonicTime::WaitSliceMilliseconds(wait_ms);
1680 const int futex_ans = FutexWait(&control.ready_sem_count, 0, wait_ms);
1681 if (futex_ans == 0 || errno == EAGAIN || errno == EINTR)
1686 if (errno == ETIMEDOUT)
1692 if (MonotonicTime::RemainingMilliseconds(deadline_ms) == 0 &&
1693 control.ready_sem_count.load(std::memory_order_acquire) == 0)
1695 return ErrorCode::TIMEOUT;
1700 return ErrorCode::FAILED;
1704 void ScavengeDeadSubscribers()
1706 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1708 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1713 const ProcessIdentity owner_identity = {
1714 subscribers_[i].owner_pid.load(std::memory_order_acquire),
1715 subscribers_[i].owner_starttime.load(std::memory_order_acquire),
1717 if (ProcessAlive(owner_identity))
1722 ReclaimSubscriber(i);
1726 bool QueueHasSpace(uint32_t subscriber_index)
const
1728 const SubscriberControl& control = subscribers_[subscriber_index];
1729 const uint32_t head = control.queue_head.load(std::memory_order_acquire);
1730 const uint32_t tail = control.queue_tail.load(std::memory_order_relaxed);
1731 const uint32_t next_tail = (tail + 1U) % queue_capacity_;
1732 return next_tail != head;
1735 void PushDescriptor(uint32_t subscriber_index,
const Descriptor& descriptor)
1737 SubscriberControl& control = subscribers_[subscriber_index];
1738 Descriptor* ring = DescriptorRing(subscriber_index);
1740 const uint32_t tail = control.queue_tail.load(std::memory_order_relaxed);
1741 ring[tail] = descriptor;
1742 const uint32_t next_tail = (tail + 1U) % queue_capacity_;
1743 control.queue_tail.store(next_tail, std::memory_order_release);
1747 ErrorCode TryPopDescriptor(uint32_t subscriber_index, Descriptor& descriptor)
1749 SubscriberControl& control = subscribers_[subscriber_index];
1750 Descriptor* ring = DescriptorRing(subscriber_index);
1754 uint32_t head = control.queue_head.load(std::memory_order_relaxed);
1755 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
1758 return ErrorCode::EMPTY;
1761 descriptor = ring[head];
1762 const uint32_t next_head = (head + 1U) % queue_capacity_;
1763 if (control.queue_head.compare_exchange_weak(head, next_head, std::memory_order_acq_rel,
1764 std::memory_order_relaxed))
1766 ConsumeReady(control);
1767 return ErrorCode::OK;
1772 ErrorCode DropDescriptor(uint32_t subscriber_index)
1774 SubscriberControl& control = subscribers_[subscriber_index];
1775 Descriptor* ring = DescriptorRing(subscriber_index);
1779 uint32_t head = control.queue_head.load(std::memory_order_relaxed);
1780 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
1783 return ErrorCode::EMPTY;
1786 const Descriptor descriptor = ring[head];
1787 const uint32_t next_head = (head + 1U) % queue_capacity_;
1788 if (control.queue_head.compare_exchange_weak(head, next_head, std::memory_order_acq_rel,
1789 std::memory_order_relaxed))
1791 control.dropped_messages.fetch_add(1, std::memory_order_relaxed);
1792 ConsumeReady(control);
1793 ReleaseSlot(descriptor.slot_index);
1794 return ErrorCode::OK;
1799 ErrorCode PopFreeSlot(uint32_t& slot_index)
1803 uint64_t head = header_->free_queue_head.load(std::memory_order_relaxed);
1804 FreeSlotCell& cell = free_slots_[head % slot_count_];
1805 const uint64_t seq = cell.sequence.load(std::memory_order_acquire);
1806 const intptr_t diff =
static_cast<intptr_t
>(seq) -
static_cast<intptr_t
>(head + 1U);
1810 if (header_->free_queue_head.compare_exchange_weak(head, head + 1U,
1811 std::memory_order_acq_rel,
1812 std::memory_order_relaxed))
1814 slot_index = cell.slot_index;
1815 cell.sequence.store(head + slot_count_, std::memory_order_release);
1816 return ErrorCode::OK;
1821 return ErrorCode::FULL;
1826 void RecycleSlot(uint32_t slot_index)
1828 slots_[slot_index].sequence.store(0, std::memory_order_release);
1829 slots_[slot_index].timestamp_us = 0;
1833 uint64_t tail = header_->free_queue_tail.load(std::memory_order_relaxed);
1834 FreeSlotCell& cell = free_slots_[tail % slot_count_];
1835 const uint64_t seq = cell.sequence.load(std::memory_order_acquire);
1836 const intptr_t diff =
static_cast<intptr_t
>(seq) -
static_cast<intptr_t
>(tail);
1840 if (header_->free_queue_tail.compare_exchange_weak(tail, tail + 1U,
1841 std::memory_order_acq_rel,
1842 std::memory_order_relaxed))
1844 cell.slot_index = slot_index;
1845 cell.sequence.store(tail + 1U, std::memory_order_release);
1852 void ReleaseSlot(uint32_t slot_index)
1854 const uint32_t prev = slots_[slot_index].refcount.fetch_sub(1, std::memory_order_acq_rel);
1858 RecycleSlot(slot_index);
1862 template <
bool HAS_TIMESTAMP>
1864 MicrosecondTimestamp timestamp = MicrosecondTimestamp())
1866 if (!data.Valid() || data.topic_ !=
this)
1868 return ErrorCode::STATE_ERR;
1871 uint32_t active_count = 0;
1872 uint32_t balanced_target = INVALID_INDEX;
1873 bool has_balanced_subscriber =
false;
1874 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1876 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1881 const LinuxSharedSubscriberMode mode =
static_cast<LinuxSharedSubscriberMode
>(
1882 subscribers_[i].mode.load(std::memory_order_acquire));
1883 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
1885 has_balanced_subscriber =
true;
1889 if (!QueueHasSpace(i))
1891 ScavengeDeadSubscribers();
1892 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1897 if (mode == LinuxSharedSubscriberMode::BROADCAST_DROP_OLD)
1899 const ErrorCode drop_ans = DropDescriptor(i);
1900 if (drop_ans == ErrorCode::EMPTY && QueueHasSpace(i))
1905 else if (drop_ans != ErrorCode::OK)
1907 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1909 return ErrorCode::FULL;
1914 subscribers_[i].dropped_messages.fetch_add(1, std::memory_order_relaxed);
1915 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1917 return ErrorCode::FULL;
1924 if (has_balanced_subscriber)
1926 if (!SelectBalancedSubscriber(balanced_target))
1928 ScavengeDeadSubscribers();
1929 if (!SelectBalancedSubscriber(balanced_target))
1931 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1933 return ErrorCode::FULL;
1939 if (active_count == 0)
1942 return ErrorCode::OK;
1945 const uint64_t sequence =
1946 header_->next_sequence.fetch_add(1, std::memory_order_acq_rel) + 1ULL;
1947 if constexpr (!HAS_TIMESTAMP)
1949 timestamp = NowMessageTimestamp();
1951 SlotControl& slot = slots_[data.slot_index_];
1952 slot.refcount.store(active_count, std::memory_order_release);
1953 slot.timestamp_us = ToSharedTimestamp(timestamp);
1954 slot.sequence.store(sequence, std::memory_order_release);
1956 const Descriptor descriptor = {data.slot_index_, 0U, sequence};
1957 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1959 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1963 const LinuxSharedSubscriberMode mode =
static_cast<LinuxSharedSubscriberMode
>(
1964 subscribers_[i].mode.load(std::memory_order_acquire));
1965 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
1969 PushDescriptor(i, descriptor);
1972 if (balanced_target != INVALID_INDEX)
1974 PushDescriptor(balanced_target, descriptor);
1977 data.topic_ =
nullptr;
1978 data.slot_index_ = INVALID_INDEX;
1979 return ErrorCode::OK;
1982 bool create_ =
false;
1983 bool publisher_ =
false;
1984 LinuxSharedTopicConfig config_;
1985 uint32_t domain_crc32_ = 0;
1986 std::string topic_name_;
1987 uint64_t name_key_ = 0;
1988 std::string shm_name_;
1989 ProcessIdentity self_identity_ = {};
1992 void* mapping_ =
nullptr;
1993 uint8_t* base_ =
nullptr;
1994 size_t mapping_size_ = 0;
1996 SharedHeader* header_ =
nullptr;
1997 char* topic_name_ptr_ =
nullptr;
1998 SlotControl* slots_ =
nullptr;
1999 SubscriberControl* subscribers_ =
nullptr;
2000 BalancedGroupControl* balanced_group_ =
nullptr;
2001 std::atomic<uint32_t>* balanced_members_ =
nullptr;
2002 FreeSlotCell* free_slots_ =
nullptr;
2003 Descriptor* descriptors_ =
nullptr;
2004 TopicData* payloads_ =
nullptr;
2006 uint32_t slot_count_ = 0;
2007 uint32_t subscriber_capacity_ = 0;
2008 uint32_t queue_capacity_ = 0;
2010 bool open_ok_ =
false;
2011 ErrorCode open_status_ = ErrorCode::STATE_ERR;
@ INIT_ERR
初始化错误 | Initialization error