3#if defined(LIBXR_SYSTEM_POSIX_HOST)
19#include <linux/futex.h>
22#include <sys/syscall.h>
28#include "libxr_def.hpp"
30#include "monotonic_time.hpp"
38enum class LinuxSharedSubscriberMode : uint8_t
41 BROADCAST_DROP_OLD = 1,
49struct LinuxSharedTopicConfig
51 uint32_t slot_num = 64;
52 uint32_t subscriber_num = 8;
53 uint32_t queue_num = 64;
71template <
typename TopicData>
72class LinuxSharedTopic :
public Topic
74 static_assert(std::is_trivially_copyable<TopicData>::value,
75 "LinuxSharedTopic requires trivially copyable data");
76 static_assert(std::atomic<uint32_t>::is_always_lock_free,
77 "LinuxSharedTopic requires lock-free 32-bit atomics");
78 static_assert(std::atomic<uint64_t>::is_always_lock_free,
79 "LinuxSharedTopic requires lock-free 64-bit atomics");
81 enum class SharedDataState : uint8_t
90 using Data = SharedData;
91 static constexpr const char* DEFAULT_DOMAIN_NAME =
"libxr_def_domain";
101 using Data = SharedData;
106 Subscriber() =
default;
116 explicit Subscriber(
const char* name,
117 LinuxSharedSubscriberMode mode =
118 LinuxSharedSubscriberMode::BROADCAST_FULL)
119 : owned_topic_(new LinuxSharedTopic(name))
121 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
124 owned_topic_ =
nullptr;
128 Subscriber(
const char* name,
const char* domain_name,
129 LinuxSharedSubscriberMode mode = LinuxSharedSubscriberMode::BROADCAST_FULL)
130 : owned_topic_(new LinuxSharedTopic(name, domain_name))
132 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
135 owned_topic_ =
nullptr;
139 Subscriber(
const char* name, Topic::Domain& domain,
140 LinuxSharedSubscriberMode mode = LinuxSharedSubscriberMode::BROADCAST_FULL)
141 : owned_topic_(new LinuxSharedTopic(name, domain))
143 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
146 owned_topic_ =
nullptr;
156 explicit Subscriber(LinuxSharedTopic& topic,
157 LinuxSharedSubscriberMode mode =
158 LinuxSharedSubscriberMode::BROADCAST_FULL)
160 (void)Attach(topic, mode);
166 ~Subscriber() { Reset(); }
168 Subscriber(
const Subscriber&) =
delete;
169 Subscriber& operator=(
const Subscriber&) =
delete;
171 Subscriber(Subscriber&& other)
noexcept { *
this = std::move(other); }
173 Subscriber& operator=(Subscriber&& other)
noexcept
182 topic_ = other.topic_;
183 owned_topic_ = other.owned_topic_;
184 subscriber_index_ = other.subscriber_index_;
185 current_slot_index_ = other.current_slot_index_;
186 current_sequence_ = other.current_sequence_;
188 other.topic_ =
nullptr;
189 other.owned_topic_ =
nullptr;
190 other.subscriber_index_ = kInvalidIndex;
191 other.current_slot_index_ = kInvalidIndex;
192 other.current_sequence_ = 0;
203 return topic_ !=
nullptr && subscriber_index_ != kInvalidIndex;
213 ErrorCode Wait(uint32_t timeout_ms = UINT32_MAX)
217 return ErrorCode::STATE_ERR;
220 const uint64_t deadline_ms =
221 (timeout_ms == UINT32_MAX) ? 0 : (NowMonotonicMs() + timeout_ms);
223 Descriptor desc = {};
226 ErrorCode pop_ans = topic_->TryPopDescriptor(subscriber_index_, desc);
227 if (pop_ans == ErrorCode::OK)
230 topic_->HoldSlot(subscriber_index_, desc.slot_index);
231 current_slot_index_ = desc.slot_index;
232 current_sequence_ = desc.sequence;
233 return ErrorCode::OK;
236 uint32_t wait_ms = UINT32_MAX;
237 if (timeout_ms != UINT32_MAX)
239 const uint64_t now_ms = NowMonotonicMs();
240 if (now_ms >= deadline_ms)
242 return ErrorCode::TIMEOUT;
244 wait_ms =
static_cast<uint32_t
>(deadline_ms - now_ms);
247 const ErrorCode wait_ans = topic_->WaitReady(topic_->subscribers_[subscriber_index_],
249 if (wait_ans == ErrorCode::OK)
265 ErrorCode Wait(SharedData& data, uint32_t timeout_ms = UINT32_MAX)
271 return ErrorCode::STATE_ERR;
274 const uint64_t deadline_ms =
275 (timeout_ms == UINT32_MAX) ? 0 : (NowMonotonicMs() + timeout_ms);
277 Descriptor desc = {};
280 ErrorCode pop_ans = topic_->TryPopDescriptor(subscriber_index_, desc);
281 if (pop_ans == ErrorCode::OK)
283 data.topic_ = topic_;
284 data.slot_index_ = desc.slot_index;
285 data.sequence_ = desc.sequence;
286 data.state_ = SharedDataState::SUBSCRIBER;
287 data.subscriber_index_ = subscriber_index_;
288 topic_->HoldSlot(subscriber_index_, desc.slot_index);
289 return ErrorCode::OK;
292 uint32_t wait_ms = UINT32_MAX;
293 if (timeout_ms != UINT32_MAX)
295 const uint64_t now_ms = NowMonotonicMs();
296 if (now_ms >= deadline_ms)
298 return ErrorCode::TIMEOUT;
300 wait_ms =
static_cast<uint32_t
>(deadline_ms - now_ms);
303 const ErrorCode wait_ans = topic_->WaitReady(topic_->subscribers_[subscriber_index_],
305 if (wait_ans == ErrorCode::OK)
318 const TopicData* GetData()
const
320 if (!Valid() || current_slot_index_ == kInvalidIndex)
325 return &topic_->payloads_[current_slot_index_];
331 uint64_t GetSequence()
const {
return current_sequence_; }
336 uint32_t GetPendingNum()
const
343 const SubscriberControl& control = topic_->subscribers_[subscriber_index_];
344 const uint32_t head = control.queue_head.load(std::memory_order_acquire);
345 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
350 return topic_->queue_capacity_ - (head - tail);
356 uint64_t GetDropNum()
const
363 return topic_->subscribers_[subscriber_index_].dropped_messages.load(
364 std::memory_order_acquire);
372 if (!Valid() || current_slot_index_ == kInvalidIndex)
377 topic_->ClearHeldSlot(subscriber_index_, current_slot_index_);
378 topic_->ReleaseSlot(current_slot_index_);
379 current_slot_index_ = kInvalidIndex;
380 current_sequence_ = 0;
394 topic_->UnregisterBalancedSubscriber(subscriber_index_);
395 topic_->subscribers_[subscriber_index_].active.store(0, std::memory_order_release);
396 topic_->subscribers_[subscriber_index_].owner_pid.store(0, std::memory_order_release);
397 topic_->subscribers_[subscriber_index_].owner_starttime.store(0,
398 std::memory_order_release);
400 Descriptor desc = {};
401 while (topic_->TryPopDescriptor(subscriber_index_, desc) == ErrorCode::OK)
403 topic_->ReleaseSlot(desc.slot_index);
410 owned_topic_ =
nullptr;
411 subscriber_index_ = kInvalidIndex;
412 current_slot_index_ = kInvalidIndex;
413 current_sequence_ = 0;
417 ErrorCode Attach(LinuxSharedTopic& topic, LinuxSharedSubscriberMode mode)
423 return ErrorCode::STATE_ERR;
426 if (topic.self_identity_.starttime == 0)
428 return ErrorCode::STATE_ERR;
431 for (uint32_t i = 0; i < topic.subscriber_capacity_; ++i)
433 uint32_t expected = 0;
434 auto& active = topic.subscribers_[i].active;
435 if (active.compare_exchange_strong(expected, 1, std::memory_order_acq_rel,
436 std::memory_order_relaxed))
438 topic.subscribers_[i].queue_head.store(0, std::memory_order_release);
439 topic.subscribers_[i].queue_tail.store(0, std::memory_order_release);
440 topic.subscribers_[i].ready_sem_count.store(0, std::memory_order_release);
441 topic.subscribers_[i].dropped_messages.store(0, std::memory_order_release);
442 topic.subscribers_[i].owner_pid.store(topic.self_identity_.pid,
443 std::memory_order_release);
444 topic.subscribers_[i].owner_starttime.store(topic.self_identity_.starttime,
445 std::memory_order_release);
446 topic.subscribers_[i].held_slot.store(kInvalidIndex, std::memory_order_release);
447 topic.subscribers_[i].mode.store(
static_cast<uint32_t
>(mode),
448 std::memory_order_release);
449 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
451 const ErrorCode join_ans = topic.RegisterBalancedSubscriber(i);
452 if (join_ans != ErrorCode::OK)
454 topic.subscribers_[i].active.store(0, std::memory_order_release);
455 topic.subscribers_[i].owner_pid.store(0, std::memory_order_release);
456 topic.subscribers_[i].owner_starttime.store(0,
457 std::memory_order_release);
458 topic.subscribers_[i].mode.store(
459 static_cast<uint32_t
>(LinuxSharedSubscriberMode::BROADCAST_FULL),
460 std::memory_order_release);
465 subscriber_index_ = i;
466 current_slot_index_ = kInvalidIndex;
467 current_sequence_ = 0;
468 return ErrorCode::OK;
472 return ErrorCode::FULL;
475 LinuxSharedTopic* topic_ =
nullptr;
476 LinuxSharedTopic* owned_topic_ =
nullptr;
477 uint32_t subscriber_index_ = kInvalidIndex;
478 uint32_t current_slot_index_ = kInvalidIndex;
479 uint64_t current_sequence_ = 0;
497 SharedData() =
default;
503 ~SharedData() { Reset(); }
505 SharedData(
const SharedData&) =
delete;
506 SharedData& operator=(
const SharedData&) =
delete;
508 SharedData(SharedData&& other)
noexcept { *
this = std::move(other); }
513 SharedData& operator=(SharedData&& other)
noexcept
522 topic_ = other.topic_;
523 slot_index_ = other.slot_index_;
524 sequence_ = other.sequence_;
525 state_ = other.state_;
526 subscriber_index_ = other.subscriber_index_;
528 other.topic_ =
nullptr;
529 other.slot_index_ = kInvalidIndex;
531 other.state_ = SharedDataState::EMPTY;
532 other.subscriber_index_ = kInvalidIndex;
539 bool Valid()
const {
return topic_ !=
nullptr && slot_index_ != kInvalidIndex; }
544 bool Empty()
const {
return !Valid(); }
549 uint64_t GetSequence()
const {
return sequence_; }
562 return &topic_->payloads_[slot_index_];
570 const TopicData* GetData()
const
576 return &topic_->payloads_[slot_index_];
589 if (state_ == SharedDataState::PUBLISHER)
591 topic_->RecycleSlot(slot_index_);
593 else if (state_ == SharedDataState::SUBSCRIBER)
595 topic_->ClearHeldSlot(subscriber_index_, slot_index_);
596 topic_->ReleaseSlot(slot_index_);
599 slot_index_ = kInvalidIndex;
601 state_ = SharedDataState::EMPTY;
602 subscriber_index_ = kInvalidIndex;
606 friend class LinuxSharedTopic<TopicData>;
607 friend class Subscriber;
609 LinuxSharedTopic* topic_ =
nullptr;
610 uint32_t slot_index_ = kInvalidIndex;
611 uint64_t sequence_ = 0;
612 SharedDataState state_ = SharedDataState::EMPTY;
613 uint32_t subscriber_index_ = kInvalidIndex;
620 explicit LinuxSharedTopic(
const char* topic_name)
621 : LinuxSharedTopic(topic_name, DEFAULT_DOMAIN_NAME)
625 LinuxSharedTopic(
const char* topic_name,
const char* domain_name)
629 domain_crc32_(ResolveDomainKey(domain_name)),
630 topic_name_(ResolveTopicName(topic_name)),
631 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
632 shm_name_(BuildShmName(name_key_))
634 (void)ReadProcessIdentity(
static_cast<uint32_t
>(getpid()), self_identity_);
638 LinuxSharedTopic(
const char* topic_name, Topic::Domain& domain)
642 domain_crc32_(domain.node_ != nullptr ? domain.node_->key : 0),
643 topic_name_(ResolveTopicName(topic_name)),
644 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
645 shm_name_(BuildShmName(name_key_))
647 (void)ReadProcessIdentity(
static_cast<uint32_t
>(getpid()), self_identity_);
657 LinuxSharedTopic(
const char* topic_name,
const LinuxSharedTopicConfig& config)
658 : LinuxSharedTopic(topic_name, DEFAULT_DOMAIN_NAME, config)
662 LinuxSharedTopic(
const char* topic_name,
const char* domain_name,
663 const LinuxSharedTopicConfig& config)
667 domain_crc32_(ResolveDomainKey(domain_name)),
668 topic_name_(ResolveTopicName(topic_name)),
669 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
670 shm_name_(BuildShmName(name_key_))
672 (void)ReadProcessIdentity(
static_cast<uint32_t
>(getpid()), self_identity_);
676 LinuxSharedTopic(
const char* topic_name, Topic::Domain& domain,
677 const LinuxSharedTopicConfig& config)
681 domain_crc32_(domain.node_ != nullptr ? domain.node_->key : 0),
682 topic_name_(ResolveTopicName(topic_name)),
683 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
684 shm_name_(BuildShmName(name_key_))
686 (void)ReadProcessIdentity(
static_cast<uint32_t
>(getpid()), self_identity_);
694 using SyncSubscriber = Subscriber;
699 ~LinuxSharedTopic() { Close(); }
701 LinuxSharedTopic(
const LinuxSharedTopic&) =
delete;
702 LinuxSharedTopic& operator=(
const LinuxSharedTopic&) =
delete;
704 LinuxSharedTopic(LinuxSharedTopic&&) =
delete;
705 LinuxSharedTopic& operator=(LinuxSharedTopic&&) =
delete;
711 bool Valid()
const {
return open_ok_; }
716 ErrorCode GetError()
const {
return open_status_; }
721 uint32_t GetSubscriberNum()
const
729 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
731 if (subscribers_[i].active.load(std::memory_order_acquire) != 0)
749 return ErrorCode::STATE_ERR;
752 if (!PublisherValid())
754 return ErrorCode::STATE_ERR;
759 uint32_t slot_index = kInvalidIndex;
760 ErrorCode pop_ans = PopFreeSlot(slot_index);
761 if (pop_ans != ErrorCode::OK)
763 ScavengeDeadSubscribers();
764 pop_ans = PopFreeSlot(slot_index);
765 if (pop_ans != ErrorCode::OK)
771 slots_[slot_index].refcount.store(0, std::memory_order_release);
772 slots_[slot_index].sequence.store(0, std::memory_order_release);
775 data.slot_index_ = slot_index;
777 data.state_ = SharedDataState::PUBLISHER;
778 data.subscriber_index_ = kInvalidIndex;
779 return ErrorCode::OK;
789 SharedData topic_data;
790 const ErrorCode acquire_ans = CreateData(topic_data);
791 if (acquire_ans != ErrorCode::OK)
796 *topic_data.GetData() = data;
797 return Publish(topic_data);
803 ErrorCode Publish(SharedData&& data) {
return PublishData(data); }
808 ErrorCode Publish(SharedData& data) {
return PublishData(data); }
813 uint64_t GetPublishFailedNum()
const
819 return header_->publish_failures.load(std::memory_order_acquire);
827 static ErrorCode Remove(
const char* topic_name)
829 return Remove(topic_name, DEFAULT_DOMAIN_NAME);
832 static ErrorCode Remove(
const char* topic_name,
const char* domain_name)
834 const std::string shm_name = BuildShmName(
835 BuildNameKey(ResolveDomainKey(domain_name), ResolveTopicName(topic_name)));
836 if (shm_unlink(shm_name.c_str()) == 0 || errno == ENOENT)
838 return ErrorCode::OK;
840 return ErrorCode::FAILED;
843 static ErrorCode Remove(
const char* topic_name, Topic::Domain& domain)
845 const uint32_t domain_crc32 = (domain.node_ !=
nullptr) ? domain.node_->key : 0;
846 const std::string shm_name =
847 BuildShmName(BuildNameKey(domain_crc32, ResolveTopicName(topic_name)));
848 if (shm_unlink(shm_name.c_str()) == 0 || errno == ENOENT)
850 return ErrorCode::OK;
852 return ErrorCode::FAILED;
856 struct alignas(64) SharedHeader
859 uint64_t name_key = 0;
860 uint32_t domain_crc32 = 0;
861 uint32_t version = 0;
862 uint32_t data_size = 0;
863 uint32_t slot_count = 0;
864 uint32_t subscriber_capacity = 0;
865 uint32_t queue_capacity = 0;
866 uint32_t topic_name_len = 0;
867 std::atomic<uint32_t> init_state;
868 std::atomic<uint32_t> publisher_pid;
869 std::atomic<uint64_t> publisher_starttime;
870 std::atomic<uint64_t> free_queue_head;
871 std::atomic<uint64_t> free_queue_tail;
872 std::atomic<uint64_t> next_sequence;
873 std::atomic<uint64_t> publish_failures;
876 struct alignas(64) SlotControl
878 std::atomic<uint32_t> refcount;
879 std::atomic<uint64_t> sequence;
882 struct alignas(16) FreeSlotCell
884 std::atomic<uint64_t> sequence;
885 uint32_t slot_index = 0;
886 uint32_t reserved = 0;
891 uint32_t slot_index = kInvalidIndex;
892 uint32_t reserved = 0;
893 uint64_t sequence = 0;
896 struct alignas(64) SubscriberControl
898 std::atomic<uint32_t> active;
899 std::atomic<uint32_t> mode;
900 std::atomic<uint32_t> queue_head;
901 std::atomic<uint32_t> queue_tail;
902 std::atomic<uint32_t> ready_sem_count;
903 std::atomic<uint64_t> dropped_messages;
904 std::atomic<uint32_t> owner_pid;
905 std::atomic<uint64_t> owner_starttime;
906 std::atomic<uint32_t> held_slot;
909 struct alignas(64) BalancedGroupControl
911 std::atomic<uint64_t> rr_cursor;
914 struct ProcessIdentity
917 uint64_t starttime = 0;
920 static constexpr uint64_t kMagic = 0x4c58524950435348ULL;
921 static constexpr uint32_t kVersion = 1;
922 static constexpr uint32_t kInitReady = 1;
923 static constexpr uint32_t kInvalidIndex = UINT32_MAX;
925 static uint32_t ResolveDomainKey(
const char* domain_name)
927 const std::string resolved =
928 (domain_name ==
nullptr || domain_name[0] ==
'\0') ? std::string(DEFAULT_DOMAIN_NAME)
929 : std::string(domain_name);
930 return CRC32::Calculate(resolved.data(), resolved.size());
933 static std::string ResolveTopicName(
const char* topic_name)
935 return (topic_name !=
nullptr) ? std::string(topic_name) : std::string();
938 static uint64_t BuildNameKey(uint32_t domain_crc32,
const std::string& topic_name)
940 const uint32_t topic_len =
static_cast<uint32_t
>(topic_name.size());
941 std::string key_material;
942 key_material.reserve(
sizeof(domain_crc32) +
sizeof(topic_len) + topic_len);
943 key_material.append(
reinterpret_cast<const char*
>(&domain_crc32),
sizeof(domain_crc32));
944 key_material.append(
reinterpret_cast<const char*
>(&topic_len),
sizeof(topic_len));
945 key_material.append(topic_name.data(), topic_name.size());
946 return CRC64::Calculate(key_material.data(), key_material.size());
949 static std::string BuildShmName(uint64_t name_key)
951 char buffer[64] = {};
952 std::snprintf(buffer,
sizeof(buffer),
"/libxr_ipc_%016" PRIx64, name_key);
953 return std::string(buffer);
956 static size_t AlignUp(
size_t value,
size_t alignment)
958 return (value + alignment - 1U) & ~(alignment - 1U);
961 static uint64_t NowMonotonicMs() {
return MonotonicTime::NowMilliseconds(); }
963 static bool ReadProcessIdentity(uint32_t pid, ProcessIdentity& identity)
972 std::snprintf(path,
sizeof(path),
"/proc/%u/stat", pid);
974 std::ifstream file(path);
981 std::getline(file, line);
987 const size_t rparen = line.rfind(
')');
988 if (rparen == std::string::npos || rparen + 2U >= line.size())
993 std::istringstream iss(line.substr(rparen + 2U));
995 for (
int field = 3; field <= 22; ++field)
1005 identity.starttime = std::strtoull(token.c_str(),
nullptr, 10);
1006 return identity.starttime != 0;
1013 static int FutexWait(std::atomic<uint32_t>* word, uint32_t expected, uint32_t timeout_ms)
1015 struct timespec timeout = {};
1016 struct timespec* timeout_ptr =
nullptr;
1017 if (timeout_ms != UINT32_MAX)
1019 timeout.tv_sec =
static_cast<time_t
>(timeout_ms / 1000U);
1020 timeout.tv_nsec =
static_cast<long>(timeout_ms % 1000U) * 1000000L;
1021 timeout_ptr = &timeout;
1024 return static_cast<int>(syscall(SYS_futex,
1025 reinterpret_cast<uint32_t*
>(word),
1033 static int FutexWake(std::atomic<uint32_t>* word)
1035 return static_cast<int>(
1036 syscall(SYS_futex,
reinterpret_cast<uint32_t*
>(word), FUTEX_WAKE, INT32_MAX,
nullptr,
1040 static size_t ComputeSharedBytes(uint32_t slot_count,
1041 uint32_t subscriber_capacity,
1042 uint32_t queue_capacity,
1043 uint32_t topic_name_len)
1046 offset = AlignUp(offset,
alignof(SharedHeader));
1047 offset +=
sizeof(SharedHeader);
1049 offset +=
static_cast<size_t>(topic_name_len) + 1U;
1051 offset = AlignUp(offset,
alignof(SlotControl));
1052 offset +=
sizeof(SlotControl) * slot_count;
1054 offset = AlignUp(offset,
alignof(SubscriberControl));
1055 offset +=
sizeof(SubscriberControl) * subscriber_capacity;
1057 offset = AlignUp(offset,
alignof(BalancedGroupControl));
1058 offset +=
sizeof(BalancedGroupControl);
1060 offset = AlignUp(offset,
alignof(std::atomic<uint32_t>));
1061 offset +=
sizeof(std::atomic<uint32_t>) * subscriber_capacity;
1063 offset = AlignUp(offset,
alignof(FreeSlotCell));
1064 offset +=
sizeof(FreeSlotCell) * slot_count;
1066 offset = AlignUp(offset,
alignof(Descriptor));
1067 offset +=
sizeof(Descriptor) * subscriber_capacity * queue_capacity;
1069 offset = AlignUp(offset,
alignof(TopicData));
1070 offset +=
sizeof(TopicData) * slot_count;
1074 void SetupPointers()
1078 offset = AlignUp(offset,
alignof(SharedHeader));
1079 header_ =
reinterpret_cast<SharedHeader*
>(base_ + offset);
1080 offset +=
sizeof(SharedHeader);
1082 topic_name_ptr_ =
reinterpret_cast<char*
>(base_ + offset);
1083 offset +=
static_cast<size_t>(header_->topic_name_len) + 1U;
1085 offset = AlignUp(offset,
alignof(SlotControl));
1086 slots_ =
reinterpret_cast<SlotControl*
>(base_ + offset);
1087 offset +=
sizeof(SlotControl) * slot_count_;
1089 offset = AlignUp(offset,
alignof(SubscriberControl));
1090 subscribers_ =
reinterpret_cast<SubscriberControl*
>(base_ + offset);
1091 offset +=
sizeof(SubscriberControl) * subscriber_capacity_;
1093 offset = AlignUp(offset,
alignof(BalancedGroupControl));
1094 balanced_group_ =
reinterpret_cast<BalancedGroupControl*
>(base_ + offset);
1095 offset +=
sizeof(BalancedGroupControl);
1097 offset = AlignUp(offset,
alignof(std::atomic<uint32_t>));
1098 balanced_members_ =
reinterpret_cast<std::atomic<uint32_t>*
>(base_ + offset);
1099 offset +=
sizeof(std::atomic<uint32_t>) * subscriber_capacity_;
1101 offset = AlignUp(offset,
alignof(FreeSlotCell));
1102 free_slots_ =
reinterpret_cast<FreeSlotCell*
>(base_ + offset);
1103 offset +=
sizeof(FreeSlotCell) * slot_count_;
1105 offset = AlignUp(offset,
alignof(Descriptor));
1106 descriptors_ =
reinterpret_cast<Descriptor*
>(base_ + offset);
1107 offset +=
sizeof(Descriptor) * subscriber_capacity_ * queue_capacity_;
1109 offset = AlignUp(offset,
alignof(TopicData));
1110 payloads_ =
reinterpret_cast<TopicData*
>(base_ + offset);
1113 bool HeaderMatchesIdentity()
const
1115 if (header_->name_key != name_key_)
1119 if (header_->domain_crc32 != domain_crc32_)
1123 if (header_->topic_name_len != topic_name_.size())
1127 if (std::memcmp(topic_name_ptr_, topic_name_.c_str(), topic_name_.size() + 1U) != 0)
1136 if (config_.slot_num == 0 || config_.subscriber_num == 0 || config_.queue_num < 2)
1138 return ErrorCode::ARG_ERR;
1141 const size_t bytes =
1142 ComputeSharedBytes(config_.slot_num, config_.subscriber_num, config_.queue_num,
1143 static_cast<uint32_t
>(topic_name_.size()));
1145 if (ftruncate(fd_,
static_cast<off_t
>(bytes)) != 0)
1147 return ErrorCode::INIT_ERR;
1150 const struct stat st = GetStat();
1151 if (st.st_size <= 0)
1153 return ErrorCode::INIT_ERR;
1156 mapping_size_ =
static_cast<size_t>(st.st_size);
1157 mapping_ = mmap(
nullptr, mapping_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
1158 if (mapping_ == MAP_FAILED)
1161 return ErrorCode::INIT_ERR;
1164 base_ =
static_cast<uint8_t*
>(mapping_);
1165 slot_count_ = config_.slot_num;
1166 subscriber_capacity_ = config_.subscriber_num;
1167 queue_capacity_ = config_.queue_num;
1168 header_ =
reinterpret_cast<SharedHeader*
>(base_ + AlignUp(0,
alignof(SharedHeader)));
1169 header_->topic_name_len =
static_cast<uint32_t
>(topic_name_.size());
1172 header_->magic = kMagic;
1173 header_->name_key = name_key_;
1174 header_->domain_crc32 = domain_crc32_;
1175 header_->version = kVersion;
1176 header_->data_size =
sizeof(TopicData);
1177 header_->slot_count = slot_count_;
1178 header_->subscriber_capacity = subscriber_capacity_;
1179 header_->queue_capacity = queue_capacity_;
1180 std::memcpy(topic_name_ptr_, topic_name_.c_str(), topic_name_.size() + 1U);
1181 header_->publisher_pid.store(self_identity_.pid, std::memory_order_release);
1182 header_->publisher_starttime.store(self_identity_.starttime, std::memory_order_release);
1183 header_->free_queue_head.store(0, std::memory_order_release);
1184 header_->free_queue_tail.store(slot_count_, std::memory_order_release);
1185 header_->next_sequence.store(0, std::memory_order_release);
1186 header_->publish_failures.store(0, std::memory_order_release);
1188 for (uint32_t i = 0; i < slot_count_; ++i)
1190 slots_[i].refcount.store(0, std::memory_order_release);
1191 slots_[i].sequence.store(0, std::memory_order_release);
1192 std::memset(&payloads_[i], 0,
sizeof(TopicData));
1193 free_slots_[i].slot_index = i;
1194 free_slots_[i].sequence.store(
static_cast<uint64_t
>(i) + 1U,
1195 std::memory_order_release);
1198 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1200 subscribers_[i].active.store(0, std::memory_order_release);
1201 subscribers_[i].mode.store(
static_cast<uint32_t
>(LinuxSharedSubscriberMode::BROADCAST_FULL),
1202 std::memory_order_release);
1203 subscribers_[i].queue_head.store(0, std::memory_order_release);
1204 subscribers_[i].queue_tail.store(0, std::memory_order_release);
1205 subscribers_[i].ready_sem_count.store(0, std::memory_order_release);
1206 subscribers_[i].dropped_messages.store(0, std::memory_order_release);
1207 subscribers_[i].owner_pid.store(0, std::memory_order_release);
1208 subscribers_[i].owner_starttime.store(0, std::memory_order_release);
1209 subscribers_[i].held_slot.store(kInvalidIndex, std::memory_order_release);
1210 balanced_members_[i].store(kInvalidIndex, std::memory_order_release);
1213 balanced_group_->rr_cursor.store(0, std::memory_order_release);
1215 for (
size_t i = 0; i < static_cast<size_t>(subscriber_capacity_) * queue_capacity_; ++i)
1217 descriptors_[i] = Descriptor{};
1220 header_->init_state.store(kInitReady, std::memory_order_release);
1221 return ErrorCode::OK;
1226 const struct stat st = GetStat();
1227 if (st.st_size <= 0)
1229 return ErrorCode::NOT_FOUND;
1232 mapping_size_ =
static_cast<size_t>(st.st_size);
1233 mapping_ = mmap(
nullptr, mapping_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
1234 if (mapping_ == MAP_FAILED)
1237 return ErrorCode::INIT_ERR;
1240 base_ =
static_cast<uint8_t*
>(mapping_);
1241 header_ =
reinterpret_cast<SharedHeader*
>(base_);
1243 while (header_->init_state.load(std::memory_order_acquire) != kInitReady)
1248 if (header_->magic != kMagic || header_->version != kVersion ||
1249 header_->data_size !=
sizeof(TopicData))
1251 return ErrorCode::CHECK_ERR;
1254 slot_count_ = header_->slot_count;
1255 subscriber_capacity_ = header_->subscriber_capacity;
1256 queue_capacity_ = header_->queue_capacity;
1258 if (!HeaderMatchesIdentity())
1260 return ErrorCode::CHECK_ERR;
1262 return ErrorCode::OK;
1265 bool TryReclaimStaleSegment()
1267 int stale_fd = shm_open(shm_name_.c_str(), O_RDWR, 0600);
1270 return errno == ENOENT;
1273 struct stat st = {};
1274 if (fstat(stale_fd, &st) != 0)
1280 bool reclaim =
false;
1281 if (st.st_size <
static_cast<off_t
>(
sizeof(SharedHeader)))
1287 void* mapping = mmap(
nullptr,
static_cast<size_t>(st.st_size), PROT_READ | PROT_WRITE,
1288 MAP_SHARED, stale_fd, 0);
1289 if (mapping != MAP_FAILED)
1291 uint8_t* base =
static_cast<uint8_t*
>(mapping);
1292 auto* header =
reinterpret_cast<SharedHeader*
>(mapping);
1293 const uint32_t init_state = header->init_state.load(std::memory_order_acquire);
1294 bool identity_match =
false;
1295 const size_t mapping_size =
static_cast<size_t>(st.st_size);
1296 const size_t topic_name_bytes = topic_name_.size() + 1U;
1297 if (header->magic == kMagic && header->version == kVersion &&
1298 header->domain_crc32 == domain_crc32_ &&
1299 header->topic_name_len == topic_name_.size())
1301 size_t offset = AlignUp(0,
alignof(SharedHeader));
1302 offset +=
sizeof(SharedHeader);
1303 if (offset <= mapping_size && topic_name_bytes <= (mapping_size - offset))
1305 const char* mapped_topic =
reinterpret_cast<const char*
>(base + offset);
1307 (header->name_key == name_key_) &&
1308 (std::memcmp(mapped_topic, topic_name_.c_str(), topic_name_bytes) == 0);
1311 const ProcessIdentity publisher_identity = {
1312 header->publisher_pid.load(std::memory_order_acquire),
1313 header->publisher_starttime.load(std::memory_order_acquire),
1316 if (!identity_match)
1320 else if (init_state != kInitReady)
1322 reclaim = !ProcessAlive(publisher_identity);
1324 else if (!ProcessAlive(publisher_identity))
1329 munmap(mapping,
static_cast<size_t>(st.st_size));
1340 return shm_unlink(shm_name_.c_str()) == 0 || errno == ENOENT;
1345 open_status_ = ErrorCode::STATE_ERR;
1350 for (
int attempt = 0; attempt < 2; ++attempt)
1352 fd_ = shm_open(shm_name_.c_str(), O_CREAT | O_EXCL | O_RDWR, 0600);
1358 if (errno != EEXIST || !TryReclaimStaleSegment())
1370 open_status_ = InitializeLayout();
1374 fd_ = shm_open(shm_name_.c_str(), O_RDWR, 0600);
1381 open_status_ = AttachLayout();
1390 open_ok_ = (open_status_ == ErrorCode::OK);
1395 if (mapping_ !=
nullptr)
1397 munmap(mapping_, mapping_size_);
1404 subscribers_ =
nullptr;
1405 free_slots_ =
nullptr;
1406 descriptors_ =
nullptr;
1407 payloads_ =
nullptr;
1410 open_status_ = ErrorCode::STATE_ERR;
1413 struct stat GetStat() const
1415 struct stat st = {};
1420 Descriptor* DescriptorRing(uint32_t subscriber_index)
const
1422 return descriptors_ +
static_cast<size_t>(subscriber_index) * queue_capacity_;
1425 static bool ProcessAlive(
const ProcessIdentity& identity)
1427 ProcessIdentity current = {};
1428 if (!ReadProcessIdentity(identity.pid, current))
1433 return current.starttime == identity.starttime;
1436 bool PublisherValid()
const
1438 if (!publisher_ || header_ ==
nullptr)
1443 const ProcessIdentity owner = {
1444 header_->publisher_pid.load(std::memory_order_acquire),
1445 header_->publisher_starttime.load(std::memory_order_acquire),
1447 return owner.pid == self_identity_.pid && owner.starttime == self_identity_.starttime;
1450 void HoldSlot(uint32_t subscriber_index, uint32_t slot_index)
1452 subscribers_[subscriber_index].held_slot.store(slot_index, std::memory_order_release);
1455 void ClearHeldSlot(uint32_t subscriber_index, uint32_t slot_index)
1457 uint32_t expected = slot_index;
1458 subscribers_[subscriber_index].held_slot.compare_exchange_strong(
1459 expected, kInvalidIndex, std::memory_order_acq_rel, std::memory_order_relaxed);
1462 ErrorCode RegisterBalancedSubscriber(uint32_t subscriber_index)
1464 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1466 uint32_t expected = kInvalidIndex;
1467 if (balanced_members_[i].compare_exchange_strong(expected, subscriber_index,
1468 std::memory_order_acq_rel,
1469 std::memory_order_relaxed))
1471 return ErrorCode::OK;
1474 return ErrorCode::FULL;
1477 void UnregisterBalancedSubscriber(uint32_t subscriber_index)
1479 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1481 uint32_t expected = subscriber_index;
1482 if (balanced_members_[i].compare_exchange_strong(expected, kInvalidIndex,
1483 std::memory_order_acq_rel,
1484 std::memory_order_relaxed))
1491 bool SelectBalancedSubscriber(uint32_t& subscriber_index)
1493 const uint64_t base = balanced_group_->rr_cursor.fetch_add(1, std::memory_order_acq_rel);
1494 for (uint32_t offset = 0; offset < subscriber_capacity_; ++offset)
1496 const uint32_t member_index =
1497 balanced_members_[(base + offset) % subscriber_capacity_].load(std::memory_order_acquire);
1498 if (member_index == kInvalidIndex)
1502 if (subscribers_[member_index].active.load(std::memory_order_acquire) == 0)
1506 if (subscribers_[member_index].mode.load(std::memory_order_acquire) !=
1507 static_cast<uint32_t
>(LinuxSharedSubscriberMode::BALANCE_RR))
1511 if (!QueueHasSpace(member_index))
1515 subscriber_index = member_index;
1521 static void PostReady(SubscriberControl& control)
1523 control.ready_sem_count.fetch_add(1, std::memory_order_release);
1524 FutexWake(&control.ready_sem_count);
1527 static void ConsumeReady(SubscriberControl& control)
1529 const uint32_t prev = control.ready_sem_count.fetch_sub(1, std::memory_order_acq_rel);
1533 static ErrorCode WaitReady(SubscriberControl& control, uint32_t timeout_ms)
1535 if (control.ready_sem_count.load(std::memory_order_acquire) != 0)
1537 return ErrorCode::OK;
1540 const bool infinite_wait = (timeout_ms == UINT32_MAX);
1541 const uint64_t deadline_ms = infinite_wait ? 0 : (NowMonotonicMs() + timeout_ms);
1545 if (control.ready_sem_count.load(std::memory_order_acquire) != 0)
1547 return ErrorCode::OK;
1550 uint32_t wait_ms = UINT32_MAX;
1553 wait_ms = MonotonicTime::RemainingMilliseconds(deadline_ms);
1556 return ErrorCode::TIMEOUT;
1560 wait_ms = MonotonicTime::WaitSliceMilliseconds(wait_ms);
1562 const int futex_ans = FutexWait(&control.ready_sem_count, 0, wait_ms);
1563 if (futex_ans == 0 || errno == EAGAIN || errno == EINTR)
1568 if (errno == ETIMEDOUT)
1574 if (MonotonicTime::RemainingMilliseconds(deadline_ms) == 0 &&
1575 control.ready_sem_count.load(std::memory_order_acquire) == 0)
1577 return ErrorCode::TIMEOUT;
1582 return ErrorCode::FAILED;
1586 void ScavengeDeadSubscribers()
1588 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1590 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1595 const ProcessIdentity owner_identity = {
1596 subscribers_[i].owner_pid.load(std::memory_order_acquire),
1597 subscribers_[i].owner_starttime.load(std::memory_order_acquire),
1599 if (ProcessAlive(owner_identity))
1604 uint32_t expected = 1;
1605 if (!subscribers_[i].active.compare_exchange_strong(expected, 0, std::memory_order_acq_rel,
1606 std::memory_order_relaxed))
1611 subscribers_[i].owner_pid.store(0, std::memory_order_release);
1612 subscribers_[i].owner_starttime.store(0, std::memory_order_release);
1614 const uint32_t held_slot =
1615 subscribers_[i].held_slot.exchange(kInvalidIndex, std::memory_order_acq_rel);
1616 if (held_slot != kInvalidIndex)
1618 ReleaseSlot(held_slot);
1621 Descriptor desc = {};
1622 while (TryPopDescriptor(i, desc) == ErrorCode::OK)
1624 ReleaseSlot(desc.slot_index);
1629 bool QueueHasSpace(uint32_t subscriber_index)
const
1631 const SubscriberControl& control = subscribers_[subscriber_index];
1632 const uint32_t head = control.queue_head.load(std::memory_order_acquire);
1633 const uint32_t tail = control.queue_tail.load(std::memory_order_relaxed);
1634 const uint32_t next_tail = (tail + 1U) % queue_capacity_;
1635 return next_tail != head;
1638 void PushDescriptor(uint32_t subscriber_index,
const Descriptor& descriptor)
1640 SubscriberControl& control = subscribers_[subscriber_index];
1641 Descriptor* ring = DescriptorRing(subscriber_index);
1643 const uint32_t tail = control.queue_tail.load(std::memory_order_relaxed);
1644 ring[tail] = descriptor;
1645 const uint32_t next_tail = (tail + 1U) % queue_capacity_;
1646 control.queue_tail.store(next_tail, std::memory_order_release);
1650 ErrorCode TryPopDescriptor(uint32_t subscriber_index, Descriptor& descriptor)
1652 SubscriberControl& control = subscribers_[subscriber_index];
1653 Descriptor* ring = DescriptorRing(subscriber_index);
1657 uint32_t head = control.queue_head.load(std::memory_order_relaxed);
1658 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
1661 return ErrorCode::EMPTY;
1664 descriptor = ring[head];
1665 const uint32_t next_head = (head + 1U) % queue_capacity_;
1666 if (control.queue_head.compare_exchange_weak(head, next_head, std::memory_order_acq_rel,
1667 std::memory_order_relaxed))
1669 ConsumeReady(control);
1670 return ErrorCode::OK;
1675 ErrorCode DropDescriptor(uint32_t subscriber_index)
1677 SubscriberControl& control = subscribers_[subscriber_index];
1678 Descriptor* ring = DescriptorRing(subscriber_index);
1682 uint32_t head = control.queue_head.load(std::memory_order_relaxed);
1683 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
1686 return ErrorCode::EMPTY;
1689 const Descriptor descriptor = ring[head];
1690 const uint32_t next_head = (head + 1U) % queue_capacity_;
1691 if (control.queue_head.compare_exchange_weak(head, next_head, std::memory_order_acq_rel,
1692 std::memory_order_relaxed))
1694 control.dropped_messages.fetch_add(1, std::memory_order_relaxed);
1695 ConsumeReady(control);
1696 ReleaseSlot(descriptor.slot_index);
1697 return ErrorCode::OK;
1702 ErrorCode PopFreeSlot(uint32_t& slot_index)
1706 uint64_t head = header_->free_queue_head.load(std::memory_order_relaxed);
1707 FreeSlotCell& cell = free_slots_[head % slot_count_];
1708 const uint64_t seq = cell.sequence.load(std::memory_order_acquire);
1709 const intptr_t diff =
static_cast<intptr_t
>(seq) -
static_cast<intptr_t
>(head + 1U);
1713 if (header_->free_queue_head.compare_exchange_weak(head, head + 1U,
1714 std::memory_order_acq_rel,
1715 std::memory_order_relaxed))
1717 slot_index = cell.slot_index;
1718 cell.sequence.store(head + slot_count_, std::memory_order_release);
1719 return ErrorCode::OK;
1724 return ErrorCode::FULL;
1729 void RecycleSlot(uint32_t slot_index)
1731 slots_[slot_index].sequence.store(0, std::memory_order_release);
1735 uint64_t tail = header_->free_queue_tail.load(std::memory_order_relaxed);
1736 FreeSlotCell& cell = free_slots_[tail % slot_count_];
1737 const uint64_t seq = cell.sequence.load(std::memory_order_acquire);
1738 const intptr_t diff =
static_cast<intptr_t
>(seq) -
static_cast<intptr_t
>(tail);
1742 if (header_->free_queue_tail.compare_exchange_weak(tail, tail + 1U,
1743 std::memory_order_acq_rel,
1744 std::memory_order_relaxed))
1746 cell.slot_index = slot_index;
1747 cell.sequence.store(tail + 1U, std::memory_order_release);
1754 void ReleaseSlot(uint32_t slot_index)
1756 const uint32_t prev = slots_[slot_index].refcount.fetch_sub(1, std::memory_order_acq_rel);
1760 RecycleSlot(slot_index);
1766 if (!data.Valid() || data.topic_ !=
this)
1768 return ErrorCode::STATE_ERR;
1771 uint32_t active_count = 0;
1772 uint32_t balanced_target = kInvalidIndex;
1773 bool has_balanced_subscriber =
false;
1774 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1776 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1781 const LinuxSharedSubscriberMode mode =
static_cast<LinuxSharedSubscriberMode
>(
1782 subscribers_[i].mode.load(std::memory_order_acquire));
1783 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
1785 has_balanced_subscriber =
true;
1789 if (!QueueHasSpace(i))
1791 ScavengeDeadSubscribers();
1792 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1797 if (mode == LinuxSharedSubscriberMode::BROADCAST_DROP_OLD)
1799 if (DropDescriptor(i) != ErrorCode::OK)
1801 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1803 return ErrorCode::FULL;
1808 subscribers_[i].dropped_messages.fetch_add(1, std::memory_order_relaxed);
1809 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1811 return ErrorCode::FULL;
1818 if (has_balanced_subscriber)
1820 if (!SelectBalancedSubscriber(balanced_target))
1822 ScavengeDeadSubscribers();
1823 if (!SelectBalancedSubscriber(balanced_target))
1825 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1827 return ErrorCode::FULL;
1833 if (active_count == 0)
1836 return ErrorCode::OK;
1839 const uint64_t sequence =
1840 header_->next_sequence.fetch_add(1, std::memory_order_acq_rel) + 1ULL;
1841 SlotControl& slot = slots_[data.slot_index_];
1842 slot.refcount.store(active_count, std::memory_order_release);
1843 slot.sequence.store(sequence, std::memory_order_release);
1845 const Descriptor descriptor = {data.slot_index_, 0U, sequence};
1846 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1848 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1852 const LinuxSharedSubscriberMode mode =
static_cast<LinuxSharedSubscriberMode
>(
1853 subscribers_[i].mode.load(std::memory_order_acquire));
1854 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
1858 PushDescriptor(i, descriptor);
1861 if (balanced_target != kInvalidIndex)
1863 PushDescriptor(balanced_target, descriptor);
1866 data.topic_ =
nullptr;
1867 data.slot_index_ = kInvalidIndex;
1868 return ErrorCode::OK;
1871 bool create_ =
false;
1872 bool publisher_ =
false;
1873 LinuxSharedTopicConfig config_;
1874 uint32_t domain_crc32_ = 0;
1875 std::string topic_name_;
1876 uint64_t name_key_ = 0;
1877 std::string shm_name_;
1878 ProcessIdentity self_identity_ = {};
1881 void* mapping_ =
nullptr;
1882 uint8_t* base_ =
nullptr;
1883 size_t mapping_size_ = 0;
1885 SharedHeader* header_ =
nullptr;
1886 char* topic_name_ptr_ =
nullptr;
1887 SlotControl* slots_ =
nullptr;
1888 SubscriberControl* subscribers_ =
nullptr;
1889 BalancedGroupControl* balanced_group_ =
nullptr;
1890 std::atomic<uint32_t>* balanced_members_ =
nullptr;
1891 FreeSlotCell* free_slots_ =
nullptr;
1892 Descriptor* descriptors_ =
nullptr;
1893 TopicData* payloads_ =
nullptr;
1895 uint32_t slot_count_ = 0;
1896 uint32_t subscriber_capacity_ = 0;
1897 uint32_t queue_capacity_ = 0;
1899 bool open_ok_ =
false;
1900 ErrorCode open_status_ = ErrorCode::STATE_ERR;
@ INIT_ERR
初始化错误 | Initialization error