diff --git a/CMakeLists.txt b/CMakeLists.txt index 9fc30c6..3f651ee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,8 +20,10 @@ endif() option(RNS_BUILD_TESTS "Build Unity test suites under test/" ON) option(RNS_BUILD_EXAMPLES "Build native example applications" ON) option(RNS_BUILD_INTEROP "Build interop senders under test_interop/" ON) -option(RNS_USE_FS "Enable filesystem persistence" ON) -option(RNS_PERSIST_PATHS "Persist path table to storage" ON) +option(RNS_USE_FS "Enable filesystem persistence" ON) +option(RNS_PERSIST_PATHS "Persist path table to storage" ON) +option(RNS_PERSIST_KNOWN_DESTINATIONS "Persist known destinations to storage" ON) +option(RNS_PERSIST_HASHLIST "Persist packet hashlist to storage" ON) option(RNS_USE_PROVISIONING "Auto-start Provisioning subsystem from Reticulum::start()" ON) option(RNS_DEBUG_MEMORY "Enable memory/heap/metrics debug logging" OFF) option(RNS_SANITIZE "Build with AddressSanitizer + frame pointers" OFF) @@ -126,6 +128,12 @@ endif() if(RNS_PERSIST_PATHS) target_compile_definitions(microReticulum PUBLIC RNS_PERSIST_PATHS) endif() +if(RNS_PERSIST_KNOWN_DESTINATIONS) + target_compile_definitions(microReticulum PUBLIC RNS_PERSIST_KNOWN_DESTINATIONS) +endif() +if(RNS_PERSIST_HASHLIST) + target_compile_definitions(microReticulum PUBLIC RNS_PERSIST_HASHLIST) +endif() if(RNS_USE_PROVISIONING) target_compile_definitions(microReticulum PUBLIC RNS_USE_PROVISIONING) endif() diff --git a/README.md b/README.md index 0cda1cf..c01f5b9 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,9 @@ This API is dependent on the following external libraries: - `-DRNS_MEM_LOG` Used to enable logging of low-level memory operations for debug purposes - `-DRNS_USE_FS` Used to enable use of file system by RNS for persistence -- `-DRNS_PERSIST_PATHS` Used to enable persistence of RNS paths in file system (also requires `-DRNS_USE_FS`) +- `-DRNS_PERSIST_PATHS=0` Used to disable persistence of RNS paths in file system (enabled by default) +- `-DRNS_PERSIST_KNOWN_DESTINATIONS=0` Used to disable persistence of RNS known destinations in file system (enabled by default) +- `-DRNS_PERSIST_HASHLIST=0` Used to disable persistence of RNS packet hashlist in file system (enabled by default) - `-DRNS_USE_PROVISIONING` Used to enable the Provisioning subsystem (auto-started from `Reticulum::start()`). Disk persistence within the subsystem is additionally gated on `-DRNS_USE_FS`. Without this flag, none of the provisioning code is linked into the final binary — see the [Provisioning](#provisioning) section below. ## Memory Management Build Options diff --git a/examples/lora_announce/platformio.ini b/examples/lora_announce/platformio.ini index 1c2069c..bac2d7a 100644 --- a/examples/lora_announce/platformio.ini +++ b/examples/lora_announce/platformio.ini @@ -19,7 +19,6 @@ build_flags = -Wno-format ;-DRNS_MEM_LOG=1 -DRNS_USE_FS - -DRNS_PERSIST_PATHS -DUSTORE_USE_UNIVERSALFS lib_deps = ArduinoJson@^7.4.2 diff --git a/examples/lora_transport/platformio.ini b/examples/lora_transport/platformio.ini index 43d272b..5fdcdd4 100644 --- a/examples/lora_transport/platformio.ini +++ b/examples/lora_transport/platformio.ini @@ -19,7 +19,6 @@ build_flags = -Wno-format ;-DRNS_MEM_LOG=1 -DRNS_USE_FS - -DRNS_PERSIST_PATHS -DUSTORE_USE_UNIVERSALFS lib_deps = ArduinoJson@^7.4.2 diff --git a/examples/udp_transport/platformio.ini b/examples/udp_transport/platformio.ini index b140eaa..d7eed87 100644 --- a/examples/udp_transport/platformio.ini +++ b/examples/udp_transport/platformio.ini @@ -15,7 +15,6 @@ build_type = debug build_flags = ;-DRNS_MEM_LOG -DRNS_USE_FS - -DRNS_PERSIST_PATHS -DUSTORE_USE_UNIVERSALFS lib_deps = ArduinoJson@^7.4.2 diff --git a/platformio.ini b/platformio.ini index 26e3edf..a9b485e 100644 --- a/platformio.ini +++ b/platformio.ini @@ -33,6 +33,7 @@ build_flags = ; Only define LIBRARY_TEST when testing library directly -DLIBRARY_TEST ;-DNDEBUG + -DDEBUGLOG_DEFAULT_LOG_LEVEL_TRACE -DRNS_LOG_LEVEL=RNS_LOG_LEVEL_TRACE -DRNS_LOW_MEMORY_REBOOT -DRNS_DEBUG_HEAP @@ -40,9 +41,9 @@ build_flags = -DRNS_DEBUG_METRICS -DRNS_DEBUG_PATHSTORE -DRNS_USE_FS - -DRNS_PERSIST_PATHS -DRNS_USE_PROVISIONING -DRNS_ENABLE_REMOTE_PROVISIONING + -DUSTORE_ENABLE_LOG -DUSTORE_USE_UNIVERSALFS lib_deps = ArduinoJson@^7.4.2 diff --git a/src/microReticulum/Identity.cpp b/src/microReticulum/Identity.cpp index 5f1da7d..7a315b7 100644 --- a/src/microReticulum/Identity.cpp +++ b/src/microReticulum/Identity.cpp @@ -37,16 +37,28 @@ using namespace RNS::Utilities; #define RNS_KNOWN_DESTINATIONS_MAX 100 #endif +#ifndef RNS_KNOWN_DESTINATIONS_SEGMENT_SIZE +#define RNS_KNOWN_DESTINATIONS_SEGMENT_SIZE 65536 +#endif + +#ifndef RNS_KNOWN_DESTINATIONS_SEGMENT_COUNT +#define RNS_KNOWN_DESTINATIONS_SEGMENT_COUNT 8 +#endif + #ifndef RNS_IDENTITY_ANNOUNCE_RECALL #define RNS_IDENTITY_ANNOUNCE_RECALL 1 #endif -/*static*/ Identity::IdentityTable Identity::_known_destinations; -/*static*/ bool Identity::_saving_known_destinations = false; -// CBA -// CBA ACCUMULATES /*static*/ uint16_t Identity::_known_destinations_maxsize = RNS_KNOWN_DESTINATIONS_MAX; +/*static*/ uint32_t Identity::_known_store_segment_size = 0; +/*static*/ uint8_t Identity::_known_store_segment_count = 0; +#if defined(RNS_USE_FS) && RNS_PERSIST_KNOWN_DESTINATIONS +/*static*/ Persistence::KnownStore Identity::_known_store(RNS_KNOWN_DESTINATIONS_SEGMENT_SIZE, RNS_KNOWN_DESTINATIONS_SEGMENT_COUNT); +#else +/*static*/ Persistence::KnownStore Identity::_known_store; +#endif +/*static*/ Persistence::KnownDestinations Identity::_known_destinations(Identity::_known_store); Identity::Identity(bool create_keys /*= true*/) : _object(new Object()) { if (create_keys) { @@ -223,20 +235,9 @@ Can be used to load previously created and saved identities into Reticulum. if (public_key.size() != Type::Identity::KEYSIZE/8) { throw std::invalid_argument("Can't remember " + destination_hash.toHex() + ", the public key size of " + std::to_string(public_key.size()) + " is not valid."); } - else { - //p _known_destinations[destination_hash] = {OS::time(), packet_hash, public_key, app_data}; - // CBA ACCUMULATES - try { - _known_destinations.insert({destination_hash, {OS::time(), packet_hash, public_key, app_data}}); - // CBA IMMEDIATE CULL - cull_known_destinations(); - } - catch (const std::bad_alloc&) { - ERRORF("remember: bad_alloc - OUT OF MEMORY, identity not stored for %s", destination_hash.toHex().c_str()); - } - catch (const std::exception& e) { - ERRORF("remember: exception storing identity: %s", e.what()); - } + IdentityEntry entry(OS::time(), packet_hash, public_key, app_data); + if (!_known_destinations.put(destination_hash, entry)) { + ERRORF("remember: failed to store identity for %s", destination_hash.toHex().c_str()); } } @@ -249,10 +250,9 @@ Recall identity for a destination hash. /*static*/ Identity Identity::recall(const Bytes& destination_hash) { TRACE("Identity::recall..."); - auto iter = _known_destinations.find(destination_hash); - if (iter != _known_destinations.end()) { + IdentityEntry identity_data; + if (_known_destinations.get(destination_hash, identity_data) && identity_data) { TRACEF("Identity::recall: Found identity entry for destination %s", destination_hash.toHex().c_str()); - const IdentityEntry& identity_data = (*iter).second; Identity identity(false); identity.load_public_key(identity_data._public_key); identity.app_data(identity_data._app_data); @@ -305,154 +305,13 @@ Recall last heard app_data for a destination hash. */ /*static*/ Bytes Identity::recall_app_data(const Bytes& destination_hash) { TRACE("Identity::recall_app_data..."); - auto iter = _known_destinations.find(destination_hash); - if (iter != _known_destinations.end()) { + IdentityEntry identity_data; + if (_known_destinations.get(destination_hash, identity_data) && identity_data) { TRACEF("Identity::recall_app_data: Found identity entry for destination %s", destination_hash.toHex().c_str()); - const IdentityEntry& identity_data = (*iter).second; return identity_data._app_data; } - else { - TRACEF("Identity::recall_app_data: Unable to find identity entry for destination %s", destination_hash.toHex().c_str()); - return {Bytes::NONE}; - } -} - -/*static*/ bool Identity::save_known_destinations() { - // TODO: Improve the storage method so we don't have to - // deserialize and serialize the entire table on every - // save, but the only changes. It might be possible to - // simply overwrite on exit now that every local client - // disconnect triggers a data persist. - - bool success = false; - try { - if (_saving_known_destinations) { - double wait_interval = 0.2; - double wait_timeout = 5; - double wait_start = OS::time(); - while (_saving_known_destinations) { - OS::sleep(wait_interval); - if (OS::time() > (wait_start + wait_timeout)) { - ERROR("Could not save known destinations to storage, waiting for previous save operation timed out."); - return false; - } - } - } - - _saving_known_destinations = true; - double save_start = OS::time(); - - std::map storage_known_destinations; -// TODO -/* - if os.path.isfile(RNS.Reticulum.storagepath+"./known_destinations"): - try: - file = open(RNS.Reticulum.storagepath+"./known_destinations","rb") - storage_known_destinations = umsgpack.load(file) - file.close() - except: - pass -*/ - - for (auto& [destination_hash, identity_entry] : storage_known_destinations) { - if (_known_destinations.find(destination_hash) == _known_destinations.end()) { - //_known_destinations[destination_hash] = storage_known_destinations[destination_hash]; - //_known_destinations[destination_hash] = identity_entry; - // CBA ACCUMULATES - _known_destinations.insert({destination_hash, identity_entry}); - // CBA IMMEDIATE CULL - cull_known_destinations(); - } - } - -// TODO -/* - DEBUGF("Saving %lu known destinations to storage...", _known_destinations.size()); - file = open(RNS.Reticulum.storagepath+"./known_destinations","wb") - umsgpack.dump(Identity.known_destinations, file) - file.close() - DEBUGF("Saved known destinations to storage in %.3f seconds", OS::round(OS::time() - save_start, 3)); -*/ - - success = true; - } - catch (const std::exception& e) { - ERRORF("Error while saving known destinations to disk, the contained exception was: %s", e.what()); - } - - _saving_known_destinations = false; - - return success; -} - -/*static*/ void Identity::load_known_destinations() { -// TODO -/* - if os.path.isfile(RNS.Reticulum.storagepath+"./known_destinations"): - try: - file = open(RNS.Reticulum.storagepath+"./known_destinations","rb") - loaded_known_destinations = umsgpack.load(file) - file.close() - - Identity.known_destinations = {} - for known_destination in loaded_known_destinations: - if len(known_destination) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8: - Identity.known_destinations[known_destination] = loaded_known_destinations[known_destination] - - RNS.log("Loaded "+str(len(Identity.known_destinations))+" known destination from storage", RNS.LOG_VERBOSE) - except: - RNS.log("Error loading known destinations from disk, file will be recreated on exit", RNS.LOG_ERROR) - else: - RNS.log("Destinations file does not exist, no known destinations loaded", RNS.LOG_VERBOSE) -*/ - -} - -/*static*/ void Identity::cull_known_destinations() { - TRACE("Identity::cull_known_destinations()"); - if (_known_destinations.size() > _known_destinations_maxsize) { - try { - // Build lightweight (timestamp, key) index to avoid copying full IdentityEntry - // objects — prevents OOM on heap-constrained devices when the table is full. - std::vector> sorted_keys; - sorted_keys.reserve(_known_destinations.size()); - for (const auto& [key, entry] : _known_destinations) { - sorted_keys.emplace_back(entry._timestamp, key); - } - // Sort ascending by timestamp (oldest first) - std::sort(sorted_keys.begin(), sorted_keys.end()); - - uint16_t count = 0; - for (const auto& [timestamp, destination_hash] : sorted_keys) { - TRACEF("Identity::cull_known_destinations: Removing destination %s from known destinations", destination_hash.toHex().c_str()); - if (_known_destinations.erase(destination_hash) < 1) { - WARNINGF("Failed to remove destination %s from known destinations", destination_hash.toHex().c_str()); - } - ++count; - if (_known_destinations.size() <= _known_destinations_maxsize) { - break; - } - } - DEBUGF("Removed %d path(s) from known destinations", count); - } - catch (const std::bad_alloc& e) { - ERROR("cull_known_destinations: bad_alloc - OUT OF MEMORY building sort index, falling back to single erase"); - // Fallback: std::min_element does no heap allocation — erase one oldest entry - auto oldest = std::min_element( - _known_destinations.begin(), _known_destinations.end(), - [](const std::pair& a, - const std::pair& b) { - return a.second._timestamp < b.second._timestamp; - } - ); - if (oldest != _known_destinations.end()) { - _known_destinations.erase(oldest); - } - } - catch (const std::exception& e) { - ERRORF("cull_known_destinations: exception: %s", e.what()); - } - } + TRACEF("Identity::recall_app_data: Unable to find identity entry for destination %s", destination_hash.toHex().c_str()); + return {Bytes::NONE}; } /*static*/ bool Identity::validate_announce(const Packet& packet, bool only_validate_signature /*= false*/) { @@ -527,9 +386,8 @@ Recall last heard app_data for a destination hash. if (destination_hash == expected_hash) { // Check if we already have a public key for this destination // and make sure the public key is not different. - auto iter = _known_destinations.find(destination_hash); - if (iter != _known_destinations.end()) { - IdentityEntry& identity_entry = (*iter).second; + IdentityEntry identity_entry; + if (_known_destinations.get(destination_hash, identity_entry) && identity_entry) { if (public_key != identity_entry._public_key) { // In reality, this should never occur, but in the odd case // that someone manages a hash collision, we reject the announce. @@ -586,16 +444,6 @@ Recall last heard app_data for a destination hash. return false; } -/*static*/ void Identity::persist_data() { - if (!Transport::reticulum() || !Transport::reticulum().is_connected_to_shared_instance()) { - save_known_destinations(); - } -} - -/*static*/ void Identity::exit_handler() { - persist_data(); -} - /* Encrypts information for the identity. diff --git a/src/microReticulum/Identity.h b/src/microReticulum/Identity.h index 3a41eb2..aef2231 100644 --- a/src/microReticulum/Identity.h +++ b/src/microReticulum/Identity.h @@ -22,6 +22,7 @@ #include "Cryptography/X25519.h" #include "Cryptography/Token.h" #include "Utilities/Memory.h" +#include "Persistence/IdentityEntry.h" #include #include @@ -35,30 +36,16 @@ namespace RNS { class Identity { - private: - class IdentityEntry { - public: - IdentityEntry(double timestamp, const Bytes& packet_hash, const Bytes& public_key, const Bytes& app_data) : - _timestamp(timestamp), - _packet_hash(packet_hash), - _public_key(public_key), - _app_data(app_data) - { - } - public: - double _timestamp = 0; - Bytes _packet_hash; - Bytes _public_key; - Bytes _app_data; - }; - //using IdentityTable = std::map; - using IdentityTable = std::map, Utilities::Memory::ContainerAllocator>>; + public: + using IdentityEntry = Persistence::IdentityEntry; private: - static IdentityTable _known_destinations; - static bool _saving_known_destinations; + static Persistence::KnownStore _known_store; + static Persistence::KnownDestinations _known_destinations; // CBA static uint16_t _known_destinations_maxsize; + static uint32_t _known_store_segment_size; + static uint8_t _known_store_segment_count; public: Identity(bool create_keys = true); @@ -128,10 +115,6 @@ namespace RNS { static void remember(const Bytes& packet_hash, const Bytes& destination_hash, const Bytes& public_key, const Bytes& app_data = {Bytes::NONE}); static Identity recall(const Bytes& destination_hash); static Bytes recall_app_data(const Bytes& destination_hash); - static bool save_known_destinations(); - static void load_known_destinations(); - // CBA - static void cull_known_destinations(); /* Get a SHA-256 hash of passed data. @@ -165,8 +148,6 @@ namespace RNS { } static bool validate_announce(const Packet& packet, bool only_validate_signature = false); - static void persist_data(); - static void exit_handler(); // getters/setters inline const Bytes& encryptionPrivateKey() const { assert(_object); return _object->_prv_bytes; } @@ -182,9 +163,16 @@ namespace RNS { inline const Cryptography::X25519PublicKey::Ptr pub() const { assert(_object); return _object->_pub; } inline const Cryptography::Ed25519PublicKey::Ptr sig_pub() const { assert(_object); return _object->_sig_pub; } inline static uint16_t known_destinations_maxsize() { return _known_destinations_maxsize; } - inline static void known_destinations_maxsize(uint16_t known_destinations_maxsize) { _known_destinations_maxsize = known_destinations_maxsize; } + inline static void known_destinations_maxsize(uint16_t known_destinations_maxsize) { + _known_destinations_maxsize = known_destinations_maxsize; + _known_store.set_max_recs(_known_destinations_maxsize); + } + inline static uint32_t known_store_segment_size() { return _known_store_segment_size; } + inline static void known_store_segment_size(uint32_t value) { _known_store_segment_size = value; } + inline static uint8_t known_store_segment_count() { return _known_store_segment_count; } + inline static void known_store_segment_count(uint8_t value) { _known_store_segment_count = value; } - inline static const IdentityTable& known_destinations() { return _known_destinations; } + inline static const Persistence::KnownDestinations& known_destinations() { return _known_destinations; } inline std::string toString() const { if (!_object) return ""; return "{Identity:" + _object->_hash.toHex() + "}"; } diff --git a/src/microReticulum/Interface.cpp b/src/microReticulum/Interface.cpp index fc9b313..46cd09d 100644 --- a/src/microReticulum/Interface.cpp +++ b/src/microReticulum/Interface.cpp @@ -20,7 +20,7 @@ using namespace RNS; using namespace RNS::Type::Interface; -/*static*/ uint8_t Interface::DISCOVER_PATHS_FOR = MODE_ACCESS_POINT | MODE_GATEWAY; +/*static*/ uint8_t Interface::DISCOVER_PATHS_FOR = MODE_ACCESS_POINT | MODE_GATEWAY | MODE_ROAMING; void InterfaceImpl::handle_outgoing(const Bytes& data) { //TRACEF("InterfaceImpl.handle_outgoing: data: %s", data.toHex().c_str()); diff --git a/src/microReticulum/Interface.h b/src/microReticulum/Interface.h index 618c27e..2fbf0eb 100644 --- a/src/microReticulum/Interface.h +++ b/src/microReticulum/Interface.h @@ -24,6 +24,7 @@ #include #include #include +#include #include namespace RNS { @@ -60,6 +61,16 @@ namespace RNS { virtual bool start() { return true; } virtual void stop() {} virtual void loop() {} + // Called by Transport::detach_interfaces() during clean shutdown so + // subclasses can release resources (sockets, threads, hardware) before + // destruction. Default is a no-op; idempotency is the subclass's call. + virtual void detach() {} + // Stat hooks called by Transport::outbound() after a successful transmit + // of an announce or path-request packet. Subclasses can override to + // maintain per-interface frequency/burst counters (Python's oa_freq_deque + // and op_freq_deque equivalents). Defaults are no-ops. + virtual void sent_announce() {} + virtual void sent_path_request() {} // CBA Virtual override method for custom interface to send outgoing data virtual bool send_outgoing(const Bytes& data) = 0; @@ -91,6 +102,14 @@ namespace RNS { bool _FIXED_MTU = false; double _announce_allowed_at = 0; float _announce_cap = 0.0; + // Announce rate-limit configuration. 0 (target) means disabled; a positive + // target is the minimum interval (seconds) between announces from a given + // destination before that destination accumulates rate violations. After + // _announce_rate_grace consecutive violations, the destination is blocked + // for _announce_rate_target + _announce_rate_penalty seconds. + float _announce_rate_target = 0.0; + uint8_t _announce_rate_grace = 0; + float _announce_rate_penalty = 0.0; std::list _announce_queue; bool _is_connected_to_shared_instance = false; bool _is_local_shared_instance = false; @@ -104,6 +123,22 @@ namespace RNS { size_t _rxbytes = 0; size_t _txbytes = 0; + // Per-interface traffic counter state for Transport::count_traffic(). + // _traffic_counter_ts = 0.0 means the counter has not been initialised yet. + double _traffic_counter_ts = 0.0; + size_t _traffic_counter_rxb = 0; + size_t _traffic_counter_txb = 0; + double _current_rx_speed = 0.0; // bits/sec + double _current_tx_speed = 0.0; // bits/sec + + // Last received-packet signal-quality stats reported by the hardware. + // NaN means "not present" -- interface subclasses populate these + // synchronously with handle_incoming(bytes), and Transport::inbound + // snapshots the values onto the Packet at construction time. + float _r_stat_rssi = std::numeric_limits::quiet_NaN(); + float _r_stat_snr = std::numeric_limits::quiet_NaN(); + float _r_stat_q = std::numeric_limits::quiet_NaN(); + friend class Interface; }; @@ -192,7 +227,6 @@ namespace RNS { inline void FWD(bool FWD) { assert(_impl); _impl->_FWD = FWD; } inline void RPT(bool RPT) { assert(_impl); _impl->_RPT = RPT; } inline void name(const char* name) { assert(_impl); _impl->_name = name; } - inline void bitrate(uint32_t bitrate) { assert(_impl); _impl->_bitrate = bitrate; } inline void online(bool online) { assert(_impl); _impl->_online = online; } inline void announce_allowed_at(double announce_allowed_at) { assert(_impl); _impl->_announce_allowed_at = announce_allowed_at; } public: @@ -207,6 +241,13 @@ namespace RNS { inline Type::Interface::modes mode() const { assert(_impl); return _impl->_mode; } inline void mode(Type::Interface::modes mode) { assert(_impl); _impl->_mode = mode; } inline uint32_t bitrate() const { assert(_impl); return _impl->_bitrate; } + inline void bitrate(uint32_t bitrate) { assert(_impl); _impl->_bitrate = bitrate; } + inline float announce_rate_target() const { assert(_impl); return _impl->_announce_rate_target; } + inline void announce_rate_target(float seconds) { assert(_impl); _impl->_announce_rate_target = seconds; } + inline uint8_t announce_rate_grace() const { assert(_impl); return _impl->_announce_rate_grace; } + inline void announce_rate_grace(uint8_t violations) { assert(_impl); _impl->_announce_rate_grace = violations; } + inline float announce_rate_penalty() const { assert(_impl); return _impl->_announce_rate_penalty; } + inline void announce_rate_penalty(float seconds) { assert(_impl); _impl->_announce_rate_penalty = seconds; } inline uint16_t HW_MTU() const { assert(_impl); return _impl->_HW_MTU; } inline bool AUTOCONFIGURE_MTU() const { assert(_impl); return _impl->_AUTOCONFIGURE_MTU; } inline bool FIXED_MTU() const { assert(_impl); return _impl->_FIXED_MTU; } @@ -216,7 +257,29 @@ namespace RNS { inline size_t tx() const { assert(_impl); return _impl->_tx; } inline size_t rxbytes() const { assert(_impl); return _impl->_rxbytes; } inline size_t txbytes() const { assert(_impl); return _impl->_txbytes; } + inline double traffic_counter_ts() const { assert(_impl); return _impl->_traffic_counter_ts; } + inline size_t traffic_counter_rxb() const { assert(_impl); return _impl->_traffic_counter_rxb; } + inline size_t traffic_counter_txb() const { assert(_impl); return _impl->_traffic_counter_txb; } + inline double current_rx_speed() const { assert(_impl); return _impl->_current_rx_speed; } + inline double current_tx_speed() const { assert(_impl); return _impl->_current_tx_speed; } + inline float r_stat_rssi() const { assert(_impl); return _impl->_r_stat_rssi; } + inline void r_stat_rssi(float rssi) const { assert(_impl); _impl->_r_stat_rssi = rssi; } + inline float r_stat_snr() const { assert(_impl); return _impl->_r_stat_snr; } + inline void r_stat_snr(float snr) const { assert(_impl); _impl->_r_stat_snr = snr; } + inline float r_stat_q() const { assert(_impl); return _impl->_r_stat_q; } + inline void r_stat_q(float q) const { assert(_impl); _impl->_r_stat_q = q; } + inline void update_traffic_counter(double ts, size_t rxb, size_t txb) const { + assert(_impl); + _impl->_traffic_counter_ts = ts; + _impl->_traffic_counter_rxb = rxb; + _impl->_traffic_counter_txb = txb; + } + inline void current_rx_speed(double speed) const { assert(_impl); _impl->_current_rx_speed = speed; } + inline void current_tx_speed(double speed) const { assert(_impl); _impl->_current_tx_speed = speed; } inline std::list& announce_queue() const { assert(_impl); return _impl->_announce_queue; } + inline void detach() const { assert(_impl); _impl->detach(); } + inline void sent_announce() const { assert(_impl); _impl->sent_announce(); } + inline void sent_path_request() const { assert(_impl); _impl->sent_path_request(); } inline bool is_connected_to_shared_instance() const { assert(_impl); return _impl->_is_connected_to_shared_instance; } inline bool is_local_shared_instance() const { assert(_impl); return _impl->_is_local_shared_instance; } inline HInterface parent_interface() const { assert(_impl); return _impl->_parent_interface; } diff --git a/src/microReticulum/Link.cpp b/src/microReticulum/Link.cpp index ef6d4dd..c6b9ccd 100644 --- a/src/microReticulum/Link.cpp +++ b/src/microReticulum/Link.cpp @@ -31,6 +31,7 @@ #include #include +#include #include @@ -1161,6 +1162,14 @@ void Link::receive(const Packet& packet) { } _object->_rx += 1; _object->_rxbytes += packet.data().size(); + + // Snapshot per-packet signal-quality stats onto the link's + // "last received" fields so consumers can read link.rssi/snr/q + // without holding the original Packet. NaN means the source + // packet (or its receiving interface) didn't carry that metric. + if (!std::isnan(packet.rssi())) _object->_rssi = packet.rssi(); + if (!std::isnan(packet.snr())) _object->_snr = packet.snr(); + if (!std::isnan(packet.q())) _object->_q = packet.q(); if (_object->_status == STALE) { _object->_status = Type::Link::ACTIVE; } diff --git a/src/microReticulum/LinkData.h b/src/microReticulum/LinkData.h index 467ba57..3eb369a 100644 --- a/src/microReticulum/LinkData.h +++ b/src/microReticulum/LinkData.h @@ -26,6 +26,7 @@ #include "Type.h" #include "Cryptography/Token.h" +#include #include namespace RNS { @@ -64,9 +65,12 @@ namespace RNS { uint16_t _rx = 0; uint16_t _txbytes = 0; uint16_t _rxbytes = 0; - float _rssi = 0.0; - float _snr = 0.0; - float _q = 0.0; + // Last received-packet signal-quality stats snapshotted from Packet + // in Link::receive(). NaN means the source packet didn't carry that + // metric (e.g., received over a non-radio interface). + float _rssi = std::numeric_limits::quiet_NaN(); + float _snr = std::numeric_limits::quiet_NaN(); + float _q = std::numeric_limits::quiet_NaN(); uint8_t _traffic_timeout_factor = Type::Link::TRAFFIC_TIMEOUT_FACTOR; uint16_t _keepalive_timeout_factor = Type::Link::KEEPALIVE_TIMEOUT_FACTOR; uint16_t _keepalive = Type::Link::KEEPALIVE; diff --git a/src/microReticulum/Packet.h b/src/microReticulum/Packet.h index c9ed4c8..612b579 100644 --- a/src/microReticulum/Packet.h +++ b/src/microReticulum/Packet.h @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -280,6 +281,14 @@ namespace RNS { inline uint8_t flags() const { assert(_object); return _object->_flags; } inline uint8_t hops() const { assert(_object); return _object->_hops; } inline bool cached() const { assert(_object); return _object->_cached; } + inline bool is_outbound_pr() const { assert(_object); return _object->_is_outbound_pr; } + inline void is_outbound_pr(bool flag) { assert(_object); _object->_is_outbound_pr = flag; } + inline float rssi() const { assert(_object); return _object->_rssi; } + inline Packet& rssi(float rssi) { assert(_object); _object->_rssi = rssi; return *this; } + inline float snr() const { assert(_object); return _object->_snr; } + inline Packet& snr(float snr) { assert(_object); _object->_snr = snr; return *this; } + inline float q() const { assert(_object); return _object->_q; } + inline Packet& q(float q) { assert(_object); _object->_q = q; return *this; } inline const Bytes& packet_hash() const { assert(_object); return _object->_packet_hash; } inline const Bytes& destination_hash() const { assert(_object); return _object->_destination_hash; } inline const Bytes& transport_id() const { assert(_object); return _object->_transport_id; } @@ -367,14 +376,18 @@ namespace RNS { bool _truncated = false; // whether data was truncated bool _encrypted = false; // whether data is encrypted bool _cached = false; // whether packet has been cached + bool _is_outbound_pr = false; // set by Transport::request_path before send(); used by outbound() to fire interface.sent_path_request() PacketReceipt _receipt = {Type::NONE}; uint16_t _MTU = Type::Reticulum::MTU; double _sent_at = 0; - float _rssi = 0.0; - float _snr = 0.0; - float _q = 0.0; + // Signal-quality stats stamped by Transport::inbound from the + // receiving interface at packet-construction time. NaN means + // the receiving interface didn't report this metric. + float _rssi = std::numeric_limits::quiet_NaN(); + float _snr = std::numeric_limits::quiet_NaN(); + float _q = std::numeric_limits::quiet_NaN(); Bytes _packet_hash; Bytes _ratchet_id; diff --git a/src/microReticulum/Persistence/DestinationEntry.cpp b/src/microReticulum/Persistence/DestinationEntry.cpp index 8d38049..3ab141f 100644 --- a/src/microReticulum/Persistence/DestinationEntry.cpp +++ b/src/microReticulum/Persistence/DestinationEntry.cpp @@ -15,148 +15,130 @@ #include "DestinationEntry.h" #include "../Transport.h" -#include "../Type.h" + +#include using namespace RNS; using namespace RNS::Persistence; +// Encodes a DestinationEntry as a 7-element MsgPack array: +// [timestamp, hops, expires, received_from, random_blobs, receiving_interface, announce_packet] +// +// random_blobs is itself an array of length-prefixed binary blobs, tail-trimmed +// to PERSIST_RANDOM_BLOBS to bound on-disk size. Adding new fields in the future +// is a matter of bumping the outer array length and appending; old decoders see +// the extra elements as trailing array members they can ignore. /*static*/ std::vector microStore::Codec::encode(const DestinationEntry& entry) { // If invalid/empty entry then return empty if (!entry) return {}; - std::vector out; - - auto write = [&](const void* ptr, size_t len) - { - const uint8_t* p=(const uint8_t*)ptr; - out.insert(out.end(), p, p+len); - }; + MsgPack::Packer p; + p.packArraySize(7); // timestamp -//TRACEF("Writing %lu byte timestamp: %f", sizeof(entry._timestamp), entry._timestamp); - write(&entry._timestamp, sizeof(entry._timestamp)); + p.packFloat64(entry._timestamp); // hops -//TRACEF("Writing %lu byte hops: %u", sizeof(entry._hops), entry._hops); - write(&entry._hops, sizeof(entry._hops)); + p.serialize(entry._hops); // expires -//TRACEF("Writing %lu byte expires: %f", sizeof(entry._expires), entry._expires); - write(&entry._expires, sizeof(entry._expires)); + p.packFloat64(entry._expires); // received_from -//TRACEF("Writing %lu byte received_from", entry._received_from.collection().size()); - out.insert(out.end(), entry._received_from.collection().begin(), entry._received_from.collection().end()); - - // random_blobs - uint16_t blob_count = entry._random_blobs.size(); -//TRACEF("Writing %lu byte blob_count: %u", sizeof(blob_count), blob_count); - write(&blob_count, sizeof(blob_count)); - for (auto& blob : entry._random_blobs) { - uint16_t blob_size = blob.collection().size(); -//TRACEF("Writing %lu byte blob_size: %u", sizeof(blob_size), blob_size); - write(&blob_size, sizeof(blob_size)); -//TRACEF("Writing %lu byte blob", blob.collection().size()); - out.insert(out.end(), blob.collection().begin(), blob.collection().end()); + p.packBinary(entry._received_from.data(), entry._received_from.size()); + + // random_blobs -- write only the tail-newest PERSIST_RANDOM_BLOBS entries + const size_t persist_cap = Type::Transport::PERSIST_RANDOM_BLOBS; + const size_t total_blobs = entry._random_blobs.size(); + const size_t persist_n = (total_blobs > persist_cap) ? persist_cap : total_blobs; + const size_t start_idx = total_blobs - persist_n; + p.packArraySize(persist_n); + for (size_t i = start_idx; i < total_blobs; i++) { + const auto& blob = entry._random_blobs[i]; + p.packBinary(blob.data(), blob.size()); } - // receiving_interface + // receiving_interface (hash only; the live Interface is re-bound on decode) Bytes interface_hash(entry._receiving_interface.get_hash()); -//TRACEF("Writing %lu byte receiving_interface hash", interface_hash.collection().size()); - out.insert(out.end(), interface_hash.collection().begin(), interface_hash.collection().end()); - - // announce_packet - uint16_t packet_size = entry._announce_packet.raw().size(); -//TRACEF("Writing %lu byte packet_size: %u", sizeof(packet_size), packet_size); - write(&packet_size, sizeof(packet_size)); -//TRACEF("Writing %lu byte packet", entry._announce_packet.raw().collection().size()); - out.insert(out.end(), entry._announce_packet.raw().collection().begin(), entry._announce_packet.raw().collection().end()); + p.packBinary(interface_hash.data(), interface_hash.size()); -//TRACEF("Encoded %lu byte DestinationEntry", out.size()); + // announce_packet (raw bytes, including header) + const Bytes& raw = entry._announce_packet.raw(); + p.packBinary(raw.data(), raw.size()); - return out; + return std::vector(p.data(), p.data() + p.size()); } /*static*/ bool microStore::Codec::decode(const std::vector& data, DestinationEntry& entry) { - size_t pos = 0; + if (data.empty()) return false; -//TRACEF("Decoding %lu byte DestinationEntry", data.size()); + MsgPack::Unpacker u; + u.feed(data.data(), data.size()); - auto read=[&](void* dst, size_t len)->bool - { - if(pos+len > data.size()) return false; - memcpy(dst, &data[pos], len); - pos+=len; - return true; - }; + if (!u.isArray()) return false; + const size_t n = u.unpackArraySize(); + if (n < 7) return false; // timestamp - if(!read(&entry._timestamp, sizeof(entry._timestamp))) return false; -//TRACEF("Read %lu byte timestamp: %f", sizeof(entry._timestamp), entry._timestamp); + if (!u.deserialize(entry._timestamp)) return false; // hops - if(!read(&entry._hops, sizeof(entry._hops))) return false; -//TRACEF("Read %lu byte hops: %u", sizeof(entry._hops), entry._hops); + if (!u.deserialize(entry._hops)) return false; // expires - if(!read(&entry._expires, sizeof(entry._expires))) return false; -//TRACEF("Read %lu byte expires: %f", sizeof(entry._expires), entry._expires); + if (!u.deserialize(entry._expires)) return false; // received_from - if(!read((void*)entry._received_from.writable(Type::Reticulum::DESTINATION_LENGTH), Type::Reticulum::DESTINATION_LENGTH)) return false; - entry._received_from.resize(Type::Reticulum::DESTINATION_LENGTH); -//TRACEF("Read %lu byte received_from", entry._received_from.size()); + { + MsgPack::bin_t b; + if (!u.deserialize(b)) return false; + entry._received_from = Bytes(b.data(), b.size()); + } // random_blobs - uint16_t blob_count; - if(!read(&blob_count, sizeof(blob_count))) return false; -//TRACEF("Read %lu byte blob_count: %u", sizeof(blob_count), blob_count); - for (int i = 0; i < blob_count; i++) { - uint16_t blob_size; - if(!read(&blob_size, sizeof(blob_size))) return false; -//TRACEF("Read %lu byte blob_size: %u", sizeof(blob_size), blob_size); - Bytes blob(blob_size); - if(!read((void*)blob.writable(blob_size), blob_size)) return false; - blob.resize(blob_size); - entry._random_blobs.insert(blob); -//TRACEF("Read %lu byte blob", blob.size()); + { + if (!u.isArray()) return false; + const size_t blob_n = u.unpackArraySize(); + entry._random_blobs.clear(); + entry._random_blobs.reserve(blob_n); + for (size_t i = 0; i < blob_n; i++) { + MsgPack::bin_t b; + if (!u.deserialize(b)) return false; + entry._random_blobs.push_back(Bytes(b.data(), b.size())); + } } - // receiving_interface - Bytes interface_hash(Type::Reticulum::HASHLENGTH/8); - if(!read((void*)interface_hash.writable(Type::Reticulum::HASHLENGTH/8), Type::Reticulum::HASHLENGTH/8)) return false; - interface_hash.resize(Type::Reticulum::HASHLENGTH/8); -//TRACEF("Read %lu byte interface_hash", interface_hash.size()); - entry._receiving_interface = Transport::find_interface_from_hash(interface_hash); - if (!entry._receiving_interface) { - WARNINGF("Path Interface %s not found", interface_hash.toHex().c_str()); + // receiving_interface (rebind to live Interface by hash) + { + MsgPack::bin_t b; + if (!u.deserialize(b)) return false; + Bytes interface_hash(b.data(), b.size()); + entry._receiving_interface = Transport::find_interface_from_hash(interface_hash); + if (!entry._receiving_interface) { + WARNINGF("Path Interface %s not found", interface_hash.toHex().c_str()); + } } // announce_packet - uint16_t packet_size; - if(!read(&packet_size, sizeof(packet_size))) return false; -//TRACEF("Read %lu byte packet_size: %u", sizeof(packet_size), packet_size); - Bytes packet_data(packet_size); - if(!read((void*)packet_data.writable(packet_size), packet_size)) return false; - packet_data.resize(packet_size); - if (packet_data.size() > 0) { - entry._announce_packet = Packet(packet_data); + { + MsgPack::bin_t b; + if (!u.deserialize(b)) return false; + Bytes packet_data(b.data(), b.size()); + if (packet_data.size() > 0) { + entry._announce_packet = Packet(packet_data); + } } -//TRACEF("Read %lu byte packet", packet_data.size()); if (entry._announce_packet) { // Announce packet is cached in packed state // so we need to unpack it before accessing. -//TRACE("Unpacking packet..."); if (entry._announce_packet.unpack()) { -//TRACEF("Packet: %s", entry._announce_packet.debugString().c_str()); // We increase the hops, since reading a packet // from cache is equivalent to receiving it again - // over an interface. It is cached with it's non- + // over an interface. It is cached with its non- // increased hop-count. -//TRACE("Incrementing packet hop count..."); entry._announce_packet.hops(entry._announce_packet.hops() + 1); } } diff --git a/src/microReticulum/Persistence/DestinationEntry.h b/src/microReticulum/Persistence/DestinationEntry.h index bf33b5d..3b2473a 100644 --- a/src/microReticulum/Persistence/DestinationEntry.h +++ b/src/microReticulum/Persistence/DestinationEntry.h @@ -17,9 +17,10 @@ #include "../Interface.h" #include "../Packet.h" #include "../Bytes.h" +#include "../Type.h" // CBA microStore -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS #include #else #include @@ -27,7 +28,7 @@ #include #include -#include +#include #include namespace RNS { namespace Persistence { @@ -35,7 +36,7 @@ namespace RNS { namespace Persistence { class DestinationEntry { public: DestinationEntry() {} - DestinationEntry(double timestamp, const RNS::Bytes& received_from, uint8_t announce_hops, double expires, const std::set& random_blobs, const Interface& receiving_interface, const Packet& announce_packet) : + DestinationEntry(double timestamp, const RNS::Bytes& received_from, uint8_t announce_hops, double expires, const std::vector& random_blobs, const Interface& receiving_interface, const Packet& announce_packet) : _timestamp(timestamp), _received_from(received_from), _hops(announce_hops), @@ -63,7 +64,7 @@ class DestinationEntry { RNS::Bytes _received_from; uint8_t _hops = 0; double _expires = 0; - std::set _random_blobs; + std::vector _random_blobs; // Oldest at front, newest at back; capped at MAX_RANDOM_BLOBS in callers Interface _receiving_interface = {Type::NONE}; Packet _announce_packet = {Type::NONE}; public: @@ -89,7 +90,7 @@ class DestinationEntry { //using PathTable = std::map; using PathTable = std::map, Utilities::Memory::ContainerAllocator>>; -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS using PathStore = microStore::BasicFileStore>; #else using PathStore = microStore::BasicHeapStore>; diff --git a/src/microReticulum/Persistence/IdentityEntry.cpp b/src/microReticulum/Persistence/IdentityEntry.cpp new file mode 100644 index 0000000..c961635 --- /dev/null +++ b/src/microReticulum/Persistence/IdentityEntry.cpp @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2026 Chad Attermann + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#include "IdentityEntry.h" + +#include + +using namespace RNS; +using namespace RNS::Persistence; + +// Encodes an IdentityEntry as a 4-element MsgPack array: +// [timestamp, packet_hash, public_key, app_data] +// +// New fields are appended by bumping the array length; old decoders see the +// extra elements as trailing array members they can ignore. +/*static*/ std::vector microStore::Codec::encode(const IdentityEntry& entry) { + + if (!entry) return {}; + + MsgPack::Packer p; + p.packArraySize(4); + + // timestamp + p.packFloat64(entry._timestamp); + + // packet_hash + p.packBinary(entry._packet_hash.data(), entry._packet_hash.size()); + + // public_key + p.packBinary(entry._public_key.data(), entry._public_key.size()); + + // app_data + p.packBinary(entry._app_data.data(), entry._app_data.size()); + + return std::vector(p.data(), p.data() + p.size()); +} + +/*static*/ bool microStore::Codec::decode(const std::vector& data, IdentityEntry& entry) { + if (data.empty()) return false; + + MsgPack::Unpacker u; + u.feed(data.data(), data.size()); + + if (!u.isArray()) return false; + const size_t n = u.unpackArraySize(); + if (n < 4) return false; + + // timestamp + if (!u.deserialize(entry._timestamp)) return false; + + // packet_hash + { + MsgPack::bin_t b; + if (!u.deserialize(b)) return false; + entry._packet_hash = Bytes(b.data(), b.size()); + } + + // public_key + { + MsgPack::bin_t b; + if (!u.deserialize(b)) return false; + entry._public_key = Bytes(b.data(), b.size()); + } + + // app_data + { + MsgPack::bin_t b; + if (!u.deserialize(b)) return false; + entry._app_data = Bytes(b.data(), b.size()); + } + + return true; +} diff --git a/src/microReticulum/Persistence/IdentityEntry.h b/src/microReticulum/Persistence/IdentityEntry.h new file mode 100644 index 0000000..1283ece --- /dev/null +++ b/src/microReticulum/Persistence/IdentityEntry.h @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2026 Chad Attermann + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#pragma once + +#include "../Bytes.h" +#include "../Type.h" +#include "../Utilities/Memory.h" + +#if defined(RNS_USE_FS) && RNS_PERSIST_KNOWN_DESTINATIONS +#include +#else +#include +#endif +#include +#include + +namespace RNS { namespace Persistence { + +class IdentityEntry { +public: + IdentityEntry() {} + IdentityEntry(double timestamp, const RNS::Bytes& packet_hash, const RNS::Bytes& public_key, const RNS::Bytes& app_data) : + _timestamp(timestamp), + _packet_hash(packet_hash), + _public_key(public_key), + _app_data(app_data) + { + } + inline explicit operator bool() const { + // Treat presence of a public key as the validity test — callers that today + // compare find() against end() are really asking "do we have keys for this". + return _public_key.size() > 0; + } + inline bool operator < (const IdentityEntry& entry) const { + return _timestamp < entry._timestamp; + } +public: + double _timestamp = 0; + RNS::Bytes _packet_hash; + RNS::Bytes _public_key; + RNS::Bytes _app_data; +public: +#ifndef NDEBUG + inline std::string debugString() const { + return "IdentityEntry: timestamp=" + std::to_string(_timestamp) + + " packet_hash=" + _packet_hash.toHex() + + " public_key=" + _public_key.toHex() + + " app_data=" + _app_data.toHex(); + } +#endif +}; + +#if defined(RNS_USE_FS) && RNS_PERSIST_KNOWN_DESTINATIONS +using KnownStore = microStore::BasicFileStore>; +#else +using KnownStore = microStore::BasicHeapStore>; +#endif +using KnownDestinations = microStore::TypedStore; + +} } + +namespace microStore { +template<> +struct Codec +{ + static std::vector encode(const RNS::Persistence::IdentityEntry& entry); + static bool decode(const std::vector& data, RNS::Persistence::IdentityEntry& entry); +}; +} diff --git a/src/microReticulum/Provisioning/BuiltinNamespaces.cpp b/src/microReticulum/Provisioning/BuiltinNamespaces.cpp index dd59df8..0a3c67b 100644 --- a/src/microReticulum/Provisioning/BuiltinNamespaces.cpp +++ b/src/microReticulum/Provisioning/BuiltinNamespaces.cpp @@ -114,6 +114,10 @@ namespace RNS { namespace Provisioning { FF_LIVE_APPLY, (int64_t)RNS::Transport::max_pr_tags(), 1, 65535, [](const Value& v) { RNS::Transport::max_pr_tags((uint16_t)v.as_int()); return true; }, []() { return (int64_t)RNS::Transport::max_pr_tags(); }) + .field_int("Known Destinations Max Size", Ns::TransportConfig::Field::KnownDestinationsMaxsize, + FF_LIVE_APPLY, (int64_t)RNS::Identity::known_destinations_maxsize(), 1, 65535, + [](const Value& v) { RNS::Identity::known_destinations_maxsize((uint16_t)v.as_int()); return true; }, + []() { return (int64_t)RNS::Identity::known_destinations_maxsize(); }) .command_void("Clear Storage", Ns::TransportConfig::Field::ClearStorage, []() { RNS::Transport::clear_storage(); return true; }) .end(); @@ -123,6 +127,8 @@ namespace RNS { namespace Provisioning { if (!p.registry().find(Ns::Storage::Id)) { p.register_namespace("uReticulum Storage", Ns::Storage::Id) .metric_int("Paths", Ns::Storage::Field::Paths, []() { return RNS::Transport::new_path_table().size(); }) + .metric_int("Known Destinations", Ns::Storage::Field::KnownDestinations, []() { return RNS::Identity::known_destinations().size(); }) + .metric_int("Packet Hashes", Ns::Storage::Field::PacketHashes, []() { return RNS::Transport::packet_hashlist().size(); }) .metric_int("Destinations", Ns::Storage::Field::Destinations, []() { return RNS::Transport::destinations().size(); }) .metric_int("Announces", Ns::Storage::Field::Announces, []() { return RNS::Transport::announce_table().size(); }) .metric_int("Held Announces", Ns::Storage::Field::HeldAnnounces, []() { return RNS::Transport::held_announces().size(); }) @@ -134,7 +140,6 @@ namespace RNS { namespace Provisioning { .metric_int("Control Destinations", Ns::Storage::Field::ControlDestinations, []() { return RNS::Transport::control_destinations().size(); }) .metric_int("Control Hashes", Ns::Storage::Field::ControlHashes, []() { return RNS::Transport::control_hashes().size(); }) - .metric_int("Packet Hashes", Ns::Storage::Field::PacketHashes, []() { return RNS::Transport::packet_hashlist().size(); }) .metric_int("Reverse Hashes", Ns::Storage::Field::ReverseHashes, []() { return RNS::Transport::reverse_table().size(); }) .metric_int("Receipts", Ns::Storage::Field::Receipts, []() { return RNS::Transport::receipts().size(); }) @@ -143,7 +148,6 @@ namespace RNS { namespace Provisioning { .metric_int("Active Links", Ns::Storage::Field::ActiveLinks, []() { return RNS::Transport::active_links().size(); }) .metric_int("Tunnels", Ns::Storage::Field::Tunnels, []() { return RNS::Transport::tunnels().size(); }) - .metric_int("Known Destinations", Ns::Storage::Field::KnownDestinations, []() { return RNS::Identity::known_destinations().size(); }) .metric_int("Destination Path Responses", Ns::Storage::Field::DestinationPathResponses, []() { uint32_t destination_path_responses = 0; for (auto& [destination_hash, destination] : RNS::Transport::destinations()) { @@ -153,7 +157,7 @@ namespace RNS { namespace Provisioning { }) .metric_int("Queued Announces", Ns::Storage::Field::QueuedAnnounces, []() { uint32_t queued_announces = 0; - for (auto& [interface_hash, interface] : RNS::Transport::get_interfaces()) { + for (auto& interface : RNS::Transport::get_interfaces()) { queued_announces += interface.announce_queue().size(); } return queued_announces; @@ -163,16 +167,22 @@ namespace RNS { namespace Provisioning { } // Metrics - if (!p.registry().find(Ns::Metrics::Id)) { - p.register_namespace("uReticulum Metrics", Ns::Metrics::Id) - .metric_int("Packets Sent", Ns::Metrics::Field::PacketsSent, []() { return RNS::Transport::packets_sent(); }) - .metric_int("Packets Received", Ns::Metrics::Field::PacketsReceived, []() { return RNS::Transport::packets_received(); }) - .metric_int("Announces Received", Ns::Metrics::Field::AnnouncesReceived, []() { return RNS::Transport::announces_received(); }) - .metric_int("Path Requests Received", Ns::Metrics::Field::PathRequestsReceived, []() { return RNS::Transport::path_requests_received(); }) - .metric_int("Paths Added", Ns::Metrics::Field::PathsAdded, []() { return RNS::Transport::paths_added(); }) - .metric_int("Paths Updated", Ns::Metrics::Field::PathsUpdated, []() { return RNS::Transport::paths_updated(); }) - .metric_int("Paths Failed", Ns::Metrics::Field::PathsFailed, []() { return RNS::Transport::paths_failed(); }) - + if (!p.registry().find(Ns::Info::Id)) { + p.register_namespace("uReticulum Info", Ns::Info::Id) + .register_namespace("Addresses", Ns::Info::Addresses::Id) + .metric_bytes("Transport Identity", Ns::Info::Addresses::Field::TransportIdentity, []() { return RNS::Transport::identity() ? RNS::Transport::identity().hash() : RNS::Bytes{}; }) + .metric_bytes("Probe Destination", Ns::Info::Addresses::Field::ProbeDestination, []() { return RNS::Transport::probe_destination() ? RNS::Transport::probe_destination().hash() : RNS::Bytes{}; }) + .metric_bytes("Mgmt Destination", Ns::Info::Addresses::Field::MgmtDestination, []() { return RNS::Transport::remote_management_destination() ? RNS::Transport::remote_management_destination().hash() : RNS::Bytes{}; }) + .end() + .register_namespace("Metrics", Ns::Info::Metrics::Id) + .metric_int("Packets Sent", Ns::Info::Metrics::Field::PacketsSent, []() { return RNS::Transport::packets_sent(); }) + .metric_int("Packets Received", Ns::Info::Metrics::Field::PacketsReceived, []() { return RNS::Transport::packets_received(); }) + .metric_int("Announces Received", Ns::Info::Metrics::Field::AnnouncesReceived, []() { return RNS::Transport::announces_received(); }) + .metric_int("Path Requests Received", Ns::Info::Metrics::Field::PathRequestsReceived, []() { return RNS::Transport::path_requests_received(); }) + .metric_int("Paths Added", Ns::Info::Metrics::Field::PathsAdded, []() { return RNS::Transport::paths_added(); }) + .metric_int("Paths Updated", Ns::Info::Metrics::Field::PathsUpdated, []() { return RNS::Transport::paths_updated(); }) + .metric_int("Paths Failed", Ns::Info::Metrics::Field::PathsFailed, []() { return RNS::Transport::paths_failed(); }) + .end() .end(); } diff --git a/src/microReticulum/Provisioning/Ids.h b/src/microReticulum/Provisioning/Ids.h index a1a302d..ebe4625 100644 --- a/src/microReticulum/Provisioning/Ids.h +++ b/src/microReticulum/Provisioning/Ids.h @@ -37,7 +37,7 @@ namespace RNS { namespace Provisioning { namespace Ns { constexpr fid_t CleanInterval = 7; constexpr fid_t RemoteManagementAllowed = 8; // BytesList of 16-byte dest hashes constexpr fid_t TransportIdentity = 9; // Bytes (64) — private key; SECRET - constexpr fid_t ClearStorage = 10; // command (write-only): wipe persisted provisioning files + constexpr fid_t ClearStorage = 100; // command (write-only): wipe persisted provisioning files } } @@ -45,13 +45,14 @@ namespace RNS { namespace Provisioning { namespace Ns { namespace TransportConfig { constexpr nid_t Id = 2; namespace Field { - constexpr fid_t SchemaVersion = 0; // reserved - constexpr fid_t PathTableMaxsize = 1; - constexpr fid_t AnnounceTableMaxsize = 2; - constexpr fid_t HashlistMaxsize = 3; - constexpr fid_t MaxPrTags = 4; - constexpr fid_t PathTableMaxpersist = 5; - constexpr fid_t ClearStorage = 6; // command (write-only): Transport::clear_storage() + constexpr fid_t SchemaVersion = 0; // reserved + constexpr fid_t PathTableMaxsize = 1; + constexpr fid_t AnnounceTableMaxsize = 2; + constexpr fid_t HashlistMaxsize = 3; + constexpr fid_t MaxPrTags = 4; + constexpr fid_t PathTableMaxpersist = 5; + constexpr fid_t KnownDestinationsMaxsize = 6; + constexpr fid_t ClearStorage = 100; // command (write-only): Transport::clear_storage() } } @@ -88,19 +89,34 @@ namespace RNS { namespace Provisioning { namespace Ns { } // Metrics namespace - usage counts - namespace Metrics { + namespace Info { constexpr nid_t Id = 60; namespace Field { - constexpr fid_t SchemaVersion = 0; // reserved - constexpr fid_t PacketsSent = 1; - constexpr fid_t PacketsReceived = 2; - constexpr fid_t AnnouncesSent = 3; - constexpr fid_t AnnouncesReceived = 4; - constexpr fid_t PathRequestSent = 5; - constexpr fid_t PathRequestsReceived = 6; - constexpr fid_t PathsAdded = 7; - constexpr fid_t PathsUpdated = 8; - constexpr fid_t PathsFailed = 9; + constexpr fid_t SchemaVersion = 0; // reserved + } + namespace Addresses { + constexpr nid_t Id = 61; + namespace Field { + constexpr fid_t SchemaVersion = 0; // reserved + constexpr fid_t TransportIdentity = 1; + constexpr fid_t ProbeDestination = 2; + constexpr fid_t MgmtDestination = 3; + } + } + namespace Metrics { + constexpr nid_t Id = 62; + namespace Field { + constexpr fid_t SchemaVersion = 0; // reserved + constexpr fid_t PacketsSent = 1; + constexpr fid_t PacketsReceived = 2; + constexpr fid_t AnnouncesSent = 3; + constexpr fid_t AnnouncesReceived = 4; + constexpr fid_t PathRequestSent = 5; + constexpr fid_t PathRequestsReceived = 6; + constexpr fid_t PathsAdded = 7; + constexpr fid_t PathsUpdated = 8; + constexpr fid_t PathsFailed = 9; + } } } diff --git a/src/microReticulum/Reticulum.cpp b/src/microReticulum/Reticulum.cpp index 582cc2c..e9a9a6b 100644 --- a/src/microReticulum/Reticulum.cpp +++ b/src/microReticulum/Reticulum.cpp @@ -16,6 +16,7 @@ #include "Transport.h" #include "Log.h" +#include "Type.h" #include "Utilities/Memory.h" #ifdef RNS_USE_PROVISIONING @@ -46,6 +47,7 @@ using namespace RNS::Utilities; /*static*/ bool Reticulum::__remote_management_enabled = false; /*static*/ bool Reticulum::__use_implicit_proof = true; /*static*/ bool Reticulum::__allow_probes = false; +/*static*/ bool Reticulum::__publish_blackhole_enabled = false; /*static*/ bool Reticulum::panic_on_interface_error = false; /*static*/ uint16_t Reticulum::_persist_interval = PERSIST_INTERVAL; @@ -227,7 +229,7 @@ void Reticulum::loop() { } // Perform Interface processing - for (auto& [hash, interface] : Transport::get_interfaces()) { + for (auto& interface : Transport::get_interfaces()) { const_cast(interface).loop(); } @@ -317,7 +319,6 @@ void Reticulum::should_persist_data() { void Reticulum::persist_data() { TRACE("Persisting transport and identity data..."); Transport::persist_data(); - Identity::persist_data(); _object->_last_data_persist = OS::time(); } @@ -326,7 +327,7 @@ void Reticulum::clean_caches() { TRACE("Cleaning resource and packet caches..."); double now = OS::time(); -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS /* // Clean resource caches for (auto& filename : OS::list_directory(resourcepath) { @@ -366,9 +367,6 @@ void Reticulum::clean_caches() { */ Transport::clean_caches(); - - // CBA - Identity::cull_known_destinations(); #endif _object->_last_cache_clean = OS::time(); diff --git a/src/microReticulum/Reticulum.h b/src/microReticulum/Reticulum.h index f954ff3..8c94acf 100644 --- a/src/microReticulum/Reticulum.h +++ b/src/microReticulum/Reticulum.h @@ -57,6 +57,7 @@ namespace RNS { static bool __remote_management_enabled; static bool __use_implicit_proof; static bool __allow_probes; + static bool __publish_blackhole_enabled; static bool panic_on_interface_error; static uint16_t _persist_interval; @@ -134,6 +135,12 @@ namespace RNS { inline static bool transport_enabled() { return __transport_enabled; } inline static void transport_enabled(bool transport_enabled) { __transport_enabled = transport_enabled; } + // Whether this instance publishes its local blackhole list over RNS via + // Transport's /list request handler. Default false (opt-in); enable per + // deployment to let trusted peers query this node's blackhole entries. + inline static bool publish_blackhole_enabled() { return __publish_blackhole_enabled; } + inline static void publish_blackhole_enabled(bool enabled) { __publish_blackhole_enabled = enabled; } + /* Returns whether link MTU discovery is enabled for the running instance. diff --git a/src/microReticulum/Transport.cpp b/src/microReticulum/Transport.cpp index 19b6abb..15f4592 100644 --- a/src/microReticulum/Transport.cpp +++ b/src/microReticulum/Transport.cpp @@ -32,6 +32,7 @@ #include #include +#include #include #include @@ -60,6 +61,14 @@ using namespace RNS::Persistence; #define RNS_HASHLIST_MAX 100 #endif +#ifndef RNS_HASHLIST_SEGMENT_SIZE +#define RNS_HASHLIST_SEGMENT_SIZE 32768 +#endif + +#ifndef RNS_HASHLIST_SEGMENT_COUNT +#define RNS_HASHLIST_SEGMENT_COUNT 2 +#endif + #ifndef RNS_PR_TAGS_MAX #define RNS_PR_TAGS_MAX 32 #endif @@ -72,7 +81,11 @@ using namespace RNS::Persistence; /*static*/ Transport::DestinationTable Transport::_destinations; /*static*/ std::set Transport::_pending_links; /*static*/ std::set Transport::_active_links; -/*static*/ Transport::BytesList Transport::_packet_hashlist; +#if defined(RNS_USE_FS) && RNS_PERSIST_HASHLIST +/*static*/ Transport::HashlistStore Transport::_packet_hashlist(RNS_HASHLIST_SEGMENT_SIZE, RNS_HASHLIST_SEGMENT_COUNT); +#else +/*static*/ Transport::HashlistStore Transport::_packet_hashlist; +#endif /*static*/ std::list Transport::_receipts; /*static*/ Transport::AnnounceTable Transport::_announce_table; @@ -87,6 +100,10 @@ using namespace RNS::Persistence; /*static*/ std::map Transport::_discovery_path_requests; /*static*/ Transport::BytesList Transport::_discovery_pr_tags; +/*static*/ Transport::PathStateTable Transport::_path_states; +/*static*/ Transport::PendingDiscoveryPRs Transport::_pending_discovery_prs; +/*static*/ double Transport::_pending_discovery_prs_last_tx = 0.0; +/*static*/ Transport::BlackholeTable Transport::_blackholed_identities; /*static*/ std::set Transport::_control_destinations; /*static*/ std::set Transport::_control_hashes; @@ -114,10 +131,18 @@ using namespace RNS::Persistence; /*static*/ float Transport::_receipts_check_interval = 1.0; /*static*/ double Transport::_announces_last_checked = 0.0; /*static*/ float Transport::_announces_check_interval = 1.0; +/*static*/ double Transport::_pending_prs_last_checked = 0.0; +/*static*/ float Transport::_pending_prs_check_interval = 30.0; /*static*/ double Transport::_tables_last_culled = 0.0; // CBA MCU /*static*/ //float Transport::_tables_cull_interval = 5.0; /*static*/ float Transport::_tables_cull_interval = 60.0; +/*static*/ double Transport::_traffic_last_checked = 0.0; +/*static*/ float Transport::_traffic_check_interval = 1.0; +/*static*/ double Transport::_interface_last_jobs = 0.0; +/*static*/ float Transport::_interface_jobs_interval = 5.0; +/*static*/ double Transport::_blackhole_last_checked = 0.0; +/*static*/ float Transport::_blackhole_check_interval = 60.0; /*static*/ double Transport::_last_mgmt_announce = 0.0; /*static*/ float Transport::_mgmt_announce_interval = 7200.0; /*static*/ bool Transport::_saving_path_table = false; @@ -155,6 +180,10 @@ using namespace RNS::Persistence; // CBA Stats /*static*/ uint32_t Transport::_packets_sent = 0; /*static*/ uint32_t Transport::_packets_received = 0; +/*static*/ uint64_t Transport::_traffic_rxb = 0; +/*static*/ uint64_t Transport::_traffic_txb = 0; +/*static*/ double Transport::_speed_rx = 0.0; +/*static*/ double Transport::_speed_tx = 0.0; /*static*/ uint32_t Transport::_announces_received = 0; /*static*/ uint32_t Transport::_path_requests_received = 0; /*static*/ uint32_t Transport::_paths_added = 0; @@ -169,7 +198,9 @@ using namespace RNS::Persistence; // CBA microStore /*static*/ uint32_t Transport::_path_store_segment_size = 0; /*static*/ uint8_t Transport::_path_store_segment_count = 0; -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +/*static*/ uint32_t Transport::_hashlist_segment_size = 0; +/*static*/ uint8_t Transport::_hashlist_segment_count = 0; +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS /*static*/ PathStore Transport::_path_store(RNS_PATH_TABLE_SEGMENT_SIZE, RNS_PATH_TABLE_SEGMENT_COUNT); #else /*static*/ PathStore Transport::_path_store; @@ -184,9 +215,9 @@ DestinationEntry empty_destination_entry; _jobs_running = true; - // Wire size caps into the GenerationalSet containers (no-op if already set - // via hashlist_maxsize()/max_pr_tags() setters before start()). - _packet_hashlist.max_size(_hashlist_maxsize); + // Wire size caps (no-op if already set via hashlist_maxsize()/max_pr_tags() + // setters before start()). + _packet_hashlist.set_max_recs(_hashlist_maxsize); _discovery_pr_tags.max_size(_max_pr_tags); try { @@ -196,6 +227,8 @@ DestinationEntry empty_destination_entry; _receipts_last_checked = OS::time(); _announces_last_checked = OS::time(); _tables_last_culled = OS::time(); + _traffic_last_checked = OS::time(); + _blackhole_last_checked = OS::time(); _last_saved = OS::time(); // Ensure required directories exist @@ -230,24 +263,6 @@ DestinationEntry empty_destination_entry; } } -// TODO -/* - // Load packet hashlist - packet_hashlist_path = Reticulum::storagepath + "/packet_hashlist"; - if (!owner.is_connected_to_shared_instance()) { - if (os.path.isfile(packet_hashlist_path)) { - try { - //p file = open(packet_hashlist_path, "rb") - //p Transport.packet_hashlist = umsgpack.unpackb(file.read()) - //p file.close() - } - catch (const std::exception& e) { - ERRORF("Could not load packet hashlist from storage, the contained exception was: %s", e.what()); - } - } - } -*/ - // Create transport-specific destination for path request Destination path_request_destination({Type::NONE}, Type::Destination::IN, Type::Destination::PLAIN, APP_NAME, "path.request"); path_request_destination.set_packet_callback(path_request_handler); @@ -286,27 +301,37 @@ DestinationEntry empty_destination_entry; #endif } -/*p - //if (Reticulum::publish_blackhole_enabled() && !_owner.is_connected_to_shared_instance()) { + // Load any persisted blackhole list (always, regardless of publish flag). + reload_blackhole(); + + if (Reticulum::publish_blackhole_enabled() && !_owner.is_connected_to_shared_instance()) { _blackhole_destination = {_identity, Type::Destination::IN, Type::Destination::SINGLE, APP_NAME, "info.blackhole"}; _blackhole_destination.register_request_handler({"/list"}, blackhole_list_handler, Type::Destination::ALLOW_ALL); _mgmt_destinations.insert(_blackhole_destination); _mgmt_hashes.insert(_blackhole_destination.hash()); NOTICEF("Enabled blackhole list publishing for transport identity %s", _identity.hash().toHex().c_str()); - //} + } - //if (network_identity() && !_owner.is_connected_to_shared_instance()) { - //p Transport.instance_destination = RNS.Destination(Transport.network_identity, RNS.Destination.IN, RNS.Destination.SINGLE, Transport.APP_NAME, "network", "instance", RNS.hexrep(Transport.network_identity.hash, delimit=False)) + // If a network identity has been set on this transport, register two + // IN/SINGLE destinations under it: one specific to this instance + // ("network.instance.") and one shared across the named network + // ("network"). Both are added to the management announce rotation so + // peers can discover members of the network overlay. + // + // Python additionally gates this on `not is_connected_to_shared_instance` + // because a shared-instance master would own these destinations on the + // client's behalf. microReticulum does not support being a shared-instance + // client, so that guard collapses and we just check network_identity. + if (has_network_identity()) { std::string instance_aspect = "network.instance." + _network_identity.hash().toHex(); _instance_destination = {_network_identity, Type::Destination::IN, Type::Destination::SINGLE, APP_NAME, instance_aspect.c_str()}; - //p Transport.network_destination = RNS.Destination(Transport.network_identity, RNS.Destination.IN, RNS.Destination.SINGLE, Transport.APP_NAME, "network") _network_destination = {_network_identity, Type::Destination::IN, Type::Destination::SINGLE, APP_NAME, "network"}; _mgmt_destinations.insert(_instance_destination); _mgmt_destinations.insert(_network_destination); _mgmt_hashes.insert(_instance_destination.hash()); _mgmt_hashes.insert(_network_destination.hash()); - //} -*/ + NOTICEF("Registered network identity destinations under %s", _network_identity.hash().toHex().c_str()); + } } catch (const std::exception& e) { ERROR("An exception occurred while starting Transport."); @@ -326,7 +351,7 @@ DestinationEntry empty_destination_entry; // Read in path table //read_path_table(); -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS // CBA microStore if (Utilities::OS::get_filesystem()) { INFOF("FileSystem available: %lu bytes", Utilities::OS::get_filesystem().storageAvailable()); @@ -334,7 +359,8 @@ DestinationEntry empty_destination_entry; #if defined(ARDUINO) microStore::set_time_offset(Utilities::OS::getTimeOffset() / 1000); #endif - _path_store.init(Utilities::OS::get_filesystem(), "./path_store", false, _path_store_segment_size, _path_store_segment_count); + TRACE("Initializing path table store..."); + _path_store.init(Utilities::OS::get_filesystem(), "./path_store/", false, _path_store_segment_size, _path_store_segment_count); // If the filesystem is full then clear the path store since it's of no use full anyway if (Utilities::OS::get_filesystem().storageAvailable() > 0 && Utilities::OS::get_filesystem().storageAvailable() < 1024) { WARNING("FileSystem is full, clearing existing path store"); @@ -343,6 +369,39 @@ DestinationEntry empty_destination_entry; } #endif // RNS_USE_FS && RNS_PERSIST_PATHS +#if defined(RNS_USE_FS) && RNS_PERSIST_KNOWN_DESTINATIONS + if (Utilities::OS::get_filesystem()) { + // CBA Must pass time offset into microStore for accurate timestamps on devices without a real-time clock +#if defined(ARDUINO) + microStore::set_time_offset(Utilities::OS::getTimeOffset() / 1000); +#endif + TRACE("Initializing known destinations store..."); + Identity::_known_store.init(Utilities::OS::get_filesystem(), "./known_store/", false, + Identity::_known_store_segment_size, Identity::_known_store_segment_count); + if (Utilities::OS::get_filesystem().storageAvailable() > 0 && Utilities::OS::get_filesystem().storageAvailable() < 1024) { + WARNING("FileSystem is full, clearing existing known destinations store"); + Identity::_known_store.clear(); + } + } +#endif // RNS_USE_FS && RNS_PERSIST_KNOWN_DESTINATIONS + Identity::_known_store.set_max_recs(Identity::_known_destinations_maxsize); + +#if defined(RNS_USE_FS) && RNS_PERSIST_HASHLIST + if (Utilities::OS::get_filesystem()) { + // CBA Must pass time offset into microStore for accurate timestamps on devices without a real-time clock +#if defined(ARDUINO) + microStore::set_time_offset(Utilities::OS::getTimeOffset() / 1000); +#endif + TRACE("Initializing packet hashlist store..."); + _packet_hashlist.init(Utilities::OS::get_filesystem(), "./hashlist_store/", false, + _hashlist_segment_size, _hashlist_segment_count); + if (Utilities::OS::get_filesystem().storageAvailable() > 0 && Utilities::OS::get_filesystem().storageAvailable() < 1024) { + WARNING("FileSystem is full, clearing existing packet hashlist store"); + _packet_hashlist.clear(); + } + } +#endif // RNS_USE_FS && RNS_PERSIST_HASHLIST + // CBA The following write and clean is very resource intensive so skip at startup // and let a later (optimized) scheduled write and clean take care of it. // Write path table back and clean caches in case any entries are invalid @@ -371,9 +430,8 @@ DestinationEntry empty_destination_entry; INFO("Transport mode is disabled"); } - // CBA TODO // Sort interfaces according to bitrate - //p Transport.prioritize_interfaces() + prioritize_interfaces(); // TODO /*p @@ -401,7 +459,7 @@ DestinationEntry empty_destination_entry; //TRACE("Transport::jobs()"); std::vector outgoing; - std::set path_requests; + std::map path_requests; // destination_hash -> blocked_interface ({NONE} = no interface to avoid) int count; _jobs_running = true; @@ -431,10 +489,8 @@ DestinationEntry empty_destination_entry; if ((OS::time() - last_path_request) > Type::Transport::PATH_REQUEST_MI) { DEBUGF("Trying to rediscover path for %s since an attempted link was never established", link.destination().hash().toHex().c_str()); - //if (path_requests.find(link.destination().hash()) == path_requests.end()) { if (path_requests.count(link.destination().hash()) == 0) { - // CBA ACCUMULATES - path_requests.insert(link.destination().hash()); + path_requests.emplace(link.destination().hash(), Interface{Type::NONE}); } } } @@ -587,28 +643,50 @@ DestinationEntry empty_destination_entry; _announces_last_checked = OS::time(); } - // CBA Culling no longer necessary since switch to GenerationalSet<> - /* // Cull the packet hashlist if it has reached its max size - if (_packet_hashlist.size() > _hashlist_maxsize) { - std::set::iterator iter = _packet_hashlist.begin(); - std::advance(iter, _packet_hashlist.size() - _hashlist_maxsize); - _packet_hashlist.erase(_packet_hashlist.begin(), iter); + // CBA microStore + // CBA Culling no longer necessary since switch to microStore + + // Cull invalidated path requests + if (OS::time() > (_pending_prs_last_checked + _pending_prs_check_interval)) { + std::vector stale_local_prs; + for (auto& [destination_hash, interface] : _pending_local_path_requests) { + if (!find_interface_from_hash(destination_hash)) { + stale_local_prs.push_back(destination_hash); + } + } + for (auto& destination_hash : stale_local_prs) { + _pending_local_path_requests.erase(destination_hash); + } + + _pending_prs_last_checked = OS::time(); } // Cull the path request tags list if it has reached its max size - if (_discovery_pr_tags.size() > _max_pr_tags) { - std::set::iterator iter = _discovery_pr_tags.begin(); - std::advance(iter, _discovery_pr_tags.size() - _max_pr_tags); - _discovery_pr_tags.erase(_discovery_pr_tags.begin(), iter); - } - */ + // CBA Culling no longer necessary since switch to GenerationalSet<> if (OS::time() > (_tables_last_culled + _tables_cull_interval)) { - // CBA Disabled following since we're calling immediately after adding to path table now - // Cull the path table if it has reached its max size - //cull_path_table(); + // Remove unneeded path state entries + try { + std::vector stale_path_states; + stale_path_states.reserve(_path_states.size()); + for (const auto& [destination_hash, state] : _path_states) { + DestinationEntry destination_entry; + if (!_new_path_table.get(destination_hash, destination_entry) || !destination_entry) { + stale_path_states.push_back(destination_hash); + } + } + for (const Bytes& destination_hash : stale_path_states) { + _path_states.erase(destination_hash); + } + } + catch (const std::bad_alloc&) { + ERROR("jobs: bad_alloc - out of memory culling path states"); + } + catch (const std::exception& e) { + ERRORF("jobs: failed to cull path states: %s", e.what()); + } // Cull the reverse table according to timeout try { @@ -652,7 +730,8 @@ DestinationEntry empty_destination_entry; bool path_request_throttle = (OS::time() - last_path_request) < PATH_REQUEST_MI; bool path_request_conditions = false; - + Interface blocked_if{Type::NONE}; + // If the path has been invalidated between the time of // making the link request and now, try to rediscover it if (!has_path(link_entry._destination_hash)) { @@ -671,25 +750,39 @@ DestinationEntry empty_destination_entry; // If the link destination was previously only 1 hop // away, this likely means that it was local to one // of our interfaces, and that it roamed somewhere else. - // In that case, try to discover a new path. + // In that case, try to discover a new path, and mark + // the old one as unresponsive. else if (!path_request_throttle && hops_to(link_entry._destination_hash) == 1) { DEBUGF("Trying to rediscover path for %s since an attempted link was never established, and destination was previously local to an interface on this instance", link_entry._destination_hash.toHex().c_str()); path_request_conditions = true; + blocked_if = link_entry._receiving_interface; + + if (Reticulum::transport_enabled()) { + if (link_entry._receiving_interface && link_entry._receiving_interface.mode() != Type::Interface::MODE_BOUNDARY) { + mark_path_unresponsive(link_entry._destination_hash); + } + } } - // If the link destination was previously only 1 hop - // away, this likely means that it was local to one - // of our interfaces, and that it roamed somewhere else. - // In that case, try to discover a new path. + // If the link initiator is only 1 hop away, + // this likely means that network topology has + // changed. In that case, we try to discover a new path, + // and mark the old one as potentially unresponsive. else if ( !path_request_throttle and lr_taken_hops == 1) { DEBUGF("Trying to rediscover path for %s since an attempted link was never established, and link initiator is local to an interface on this instance", link_entry._destination_hash.toHex().c_str()); path_request_conditions = true; + blocked_if = link_entry._receiving_interface; + + if (Reticulum::transport_enabled()) { + if (link_entry._receiving_interface && link_entry._receiving_interface.mode() != Type::Interface::MODE_BOUNDARY) { + mark_path_unresponsive(link_entry._destination_hash); + } + } } if (path_request_conditions) { if (path_requests.count(link_entry._destination_hash) == 0) { - // CBA ACCUMULATES - path_requests.insert(link_entry._destination_hash); + path_requests.emplace(link_entry._destination_hash, blocked_if); } if (!Reticulum::transport_enabled()) { @@ -711,45 +804,25 @@ DestinationEntry empty_destination_entry; ERRORF("jobs: failed to cull link table: %s", e.what()); } - // CBA microStore - // Path expiry handled internally by microStore -/* // Cull the path table - DEBUG("Culling path table..."); - try { - std::vector stale_paths; - stale_paths.reserve(_path_table.size()); - for (auto& [destination_hash, destination_entry] : _path_table) { - const Interface& attached_interface = destination_entry.receiving_interface(); - double destination_expiry; - if (attached_interface && attached_interface.mode() == Type::Interface::MODE_ACCESS_POINT) { - destination_expiry = destination_entry._timestamp + AP_PATH_TIME; - } - else if (attached_interface && attached_interface.mode() == Type::Interface::MODE_ROAMING) { - destination_expiry = destination_entry._timestamp + ROAMING_PATH_TIME; - } - else { - destination_expiry = destination_entry._timestamp + DESTINATION_TIMEOUT; - } + // CBA microStore + // CBA Culling of path table no longer necessary since switch to microStore - if (OS::time() > destination_expiry) { - stale_paths.push_back(destination_hash); - DEBUGF("Path to %s timed out and was removed", destination_hash.toHex().c_str()); - } - else if (_interfaces.count(attached_interface.get_hash()) == 0) { - stale_paths.push_back(destination_hash); - DEBUGF("Path to %s was removed since the attached interface no longer exists", destination_hash.toHex().c_str()); + // Cull the pending path requests table + try { + std::vector stale_path_requests; + for (const auto& [destination_hash, timestamp] : _path_requests) { + if (OS::time() > (timestamp + PATH_REQUEST_GATE_TIMEOUT)) { + stale_path_requests.push_back(destination_hash); } } - remove_paths(stale_paths); - } - catch (const std::bad_alloc&) { - ERROR("jobs: bad_alloc - out of memory culling path table"); + for (const Bytes& destination_hash : stale_path_requests) { + _path_requests.erase(destination_hash); + } } catch (const std::exception& e) { - ERRORF("jobs: failed to cull path table: %s", e.what()); + ERRORF("jobs: failed to cull path requests: %s", e.what()); } -*/ // Cull the pending discovery path requests table try { @@ -770,22 +843,6 @@ DestinationEntry empty_destination_entry; ERRORF("jobs: failed to cull discovery path requests: %s", e.what()); } - // Cull the path requests table - try { - std::vector stale_path_requests; - for (const auto& [destination_hash, timestamp] : _path_requests) { - if (OS::time() > (timestamp + DESTINATION_TIMEOUT)) { - stale_path_requests.push_back(destination_hash); - } - } - for (const Bytes& destination_hash : stale_path_requests) { - _path_requests.erase(destination_hash); - } - } - catch (const std::exception& e) { - ERRORF("jobs: failed to cull path requests: %s", e.what()); - } - // Cull the tunnel table try { count = 0; @@ -831,6 +888,60 @@ DestinationEntry empty_destination_entry; _tables_last_culled = OS::time(); } + // Check expired blackhole entries + if (OS::time() > (_blackhole_last_checked + _blackhole_check_interval)) { + try { + std::vector stale_blackholes; + double now = OS::time(); + for (const auto& [identity_hash, entry] : _blackholed_identities) { + if (entry._until > 0.0 && now > entry._until) { + stale_blackholes.push_back(identity_hash); + } + } + for (const Bytes& identity_hash : stale_blackholes) { + _blackholed_identities.erase(identity_hash); + } + if (!stale_blackholes.empty()) { + VERBOSEF("Removed %zu expired blackhole entries", stale_blackholes.size()); + } + } + catch (const std::exception& e) { + ERRORF("jobs: failed to expire blackhole entries: %s", e.what()); + } + _blackhole_last_checked = OS::time(); + } + + // Refresh per-interface and class-level traffic counters and speeds + if (OS::time() > (_traffic_last_checked + _traffic_check_interval)) { + try { + count_traffic(); + } + catch (const std::exception& e) { + ERRORF("jobs: failed to count traffic: %s", e.what()); + } + _traffic_last_checked = OS::time(); + } + + // Run interface-related jobs + if (OS::time() > (_interface_last_jobs + _interface_jobs_interval)) { + prioritize_interfaces(); + // TODO +/* + try { + for (auto& interface : _interfaces) { + interface.should_ingress_limit(); + interface.should_ingress_limit_pr(); + interface.process_held_announces(); + if (interface.phy_keepalive()) interface.send_keepalive(); + } + } + catch (const std::exception& e) { + ERRORF("Error while processing held per-interface announces: %s", e.what()); + } +*/ + _interface_last_jobs = OS::time(); + } + // CBA Periodically persist data //if (OS::time() > (_last_saved + _save_interval)) { // persist_data(); @@ -854,9 +965,28 @@ DestinationEntry empty_destination_entry; packet.send(); } - // CBA send link-related path requests - for (auto& destination_hash : path_requests) { - request_path(destination_hash); + // Queue link-related path requests into the bounded discovery PR queue + // for throttled transmission via handle_disovery_path_requests(). + if (!path_requests.empty()) { + for (const auto& [destination_hash, blocked_if] : path_requests) { + // Skip if this destination is already queued + bool already_queued = false; + for (const auto& entry : _pending_discovery_prs) { + if (entry._destination_hash == destination_hash) { + already_queued = true; + break; + } + } + if (already_queued) continue; + // Skip if queue is at capacity + if (_pending_discovery_prs.size() >= MAX_QUEUED_DISCOVERY_PRS) break; + _pending_discovery_prs.emplace_back(destination_hash, blocked_if); + } + } + + // Drain one queued discovery path request if the throttle has elapsed + if (!_pending_discovery_prs.empty()) { + handle_disovery_path_requests(); } // Send announces for management destinations @@ -1047,7 +1177,7 @@ DestinationEntry empty_destination_entry; else { TRACE("Transport::outbound: Path to destination is unknown"); bool stored_hash = false; - for (auto& [hash, interface] : _interfaces) { + for (auto& interface : _interfaces) { TRACEF("Transport::outbound: Checking interface %s", interface.toString().c_str()); if (interface.OUT()) { bool should_transmit = true; @@ -1239,7 +1369,7 @@ DestinationEntry empty_destination_entry; TRACE("Transport::outbound: Packet transmission allowed"); if (!stored_hash) { // CBA ACCUMULATES - _packet_hashlist.insert(packet.packet_hash()); + _packet_hashlist.put(packet.packet_hash().data(), (uint8_t)packet.packet_hash().size(), nullptr, 0); stored_hash = true; } @@ -1251,6 +1381,16 @@ DestinationEntry empty_destination_entry; // thread.start() sent = transmit(interface, packet.raw()); + + // Per-interface stat hooks. Matches Python Transport.py:1323-1324. + if (sent) { + if (packet.packet_type() == Type::Packet::ANNOUNCE) { + interface.sent_announce(); + } + if (packet.destination().type() == Type::Destination::PLAIN && packet.is_outbound_pr()) { + interface.sent_path_request(); + } + } } else { TRACE("Transport::outbound: Packet transmission refused"); @@ -1343,7 +1483,7 @@ DestinationEntry empty_destination_entry; } } - if (!_packet_hashlist.contains(packet.packet_hash())) { + if (!_packet_hashlist.exists(packet.packet_hash().data(), (uint8_t)packet.packet_hash().size())) { TRACE("Transport::packet_filter: packet not previously seen"); return true; } @@ -1420,7 +1560,7 @@ DestinationEntry empty_destination_entry; } } - if (!_packet_hashlist.contains(packet.packet_hash())) { + if (!_packet_hashlist.exists(packet.packet_hash().data(), (uint8_t)packet.packet_hash().size())) { TRACE("Transport::packet_filter: packet not previously seen"); return true; } @@ -1547,28 +1687,19 @@ DestinationEntry empty_destination_entry; packet.receiving_interface(interface); packet.hops(packet.hops() + 1); -// TODO -/*p + // Stamp signal-quality stats from the receiving interface onto the packet. + // The interface subclass is expected to populate r_stat_rssi/snr/q + // synchronously with handle_incoming() so the values describe THIS packet. + // A NaN value means the interface didn't report that metric. Python keeps + // a class-level cache keyed by packet_hash so shared-instance clients can + // look up signal stats via RPC; microReticulum doesn't support being a + // shared-instance client, so we stamp the packet directly and skip the + // cache. if (interface) { - if hasattr(interface, "r_stat_rssi"): - if interface.r_stat_rssi != None: - packet.rssi = interface.r_stat_rssi - if len(Transport.local_client_interfaces) > 0: - Transport.local_client_rssi_cache.append([packet.packet_hash, packet.rssi]) - - while len(Transport.local_client_rssi_cache) > Transport.LOCAL_CLIENT_CACHE_MAXSIZE: - Transport.local_client_rssi_cache.pop() - - if hasattr(interface, "r_stat_snr"): - if interface.r_stat_rssi != None: - packet.snr = interface.r_stat_snr - if len(Transport.local_client_interfaces) > 0: - Transport.local_client_snr_cache.append([packet.packet_hash, packet.snr]) - - while len(Transport.local_client_snr_cache) > Transport.LOCAL_CLIENT_CACHE_MAXSIZE: - Transport.local_client_snr_cache.pop() + if (!std::isnan(interface.r_stat_rssi())) packet.rssi(interface.r_stat_rssi()); + if (!std::isnan(interface.r_stat_snr())) packet.snr(interface.r_stat_snr()); + if (!std::isnan(interface.r_stat_q())) packet.q(interface.r_stat_q()); } -*/ if (_local_client_interfaces.size() > 0) { if (is_local_client_interface(interface)) { @@ -1611,7 +1742,7 @@ DestinationEntry empty_destination_entry; } if (remember_packet_hash) { // CBA ACCUMULATES - _packet_hashlist.insert(packet.packet_hash()); + _packet_hashlist.put(packet.packet_hash().data(), (uint8_t)packet.packet_hash().size(), nullptr, 0); } // CBA Currently this packet cache is a noop since it's not forced @@ -1675,7 +1806,7 @@ DestinationEntry empty_destination_entry; if (packet.destination_type() == Type::Destination::PLAIN && packet.transport_type() == Type::Transport::BROADCAST) { // Send to all interfaces except the one the packet was recieved on if (from_local_client) { - for (auto& [hash, interface] : _interfaces) { + for (auto& interface : _interfaces) { if (interface != packet.receiving_interface()) { TRACEF("Transport::inbound: Broadcasting packet on %s", interface.toString().c_str()); transmit(interface, packet.raw()); @@ -1913,7 +2044,7 @@ DestinationEntry empty_destination_entry; transmit(outbound_interface, new_raw); link_entry._timestamp = OS::time(); // Deferred hashlist insertion for link transport packets - _packet_hashlist.insert(packet.packet_hash()); + _packet_hashlist.put(packet.packet_hash().data(), (uint8_t)packet.packet_hash().size(), nullptr, 0); } else { //p pass @@ -1988,8 +2119,8 @@ DestinationEntry empty_destination_entry; //p random_blob = packet.data[RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8:RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8+10] Bytes random_blob = packet.data().mid(Type::Identity::KEYSIZE/8 + Type::Identity::NAME_HASH_LENGTH/8, Type::Identity::RANDOM_HASH_LENGTH/8); //p random_blobs = [] - std::set empty_random_blobs; - std::set& random_blobs = empty_random_blobs; + std::vector empty_random_blobs; + std::vector& random_blobs = empty_random_blobs; TRACEF("Checking for existing path to %s", packet.destination_hash().toHex().c_str()); // CBA microStore //auto& destination_entry = get_path(packet.destination_hash()); @@ -2007,12 +2138,15 @@ DestinationEntry empty_destination_entry; // less, we'll update our tables. if (packet.hops() <= destination_entry._hops) { // Make sure we haven't heard the random - // blob before, so announces can't be - // replayed to forge paths. - // TODO: Check whether this approach works - // under all circumstances - //p if not random_blob in random_blobs: - if (random_blobs.find(random_blob) == random_blobs.end()) { + // blob before, and that the announce is + // newer than any we've already seen for + // this path. Together this prevents both + // replay forgery and acceptance of an + // out-of-order older announce. + uint64_t path_timebase = timebase_from_random_blobs(random_blobs); + if (std::find(random_blobs.begin(), random_blobs.end(), random_blob) == random_blobs.end() + && announce_emitted > path_timebase) { + mark_path_unknown_state(packet.destination_hash()); should_add = true; } else { @@ -2040,10 +2174,11 @@ DestinationEntry empty_destination_entry; // We also check that the announce is // different from ones we've already heard, // to avoid loops in the network - if (random_blobs.find(random_blob) == random_blobs.end()) { + if (std::find(random_blobs.begin(), random_blobs.end(), random_blob) == random_blobs.end()) { // TODO: Check that this ^ approach actually // works under all circumstances DEBUGF("Replacing destination table entry for %s with new announce due to expired path", packet.destination_hash().toHex().c_str()); + mark_path_unknown_state(packet.destination_hash()); should_add = true; } else { @@ -2052,8 +2187,23 @@ DestinationEntry empty_destination_entry; } else { if (announce_emitted > path_announce_emitted) { - if (random_blobs.find(random_blob) == random_blobs.end()) { + if (std::find(random_blobs.begin(), random_blobs.end(), random_blob) == random_blobs.end()) { DEBUGF("Replacing destination table entry for %s with new announce, since it was more recently emitted", packet.destination_hash().toHex().c_str()); + mark_path_unknown_state(packet.destination_hash()); + should_add = true; + } + else { + should_add = false; + } + } + + // If we have already heard this announce before, + // but the path has been marked as unresponsive + // by a failed communications attempt or similar, + // allow updating the path table to this one. + else if (announce_emitted == path_announce_emitted) { + if (path_is_unresponsive(packet.destination_hash())) { + DEBUGF("Replacing destination table entry for %s with new announce, since previously tried path was unresponsive", packet.destination_hash().toHex().c_str()); should_add = true; } else { @@ -2069,46 +2219,73 @@ DestinationEntry empty_destination_entry; should_add = true; } + // DIVERGENCE + // Python (Transport.py:313-337) only purges blackholed paths + // at three triggers: explicit blackhole_identity() call, + // reload_blackhole() at startup, and the 60s expiry sweep. + // New announces from a blackholed identity still get accepted + // into the path table and stay there until the next manual + // purge. To make blackholing actually effective on a leaf + // node we filter inline here: if the announce's associated + // identity is blackholed, reject path acceptance now. + if (should_add && !_blackholed_identities.empty()) { + Identity announced_identity = Identity::recall(packet.destination_hash()); + if (announced_identity && is_blackholed(announced_identity.hash())) { + DEBUGF("Dropping announce from blackholed identity %s", announced_identity.hash().toHex().c_str()); + should_add = false; + } + } + if (should_add) { double now = OS::time(); bool rate_blocked = false; -// TODO -/*p - if packet.context != RNS.Packet.PATH_RESPONSE and packet.receiving_interface.announce_rate_target != None: - if not packet.destination_hash in Transport.announce_rate_table: - rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]} - Transport.announce_rate_table[packet.destination_hash] = rate_entry - - else: - rate_entry = Transport.announce_rate_table[packet.destination_hash] - rate_entry["timestamps"].append(now) - - while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS: - rate_entry["timestamps"].pop(0) - - current_rate = now - rate_entry["last"] - - if now > rate_entry["blocked_until"]: - - if current_rate < packet.receiving_interface.announce_rate_target: - rate_entry["rate_violations"] += 1 + // Announce rate-limit enforcement. The receiving interface opts in + // by setting announce_rate_target > 0 on itself. Repeated announces + // from the same destination faster than the target accumulate + // rate_violations; once over the grace count, the destination is + // blocked for target+penalty seconds. + if (packet.context() != Type::Packet::PATH_RESPONSE + && packet.receiving_interface() + && packet.receiving_interface().announce_rate_target() > 0) { + auto iter = _announce_rate_table.find(packet.destination_hash()); + if (iter == _announce_rate_table.end()) { + _announce_rate_table.emplace(packet.destination_hash(), RateEntry(now)); + } + else { + RateEntry& rate_entry = iter->second; + rate_entry._timestamps.push_back(now); + while (rate_entry._timestamps.size() > Type::Transport::MAX_RATE_TIMESTAMPS) { + rate_entry._timestamps.erase(rate_entry._timestamps.begin()); + } - else: - rate_entry["rate_violations"] = std::max(0, rate_entry["rate_violations"]-1) + const double current_rate = now - rate_entry._last; - if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace: - rate_target = packet.receiving_interface.announce_rate_target - rate_penalty = packet.receiving_interface.announce_rate_penalty - rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty - rate_blocked = True - else: - rate_entry["last"] = now + if (now > rate_entry._blocked_until) { + const float rate_target = packet.receiving_interface().announce_rate_target(); + if (current_rate < rate_target) { + rate_entry._rate_violations += 1.0; + } + else { + rate_entry._rate_violations = std::max(0.0, rate_entry._rate_violations - 1.0); + } - else: - rate_blocked = True -*/ + const uint8_t rate_grace = packet.receiving_interface().announce_rate_grace(); + if (rate_entry._rate_violations > rate_grace) { + const float rate_penalty = packet.receiving_interface().announce_rate_penalty(); + rate_entry._blocked_until = rate_entry._last + rate_target + rate_penalty; + rate_blocked = true; + } + else { + rate_entry._last = now; + } + } + else { + rate_blocked = true; + } + } + } uint8_t retries = 0; uint8_t announce_hops = packet.hops(); @@ -2129,7 +2306,16 @@ DestinationEntry empty_destination_entry; expires = now + PATHFINDER_E; } - random_blobs.insert(random_blob); + // Append the new blob and cap the list at MAX_RANDOM_BLOBS, + // dropping oldest entries from the front. Matches Python's + // `random_blobs = random_blobs[-MAX_RANDOM_BLOBS:]` semantics. + if (std::find(random_blobs.begin(), random_blobs.end(), random_blob) == random_blobs.end()) { + random_blobs.push_back(random_blob); + if (random_blobs.size() > MAX_RANDOM_BLOBS) { + random_blobs.erase(random_blobs.begin(), + random_blobs.begin() + (random_blobs.size() - MAX_RANDOM_BLOBS)); + } + } if ((Reticulum::transport_enabled() || Transport::from_local_client(packet)) && packet.context() != Type::Packet::PATH_RESPONSE) { // Insert announce into announce table for retransmission @@ -2330,6 +2516,7 @@ DestinationEntry empty_destination_entry; } if (_new_path_table.put(packet.destination_hash().collection(), destination_table_entry, ttl)) { TRACEF("Added destination %s to path table!", packet.destination_hash().toHex().c_str()); + mark_path_unknown_state(packet.destination_hash()); if (path_found) ++_paths_updated; else ++_paths_added; } @@ -2421,10 +2608,53 @@ DestinationEntry empty_destination_entry; auto& destination = (*iter).second; if (destination.type() == packet.destination_type()) { TRACE("Transport::inbound: Found local destination for LINKREQUEST"); - packet.destination(destination); - // CBA iterator over std::set is always const so need to make temporarily mutable - //destination.receive(packet); - destination.receive(packet); + + // MTU clamping. The link request carries the initiator's + // proposed path MTU in its trailing LINK_MTU_SIZE bytes. + // If our receiving interface asks for autoconfigured or + // fixed MTU, clamp the path MTU down to our hardware MTU + // before the destination parses the link request. + // Matches Python Transport.py:2099-2118. + bool drop_packet = false; + uint16_t path_mtu = Link::mtu_from_lr_packet(packet); + Type::Link::link_mode mode = Link::mode_from_lr_packet(packet); + uint16_t nh_mtu = 0; + if (packet.receiving_interface().AUTOCONFIGURE_MTU() + || packet.receiving_interface().FIXED_MTU()) { + nh_mtu = packet.receiving_interface().HW_MTU(); + } + else { + nh_mtu = Type::Reticulum::MTU; + } + + if (path_mtu > 0) { + const Bytes& orig = packet.data(); + if (packet.receiving_interface().HW_MTU() == 0) { + // No hardware MTU known on the receiving + // interface; strip the trailing MTU bytes so + // the destination sees a plain link request. + if (orig.size() >= Type::Link::LINK_MTU_SIZE) { + packet.data(orig.left(orig.size() - Type::Link::LINK_MTU_SIZE)); + } + } + else if (nh_mtu < path_mtu) { + try { + Bytes clamped = Link::signalling_bytes(nh_mtu, mode); + if (orig.size() >= Type::Link::LINK_MTU_SIZE) { + packet.data(orig.left(orig.size() - Type::Link::LINK_MTU_SIZE) + clamped); + } + } + catch (const std::exception& e) { + WARNINGF("Dropping link request packet to local destination: %s", e.what()); + drop_packet = true; + } + } + } + + if (!drop_packet) { + packet.destination(destination); + destination.receive(packet); + } } } } @@ -2439,9 +2669,22 @@ DestinationEntry empty_destination_entry; std::set active_links(_active_links); for (auto& link : active_links) { if (link.link_id() == packet.destination_hash()) { - TRACE("Transport::inbound: Packet is DATA for an active LINK"); - packet.link(link); - const_cast(link).receive(packet); + if (link.attached_interface() == packet.receiving_interface()) { + TRACE("Transport::inbound: Packet is DATA for an active LINK"); + packet.link(link); + const_cast(link).receive(packet); + } + else { + // In the strange and rare case that an interface is + // partly malfunctioning and a link-associated packet + // arrives on an interface that has failed sending -- + // and transport has failed over to another path -- + // drop the packet hash from the dedup filter so the + // link can still receive the packet when it finally + // arrives over the correct interface. + _packet_hashlist.remove(packet.packet_hash().data(), (uint8_t)packet.packet_hash().size()); + } + break; } } } @@ -2748,17 +2991,20 @@ DestinationEntry empty_destination_entry; /*static*/ void Transport::register_interface(Interface& interface) { TRACEF("Transport: Registering interface %s %s", interface.get_hash().toHex().c_str(), interface.toString().c_str()); - _interfaces.insert({interface.get_hash(), interface}); + if (!find_interface_from_hash(interface.get_hash())) { + _interfaces.push_back(interface); + } // CBA TODO set or add transport as listener on interface to receive incoming packets? } /*static*/ void Transport::deregister_interface(const Interface& interface) { TRACEF("Transport: Deregistering interface %s", interface.toString().c_str()); - auto iter = _interfaces.find(interface.get_hash()); - if (iter != _interfaces.end()) { - TRACEF("Transport::deregister_interface: Found and removing interface %s", (*iter).second.toString().c_str()); - _interfaces.erase(iter); - } + const Bytes hash = interface.get_hash(); + _interfaces.erase( + std::remove_if(_interfaces.begin(), _interfaces.end(), + [&](const Interface& i) { return i.get_hash() == hash; }), + _interfaces.end() + ); } /*static*/ void Transport::register_destination(Destination& destination) { @@ -2855,21 +3101,15 @@ Deregisters an announce handler. } /*static*/ bool Transport::is_interface_from_hash(const Bytes& interface_hash) { - auto iter = _interfaces.find(interface_hash); - if (iter != _interfaces.end()) { - return true; - } - - return false; + return (bool)find_interface_from_hash(interface_hash); } /*static*/ Interface Transport::find_interface_from_hash(const Bytes& interface_hash) { - auto iter = _interfaces.find(interface_hash); + auto iter = std::find_if(_interfaces.begin(), _interfaces.end(), + [&](const Interface& i) { return i.get_hash() == interface_hash; }); if (iter != _interfaces.end()) { - //TRACEF("Transport::find_interface_from_hash: Found interface %s", (*iter).second.toString().c_str()); - return (*iter).second; + return *iter; } - return {Type::NONE}; } @@ -2890,7 +3130,7 @@ Deregisters an announce handler. // the packet cache. /*static*/ bool Transport::cache_packet(const Packet& packet, bool force_cache /*= false*/) { //TRACEF("Checking to see if packet %s should be cached", packet.get_hash().toHex().c_str()); -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS if (should_cache_packet(packet) || force_cache) { TRACEF("Saving packet %s to storage", packet.get_hash().toHex().c_str()); try { @@ -2907,7 +3147,7 @@ Deregisters an announce handler. } /*static*/ bool Transport::is_cached_packet(const Bytes& packet_hash) { -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS try { char packet_cache_path[Type::Reticulum::FILEPATH_MAXSIZE]; snprintf(packet_cache_path, Type::Reticulum::FILEPATH_MAXSIZE, "%s/%s", Reticulum::_cachepath, packet_hash.toHex().c_str()); @@ -2922,7 +3162,7 @@ Deregisters an announce handler. /*static*/ Packet Transport::get_cached_packet(const Bytes& packet_hash) { TRACEF("Loading packet %s from cache storage", packet_hash.toHex().c_str()); -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS try { char packet_cache_path[Type::Reticulum::FILEPATH_MAXSIZE]; snprintf(packet_cache_path, Type::Reticulum::FILEPATH_MAXSIZE, "%s/%s", Reticulum::_cachepath, packet_hash.toHex().c_str()); @@ -2941,7 +3181,7 @@ Deregisters an announce handler. /*static*/ bool Transport::clear_cached_packet(const Bytes& packet_hash) { TRACEF("Clearing packet %s from cache storage", packet_hash.toHex().c_str()); -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS try { char packet_cache_path[Type::Reticulum::FILEPATH_MAXSIZE]; snprintf(packet_cache_path, Type::Reticulum::FILEPATH_MAXSIZE, "%s/%s", Reticulum::_cachepath, packet_hash.toHex().c_str()); @@ -3216,6 +3456,125 @@ Deregisters an announce handler. */ +/*static*/ bool Transport::mark_path_unresponsive(const Bytes& destination_hash) { + DestinationEntry destination_entry; + if (_new_path_table.get(destination_hash, destination_entry) && destination_entry) { + _path_states[destination_hash] = STATE_UNRESPONSIVE; + return true; + } + return false; +} + +/*static*/ bool Transport::mark_path_responsive(const Bytes& destination_hash) { + DestinationEntry destination_entry; + if (_new_path_table.get(destination_hash, destination_entry) && destination_entry) { + _path_states[destination_hash] = STATE_RESPONSIVE; + return true; + } + return false; +} + +/*static*/ bool Transport::mark_path_unknown_state(const Bytes& destination_hash) { + DestinationEntry destination_entry; + if (_new_path_table.get(destination_hash, destination_entry) && destination_entry) { + _path_states[destination_hash] = STATE_UNKNOWN; + return true; + } + return false; +} + +/*static*/ bool Transport::path_is_unresponsive(const Bytes& destination_hash) { + auto iter = _path_states.find(destination_hash); + if (iter != _path_states.end()) { + return iter->second == STATE_UNRESPONSIVE; + } + return false; +} + +// Sorts _interfaces in place by bitrate descending so subsequent iteration +// in outbound paths, announce broadcast and discovery PR fanout prefers +// higher-bitrate interfaces. Called from start() and once per jobs() tick; +// std::sort over a small vector (typical n <= a few) is effectively free. +/*static*/ void Transport::prioritize_interfaces() { + try { + std::sort(_interfaces.begin(), _interfaces.end(), + [](const Interface& a, const Interface& b) { + return a.bitrate() > b.bitrate(); + }); + } + catch (const std::exception& e) { + ERRORF("Could not prioritize interfaces according to bitrate: %s", e.what()); + } +} + +// Refreshes interface byte-counter snapshots, per-interface current rx/tx +// speeds, and class-level cumulative byte totals and aggregate speeds. Child +// interfaces (those with a parent_interface) are skipped to avoid double- +// counting traffic already attributed to the parent. In Python this runs as +// a sleeping background thread (count_traffic_loop); here it is called once +// per tick of jobs() and gated by _traffic_check_interval. +/*static*/ void Transport::count_traffic() { + uint64_t rxb = 0; + uint64_t txb = 0; + double rxs = 0.0; + double txs = 0.0; + + for (const auto& interface : _interfaces) { + if (interface.parent_interface()) continue; + + double now = OS::time(); + size_t irxb = interface.rxbytes(); + size_t itxb = interface.txbytes(); + + if (interface.traffic_counter_ts() != 0.0) { + size_t rx_diff = irxb - interface.traffic_counter_rxb(); + size_t tx_diff = itxb - interface.traffic_counter_txb(); + double ts_diff = now - interface.traffic_counter_ts(); + rxb += rx_diff; + txb += tx_diff; + double crxs = 0.0; + double ctxs = 0.0; + if (ts_diff > 0) { + crxs = (static_cast(rx_diff) * 8.0) / ts_diff; + ctxs = (static_cast(tx_diff) * 8.0) / ts_diff; + } + interface.current_rx_speed(crxs); rxs += crxs; + interface.current_tx_speed(ctxs); txs += ctxs; + } + + interface.update_traffic_counter(now, irxb, itxb); + } + + _traffic_rxb += rxb; + _traffic_txb += txb; + _speed_rx = rxs; + _speed_tx = txs; +} + +// Drains one entry from the pending discovery path requests queue if the +// per-transmission throttle (DISCOVERY_PR_TX_THROTTLE) has elapsed. In Python +// this runs as a sleeping background thread; here we are called once per +// jobs() tick and drain at most one entry to enforce the same average rate. +/*static*/ void Transport::handle_disovery_path_requests() { + if (_pending_discovery_prs.empty()) return; + if (OS::time() < (_pending_discovery_prs_last_tx + DISCOVERY_PR_TX_THROTTLE)) return; + + PendingDiscoveryPREntry entry = _pending_discovery_prs.front(); + _pending_discovery_prs.pop_front(); + _pending_discovery_prs_last_tx = OS::time(); + + if (!entry._blocked_interface) { + request_path(entry._destination_hash); + } + else { + for (const auto& interface : _interfaces) { + if (interface.get_hash() != entry._blocked_interface.get_hash()) { + request_path(entry._destination_hash, interface); + } + } + } +} + /*p @staticmethod @@ -3302,6 +3661,7 @@ will announce it. } } + packet.is_outbound_pr(true); packet.send(); _path_requests[destination_hash] = OS::time(); } @@ -3346,13 +3706,20 @@ static std::string remote_status_interface_type_name(const Interface& iface) { // (e.g. heltec_wifi_lora_32_V4); the const-char-pointer overload produces // the same msgpack str wire format on all platforms. static void remote_status_pack_interface(MsgPack::Packer& p, const Interface& iface) { - // 11 entries: name, short_name, hash, type, status, mode, clients, - // bitrate, rxb, txb, announce_queue. + // 18 entries: name, short_name, hash, type, status, mode, clients, + // bitrate, rx, rxb, tx, txb, rxs, txs, rssi, snr, q, + // announce_queue. // // `clients` is accessed unguarded at rnstatus.py:429 (ifstat["clients"]), // so the key MUST be present even when no meaningful value -- emit nil // to signal "not a shared instance / no connected clients". - p.packMapSize(11); + // + // rssi / snr / q are a microReticulum-specific extension Python's + // get_interface_stats doesn't emit. Forward-compatible -- Python clients + // ignore unknown keys; clients that look for them get the receiving + // interface's last reported signal-quality stats, with nil for "not + // reported by this interface" (e.g. non-radio interfaces such as UDP). + p.packMapSize(18); // Cache std::string values so .c_str() / .size() reference stable storage. const std::string name_str = iface.toString(); @@ -3400,53 +3767,74 @@ static void remote_status_pack_interface(MsgPack::Packer& p, const Interface& if p.pack("txb"); p.serialize(static_cast(iface.txbytes())); + p.pack("rxs"); + p.packFloat64(iface.current_rx_speed()); + + p.pack("txs"); + p.packFloat64(iface.current_tx_speed()); + + p.pack("rssi"); + if (std::isnan(iface.r_stat_rssi())) { + p.packNil(); + } else { + p.packFloat64(static_cast(iface.r_stat_rssi())); + } + + p.pack("snr"); + if (std::isnan(iface.r_stat_snr())) { + p.packNil(); + } else { + p.packFloat64(static_cast(iface.r_stat_snr())); + } + + p.pack("q"); + if (std::isnan(iface.r_stat_q())) { + p.packNil(); + } else { + p.packFloat64(static_cast(iface.r_stat_q())); + } + p.pack("announce_queue"); p.serialize(static_cast(iface.announce_queue().size())); } // Builds the top-level stats map. Order matches Python's get_interface_stats(). -// Emits 5 keys: interfaces, rxb, txb, rxs, txs. transport_id / +// Emits 7 keys: interfaces, rx, rxb, tx, txb, rxs, txs. transport_id / // transport_uptime can be added once Transport exposes them. static Bytes remote_status_build_stats_payload() { MsgPack::Packer p; - p.packMapSize(5); + p.packMapSize(7); auto& interfaces = Transport::get_interfaces(); p.pack("interfaces"); p.packArraySize(static_cast(interfaces.size())); - for (auto& kv : interfaces) { - remote_status_pack_interface(p, kv.second); + for (auto& iface : interfaces) { + remote_status_pack_interface(p, iface); } uint64_t total_rx = 0; - uint64_t total_rxb = 0; uint64_t total_tx = 0; - uint64_t total_txb = 0; - for (auto& kv : interfaces) { - total_rx += kv.second.rx(); - total_rxb += kv.second.rxbytes(); - total_txb += kv.second.tx(); - total_txb += kv.second.txbytes(); + for (auto& iface : interfaces) { + total_rx += iface.rx(); + total_tx += iface.tx(); } p.pack("rx"); p.serialize(total_rx); p.pack("rxb"); - p.serialize(total_rxb); + p.serialize(Transport::traffic_rxb()); p.pack("tx"); p.serialize(total_tx); p.pack("txb"); - p.serialize(total_txb); + p.serialize(Transport::traffic_txb()); - // Current rx/tx speeds: C++ Interface base does not track these yet; - // rnstatus renders 0 sensibly. Emit as float64 to match Python. p.pack("rxs"); - p.packFloat64(0.0); + p.packFloat64(Transport::speed_rx()); p.pack("txs"); - p.packFloat64(0.0); + p.packFloat64(Transport::speed_tx()); return Bytes(p.data(), p.size()); } @@ -3848,6 +4236,13 @@ TRACEF("announce_packet str: %s", announce_packet.toString().c_str()); else { // TODO: Look at this timing retransmit_timeout = now + Type::Transport::PATH_REQUEST_GRACE /*+ (RNS.rand() * Transport.PATHFINDER_RW)*/; + + // If we are answering on a roaming-mode interface, wait a + // little longer, to allow potential more well-connected + // peers to answer first. + if (attached_interface && attached_interface.mode() == Type::Interface::MODE_ROAMING) { + retransmit_timeout += Type::Transport::PATH_REQUEST_RG; + } } // This handles an edge case where a peer sends a past @@ -3927,7 +4322,7 @@ TRACEF("announce_packet str: %s", announce_packet.toString().c_str()); // except the local client DEBUGF("Forwarding path request from local client for destination %s%s to all other interfaces", destination_hash.toHex().c_str(), interface_str.c_str()); Bytes request_tag = Identity::get_random_hash(); - for (auto& [hash, interface] : _interfaces) { + for (auto& interface : _interfaces) { if (interface != attached_interface) { request_path(destination_hash, interface, request_tag); } @@ -3951,7 +4346,7 @@ TRACEF("announce_packet str: %s", announce_packet.toString().c_str()); attached_interface }}); - for (auto& [hash, interface] : _interfaces) { + for (auto& interface : _interfaces) { #if RNS_SAME_INTERFACE_PATH_REQUESTS // DIVERGENCE // CBA EXPERIMENTAL forwarding path requests even on requestor interface in order to support @@ -4016,32 +4411,48 @@ TRACEF("announce_packet str: %s", announce_packet.toString().c_str()); } } +// Clean-shutdown sequence for the interface layer: tear down all links so peers +// see the connection drop, drain for 150ms to let teardown packets leave, then +// call detach() on every interface so they can release resources before +// destruction. Python additionally orders LocalServerInterface and +// LocalClientInterface teardown separately for its shared-instance protocol; +// microReticulum does not support shared-instance clients/servers, so those +// branches collapse to the unified loop here. /*static*/ void Transport::detach_interfaces() { -// TODO -/*p - detachable_interfaces = [] + TRACE("Transport::detach_interfaces()"); + + size_t closed_links = 0; + // Iterate by value -- Link is a value-type wrapping a shared impl, so a + // copy still routes teardown() to the same underlying object. std::set + // yields const references which can't call the non-const teardown(). + for (Link link : _active_links) { + try { link.teardown(); closed_links++; } + catch (const std::exception& e) { + WARNINGF("Could not tear down active link before interface detach: %s", e.what()); + } + } + for (Link link : _pending_links) { + try { link.teardown(); closed_links++; } + catch (const std::exception& e) { + WARNINGF("Could not tear down pending link before interface detach: %s", e.what()); + } + } - for interface in Transport.interfaces: - // Currently no rules are being applied - // here, and all interfaces will be sent - // the detach call on RNS teardown. - if True: - detachable_interfaces.append(interface) - else: - pass - - for interface in Transport.local_client_interfaces: - // Currently no rules are being applied - // here, and all interfaces will be sent - // the detach call on RNS teardown. - if True: - detachable_interfaces.append(interface) - else: - pass + // Provide a 150ms window to allow link teardown packets to leave local transport + if (closed_links > 0) { + OS::sleep(0.15f); + } - for interface in detachable_interfaces: - interface.detach() -*/ + DEBUG("Detaching interfaces"); + for (Interface& iface : _interfaces) { + try { + iface.detach(); + } + catch (const std::exception& e) { + ERRORF("Error while detaching %s: %s", iface.toString().c_str(), e.what()); + } + } + DEBUG("All interfaces detached"); } /*static*/ void Transport::shared_connection_disappeared() { @@ -4073,21 +4484,18 @@ TRACEF("announce_packet str: %s", announce_packet.toString().c_str()); */ } +// Empties every interface's pending announce queue. Useful when shutting down +// cleanly without flushing buffered outbound announces, or when an interface +// is being deregistered. /*static*/ void Transport::drop_announce_queues() { -// TODO -/*p - for interface in Transport.interfaces: - if hasattr(interface, "announce_queue") and interface.announce_queue != None: - na = len(interface.announce_queue) - if na > 0: - if na == 1: - na_str = "1 announce" - else: - na_str = str(na)+" announces" - - interface.announce_queue = [] - RNS.log("Dropped "+na_str+" on "+str(interface), RNS.LOG_VERBOSE) -*/ + for (Interface& iface : _interfaces) { + auto& queue = iface.announce_queue(); + size_t na = queue.size(); + if (na > 0) { + queue.clear(); + VERBOSEF("Dropped %zu announce%s on %s", na, na == 1 ? "" : "s", iface.toString().c_str()); + } + } } /*p @@ -4115,50 +4523,242 @@ TRACEF("announce_packet str: %s", announce_packet.toString().c_str()); return 0; } -/*static*/ void Transport::write_packet_hashlist() { -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) -// TODO -/*p - if not Transport.owner.is_connected_to_shared_instance: - if hasattr(Transport, "saving_packet_hashlist"): - wait_interval = 0.2 - wait_timeout = 5 - wait_start = time.time() - while Transport.saving_packet_hashlist: - time.sleep(wait_interval) - if time.time() > wait_start+wait_timeout: - RNS.log("Could not save packet hashlist to storage, waiting for previous save operation timed out.", RNS.LOG_ERROR) - return False +// Extracts the 5-byte big-endian emission timebase from offset 5 of a single +// random_blob. Returns 0 if the blob is shorter than 10 bytes. +/*static*/ uint64_t Transport::timebase_from_random_blob(const Bytes& random_blob) { + if (random_blob.size() < 10) { + return 0; + } + return OS::from_bytes_big_endian(random_blob.data() + 5, 5); +} - try: - Transport.saving_packet_hashlist = True - save_start = time.time() +// Returns the maximum emission timebase across all stored random blobs for +// a path. Used during announce ingestion to reject announces that are older +// than what we've already seen for the same destination. +/*static*/ uint64_t Transport::timebase_from_random_blobs(const std::vector& random_blobs) { + uint64_t timebase = 0; + for (const Bytes& blob : random_blobs) { + uint64_t emitted = timebase_from_random_blob(blob); + if (emitted > timebase) { + timebase = emitted; + } + } + return timebase; +} - if not RNS.Reticulum.transport_enabled(): - Transport.packet_hashlist = [] - else: - RNS.log("Saving packet hashlist to storage...", RNS.LOG_DEBUG) +// Adds an identity to the blackhole list. Source is set to this transport's +// own identity hash. If until > 0 it is treated as a unix-timestamp expiry; +// 0 means permanent. After insertion, any paths in the path table associated +// with the blackholed identity are removed and the local blackhole file is +// rewritten. Returns true if a new entry was added, false on error or if the +// identity was already blackholed. +/*static*/ bool Transport::blackhole_identity(const Bytes& identity_hash, double until, const std::string& reason) { + try { + if (_blackholed_identities.find(identity_hash) != _blackholed_identities.end()) { + return false; + } + _blackholed_identities.emplace(identity_hash, BlackholeEntry(_identity.hash(), until, reason)); + persist_blackhole(); + remove_blackholed_paths(); + INFOF("Blackholed identity %s", identity_hash.toHex().c_str()); + return true; + } + catch (const std::exception& e) { + ERRORF("Error while blackholing identity: %s", e.what()); + return false; + } +} - packet_hashlist_path = RNS.Reticulum.storagepath+"/packet_hashlist" - file = open(packet_hashlist_path, "wb") - file.write(umsgpack.packb(Transport.packet_hashlist)) - file.close() +// Removes a previously-blackholed identity from the list and rewrites the +// local blackhole file. Returns true if an entry was removed. +/*static*/ bool Transport::unblackhole_identity(const Bytes& identity_hash) { + try { + auto iter = _blackholed_identities.find(identity_hash); + if (iter == _blackholed_identities.end()) { + return false; + } + _blackholed_identities.erase(iter); + persist_blackhole(); + INFOF("Lifted blackhole for identity %s", identity_hash.toHex().c_str()); + return true; + } + catch (const std::exception& e) { + ERRORF("Error while unblackholing identity: %s", e.what()); + return false; + } +} - DEBUGF("Saved packet hashlist in %.3f seconds", OS::round(time.time() - save_start)) +// Quick membership check used by the inline announce filter. +/*static*/ bool Transport::is_blackholed(const Bytes& identity_hash) { + return _blackholed_identities.find(identity_hash) != _blackholed_identities.end(); +} - except Exception as e: - RNS.log("Could not save packet hashlist to storage, the contained exception was: "+str(e), RNS.LOG_ERROR) +// Loads the persisted blackhole list from {storagepath}/blackhole_local and +// then purges any path-table entries associated with the loaded identities. +// Entries with an expired 'until' timestamp are silently skipped during load. +// Called once during Transport::start(). +/*static*/ void Transport::reload_blackhole() { + std::string path = std::string(Reticulum::storagepath()) + "/blackhole_local"; + if (!OS::file_exists(path.c_str())) { + return; + } - Transport.saving_packet_hashlist = False -*/ -#endif + Bytes data; + size_t n = OS::read_file(path.c_str(), data); + if (n == 0 || data.size() == 0) { + return; + } + + try { + MsgPack::Unpacker u; + u.feed(data.data(), data.size()); + if (!u.isMap()) { + WARNING("Blackhole file is not a msgpack map; ignoring"); + return; + } + const size_t map_size = u.unpackMapSize(); + double now = OS::time(); + size_t loaded = 0; + for (size_t i = 0; i < map_size; i++) { + MsgPack::bin_t key_bin; + if (!u.deserialize(key_bin)) return; + Bytes identity_hash(key_bin.data(), key_bin.size()); + + // Each value is a 3-element array: [source, until, reason] + if (!u.isArray()) return; + const size_t arr_size = u.unpackArraySize(); + if (arr_size < 3) return; + MsgPack::bin_t src_bin; + if (!u.deserialize(src_bin)) return; + Bytes source(src_bin.data(), src_bin.size()); + double until = 0.0; + if (!u.deserialize(until)) return; + // Decode through MsgPack::str_t (aliased to std::string on native and + // Arduino's String on embedded) then normalise to std::string for storage. + MsgPack::str_t reason_str; + if (!u.deserialize(reason_str)) return; + std::string reason(reason_str.c_str(), reason_str.length()); + + if (until > 0.0 && now > until) { + continue; // expired, skip + } + _blackholed_identities.emplace(identity_hash, BlackholeEntry(source, until, reason)); + loaded++; + } + if (loaded > 0) { + NOTICEF("Loaded %zu blackholed identities from storage", loaded); + } + } + catch (const std::exception& e) { + ERRORF("Could not load blackholed identities: %s", e.what()); + } + + remove_blackholed_paths(); } +// Scans the path table and removes any destination whose associated identity +// (resolved via Identity::recall) is currently blackholed. Called after every +// blackhole_identity() and at startup after reload_blackhole(). +/*static*/ void Transport::remove_blackholed_paths() { + if (_blackholed_identities.empty()) return; + + std::vector drop_destinations; + try { + for (const auto& path : _new_path_table) { + Bytes destination_hash = path.key; + Identity associated = Identity::recall(destination_hash); + if (associated && is_blackholed(associated.hash())) { + drop_destinations.push_back(destination_hash); + } + } + } + catch (const std::exception& e) { + ERRORF("Error while enumerating blackhole-associated destinations: %s", e.what()); + } + + for (const Bytes& destination_hash : drop_destinations) { + try { + _new_path_table.remove(destination_hash); + } + catch (const std::exception& e) { + ERRORF("Error while dropping blackhole-associated destination from path table: %s", e.what()); + } + } + + if (!drop_destinations.empty()) { + INFOF("Removed %zu destinations associated with blackholed identities from path table", drop_destinations.size()); + } +} + +// Writes the local blackhole list (entries whose source is this transport's +// own identity) to {storagepath}/blackhole_local as a msgpack map of +// identity_hash -> [source, until, reason]. Entries originating from other +// sources are not re-persisted by us (they belong to their own source files +// in the multi-source design that is currently out of scope). +/*static*/ void Transport::persist_blackhole() { + try { + MsgPack::Packer p; + size_t local_count = 0; + for (const auto& [hash, entry] : _blackholed_identities) { + if (entry._source == _identity.hash()) local_count++; + } + p.packMapSize(local_count); + for (const auto& [hash, entry] : _blackholed_identities) { + if (entry._source != _identity.hash()) continue; + p.packBinary(hash.data(), hash.size()); + p.packArraySize(3); + p.packBinary(entry._source.data(), entry._source.size()); + p.packFloat64(entry._until); + p.pack(entry._reason.c_str(), entry._reason.size()); + } + + Bytes data(p.data(), p.size()); + std::string path = std::string(Reticulum::storagepath()) + "/blackhole_local"; + size_t written = OS::write_file(path.c_str(), data); + if (written != data.size()) { + WARNINGF("Short write while persisting blackhole list (%zu of %zu bytes)", written, data.size()); + } + } + catch (const std::exception& e) { + ERRORF("Error while persisting blackhole list: %s", e.what()); + } +} + +// Request handler for the /list endpoint on the blackhole publishing +// destination. Returns the current blackhole list as a msgpack map matching +// Python's serialization so cross-stack clients can ingest it. +/*static*/ Bytes Transport::blackhole_list_handler(const Bytes& path, const Bytes& data, const Bytes& request_id, const Bytes& link_id, const Identity& remote_identity, double requested_at) { + try { + // Wire format must match Python's Transport.blackholed_identities: + // {identity_hash: {"source": , "until": , "reason": }}. + // The local persistence file uses a compact positional 3-array + // instead -- this handler is the only Python-interop surface. + MsgPack::Packer p; + p.packMapSize(_blackholed_identities.size()); + for (const auto& [hash, entry] : _blackholed_identities) { + p.packBinary(hash.data(), hash.size()); + p.packMapSize(3); + p.pack("source"); + p.packBinary(entry._source.data(), entry._source.size()); + p.pack("until"); + p.packFloat64(entry._until); + p.pack("reason"); + p.pack(entry._reason.c_str(), entry._reason.size()); + } + return Bytes(p.data(), p.size()); + } + catch (const std::exception& e) { + ERRORF("Error while processing blackhole list request: %s", e.what()); + return {}; + } +} + + //#define CUSTOM 1 /*static*/ bool Transport::read_path_table() { DEBUG("Transport::read_path_table"); -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS char destination_table_path[Type::Reticulum::FILEPATH_MAXSIZE]; snprintf(destination_table_path, Type::Reticulum::FILEPATH_MAXSIZE, "%s/destination_table", Reticulum::_storagepath); if (!_owner.is_connected_to_shared_instance() && OS::file_exists(destination_table_path)) { @@ -4252,7 +4852,7 @@ TRACEF("Transport::read_path_table: buffer size %d bytes", Persistence::_buffer. } bool success = false; -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS if (_saving_path_table) { double wait_interval = 0.2; double wait_timeout = 5; @@ -4407,7 +5007,7 @@ TRACEF("Transport::write_path_table: buffer size %lu bytes", Persistence::_buffe /*static*/ void Transport::read_tunnel_table() { DEBUG("Transport::read_tunnel_table"); -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS // TODO /*p tunnel_table_path = RNS.Reticulum.storagepath+"/tunnels" @@ -4463,7 +5063,7 @@ TRACEF("Transport::write_path_table: buffer size %lu bytes", Persistence::_buffe } /*static*/ void Transport::write_tunnel_table() { -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS // TODO /*p if not Transport.owner.is_connected_to_shared_instance: @@ -4540,7 +5140,6 @@ TRACEF("Transport::write_path_table: buffer size %lu bytes", Persistence::_buffe /*static*/ void Transport::persist_data() { TRACE("Transport::persist_data()"); - write_packet_hashlist(); write_path_table(); write_tunnel_table(); } @@ -4555,7 +5154,7 @@ TRACEF("Transport::write_path_table: buffer size %lu bytes", Persistence::_buffe cleaning_caches = true; TRACE("Transport::clean_caches()"); -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS // CBA Remove cached packets no longer in path list std::list remove_list; OS::list_directory(Reticulum::_cachepath, [&remove_list](const char* file_name) { @@ -4588,7 +5187,7 @@ TRACEF("Transport::write_path_table: buffer size %lu bytes", Persistence::_buffe /*static*/ void Transport::clear_storage() { TRACE("Transport::clear_storage()"); -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS try { char file_path[Type::Reticulum::FILEPATH_MAXSIZE]; @@ -4597,18 +5196,14 @@ TRACEF("Transport::write_path_table: buffer size %lu bytes", Persistence::_buffe OS::remove_file(file_path); } - snprintf(file_path, Type::Reticulum::FILEPATH_MAXSIZE, "%s/packet_hashlist", Reticulum::_storagepath); - if (OS::file_exists(file_path)) { - OS::remove_file(file_path); - } - snprintf(file_path, Type::Reticulum::FILEPATH_MAXSIZE, "%s/tunnels", Reticulum::_storagepath); if (OS::file_exists(file_path)) { OS::remove_file(file_path); } - // Clear the microStore-backed path store (removes its segment files on disk) + // Clear the microStore-backed stores (removes their segment files on disk) _path_store.clear(); + _packet_hashlist.clear(); // Remove cached announce packets if (OS::directory_exists(Reticulum::_cachepath)) { @@ -4697,7 +5292,7 @@ TRACEF("Transport::write_path_table: buffer size %lu bytes", Persistence::_buffe destination_path_responses += destination.path_responses().size(); } uint32_t interface_announces = 0; - for (auto& [interface_hash, interface] : _interfaces) { + for (auto& interface : _interfaces) { interface_announces += interface.announce_queue().size(); } VERBOSEF("phl: %u rcp: %u lt: %u pl: %u al: %u tun: %u", _packet_hashlist.size(), _receipts.size(), _link_table.size(), _pending_links.size(), _active_links.size(), _tunnels.size()); @@ -4732,6 +5327,7 @@ TRACEF("Transport::write_path_table: buffer size %lu bytes", Persistence::_buffe if (!_owner.is_connected_to_shared_instance()) { persist_data(); } + detach_interfaces(); } /*p @@ -4897,7 +5493,7 @@ TRACEF("Transport::write_path_table: buffer size %lu bytes", Persistence::_buffe uint16_t count = 0; for (const auto& [timestamp, destination_hash] : sorted_keys) { TRACEF("Transport::cull_path_table: Removing destination %s from path table", destination_hash.toHex().c_str()); -#if defined(RNS_USE_FS) && defined(RNS_PERSIST_PATHS) +#if defined(RNS_USE_FS) && RNS_PERSIST_PATHS // CBA microStore //auto& destination_entry = get_path(destination_hash); DestinationEntry destination_entry; diff --git a/src/microReticulum/Transport.h b/src/microReticulum/Transport.h index 0de3c19..ded9a2d 100644 --- a/src/microReticulum/Transport.h +++ b/src/microReticulum/Transport.h @@ -21,10 +21,17 @@ #include "Utilities/GenerationalSet.h" #include "Persistence/DestinationEntry.h" +#if defined(RNS_USE_FS) && RNS_PERSIST_HASHLIST +#include +#else +#include +#endif + #include #include #include #include +#include #include #include #include @@ -73,9 +80,14 @@ namespace RNS { public: - using InterfaceTable = std::map; + using InterfaceTable = std::vector; using DestinationTable = std::map; using BytesList = RNS::Utilities::GenerationalSet; +#if defined(RNS_USE_FS) && RNS_PERSIST_HASHLIST + using HashlistStore = microStore::BasicFileStore>; +#else + using HashlistStore = microStore::BasicHeapStore>; +#endif class Callbacks { public: @@ -212,6 +224,29 @@ namespace RNS { const Interface _requesting_interface = {Type::NONE}; }; using PathRequestTable = std::map; + using PathStateTable = std::map; + + class PendingDiscoveryPREntry { + public: + PendingDiscoveryPREntry() : _blocked_interface({Type::NONE}) {} + PendingDiscoveryPREntry(const Bytes& destination_hash, const Interface& blocked_interface) : + _destination_hash(destination_hash), + _blocked_interface(blocked_interface) {} + const Bytes _destination_hash; + const Interface _blocked_interface; // {Type::NONE} = no specific interface to avoid + }; + using PendingDiscoveryPRs = std::deque; + + class BlackholeEntry { + public: + BlackholeEntry() = default; + BlackholeEntry(const Bytes& source, double until, const std::string& reason) : + _source(source), _until(until), _reason(reason) {} + Bytes _source; // identity hash of who blackholed this identity + double _until = 0.0; // 0.0 = permanent; otherwise unix timestamp expiry + std::string _reason; // optional human-readable reason + }; + using BlackholeTable = std::map; /* // CBA TODO Analyze safety of using Inrerface references here @@ -315,6 +350,23 @@ namespace RNS { static double first_hop_timeout(const Bytes& destination_hash); static double extra_link_proof_timeout(const Interface& interface); static bool expire_path(const Bytes& destination_hash); + static bool mark_path_unresponsive(const Bytes& destination_hash); + static bool mark_path_responsive(const Bytes& destination_hash); + static bool mark_path_unknown_state(const Bytes& destination_hash); + static bool path_is_unresponsive(const Bytes& destination_hash); + static void handle_disovery_path_requests(); // typo preserved to match Python reference + static void count_traffic(); //p count_traffic_loop() in Python; called once per tick in C++'s single-loop model + static void prioritize_interfaces(); // Sorts _interfaces in place by bitrate descending + static uint64_t timebase_from_random_blob(const Bytes& random_blob); + static uint64_t timebase_from_random_blobs(const std::vector& random_blobs); + + static bool blackhole_identity(const Bytes& identity_hash, double until = 0.0, const std::string& reason = ""); + static bool unblackhole_identity(const Bytes& identity_hash); + static bool is_blackholed(const Bytes& identity_hash); + static void reload_blackhole(); + static void remove_blackholed_paths(); + static void persist_blackhole(); + static Bytes blackhole_list_handler(const Bytes& path, const Bytes& data, const Bytes& request_id, const Bytes& link_id, const Identity& remote_identity, double requested_at); //static void request_path(const Bytes& destination_hash, const Interface& on_interface = {Type::NONE}, const Bytes& tag = {}, bool recursive = false); static void request_path(const Bytes& destination_hash, const Interface& on_interface, const Bytes& tag = {}, bool recursive = false); static void request_path(const Bytes& destination_hash); @@ -333,7 +385,6 @@ namespace RNS { static void shared_connection_reappeared(); static void drop_announce_queues(); static uint64_t announce_emitted(const Packet& packet); - static void write_packet_hashlist(); static bool read_path_table(); static bool write_path_table(); static void read_tunnel_table(); @@ -368,6 +419,11 @@ namespace RNS { static inline const Reticulum& reticulum() { return _owner; } static inline const Identity& identity() { return _identity; } static inline void identity(Identity& identity) { _identity = identity; } + static inline const Identity& network_identity() { return _network_identity; } + static inline void network_identity(Identity& identity) { + if (!_network_identity) { _network_identity = identity; } + } + static inline bool has_network_identity() { return (bool)_network_identity; } inline static uint16_t path_table_maxsize() { return _path_table_maxsize; } inline static void path_table_maxsize(uint16_t path_table_maxsize) { _path_table_maxsize = path_table_maxsize; _path_store.set_max_recs(_path_table_maxsize); } inline static uint16_t announce_table_maxsize() { return _announce_table_maxsize; } @@ -375,8 +431,12 @@ namespace RNS { inline static uint16_t hashlist_maxsize() { return _hashlist_maxsize; } inline static void hashlist_maxsize(uint16_t hashlist_maxsize) { _hashlist_maxsize = hashlist_maxsize; - _packet_hashlist.max_size(hashlist_maxsize); + _packet_hashlist.set_max_recs(hashlist_maxsize); } + inline static uint32_t hashlist_segment_size() { return _hashlist_segment_size; } + inline static void hashlist_segment_size(uint32_t value) { _hashlist_segment_size = value; } + inline static uint8_t hashlist_segment_count() { return _hashlist_segment_count; } + inline static void hashlist_segment_count(uint8_t value) { _hashlist_segment_count = value; } inline static uint16_t max_pr_tags() { return _max_pr_tags; } inline static void max_pr_tags(uint16_t max_pr_tags) { _max_pr_tags = max_pr_tags; @@ -412,16 +472,23 @@ namespace RNS { inline static const ReverseTable& reverse_table() { return _reverse_table; } inline static const std::map& path_requests() { return _path_requests; } inline static const PathRequestTable& discovery_path_requests() { return _discovery_path_requests; } + inline static const PathStateTable& path_states() { return _path_states; } + inline static const PendingDiscoveryPRs& pending_discovery_prs() { return _pending_discovery_prs; } + inline static const BlackholeTable& blackholed_identities() { return _blackholed_identities; } inline static const std::map& pending_local_path_requests() { return _pending_local_path_requests; } inline static const BytesList& discovery_pr_tags() { return _discovery_pr_tags; } inline static const std::set& control_destinations() { return _control_destinations; } inline static const std::set& control_hashes() { return _control_hashes; } - inline static const BytesList& packet_hashlist() { return _packet_hashlist; } + inline static const HashlistStore& packet_hashlist() { return _packet_hashlist; } inline static const std::list& receipts() { return _receipts; } inline static const TunnelTable& tunnels() { return _tunnels; } inline static uint32_t packets_sent() { return _packets_sent; } inline static uint32_t packets_received() { return _packets_received; } + inline static uint64_t traffic_rxb() { return _traffic_rxb; } + inline static uint64_t traffic_txb() { return _traffic_txb; } + inline static double speed_rx() { return _speed_rx; } + inline static double speed_tx() { return _speed_tx; } inline static uint32_t announces_received() { return _announces_received; } inline static uint32_t path_requests_received() { return _path_requests_received; } inline static uint32_t paths_added() { return _paths_added; } @@ -436,7 +503,7 @@ namespace RNS { // CBA TODO: Reconsider using std::set for enforcing uniqueness. Maybe consider std::map keyed on hash instead static std::set _pending_links; // Links that are being established static std::set _active_links; // Links that are active - static BytesList _packet_hashlist; // A list of packet hashes for duplicate detection + static HashlistStore _packet_hashlist; // Set of packet hashes for duplicate detection static std::list _receipts; // Receipts of all outgoing packets for proof processing static AnnounceTable _announce_table; // A table for storing announces currently waiting to be retransmitted @@ -451,6 +518,10 @@ namespace RNS { static PathRequestTable _discovery_path_requests; // A table for keeping track of path requests on behalf of other nodes static BytesList _discovery_pr_tags; // A table for keeping track of tagged path requests + static PathStateTable _path_states; // A table for keeping track of path states (UNKNOWN/UNRESPONSIVE/RESPONSIVE) + static PendingDiscoveryPRs _pending_discovery_prs; // A bounded queue of discovery path requests pending throttled transmission + static double _pending_discovery_prs_last_tx; // Timestamp of last discovery path request transmission + static BlackholeTable _blackholed_identities; // Identity hashes blocked from path-table population // Transport control destinations are used // for control purposes like path requests @@ -485,8 +556,16 @@ namespace RNS { static float _receipts_check_interval; static double _announces_last_checked; static float _announces_check_interval; + static double _pending_prs_last_checked; + static float _pending_prs_check_interval; static double _tables_last_culled; static float _tables_cull_interval; + static double _traffic_last_checked; + static float _traffic_check_interval; + static double _interface_last_jobs; + static float _interface_jobs_interval; + static double _blackhole_last_checked; + static float _blackhole_check_interval; static double _last_mgmt_announce; static float _mgmt_announce_interval; static bool _saving_path_table; @@ -518,6 +597,10 @@ namespace RNS { // CBA Stats static uint32_t _packets_sent; static uint32_t _packets_received; + static uint64_t _traffic_rxb; // Cumulative bytes received since startup (sums per-tick interface deltas) + static uint64_t _traffic_txb; // Cumulative bytes transmitted since startup + static double _speed_rx; // Current receive rate in bits/sec across all non-child interfaces + static double _speed_tx; // Current transmit rate in bits/sec across all non-child interfaces static uint32_t _announces_received; static uint32_t _path_requests_received; static uint32_t _paths_added; @@ -535,6 +618,9 @@ namespace RNS { static uint8_t _path_store_segment_count; static PathStore _path_store; static NewPathTable _new_path_table; + + static uint32_t _hashlist_segment_size; + static uint8_t _hashlist_segment_count; }; template diff --git a/src/microReticulum/Type.h b/src/microReticulum/Type.h index ea4968e..d8776b2 100644 --- a/src/microReticulum/Type.h +++ b/src/microReticulum/Type.h @@ -55,6 +55,25 @@ #define RNS_RANDOM_BLOBS_MAX 32 #endif +#ifndef RNS_QUEUED_DISCOVERY_PRS_MAX +#define RNS_QUEUED_DISCOVERY_PRS_MAX 32 +#endif + +// RNS_PERSIST_PATHS enabled by default +#ifndef RNS_PERSIST_PATHS +#define RNS_PERSIST_PATHS 1 +#endif + +// RNS_PERSIST_KNOWN_DESTINATIONS enabled by default +#ifndef RNS_PERSIST_KNOWN_DESTINATIONS +#define RNS_PERSIST_KNOWN_DESTINATIONS 1 +#endif + +// RNS_PERSIST_HASHLIST enabled by default +#ifndef RNS_PERSIST_HASHLIST +#define RNS_PERSIST_HASHLIST 1 +#endif + namespace RNS { namespace Type { @@ -468,9 +487,14 @@ namespace RNS { namespace Type { static const uint8_t LOCAL_REBROADCASTS_MAX = 2; // How many local rebroadcasts of an announce is allowed static const uint8_t PATH_REQUEST_TIMEOUT = 15; // Default timuout for client path requests in seconds + static const uint8_t PATH_REQUEST_GATE_TIMEOUT = 120; // Default timeout for client path request gate control in seconds static constexpr const float PATH_REQUEST_GRACE = 0.4; // Grace time before a path announcement is made, allows directly reachable peers to respond first + static constexpr const float PATH_REQUEST_RG = 1.5; // Extra grace time on roaming-mode interfaces, gives better-connected peers a chance to answer first static const uint8_t PATH_REQUEST_RW = 2; // Path request random window - static const uint8_t PATH_REQUEST_MI = 5; // Minimum interval in seconds for automated path requests + static const uint8_t PATH_REQUEST_MI = 20; // Minimum interval in seconds for automated path requests + + static const uint8_t MAX_QUEUED_DISCOVERY_PRS = RNS_QUEUED_DISCOVERY_PRS_MAX; // Max amount of queued discovery path requests + static constexpr const float DISCOVERY_PR_TX_THROTTLE = 0.5; // Min interval in seconds between throttled discovery PR transmissions static constexpr const float LINK_TIMEOUT = Link::STALE_TIME * 1.25; static const uint16_t REVERSE_TIMEOUT = 8*60; // Reverse table entries are removed after 8 minutes @@ -480,15 +504,10 @@ namespace RNS { namespace Type { static const uint8_t PERSIST_RANDOM_BLOBS = RNS_RANDOM_BLOBS_PERSIST_MAX; // Maximum number of random blobs per destination to persist to disk static const uint8_t MAX_RANDOM_BLOBS = RNS_RANDOM_BLOBS_MAX; // Maximum number of random blobs per destination to keep in memory - // CBA MCU - //static const uint32_t DESTINATION_TIMEOUT = 60*60*24*7; // Destination table entries are removed if unused for one week - //static const uint32_t PATHFINDER_E = 60*60*24*7; // Path expiration of one week - //static const uint32_t AP_PATH_TIME = 60*60*24; // Path expiration of one day for Access Point paths - //static const uint32_t ROAMING_PATH_TIME = 60*60*6; // Path expiration of 6 hours for Roaming paths - static const uint32_t DESTINATION_TIMEOUT = 60*60*24*1; // Destination table entries are removed if unused for one day - static const uint32_t PATHFINDER_E = 60*60*24*1; // Path expiration of one day - static const uint32_t AP_PATH_TIME = 60*60*6; // Path expiration of 6 hours for Access Point paths - static const uint32_t ROAMING_PATH_TIME = 60*60*1; // Path expiration of 1 hour for Roaming paths + static const uint32_t DESTINATION_TIMEOUT = 60*60*24*7; // Destination table entries are removed if unused for one week + static const uint32_t PATHFINDER_E = 60*60*24*7; // Path expiration of one week + static const uint32_t AP_PATH_TIME = 60*60*24; // Path expiration of one day for Access Point paths + static const uint32_t ROAMING_PATH_TIME = 60*60*6; // Path expiration of 6 hours for Roaming paths static const uint16_t LOCAL_CLIENT_CACHE_MAXSIZE = 512; } diff --git a/src/microReticulum/Utilities/Persistence.h b/src/microReticulum/Utilities/Persistence.h index 926e6b3..7b09bcf 100644 --- a/src/microReticulum/Utilities/Persistence.h +++ b/src/microReticulum/Utilities/Persistence.h @@ -338,7 +338,7 @@ namespace ArduinoJson { dst._received_from = src["received_from"]; dst._hops = src["announce_hops"]; dst._expires = src["expires"]; - dst._random_blobs = src["random_blobs"].as>(); + dst._random_blobs = src["random_blobs"].as>(); RNS::Bytes interface_hash = src["interface_hash"]; if (interface_hash) { @@ -359,7 +359,7 @@ namespace ArduinoJson { src["received_from"].as(), src["announce_hops"].as(), src["expires"].as(), - src["random_blobs"].as>(), + src["random_blobs"].as>(), src["receiving_interface"].as(), src["packet"].as() ); @@ -448,7 +448,7 @@ namespace RNS { dst._received_from = src["received_from"]; dst._hops = src["announce_hops"]; dst._expires = src["expires"]; - dst._random_blobs = src["random_blobs"].as>(); + dst._random_blobs = src["random_blobs"].as>(); RNS::Bytes interface_hash = src["interface_hash"]; if (interface_hash) { @@ -469,7 +469,7 @@ namespace RNS { src["received_from"].as(), src["announce_hops"].as(), src["expires"].as(), - src["random_blobs"].as>(), + src["random_blobs"].as>(), src["receiving_interface"].as(), src["packet"].as() ); diff --git a/test/test_persistence/test_persistence.cpp b/test/test_persistence/test_persistence.cpp index a9bd9bc..1963231 100644 --- a/test/test_persistence/test_persistence.cpp +++ b/test/test_persistence/test_persistence.cpp @@ -506,13 +506,13 @@ void testSerializeDestinationTable() { RNS::Interface test_interface(new TestInterface()); //RNS::Packet packet({RNS::Type::NONE}); static std::map map; - //DestinationEntry(double time, const Bytes& received_from, uint8_t announce_hops, double expires, const std::set& random_blobs, Interface& receiving_interface, const Packet& packet) : + //DestinationEntry(double time, const Bytes& received_from, uint8_t announce_hops, double expires, const std::vector& random_blobs, Interface& receiving_interface, const Packet& packet) : //RNS::Persistence::DestinationEntry entry_one(1.0, empty, 1, 0.0, blobs, interface, packet); RNS::Bytes received; received.assignHex("deadbeef"); RNS::Bytes blob; blob.assignHex("b10bb10b"); - std::set blobs({received, blob}); + std::vector blobs({received, blob}); RNS::Persistence::DestinationEntry entry_one; entry_one._timestamp = 1.0; entry_one._received_from = received; @@ -1056,7 +1056,7 @@ void test_codec_destination_entry() { RNS::Bytes blob; blob.assignHex("b10bb10b"); - std::set blobs({received, blob}); + std::vector blobs({received, blob}); microStore::Codec codec; diff --git a/test/test_rns_persistence/test_rns_persistence.cpp b/test/test_rns_persistence/test_rns_persistence.cpp index a366c28..116adac 100644 --- a/test/test_rns_persistence/test_rns_persistence.cpp +++ b/test/test_rns_persistence/test_rns_persistence.cpp @@ -51,7 +51,7 @@ void testSerializeDestinationTable() { received.assignHex("deadbeef"); RNS::Bytes blob; blob.assignHex("b10bb10b"); - std::set blobs({received, blob}); + std::vector blobs({received, blob}); RNS::Persistence::DestinationEntry entry_one; entry_one._timestamp = 1.0; entry_one._received_from = received; diff --git a/test/test_transport/test_transport.cpp b/test/test_transport/test_transport.cpp index 3b55d1f..c1cdfa2 100644 --- a/test/test_transport/test_transport.cpp +++ b/test/test_transport/test_transport.cpp @@ -438,6 +438,31 @@ void test_incoming_announce_limit() { printf("test_incoming_announce_limit: END\n"); } +void test_prioritize_interfaces() { + + printf("test_prioritize_interfaces: BEGIN\n"); + + initRNS(); + + // Set distinct bitrates so the sort has something to order by. + in_interface.bitrate(1000); + out_interface.bitrate(9600); + + RNS::Transport::prioritize_interfaces(); + + const auto& ifaces = RNS::Transport::get_interfaces(); + TEST_ASSERT_GREATER_OR_EQUAL_UINT32(2, ifaces.size()); + // Highest bitrate must come first. + TEST_ASSERT_EQUAL_UINT32(9600, ifaces[0].bitrate()); + TEST_ASSERT_EQUAL_UINT32(1000, ifaces[1].bitrate()); + + // Reset for any downstream tests. + in_interface.bitrate(0); + out_interface.bitrate(0); + + printf("test_prioritize_interfaces: END\n"); +} + void test_incoming_announce_over_limit() { printf("test_incoming_announce_over_limit: BEGIN\n"); @@ -598,6 +623,7 @@ int runUnityTests(void) { //RUN_TEST(test_incoming_announce_limit); */ + RUN_TEST(test_prioritize_interfaces); RUN_TEST(test_incoming_announce_over_limit); //RUN_TEST(test_incoming_announce_stress); diff --git a/test_interop/link_interop_sender/platformio.ini b/test_interop/link_interop_sender/platformio.ini index 6d72cf9..de92c82 100644 --- a/test_interop/link_interop_sender/platformio.ini +++ b/test_interop/link_interop_sender/platformio.ini @@ -9,7 +9,6 @@ monitor_speed = 115200 build_type = debug build_flags = -DRNS_USE_FS - -DRNS_PERSIST_PATHS -DUSTORE_USE_UNIVERSALFS -DDEFAULT_UDP_LOCAL_HOST=\"127.0.0.1\" -DDEFAULT_UDP_REMOTE_HOST=\"127.0.0.1\" diff --git a/test_interop/packet_interop_sender/platformio.ini b/test_interop/packet_interop_sender/platformio.ini index 7df69c4..5a206a3 100644 --- a/test_interop/packet_interop_sender/platformio.ini +++ b/test_interop/packet_interop_sender/platformio.ini @@ -7,7 +7,6 @@ monitor_speed = 115200 build_type = debug build_flags = -DRNS_USE_FS - -DRNS_PERSIST_PATHS -DUSTORE_USE_UNIVERSALFS ; Point-to-point loopback on a dedicated port pair so multiple ; interop tests can in principle coexist without UDP collision. diff --git a/test_interop/request_interop_sender/platformio.ini b/test_interop/request_interop_sender/platformio.ini index d7f2a29..34d6f3e 100644 --- a/test_interop/request_interop_sender/platformio.ini +++ b/test_interop/request_interop_sender/platformio.ini @@ -7,7 +7,6 @@ monitor_speed = 115200 build_type = debug build_flags = -DRNS_USE_FS - -DRNS_PERSIST_PATHS -DUSTORE_USE_UNIVERSALFS -DDEFAULT_UDP_LOCAL_HOST=\"127.0.0.1\" -DDEFAULT_UDP_REMOTE_HOST=\"127.0.0.1\" diff --git a/test_interop/resource_interop_sender/platformio.ini b/test_interop/resource_interop_sender/platformio.ini index a2d318c..921ab15 100644 --- a/test_interop/resource_interop_sender/platformio.ini +++ b/test_interop/resource_interop_sender/platformio.ini @@ -8,7 +8,6 @@ monitor_speed = 115200 build_type = debug build_flags = -DRNS_USE_FS - -DRNS_PERSIST_PATHS -DUSTORE_USE_UNIVERSALFS ; Point-to-point loopback to the Python receiver: ; Python listens on 14242, sends to 127.0.0.1:14243