Merged
12 changes: 12 additions & 0 deletions docs/config/fault-manager.rst
@@ -166,6 +166,8 @@ Basic Snapshot Settings
max_message_size: 65536 # Max message size in bytes (64KB)
default_topics: [] # Topics to capture for all faults
config_file: "" # Path to YAML config file
recapture_cooldown_sec: 60.0 # Min seconds between snapshot captures per fault
max_per_fault: 10 # Max snapshots stored per fault code (0 = unlimited)

.. list-table::
:header-rows: 1
@@ -192,6 +194,14 @@ Basic Snapshot Settings
* - ``snapshots.config_file``
- ``""``
- Path to YAML file with fault-specific snapshot configurations.
* - ``snapshots.recapture_cooldown_sec``
- ``60.0``
- Minimum seconds between snapshot captures for the same fault code.
Prevents snapshot storms when a fault is reported repeatedly. Set to 0 to disable.
* - ``snapshots.max_per_fault``
- ``10``
- Maximum number of snapshots stored per fault code. When the limit is reached,
new snapshots for that fault are rejected. Set to 0 for unlimited.

Rosbag Recording
~~~~~~~~~~~~~~~~
@@ -311,6 +321,8 @@ Complete Example
background_capture: true
timeout_sec: 2.0
max_message_size: 131072
recapture_cooldown_sec: 60.0
max_per_fault: 10
default_topics:
- /diagnostics
- /rosout
8 changes: 8 additions & 0 deletions docs/tutorials/snapshots.rst
@@ -82,6 +82,14 @@ Configure snapshot capture via fault manager parameters:
* - ``snapshots.background_capture``
- ``false``
- Use background subscriptions (caches latest message)
* - ``snapshots.recapture_cooldown_sec``
- ``60.0``
- Minimum seconds between snapshot captures for the same fault code.
Prevents snapshot storms when a fault is reported repeatedly. Set to 0 to disable.
* - ``snapshots.max_per_fault``
- ``10``
- Maximum number of snapshots stored per fault code. When the limit is reached,
new snapshots for that fault are rejected. Set to 0 for unlimited.
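Taken together, the two new parameters gate capture at different stages: the cooldown throttles how often a capture is attempted per fault code, while the per-fault cap bounds how many snapshots are retained. The combined semantics can be sketched in Python (illustrative names only; in the actual code the cooldown check lives in ``FaultManagerNode`` and the cap in the storage backends):

```python
import time
from collections import defaultdict

class SnapshotGate:
    """Illustrative model of recapture_cooldown_sec + max_per_fault gating."""

    def __init__(self, cooldown_sec=60.0, max_per_fault=10, clock=time.monotonic):
        self.cooldown_sec = cooldown_sec    # 0 disables the cooldown
        self.max_per_fault = max_per_fault  # 0 means unlimited
        self.clock = clock
        self.last_capture = {}              # fault_code -> last capture time
        self.stored = defaultdict(int)      # fault_code -> snapshot count

    def try_capture(self, fault_code):
        now = self.clock()
        if self.cooldown_sec > 0:
            last = self.last_capture.get(fault_code)
            if last is not None and now - last < self.cooldown_sec:
                return False                # cooldown active: skip capture
        if self.max_per_fault > 0 and self.stored[fault_code] >= self.max_per_fault:
            return False                    # reject-new: keep earliest snapshots
        self.last_capture[fault_code] = now
        self.stored[fault_code] += 1
        return True
```

With the defaults (60 s cooldown, 10 per fault), a fault flapping every second produces at most one snapshot per minute until the cap is hit, then none.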

Advanced Configuration
----------------------
@@ -166,10 +166,15 @@ def test_04_ok_diagnostic_heals_fault(self):
faults = self.list_faults()
self.assertIn('HEALING_TEST', [f.fault_code for f in faults])

# Send OK multiple times to bypass FaultReporter's local PASSED filtering
# (default threshold=3, same as FAILED events). Extra iterations ensure
# the async service call has time to reach FaultManager.
for _ in range(4):
    self.publish_diagnostic('healing_test', DiagnosticStatus.OK)

# Allow time for the async PASSED service call to be processed
time.sleep(1.0)

# Check fault is healed (query all statuses to find it)
faults = self.list_faults(statuses=[Fault.STATUS_CONFIRMED, Fault.STATUS_HEALED])
fault = next((f for f in faults if f.fault_code == 'HEALING_TEST'), None)
@@ -14,8 +14,10 @@

#pragma once

#include <chrono>
#include <memory>
#include <string>
#include <unordered_map>

#include "rclcpp/rclcpp.hpp"
#include "ros2_medkit_fault_manager/correlation/correlation_engine.hpp"
@@ -137,6 +139,7 @@ class FaultManagerNode : public rclcpp::Node {
bool healing_enabled_{false};
int32_t healing_threshold_{3};
double auto_confirm_after_sec_{0.0};
double snapshot_recapture_cooldown_sec_{60.0};
DebounceConfig global_config_; ///< Global debounce config (built from ROS params)
std::unique_ptr<FaultStorage> storage_;
std::unique_ptr<EntityThresholdResolver> threshold_resolver_; ///< Per-entity threshold overrides
@@ -171,6 +174,10 @@ class FaultManagerNode : public rclcpp::Node {
/// Tracks active capture threads for clean shutdown (join before destruction)
std::mutex capture_threads_mutex_;
std::vector<std::thread> capture_threads_;

/// Per-fault cooldown tracking for snapshot recapture
std::mutex last_capture_mutex_;
std::unordered_map<std::string, std::chrono::steady_clock::time_point> last_capture_times_;
};

} // namespace ros2_medkit_fault_manager
@@ -146,6 +146,10 @@ class FaultStorage {
/// @return Number of faults that were confirmed
virtual size_t check_time_based_confirmation(const rclcpp::Time & current_time) = 0;

/// Set maximum snapshots per fault code (0 = unlimited)
virtual void set_max_snapshots_per_fault(size_t /*max_count*/) {
}

Comment on lines +149 to +152

Copilot AI (Mar 29, 2026):
FaultStorage is a public abstract interface, and adding a new pure-virtual method (set_max_snapshots_per_fault) is a source/ABI breaking change for any downstream storage implementations. If backwards compatibility matters, consider providing a default no-op implementation in the base class (non-pure virtual), or bump the relevant version / clearly mark this as a breaking change.

Collaborator (author):
Fixed - changed to virtual with default no-op to avoid breaking downstream.

/// Store a snapshot captured when a fault was confirmed
/// @param snapshot The snapshot data to store
virtual void store_snapshot(const SnapshotData & snapshot) = 0;
@@ -221,6 +225,8 @@ class InMemoryFaultStorage : public FaultStorage {

size_t check_time_based_confirmation(const rclcpp::Time & current_time) override;

void set_max_snapshots_per_fault(size_t max_count) override;

void store_snapshot(const SnapshotData & snapshot) override;
std::vector<SnapshotData> get_snapshots(const std::string & fault_code,
const std::string & topic_filter = "") const override;
@@ -242,6 +248,7 @@
std::vector<SnapshotData> snapshots_;
std::map<std::string, RosbagFileInfo> rosbag_files_; ///< fault_code -> rosbag info
DebounceConfig config_;
size_t max_snapshots_per_fault_{0}; ///< 0 = unlimited
};

} // namespace ros2_medkit_fault_manager
@@ -61,6 +61,8 @@ class SqliteFaultStorage : public FaultStorage {

size_t check_time_based_confirmation(const rclcpp::Time & current_time) override;

void set_max_snapshots_per_fault(size_t max_count) override;

void store_snapshot(const SnapshotData & snapshot) override;
std::vector<SnapshotData> get_snapshots(const std::string & fault_code,
const std::string & topic_filter = "") const override;
@@ -92,6 +94,7 @@
sqlite3 * db_{nullptr};
mutable std::mutex mutex_;
DebounceConfig config_;
size_t max_snapshots_per_fault_{0}; ///< 0 = unlimited
};

} // namespace ros2_medkit_fault_manager
72 changes: 58 additions & 14 deletions src/ros2_medkit_fault_manager/src/fault_manager_node.cpp
@@ -101,9 +101,27 @@ FaultManagerNode::FaultManagerNode(const rclcpp::NodeOptions & options) : Node("
auto_confirm_after_sec_ = 0.0;
}

// Capture cooldown parameters (gates both snapshot and rosbag capture)
snapshot_recapture_cooldown_sec_ = declare_parameter<double>("snapshots.recapture_cooldown_sec", 60.0);
if (snapshot_recapture_cooldown_sec_ < 0.0) {
RCLCPP_WARN(get_logger(), "snapshots.recapture_cooldown_sec should be >= 0, got %.2f. Disabling.",
snapshot_recapture_cooldown_sec_);
snapshot_recapture_cooldown_sec_ = 0.0;
}
int max_snapshots = declare_parameter<int>("snapshots.max_per_fault", 10);
if (max_snapshots < 0) {
RCLCPP_WARN(get_logger(), "snapshots.max_per_fault should be >= 0, got %d. Disabling limit.", max_snapshots);
max_snapshots = 0;
}

// Create storage backend
storage_ = create_storage();

// Apply snapshot limit to storage
if (max_snapshots > 0) {
storage_->set_max_snapshots_per_fault(static_cast<size_t>(max_snapshots));
}

// Create event publisher for SSE streaming
event_publisher_ = create_publisher<ros2_medkit_msgs::msg::FaultEvent>("~/events", rclcpp::QoS(100).reliable());

@@ -388,23 +406,40 @@ void FaultManagerNode::handle_report_fault(
// from being processed during capture. SnapshotCapture::capture_topic_on_demand
// uses a local callback group + local executor, so it's safe from a separate thread.
if (just_confirmed && (snapshot_capture_ || rosbag_capture_)) {
    // Check recapture cooldown - skip if captured recently for this fault
    bool should_capture = true;
    if (snapshot_recapture_cooldown_sec_ > 0.0) {
      std::lock_guard<std::mutex> lock(last_capture_mutex_);
      auto it = last_capture_times_.find(request->fault_code);
      if (it != last_capture_times_.end()) {
        auto elapsed = std::chrono::steady_clock::now() - it->second;
        if (elapsed < std::chrono::duration<double>(snapshot_recapture_cooldown_sec_)) {
          should_capture = false;
          RCLCPP_DEBUG(get_logger(), "Skipping capture for '%s' - cooldown active", request->fault_code.c_str());
        }
      }
      if (should_capture) {
        last_capture_times_[request->fault_code] = std::chrono::steady_clock::now();
      }
    }

    if (should_capture) {
      std::string fault_code = request->fault_code;
      auto snapshot_cap = snapshot_capture_;
      auto rosbag_cap = rosbag_capture_;
      std::thread capture_thread([snapshot_cap, rosbag_cap, fault_code]() {
        if (snapshot_cap) {
          snapshot_cap->capture(fault_code);
        }
        if (rosbag_cap) {
          rosbag_cap->on_fault_confirmed(fault_code);
        }
      });
      {
        std::lock_guard<std::mutex> ct_lock(capture_threads_mutex_);
        capture_threads_.push_back(std::move(capture_thread));
      }
    }  // if (should_capture)
}

// Handle PREFAILED state for lazy_start rosbag capture
@@ -506,6 +541,15 @@ void FaultManagerNode::handle_clear_fault(

response->success = cleared;
if (cleared) {
// Evict cooldown tracking for cleared fault and auto-cleared symptoms
{
std::lock_guard<std::mutex> lock(last_capture_mutex_);
last_capture_times_.erase(request->fault_code);
for (const auto & symptom_code : auto_cleared_codes) {
last_capture_times_.erase(symptom_code);
}
}

// Auto-clear correlated symptoms
for (const auto & symptom_code : auto_cleared_codes) {
storage_->clear_fault(symptom_code);
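Evicting the cooldown entry on clear is what lets a cleared-and-re-reported fault capture a fresh snapshot immediately; without the erase, the stale timestamp would suppress capture for up to recapture_cooldown_sec. A minimal Python sketch of that behavior (hypothetical class, not the project's code):

```python
import time

class CooldownTracker:
    """Per-fault cooldown with eviction on clear, mirroring the PR's intent."""

    def __init__(self, cooldown_sec, clock=time.monotonic):
        self.cooldown_sec = cooldown_sec  # 0 disables the cooldown
        self.clock = clock
        self.last = {}                    # fault_code -> last capture time

    def allow(self, fault_code):
        now = self.clock()
        prev = self.last.get(fault_code)
        if self.cooldown_sec > 0 and prev is not None and now - prev < self.cooldown_sec:
            return False                  # still inside the cooldown window
        self.last[fault_code] = now
        return True

    def on_cleared(self, fault_code):
        # Forget the fault so a fresh report captures immediately.
        self.last.pop(fault_code, None)
```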
17 changes: 17 additions & 0 deletions src/ros2_medkit_fault_manager/src/fault_storage.cpp
@@ -290,8 +290,25 @@ size_t InMemoryFaultStorage::check_time_based_confirmation(const rclcpp::Time &
return confirmed_count;
}

void InMemoryFaultStorage::set_max_snapshots_per_fault(size_t max_count) {
std::lock_guard<std::mutex> lock(mutex_);
max_snapshots_per_fault_ = max_count;
}

void InMemoryFaultStorage::store_snapshot(const SnapshotData & snapshot) {
std::lock_guard<std::mutex> lock(mutex_);
if (max_snapshots_per_fault_ > 0) {
size_t count = 0;
for (const auto & s : snapshots_) {
if (s.fault_code == snapshot.fault_code) {
++count;
}
}
if (count >= max_snapshots_per_fault_) {
// Silent rejection: storage layer has no logger. Callers should log if needed.
return; // Reject new - keep first N inserted snapshots
}
}
snapshots_.push_back(snapshot);
}
Comment on lines 298 to 313

Copilot AI (Mar 29, 2026):
InMemoryFaultStorage::store_snapshot() now scans the entire snapshots_ vector to count per-fault snapshots on every insert. This is O(N) per snapshot and can become a bottleneck even with a small per-fault limit if many faults are present. Consider tracking per-fault snapshot counts in a map (and decrementing on clear/delete) to keep this check O(1).

Collaborator (author):
Acceptable with the max_per_fault limit (default 10): total snapshots are bounded by num_faults * 10.

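For reference, the reviewer's O(1) alternative could look roughly like the following Python sketch (illustrative only, not the project's code): keep a per-fault counter beside the snapshot list and update it on insert and clear, so the limit check never scans.

```python
class CountedSnapshotStore:
    """Sketch of the review suggestion: O(1) per-fault limit check."""

    def __init__(self, max_per_fault=10):
        self.max_per_fault = max_per_fault  # 0 = unlimited
        self.snapshots = []                 # list of (fault_code, payload)
        self.counts = {}                    # fault_code -> stored count

    def store(self, fault_code, payload):
        count = self.counts.get(fault_code, 0)
        if self.max_per_fault > 0 and count >= self.max_per_fault:
            return False                    # reject-new, same policy as the PR
        self.snapshots.append((fault_code, payload))
        self.counts[fault_code] = count + 1
        return True

    def clear_fault(self, fault_code):
        # Drop the snapshots and the counter together so they stay consistent.
        self.snapshots = [s for s in self.snapshots if s[0] != fault_code]
        self.counts.pop(fault_code, None)
```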

16 changes: 16 additions & 0 deletions src/ros2_medkit_fault_manager/src/sqlite_fault_storage.cpp
@@ -690,9 +690,25 @@ size_t SqliteFaultStorage::check_time_based_confirmation(const rclcpp::Time & cu
return static_cast<size_t>(sqlite3_changes(db_));
}

void SqliteFaultStorage::set_max_snapshots_per_fault(size_t max_count) {
std::lock_guard<std::mutex> lock(mutex_);
max_snapshots_per_fault_ = max_count;
}

void SqliteFaultStorage::store_snapshot(const SnapshotData & snapshot) {
std::lock_guard<std::mutex> lock(mutex_);

// Check snapshot limit per fault (reject-new strategy: keep earliest)
if (max_snapshots_per_fault_ > 0) {
SqliteStatement count_stmt(db_, "SELECT COUNT(*) FROM snapshots WHERE fault_code = ?");
count_stmt.bind_text(1, snapshot.fault_code);
if (count_stmt.step() == SQLITE_ROW &&
count_stmt.column_int64(0) >= static_cast<int64_t>(max_snapshots_per_fault_)) {
// Silent rejection: storage layer has no logger. Callers should log if needed.
return; // Reject new - keep first N inserted snapshots
}
}

SqliteStatement stmt(db_,
"INSERT INTO snapshots (fault_code, topic, message_type, data, captured_at_ns) "
"VALUES (?, ?, ?, ?, ?)");
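The count-then-insert pattern used here can be exercised against plain SQLite; below is a Python sketch with a simplified table layout (the PR's real schema has more columns, e.g. message_type and captured_at_ns):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE snapshots (fault_code TEXT, topic TEXT, data BLOB)")

def store_snapshot(conn, fault_code, topic, data, max_per_fault=2):
    # max_per_fault=2 is illustrative; the PR's default is 10, 0 = unlimited.
    if max_per_fault > 0:
        (count,) = conn.execute(
            "SELECT COUNT(*) FROM snapshots WHERE fault_code = ?",
            (fault_code,)).fetchone()
        if count >= max_per_fault:
            return False  # reject-new: keep the first N inserted snapshots
    conn.execute("INSERT INTO snapshots VALUES (?, ?, ?)",
                 (fault_code, topic, data))
    return True
```

Note the count and insert must happen under the same lock (as the PR does with mutex_), otherwise two concurrent inserts could both pass the check.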