libxr  1.0
Want to be the best embedded framework
Loading...
Searching...
No Matches
linux_shared_topic_impl.hpp
1#pragma once
2
3#if defined(LIBXR_SYSTEM_POSIX_HOST)
4
5#include <atomic>
6#include <cerrno>
7#include <chrono>
8#include <cinttypes>
9#include <cstddef>
10#include <cstdint>
11#include <cstdio>
12#include <cstring>
13#include <fstream>
14#include <memory>
15#include <sstream>
16#include <string>
17#include <type_traits>
18
19#include <fcntl.h>
20#include <linux/futex.h>
21#include <sys/mman.h>
22#include <sys/stat.h>
23#include <sys/syscall.h>
24#include <time.h>
25#include <unistd.h>
26#include <signal.h>
27
28#include "crc.hpp"
29#include "libxr_def.hpp"
30#include "message.hpp"
31
32namespace LibXR
33{
38enum class LinuxSharedSubscriberMode : uint8_t
39{
40 BROADCAST_FULL = 0,
41 BROADCAST_DROP_OLD = 1,
42 BALANCE_RR = 2,
43};
44
49struct LinuxSharedTopicConfig
50{
51 uint32_t slot_num = 64;
52 uint32_t subscriber_num = 8;
53 uint32_t queue_num = 64;
54};
55
71template <typename TopicData>
72class LinuxSharedTopic : public Topic
73{
74 static_assert(std::is_trivially_copyable<TopicData>::value,
75 "LinuxSharedTopic requires trivially copyable data");
76 static_assert(std::atomic<uint32_t>::is_always_lock_free,
77 "LinuxSharedTopic requires lock-free 32-bit atomics");
78 static_assert(std::atomic<uint64_t>::is_always_lock_free,
79 "LinuxSharedTopic requires lock-free 64-bit atomics");
80
81 enum class SharedDataState : uint8_t
82 {
83 EMPTY = 0,
84 PUBLISHER = 1,
85 SUBSCRIBER = 2,
86 };
87
88 public:
89 class SharedData;
90 using Data = SharedData;
91 static constexpr const char* DEFAULT_DOMAIN_NAME = "libxr_def_domain";
92
98 class Subscriber
99 {
100 public:
101 using Data = SharedData;
102
106 Subscriber() = default;
107
116 explicit Subscriber(const char* name,
117 LinuxSharedSubscriberMode mode =
118 LinuxSharedSubscriberMode::BROADCAST_FULL)
119 : owned_topic_(new LinuxSharedTopic(name))
120 {
121 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
122 {
123 delete owned_topic_;
124 owned_topic_ = nullptr;
125 }
126 }
127
128 Subscriber(const char* name, const char* domain_name,
129 LinuxSharedSubscriberMode mode = LinuxSharedSubscriberMode::BROADCAST_FULL)
130 : owned_topic_(new LinuxSharedTopic(name, domain_name))
131 {
132 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
133 {
134 delete owned_topic_;
135 owned_topic_ = nullptr;
136 }
137 }
138
139 Subscriber(const char* name, Topic::Domain& domain,
140 LinuxSharedSubscriberMode mode = LinuxSharedSubscriberMode::BROADCAST_FULL)
141 : owned_topic_(new LinuxSharedTopic(name, domain))
142 {
143 if (Attach(*owned_topic_, mode) != ErrorCode::OK)
144 {
145 delete owned_topic_;
146 owned_topic_ = nullptr;
147 }
148 }
149
156 explicit Subscriber(LinuxSharedTopic& topic,
157 LinuxSharedSubscriberMode mode =
158 LinuxSharedSubscriberMode::BROADCAST_FULL)
159 {
160 (void)Attach(topic, mode);
161 }
162
166 ~Subscriber() { Reset(); }
167
168 Subscriber(const Subscriber&) = delete;
169 Subscriber& operator=(const Subscriber&) = delete;
170
171 Subscriber(Subscriber&& other) noexcept { *this = std::move(other); }
172
173 Subscriber& operator=(Subscriber&& other) noexcept
174 {
175 if (this == &other)
176 {
177 return *this;
178 }
179
180 Reset();
181
182 topic_ = other.topic_;
183 owned_topic_ = other.owned_topic_;
184 subscriber_index_ = other.subscriber_index_;
185 current_slot_index_ = other.current_slot_index_;
186 current_sequence_ = other.current_sequence_;
187 current_timestamp_ = other.current_timestamp_;
188
189 other.topic_ = nullptr;
190 other.owned_topic_ = nullptr;
191 other.subscriber_index_ = INVALID_INDEX;
192 other.current_slot_index_ = INVALID_INDEX;
193 other.current_sequence_ = 0;
194 other.current_timestamp_ = MicrosecondTimestamp();
195 return *this;
196 }
197
203 bool Valid() const
204 {
205 return topic_ != nullptr && subscriber_index_ != INVALID_INDEX;
206 }
207
215 ErrorCode Wait(uint32_t timeout_ms = UINT32_MAX)
216 {
217 if (!Valid())
218 {
219 return ErrorCode::STATE_ERR;
220 }
221
222 const uint64_t deadline_ms =
223 (timeout_ms == UINT32_MAX) ? 0 : (NowMonotonicMs() + timeout_ms);
224
225 Descriptor desc = {};
226 while (true)
227 {
228 ErrorCode pop_ans = topic_->TryPopDescriptor(subscriber_index_, desc);
229 if (pop_ans == ErrorCode::OK)
230 {
231 Release();
232 topic_->HoldSlot(subscriber_index_, desc.slot_index);
233 current_slot_index_ = desc.slot_index;
234 current_sequence_ = desc.sequence;
235 current_timestamp_ = topic_->SlotTimestamp(desc.slot_index);
236 return ErrorCode::OK;
237 }
238
239 uint32_t wait_ms = UINT32_MAX;
240 if (timeout_ms != UINT32_MAX)
241 {
242 const uint64_t now_ms = NowMonotonicMs();
243 if (now_ms >= deadline_ms)
244 {
245 return ErrorCode::TIMEOUT;
246 }
247 wait_ms = static_cast<uint32_t>(deadline_ms - now_ms);
248 }
249
250 const ErrorCode wait_ans = topic_->WaitReady(topic_->subscribers_[subscriber_index_],
251 wait_ms);
252 if (wait_ans == ErrorCode::OK)
253 {
254 continue;
255 }
256 return wait_ans;
257 }
258 }
259
268 ErrorCode Wait(SharedData& data, uint32_t timeout_ms = UINT32_MAX)
269 {
270 data.Reset();
271
272 if (!Valid())
273 {
274 return ErrorCode::STATE_ERR;
275 }
276
277 const uint64_t deadline_ms =
278 (timeout_ms == UINT32_MAX) ? 0 : (NowMonotonicMs() + timeout_ms);
279
280 Descriptor desc = {};
281 while (true)
282 {
283 ErrorCode pop_ans = topic_->TryPopDescriptor(subscriber_index_, desc);
284 if (pop_ans == ErrorCode::OK)
285 {
286 data.topic_ = topic_;
287 data.slot_index_ = desc.slot_index;
288 data.sequence_ = desc.sequence;
289 data.state_ = SharedDataState::SUBSCRIBER;
290 data.subscriber_index_ = subscriber_index_;
291 topic_->HoldSlot(subscriber_index_, desc.slot_index);
292 return ErrorCode::OK;
293 }
294
295 uint32_t wait_ms = UINT32_MAX;
296 if (timeout_ms != UINT32_MAX)
297 {
298 const uint64_t now_ms = NowMonotonicMs();
299 if (now_ms >= deadline_ms)
300 {
301 return ErrorCode::TIMEOUT;
302 }
303 wait_ms = static_cast<uint32_t>(deadline_ms - now_ms);
304 }
305
306 const ErrorCode wait_ans = topic_->WaitReady(topic_->subscribers_[subscriber_index_],
307 wait_ms);
308 if (wait_ans == ErrorCode::OK)
309 {
310 continue;
311 }
312 return wait_ans;
313 }
314 }
315
321 TopicData* GetData() const
322 {
323 if (!Valid() || current_slot_index_ == INVALID_INDEX)
324 {
325 return nullptr;
326 }
327
328 return &topic_->payloads_[current_slot_index_];
329 }
330
334 uint64_t GetSequence() const { return current_sequence_; }
335
339 MicrosecondTimestamp GetTimestamp() const { return current_timestamp_; }
340
344 uint32_t GetPendingNum() const
345 {
346 if (!Valid())
347 {
348 return 0;
349 }
350
351 const SubscriberControl& control = topic_->subscribers_[subscriber_index_];
352 const uint32_t head = control.queue_head.load(std::memory_order_acquire);
353 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
354 if (tail >= head)
355 {
356 return tail - head;
357 }
358 return topic_->queue_capacity_ - (head - tail);
359 }
360
364 uint64_t GetDropNum() const
365 {
366 if (!Valid())
367 {
368 return 0;
369 }
370
371 return topic_->subscribers_[subscriber_index_].dropped_messages.load(
372 std::memory_order_acquire);
373 }
374
378 void Release()
379 {
380 if (!Valid() || current_slot_index_ == INVALID_INDEX)
381 {
382 return;
383 }
384
385 topic_->ClearHeldSlot(subscriber_index_, current_slot_index_);
386 topic_->ReleaseSlot(current_slot_index_);
387 current_slot_index_ = INVALID_INDEX;
388 current_sequence_ = 0;
389 current_timestamp_ = MicrosecondTimestamp();
390 }
391
396 void Reset()
397 {
398 if (!Valid())
399 {
400 return;
401 }
402
403 topic_->UnregisterBalancedSubscriber(subscriber_index_);
404 topic_->subscribers_[subscriber_index_].active.store(0, std::memory_order_release);
405 topic_->subscribers_[subscriber_index_].owner_pid.store(0, std::memory_order_release);
406 topic_->subscribers_[subscriber_index_].owner_starttime.store(0,
407 std::memory_order_release);
408
409 Descriptor desc = {};
410 while (topic_->TryPopDescriptor(subscriber_index_, desc) == ErrorCode::OK)
411 {
412 topic_->ReleaseSlot(desc.slot_index);
413 }
414
415 Release();
416
417 topic_ = nullptr;
418 delete owned_topic_;
419 owned_topic_ = nullptr;
420 subscriber_index_ = INVALID_INDEX;
421 current_slot_index_ = INVALID_INDEX;
422 current_sequence_ = 0;
423 current_timestamp_ = MicrosecondTimestamp();
424 }
425
426 private:
427 ErrorCode Attach(LinuxSharedTopic& topic, LinuxSharedSubscriberMode mode)
428 {
429 Reset();
430
431 if (!topic.Valid())
432 {
433 return ErrorCode::STATE_ERR;
434 }
435
436 if (topic.self_identity_.starttime == 0)
437 {
438 return ErrorCode::STATE_ERR;
439 }
440
441 for (uint32_t i = 0; i < topic.subscriber_capacity_; ++i)
442 {
443 uint32_t expected = 0;
444 auto& active = topic.subscribers_[i].active;
445 if (active.compare_exchange_strong(expected, 1, std::memory_order_acq_rel,
446 std::memory_order_relaxed))
447 {
448 topic.subscribers_[i].queue_head.store(0, std::memory_order_release);
449 topic.subscribers_[i].queue_tail.store(0, std::memory_order_release);
450 topic.subscribers_[i].ready_sem_count.store(0, std::memory_order_release);
451 topic.subscribers_[i].dropped_messages.store(0, std::memory_order_release);
452 topic.subscribers_[i].owner_pid.store(topic.self_identity_.pid,
453 std::memory_order_release);
454 topic.subscribers_[i].owner_starttime.store(topic.self_identity_.starttime,
455 std::memory_order_release);
456 topic.subscribers_[i].held_slot.store(INVALID_INDEX, std::memory_order_release);
457 topic.subscribers_[i].mode.store(static_cast<uint32_t>(mode),
458 std::memory_order_release);
459 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
460 {
461 const ErrorCode join_ans = topic.RegisterBalancedSubscriber(i);
462 if (join_ans != ErrorCode::OK)
463 {
464 topic.subscribers_[i].active.store(0, std::memory_order_release);
465 topic.subscribers_[i].owner_pid.store(0, std::memory_order_release);
466 topic.subscribers_[i].owner_starttime.store(0,
467 std::memory_order_release);
468 topic.subscribers_[i].mode.store(
469 static_cast<uint32_t>(LinuxSharedSubscriberMode::BROADCAST_FULL),
470 std::memory_order_release);
471 return join_ans;
472 }
473 }
474 topic_ = &topic;
475 subscriber_index_ = i;
476 current_slot_index_ = INVALID_INDEX;
477 current_sequence_ = 0;
478 current_timestamp_ = MicrosecondTimestamp();
479 return ErrorCode::OK;
480 }
481 }
482
483 return ErrorCode::FULL;
484 }
485
486 LinuxSharedTopic* topic_ = nullptr;
487 LinuxSharedTopic* owned_topic_ = nullptr;
488 uint32_t subscriber_index_ = INVALID_INDEX;
489 uint32_t current_slot_index_ = INVALID_INDEX;
490 uint64_t current_sequence_ = 0;
491 MicrosecondTimestamp current_timestamp_;
492 };
493
503 class SharedData
504 {
505 public:
509 SharedData() = default;
510
515 ~SharedData() { Reset(); }
516
517 SharedData(const SharedData&) = delete;
518 SharedData& operator=(const SharedData&) = delete;
519
520 SharedData(SharedData&& other) noexcept { *this = std::move(other); }
521
525 SharedData& operator=(SharedData&& other) noexcept
526 {
527 if (this == &other)
528 {
529 return *this;
530 }
531
532 Reset();
533
534 topic_ = other.topic_;
535 slot_index_ = other.slot_index_;
536 sequence_ = other.sequence_;
537 state_ = other.state_;
538 subscriber_index_ = other.subscriber_index_;
539
540 other.topic_ = nullptr;
541 other.slot_index_ = INVALID_INDEX;
542 other.sequence_ = 0;
543 other.state_ = SharedDataState::EMPTY;
544 other.subscriber_index_ = INVALID_INDEX;
545 return *this;
546 }
547
551 bool Valid() const { return topic_ != nullptr && slot_index_ != INVALID_INDEX; }
552
556 bool Empty() const { return !Valid(); }
557
561 uint64_t GetSequence() const { return sequence_; }
562
566 MicrosecondTimestamp GetTimestamp() const
567 {
568 if (!Valid() || state_ != SharedDataState::SUBSCRIBER)
569 {
570 return MicrosecondTimestamp();
571 }
572 return topic_->SlotTimestamp(slot_index_);
573 }
574
580 TopicData* GetData()
581 {
582 if (!Valid())
583 {
584 return nullptr;
585 }
586 return &topic_->payloads_[slot_index_];
587 }
588
594 TopicData* GetData() const
595 {
596 if (!Valid())
597 {
598 return nullptr;
599 }
600 return &topic_->payloads_[slot_index_];
601 }
602
606 void Reset()
607 {
608 if (!Valid())
609 {
610 return;
611 }
612
613 if (state_ == SharedDataState::PUBLISHER)
614 {
615 topic_->RecycleSlot(slot_index_);
616 }
617 else if (state_ == SharedDataState::SUBSCRIBER)
618 {
619 topic_->ClearHeldSlot(subscriber_index_, slot_index_);
620 topic_->ReleaseSlot(slot_index_);
621 }
622 topic_ = nullptr;
623 slot_index_ = INVALID_INDEX;
624 sequence_ = 0;
625 state_ = SharedDataState::EMPTY;
626 subscriber_index_ = INVALID_INDEX;
627 }
628
629 private:
630 friend class LinuxSharedTopic<TopicData>;
631 friend class Subscriber;
632
633 LinuxSharedTopic* topic_ = nullptr;
634 uint32_t slot_index_ = INVALID_INDEX;
635 uint64_t sequence_ = 0;
636 SharedDataState state_ = SharedDataState::EMPTY;
637 uint32_t subscriber_index_ = INVALID_INDEX;
638 };
639
644 explicit LinuxSharedTopic(const char* topic_name)
645 : LinuxSharedTopic(topic_name, DEFAULT_DOMAIN_NAME)
646 {
647 }
648
649 LinuxSharedTopic(const char* topic_name, const char* domain_name)
650 : create_(false),
651 publisher_(false),
652 config_(),
653 domain_crc32_(ResolveDomainKey(domain_name)),
654 topic_name_(ResolveTopicName(topic_name)),
655 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
656 shm_name_(BuildShmName(name_key_))
657 {
658 (void)ReadProcessIdentity(static_cast<uint32_t>(getpid()), self_identity_);
659 Open();
660 }
661
662 LinuxSharedTopic(const char* topic_name, Topic::Domain& domain)
663 : create_(false),
664 publisher_(false),
665 config_(),
666 domain_crc32_(domain.node_ != nullptr ? domain.node_->key : 0),
667 topic_name_(ResolveTopicName(topic_name)),
668 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
669 shm_name_(BuildShmName(name_key_))
670 {
671 (void)ReadProcessIdentity(static_cast<uint32_t>(getpid()), self_identity_);
672 Open();
673 }
674
681 LinuxSharedTopic(const char* topic_name, const LinuxSharedTopicConfig& config)
682 : LinuxSharedTopic(topic_name, DEFAULT_DOMAIN_NAME, config)
683 {
684 }
685
686 LinuxSharedTopic(const char* topic_name, const char* domain_name,
687 const LinuxSharedTopicConfig& config)
688 : create_(true),
689 publisher_(true),
690 config_(config),
691 domain_crc32_(ResolveDomainKey(domain_name)),
692 topic_name_(ResolveTopicName(topic_name)),
693 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
694 shm_name_(BuildShmName(name_key_))
695 {
696 (void)ReadProcessIdentity(static_cast<uint32_t>(getpid()), self_identity_);
697 Open();
698 }
699
700 LinuxSharedTopic(const char* topic_name, Topic::Domain& domain,
701 const LinuxSharedTopicConfig& config)
702 : create_(true),
703 publisher_(true),
704 config_(config),
705 domain_crc32_(domain.node_ != nullptr ? domain.node_->key : 0),
706 topic_name_(ResolveTopicName(topic_name)),
707 name_key_(BuildNameKey(domain_crc32_, topic_name_)),
708 shm_name_(BuildShmName(name_key_))
709 {
710 (void)ReadProcessIdentity(static_cast<uint32_t>(getpid()), self_identity_);
711 Open();
712 }
713
718 using SyncSubscriber = Subscriber;
719
723 ~LinuxSharedTopic() { Close(); }
724
725 LinuxSharedTopic(const LinuxSharedTopic&) = delete;
726 LinuxSharedTopic& operator=(const LinuxSharedTopic&) = delete;
727
728 LinuxSharedTopic(LinuxSharedTopic&&) = delete;
729 LinuxSharedTopic& operator=(LinuxSharedTopic&&) = delete;
730
735 bool Valid() const { return open_ok_; }
736
740 ErrorCode GetError() const { return open_status_; }
741
745 uint32_t GetSubscriberNum() const
746 {
747 if (!Valid())
748 {
749 return 0;
750 }
751
752 uint32_t count = 0;
753 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
754 {
755 if (subscribers_[i].active.load(std::memory_order_acquire) != 0)
756 {
757 ++count;
758 }
759 }
760 return count;
761 }
762
769 ErrorCode CreateData(SharedData& data)
770 {
771 if (!Valid())
772 {
773 return ErrorCode::STATE_ERR;
774 }
775
776 if (!PublisherValid())
777 {
778 return ErrorCode::STATE_ERR;
779 }
780
781 data.Reset();
782
783 uint32_t slot_index = INVALID_INDEX;
784 ErrorCode pop_ans = PopFreeSlot(slot_index);
785 if (pop_ans != ErrorCode::OK)
786 {
787 ScavengeDeadSubscribers();
788 pop_ans = PopFreeSlot(slot_index);
789 if (pop_ans != ErrorCode::OK)
790 {
791 return pop_ans;
792 }
793 }
794
795 slots_[slot_index].refcount.store(0, std::memory_order_release);
796 slots_[slot_index].sequence.store(0, std::memory_order_release);
797 slots_[slot_index].timestamp_us = 0;
798
799 data.topic_ = this;
800 data.slot_index_ = slot_index;
801 data.sequence_ = 0;
802 data.state_ = SharedDataState::PUBLISHER;
803 data.subscriber_index_ = INVALID_INDEX;
804 return ErrorCode::OK;
805 }
806
812 ErrorCode Publish(const TopicData& data)
813 {
814 SharedData topic_data;
815 const ErrorCode acquire_ans = CreateData(topic_data);
816 if (acquire_ans != ErrorCode::OK)
817 {
818 return acquire_ans;
819 }
820
821 *topic_data.GetData() = data;
822 return Publish(topic_data);
823 }
824
825 ErrorCode Publish(const TopicData& data, MicrosecondTimestamp timestamp)
826 {
827 SharedData topic_data;
828 const ErrorCode acquire_ans = CreateData(topic_data);
829 if (acquire_ans != ErrorCode::OK)
830 {
831 return acquire_ans;
832 }
833
834 *topic_data.GetData() = data;
835 return Publish(topic_data, timestamp);
836 }
837
841 ErrorCode Publish(SharedData&& data) { return PublishData<false>(data); }
842
843 ErrorCode Publish(SharedData&& data, MicrosecondTimestamp timestamp)
844 {
845 return PublishData<true>(data, timestamp);
846 }
847
851 ErrorCode Publish(SharedData& data) { return PublishData<false>(data); }
852
853 ErrorCode Publish(SharedData& data, MicrosecondTimestamp timestamp)
854 {
855 return PublishData<true>(data, timestamp);
856 }
857
861 uint64_t GetPublishFailedNum() const
862 {
863 if (!Valid())
864 {
865 return 0;
866 }
867 return header_->publish_failures.load(std::memory_order_acquire);
868 }
869
875 static ErrorCode Remove(const char* topic_name)
876 {
877 return Remove(topic_name, DEFAULT_DOMAIN_NAME);
878 }
879
880 static ErrorCode Remove(const char* topic_name, const char* domain_name)
881 {
882 const std::string shm_name = BuildShmName(
883 BuildNameKey(ResolveDomainKey(domain_name), ResolveTopicName(topic_name)));
884 if (shm_unlink(shm_name.c_str()) == 0 || errno == ENOENT)
885 {
886 return ErrorCode::OK;
887 }
888 return ErrorCode::FAILED;
889 }
890
891 static ErrorCode Remove(const char* topic_name, Topic::Domain& domain)
892 {
893 const uint32_t domain_crc32 = (domain.node_ != nullptr) ? domain.node_->key : 0;
894 const std::string shm_name =
895 BuildShmName(BuildNameKey(domain_crc32, ResolveTopicName(topic_name)));
896 if (shm_unlink(shm_name.c_str()) == 0 || errno == ENOENT)
897 {
898 return ErrorCode::OK;
899 }
900 return ErrorCode::FAILED;
901 }
902
903 private:
904 struct alignas(64) SharedHeader
905 {
906 uint64_t magic = 0;
907 uint64_t name_key = 0;
908 uint32_t domain_crc32 = 0;
909 uint32_t version = 0;
910 uint32_t data_size = 0;
911 uint32_t slot_count = 0;
912 uint32_t subscriber_capacity = 0;
913 uint32_t queue_capacity = 0;
914 uint32_t topic_name_len = 0;
915 std::atomic<uint32_t> init_state;
916 std::atomic<uint32_t> publisher_pid;
917 std::atomic<uint64_t> publisher_starttime;
918 std::atomic<uint64_t> free_queue_head;
919 std::atomic<uint64_t> free_queue_tail;
920 std::atomic<uint64_t> next_sequence;
921 std::atomic<uint64_t> publish_failures;
922 };
923
924 struct alignas(64) SlotControl
925 {
926 std::atomic<uint32_t> refcount;
927 std::atomic<uint64_t> sequence;
928 uint64_t timestamp_us;
929 };
930
931 struct alignas(16) FreeSlotCell
932 {
933 std::atomic<uint64_t> sequence;
934 uint32_t slot_index = 0;
935 uint32_t reserved = 0;
936 };
937
938 struct Descriptor
939 {
940 uint32_t slot_index = INVALID_INDEX;
941 uint32_t reserved = 0;
942 uint64_t sequence = 0;
943 };
944
945 struct alignas(64) SubscriberControl
946 {
947 std::atomic<uint32_t> active;
948 std::atomic<uint32_t> mode;
949 std::atomic<uint32_t> queue_head;
950 std::atomic<uint32_t> queue_tail;
951 std::atomic<uint32_t> ready_sem_count;
952 std::atomic<uint64_t> dropped_messages;
953 std::atomic<uint32_t> owner_pid;
954 std::atomic<uint64_t> owner_starttime;
955 std::atomic<uint32_t> held_slot;
956 };
957
958 struct alignas(64) BalancedGroupControl
959 {
960 std::atomic<uint64_t> rr_cursor;
961 };
962
963 struct ProcessIdentity
964 {
965 uint32_t pid = 0;
966 uint64_t starttime = 0;
967 };
968
969 static constexpr uint64_t MAGIC = 0x4c58524950435348ULL;
970 static constexpr uint32_t VERSION = 2;
971 static constexpr uint32_t INIT_READY = 1;
972 static constexpr uint32_t INVALID_INDEX = UINT32_MAX;
973
974 static uint32_t ResolveDomainKey(const char* domain_name)
975 {
976 const std::string resolved =
977 (domain_name == nullptr || domain_name[0] == '\0') ? std::string(DEFAULT_DOMAIN_NAME)
978 : std::string(domain_name);
979 return CRC32::Calculate(resolved.data(), resolved.size());
980 }
981
982 static std::string ResolveTopicName(const char* topic_name)
983 {
984 return (topic_name != nullptr) ? std::string(topic_name) : std::string();
985 }
986
987 static uint64_t BuildNameKey(uint32_t domain_crc32, const std::string& topic_name)
988 {
989 const uint32_t topic_len = static_cast<uint32_t>(topic_name.size());
990 std::string key_material;
991 key_material.reserve(sizeof(domain_crc32) + sizeof(topic_len) + topic_len);
992 key_material.append(reinterpret_cast<const char*>(&domain_crc32), sizeof(domain_crc32));
993 key_material.append(reinterpret_cast<const char*>(&topic_len), sizeof(topic_len));
994 key_material.append(topic_name.data(), topic_name.size());
995 return CRC64::Calculate(key_material.data(), key_material.size());
996 }
997
998 static std::string BuildShmName(uint64_t name_key)
999 {
1000 char buffer[64] = {};
1001 std::snprintf(buffer, sizeof(buffer), "/libxr_ipc_%016" PRIx64, name_key);
1002 return std::string(buffer);
1003 }
1004
1005 static size_t AlignUp(size_t value, size_t alignment)
1006 {
1007 return (value + alignment - 1U) & ~(alignment - 1U);
1008 }
1009
1010 static uint64_t NowMonotonicMs() { return MonotonicTime::NowMilliseconds(); }
1011
1012 static MicrosecondTimestamp NowMessageTimestamp() { return Topic::NowTimestamp(); }
1013
1014 static uint64_t ToSharedTimestamp(MicrosecondTimestamp timestamp)
1015 {
1016 return MonotonicTime::XrToSharedMicroseconds(static_cast<uint64_t>(timestamp));
1017 }
1018
1019 static MicrosecondTimestamp FromSharedTimestamp(uint64_t timestamp_us)
1020 {
1021 return MicrosecondTimestamp(MonotonicTime::SharedToXrMicroseconds(timestamp_us));
1022 }
1023
1024 static bool ReadProcessIdentity(uint32_t pid, ProcessIdentity& identity)
1025 {
1026 identity = {};
1027 if (pid == 0)
1028 {
1029 return false;
1030 }
1031
1032 char path[64] = {};
1033 std::snprintf(path, sizeof(path), "/proc/%u/stat", pid);
1034
1035 std::ifstream file(path);
1036 if (!file.is_open())
1037 {
1038 return false;
1039 }
1040
1041 std::string line;
1042 std::getline(file, line);
1043 if (line.empty())
1044 {
1045 return false;
1046 }
1047
1048 const size_t rparen = line.rfind(')');
1049 if (rparen == std::string::npos || rparen + 2U >= line.size())
1050 {
1051 return false;
1052 }
1053
1054 std::istringstream iss(line.substr(rparen + 2U));
1055 std::string token;
1056 for (int field = 3; field <= 22; ++field)
1057 {
1058 if (!(iss >> token))
1059 {
1060 return false;
1061 }
1062
1063 if (field == 22)
1064 {
1065 identity.pid = pid;
1066 identity.starttime = std::strtoull(token.c_str(), nullptr, 10);
1067 return identity.starttime != 0;
1068 }
1069 }
1070
1071 return false;
1072 }
1073
1074 static int FutexWait(std::atomic<uint32_t>* word, uint32_t expected, uint32_t timeout_ms)
1075 {
1076 struct timespec timeout = {};
1077 struct timespec* timeout_ptr = nullptr;
1078 if (timeout_ms != UINT32_MAX)
1079 {
1080 timeout.tv_sec = static_cast<time_t>(timeout_ms / 1000U);
1081 timeout.tv_nsec = static_cast<long>(timeout_ms % 1000U) * 1000000L;
1082 timeout_ptr = &timeout;
1083 }
1084
1085 return static_cast<int>(syscall(SYS_futex,
1086 reinterpret_cast<uint32_t*>(word),
1087 FUTEX_WAIT,
1088 expected,
1089 timeout_ptr,
1090 nullptr,
1091 0));
1092 }
1093
1094 static int FutexWake(std::atomic<uint32_t>* word)
1095 {
1096 return static_cast<int>(
1097 syscall(SYS_futex, reinterpret_cast<uint32_t*>(word), FUTEX_WAKE, INT32_MAX, nullptr,
1098 nullptr, 0));
1099 }
1100
1101 static size_t ComputeSharedBytes(uint32_t slot_count,
1102 uint32_t subscriber_capacity,
1103 uint32_t queue_capacity,
1104 uint32_t topic_name_len)
1105 {
1106 size_t offset = 0;
1107 offset = AlignUp(offset, alignof(SharedHeader));
1108 offset += sizeof(SharedHeader);
1109
1110 offset += static_cast<size_t>(topic_name_len) + 1U;
1111
1112 offset = AlignUp(offset, alignof(SlotControl));
1113 offset += sizeof(SlotControl) * slot_count;
1114
1115 offset = AlignUp(offset, alignof(SubscriberControl));
1116 offset += sizeof(SubscriberControl) * subscriber_capacity;
1117
1118 offset = AlignUp(offset, alignof(BalancedGroupControl));
1119 offset += sizeof(BalancedGroupControl);
1120
1121 offset = AlignUp(offset, alignof(std::atomic<uint32_t>));
1122 offset += sizeof(std::atomic<uint32_t>) * subscriber_capacity;
1123
1124 offset = AlignUp(offset, alignof(FreeSlotCell));
1125 offset += sizeof(FreeSlotCell) * slot_count;
1126
1127 offset = AlignUp(offset, alignof(Descriptor));
1128 offset += sizeof(Descriptor) * subscriber_capacity * queue_capacity;
1129
1130 offset = AlignUp(offset, alignof(TopicData));
1131 offset += sizeof(TopicData) * slot_count;
1132 return offset;
1133 }
1134
1135 void SetupPointers()
1136 {
1137 size_t offset = 0;
1138
1139 offset = AlignUp(offset, alignof(SharedHeader));
1140 header_ = reinterpret_cast<SharedHeader*>(base_ + offset);
1141 offset += sizeof(SharedHeader);
1142
1143 topic_name_ptr_ = reinterpret_cast<char*>(base_ + offset);
1144 offset += static_cast<size_t>(header_->topic_name_len) + 1U;
1145
1146 offset = AlignUp(offset, alignof(SlotControl));
1147 slots_ = reinterpret_cast<SlotControl*>(base_ + offset);
1148 offset += sizeof(SlotControl) * slot_count_;
1149
1150 offset = AlignUp(offset, alignof(SubscriberControl));
1151 subscribers_ = reinterpret_cast<SubscriberControl*>(base_ + offset);
1152 offset += sizeof(SubscriberControl) * subscriber_capacity_;
1153
1154 offset = AlignUp(offset, alignof(BalancedGroupControl));
1155 balanced_group_ = reinterpret_cast<BalancedGroupControl*>(base_ + offset);
1156 offset += sizeof(BalancedGroupControl);
1157
1158 offset = AlignUp(offset, alignof(std::atomic<uint32_t>));
1159 balanced_members_ = reinterpret_cast<std::atomic<uint32_t>*>(base_ + offset);
1160 offset += sizeof(std::atomic<uint32_t>) * subscriber_capacity_;
1161
1162 offset = AlignUp(offset, alignof(FreeSlotCell));
1163 free_slots_ = reinterpret_cast<FreeSlotCell*>(base_ + offset);
1164 offset += sizeof(FreeSlotCell) * slot_count_;
1165
1166 offset = AlignUp(offset, alignof(Descriptor));
1167 descriptors_ = reinterpret_cast<Descriptor*>(base_ + offset);
1168 offset += sizeof(Descriptor) * subscriber_capacity_ * queue_capacity_;
1169
1170 offset = AlignUp(offset, alignof(TopicData));
1171 payloads_ = reinterpret_cast<TopicData*>(base_ + offset);
1172 }
1173
1174 bool HeaderMatchesIdentity() const
1175 {
1176 if (header_->name_key != name_key_)
1177 {
1178 return false;
1179 }
1180 if (header_->domain_crc32 != domain_crc32_)
1181 {
1182 return false;
1183 }
1184 if (header_->topic_name_len != topic_name_.size())
1185 {
1186 return false;
1187 }
1188 if (std::memcmp(topic_name_ptr_, topic_name_.c_str(), topic_name_.size() + 1U) != 0)
1189 {
1190 return false;
1191 }
1192 return true;
1193 }
1194
1195 ErrorCode InitializeLayout()
1196 {
1197 if (config_.slot_num == 0 || config_.subscriber_num == 0 || config_.queue_num < 2)
1198 {
1199 return ErrorCode::ARG_ERR;
1200 }
1201
1202 const size_t bytes =
1203 ComputeSharedBytes(config_.slot_num, config_.subscriber_num, config_.queue_num,
1204 static_cast<uint32_t>(topic_name_.size()));
1205
1206 if (ftruncate(fd_, static_cast<off_t>(bytes)) != 0)
1207 {
1208 return ErrorCode::INIT_ERR;
1209 }
1210
1211 const struct stat st = GetStat();
1212 if (st.st_size <= 0)
1213 {
1214 return ErrorCode::INIT_ERR;
1215 }
1216
1217 mapping_size_ = static_cast<size_t>(st.st_size);
1218 mapping_ = mmap(nullptr, mapping_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
1219 if (mapping_ == MAP_FAILED)
1220 {
1221 mapping_ = nullptr;
1222 return ErrorCode::INIT_ERR;
1223 }
1224
1225 base_ = static_cast<uint8_t*>(mapping_);
1226 slot_count_ = config_.slot_num;
1227 subscriber_capacity_ = config_.subscriber_num;
1228 queue_capacity_ = config_.queue_num;
1229 header_ = reinterpret_cast<SharedHeader*>(base_ + AlignUp(0, alignof(SharedHeader)));
1230 header_->topic_name_len = static_cast<uint32_t>(topic_name_.size());
1231 SetupPointers();
1232
1233 header_->magic = MAGIC;
1234 header_->name_key = name_key_;
1235 header_->domain_crc32 = domain_crc32_;
1236 header_->version = VERSION;
1237 header_->data_size = sizeof(TopicData);
1238 header_->slot_count = slot_count_;
1239 header_->subscriber_capacity = subscriber_capacity_;
1240 header_->queue_capacity = queue_capacity_;
1241 std::memcpy(topic_name_ptr_, topic_name_.c_str(), topic_name_.size() + 1U);
1242 header_->publisher_pid.store(self_identity_.pid, std::memory_order_release);
1243 header_->publisher_starttime.store(self_identity_.starttime, std::memory_order_release);
1244 header_->free_queue_head.store(0, std::memory_order_release);
1245 header_->free_queue_tail.store(slot_count_, std::memory_order_release);
1246 header_->next_sequence.store(0, std::memory_order_release);
1247 header_->publish_failures.store(0, std::memory_order_release);
1248
1249 for (uint32_t i = 0; i < slot_count_; ++i)
1250 {
1251 slots_[i].refcount.store(0, std::memory_order_release);
1252 slots_[i].sequence.store(0, std::memory_order_release);
1253 slots_[i].timestamp_us = 0;
1254 std::construct_at(&payloads_[i], TopicData{});
1255 free_slots_[i].slot_index = i;
1256 free_slots_[i].sequence.store(static_cast<uint64_t>(i) + 1U,
1257 std::memory_order_release);
1258 }
1259
1260 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1261 {
1262 subscribers_[i].active.store(0, std::memory_order_release);
1263 subscribers_[i].mode.store(static_cast<uint32_t>(LinuxSharedSubscriberMode::BROADCAST_FULL),
1264 std::memory_order_release);
1265 subscribers_[i].queue_head.store(0, std::memory_order_release);
1266 subscribers_[i].queue_tail.store(0, std::memory_order_release);
1267 subscribers_[i].ready_sem_count.store(0, std::memory_order_release);
1268 subscribers_[i].dropped_messages.store(0, std::memory_order_release);
1269 subscribers_[i].owner_pid.store(0, std::memory_order_release);
1270 subscribers_[i].owner_starttime.store(0, std::memory_order_release);
1271 subscribers_[i].held_slot.store(INVALID_INDEX, std::memory_order_release);
1272 balanced_members_[i].store(INVALID_INDEX, std::memory_order_release);
1273 }
1274
1275 balanced_group_->rr_cursor.store(0, std::memory_order_release);
1276
1277 for (size_t i = 0; i < static_cast<size_t>(subscriber_capacity_) * queue_capacity_; ++i)
1278 {
1279 descriptors_[i] = Descriptor{};
1280 }
1281
1282 header_->init_state.store(INIT_READY, std::memory_order_release);
1283 return ErrorCode::OK;
1284 }
1285
1286 ErrorCode AttachLayout()
1287 {
1288 const struct stat st = GetStat();
1289 if (st.st_size <= 0)
1290 {
1291 return ErrorCode::NOT_FOUND;
1292 }
1293
1294 mapping_size_ = static_cast<size_t>(st.st_size);
1295 mapping_ = mmap(nullptr, mapping_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
1296 if (mapping_ == MAP_FAILED)
1297 {
1298 mapping_ = nullptr;
1299 return ErrorCode::INIT_ERR;
1300 }
1301
1302 base_ = static_cast<uint8_t*>(mapping_);
1303 header_ = reinterpret_cast<SharedHeader*>(base_);
1304
1305 while (header_->init_state.load(std::memory_order_acquire) != INIT_READY)
1306 {
1307 usleep(1000);
1308 }
1309
1310 if (header_->magic != MAGIC || header_->version != VERSION ||
1311 header_->data_size != sizeof(TopicData))
1312 {
1313 return ErrorCode::CHECK_ERR;
1314 }
1315
1316 slot_count_ = header_->slot_count;
1317 subscriber_capacity_ = header_->subscriber_capacity;
1318 queue_capacity_ = header_->queue_capacity;
1319 SetupPointers();
1320 if (!HeaderMatchesIdentity())
1321 {
1322 return ErrorCode::CHECK_ERR;
1323 }
1324 return ErrorCode::OK;
1325 }
1326
1327 bool TryReclaimStaleSegment()
1328 {
1329 int stale_fd = shm_open(shm_name_.c_str(), O_RDWR, 0600);
1330 if (stale_fd < 0)
1331 {
1332 return errno == ENOENT;
1333 }
1334
1335 struct stat st = {};
1336 if (fstat(stale_fd, &st) != 0)
1337 {
1338 close(stale_fd);
1339 return false;
1340 }
1341
1342 bool reclaim = false;
1343 if (st.st_size < static_cast<off_t>(sizeof(SharedHeader)))
1344 {
1345 reclaim = true;
1346 }
1347 else
1348 {
1349 void* mapping = mmap(nullptr, static_cast<size_t>(st.st_size), PROT_READ | PROT_WRITE,
1350 MAP_SHARED, stale_fd, 0);
1351 if (mapping != MAP_FAILED)
1352 {
1353 uint8_t* base = static_cast<uint8_t*>(mapping);
1354 auto* header = reinterpret_cast<SharedHeader*>(mapping);
1355 const uint32_t init_state = header->init_state.load(std::memory_order_acquire);
1356 bool identity_match = false;
1357 const size_t mapping_size = static_cast<size_t>(st.st_size);
1358 const size_t topic_name_bytes = topic_name_.size() + 1U;
1359 if (header->magic == MAGIC && header->version == VERSION &&
1360 header->domain_crc32 == domain_crc32_ &&
1361 header->topic_name_len == topic_name_.size())
1362 {
1363 size_t offset = AlignUp(0, alignof(SharedHeader));
1364 offset += sizeof(SharedHeader);
1365 if (offset <= mapping_size && topic_name_bytes <= (mapping_size - offset))
1366 {
1367 const char* mapped_topic = reinterpret_cast<const char*>(base + offset);
1368 identity_match =
1369 (header->name_key == name_key_) &&
1370 (std::memcmp(mapped_topic, topic_name_.c_str(), topic_name_bytes) == 0);
1371 }
1372 }
1373 const ProcessIdentity publisher_identity = {
1374 header->publisher_pid.load(std::memory_order_acquire),
1375 header->publisher_starttime.load(std::memory_order_acquire),
1376 };
1377
1378 if (!identity_match)
1379 {
1380 reclaim = false;
1381 }
1382 else if (init_state != INIT_READY)
1383 {
1384 reclaim = !ProcessAlive(publisher_identity);
1385 }
1386 else if (!ProcessAlive(publisher_identity))
1387 {
1388 reclaim = true;
1389 }
1390
1391 munmap(mapping, static_cast<size_t>(st.st_size));
1392 }
1393 }
1394
1395 close(stale_fd);
1396
1397 if (!reclaim)
1398 {
1399 return false;
1400 }
1401
1402 return shm_unlink(shm_name_.c_str()) == 0 || errno == ENOENT;
1403 }
1404
1405 void Open()
1406 {
1407 open_status_ = ErrorCode::STATE_ERR;
1408 open_ok_ = false;
1409
1410 if (create_)
1411 {
1412 for (int attempt = 0; attempt < 2; ++attempt)
1413 {
1414 fd_ = shm_open(shm_name_.c_str(), O_CREAT | O_EXCL | O_RDWR, 0600);
1415 if (fd_ >= 0)
1416 {
1417 break;
1418 }
1419
1420 if (errno != EEXIST || !TryReclaimStaleSegment())
1421 {
1422 break;
1423 }
1424 }
1425
1426 if (fd_ < 0)
1427 {
1428 open_status_ = (errno == EEXIST) ? ErrorCode::BUSY : ErrorCode::INIT_ERR;
1429 return;
1430 }
1431
1432 open_status_ = InitializeLayout();
1433 }
1434 else
1435 {
1436 fd_ = shm_open(shm_name_.c_str(), O_RDWR, 0600);
1437 if (fd_ < 0)
1438 {
1439 open_status_ = (errno == ENOENT) ? ErrorCode::NOT_FOUND : ErrorCode::INIT_ERR;
1440 return;
1441 }
1442
1443 open_status_ = AttachLayout();
1444 }
1445
1446 if (fd_ >= 0)
1447 {
1448 close(fd_);
1449 fd_ = -1;
1450 }
1451
1452 open_ok_ = (open_status_ == ErrorCode::OK);
1453 }
1454
1455 void Close()
1456 {
1457 if (mapping_ != nullptr)
1458 {
1459 munmap(mapping_, mapping_size_);
1460 }
1461
1462 mapping_ = nullptr;
1463 base_ = nullptr;
1464 header_ = nullptr;
1465 slots_ = nullptr;
1466 subscribers_ = nullptr;
1467 free_slots_ = nullptr;
1468 descriptors_ = nullptr;
1469 payloads_ = nullptr;
1470 mapping_size_ = 0;
1471 open_ok_ = false;
1472 open_status_ = ErrorCode::STATE_ERR;
1473 }
1474
1475 struct stat GetStat() const
1476 {
1477 struct stat st = {};
1478 fstat(fd_, &st);
1479 return st;
1480 }
1481
1482 Descriptor* DescriptorRing(uint32_t subscriber_index) const
1483 {
1484 return descriptors_ + static_cast<size_t>(subscriber_index) * queue_capacity_;
1485 }
1486
1487 static bool ProcessAlive(const ProcessIdentity& identity)
1488 {
1489 ProcessIdentity current = {};
1490 if (!ReadProcessIdentity(identity.pid, current))
1491 {
1492 return false;
1493 }
1494
1495 return current.starttime == identity.starttime;
1496 }
1497
1498 bool PublisherValid() const
1499 {
1500 if (!publisher_ || header_ == nullptr)
1501 {
1502 return false;
1503 }
1504
1505 const ProcessIdentity owner = {
1506 header_->publisher_pid.load(std::memory_order_acquire),
1507 header_->publisher_starttime.load(std::memory_order_acquire),
1508 };
1509 return owner.pid == self_identity_.pid && owner.starttime == self_identity_.starttime;
1510 }
1511
1512 void HoldSlot(uint32_t subscriber_index, uint32_t slot_index)
1513 {
1514 subscribers_[subscriber_index].held_slot.store(slot_index, std::memory_order_release);
1515 }
1516
1517 void ClearHeldSlot(uint32_t subscriber_index, uint32_t slot_index)
1518 {
1519 uint32_t expected = slot_index;
1520 subscribers_[subscriber_index].held_slot.compare_exchange_strong(
1521 expected, INVALID_INDEX, std::memory_order_acq_rel, std::memory_order_relaxed);
1522 }
1523
1524 MicrosecondTimestamp SlotTimestamp(uint32_t slot_index) const
1525 {
1526 return FromSharedTimestamp(slots_[slot_index].timestamp_us);
1527 }
1528
1529 ErrorCode RegisterBalancedSubscriber(uint32_t subscriber_index)
1530 {
1531 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1532 {
1533 uint32_t expected = INVALID_INDEX;
1534 if (balanced_members_[i].compare_exchange_strong(expected, subscriber_index,
1535 std::memory_order_acq_rel,
1536 std::memory_order_relaxed))
1537 {
1538 return ErrorCode::OK;
1539 }
1540 }
1541 return ErrorCode::FULL;
1542 }
1543
1544 void UnregisterBalancedSubscriber(uint32_t subscriber_index)
1545 {
1546 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1547 {
1548 uint32_t expected = subscriber_index;
1549 if (balanced_members_[i].compare_exchange_strong(expected, INVALID_INDEX,
1550 std::memory_order_acq_rel,
1551 std::memory_order_relaxed))
1552 {
1553 return;
1554 }
1555 }
1556 }
1557
1558 bool SelectBalancedSubscriber(uint32_t& subscriber_index)
1559 {
1560 const uint64_t base = balanced_group_->rr_cursor.fetch_add(1, std::memory_order_acq_rel);
1561 for (uint32_t offset = 0; offset < subscriber_capacity_; ++offset)
1562 {
1563 const uint32_t member_index =
1564 balanced_members_[(base + offset) % subscriber_capacity_].load(std::memory_order_acquire);
1565 if (member_index == INVALID_INDEX)
1566 {
1567 continue;
1568 }
1569 if (subscribers_[member_index].active.load(std::memory_order_acquire) == 0)
1570 {
1571 continue;
1572 }
1573 if (subscribers_[member_index].mode.load(std::memory_order_acquire) !=
1574 static_cast<uint32_t>(LinuxSharedSubscriberMode::BALANCE_RR))
1575 {
1576 continue;
1577 }
1578 const ProcessIdentity owner_identity = {
1579 subscribers_[member_index].owner_pid.load(std::memory_order_acquire),
1580 subscribers_[member_index].owner_starttime.load(std::memory_order_acquire),
1581 };
1582 if (owner_identity.pid == 0 || owner_identity.starttime == 0)
1583 {
1584 continue;
1585 }
1586 if (!ProcessAlive(owner_identity))
1587 {
1588 ReclaimSubscriber(member_index);
1589 continue;
1590 }
1591 if (!QueueHasSpace(member_index))
1592 {
1593 continue;
1594 }
1595 subscriber_index = member_index;
1596 return true;
1597 }
1598 return false;
1599 }
1600
1601 bool ReclaimSubscriber(uint32_t subscriber_index)
1602 {
1603 uint32_t expected = 1;
1604 if (!subscribers_[subscriber_index].active.compare_exchange_strong(
1605 expected, 0, std::memory_order_acq_rel, std::memory_order_relaxed))
1606 {
1607 return false;
1608 }
1609
1610 if (subscribers_[subscriber_index].mode.load(std::memory_order_acquire) ==
1611 static_cast<uint32_t>(LinuxSharedSubscriberMode::BALANCE_RR))
1612 {
1613 UnregisterBalancedSubscriber(subscriber_index);
1614 }
1615
1616 subscribers_[subscriber_index].owner_pid.store(0, std::memory_order_release);
1617 subscribers_[subscriber_index].owner_starttime.store(0, std::memory_order_release);
1618 subscribers_[subscriber_index].mode.store(
1619 static_cast<uint32_t>(LinuxSharedSubscriberMode::BROADCAST_FULL),
1620 std::memory_order_release);
1621
1622 const uint32_t held_slot =
1623 subscribers_[subscriber_index].held_slot.exchange(INVALID_INDEX,
1624 std::memory_order_acq_rel);
1625 if (held_slot != INVALID_INDEX)
1626 {
1627 ReleaseSlot(held_slot);
1628 }
1629
1630 Descriptor desc = {};
1631 while (TryPopDescriptor(subscriber_index, desc) == ErrorCode::OK)
1632 {
1633 ReleaseSlot(desc.slot_index);
1634 }
1635
1636 return true;
1637 }
1638
1639 static void PostReady(SubscriberControl& control)
1640 {
1641 control.ready_sem_count.fetch_add(1, std::memory_order_release);
1642 FutexWake(&control.ready_sem_count);
1643 }
1644
1645 static void ConsumeReady(SubscriberControl& control)
1646 {
1647 const uint32_t prev = control.ready_sem_count.fetch_sub(1, std::memory_order_acq_rel);
1648 ASSERT(prev > 0);
1649 }
1650
1651 static ErrorCode WaitReady(SubscriberControl& control, uint32_t timeout_ms)
1652 {
1653 if (control.ready_sem_count.load(std::memory_order_acquire) != 0)
1654 {
1655 return ErrorCode::OK;
1656 }
1657
1658 const bool infinite_wait = (timeout_ms == UINT32_MAX);
1659 const uint64_t deadline_ms = infinite_wait ? 0 : (NowMonotonicMs() + timeout_ms);
1660
1661 while (true)
1662 {
1663 if (control.ready_sem_count.load(std::memory_order_acquire) != 0)
1664 {
1665 return ErrorCode::OK;
1666 }
1667
1668 uint32_t wait_ms = UINT32_MAX;
1669 if (!infinite_wait)
1670 {
1671 wait_ms = MonotonicTime::RemainingMilliseconds(deadline_ms);
1672 if (wait_ms == 0)
1673 {
1674 return ErrorCode::TIMEOUT;
1675 }
1676 }
1677
1678 wait_ms = MonotonicTime::WaitSliceMilliseconds(wait_ms);
1679
1680 const int futex_ans = FutexWait(&control.ready_sem_count, 0, wait_ms);
1681 if (futex_ans == 0 || errno == EAGAIN || errno == EINTR)
1682 {
1683 continue;
1684 }
1685
1686 if (errno == ETIMEDOUT)
1687 {
1688 if (infinite_wait)
1689 {
1690 continue;
1691 }
1692 if (MonotonicTime::RemainingMilliseconds(deadline_ms) == 0 &&
1693 control.ready_sem_count.load(std::memory_order_acquire) == 0)
1694 {
1695 return ErrorCode::TIMEOUT;
1696 }
1697 continue;
1698 }
1699
1700 return ErrorCode::FAILED;
1701 }
1702 }
1703
1704 void ScavengeDeadSubscribers()
1705 {
1706 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1707 {
1708 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1709 {
1710 continue;
1711 }
1712
1713 const ProcessIdentity owner_identity = {
1714 subscribers_[i].owner_pid.load(std::memory_order_acquire),
1715 subscribers_[i].owner_starttime.load(std::memory_order_acquire),
1716 };
1717 if (ProcessAlive(owner_identity))
1718 {
1719 continue;
1720 }
1721
1722 ReclaimSubscriber(i);
1723 }
1724 }
1725
1726 bool QueueHasSpace(uint32_t subscriber_index) const
1727 {
1728 const SubscriberControl& control = subscribers_[subscriber_index];
1729 const uint32_t head = control.queue_head.load(std::memory_order_acquire);
1730 const uint32_t tail = control.queue_tail.load(std::memory_order_relaxed);
1731 const uint32_t next_tail = (tail + 1U) % queue_capacity_;
1732 return next_tail != head;
1733 }
1734
1735 void PushDescriptor(uint32_t subscriber_index, const Descriptor& descriptor)
1736 {
1737 SubscriberControl& control = subscribers_[subscriber_index];
1738 Descriptor* ring = DescriptorRing(subscriber_index);
1739
1740 const uint32_t tail = control.queue_tail.load(std::memory_order_relaxed);
1741 ring[tail] = descriptor;
1742 const uint32_t next_tail = (tail + 1U) % queue_capacity_;
1743 control.queue_tail.store(next_tail, std::memory_order_release);
1744 PostReady(control);
1745 }
1746
1747 ErrorCode TryPopDescriptor(uint32_t subscriber_index, Descriptor& descriptor)
1748 {
1749 SubscriberControl& control = subscribers_[subscriber_index];
1750 Descriptor* ring = DescriptorRing(subscriber_index);
1751
1752 while (true)
1753 {
1754 uint32_t head = control.queue_head.load(std::memory_order_relaxed);
1755 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
1756 if (head == tail)
1757 {
1758 return ErrorCode::EMPTY;
1759 }
1760
1761 descriptor = ring[head];
1762 const uint32_t next_head = (head + 1U) % queue_capacity_;
1763 if (control.queue_head.compare_exchange_weak(head, next_head, std::memory_order_acq_rel,
1764 std::memory_order_relaxed))
1765 {
1766 ConsumeReady(control);
1767 return ErrorCode::OK;
1768 }
1769 }
1770 }
1771
1772 ErrorCode DropDescriptor(uint32_t subscriber_index)
1773 {
1774 SubscriberControl& control = subscribers_[subscriber_index];
1775 Descriptor* ring = DescriptorRing(subscriber_index);
1776
1777 while (true)
1778 {
1779 uint32_t head = control.queue_head.load(std::memory_order_relaxed);
1780 const uint32_t tail = control.queue_tail.load(std::memory_order_acquire);
1781 if (head == tail)
1782 {
1783 return ErrorCode::EMPTY;
1784 }
1785
1786 const Descriptor descriptor = ring[head];
1787 const uint32_t next_head = (head + 1U) % queue_capacity_;
1788 if (control.queue_head.compare_exchange_weak(head, next_head, std::memory_order_acq_rel,
1789 std::memory_order_relaxed))
1790 {
1791 control.dropped_messages.fetch_add(1, std::memory_order_relaxed);
1792 ConsumeReady(control);
1793 ReleaseSlot(descriptor.slot_index);
1794 return ErrorCode::OK;
1795 }
1796 }
1797 }
1798
1799 ErrorCode PopFreeSlot(uint32_t& slot_index)
1800 {
1801 while (true)
1802 {
1803 uint64_t head = header_->free_queue_head.load(std::memory_order_relaxed);
1804 FreeSlotCell& cell = free_slots_[head % slot_count_];
1805 const uint64_t seq = cell.sequence.load(std::memory_order_acquire);
1806 const intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(head + 1U);
1807
1808 if (diff == 0)
1809 {
1810 if (header_->free_queue_head.compare_exchange_weak(head, head + 1U,
1811 std::memory_order_acq_rel,
1812 std::memory_order_relaxed))
1813 {
1814 slot_index = cell.slot_index;
1815 cell.sequence.store(head + slot_count_, std::memory_order_release);
1816 return ErrorCode::OK;
1817 }
1818 }
1819 else if (diff < 0)
1820 {
1821 return ErrorCode::FULL;
1822 }
1823 }
1824 }
1825
1826 void RecycleSlot(uint32_t slot_index)
1827 {
1828 slots_[slot_index].sequence.store(0, std::memory_order_release);
1829 slots_[slot_index].timestamp_us = 0;
1830
1831 while (true)
1832 {
1833 uint64_t tail = header_->free_queue_tail.load(std::memory_order_relaxed);
1834 FreeSlotCell& cell = free_slots_[tail % slot_count_];
1835 const uint64_t seq = cell.sequence.load(std::memory_order_acquire);
1836 const intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(tail);
1837
1838 if (diff == 0)
1839 {
1840 if (header_->free_queue_tail.compare_exchange_weak(tail, tail + 1U,
1841 std::memory_order_acq_rel,
1842 std::memory_order_relaxed))
1843 {
1844 cell.slot_index = slot_index;
1845 cell.sequence.store(tail + 1U, std::memory_order_release);
1846 return;
1847 }
1848 }
1849 }
1850 }
1851
1852 void ReleaseSlot(uint32_t slot_index)
1853 {
1854 const uint32_t prev = slots_[slot_index].refcount.fetch_sub(1, std::memory_order_acq_rel);
1855 ASSERT(prev > 0);
1856 if (prev == 1)
1857 {
1858 RecycleSlot(slot_index);
1859 }
1860 }
1861
1862 template <bool HAS_TIMESTAMP>
1863 ErrorCode PublishData(SharedData& data,
1864 MicrosecondTimestamp timestamp = MicrosecondTimestamp())
1865 {
1866 if (!data.Valid() || data.topic_ != this)
1867 {
1868 return ErrorCode::STATE_ERR;
1869 }
1870
1871 uint32_t active_count = 0;
1872 uint32_t balanced_target = INVALID_INDEX;
1873 bool has_balanced_subscriber = false;
1874 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1875 {
1876 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1877 {
1878 continue;
1879 }
1880
1881 const LinuxSharedSubscriberMode mode = static_cast<LinuxSharedSubscriberMode>(
1882 subscribers_[i].mode.load(std::memory_order_acquire));
1883 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
1884 {
1885 has_balanced_subscriber = true;
1886 continue;
1887 }
1888
1889 if (!QueueHasSpace(i))
1890 {
1891 ScavengeDeadSubscribers();
1892 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1893 {
1894 continue;
1895 }
1896
1897 if (mode == LinuxSharedSubscriberMode::BROADCAST_DROP_OLD)
1898 {
1899 const ErrorCode drop_ans = DropDescriptor(i);
1900 if (drop_ans == ErrorCode::EMPTY && QueueHasSpace(i))
1901 {
1902 // 消费者可能在 QueueHasSpace() 和 DropDescriptor() 之间弹走旧描述符。
1903 // 此时队列已经有空位,发布者继续写入即可,不能把竞态误报成 FULL。
1904 }
1905 else if (drop_ans != ErrorCode::OK)
1906 {
1907 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1908 data.Reset();
1909 return ErrorCode::FULL;
1910 }
1911 }
1912 else
1913 {
1914 subscribers_[i].dropped_messages.fetch_add(1, std::memory_order_relaxed);
1915 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1916 data.Reset();
1917 return ErrorCode::FULL;
1918 }
1919 }
1920
1921 ++active_count;
1922 }
1923
1924 if (has_balanced_subscriber)
1925 {
1926 if (!SelectBalancedSubscriber(balanced_target))
1927 {
1928 ScavengeDeadSubscribers();
1929 if (!SelectBalancedSubscriber(balanced_target))
1930 {
1931 header_->publish_failures.fetch_add(1, std::memory_order_relaxed);
1932 data.Reset();
1933 return ErrorCode::FULL;
1934 }
1935 }
1936 ++active_count;
1937 }
1938
1939 if (active_count == 0)
1940 {
1941 data.Reset();
1942 return ErrorCode::OK;
1943 }
1944
1945 const uint64_t sequence =
1946 header_->next_sequence.fetch_add(1, std::memory_order_acq_rel) + 1ULL;
1947 if constexpr (!HAS_TIMESTAMP)
1948 {
1949 timestamp = NowMessageTimestamp();
1950 }
1951 SlotControl& slot = slots_[data.slot_index_];
1952 slot.refcount.store(active_count, std::memory_order_release);
1953 slot.timestamp_us = ToSharedTimestamp(timestamp);
1954 slot.sequence.store(sequence, std::memory_order_release);
1955
1956 const Descriptor descriptor = {data.slot_index_, 0U, sequence};
1957 for (uint32_t i = 0; i < subscriber_capacity_; ++i)
1958 {
1959 if (subscribers_[i].active.load(std::memory_order_acquire) == 0)
1960 {
1961 continue;
1962 }
1963 const LinuxSharedSubscriberMode mode = static_cast<LinuxSharedSubscriberMode>(
1964 subscribers_[i].mode.load(std::memory_order_acquire));
1965 if (mode == LinuxSharedSubscriberMode::BALANCE_RR)
1966 {
1967 continue;
1968 }
1969 PushDescriptor(i, descriptor);
1970 }
1971
1972 if (balanced_target != INVALID_INDEX)
1973 {
1974 PushDescriptor(balanced_target, descriptor);
1975 }
1976
1977 data.topic_ = nullptr;
1978 data.slot_index_ = INVALID_INDEX;
1979 return ErrorCode::OK;
1980 }
1981
1982 bool create_ = false;
1983 bool publisher_ = false;
1984 LinuxSharedTopicConfig config_;
1985 uint32_t domain_crc32_ = 0;
1986 std::string topic_name_;
1987 uint64_t name_key_ = 0;
1988 std::string shm_name_;
1989 ProcessIdentity self_identity_ = {};
1990
1991 int fd_ = -1;
1992 void* mapping_ = nullptr;
1993 uint8_t* base_ = nullptr;
1994 size_t mapping_size_ = 0;
1995
1996 SharedHeader* header_ = nullptr;
1997 char* topic_name_ptr_ = nullptr;
1998 SlotControl* slots_ = nullptr;
1999 SubscriberControl* subscribers_ = nullptr;
2000 BalancedGroupControl* balanced_group_ = nullptr;
2001 std::atomic<uint32_t>* balanced_members_ = nullptr;
2002 FreeSlotCell* free_slots_ = nullptr;
2003 Descriptor* descriptors_ = nullptr;
2004 TopicData* payloads_ = nullptr;
2005
2006 uint32_t slot_count_ = 0;
2007 uint32_t subscriber_capacity_ = 0;
2008 uint32_t queue_capacity_ = 0;
2009
2010 bool open_ok_ = false;
2011 ErrorCode open_status_ = ErrorCode::STATE_ERR;
2012
2013};
2014
2015} // namespace LibXR
2016
2017#endif
LibXR 命名空间
Definition ch32_can.hpp:14
ErrorCode
定义错误码枚举
@ INIT_ERR
初始化错误 | Initialization error
@ EMPTY
为空 | Empty