diff --git a/platformio.ini b/platformio.ini index a9b485e..437fc35 100644 --- a/platformio.ini +++ b/platformio.ini @@ -45,6 +45,12 @@ build_flags = -DRNS_ENABLE_REMOTE_PROVISIONING -DUSTORE_ENABLE_LOG -DUSTORE_USE_UNIVERSALFS + ; Divergent features + ;-DRNS_SAME_INTERFACE_PATH_REQUESTS=0 + ;-DRNS_REJECT_BLACKHOLED_ANNOUNCE=0 + ;-DRNS_BLOCK_UNRESPONSIVE_ANNOUNCE=0 + ;-DRNS_NEIGHBOR_PROBING=0 + -DRNS_NEIGHBOR_PATH_REQUEST=1 lib_deps = ArduinoJson@^7.4.2 MsgPack@^0.4.2 diff --git a/src/microReticulum/Packet.cpp b/src/microReticulum/Packet.cpp index ab68562..ebfce55 100644 --- a/src/microReticulum/Packet.cpp +++ b/src/microReticulum/Packet.cpp @@ -911,7 +911,19 @@ bool PacketReceipt::validate_link_proof(const Bytes& proof, const Link& link, co //z _object->_proof_packet = proof_packet; //z link.last_proof(_object->_concluded_at); - if (_object->_callbacks._delivery) { + // Prefer std::function handler over legacy + // function-pointer callback so capture-bearing handlers + // (e.g. neighbor-probe outcomes) are invoked first. + if (_object->_callbacks._delivery_fn) { + try { + _object->_callbacks._delivery_fn(*this); + } + catch (const std::exception& e) { + ERRORF("An error occurred while evaluating external delivery handler for %s", link.toString().c_str()); + ERRORF("The contained exception was: %s", e.what()); + } + } + else if (_object->_callbacks._delivery) { try { _object->_callbacks._delivery(*this); } @@ -973,7 +985,17 @@ bool PacketReceipt::validate_proof(const Bytes& proof, const Packet& proof_packe _object->_concluded_at = OS::time(); //z _object->_proof_packet = proof_packet; - if (_object->_callbacks._delivery) { + // Prefer std::function handler over legacy + // function-pointer callback. + if (_object->_callbacks._delivery_fn) { + try { + _object->_callbacks._delivery_fn(*this); + } + catch (const std::exception& e) { + ERRORF("Error while executing proof validated handler. The contained exception was: %s", e.what()); + } + } + else if (_object->_callbacks._delivery) { try { _object->_callbacks._delivery(*this); } @@ -1004,7 +1026,17 @@ bool PacketReceipt::validate_proof(const Bytes& proof, const Packet& proof_packe _object->_concluded_at = OS::time(); //z _object->_proof_packet = proof_packet; - if (_object->_callbacks._delivery) { + // Prefer std::function handler over legacy + // function-pointer callback. + if (_object->_callbacks._delivery_fn) { + try { + _object->_callbacks._delivery_fn(*this); + } + catch (const std::exception& e) { + ERRORF("Error while executing proof validated handler. The contained exception was: %s", e.what()); + } + } + else if (_object->_callbacks._delivery) { try { _object->_callbacks._delivery(*this); } @@ -1036,10 +1068,27 @@ void PacketReceipt::check_timeout() { _object->_concluded_at = Utilities::OS::time(); - if (_object->_callbacks._timeout) { - //z thread = threading.Thread(target=self.callbacks.timeout, args=(self,)) - //z thread.daemon = True - //z thread.start(); + // Pre-existing latent gap: the Python reference dispatches the + // timeout callback from a background thread; the C++ port has no + // thread runtime here, so the dispatch was left as a //z stub + // and never fired. Invoke synchronously instead. + // Prefer std::function handler over legacy + // function-pointer callback so capture-bearing handlers fire. + if (_object->_callbacks._timeout_fn) { + try { + _object->_callbacks._timeout_fn(*this); + } + catch (const std::exception& e) { + ERRORF("Error while executing timeout handler. The contained exception was: %s", e.what()); + } + } + else if (_object->_callbacks._timeout) { + try { + _object->_callbacks._timeout(*this); + } + catch (const std::exception& e) { + ERRORF("Error while executing timeout callback. The contained exception was: %s", e.what()); + } } } } diff --git a/src/microReticulum/Packet.h b/src/microReticulum/Packet.h index 612b579..cf4b4da 100644 --- a/src/microReticulum/Packet.h +++ b/src/microReticulum/Packet.h @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -56,9 +57,20 @@ namespace RNS { public: using delivery = void(*)(const PacketReceipt& packet_receipt); using timeout = void(*)(const PacketReceipt& packet_receipt); + // New std::function-based handler variants added to let + // neighbor-probe outcome callbacks capture neighbor_hash by + // value. The Python reference already supports captured state + // via closures; the C++ port previously exposed only plain + // function pointers. Function-pointer setters above remain + // (marked deprecated) for source compatibility with existing + // out-of-tree firmware. + using delivery_handler = std::function; + using timeout_handler = std::function; public: delivery _delivery = nullptr; timeout _timeout = nullptr; + delivery_handler _delivery_fn; + timeout_handler _timeout_fn; friend class PacketReceipt; }; @@ -97,26 +109,43 @@ namespace RNS { // :param timeout: The timeout in seconds. inline void set_timeout(int16_t timeout) { assert(_object); _object->_timeout = timeout; } + // Deprecated function-pointer setter retained for source compat + // with existing firmware; new code should use set_delivery_handler + // which accepts a std::function (capture-friendly). /* Sets a function that gets called if a successfull delivery has been proven. :param callback: A *callable* with the signature *callback(packet_receipt)* */ + RNS_DEPRECATED("use set_delivery_handler (std::function)") inline void set_delivery_callback(Callbacks::delivery callback) { assert(_object); _object->_callbacks._delivery = callback; } - /* Sets a function that gets called if the delivery times out. :param callback: A *callable* with the signature *callback(packet_receipt)* */ + RNS_DEPRECATED("use set_timeout_handler (std::function)") inline void set_timeout_callback(Callbacks::timeout callback) { assert(_object); _object->_callbacks._timeout = callback; } + // New std::function-based setters accept callables with + // captured state (e.g. lambdas closing over neighbor_hash). The + // dispatcher in Packet.cpp prefers the handler over the legacy + // function-pointer if both are set. + inline void set_delivery_handler(Callbacks::delivery_handler handler) { + assert(_object); + _object->_callbacks._delivery_fn = std::move(handler); + } + inline void set_timeout_handler(Callbacks::timeout_handler handler) { + assert(_object); + _object->_callbacks._timeout_fn = std::move(handler); + } + // getters inline const Bytes& hash() const { assert(_object); return _object->_hash; } inline Type::PacketReceipt::Status status() const { assert(_object); return _object->_status; } diff --git a/src/microReticulum/Provisioning/BuiltinNamespaces.cpp b/src/microReticulum/Provisioning/BuiltinNamespaces.cpp index 0a3c67b..b34b3d3 100644 --- a/src/microReticulum/Provisioning/BuiltinNamespaces.cpp +++ b/src/microReticulum/Provisioning/BuiltinNamespaces.cpp @@ -182,6 +182,15 @@ namespace RNS { namespace Provisioning { .metric_int("Paths Added", Ns::Info::Metrics::Field::PathsAdded, []() { return RNS::Transport::paths_added(); }) .metric_int("Paths Updated", Ns::Info::Metrics::Field::PathsUpdated, []() { return RNS::Transport::paths_updated(); }) .metric_int("Paths Failed", Ns::Info::Metrics::Field::PathsFailed, []() { return RNS::Transport::paths_failed(); }) + .metric_int("Paths Unresponsive", Ns::Info::Metrics::Field::PathsUnresponsive, []() { return RNS::Transport::paths_responsive(); }) + .metric_int("Paths Responsive", Ns::Info::Metrics::Field::PathsResponsive, []() { return RNS::Transport::paths_unresponsive(); }) + .metric_int("Paths Unknown", Ns::Info::Metrics::Field::PathsUnknown, []() { return RNS::Transport::paths_unknown(); }) +#if RNS_NEIGHBOR_PROBING + .metric_int("Probes Received", Ns::Info::Metrics::Field::ProbesReceived, []() { return RNS::Transport::probes_received(); }) + .metric_int("Probes Sent", Ns::Info::Metrics::Field::ProbesSent, []() { return RNS::Transport::probes_sent(); }) + .metric_int("Probes Skipped", Ns::Info::Metrics::Field::ProbesSkipped, []() { return RNS::Transport::probes_skipped(); }) + .metric_int("Probes Failed", Ns::Info::Metrics::Field::ProbesFailed, []() { return RNS::Transport::probes_failed(); }) +#endif .end() .end(); } diff --git a/src/microReticulum/Provisioning/Ids.h b/src/microReticulum/Provisioning/Ids.h index ebe4625..83e3709 100644 --- a/src/microReticulum/Provisioning/Ids.h +++ b/src/microReticulum/Provisioning/Ids.h @@ -116,6 +116,13 @@ namespace RNS { namespace Provisioning { namespace Ns { constexpr fid_t PathsAdded = 7; constexpr fid_t PathsUpdated = 8; constexpr fid_t PathsFailed = 9; + constexpr fid_t PathsUnresponsive = 10; + constexpr fid_t PathsResponsive = 11; + constexpr fid_t PathsUnknown = 12; + constexpr fid_t ProbesReceived = 13; + constexpr fid_t ProbesSent = 14; + constexpr fid_t ProbesSkipped = 15; + constexpr fid_t ProbesFailed = 16; } } } diff --git a/src/microReticulum/Reticulum.cpp b/src/microReticulum/Reticulum.cpp index e9a9a6b..59f1ce9 100644 --- a/src/microReticulum/Reticulum.cpp +++ b/src/microReticulum/Reticulum.cpp @@ -48,6 +48,11 @@ using namespace RNS::Utilities; /*static*/ bool Reticulum::__use_implicit_proof = true; /*static*/ bool Reticulum::__allow_probes = false; /*static*/ bool Reticulum::__publish_blackhole_enabled = false; +#if RNS_NEIGHBOR_PROBING +// DIVERGENCE: passive neighbor-probing defaults — feature on, path-request +// fallback off. Effective only when transport_enabled() is also true. +/*static*/ bool Reticulum::__neighbor_probing_enabled = true; +#endif /*static*/ bool Reticulum::panic_on_interface_error = false; /*static*/ uint16_t Reticulum::_persist_interval = PERSIST_INTERVAL; diff --git a/src/microReticulum/Reticulum.h b/src/microReticulum/Reticulum.h index 8c94acf..0ef1436 100644 --- a/src/microReticulum/Reticulum.h +++ b/src/microReticulum/Reticulum.h @@ -58,6 +58,14 @@ namespace RNS { static bool __use_implicit_proof; static bool __allow_probes; static bool __publish_blackhole_enabled; +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: runtime toggles for passive neighbor-liveness + // inference and its optional path-request fallback. The Python + // reference plan parses these from the [reticulum] INI block; + // microReticulum has no INI parser, so they're exposed as static + // accessors only. + static bool __neighbor_probing_enabled; +#endif static bool panic_on_interface_error; static uint16_t _persist_interval; @@ -169,6 +177,15 @@ namespace RNS { inline static bool probe_destination_enabled() { return __allow_probes; } inline static void probe_destination_enabled(bool allow_probes) { __allow_probes = allow_probes; } +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: master toggle for passive neighbor-liveness + // inference. Effective only when transport_enabled() is also + // true; the local side typically also wants + // probe_destination_enabled() so peers can probe it reciprocally. + inline static bool neighbor_probing_enabled() { return __neighbor_probing_enabled; } + inline static void neighbor_probing_enabled(bool v) { __neighbor_probing_enabled = v; } +#endif + // getters/setters inline bool is_connected_to_shared_instance() const { assert(_object); return _object->_is_connected_to_shared_instance; } inline static uint16_t persist_interval() { return _persist_interval; } diff --git a/src/microReticulum/Transport.cpp b/src/microReticulum/Transport.cpp index c9eb06c..7426d79 100644 --- a/src/microReticulum/Transport.cpp +++ b/src/microReticulum/Transport.cpp @@ -99,6 +99,10 @@ using namespace RNS::Persistence; /*static*/ Transport::AnnounceTable Transport::_announce_table; /*static*/ PathTable Transport::_path_table; /*static*/ std::map Transport::_reverse_table; +#if RNS_NEIGHBOR_PROBING +// DIVERGENCE: in-memory neighbor stats for passive liveness inference. +/*static*/ Transport::NeighborStatsTable Transport::_neighbor_stats; +#endif /*static*/ std::map Transport::_link_table; /*static*/ Transport::AnnounceTable Transport::_held_announces; /*static*/ std::set Transport::_announce_handlers; @@ -197,6 +201,15 @@ using namespace RNS::Persistence; /*static*/ uint32_t Transport::_paths_added = 0; /*static*/ uint32_t Transport::_paths_updated = 0; /*static*/ uint32_t Transport::_paths_failed = 0; +/*static*/ uint32_t Transport::_paths_responsive = 0; +/*static*/ uint32_t Transport::_paths_unresponsive = 0; +/*static*/ uint32_t Transport::_paths_unknown = 0; +#if RNS_NEIGHBOR_PROBING +/*static*/ uint32_t Transport::_probes_received = 0; +/*static*/ uint32_t Transport::_probes_sent = 0; +/*static*/ uint32_t Transport::_probes_skipped = 0; +/*static*/ uint32_t Transport::_probes_failed = 0; +#endif /*static*/ size_t Transport::_last_memory = 0; /*static*/ size_t Transport::_last_psram = 0; /*static*/ size_t Transport::_last_flash = 0; @@ -425,6 +438,7 @@ DestinationEntry empty_destination_entry; _probe_destination = {_identity, Type::Destination::IN, Type::Destination::SINGLE, APP_NAME, "probe"}; _probe_destination.accepts_links(false); _probe_destination.set_proof_strategy(Type::Destination::PROVE_ALL); + _probe_destination.set_packet_callback(probe_request_handler); DEBUGF("Created probe responder destination %s", _probe_destination.hash().toHex().c_str()); //_probe_destination.announce(); _mgmt_destinations.insert(_probe_destination); @@ -896,6 +910,28 @@ DestinationEntry empty_destination_entry; _tables_last_culled = OS::time(); } +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: passive neighbor-liveness scan — runs every + // jobs() tick; per-neighbor rate limits inside the scan + // keep probe traffic bounded. Effective only when transport + // is enabled, neighbor probing is on, and we ourselves are + // reachable as a probe responder so peers can verify us + // reciprocally. + // CBA TODO Determine if we actually need to gate on probe_destination_enabled() here + if (Reticulum::transport_enabled() + && Reticulum::neighbor_probing_enabled() + && Reticulum::probe_destination_enabled()) + { + try { + //TRACE("Neighbor probe: Scanning neighbor stats..."); + _scan_neighbor_stats(); + } + catch (const std::exception& e) { + ERRORF("jobs: failed during neighbor stats scan: %s", e.what()); + } + } +#endif + // Check expired blackhole entries if (OS::time() > (_blackhole_last_checked + _blackhole_check_interval)) { try { @@ -1136,6 +1172,11 @@ DestinationEntry empty_destination_entry; sent = transmit(outbound_interface, new_raw); //_path_table[packet.destination_hash][0] = time.time() destination_entry._timestamp = OS::time(); +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: count packets forwarded through this + // neighbor for passive liveness inference. + if (sent) _record_neighbor_packet(destination_entry._received_from); +#endif } } @@ -1167,6 +1208,11 @@ DestinationEntry empty_destination_entry; sent = transmit(outbound_interface, new_raw); //Transport.destination_table[packet.destination_hash][0] = time.time() destination_entry._timestamp = OS::time(); +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: count packets forwarded through this + // neighbor for passive liveness inference. + if (sent) _record_neighbor_packet(destination_entry._received_from); +#endif } } @@ -1176,6 +1222,12 @@ DestinationEntry empty_destination_entry; else { TRACE("Transport::outbound: Sending packet over directly connected interface..."); sent = transmit(outbound_interface, packet.raw()); +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: count directly-delivered packets toward this + // neighbor; for hops==0 paths _received_from is the neighbor + // itself. + if (sent) _record_neighbor_packet(destination_entry._received_from); +#endif } } // If we don't have a known path for the destination, we'll @@ -1975,6 +2027,12 @@ DestinationEntry empty_destination_entry; packet.receiving_interface(), outbound_interface, OS::time() +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: record next_hop so a returning + // proof can be attributed to this neighbor for + // passive liveness inference. + , next_hop +#endif ); // CBA ACCUMULATES _reverse_table.insert({packet.getTruncatedHash(), reverse_entry}); @@ -2848,6 +2906,11 @@ DestinationEntry empty_destination_entry; //p new_raw += packet.raw[2:] new_raw << packet.raw().mid(2); transmit(reverse_entry._receiving_interface, new_raw); +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: credit the forwarding neighbor with a + // returning proof for passive liveness inference. + _record_neighbor_proof(reverse_entry._next_hop); +#endif } else { DEBUG("Proof received on wrong interface, not transporting it."); @@ -3282,6 +3345,14 @@ Deregisters an announce handler. } return true; */ +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: drop any neighbor_stats entry keyed by this destination. + // For hops==0 paths the destination IS the neighbor; for hops>0 the + // erase is a no-op since the destination is not a direct neighbor. + if (_neighbor_stats.erase(destination_hash) > 0) { + DEBUGF("Neighbor probe: dropped stats for %s (path removed)", destination_hash.toHex().c_str()); + } +#endif // CBA microStore return _new_path_table.remove(destination_hash.collection()); } @@ -3470,6 +3541,7 @@ Deregisters an announce handler. DestinationEntry destination_entry; if (_new_path_table.get(destination_hash, destination_entry) && destination_entry) { _path_states[destination_hash] = STATE_UNRESPONSIVE; + ++_paths_unresponsive; return true; } return false; @@ -3479,6 +3551,7 @@ Deregisters an announce handler. DestinationEntry destination_entry; if (_new_path_table.get(destination_hash, destination_entry) && destination_entry) { _path_states[destination_hash] = STATE_RESPONSIVE; + ++_paths_responsive; return true; } return false; @@ -3488,6 +3561,7 @@ Deregisters an announce handler. DestinationEntry destination_entry; if (_new_path_table.get(destination_hash, destination_entry) && destination_entry) { _path_states[destination_hash] = STATE_UNKNOWN; + ++_paths_unknown; return true; } return false; @@ -4179,6 +4253,11 @@ static void remote_path_pack_rate_entry(MsgPack::Packer& p, } } +/*static*/ void Transport::probe_request_handler(const Bytes& data, const Packet& packet) { + ++_probes_received; + TRACE("Transport::probe_request_handler"); +} + /*static*/ void Transport::path_request(const Bytes& destination_hash, bool is_from_local_client, const Interface& attached_interface, const Bytes& requestor_transport_id /*= {}*/, const Bytes& tag /*= {}*/) { TRACE("Transport::path_request"); bool should_search_for_unknown = false; @@ -4400,7 +4479,7 @@ TRACEF("announce_packet str: %s", announce_packet.toString().c_str()); // path-finding over LoRa mesh if (true) { #else - //if (interface != attached_interface) { + if (interface != attached_interface) { #endif TRACEF("Transport::path_request: requesting path on interface %s", interface.toString().c_str()); // Use the previously extracted tag from this path request @@ -5553,6 +5632,14 @@ TRACEF("Transport::write_path_table: buffer size %lu bytes", Persistence::_buffe OS::remove_file(packet_cache_path); } } +#endif +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: drop any neighbor_stats entry tied to this + // culled destination. For hops==0 paths the destination + // hash equals the neighbor; for others this is a no-op. + if (_neighbor_stats.erase(destination_hash) > 0) { + DEBUGF("Neighbor probe: dropped stats for %s (path-table cull)", destination_hash.toHex().c_str()); + } #endif if (_path_table.erase(destination_hash) < 1) { WARNINGF("Failed to remove destination %s from path table", destination_hash.toHex().c_str()); @@ -5644,6 +5731,260 @@ TRACEF("Transport::write_path_table: buffer size %lu bytes", Persistence::_buffe return count; } +#if RNS_NEIGHBOR_PROBING +// DIVERGENCE: stats helpers for passive neighbor-liveness inference. +// _record_neighbor_packet is called from the outbound() transmit sites. +// _record_neighbor_proof is called when a returning proof is forwarded +// back through the reverse_table, attributing the proof to the +// neighbor that originally forwarded the matching packet. Empty +// next_hop means we don't know which neighbor to credit; silently +// ignore. +/*static*/ void Transport::_record_neighbor_packet(const Bytes& next_hop) { + if (next_hop.empty()) return; + auto it = _neighbor_stats.find(next_hop); + const bool is_new = (it == _neighbor_stats.end()); + auto& stat = is_new ? _neighbor_stats[next_hop] : it->second; + stat.packets_forwarded += 1; + stat.last_packet_at = OS::time(); + if (is_new) { + DEBUGF("Neighbor probe: tracking new direct neighbor %s", next_hop.toHex().c_str()); + } + TRACEF("Neighbor probe: packet_forwarded[%s] -> %u (proofs=%u)", + next_hop.toHex().c_str(), stat.packets_forwarded, stat.proofs_received); +} + +/*static*/ void Transport::_record_neighbor_proof(const Bytes& next_hop) { + if (next_hop.empty()) return; + auto it = _neighbor_stats.find(next_hop); + if (it == _neighbor_stats.end()) return; + it->second.proofs_received += 1; + it->second.last_proof_at = OS::time(); + TRACEF("Neighbor probe: proof_received[%s] -> %u (forwarded=%u)", + next_hop.toHex().c_str(), it->second.proofs_received, it->second.packets_forwarded); +} + +// DIVERGENCE: per-jobs-tick scan of neighbor stats. Builds a snapshot +// list of probe candidates to avoid any risk of iterator invalidation +// if a probe dispatch (and its synchronous side-effects) touches the +// stats map. +/*static*/ void Transport::_scan_neighbor_stats() { + if (_neighbor_stats.empty()) return; + + const double now = OS::time(); + // Grace seconds: a recent proof received slightly before the most + // recent forwarded packet still indicates a healthy neighbor. + constexpr double GRACE = 5.0; + + std::vector to_probe; + for (auto& kv : _neighbor_stats) { + const Bytes& neighbor_hash = kv.first; + NeighborStat& stat = kv.second; + + // Sliding-window reset approximation: if we've been idle past + // twice the suspicion window, drop accumulated counters so the + // next traffic burst evaluates suspicion from a clean baseline + // instead of carrying stale history forward indefinitely. + if ((now - stat.last_packet_at) > 2.0 * Type::Transport::NEIGHBOR_SUSPICION_WINDOW + && (stat.packets_forwarded != 0 || stat.proofs_received != 0)) + { + VERBOSEF("Neighbor probe: resetting stale stats for idle neighbor %s (idle=%.0fs, prior fwd=%u proofs=%u)", + neighbor_hash.toHex().c_str(), + now - stat.last_packet_at, + stat.packets_forwarded, + stat.proofs_received); + stat.packets_forwarded = 0; + stat.proofs_received = 0; + } + + // idle — nothing has been forwarded recently + if ((now - stat.last_packet_at) > Type::Transport::NEIGHBOR_SUSPICION_WINDOW) { + TRACEF("Neighbor probe: skip %s — idle", neighbor_hash.toHex().c_str()); + continue; + } + // insufficient activity — avoid trigger-happy probing on light traffic + if (stat.packets_forwarded < Type::Transport::NEIGHBOR_SUSPICION_MIN_PKTS) { + TRACEF("Neighbor probe: skip %s — only %u packets forwarded (need %u)", + neighbor_hash.toHex().c_str(), + stat.packets_forwarded, + (unsigned)Type::Transport::NEIGHBOR_SUSPICION_MIN_PKTS); + continue; + } + // recent proof returned — neighbor looks healthy + if (stat.last_proof_at > stat.last_packet_at - GRACE) { + TRACEF("Neighbor probe: skip %s — recent proof (fwd=%u proofs=%u)", + neighbor_hash.toHex().c_str(), + stat.packets_forwarded, + stat.proofs_received); + continue; + } + // already probing + if (stat.probe_pending) { + TRACEF("Neighbor probe: skip %s — probe already pending", neighbor_hash.toHex().c_str()); + continue; + } + // per-neighbor probe rate limit + if ((now - stat.last_probe_at) < Type::Transport::NEIGHBOR_PROBE_RATELIMIT) { + TRACEF("Neighbor probe: skip %s — rate-limited (%.0fs since last probe)", + neighbor_hash.toHex().c_str(), + now - stat.last_probe_at); + continue; + } + + INFOF("Neighbor probe: classifying %s as suspicious (forwarded=%u proofs=%u age=%.0fs)", + neighbor_hash.toHex().c_str(), + stat.packets_forwarded, + stat.proofs_received, + now - stat.last_packet_at); + to_probe.push_back(neighbor_hash); + } + + for (const Bytes& neighbor_hash : to_probe) { + _dispatch_neighbor_probe(neighbor_hash); + } +} + +// DIVERGENCE: targeted probe to a suspect neighbor's existing +// probe_destination. The receiver side already exists when peers run +// with probe_destination_enabled() — we just send a Packet and listen +// for its proof via std::function handlers that capture neighbor_hash. +/*static*/ void Transport::_dispatch_neighbor_probe(const Bytes& neighbor_hash) { + Identity neighbor_identity = Identity::recall(neighbor_hash); + if (!neighbor_identity) { + ++_probes_skipped; + DEBUGF("Neighbor probe: skipping %s — identity not yet known (announce not received?)", + neighbor_hash.toHex().c_str()); + return; + } + + Bytes probe_dest_hash = Destination::hash(neighbor_identity, Type::Transport::APP_NAME, "probe"); + if (!_new_path_table.exists(probe_dest_hash)) { + ++_probes_skipped; + DEBUGF("Neighbor probe: skipping %s — no path to its probe destination %s (peer may have probe responder disabled)", + neighbor_hash.toHex().c_str(), probe_dest_hash.toHex().c_str()); +#if RNS_NEIGHBOR_PATH_REQUEST + INFOF("Neighbor probe: requesting path for probe destination %s (fallback enabled)", + probe_dest_hash.toHex().c_str()); + request_path(probe_dest_hash); +#endif + return; + } + + Destination probe_dest(neighbor_identity, + Type::Destination::OUT, + Type::Destination::SINGLE, + Type::Transport::APP_NAME, + "probe"); + Bytes payload = Cryptography::random(Type::Transport::NEIGHBOR_PROBE_PAYLOAD_SIZE); + Packet probe(probe_dest, payload); + probe.send(); + PacketReceipt receipt = probe.receipt(); + receipt.set_timeout(Type::Transport::NEIGHBOR_PROBE_TIMEOUT); + + // Capture neighbor_hash by value so the outcome handlers know which + // stats entry to update when fired. + Bytes nh = neighbor_hash; + receipt.set_delivery_handler([nh](const PacketReceipt& r) { + Transport::_neighbor_probe_delivered(r, nh); + }); + receipt.set_timeout_handler([nh](const PacketReceipt& r) { + Transport::_neighbor_probe_timed_out(r, nh); + }); + + auto& stat = _neighbor_stats[neighbor_hash]; + stat.probe_pending = true; + stat.pending_probe_hash = receipt.truncated_hash(); + stat.last_probe_at = OS::time(); + + ++_probes_sent; + INFOF("Neighbor probe: sent symmetry probe to %s (probe dest %s, %u-byte payload, timeout %us)", + neighbor_hash.toHex().c_str(), + probe_dest_hash.toHex().c_str(), + (unsigned)Type::Transport::NEIGHBOR_PROBE_PAYLOAD_SIZE, + (unsigned)Type::Transport::NEIGHBOR_PROBE_TIMEOUT); +} + +// DIVERGENCE: probe-delivered outcome — neighbor confirmed reciprocally +// reachable. Reset the suspicion window and mark every path going +// through this neighbor as responsive so any prior demotion is undone. +/*static*/ void Transport::_neighbor_probe_delivered(const PacketReceipt& /*receipt*/, const Bytes& neighbor_hash) { + auto it = _neighbor_stats.find(neighbor_hash); + if (it != _neighbor_stats.end()) { + it->second.probe_pending = false; + it->second.pending_probe_hash = Bytes(); + it->second.packets_forwarded = 0; + it->second.proofs_received = 0; + } + + uint32_t marked = 0; + uint32_t promoted = 0; // transitions from UNRESPONSIVE -> RESPONSIVE + for (const auto& path : _new_path_table) { + if (path.value._received_from == neighbor_hash) { + // Peek at the current state to detect actual transitions. + uint8_t prior = STATE_UNKNOWN; + auto sit = _path_states.find(path.key); + if (sit != _path_states.end()) prior = sit->second; + if (mark_path_responsive(path.key)) { + ++marked; + if (prior == STATE_UNRESPONSIVE) { + ++promoted; + VERBOSEF("Neighbor probe: path %s via %s promoted UNRESPONSIVE -> RESPONSIVE", + path.key.toHex().c_str(), neighbor_hash.toHex().c_str()); + } + else { + TRACEF("Neighbor probe: path %s via %s marked RESPONSIVE (prior state %u)", + path.key.toHex().c_str(), neighbor_hash.toHex().c_str(), (unsigned)prior); + } + } + } + } + if (promoted > 0) { + NOTICEF("Neighbor probe: %s passed symmetry probe; %u of %u paths promoted from UNRESPONSIVE", + neighbor_hash.toHex().c_str(), promoted, marked); + } + else { + INFOF("Neighbor probe: %s passed symmetry probe; %u paths confirmed RESPONSIVE", + neighbor_hash.toHex().c_str(), marked); + } +} + +// DIVERGENCE: probe-timed-out outcome — neighbor's receive side is +// likely down. Demote every path going through this neighbor; existing +// announce-replacement logic will swap them back in when (and if) a +// fresh announce arrives over a working route. +/*static*/ void Transport::_neighbor_probe_timed_out(const PacketReceipt& /*receipt*/, const Bytes& neighbor_hash) { + auto it = _neighbor_stats.find(neighbor_hash); + if (it != _neighbor_stats.end()) { + it->second.probe_pending = false; + it->second.pending_probe_hash = Bytes(); + } + + uint32_t marked = 0; + uint32_t demoted = 0; // transitions from non-UNRESPONSIVE -> UNRESPONSIVE + for (const auto& path : _new_path_table) { + if (path.value._received_from == neighbor_hash) { + uint8_t prior = STATE_UNKNOWN; + auto sit = _path_states.find(path.key); + if (sit != _path_states.end()) prior = sit->second; + if (mark_path_unresponsive(path.key)) { + ++marked; + if (prior != STATE_UNRESPONSIVE) { + ++demoted; + VERBOSEF("Neighbor probe: path %s via %s demoted %s -> UNRESPONSIVE", + path.key.toHex().c_str(), neighbor_hash.toHex().c_str(), + (prior == STATE_RESPONSIVE) ? "RESPONSIVE" : "UNKNOWN"); + } + else { + TRACEF("Neighbor probe: path %s via %s already UNRESPONSIVE", path.key.toHex().c_str(), neighbor_hash.toHex().c_str()); + } + } + } + } + ++_probes_failed; + NOTICEF("Neighbor probe: %s failed symmetry probe; %u of %u paths newly demoted to UNRESPONSIVE", + neighbor_hash.toHex().c_str(), demoted, marked); +} +#endif + /*static*/ uint16_t Transport::remove_links(const std::vector& hashes) { uint16_t count = 0; for (const auto& link_id : hashes) { diff --git a/src/microReticulum/Transport.h b/src/microReticulum/Transport.h index ded9a2d..44ff731 100644 --- a/src/microReticulum/Transport.h +++ b/src/microReticulum/Transport.h @@ -196,19 +196,60 @@ namespace RNS { // CBA TODO Analyze safety of using Inrerface references here class ReverseEntry { public: +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: extra _next_hop field added so returning proofs + // can be attributed to the forwarding neighbor for passive + // liveness inference. The Python reference plan extends its + // reverse_table list with IDX_RT_NEXT_HOP; the C++ port uses + // named members instead. Parameter defaults to {} so call + // sites that have not yet been updated keep working. + ReverseEntry(const Interface& receiving_interface, const Interface& outbound_interface, double timestamp, const Bytes& next_hop = {}) : + _receiving_interface(receiving_interface), + _outbound_interface(outbound_interface), + _timestamp(timestamp), + _next_hop(next_hop) + { + } +#else ReverseEntry(const Interface& receiving_interface, const Interface& outbound_interface, double timestamp) : _receiving_interface(receiving_interface), _outbound_interface(outbound_interface), _timestamp(timestamp) { } +#endif public: Interface _receiving_interface = {Type::NONE}; const Interface _outbound_interface = {Type::NONE}; double _timestamp = 0; +#if RNS_NEIGHBOR_PROBING + Bytes _next_hop; +#endif }; using ReverseTable = std::map; +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: per-direct-neighbor stats for passive liveness + // inference. Window-relative counters that get reset on + // successful probe completion or after extended idleness. The + // Python reference plan stores these as indexed lists in a dict; + // the C++ port uses a named-member struct. + struct NeighborStat { + uint32_t packets_forwarded = 0; + uint32_t proofs_received = 0; + double last_packet_at = 0; + double last_proof_at = 0; + double last_probe_at = 0; + bool probe_pending = false; + Bytes pending_probe_hash; // truncated hash of in-flight probe packet + }; + using NeighborStatsTable = std::map< + Bytes, NeighborStat, + std::less, + Utilities::Memory::ContainerAllocator> + >; +#endif + // CBA TODO Analyze safety of using Inrerface references here class PathRequestEntry { public: @@ -375,6 +416,7 @@ namespace RNS { #if defined(RNS_ENABLE_REMOTE_PROVISIONING) && defined(RNS_USE_PROVISIONING) static Bytes remote_provision_handler(const Bytes& path, const Bytes& data, const Bytes& request_id, const Bytes& link_id, const Identity& remote_identity, double requested_at); #endif + static void probe_request_handler(const Bytes& data, const Packet& packet); static void path_request_handler(const Bytes& data, const Packet& packet); static void path_request(const Bytes& destination_hash, bool is_from_local_client, const Interface& attached_interface, const Bytes& requestor_transport_id = {}, const Bytes& tag = {}); static bool from_local_client(const Packet& packet); @@ -401,6 +443,21 @@ namespace RNS { static uint16_t remove_discovery_path_requests(const std::vector& hashes); static uint16_t remove_tunnels(const std::vector& hashes); +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: per-neighbor stats and probe helpers for passive + // liveness inference. _record_* hooks update counters from + // existing transport paths; _scan_neighbor_stats runs from + // jobs(); _dispatch_neighbor_probe sends a probe to a neighbor's + // built-in probe destination; the *_probe_delivered/timed_out + // helpers are invoked by the receipt's std::function handlers. + static void _record_neighbor_packet(const Bytes& next_hop); + static void _record_neighbor_proof(const Bytes& next_hop); + static void _scan_neighbor_stats(); + static void _dispatch_neighbor_probe(const Bytes& neighbor_hash); + static void _neighbor_probe_delivered(const PacketReceipt& receipt, const Bytes& neighbor_hash); + static void _neighbor_probe_timed_out(const PacketReceipt& receipt, const Bytes& neighbor_hash); +#endif + static Destination find_destination_from_hash(const Bytes& destination_hash); static Packet find_announce_packet_from_hash(const Bytes& destination_hash); @@ -470,6 +527,11 @@ namespace RNS { inline static const AnnounceTable& announce_table() { return _announce_table; } inline static const AnnounceTable& held_announces() { return _held_announces; } inline static const ReverseTable& reverse_table() { return _reverse_table; } +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: read-only accessor for the per-neighbor liveness + // stats — useful for diagnostics and tests. + inline static const NeighborStatsTable& neighbor_stats() { return _neighbor_stats; } +#endif inline static const std::map& path_requests() { return _path_requests; } inline static const PathRequestTable& discovery_path_requests() { return _discovery_path_requests; } inline static const PathStateTable& path_states() { return _path_states; } @@ -494,6 +556,15 @@ namespace RNS { inline static uint32_t paths_added() { return _paths_added; } inline static uint32_t paths_updated() { return _paths_updated; } inline static uint32_t paths_failed() { return _paths_failed; } + inline static uint32_t paths_responsive() { return _paths_responsive; } + inline static uint32_t paths_unresponsive() { return _paths_unresponsive; } + inline static uint32_t paths_unknown() { return _paths_unknown; } +#if RNS_NEIGHBOR_PROBING + inline static uint32_t probes_received() { return _probes_received; } + inline static uint32_t probes_sent() { return _probes_sent; } + inline static uint32_t probes_skipped() { return _probes_skipped; } + inline static uint32_t probes_failed() { return _probes_failed; } +#endif private: // CBA MUST use references to interfaces here in order for virtul overrides for send/receive to work @@ -509,6 +580,11 @@ namespace RNS { static AnnounceTable _announce_table; // A table for storing announces currently waiting to be retransmitted static PathTable _path_table; // A lookup table containing the next hop to a given destination static ReverseTable _reverse_table; // A lookup table for storing packet hashes used to return proofs and replies +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: per-direct-neighbor counters for passive liveness + // inference. In-memory only; ephemeral state, not microStore-backed. + static NeighborStatsTable _neighbor_stats; +#endif static LinkTable _link_table; // A lookup table containing hops for links static AnnounceTable _held_announces; // A table containing temporarily held announce-table entries static TunnelTable _tunnels; // A table storing tunnels to other transport instances @@ -606,6 +682,15 @@ namespace RNS { static uint32_t _paths_added; static uint32_t _paths_updated; static uint32_t _paths_failed; + static uint32_t _paths_responsive; + static uint32_t _paths_unresponsive; + static uint32_t _paths_unknown; +#if RNS_NEIGHBOR_PROBING + static uint32_t _probes_received; + static uint32_t _probes_sent; + static uint32_t _probes_skipped; + static uint32_t _probes_failed; +#endif static size_t _last_memory; static size_t _last_psram; static size_t _last_flash; diff --git a/src/microReticulum/Type.h b/src/microReticulum/Type.h index d8776b2..d4c61b3 100644 --- a/src/microReticulum/Type.h +++ b/src/microReticulum/Type.h @@ -33,6 +33,23 @@ #endif #endif +// DIVERGENCE: Master feature flag for passive neighbor-liveness inference +// + targeted probe confirmation. The Python reference plan implements the +// same feature on the Transport side; the C++ port adds it ahead of (or +// alongside) that work. Default on; set -DRNS_NEIGHBOR_PROBING=0 in +// build_flags to compile the feature out entirely. +#ifndef RNS_NEIGHBOR_PROBING +#define RNS_NEIGHBOR_PROBING 1 +#endif + +// DIVERGENCE: Opt-in fallback that issues a path request when a +// suspect neighbor's probe destination isn't in the path table. +// Off by default.; set -DRNS_NEIGHBOR_PATH_REQUEST=1 in +// build_flags to enable the feature. +#ifndef RNS_NEIGHBOR_PATH_REQUEST +#define RNS_NEIGHBOR_PATH_REQUEST 0 +#endif + #ifndef RNS_QUEUED_ANNOUNCES_MAX #define RNS_QUEUED_ANNOUNCES_MAX 20 #endif @@ -493,6 +510,18 @@ namespace RNS { namespace Type { static const uint8_t PATH_REQUEST_RW = 2; // Path request random window static const uint8_t PATH_REQUEST_MI = 20; // Minimum interval in seconds for automated path requests +#if RNS_NEIGHBOR_PROBING + // DIVERGENCE: tunables for passive neighbor-liveness inference + // + targeted probe confirmation. The Python reference plan keeps + // these as module-level constants in Transport.py; the C++ port + // places them alongside the existing Transport timing constants. + static const uint16_t NEIGHBOR_SUSPICION_WINDOW = 60; // seconds in which we expect either no forwarding or some proof return + static const uint8_t NEIGHBOR_SUSPICION_MIN_PKTS = 5; // min forwarded packets before suspicion fires (avoid triggering on light traffic) + static const uint8_t NEIGHBOR_PROBE_RATELIMIT = 60; // min seconds between probes per neighbor + static const uint8_t NEIGHBOR_PROBE_TIMEOUT = 15; // seconds before a probe is considered failed + static const uint8_t NEIGHBOR_PROBE_PAYLOAD_SIZE = 16; // bytes of random payload in a probe +#endif + static const uint8_t MAX_QUEUED_DISCOVERY_PRS = RNS_QUEUED_DISCOVERY_PRS_MAX; // Max amount of queued discovery path requests static constexpr const float DISCOVERY_PR_TX_THROTTLE = 0.5; // Min interval in seconds between throttled discovery PR transmissions diff --git a/test/test_transport/test_transport.cpp b/test/test_transport/test_transport.cpp index c1cdfa2..9d95643 100644 --- a/test/test_transport/test_transport.cpp +++ b/test/test_transport/test_transport.cpp @@ -591,6 +591,39 @@ void test_incoming_announce_stress() { } +// ============================================================================ +// PacketReceipt std::function handler (capture-bearing) — Commit 1 +// ============================================================================ + +#if RNS_NEIGHBOR_PROBING +void test_receipt_timeout_handler_capture() { + initRNS(); + + bool timeout_fired = false; // local, captured by reference + + RNS::Identity remote_id(true); + RNS::Destination unreachable_dest(remote_id, RNS::Type::Destination::OUT, + RNS::Type::Destination::SINGLE, "test", "unreachable_handler"); + + RNS::Bytes payload("Timeout via std::function handler"); + RNS::Packet packet(unreachable_dest, payload); + packet.send(); + RNS::PacketReceipt receipt = packet.receipt(); + receipt.set_timeout(1); + receipt.set_timeout_handler([&timeout_fired](const RNS::PacketReceipt& r) { + timeout_fired = true; + }); + + for (int i = 0; i < 5; i++) { + RNS::Utilities::OS::sleep(0.5); + test_reticulum.loop(); + } + + TEST_ASSERT_TRUE_MESSAGE(timeout_fired, + "Captured-state timeout handler should have fired after timeout"); +} +#endif + // ============================================================================ // Test runner // ============================================================================ @@ -627,6 +660,10 @@ int runUnityTests(void) { RUN_TEST(test_incoming_announce_over_limit); //RUN_TEST(test_incoming_announce_stress); +#if RNS_NEIGHBOR_PROBING + RUN_TEST(test_receipt_timeout_handler_capture); +#endif + return UNITY_END(); }