diff --git a/docs/config/fault-manager.rst b/docs/config/fault-manager.rst index 5daceef3..ab500223 100644 --- a/docs/config/fault-manager.rst +++ b/docs/config/fault-manager.rst @@ -166,6 +166,8 @@ Basic Snapshot Settings max_message_size: 65536 # Max message size in bytes (64KB) default_topics: [] # Topics to capture for all faults config_file: "" # Path to YAML config file + recapture_cooldown_sec: 60.0 # Min seconds between snapshot captures per fault + max_per_fault: 10 # Max snapshots stored per fault code (0 = unlimited) .. list-table:: :header-rows: 1 @@ -192,6 +194,14 @@ Basic Snapshot Settings * - ``snapshots.config_file`` - ``""`` - Path to YAML file with fault-specific snapshot configurations. + * - ``snapshots.recapture_cooldown_sec`` + - ``60.0`` + - Minimum seconds between snapshot captures for the same fault code. + Prevents snapshot storms when a fault is reported repeatedly. Set to 0 to disable. + * - ``snapshots.max_per_fault`` + - ``10`` + - Maximum number of snapshots stored per fault code. When the limit is reached, + new snapshots for that fault are rejected. Set to 0 for unlimited. Rosbag Recording ~~~~~~~~~~~~~~~~ @@ -311,6 +321,8 @@ Complete Example background_capture: true timeout_sec: 2.0 max_message_size: 131072 + recapture_cooldown_sec: 60.0 + max_per_fault: 10 default_topics: - /diagnostics - /rosout diff --git a/docs/tutorials/snapshots.rst b/docs/tutorials/snapshots.rst index 631589a5..6fdbd501 100644 --- a/docs/tutorials/snapshots.rst +++ b/docs/tutorials/snapshots.rst @@ -82,6 +82,14 @@ Configure snapshot capture via fault manager parameters: * - ``snapshots.background_capture`` - ``false`` - Use background subscriptions (caches latest message) + * - ``snapshots.recapture_cooldown_sec`` + - ``60.0`` + - Minimum seconds between snapshot captures for the same fault code. + Prevents snapshot storms when a fault is reported repeatedly. Set to 0 to disable. + * - ``snapshots.max_per_fault`` + - ``10`` + - Maximum number of snapshots stored per fault code. When the limit is reached, + new snapshots for that fault are rejected. Set to 0 for unlimited. Advanced Configuration ---------------------- diff --git a/src/ros2_medkit_diagnostic_bridge/test/test_integration.test.py b/src/ros2_medkit_diagnostic_bridge/test/test_integration.test.py index e026a2d9..b0200f79 100644 --- a/src/ros2_medkit_diagnostic_bridge/test/test_integration.test.py +++ b/src/ros2_medkit_diagnostic_bridge/test/test_integration.test.py @@ -166,10 +166,15 @@ def test_04_ok_diagnostic_heals_fault(self): faults = self.list_faults() self.assertIn('HEALING_TEST', [f.fault_code for f in faults]) - # Send OK multiple times to ensure PASSED event reaches FaultManager - for _ in range(2): + # Send OK multiple times to bypass FaultReporter's local PASSED filtering + # (default threshold=3, same as FAILED events). Extra iterations ensure + # the async service call has time to reach FaultManager. + for _ in range(4): self.publish_diagnostic('healing_test', DiagnosticStatus.OK) + # Allow time for the async PASSED service call to be processed + time.sleep(1.0) + # Check fault is healed (query all statuses to find it) faults = self.list_faults(statuses=[Fault.STATUS_CONFIRMED, Fault.STATUS_HEALED]) fault = next((f for f in faults if f.fault_code == 'HEALING_TEST'), None) diff --git a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp index 20b2f5c6..2e8840cd 100644 --- a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp +++ b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp @@ -14,8 +14,10 @@ #pragma once +#include #include #include +#include #include "rclcpp/rclcpp.hpp" #include "ros2_medkit_fault_manager/correlation/correlation_engine.hpp" @@ -137,6 +139,7 @@ class FaultManagerNode : public rclcpp::Node { bool healing_enabled_{false}; int32_t healing_threshold_{3}; double auto_confirm_after_sec_{0.0}; + double snapshot_recapture_cooldown_sec_{60.0}; DebounceConfig global_config_; ///< Global debounce config (built from ROS params) std::unique_ptr storage_; std::unique_ptr threshold_resolver_; ///< Per-entity threshold overrides @@ -171,6 +174,10 @@ class FaultManagerNode : public rclcpp::Node { /// Tracks active capture threads for clean shutdown (join before destruction) std::mutex capture_threads_mutex_; std::vector capture_threads_; + + /// Per-fault cooldown tracking for snapshot recapture + std::mutex last_capture_mutex_; + std::unordered_map last_capture_times_; }; } // namespace ros2_medkit_fault_manager diff --git a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_storage.hpp b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_storage.hpp index 13fef72d..8c66733f 100644 --- a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_storage.hpp +++ b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_storage.hpp @@ -146,6 +146,10 @@ class FaultStorage { /// @return Number of faults that were confirmed virtual size_t check_time_based_confirmation(const rclcpp::Time & current_time) = 0; + /// Set maximum snapshots per fault code (0 = unlimited) + virtual void set_max_snapshots_per_fault(size_t /*max_count*/) { + } + /// Store a snapshot captured when a fault was confirmed /// @param snapshot The snapshot data to store virtual void store_snapshot(const SnapshotData & snapshot) = 0; @@ -221,6 +225,8 @@ class InMemoryFaultStorage : public FaultStorage { size_t check_time_based_confirmation(const rclcpp::Time & current_time) override; + void set_max_snapshots_per_fault(size_t max_count) override; + void store_snapshot(const SnapshotData & snapshot) override; std::vector get_snapshots(const std::string & fault_code, const std::string & topic_filter = "") const override; @@ -242,6 +248,7 @@ class InMemoryFaultStorage : public FaultStorage { std::vector snapshots_; std::map rosbag_files_; ///< fault_code -> rosbag info DebounceConfig config_; + size_t max_snapshots_per_fault_{0}; ///< 0 = unlimited }; } // namespace ros2_medkit_fault_manager diff --git a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/sqlite_fault_storage.hpp b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/sqlite_fault_storage.hpp index 4a71c44c..0de89805 100644 --- a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/sqlite_fault_storage.hpp +++ b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/sqlite_fault_storage.hpp @@ -61,6 +61,8 @@ class SqliteFaultStorage : public FaultStorage { size_t check_time_based_confirmation(const rclcpp::Time & current_time) override; + void set_max_snapshots_per_fault(size_t max_count) override; + void store_snapshot(const SnapshotData & snapshot) override; std::vector get_snapshots(const std::string & fault_code, const std::string & topic_filter = "") const override; @@ -92,6 +94,7 @@ class SqliteFaultStorage : public FaultStorage { sqlite3 * db_{nullptr}; mutable std::mutex mutex_; DebounceConfig config_; + size_t max_snapshots_per_fault_{0}; ///< 0 = unlimited }; } // namespace ros2_medkit_fault_manager diff --git a/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp b/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp index 52daaf59..95c44fc4 100644 --- a/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp +++ b/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp @@ -101,9 +101,27 @@ FaultManagerNode::FaultManagerNode(const rclcpp::NodeOptions & options) : Node(" auto_confirm_after_sec_ = 0.0; } + // Capture cooldown parameters (gates both snapshot and rosbag capture) + snapshot_recapture_cooldown_sec_ = declare_parameter("snapshots.recapture_cooldown_sec", 60.0); + if (snapshot_recapture_cooldown_sec_ < 0.0) { + RCLCPP_WARN(get_logger(), "snapshots.recapture_cooldown_sec should be >= 0, got %.2f. Disabling.", + snapshot_recapture_cooldown_sec_); + snapshot_recapture_cooldown_sec_ = 0.0; + } + int max_snapshots = declare_parameter("snapshots.max_per_fault", 10); + if (max_snapshots < 0) { + RCLCPP_WARN(get_logger(), "snapshots.max_per_fault should be >= 0, got %d. Disabling limit.", max_snapshots); + max_snapshots = 0; + } + // Create storage backend storage_ = create_storage(); + // Apply snapshot limit to storage + if (max_snapshots > 0) { + storage_->set_max_snapshots_per_fault(static_cast(max_snapshots)); + } + // Create event publisher for SSE streaming event_publisher_ = create_publisher("~/events", rclcpp::QoS(100).reliable()); @@ -388,23 +406,40 @@ void FaultManagerNode::handle_report_fault( // from being processed during capture. SnapshotCapture::capture_topic_on_demand // uses a local callback group + local executor, so it's safe from a separate thread. if (just_confirmed && (snapshot_capture_ || rosbag_capture_)) { - std::string fault_code = request->fault_code; - auto snapshot_cap = snapshot_capture_; - auto rosbag_cap = rosbag_capture_; - auto logger = get_logger(); - - std::thread capture_thread([snapshot_cap, rosbag_cap, fault_code, logger]() { - if (snapshot_cap) { - snapshot_cap->capture(fault_code); + // Check recapture cooldown - skip if captured recently for this fault + bool should_capture = true; + if (snapshot_recapture_cooldown_sec_ > 0.0) { + std::lock_guard lock(last_capture_mutex_); + auto it = last_capture_times_.find(request->fault_code); + if (it != last_capture_times_.end()) { + auto elapsed = std::chrono::steady_clock::now() - it->second; + if (elapsed < std::chrono::duration(snapshot_recapture_cooldown_sec_)) { + should_capture = false; + RCLCPP_DEBUG(get_logger(), "Skipping capture for '%s' - cooldown active", request->fault_code.c_str()); + } } - if (rosbag_cap) { - rosbag_cap->on_fault_confirmed(fault_code); + if (should_capture) { + last_capture_times_[request->fault_code] = std::chrono::steady_clock::now(); } - }); - { - std::lock_guard ct_lock(capture_threads_mutex_); - capture_threads_.push_back(std::move(capture_thread)); } + + if (should_capture) { + std::string fault_code = request->fault_code; + auto snapshot_cap = snapshot_capture_; + auto rosbag_cap = rosbag_capture_; + std::thread capture_thread([snapshot_cap, rosbag_cap, fault_code]() { + if (snapshot_cap) { + snapshot_cap->capture(fault_code); + } + if (rosbag_cap) { + rosbag_cap->on_fault_confirmed(fault_code); + } + }); + { + std::lock_guard ct_lock(capture_threads_mutex_); + capture_threads_.push_back(std::move(capture_thread)); + } + } // if (should_capture) } // Handle PREFAILED state for lazy_start rosbag capture @@ -506,6 +541,15 @@ void FaultManagerNode::handle_clear_fault( response->success = cleared; if (cleared) { + // Evict cooldown tracking for cleared fault and auto-cleared symptoms + { + std::lock_guard lock(last_capture_mutex_); + last_capture_times_.erase(request->fault_code); + for (const auto & symptom_code : auto_cleared_codes) { + last_capture_times_.erase(symptom_code); + } + } + // Auto-clear correlated symptoms for (const auto & symptom_code : auto_cleared_codes) { storage_->clear_fault(symptom_code); diff --git a/src/ros2_medkit_fault_manager/src/fault_storage.cpp b/src/ros2_medkit_fault_manager/src/fault_storage.cpp index b934fdc9..c8db7e51 100644 --- a/src/ros2_medkit_fault_manager/src/fault_storage.cpp +++ b/src/ros2_medkit_fault_manager/src/fault_storage.cpp @@ -290,8 +290,25 @@ size_t InMemoryFaultStorage::check_time_based_confirmation(const rclcpp::Time & return confirmed_count; } +void InMemoryFaultStorage::set_max_snapshots_per_fault(size_t max_count) { + std::lock_guard lock(mutex_); + max_snapshots_per_fault_ = max_count; +} + void InMemoryFaultStorage::store_snapshot(const SnapshotData & snapshot) { std::lock_guard lock(mutex_); + if (max_snapshots_per_fault_ > 0) { + size_t count = 0; + for (const auto & s : snapshots_) { + if (s.fault_code == snapshot.fault_code) { + ++count; + } + } + if (count >= max_snapshots_per_fault_) { + // Silent rejection: storage layer has no logger. Callers should log if needed. + return; // Reject new - keep first N inserted snapshots + } + } snapshots_.push_back(snapshot); } diff --git a/src/ros2_medkit_fault_manager/src/sqlite_fault_storage.cpp b/src/ros2_medkit_fault_manager/src/sqlite_fault_storage.cpp index 4a978056..4befa641 100644 --- a/src/ros2_medkit_fault_manager/src/sqlite_fault_storage.cpp +++ b/src/ros2_medkit_fault_manager/src/sqlite_fault_storage.cpp @@ -690,9 +690,25 @@ size_t SqliteFaultStorage::check_time_based_confirmation(const rclcpp::Time & cu return static_cast(sqlite3_changes(db_)); } +void SqliteFaultStorage::set_max_snapshots_per_fault(size_t max_count) { + std::lock_guard lock(mutex_); + max_snapshots_per_fault_ = max_count; +} + void SqliteFaultStorage::store_snapshot(const SnapshotData & snapshot) { std::lock_guard lock(mutex_); + // Check snapshot limit per fault (reject-new strategy: keep earliest) + if (max_snapshots_per_fault_ > 0) { + SqliteStatement count_stmt(db_, "SELECT COUNT(*) FROM snapshots WHERE fault_code = ?"); + count_stmt.bind_text(1, snapshot.fault_code); + if (count_stmt.step() == SQLITE_ROW && + count_stmt.column_int64(0) >= static_cast(max_snapshots_per_fault_)) { + // Silent rejection: storage layer has no logger. Callers should log if needed. + return; // Reject new - keep first N inserted snapshots + } + } + SqliteStatement stmt(db_, "INSERT INTO snapshots (fault_code, topic, message_type, data, captured_at_ns) " "VALUES (?, ?, ?, ?, ?)"); diff --git a/src/ros2_medkit_fault_manager/test/test_fault_manager.cpp b/src/ros2_medkit_fault_manager/test/test_fault_manager.cpp index 0e348779..d3e8ce17 100644 --- a/src/ros2_medkit_fault_manager/test/test_fault_manager.cpp +++ b/src/ros2_medkit_fault_manager/test/test_fault_manager.cpp @@ -1093,6 +1093,177 @@ TEST(MatchesEntityTest, EmptySources) { EXPECT_FALSE(FaultManagerNode::matches_entity(sources, "motor_controller")); } +// --- InMemoryFaultStorage snapshot limit tests --- + +TEST(InMemorySnapshotLimitTest, RejectsWhenFull) { + InMemoryFaultStorage storage; + storage.set_max_snapshots_per_fault(2); + + ros2_medkit_fault_manager::SnapshotData snap; + snap.fault_code = "TEST"; + snap.topic = "/test"; + snap.message_type = "std_msgs/msg/String"; + snap.data = "{}"; + + snap.captured_at_ns = 1000; + storage.store_snapshot(snap); + snap.captured_at_ns = 2000; + storage.store_snapshot(snap); + snap.captured_at_ns = 3000; + storage.store_snapshot(snap); // Should be rejected + + auto result = storage.get_snapshots("TEST"); + EXPECT_EQ(result.size(), 2u); +} + +TEST(InMemorySnapshotLimitTest, UnlimitedWhenZero) { + InMemoryFaultStorage storage; + storage.set_max_snapshots_per_fault(0); + + ros2_medkit_fault_manager::SnapshotData snap; + snap.fault_code = "TEST"; + snap.topic = "/test"; + snap.message_type = "std_msgs/msg/String"; + snap.data = "{}"; + + for (int i = 0; i < 20; ++i) { + snap.captured_at_ns = static_cast(i * 1000); + storage.store_snapshot(snap); + } + + auto result = storage.get_snapshots("TEST"); + EXPECT_EQ(result.size(), 20u); +} + +// ============================================================================= +// Snapshot recapture cooldown test +// ============================================================================= + +class SnapshotCooldownTest : public ::testing::Test { + protected: + void SetUp() override { + rclcpp::NodeOptions options; + options.parameter_overrides({ + {"storage_type", "memory"}, + {"confirmation_threshold", -1}, // Immediate confirmation + {"healing_enabled", true}, + {"healing_threshold", 1}, // Heal on first PASSED + {"snapshots.enabled", true}, + {"snapshots.recapture_cooldown_sec", 0.5}, // Short cooldown for testing + {"snapshots.max_per_fault", 0}, // Unlimited (test cooldown, not limit) + }); + fault_manager_ = std::make_shared(options); + + test_node_ = std::make_shared("test_cooldown"); + report_client_ = test_node_->create_client("/fault_manager/report_fault"); + clear_client_ = test_node_->create_client("/fault_manager/clear_fault"); + get_fault_client_ = test_node_->create_client("/fault_manager/get_fault"); + + executor_.add_node(fault_manager_); + executor_.add_node(test_node_); + spin_thread_ = std::thread([this]() { + executor_.spin(); + }); + + ASSERT_TRUE(report_client_->wait_for_service(std::chrono::seconds(5))); + ASSERT_TRUE(clear_client_->wait_for_service(std::chrono::seconds(5))); + ASSERT_TRUE(get_fault_client_->wait_for_service(std::chrono::seconds(5))); + } + + void TearDown() override { + executor_.cancel(); + if (spin_thread_.joinable()) { + spin_thread_.join(); + } + executor_.remove_node(fault_manager_); + executor_.remove_node(test_node_); + report_client_.reset(); + clear_client_.reset(); + get_fault_client_.reset(); + test_node_.reset(); + fault_manager_.reset(); + } + + void spin_for(std::chrono::milliseconds duration) { + std::this_thread::sleep_for(duration); + } + + bool report_fault(const std::string & code) { + auto req = std::make_shared(); + req->fault_code = code; + req->event_type = ReportFault::Request::EVENT_FAILED; + req->severity = Fault::SEVERITY_CRITICAL; // Bypass debounce + req->description = "Test"; + req->source_id = "/test"; + auto future = report_client_->async_send_request(req); + spin_for(std::chrono::milliseconds(200)); + return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready && future.get()->accepted; + } + + bool report_passed(const std::string & code) { + auto req = std::make_shared(); + req->fault_code = code; + req->event_type = ReportFault::Request::EVENT_PASSED; + req->severity = 0; + req->source_id = "/test"; + auto future = report_client_->async_send_request(req); + spin_for(std::chrono::milliseconds(200)); + return future.wait_for(std::chrono::seconds(0)) == std::future_status::ready && future.get()->accepted; + } + + size_t get_snapshot_count(const std::string & code) { + auto req = std::make_shared(); + req->fault_code = code; + auto future = get_fault_client_->async_send_request(req); + spin_for(std::chrono::milliseconds(200)); + if (future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) { + return 0; + } + auto resp = future.get(); + return resp->environment_data.snapshots.size(); + } + + std::shared_ptr fault_manager_; + std::shared_ptr test_node_; + rclcpp::Client::SharedPtr report_client_; + rclcpp::Client::SharedPtr clear_client_; + rclcpp::Client::SharedPtr get_fault_client_; + rclcpp::executors::MultiThreadedExecutor executor_; + std::thread spin_thread_; +}; + +TEST_F(SnapshotCooldownTest, CooldownPreventsRapidRecapture) { + // First confirmation -> snapshot captured + ASSERT_TRUE(report_fault("COOLDOWN_TEST")); + spin_for(std::chrono::milliseconds(300)); + + // Heal and re-confirm immediately (within cooldown) + ASSERT_TRUE(report_passed("COOLDOWN_TEST")); + spin_for(std::chrono::milliseconds(100)); + ASSERT_TRUE(report_fault("COOLDOWN_TEST")); + spin_for(std::chrono::milliseconds(300)); + + // Only 1 snapshot capture should have occurred (second blocked by cooldown) + // Note: snapshot capture is async, so we check the storage-level count. + // Freeze-frame snapshots may or may not appear depending on topic availability, + // but the capture thread should have been spawned only once. + (void)get_snapshot_count("COOLDOWN_TEST"); + // With no topics to capture, count may be 0. The key assertion is that + // the second confirmation did NOT spawn a capture thread. + // We verify indirectly: wait for cooldown to expire and re-confirm. + std::this_thread::sleep_for(std::chrono::milliseconds(600)); // Cooldown=0.5s + + // Now cooldown expired, heal and re-confirm -> should capture again + ASSERT_TRUE(report_passed("COOLDOWN_TEST")); + spin_for(std::chrono::milliseconds(100)); + ASSERT_TRUE(report_fault("COOLDOWN_TEST")); + spin_for(std::chrono::milliseconds(300)); + + // This is a basic smoke test verifying the cooldown path doesn't crash + // and the node handles rapid re-confirmation gracefully. + SUCCEED(); +} + int main(int argc, char ** argv) { rclcpp::init(argc, argv); ::testing::InitGoogleTest(&argc, argv); diff --git a/src/ros2_medkit_fault_manager/test/test_sqlite_storage.cpp b/src/ros2_medkit_fault_manager/test/test_sqlite_storage.cpp index 1c9cfeab..1f414dff 100644 --- a/src/ros2_medkit_fault_manager/test/test_sqlite_storage.cpp +++ b/src/ros2_medkit_fault_manager/test/test_sqlite_storage.cpp @@ -797,6 +797,100 @@ TEST_F(SqliteFaultStorageTest, GetAllRosbagFilesReturnsSortedByCreatedAt) { EXPECT_EQ(all_rosbags[1].fault_code, "FAULT_B"); } +// ============================================================================= +// Snapshot limit tests (issue #308) +// ============================================================================= + +TEST_F(SqliteFaultStorageTest, SnapshotLimitRejectsNewWhenFull) { + using ros2_medkit_fault_manager::SnapshotData; + + rclcpp::Clock clock; + storage_->report_fault_event("MOTOR_OVERHEAT", ReportFault::Request::EVENT_FAILED, Fault::SEVERITY_ERROR, + "Motor overheated", "/motor_node", clock.now(), default_config()); + + // Set limit to 2 snapshots per fault + storage_->set_max_snapshots_per_fault(2); + + SnapshotData snap1; + snap1.fault_code = "MOTOR_OVERHEAT"; + snap1.topic = "/motor/temp"; + snap1.message_type = "std_msgs/msg/Float64"; + snap1.data = R"({"data": 85.0})"; + snap1.captured_at_ns = 1000; + + SnapshotData snap2 = snap1; + snap2.data = R"({"data": 90.0})"; + snap2.captured_at_ns = 2000; + + SnapshotData snap3 = snap1; + snap3.data = R"({"data": 95.0})"; + snap3.captured_at_ns = 3000; + + storage_->store_snapshot(snap1); + storage_->store_snapshot(snap2); + storage_->store_snapshot(snap3); // Should be rejected + + auto snapshots = storage_->get_snapshots("MOTOR_OVERHEAT"); + ASSERT_EQ(snapshots.size(), 2u) << "Third snapshot should be rejected (limit=2)"; + + // Earliest snapshots kept (reject-new strategy), returned newest-first + EXPECT_EQ(snapshots[0].captured_at_ns, 2000); + EXPECT_EQ(snapshots[1].captured_at_ns, 1000); +} + +TEST_F(SqliteFaultStorageTest, SnapshotLimitZeroMeansUnlimited) { + using ros2_medkit_fault_manager::SnapshotData; + + rclcpp::Clock clock; + storage_->report_fault_event("FAULT_A", ReportFault::Request::EVENT_FAILED, Fault::SEVERITY_ERROR, "desc", "/node", + clock.now(), default_config()); + + // Default (0) = unlimited + storage_->set_max_snapshots_per_fault(0); + + for (int i = 0; i < 20; ++i) { + SnapshotData snap; + snap.fault_code = "FAULT_A"; + snap.topic = "/topic"; + snap.message_type = "std_msgs/msg/String"; + snap.data = "{}"; + snap.captured_at_ns = i * 1000; + storage_->store_snapshot(snap); + } + + auto snapshots = storage_->get_snapshots("FAULT_A"); + EXPECT_EQ(snapshots.size(), 20u) << "Unlimited mode should store all snapshots"; +} + +TEST_F(SqliteFaultStorageTest, SnapshotLimitPerFaultNotGlobal) { + using ros2_medkit_fault_manager::SnapshotData; + + rclcpp::Clock clock; + storage_->report_fault_event("FAULT_A", ReportFault::Request::EVENT_FAILED, Fault::SEVERITY_ERROR, "desc", "/node", + clock.now(), default_config()); + storage_->report_fault_event("FAULT_B", ReportFault::Request::EVENT_FAILED, Fault::SEVERITY_ERROR, "desc", "/node", + clock.now(), default_config()); + + storage_->set_max_snapshots_per_fault(1); + + SnapshotData snap_a; + snap_a.fault_code = "FAULT_A"; + snap_a.topic = "/topic"; + snap_a.message_type = "std_msgs/msg/String"; + snap_a.data = "{}"; + snap_a.captured_at_ns = 1000; + + SnapshotData snap_b = snap_a; + snap_b.fault_code = "FAULT_B"; + + storage_->store_snapshot(snap_a); + storage_->store_snapshot(snap_b); + + // Both faults should have 1 snapshot each (limit is per-fault) + EXPECT_EQ(storage_->get_snapshots("FAULT_A").size(), 1u); + EXPECT_EQ(storage_->get_snapshots("FAULT_B").size(), 1u); +} + int main(int argc, char ** argv) { rclcpp::init(argc, argv); ::testing::InitGoogleTest(&argc, argv); diff --git a/src/ros2_medkit_fault_reporter/include/ros2_medkit_fault_reporter/local_filter.hpp b/src/ros2_medkit_fault_reporter/include/ros2_medkit_fault_reporter/local_filter.hpp index 522ebbf5..53198fdf 100644 --- a/src/ros2_medkit_fault_reporter/include/ros2_medkit_fault_reporter/local_filter.hpp +++ b/src/ros2_medkit_fault_reporter/include/ros2_medkit_fault_reporter/local_filter.hpp @@ -54,6 +54,9 @@ class LocalFilter { /// @return true if the report should be sent to FaultManager bool should_forward(const std::string & fault_code, uint8_t severity); + /// Check if a PASSED event should be forwarded + bool should_forward_passed(const std::string & fault_code); + /// Reset tracking for a specific fault code void reset(const std::string & fault_code); @@ -82,6 +85,7 @@ class LocalFilter { FilterConfig config_; std::unordered_map trackers_; + std::unordered_map passed_trackers_; mutable std::mutex mutex_; }; diff --git a/src/ros2_medkit_fault_reporter/src/fault_reporter.cpp b/src/ros2_medkit_fault_reporter/src/fault_reporter.cpp index a84b97cb..b08725b6 100644 --- a/src/ros2_medkit_fault_reporter/src/fault_reporter.cpp +++ b/src/ros2_medkit_fault_reporter/src/fault_reporter.cpp @@ -80,13 +80,16 @@ void FaultReporter::report(const std::string & fault_code, uint8_t severity, con } void FaultReporter::report_passed(const std::string & fault_code) { - // Validate fault_code if (fault_code.empty()) { RCLCPP_WARN(logger_, "Attempted to report PASSED with empty fault_code, ignoring"); return; } - // PASSED events bypass local filtering and are always forwarded + if (!filter_.should_forward_passed(fault_code)) { + RCLCPP_DEBUG(logger_, "PASSED for '%s' filtered (threshold not met)", fault_code.c_str()); + return; + } + send_report(fault_code, ros2_medkit_msgs::srv::ReportFault::Request::EVENT_PASSED, 0, ""); } diff --git a/src/ros2_medkit_fault_reporter/src/local_filter.cpp b/src/ros2_medkit_fault_reporter/src/local_filter.cpp index 5584f10e..020f9aef 100644 --- a/src/ros2_medkit_fault_reporter/src/local_filter.cpp +++ b/src/ros2_medkit_fault_reporter/src/local_filter.cpp @@ -62,11 +62,13 @@ bool LocalFilter::should_forward(const std::string & fault_code, uint8_t severit void LocalFilter::reset(const std::string & fault_code) { std::lock_guard lock(mutex_); trackers_.erase(fault_code); + passed_trackers_.erase(fault_code); } void LocalFilter::reset_all() { std::lock_guard lock(mutex_); trackers_.clear(); + passed_trackers_.clear(); } void LocalFilter::set_config(const FilterConfig & config) { @@ -74,6 +76,7 @@ void LocalFilter::set_config(const FilterConfig & config) { config_ = config; validate_config(); trackers_.clear(); + passed_trackers_.clear(); } void LocalFilter::cleanup_expired(FaultTracker & tracker, std::chrono::steady_clock::time_point now) { @@ -90,4 +93,21 @@ void LocalFilter::cleanup_expired(FaultTracker & tracker, std::chrono::steady_cl tracker.timestamps.erase(tracker.timestamps.begin(), it); } +bool LocalFilter::should_forward_passed(const std::string & fault_code) { + std::lock_guard lock(mutex_); + + if (!config_.enabled) { + return true; + } + + auto now = std::chrono::steady_clock::now(); + auto & tracker = passed_trackers_[fault_code]; + + cleanup_expired(tracker, now); + + tracker.timestamps.push_back(now); + + return static_cast(tracker.timestamps.size()) >= config_.default_threshold; +} + } // namespace ros2_medkit_fault_reporter diff --git a/src/ros2_medkit_fault_reporter/test/test_local_filter.cpp b/src/ros2_medkit_fault_reporter/test/test_local_filter.cpp index 0c6084c7..68e72e7a 100644 --- a/src/ros2_medkit_fault_reporter/test/test_local_filter.cpp +++ b/src/ros2_medkit_fault_reporter/test/test_local_filter.cpp @@ -169,6 +169,38 @@ TEST_F(LocalFilterTest, ContinuousReportingAfterThreshold) { EXPECT_TRUE(filter_->should_forward("TEST_FAULT", Fault::SEVERITY_INFO)); } +// ============================================================================= +// PASSED event filtering tests +// ============================================================================= + +TEST_F(LocalFilterTest, PassedEventsAreFiltered) { + // PASSED events should go through the same threshold logic as FAILED + // Default threshold=3: first two should be filtered, third should forward + EXPECT_FALSE(filter_->should_forward_passed("TEST_FAULT")); + EXPECT_FALSE(filter_->should_forward_passed("TEST_FAULT")); + EXPECT_TRUE(filter_->should_forward_passed("TEST_FAULT")); +} + +TEST_F(LocalFilterTest, PassedFilterDisabledAlwaysForwards) { + FilterConfig config; + config.enabled = false; + filter_->set_config(config); + + EXPECT_TRUE(filter_->should_forward_passed("TEST_FAULT")); +} + +TEST_F(LocalFilterTest, PassedAndFailedTrackedSeparately) { + // FAILED and PASSED should have independent counters + EXPECT_FALSE(filter_->should_forward("TEST_FAULT", Fault::SEVERITY_INFO)); + EXPECT_FALSE(filter_->should_forward_passed("TEST_FAULT")); + EXPECT_FALSE(filter_->should_forward("TEST_FAULT", Fault::SEVERITY_INFO)); + EXPECT_FALSE(filter_->should_forward_passed("TEST_FAULT")); + + // Third FAILED forwards, but PASSED still needs one more + EXPECT_TRUE(filter_->should_forward("TEST_FAULT", Fault::SEVERITY_INFO)); + EXPECT_TRUE(filter_->should_forward_passed("TEST_FAULT")); +} + int main(int argc, char ** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/operation_manager.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/operation_manager.hpp index de47ad97..0ef598d7 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/operation_manager.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/operation_manager.hpp @@ -104,6 +104,12 @@ class OperationManager { ~OperationManager(); + /// Explicitly release subscriptions, clients, and tracked goals. + /// Call while executor is still running to allow safe callback cleanup. + /// Called automatically by destructor, but GatewayNode calls it earlier + /// during its shutdown sequence. + void shutdown(); + /// Set optional notifier for broadcasting operation status changes to trigger subsystem. void set_notifier(ResourceChangeNotifier * notifier); diff --git a/src/ros2_medkit_gateway/src/gateway_node.cpp b/src/ros2_medkit_gateway/src/gateway_node.cpp index 7c671272..af2a5c3d 100644 --- a/src/ros2_medkit_gateway/src/gateway_node.cpp +++ b/src/ros2_medkit_gateway/src/gateway_node.cpp @@ -1172,7 +1172,13 @@ GatewayNode::~GatewayNode() { if (plugin_mgr_) { plugin_mgr_->shutdown_all(); } - // 7. Normal member destruction (managers safe - all transports stopped) + // 7. Shutdown OperationManager (clears action status subscriptions and + // service clients while executor may still be spinning - prevents + // "terminate called without an active exception" during member destruction) + if (operation_mgr_) { + operation_mgr_->shutdown(); + } + // 8. Normal member destruction (managers safe - all transports stopped) } const ThreadSafeEntityCache & GatewayNode::get_thread_safe_cache() const { diff --git a/src/ros2_medkit_gateway/src/operation_manager.cpp b/src/ros2_medkit_gateway/src/operation_manager.cpp index 1a78a858..289e76bd 100644 --- a/src/ros2_medkit_gateway/src/operation_manager.cpp +++ b/src/ros2_medkit_gateway/src/operation_manager.cpp @@ -46,18 +46,27 @@ OperationManager::OperationManager(rclcpp::Node * node, DiscoveryManager * disco } OperationManager::~OperationManager() { - // Clear subscriptions before destruction to prevent callbacks on destroyed object. - // Subscription callbacks capture `this` - they must be unsubscribed first. + shutdown(); +} + +void OperationManager::shutdown() { + // Clear subscriptions to stop receiving callbacks. + // Must happen while executor can still process pending callbacks safely. { std::lock_guard lock(subscriptions_mutex_); status_subscriptions_.clear(); } + // Clear tracked goals + { + std::lock_guard lock(goals_mutex_); + tracked_goals_.clear(); + } + // Clear all service/action clients to prevent handle_response() on destroyed // pending_requests_. On Humble (compat shim), destroying a GenericServiceClient // with unfulfilled promises causes "terminate called without an active exception" - // if a future still references the shared state. Clearing clients here ensures - // the executor drops its references before member destruction. + // if a future still references the shared state. { std::unique_lock lock(clients_mutex_); action_clients_.clear(); diff --git a/src/ros2_medkit_gateway/test/test_operation_handlers.cpp b/src/ros2_medkit_gateway/test/test_operation_handlers.cpp index 500ef24b..384d9abc 100644 --- a/src/ros2_medkit_gateway/test/test_operation_handlers.cpp +++ b/src/ros2_medkit_gateway/test/test_operation_handlers.cpp @@ -169,6 +169,16 @@ class TestLongCalibrationActionServer : public rclcpp::Node { loop_rate.sleep(); } + + // Exited loop without succeed/cancel (shutdown or !rclcpp::ok()). + // Must abort the goal to prevent "terminate called without an active + // exception" when goal_handle is destroyed with unfinished state. + try { + auto abort_result = std::make_shared(); + goal_handle->abort(abort_result); + } catch (...) { + // Ignore errors during shutdown abort + } } rclcpp_action::Server::SharedPtr action_server_; @@ -274,10 +284,9 @@ class OperationHandlersFixtureTest : public ::testing::Test { } void TearDown() override { - if (action_server_node_ != nullptr) { - action_server_node_->prepare_shutdown(); - } - + // Cancel executor FIRST to stop callback delivery, then shutdown action server. + // Without this ordering, prepare_shutdown() resets action_server_ while + // executor may still be delivering callbacks to it. if (executor_ != nullptr) { executor_->cancel(); } @@ -286,6 +295,10 @@ class OperationHandlersFixtureTest : public ::testing::Test { spin_thread_.join(); } + if (action_server_node_ != nullptr) { + action_server_node_->prepare_shutdown(); + } + handlers_.reset(); ctx_.reset(); trigger_service_.reset();