libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
linux_shared_topic_impl.hpp
1#pragma once
2
3#if defined(LIBXR_SYSTEM_POSIX_HOST)
4
5#include <atomic>
6#include <cerrno>
7#include <chrono>
8#include <cinttypes>
9#include <cstddef>
10#include <cstdint>
11#include <cstdio>
12#include <cstring>
13#include <fstream>
14#include <sstream>
15#include <string>
16#include <type_traits>
17
18#include <fcntl.h>
19#include <linux/futex.h>
20#include <sys/mman.h>
21#include <sys/stat.h>
22#include <sys/syscall.h>
23#include <time.h>
24#include <unistd.h>
25#include <signal.h>
26
27#include "crc.hpp"
28#include "libxr_def.hpp"
29#include "message.hpp"
30#include "monotonic_time.hpp"
31
32namespace LibXR
33{
38enum class LinuxSharedSubscriberMode : uint8_t
39{
40 BROADCAST_FULL = 0,
41 BROADCAST_DROP_OLD = 1,
42 BALANCE_RR = 2,
43};
44
49struct LinuxSharedTopicConfig
50{
51 uint32_t slot_num = 64;
52 uint32_t subscriber_num = 8;
53 uint32_t queue_num = 64;
54};
55
71template <typename TopicData>
72class LinuxSharedTopic : public Topic
73{
74 static_assert(std::is_trivially_copyable<TopicData>::value,
75 "LinuxSharedTopic requires trivially copyable data");
76 static_assert(std::atomic<uint32_t>::is_always_lock_free,
77 "LinuxSharedTopic requires lock-free 32-bit atomics");
78 static_assert(std::atomic<uint64_t>::is_always_lock_free,
79 "LinuxSharedTopic requires lock-free 64-bit atomics");
80
81 enum class SharedDataState : uint8_t
82 {
83 EMPTY = 0,
84 PUBLISHER = 1,
85 SUBSCRIBER = 2,
86 };
87
88 public:
89 class SharedData;
90 using Data = SharedData;
91 static constexpr const char* DEFAULT_DOMAIN_NAME = "libxr_def_domain";
92
98 class Subscriber
99 {
100 public:
101 using Data = SharedData;
102
106 Subscriber() = default;
107
116 explicit Subscriber(const char* name,
117 LinuxSharedSubscriberMode mode =
118 LinuxSharedSubscriberMode::BROADCAST_FULL)
119 : owned_topic_(new LinuxSharedTopic(name))
120 {
121 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
122 {
123 delete owned_topic_;
124 owned_topic_ = nullptr;
125 }
126 }
127
128 Subscriber(const char* name, const char* domain_name,
129 LinuxSharedSubscriberMode mode = LinuxSharedSubscriberMode::BROADCAST_FULL)
130 : owned_topic_(new LinuxSharedTopic(name, domain_name))
131 {
132 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
133 {
134 delete owned_topic_;
135 owned_topic_ = nullptr;
136 }
137 }
138
139 Subscriber(const char* name, Topic::Domain& domain,
140 LinuxSharedSubscriberMode mode = LinuxSharedSubscriberMode::BROADCAST_FULL)
141 : owned_topic_(new LinuxSharedTopic(name, domain))
142 {
143 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
144 {
145 delete owned_topic_;
146 owned_topic_ = nullptr;
147 }
148 }
149
156 explicit Subscriber(LinuxSharedTopic& topic,
157 LinuxSharedSubscriberMode mode =
158 LinuxSharedSubscriberMode::BROADCAST_FULL)
159 {
160 (void)Attach(topic, mode);
161 }
162
166 ~Subscriber() { Reset(); }
167
168 Subscriber(const Subscriber&) = delete;
169 Subscriber& operator=(const Subscriber&) = delete;
170
171 Subscriber(Subscriber&& other) noexcept { *this = std::move(other); }
172
173 Subscriber& operator=(Subscriber&& other) noexcept
174 {
175 if (this == &other)
176 {
177 return *this;
178 }
179
180 Reset();
181
182 topic_ = other.topic_;
183 owned_topic_ = other.owned_topic_;
184 subscriber_index_ = other.subscriber_index_;
185 current_slot_index_ = other.current_slot_index_;
186 current_sequence_ = other.current_sequence_;
187
188 other.topic_ = nullptr;
189 other.owned_topic_ = nullptr;
190 other.subscriber_index_ = kInvalidIndex;
191 other.current_slot_index_ = kInvalidIndex;
192 other.current_sequence_ = 0;
193 return *this;
194 }
195
201 bool Valid() const
202 {
203 return topic_ != nullptr && subscriber_index_ != kInvalidIndex;
204 }
205
213 ErrorCode Wait(uint32_t timeout_ms = UINT32_MAX)
214 {
215 if (!Valid())
216 {
217 return ErrorCode::STATE_ERR;
218 }
219
220 const uint64_t deadline_ms =
221 (timeout_ms == UINT32_MAX) ? 0 : (NowMonotonicMs() + timeout_ms);
222
223 Descriptor desc = {};
224 while (true)
225 {
226 ErrorCode pop_ans = topic_->TryPopDescriptor(subscriber_index_, desc);
227 if (pop_ans == ErrorCode::OK)
228 {
229 Release();
230 topic_->HoldSlot(subscriber_index_, desc.slot_index);
231 current_slot_index_ = desc.slot_index;
232 current_sequence_ = desc.sequence;
233 return ErrorCode::OK;
234 }
235
236 uint32_t wait_ms = UINT32_MAX;
237 if (timeout_ms != UINT32_MAX)
238 {
239 const uint64_t now_ms = NowMonotonicMs();
240 if (now_ms >= deadline_ms)
241 {
242 return ErrorCode::TIMEOUT;
243 }
244 wait_ms = static_cast<uint32_t>(deadline_ms - now_ms);
245 }
246
247 const ErrorCode wait_ans = topic_->WaitReady(topic_->subscribers_[subscriber_index_],
248 wait_ms);
249 if (wait_ans == ErrorCode::OK)
250 {
251 continue;
252 }
253 return wait_ans;
254 }
255 }
256
265 ErrorCode Wait(SharedData& data, uint32_t timeout_ms = UINT32_MAX)
266 {
267 data.Reset();
268
269 if (!Valid())
270 {
271 return ErrorCode::STATE_ERR;
272 }
273
274 const uint64_t deadline_ms =
275 (timeout_ms == UINT32_MAX) ? 0 : (NowMonotonicMs() + timeout_ms);
276
277 Descriptor desc = {};
278 while (true)
279 {
280 ErrorCode pop_ans = topic_->TryPopDescriptor(subscriber_index_, desc);
281 if (pop_ans == ErrorCode::OK)
282 {
283 data.topic_ = topic_;
284 data.slot_index_ = desc.slot_index;
285 data.sequence_ = desc.sequence;
286 data.state_ = SharedDataState::SUBSCRIBER;
287 data.subscriber_index_ = subscriber_index_;
288 topic_->HoldSlot(subscriber_index_, desc.slot_index);
289 return ErrorCode::OK;
290 }
291
292 uint32_t wait_ms = UINT32_MAX;
293 if (timeout_ms != UINT32_MAX)
294 {
295 const uint64_t now_ms = NowMonotonicMs();
296 if (now_ms >= deadline_ms)
297 {
298 return ErrorCode::TIMEOUT;
299 }
300 wait_ms = static_cast<uint32_t>(deadline_ms - now_ms);
301 }
302
303 const ErrorCode wait_ans = topic_->WaitReady(topic_->subscribers_[subscriber_index_],
304 wait_ms);
305 if (wait_ans == ErrorCode::OK)
306 {
307 continue;
308 }
309 return wait_ans;
310 }
311 }
312
318 const TopicData* GetData() const
319 {
320 if (!Valid() || current_slot_index_ == kInvalidIndex)
321 {
322 return nullptr;
323 }
324
325 return &topic_->payloads_[current_slot_index_];
326 }
327
331 uint64_t GetSequence() const { return current_sequence_; }
332
336 uint32_t GetPendingNum() const
337 {
338 if (!Valid())
339 {
340 return 0;
341 }
342
343 const SubscriberControl& control = topic_->subscribers_[subscriber_index_];
344 const uint32_t head = control.queue_head.load(std::memory_order_acquire);
345 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
346 if (tail >= head)
347 {
348 return tail - head;
349 }
350 return topic_->queue_capacity_ - (head - tail);
351 }
352
356 uint64_t GetDropNum() const
357 {
358 if (!Valid())
359 {
360 return 0;
361 }
362
363 return topic_->subscribers_[subscriber_index_].dropped_messages.load(
364 std::memory_order_acquire);
365 }
366
370 void Release()
371 {
372 if (!Valid() || current_slot_index_ == kInvalidIndex)
373 {
374 return;
375 }
376
377 topic_->ClearHeldSlot(subscriber_index_, current_slot_index_);
378 topic_->ReleaseSlot(current_slot_index_);
379 current_slot_index_ = kInvalidIndex;
380 current_sequence_ = 0;
381 }
382
387 void Reset()
388 {
389 if (!Valid())
390 {
391 return;
392 }
393
394 topic_->UnregisterBalancedSubscriber(subscriber_index_);
395 topic_->subscribers_[subscriber_index_].active.store(0, std::memory_order_release);
396 topic_->subscribers_[subscriber_index_].owner_pid.store(0, std::memory_order_release);
397 topic_->subscribers_[subscriber_index_].owner_starttime.store(0,
398 std::memory_order_release);
399
400 Descriptor desc = {};
401 while (topic_->TryPopDescriptor(subscriber_index_, desc) == ErrorCode::OK)
402 {
403 topic_->ReleaseSlot(desc.slot_index);
404 }
405
406 Release();
407
408 topic_ = nullptr;
409 delete owned_topic_;
410 owned_topic_ = nullptr;
411 subscriber_index_ = kInvalidIndex;
412 current_slot_index_ = kInvalidIndex;
413 current_sequence_ = 0;
414 }
415
416 private:
417 ErrorCode Attach(LinuxSharedTopic& topic, LinuxSharedSubscriberMode mode)
418 {
419 Reset();
420
421 if (!topic.Valid())
422 {
423 return ErrorCode::STATE_ERR;
424 }
425
426 if (topic.self_identity_.starttime == 0)
427 {
428 return ErrorCode::STATE_ERR;
429 }
430
431 for (uint32_t i = 0; i < topic.subscriber_capacity_; ++i)
432 {
433 uint32_t expected = 0;
434 auto& active = topic.subscribers_[i].active;
435 if (active.compare_exchange_strong(expected, 1, std::memory_order_acq_rel,
436 std::memory_order_relaxed))
437 {
438 topic.subscribers_[i].queue_head.store(0, std::memory_order_release);
439 topic.subscribers_[i].queue_tail.store(0, std::memory_order_release);
440 topic.subscribers_[i].ready_sem_count.store(0, std::memory_order_release);
441 topic.subscribers_[i].dropped_messages.store(0, std::memory_order_release);
442 topic.subscribers_[i].owner_pid.store(topic.self_identity_.pid,
443 std::memory_order_release);
444 topic.subscribers_[i].owner_starttime.store(topic.self_identity_.starttime,
445 std::memory_order_release);
446 topic.subscribers_[i].held_slot.store(kInvalidIndex, std::memory_order_release);
447 topic.subscribers_[i].mode.store(static_cast<uint32_t>(mode),
448 std::memory_order_release);
449 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
450 {
451 const ErrorCode join_ans = topic.RegisterBalancedSubscriber(i);
452 if (join_ans != ErrorCode::OK)
453 {
454 topic.subscribers_[i].active.store(0, std::memory_order_release);
455 topic.subscribers_[i].owner_pid.store(0, std::memory_order_release);
456 topic.subscribers_[i].owner_starttime.store(0,
457 std::memory_order_release);
458 topic.subscribers_[i].mode.store(
459 static_cast<uint32_t>(LinuxSharedSubscriberMode::BROADCAST_FULL),
460 std::memory_order_release);
461 return join_ans;
462 }
463 }
464 topic_ = &topic;
465 subscriber_index_ = i;
466 current_slot_index_ = kInvalidIndex;
467 current_sequence_ = 0;
468 return ErrorCode::OK;
469 }
470 }
471
472 return ErrorCode::FULL;
473 }
474
475 LinuxSharedTopic* topic_ = nullptr;
476 LinuxSharedTopic* owned_topic_ = nullptr;
477 uint32_t subscriber_index_ = kInvalidIndex;
478 uint32_t current_slot_index_ = kInvalidIndex;
479 uint64_t current_sequence_ = 0;
480 };
481
491 class SharedData
492 {
493 public:
497 SharedData() = default;
498
503 ~SharedData() { Reset(); }
504
505 SharedData(const SharedData&) = delete;
506 SharedData& operator=(const SharedData&) = delete;
507
508 SharedData(SharedData&& other) noexcept { *this = std::move(other); }
509
513 SharedData& operator=(SharedData&& other) noexcept
514 {
515 if (this == &other)
516 {
517 return *this;
518 }
519
520 Reset();
521
522 topic_ = other.topic_;
523 slot_index_ = other.slot_index_;
524 sequence_ = other.sequence_;
525 state_ = other.state_;
526 subscriber_index_ = other.subscriber_index_;
527
528 other.topic_ = nullptr;
529 other.slot_index_ = kInvalidIndex;
530 other.sequence_ = 0;
531 other.state_ = SharedDataState::EMPTY;
532 other.subscriber_index_ = kInvalidIndex;
533 return *this;
534 }
535
539 bool Valid() const { return topic_ != nullptr && slot_index_ != kInvalidIndex; }
540
544 bool Empty() const { return !Valid(); }
545
549 uint64_t GetSequence() const { return sequence_; }
550
556 TopicData* GetData()
557 {
558 if (!Valid())
559 {
560 return nullptr;
561 }
562 return &topic_->payloads_[slot_index_];
563 }
564
570 const TopicData* GetData() const
571 {
572 if (!Valid())
573 {
574 return nullptr;
575 }
576 return &topic_->payloads_[slot_index_];
577 }
578
582 void Reset()
583 {
584 if (!Valid())
585 {
586 return;
587 }
588
589 if (state_ == SharedDataState::PUBLISHER)
590 {
591 topic_->RecycleSlot(slot_index_);
592 }
593 else if (state_ == SharedDataState::SUBSCRIBER)
594 {
595 topic_->ClearHeldSlot(subscriber_index_, slot_index_);
596 topic_->ReleaseSlot(slot_index_);
597 }
598 topic_ = nullptr;
599 slot_index_ = kInvalidIndex;
600 sequence_ = 0;
601 state_ = SharedDataState::EMPTY;
602 subscriber_index_ = kInvalidIndex;
603 }
604
605 private:
606 friend class LinuxSharedTopic<TopicData>;
607 friend class Subscriber;
608
609 LinuxSharedTopic* topic_ = nullptr;
610 uint32_t slot_index_ = kInvalidIndex;
611 uint64_t sequence_ = 0;
612 SharedDataState state_ = SharedDataState::EMPTY;
613 uint32_t subscriber_index_ = kInvalidIndex;
614 };
615
620 explicit LinuxSharedTopic(const char* topic_name)
621 : LinuxSharedTopic(topic_name, DEFAULT_DOMAIN_NAME)
622 {
623 }
624
625 LinuxSharedTopic(const char* topic_name, const char* domain_name)
626 : create_(false),
627 publisher_(false),
628 config_(),
629 domain_crc32_(ResolveDomainKey(domain_name)),
630 topic_name_(ResolveTopicName(topic_name)),
631 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
632 shm_name_(BuildShmName(name_key_))
633 {
634 (void)ReadProcessIdentity(static_cast<uint32_t>(getpid()), self_identity_);
635 Open();
636 }
637
638 LinuxSharedTopic(const char* topic_name, Topic::Domain& domain)
639 : create_(false),
640 publisher_(false),
641 config_(),
642 domain_crc32_(domain.node_ != nullptr ? domain.node_->key : 0),
643 topic_name_(ResolveTopicName(topic_name)),
644 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
645 shm_name_(BuildShmName(name_key_))
646 {
647 (void)ReadProcessIdentity(static_cast<uint32_t>(getpid()), self_identity_);
648 Open();
649 }
650
657 LinuxSharedTopic(const char* topic_name, const LinuxSharedTopicConfig& config)
658 : LinuxSharedTopic(topic_name, DEFAULT_DOMAIN_NAME, config)
659 {
660 }
661
662 LinuxSharedTopic(const char* topic_name, const char* domain_name,
663 const LinuxSharedTopicConfig& config)
664 : create_(true),
665 publisher_(true),
666 config_(config),
667 domain_crc32_(ResolveDomainKey(domain_name)),
668 topic_name_(ResolveTopicName(topic_name)),
669 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
670 shm_name_(BuildShmName(name_key_))
671 {
672 (void)ReadProcessIdentity(static_cast<uint32_t>(getpid()), self_identity_);
673 Open();
674 }
675
676 LinuxSharedTopic(const char* topic_name, Topic::Domain& domain,
677 const LinuxSharedTopicConfig& config)
678 : create_(true),
679 publisher_(true),
680 config_(config),
681 domain_crc32_(domain.node_ != nullptr ? domain.node_->key : 0),
682 topic_name_(ResolveTopicName(topic_name)),
683 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
684 shm_name_(BuildShmName(name_key_))
685 {
686 (void)ReadProcessIdentity(static_cast<uint32_t>(getpid()), self_identity_);
687 Open();
688 }
689
694 using SyncSubscriber = Subscriber;
695
699 ~LinuxSharedTopic() { Close(); }
700
701 LinuxSharedTopic(const LinuxSharedTopic&) = delete;
702 LinuxSharedTopic& operator=(const LinuxSharedTopic&) = delete;
703
704 LinuxSharedTopic(LinuxSharedTopic&&) = delete;
705 LinuxSharedTopic& operator=(LinuxSharedTopic&&) = delete;
706
711 bool Valid() const { return open_ok_; }
712
716 ErrorCode GetError() const { return open_status_; }
717
721 uint32_t GetSubscriberNum() const
722 {
723 if (!Valid())
724 {
725 return 0;
726 }
727
728 uint32_t count = 0;
729 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
730 {
731 if (subscribers_[i].active.load(std::memory_order_acquire) != 0)
732 {
733 ++count;
734 }
735 }
736 return count;
737 }
738
745 ErrorCode CreateData(SharedData& data)
746 {
747 if (!Valid())
748 {
749 return ErrorCode::STATE_ERR;
750 }
751
752 if (!PublisherValid())
753 {
754 return ErrorCode::STATE_ERR;
755 }
756
757 data.Reset();
758
759 uint32_t slot_index = kInvalidIndex;
760 ErrorCode pop_ans = PopFreeSlot(slot_index);
761 if (pop_ans != ErrorCode::OK)
762 {
763 ScavengeDeadSubscribers();
764 pop_ans = PopFreeSlot(slot_index);
765 if (pop_ans != ErrorCode::OK)
766 {
767 return pop_ans;
768 }
769 }
770
771 slots_[slot_index].refcount.store(0, std::memory_order_release);
772 slots_[slot_index].sequence.store(0, std::memory_order_release);
773
774 data.topic_ = this;
775 data.slot_index_ = slot_index;
776 data.sequence_ = 0;
777 data.state_ = SharedDataState::PUBLISHER;
778 data.subscriber_index_ = kInvalidIndex;
779 return ErrorCode::OK;
780 }
781
787 ErrorCode Publish(const TopicData& data)
788 {
789 SharedData topic_data;
790 const ErrorCode acquire_ans = CreateData(topic_data);
791 if (acquire_ans != ErrorCode::OK)
792 {
793 return acquire_ans;
794 }
795
796 *topic_data.GetData() = data;
797 return Publish(topic_data);
798 }
799
803 ErrorCode Publish(SharedData&& data) { return PublishData(data); }
804
808 ErrorCode Publish(SharedData& data) { return PublishData(data); }
809
813 uint64_t GetPublishFailedNum() const
814 {
815 if (!Valid())
816 {
817 return 0;
818 }
819 return header_->publish_failures.load(std::memory_order_acquire);
820 }
821
827 static ErrorCode Remove(const char* topic_name)
828 {
829 return Remove(topic_name, DEFAULT_DOMAIN_NAME);
830 }
831
832 static ErrorCode Remove(const char* topic_name, const char* domain_name)
833 {
834 const std::string shm_name = BuildShmName(
835 BuildNameKey(ResolveDomainKey(domain_name), ResolveTopicName(topic_name)));
836 if (shm_unlink(shm_name.c_str()) == 0 || errno == ENOENT)
837 {
838 return ErrorCode::OK;
839 }
840 return ErrorCode::FAILED;
841 }
842
843 static ErrorCode Remove(const char* topic_name, Topic::Domain& domain)
844 {
845 const uint32_t domain_crc32 = (domain.node_ != nullptr) ? domain.node_->key : 0;
846 const std::string shm_name =
847 BuildShmName(BuildNameKey(domain_crc32, ResolveTopicName(topic_name)));
848 if (shm_unlink(shm_name.c_str()) == 0 || errno == ENOENT)
849 {
850 return ErrorCode::OK;
851 }
852 return ErrorCode::FAILED;
853 }
854
855 private:
856 struct alignas(64) SharedHeader
857 {
858 uint64_t magic = 0;
859 uint64_t name_key = 0;
860 uint32_t domain_crc32 = 0;
861 uint32_t version = 0;
862 uint32_t data_size = 0;
863 uint32_t slot_count = 0;
864 uint32_t subscriber_capacity = 0;
865 uint32_t queue_capacity = 0;
866 uint32_t topic_name_len = 0;
867 std::atomic<uint32_t> init_state;
868 std::atomic<uint32_t> publisher_pid;
869 std::atomic<uint64_t> publisher_starttime;
870 std::atomic<uint64_t> free_queue_head;
871 std::atomic<uint64_t> free_queue_tail;
872 std::atomic<uint64_t> next_sequence;
873 std::atomic<uint64_t> publish_failures;
874 };
875
876 struct alignas(64) SlotControl
877 {
878 std::atomic<uint32_t> refcount;
879 std::atomic<uint64_t> sequence;
880 };
881
882 struct alignas(16) FreeSlotCell
883 {
884 std::atomic<uint64_t> sequence;
885 uint32_t slot_index = 0;
886 uint32_t reserved = 0;
887 };
888
889 struct Descriptor
890 {
891 uint32_t slot_index = kInvalidIndex;
892 uint32_t reserved = 0;
893 uint64_t sequence = 0;
894 };
895
896 struct alignas(64) SubscriberControl
897 {
898 std::atomic<uint32_t> active;
899 std::atomic<uint32_t> mode;
900 std::atomic<uint32_t> queue_head;
901 std::atomic<uint32_t> queue_tail;
902 std::atomic<uint32_t> ready_sem_count;
903 std::atomic<uint64_t> dropped_messages;
904 std::atomic<uint32_t> owner_pid;
905 std::atomic<uint64_t> owner_starttime;
906 std::atomic<uint32_t> held_slot;
907 };
908
909 struct alignas(64) BalancedGroupControl
910 {
911 std::atomic<uint64_t> rr_cursor;
912 };
913
914 struct ProcessIdentity
915 {
916 uint32_t pid = 0;
917 uint64_t starttime = 0;
918 };
919
920 static constexpr uint64_t kMagic = 0x4c58524950435348ULL;
921 static constexpr uint32_t kVersion = 1;
922 static constexpr uint32_t kInitReady = 1;
923 static constexpr uint32_t kInvalidIndex = UINT32_MAX;
924
925 static uint32_t ResolveDomainKey(const char* domain_name)
926 {
927 const std::string resolved =
928 (domain_name == nullptr || domain_name[0] == '\0') ? std::string(DEFAULT_DOMAIN_NAME)
929 : std::string(domain_name);
930 return CRC32::Calculate(resolved.data(), resolved.size());
931 }
932
933 static std::string ResolveTopicName(const char* topic_name)
934 {
935 return (topic_name != nullptr) ? std::string(topic_name) : std::string();
936 }
937
938 static uint64_t BuildNameKey(uint32_t domain_crc32, const std::string& topic_name)
939 {
940 const uint32_t topic_len = static_cast<uint32_t>(topic_name.size());
941 std::string key_material;
942 key_material.reserve(sizeof(domain_crc32) + sizeof(topic_len) + topic_len);
943 key_material.append(reinterpret_cast<const char*>(&domain_crc32), sizeof(domain_crc32));
944 key_material.append(reinterpret_cast<const char*>(&topic_len), sizeof(topic_len));
945 key_material.append(topic_name.data(), topic_name.size());
946 return CRC64::Calculate(key_material.data(), key_material.size());
947 }
948
949 static std::string BuildShmName(uint64_t name_key)
950 {
951 char buffer[64] = {};
952 std::snprintf(buffer, sizeof(buffer), "/libxr_ipc_%016" PRIx64, name_key);
953 return std::string(buffer);
954 }
955
956 static size_t AlignUp(size_t value, size_t alignment)
957 {
958 return (value + alignment - 1U) & ~(alignment - 1U);
959 }
960
961 static uint64_t NowMonotonicMs() { return MonotonicTime::NowMilliseconds(); }
962
963 static bool ReadProcessIdentity(uint32_t pid, ProcessIdentity& identity)
964 {
965 identity = {};
966 if (pid == 0)
967 {
968 return false;
969 }
970
971 char path[64] = {};
972 std::snprintf(path, sizeof(path), "/proc/%u/stat", pid);
973
974 std::ifstream file(path);
975 if (!file.is_open())
976 {
977 return false;
978 }
979
980 std::string line;
981 std::getline(file, line);
982 if (line.empty())
983 {
984 return false;
985 }
986
987 const size_t rparen = line.rfind(')');
988 if (rparen == std::string::npos || rparen + 2U >= line.size())
989 {
990 return false;
991 }
992
993 std::istringstream iss(line.substr(rparen + 2U));
994 std::string token;
995 for (int field = 3; field <= 22; ++field)
996 {
997 if (!(iss >> token))
998 {
999 return false;
1000 }
1001
1002 if (field == 22)
1003 {
1004 identity.pid = pid;
1005 identity.starttime = std::strtoull(token.c_str(), nullptr, 10);
1006 return identity.starttime != 0;
1007 }
1008 }
1009
1010 return false;
1011 }
1012
1013 static int FutexWait(std::atomic<uint32_t>* word, uint32_t expected, uint32_t timeout_ms)
1014 {
1015 struct timespec timeout = {};
1016 struct timespec* timeout_ptr = nullptr;
1017 if (timeout_ms != UINT32_MAX)
1018 {
1019 timeout.tv_sec = static_cast<time_t>(timeout_ms / 1000U);
1020 timeout.tv_nsec = static_cast<long>(timeout_ms % 1000U) * 1000000L;
1021 timeout_ptr = &timeout;
1022 }
1023
1024 return static_cast<int>(syscall(SYS_futex,
1025 reinterpret_cast<uint32_t*>(word),
1026 FUTEX_WAIT,
1027 expected,
1028 timeout_ptr,
1029 nullptr,
1030 0));
1031 }
1032
1033 static int FutexWake(std::atomic<uint32_t>* word)
1034 {
1035 return static_cast<int>(
1036 syscall(SYS_futex, reinterpret_cast<uint32_t*>(word), FUTEX_WAKE, INT32_MAX, nullptr,
1037 nullptr, 0));
1038 }
1039
1040 static size_t ComputeSharedBytes(uint32_t slot_count,
1041 uint32_t subscriber_capacity,
1042 uint32_t queue_capacity,
1043 uint32_t topic_name_len)
1044 {
1045 size_t offset = 0;
1046 offset = AlignUp(offset, alignof(SharedHeader));
1047 offset += sizeof(SharedHeader);
1048
1049 offset += static_cast<size_t>(topic_name_len) + 1U;
1050
1051 offset = AlignUp(offset, alignof(SlotControl));
1052 offset += sizeof(SlotControl) * slot_count;
1053
1054 offset = AlignUp(offset, alignof(SubscriberControl));
1055 offset += sizeof(SubscriberControl) * subscriber_capacity;
1056
1057 offset = AlignUp(offset, alignof(BalancedGroupControl));
1058 offset += sizeof(BalancedGroupControl);
1059
1060 offset = AlignUp(offset, alignof(std::atomic<uint32_t>));
1061 offset += sizeof(std::atomic<uint32_t>) * subscriber_capacity;
1062
1063 offset = AlignUp(offset, alignof(FreeSlotCell));
1064 offset += sizeof(FreeSlotCell) * slot_count;
1065
1066 offset = AlignUp(offset, alignof(Descriptor));
1067 offset += sizeof(Descriptor) * subscriber_capacity * queue_capacity;
1068
1069 offset = AlignUp(offset, alignof(TopicData));
1070 offset += sizeof(TopicData) * slot_count;
1071 return offset;
1072 }
1073
1074 void SetupPointers()
1075 {
1076 size_t offset = 0;
1077
1078 offset = AlignUp(offset, alignof(SharedHeader));
1079 header_ = reinterpret_cast<SharedHeader*>(base_ + offset);
1080 offset += sizeof(SharedHeader);
1081
1082 topic_name_ptr_ = reinterpret_cast<char*>(base_ + offset);
1083 offset += static_cast<size_t>(header_->topic_name_len) + 1U;
1084
1085 offset = AlignUp(offset, alignof(SlotControl));
1086 slots_ = reinterpret_cast<SlotControl*>(base_ + offset);
1087 offset += sizeof(SlotControl) * slot_count_;
1088
1089 offset = AlignUp(offset, alignof(SubscriberControl));
1090 subscribers_ = reinterpret_cast<SubscriberControl*>(base_ + offset);
1091 offset += sizeof(SubscriberControl) * subscriber_capacity_;
1092
1093 offset = AlignUp(offset, alignof(BalancedGroupControl));
1094 balanced_group_ = reinterpret_cast<BalancedGroupControl*>(base_ + offset);
1095 offset += sizeof(BalancedGroupControl);
1096
1097 offset = AlignUp(offset, alignof(std::atomic<uint32_t>));
1098 balanced_members_ = reinterpret_cast<std::atomic<uint32_t>*>(base_ + offset);
1099 offset += sizeof(std::atomic<uint32_t>) * subscriber_capacity_;
1100
1101 offset = AlignUp(offset, alignof(FreeSlotCell));
1102 free_slots_ = reinterpret_cast<FreeSlotCell*>(base_ + offset);
1103 offset += sizeof(FreeSlotCell) * slot_count_;
1104
1105 offset = AlignUp(offset, alignof(Descriptor));
1106 descriptors_ = reinterpret_cast<Descriptor*>(base_ + offset);
1107 offset += sizeof(Descriptor) * subscriber_capacity_ * queue_capacity_;
1108
1109 offset = AlignUp(offset, alignof(TopicData));
1110 payloads_ = reinterpret_cast<TopicData*>(base_ + offset);
1111 }
1112
1113 bool HeaderMatchesIdentity() const
1114 {
1115 if (header_->name_key != name_key_)
1116 {
1117 return false;
1118 }
1119 if (header_->domain_crc32 != domain_crc32_)
1120 {
1121 return false;
1122 }
1123 if (header_->topic_name_len != topic_name_.size())
1124 {
1125 return false;
1126 }
1127 if (std::memcmp(topic_name_ptr_, topic_name_.c_str(), topic_name_.size() + 1U) != 0)
1128 {
1129 return false;
1130 }
1131 return true;
1132 }
1133
1134 ErrorCode InitializeLayout()
1135 {
1136 if (config_.slot_num == 0 || config_.subscriber_num == 0 || config_.queue_num < 2)
1137 {
1138 return ErrorCode::ARG_ERR;
1139 }
1140
1141 const size_t bytes =
1142 ComputeSharedBytes(config_.slot_num, config_.subscriber_num, config_.queue_num,
1143 static_cast<uint32_t>(topic_name_.size()));
1144
1145 if (ftruncate(fd_, static_cast<off_t>(bytes)) != 0)
1146 {
1147 return ErrorCode::INIT_ERR;
1148 }
1149
1150 const struct stat st = GetStat();
1151 if (st.st_size <= 0)
1152 {
1153 return ErrorCode::INIT_ERR;
1154 }
1155
1156 mapping_size_ = static_cast<size_t>(st.st_size);
1157 mapping_ = mmap(nullptr, mapping_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
1158 if (mapping_ == MAP_FAILED)
1159 {
1160 mapping_ = nullptr;
1161 return ErrorCode::INIT_ERR;
1162 }
1163
1164 base_ = static_cast<uint8_t*>(mapping_);
1165 slot_count_ = config_.slot_num;
1166 subscriber_capacity_ = config_.subscriber_num;
1167 queue_capacity_ = config_.queue_num;
1168 header_ = reinterpret_cast<SharedHeader*>(base_ + AlignUp(0, alignof(SharedHeader)));
1169 header_->topic_name_len = static_cast<uint32_t>(topic_name_.size());
1170 SetupPointers();
1171
1172 header_->magic = kMagic;
1173 header_->name_key = name_key_;
1174 header_->domain_crc32 = domain_crc32_;
1175 header_->version = kVersion;
1176 header_->data_size = sizeof(TopicData);
1177 header_->slot_count = slot_count_;
1178 header_->subscriber_capacity = subscriber_capacity_;
1179 header_->queue_capacity = queue_capacity_;
1180 std::memcpy(topic_name_ptr_, topic_name_.c_str(), topic_name_.size() + 1U);
1181 header_->publisher_pid.store(self_identity_.pid, std::memory_order_release);
1182 header_->publisher_starttime.store(self_identity_.starttime, std::memory_order_release);
1183 header_->free_queue_head.store(0, std::memory_order_release);
1184 header_->free_queue_tail.store(slot_count_, std::memory_order_release);
1185 header_->next_sequence.store(0, std::memory_order_release);
1186 header_->publish_failures.store(0, std::memory_order_release);
1187
1188 for (uint32_t i = 0; i < slot_count_; ++i)
1189 {
1190 slots_[i].refcount.store(0, std::memory_order_release);
1191 slots_[i].sequence.store(0, std::memory_order_release);
1192 std::memset(&payloads_[i], 0, sizeof(TopicData));
1193 free_slots_[i].slot_index = i;
1194 free_slots_[i].sequence.store(static_cast<uint64_t>(i) + 1U,
1195 std::memory_order_release);
1196 }
1197
1198 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1199 {
1200 subscribers_[i].active.store(0, std::memory_order_release);
1201 subscribers_[i].mode.store(static_cast<uint32_t>(LinuxSharedSubscriberMode::BROADCAST_FULL),
1202 std::memory_order_release);
1203 subscribers_[i].queue_head.store(0, std::memory_order_release);
1204 subscribers_[i].queue_tail.store(0, std::memory_order_release);
1205 subscribers_[i].ready_sem_count.store(0, std::memory_order_release);
1206 subscribers_[i].dropped_messages.store(0, std::memory_order_release);
1207 subscribers_[i].owner_pid.store(0, std::memory_order_release);
1208 subscribers_[i].owner_starttime.store(0, std::memory_order_release);
1209 subscribers_[i].held_slot.store(kInvalidIndex, std::memory_order_release);
1210 balanced_members_[i].store(kInvalidIndex, std::memory_order_release);
1211 }
1212
1213 balanced_group_->rr_cursor.store(0, std::memory_order_release);
1214
1215 for (size_t i = 0; i < static_cast<size_t>(subscriber_capacity_) * queue_capacity_; ++i)
1216 {
1217 descriptors_[i] = Descriptor{};
1218 }
1219
1220 header_->init_state.store(kInitReady, std::memory_order_release);
1221 return ErrorCode::OK;
1222 }
1223
1224 ErrorCode AttachLayout()
1225 {
1226 const struct stat st = GetStat();
1227 if (st.st_size <= 0)
1228 {
1229 return ErrorCode::NOT_FOUND;
1230 }
1231
1232 mapping_size_ = static_cast<size_t>(st.st_size);
1233 mapping_ = mmap(nullptr, mapping_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
1234 if (mapping_ == MAP_FAILED)
1235 {
1236 mapping_ = nullptr;
1237 return ErrorCode::INIT_ERR;
1238 }
1239
1240 base_ = static_cast<uint8_t*>(mapping_);
1241 header_ = reinterpret_cast<SharedHeader*>(base_);
1242
1243 while (header_->init_state.load(std::memory_order_acquire) != kInitReady)
1244 {
1245 usleep(1000);
1246 }
1247
1248 if (header_->magic != kMagic || header_->version != kVersion ||
1249 header_->data_size != sizeof(TopicData))
1250 {
1251 return ErrorCode::CHECK_ERR;
1252 }
1253
1254 slot_count_ = header_->slot_count;
1255 subscriber_capacity_ = header_->subscriber_capacity;
1256 queue_capacity_ = header_->queue_capacity;
1257 SetupPointers();
1258 if (!HeaderMatchesIdentity())
1259 {
1260 return ErrorCode::CHECK_ERR;
1261 }
1262 return ErrorCode::OK;
1263 }
1264
1265 bool TryReclaimStaleSegment()
1266 {
1267 int stale_fd = shm_open(shm_name_.c_str(), O_RDWR, 0600);
1268 if (stale_fd < 0)
1269 {
1270 return errno == ENOENT;
1271 }
1272
1273 struct stat st = {};
1274 if (fstat(stale_fd, &st) != 0)
1275 {
1276 close(stale_fd);
1277 return false;
1278 }
1279
1280 bool reclaim = false;
1281 if (st.st_size < static_cast<off_t>(sizeof(SharedHeader)))
1282 {
1283 reclaim = true;
1284 }
1285 else
1286 {
1287 void* mapping = mmap(nullptr, static_cast<size_t>(st.st_size), PROT_READ | PROT_WRITE,
1288 MAP_SHARED, stale_fd, 0);
1289 if (mapping != MAP_FAILED)
1290 {
1291 uint8_t* base = static_cast<uint8_t*>(mapping);
1292 auto* header = reinterpret_cast<SharedHeader*>(mapping);
1293 const uint32_t init_state = header->init_state.load(std::memory_order_acquire);
1294 bool identity_match = false;
1295 const size_t mapping_size = static_cast<size_t>(st.st_size);
1296 const size_t topic_name_bytes = topic_name_.size() + 1U;
1297 if (header->magic == kMagic && header->version == kVersion &&
1298 header->domain_crc32 == domain_crc32_ &&
1299 header->topic_name_len == topic_name_.size())
1300 {
1301 size_t offset = AlignUp(0, alignof(SharedHeader));
1302 offset += sizeof(SharedHeader);
1303 if (offset <= mapping_size && topic_name_bytes <= (mapping_size - offset))
1304 {
1305 const char* mapped_topic = reinterpret_cast<const char*>(base + offset);
1306 identity_match =
1307 (header->name_key == name_key_) &&
1308 (std::memcmp(mapped_topic, topic_name_.c_str(), topic_name_bytes) == 0);
1309 }
1310 }
1311 const ProcessIdentity publisher_identity = {
1312 header->publisher_pid.load(std::memory_order_acquire),
1313 header->publisher_starttime.load(std::memory_order_acquire),
1314 };
1315
1316 if (!identity_match)
1317 {
1318 reclaim = false;
1319 }
1320 else if (init_state != kInitReady)
1321 {
1322 reclaim = !ProcessAlive(publisher_identity);
1323 }
1324 else if (!ProcessAlive(publisher_identity))
1325 {
1326 reclaim = true;
1327 }
1328
1329 munmap(mapping, static_cast<size_t>(st.st_size));
1330 }
1331 }
1332
1333 close(stale_fd);
1334
1335 if (!reclaim)
1336 {
1337 return false;
1338 }
1339
1340 return shm_unlink(shm_name_.c_str()) == 0 || errno == ENOENT;
1341 }
1342
1343 void Open()
1344 {
1345 open_status_ = ErrorCode::STATE_ERR;
1346 open_ok_ = false;
1347
1348 if (create_)
1349 {
1350 for (int attempt = 0; attempt < 2; ++attempt)
1351 {
1352 fd_ = shm_open(shm_name_.c_str(), O_CREAT | O_EXCL | O_RDWR, 0600);
1353 if (fd_ >= 0)
1354 {
1355 break;
1356 }
1357
1358 if (errno != EEXIST || !TryReclaimStaleSegment())
1359 {
1360 break;
1361 }
1362 }
1363
1364 if (fd_ < 0)
1365 {
1366 open_status_ = (errno == EEXIST) ? ErrorCode::BUSY : ErrorCode::INIT_ERR;
1367 return;
1368 }
1369
1370 open_status_ = InitializeLayout();
1371 }
1372 else
1373 {
1374 fd_ = shm_open(shm_name_.c_str(), O_RDWR, 0600);
1375 if (fd_ < 0)
1376 {
1377 open_status_ = (errno == ENOENT) ? ErrorCode::NOT_FOUND : ErrorCode::INIT_ERR;
1378 return;
1379 }
1380
1381 open_status_ = AttachLayout();
1382 }
1383
1384 if (fd_ >= 0)
1385 {
1386 close(fd_);
1387 fd_ = -1;
1388 }
1389
1390 open_ok_ = (open_status_ == ErrorCode::OK);
1391 }
1392
1393 void Close()
1394 {
1395 if (mapping_ != nullptr)
1396 {
1397 munmap(mapping_, mapping_size_);
1398 }
1399
1400 mapping_ = nullptr;
1401 base_ = nullptr;
1402 header_ = nullptr;
1403 slots_ = nullptr;
1404 subscribers_ = nullptr;
1405 free_slots_ = nullptr;
1406 descriptors_ = nullptr;
1407 payloads_ = nullptr;
1408 mapping_size_ = 0;
1409 open_ok_ = false;
1410 open_status_ = ErrorCode::STATE_ERR;
1411 }
1412
1413 struct stat GetStat() const
1414 {
1415 struct stat st = {};
1416 fstat(fd_, &st);
1417 return st;
1418 }
1419
1420 Descriptor* DescriptorRing(uint32_t subscriber_index) const
1421 {
1422 return descriptors_ + static_cast<size_t>(subscriber_index) * queue_capacity_;
1423 }
1424
1425 static bool ProcessAlive(const ProcessIdentity& identity)
1426 {
1427 ProcessIdentity current = {};
1428 if (!ReadProcessIdentity(identity.pid, current))
1429 {
1430 return false;
1431 }
1432
1433 return current.starttime == identity.starttime;
1434 }
1435
1436 bool PublisherValid() const
1437 {
1438 if (!publisher_ || header_ == nullptr)
1439 {
1440 return false;
1441 }
1442
1443 const ProcessIdentity owner = {
1444 header_->publisher_pid.load(std::memory_order_acquire),
1445 header_->publisher_starttime.load(std::memory_order_acquire),
1446 };
1447 return owner.pid == self_identity_.pid && owner.starttime == self_identity_.starttime;
1448 }
1449
1450 void HoldSlot(uint32_t subscriber_index, uint32_t slot_index)
1451 {
1452 subscribers_[subscriber_index].held_slot.store(slot_index, std::memory_order_release);
1453 }
1454
1455 void ClearHeldSlot(uint32_t subscriber_index, uint32_t slot_index)
1456 {
1457 uint32_t expected = slot_index;
1458 subscribers_[subscriber_index].held_slot.compare_exchange_strong(
1459 expected, kInvalidIndex, std::memory_order_acq_rel, std::memory_order_relaxed);
1460 }
1461
1462 ErrorCode RegisterBalancedSubscriber(uint32_t subscriber_index)
1463 {
1464 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1465 {
1466 uint32_t expected = kInvalidIndex;
1467 if (balanced_members_[i].compare_exchange_strong(expected, subscriber_index,
1468 std::memory_order_acq_rel,
1469 std::memory_order_relaxed))
1470 {
1471 return ErrorCode::OK;
1472 }
1473 }
1474 return ErrorCode::FULL;
1475 }
1476
1477 void UnregisterBalancedSubscriber(uint32_t subscriber_index)
1478 {
1479 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1480 {
1481 uint32_t expected = subscriber_index;
1482 if (balanced_members_[i].compare_exchange_strong(expected, kInvalidIndex,
1483 std::memory_order_acq_rel,
1484 std::memory_order_relaxed))
1485 {
1486 return;
1487 }
1488 }
1489 }
1490
1491 bool SelectBalancedSubscriber(uint32_t& subscriber_index)
1492 {
1493 const uint64_t base = balanced_group_->rr_cursor.fetch_add(1, std::memory_order_acq_rel);
1494 for (uint32_t offset = 0; offset < subscriber_capacity_; ++offset)
1495 {
1496 const uint32_t member_index =
1497 balanced_members_[(base + offset) % subscriber_capacity_].load(std::memory_order_acquire);
1498 if (member_index == kInvalidIndex)
1499 {
1500 continue;
1501 }
1502 if (subscribers_[member_index].active.load(std::memory_order_acquire) == 0)
1503 {
1504 continue;
1505 }
1506 if (subscribers_[member_index].mode.load(std::memory_order_acquire) !=
1507 static_cast<uint32_t>(LinuxSharedSubscriberMode::BALANCE_RR))
1508 {
1509 continue;
1510 }
1511 if (!QueueHasSpace(member_index))
1512 {
1513 continue;
1514 }
1515 subscriber_index = member_index;
1516 return true;
1517 }
1518 return false;
1519 }
1520
1521 static void PostReady(SubscriberControl& control)
1522 {
1523 control.ready_sem_count.fetch_add(1, std::memory_order_release);
1524 FutexWake(&control.ready_sem_count);
1525 }
1526
1527 static void ConsumeReady(SubscriberControl& control)
1528 {
1529 const uint32_t prev = control.ready_sem_count.fetch_sub(1, std::memory_order_acq_rel);
1530 ASSERT(prev > 0);
1531 }
1532
1533 static ErrorCode WaitReady(SubscriberControl& control, uint32_t timeout_ms)
1534 {
1535 if (control.ready_sem_count.load(std::memory_order_acquire) != 0)
1536 {
1537 return ErrorCode::OK;
1538 }
1539
1540 const bool infinite_wait = (timeout_ms == UINT32_MAX);
1541 const uint64_t deadline_ms = infinite_wait ? 0 : (NowMonotonicMs() + timeout_ms);
1542
1543 while (true)
1544 {
1545 if (control.ready_sem_count.load(std::memory_order_acquire) != 0)
1546 {
1547 return ErrorCode::OK;
1548 }
1549
1550 uint32_t wait_ms = UINT32_MAX;
1551 if (!infinite_wait)
1552 {
1553 wait_ms = MonotonicTime::RemainingMilliseconds(deadline_ms);
1554 if (wait_ms == 0)
1555 {
1556 return ErrorCode::TIMEOUT;
1557 }
1558 }
1559
1560 wait_ms = MonotonicTime::WaitSliceMilliseconds(wait_ms);
1561
1562 const int futex_ans = FutexWait(&control.ready_sem_count, 0, wait_ms);
1563 if (futex_ans == 0 || errno == EAGAIN || errno == EINTR)
1564 {
1565 continue;
1566 }
1567
1568 if (errno == ETIMEDOUT)
1569 {
1570 if (infinite_wait)
1571 {
1572 continue;
1573 }
1574 if (MonotonicTime::RemainingMilliseconds(deadline_ms) == 0 &&
1575 control.ready_sem_count.load(std::memory_order_acquire) == 0)
1576 {
1577 return ErrorCode::TIMEOUT;
1578 }
1579 continue;
1580 }
1581
1582 return ErrorCode::FAILED;
1583 }
1584 }
1585
1586 void ScavengeDeadSubscribers()
1587 {
1588 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1589 {
1590 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1591 {
1592 continue;
1593 }
1594
1595 const ProcessIdentity owner_identity = {
1596 subscribers_[i].owner_pid.load(std::memory_order_acquire),
1597 subscribers_[i].owner_starttime.load(std::memory_order_acquire),
1598 };
1599 if (ProcessAlive(owner_identity))
1600 {
1601 continue;
1602 }
1603
1604 uint32_t expected = 1;
1605 if (!subscribers_[i].active.compare_exchange_strong(expected, 0, std::memory_order_acq_rel,
1606 std::memory_order_relaxed))
1607 {
1608 continue;
1609 }
1610
1611 subscribers_[i].owner_pid.store(0, std::memory_order_release);
1612 subscribers_[i].owner_starttime.store(0, std::memory_order_release);
1613
1614 const uint32_t held_slot =
1615 subscribers_[i].held_slot.exchange(kInvalidIndex, std::memory_order_acq_rel);
1616 if (held_slot != kInvalidIndex)
1617 {
1618 ReleaseSlot(held_slot);
1619 }
1620
1621 Descriptor desc = {};
1622 while (TryPopDescriptor(i, desc) == ErrorCode::OK)
1623 {
1624 ReleaseSlot(desc.slot_index);
1625 }
1626 }
1627 }
1628
1629 bool QueueHasSpace(uint32_t subscriber_index) const
1630 {
1631 const SubscriberControl& control = subscribers_[subscriber_index];
1632 const uint32_t head = control.queue_head.load(std::memory_order_acquire);
1633 const uint32_t tail = control.queue_tail.load(std::memory_order_relaxed);
1634 const uint32_t next_tail = (tail + 1U) % queue_capacity_;
1635 return next_tail != head;
1636 }
1637
1638 void PushDescriptor(uint32_t subscriber_index, const Descriptor& descriptor)
1639 {
1640 SubscriberControl& control = subscribers_[subscriber_index];
1641 Descriptor* ring = DescriptorRing(subscriber_index);
1642
1643 const uint32_t tail = control.queue_tail.load(std::memory_order_relaxed);
1644 ring[tail] = descriptor;
1645 const uint32_t next_tail = (tail + 1U) % queue_capacity_;
1646 control.queue_tail.store(next_tail, std::memory_order_release);
1647 PostReady(control);
1648 }
1649
1650 ErrorCode TryPopDescriptor(uint32_t subscriber_index, Descriptor& descriptor)
1651 {
1652 SubscriberControl& control = subscribers_[subscriber_index];
1653 Descriptor* ring = DescriptorRing(subscriber_index);
1654
1655 while (true)
1656 {
1657 uint32_t head = control.queue_head.load(std::memory_order_relaxed);
1658 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
1659 if (head == tail)
1660 {
1661 return ErrorCode::EMPTY;
1662 }
1663
1664 descriptor = ring[head];
1665 const uint32_t next_head = (head + 1U) % queue_capacity_;
1666 if (control.queue_head.compare_exchange_weak(head, next_head, std::memory_order_acq_rel,
1667 std::memory_order_relaxed))
1668 {
1669 ConsumeReady(control);
1670 return ErrorCode::OK;
1671 }
1672 }
1673 }
1674
1675 ErrorCode DropDescriptor(uint32_t subscriber_index)
1676 {
1677 SubscriberControl& control = subscribers_[subscriber_index];
1678 Descriptor* ring = DescriptorRing(subscriber_index);
1679
1680 while (true)
1681 {
1682 uint32_t head = control.queue_head.load(std::memory_order_relaxed);
1683 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
1684 if (head == tail)
1685 {
1686 return ErrorCode::EMPTY;
1687 }
1688
1689 const Descriptor descriptor = ring[head];
1690 const uint32_t next_head = (head + 1U) % queue_capacity_;
1691 if (control.queue_head.compare_exchange_weak(head, next_head, std::memory_order_acq_rel,
1692 std::memory_order_relaxed))
1693 {
1694 control.dropped_messages.fetch_add(1, std::memory_order_relaxed);
1695 ConsumeReady(control);
1696 ReleaseSlot(descriptor.slot_index);
1697 return ErrorCode::OK;
1698 }
1699 }
1700 }
1701
1702 ErrorCode PopFreeSlot(uint32_t& slot_index)
1703 {
1704 while (true)
1705 {
1706 uint64_t head = header_->free_queue_head.load(std::memory_order_relaxed);
1707 FreeSlotCell& cell = free_slots_[head % slot_count_];
1708 const uint64_t seq = cell.sequence.load(std::memory_order_acquire);
1709 const intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(head + 1U);
1710
1711 if (diff == 0)
1712 {
1713 if (header_->free_queue_head.compare_exchange_weak(head, head + 1U,
1714 std::memory_order_acq_rel,
1715 std::memory_order_relaxed))
1716 {
1717 slot_index = cell.slot_index;
1718 cell.sequence.store(head + slot_count_, std::memory_order_release);
1719 return ErrorCode::OK;
1720 }
1721 }
1722 else if (diff < 0)
1723 {
1724 return ErrorCode::FULL;
1725 }
1726 }
1727 }
1728
1729 void RecycleSlot(uint32_t slot_index)
1730 {
1731 slots_[slot_index].sequence.store(0, std::memory_order_release);
1732
1733 while (true)
1734 {
1735 uint64_t tail = header_->free_queue_tail.load(std::memory_order_relaxed);
1736 FreeSlotCell& cell = free_slots_[tail % slot_count_];
1737 const uint64_t seq = cell.sequence.load(std::memory_order_acquire);
1738 const intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(tail);
1739
1740 if (diff == 0)
1741 {
1742 if (header_->free_queue_tail.compare_exchange_weak(tail, tail + 1U,
1743 std::memory_order_acq_rel,
1744 std::memory_order_relaxed))
1745 {
1746 cell.slot_index = slot_index;
1747 cell.sequence.store(tail + 1U, std::memory_order_release);
1748 return;
1749 }
1750 }
1751 }
1752 }
1753
1754 void ReleaseSlot(uint32_t slot_index)
1755 {
1756 const uint32_t prev = slots_[slot_index].refcount.fetch_sub(1, std::memory_order_acq_rel);
1757 ASSERT(prev > 0);
1758 if (prev == 1)
1759 {
1760 RecycleSlot(slot_index);
1761 }
1762 }
1763
1764 ErrorCode PublishData(SharedData& data)
1765 {
1766 if (!data.Valid() || data.topic_ != this)
1767 {
1768 return ErrorCode::STATE_ERR;
1769 }
1770
1771 uint32_t active_count = 0;
1772 uint32_t balanced_target = kInvalidIndex;
1773 bool has_balanced_subscriber = false;
1774 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1775 {
1776 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1777 {
1778 continue;
1779 }
1780
1781 const LinuxSharedSubscriberMode mode = static_cast<LinuxSharedSubscriberMode>(
1782 subscribers_[i].mode.load(std::memory_order_acquire));
1783 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
1784 {
1785 has_balanced_subscriber = true;
1786 continue;
1787 }
1788
1789 if (!QueueHasSpace(i))
1790 {
1791 ScavengeDeadSubscribers();
1792 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1793 {
1794 continue;
1795 }
1796
1797 if (mode == LinuxSharedSubscriberMode::BROADCAST_DROP_OLD)
1798 {
1799 if (DropDescriptor(i) != ErrorCode::OK)
1800 {
1801 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1802 data.Reset();
1803 return ErrorCode::FULL;
1804 }
1805 }
1806 else
1807 {
1808 subscribers_[i].dropped_messages.fetch_add(1, std::memory_order_relaxed);
1809 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1810 data.Reset();
1811 return ErrorCode::FULL;
1812 }
1813 }
1814
1815 ++active_count;
1816 }
1817
1818 if (has_balanced_subscriber)
1819 {
1820 if (!SelectBalancedSubscriber(balanced_target))
1821 {
1822 ScavengeDeadSubscribers();
1823 if (!SelectBalancedSubscriber(balanced_target))
1824 {
1825 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1826 data.Reset();
1827 return ErrorCode::FULL;
1828 }
1829 }
1830 ++active_count;
1831 }
1832
1833 if (active_count == 0)
1834 {
1835 data.Reset();
1836 return ErrorCode::OK;
1837 }
1838
1839 const uint64_t sequence =
1840 header_->next_sequence.fetch_add(1, std::memory_order_acq_rel) + 1ULL;
1841 SlotControl& slot = slots_[data.slot_index_];
1842 slot.refcount.store(active_count, std::memory_order_release);
1843 slot.sequence.store(sequence, std::memory_order_release);
1844
1845 const Descriptor descriptor = {data.slot_index_, 0U, sequence};
1846 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1847 {
1848 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1849 {
1850 continue;
1851 }
1852 const LinuxSharedSubscriberMode mode = static_cast<LinuxSharedSubscriberMode>(
1853 subscribers_[i].mode.load(std::memory_order_acquire));
1854 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
1855 {
1856 continue;
1857 }
1858 PushDescriptor(i, descriptor);
1859 }
1860
1861 if (balanced_target != kInvalidIndex)
1862 {
1863 PushDescriptor(balanced_target, descriptor);
1864 }
1865
1866 data.topic_ = nullptr;
1867 data.slot_index_ = kInvalidIndex;
1868 return ErrorCode::OK;
1869 }
1870
1871 bool create_ = false;
1872 bool publisher_ = false;
1873 LinuxSharedTopicConfig config_;
1874 uint32_t domain_crc32_ = 0;
1875 std::string topic_name_;
1876 uint64_t name_key_ = 0;
1877 std::string shm_name_;
1878 ProcessIdentity self_identity_ = {};
1879
1880 int fd_ = -1;
1881 void* mapping_ = nullptr;
1882 uint8_t* base_ = nullptr;
1883 size_t mapping_size_ = 0;
1884
1885 SharedHeader* header_ = nullptr;
1886 char* topic_name_ptr_ = nullptr;
1887 SlotControl* slots_ = nullptr;
1888 SubscriberControl* subscribers_ = nullptr;
1889 BalancedGroupControl* balanced_group_ = nullptr;
1890 std::atomic<uint32_t>* balanced_members_ = nullptr;
1891 FreeSlotCell* free_slots_ = nullptr;
1892 Descriptor* descriptors_ = nullptr;
1893 TopicData* payloads_ = nullptr;
1894
1895 uint32_t slot_count_ = 0;
1896 uint32_t subscriber_capacity_ = 0;
1897 uint32_t queue_capacity_ = 0;
1898
1899 bool open_ok_ = false;
1900 ErrorCode open_status_ = ErrorCode::STATE_ERR;
1901
1902};
1903
1904} // namespace LibXR
1905
1906#endif
LibXR 命名空间
Definition ch32_can.hpp:14
ErrorCode
定义错误码枚举
@ INIT_ERR
初始化错误 | Initialization error
@ EMPTY
为空 | Empty