diff --git a/.github/workflows/audit.yml b/.github/workflows/audit.yml index d32c55cd5..e2ae378dd 100644 --- a/.github/workflows/audit.yml +++ b/.github/workflows/audit.yml @@ -19,8 +19,8 @@ jobs: runs-on: ${{ matrix.platform }} steps: - name: Checkout source code - uses: actions/checkout@v3 + uses: actions/checkout@v6 - name: Run security audit - uses: rustsec/audit-check@v1.4.1 + uses: rustsec/audit-check@v2 with: token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index f5cf79033..6d0056e9a 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -13,20 +13,20 @@ jobs: TOOLCHAIN: stable steps: - name: Checkout source code - uses: actions/checkout@v3 + uses: actions/checkout@v6 - name: Install Rust toolchain run: | curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --profile=minimal --default-toolchain stable rustup override set stable - name: Enable caching for bitcoind id: cache-bitcoind - uses: actions/cache@v4 + uses: actions/cache@v5 with: path: bin/bitcoind-${{ runner.os }}-${{ runner.arch }} key: bitcoind-29.0-${{ runner.os }}-${{ runner.arch }} - name: Enable caching for electrs id: cache-electrs - uses: actions/cache@v4 + uses: actions/cache@v5 with: path: bin/electrs-${{ runner.os }}-${{ runner.arch }} key: electrs-${{ runner.os }}-${{ runner.arch }} diff --git a/.github/workflows/cln-integration.yml b/.github/workflows/cln-integration.yml index 2becf086a..5bdcb75bb 100644 --- a/.github/workflows/cln-integration.yml +++ b/.github/workflows/cln-integration.yml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Install dependencies run: | diff --git a/.github/workflows/cron-weekly-rustfmt.yml b/.github/workflows/cron-weekly-rustfmt.yml index d6326f03b..9e54ab9f3 100644 --- a/.github/workflows/cron-weekly-rustfmt.yml +++ b/.github/workflows/cron-weekly-rustfmt.yml @@ -13,7 +13,7 @@ jobs: name: Nightly rustfmt runs-on: ubuntu-24.04 steps: - - uses: actions/checkout@v5 + - uses: actions/checkout@v6 - uses: dtolnay/rust-toolchain@nightly with: components: rustfmt @@ -23,7 +23,7 @@ jobs: - name: Get the current date run: echo "date=$(date +'%Y-%m-%d')" >> $GITHUB_ENV - name: Create Pull Request - uses: peter-evans/create-pull-request@v7 + uses: peter-evans/create-pull-request@v8 with: author: Fmt Bot title: Automated nightly rustfmt (${{ env.date }}) diff --git a/.github/workflows/kotlin.yml b/.github/workflows/kotlin.yml index 01a840d60..627051c31 100644 --- a/.github/workflows/kotlin.yml +++ b/.github/workflows/kotlin.yml @@ -16,10 +16,10 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Set up JDK - uses: actions/setup-java@v3 + uses: actions/setup-java@v5 with: distribution: temurin java-version: 11 diff --git a/.github/workflows/lnd-integration.yml b/.github/workflows/lnd-integration.yml index 6f71f19d7..47ed7c311 100644 --- a/.github/workflows/lnd-integration.yml +++ b/.github/workflows/lnd-integration.yml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Check and install CMake if needed # lnd_grpc_rust (via prost-build v0.10.4) requires CMake >= 3.5 but is incompatible with CMake >= 4.0. diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 802f7c3d4..4576bf550 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -15,7 +15,7 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Install uv uses: astral-sh/setup-uv@v7 diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 18589e612..dfa952b2b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -34,7 +34,7 @@ jobs: runs-on: ${{ matrix.platform }} steps: - name: Checkout source code - uses: actions/checkout@v3 + uses: actions/checkout@v6 - name: Install Rust ${{ matrix.toolchain }} toolchain run: | curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --profile=minimal --default-toolchain ${{ matrix.toolchain }} @@ -50,13 +50,13 @@ jobs: run: echo "RUSTFLAGS=-D warnings" >> "$GITHUB_ENV" - name: Enable caching for bitcoind id: cache-bitcoind - uses: actions/cache@v4 + uses: actions/cache@v5 with: path: bin/bitcoind-${{ runner.os }}-${{ runner.arch }} key: bitcoind-29.0-${{ runner.os }}-${{ runner.arch }} - name: Enable caching for electrs id: cache-electrs - uses: actions/cache@v4 + uses: actions/cache@v5 with: path: bin/electrs-${{ runner.os }}-${{ runner.arch }} key: electrs-${{ runner.os }}-${{ runner.arch }} diff --git a/.github/workflows/semver.yml b/.github/workflows/semver.yml index 2a3b14ef8..0fdfbe213 100644 --- a/.github/workflows/semver.yml +++ b/.github/workflows/semver.yml @@ -6,6 +6,6 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout source code - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Check SemVer uses: obi1kenobi/cargo-semver-checks-action@v2 diff --git a/.github/workflows/swift.yml b/.github/workflows/swift.yml index 3410d09aa..c1e385e2d 100644 --- a/.github/workflows/swift.yml +++ b/.github/workflows/swift.yml @@ -12,7 +12,7 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Set default Rust version to stable run: rustup default stable diff --git a/.github/workflows/vss-integration.yml b/.github/workflows/vss-integration.yml index b5c4e9a0b..959175162 100644 --- a/.github/workflows/vss-integration.yml +++ b/.github/workflows/vss-integration.yml @@ -27,11 +27,11 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v6 with: path: ldk-node - name: Checkout VSS - uses: actions/checkout@v3 + uses: actions/checkout@v6 with: repository: lightningdevkit/vss-server path: vss-server diff --git a/.github/workflows/vss-no-auth-integration.yml b/.github/workflows/vss-no-auth-integration.yml index 8a5408092..950ff3e5f 100644 --- a/.github/workflows/vss-no-auth-integration.yml +++ b/.github/workflows/vss-no-auth-integration.yml @@ -27,11 +27,11 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v6 with: path: ldk-node - name: Checkout VSS - uses: actions/checkout@v3 + uses: actions/checkout@v6 with: repository: lightningdevkit/vss-server path: vss-server diff --git a/Cargo.toml b/Cargo.toml index a7daf5438..d9afeb9a7 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,17 +39,17 @@ default = [] #lightning-liquidity = { version = "0.2.0", features = ["std"] } #lightning-macros = { version = "0.2.0" } -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["std"] } -lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7" } -lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["std"] } -lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7" } -lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["tokio"] } -lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7" } -lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7" } -lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } -lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["std"] } -lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7" } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std"] } +lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" } +lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std"] } +lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" } +lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["tokio"] } +lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" } +lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" } +lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } +lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std"] } +lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6" } bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} @@ -79,13 +79,13 @@ async-trait = { version = "0.1", default-features = false } vss-client = { package = "vss-client-ng", version = "0.5" } prost = { version = "0.11.6", default-features = false} #bitcoin-payment-instructions = { version = "0.6" } -bitcoin-payment-instructions = { git = "https://github.com/joostjager/bitcoin-payment-instructions", branch = "ldk-dcf0c203e166da2348bef12b2e5eff4a250cdec7" } +bitcoin-payment-instructions = { git = "https://github.com/jkczyz/bitcoin-payment-instructions", rev = "a7b32d5fded9bb45f73bf82e6d7187adf705171c" } [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "dcf0c203e166da2348bef12b2e5eff4a250cdec7", features = ["std", "_test_utils"] } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "38a62c32454d3eac22578144c479dbf9a6d9bff6", features = ["std", "_test_utils"] } rand = { version = "0.9.2", default-features = false, features = ["std", "thread_rng", "os_rng"] } proptest = "1.0.0" regex = "1.5.6" diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 014993690..c1b97e0e7 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -113,6 +113,10 @@ interface Node { [Throws=NodeError] UserChannelId open_announced_channel_with_all(PublicKey node_id, SocketAddress address, u64? push_to_counterparty_msat, ChannelConfig? channel_config); [Throws=NodeError] + UserChannelId open_0reserve_channel(PublicKey node_id, SocketAddress address, u64 channel_amount_sats, u64? push_to_counterparty_msat, ChannelConfig? channel_config); + [Throws=NodeError] + UserChannelId open_0reserve_channel_with_all(PublicKey node_id, SocketAddress address, u64? push_to_counterparty_msat, ChannelConfig? channel_config); + [Throws=NodeError] void splice_in([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, u64 splice_amount_sats); [Throws=NodeError] void splice_in_with_all([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); diff --git a/src/builder.rs b/src/builder.rs index cd8cc184f..228ea8919 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -54,6 +54,7 @@ use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; +use crate::io::tier_store::TierStore; use crate::io::utils::{ read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, @@ -151,6 +152,21 @@ impl std::fmt::Debug for LogWriterConfig { } } +#[derive(Default)] +struct TierStoreConfig { + ephemeral: Option>, + backup: Option>, +} + +impl std::fmt::Debug for TierStoreConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TierStoreConfig") + .field("ephemeral", &self.ephemeral.as_ref().map(|_| "Arc")) + .field("backup", &self.backup.as_ref().map(|_| "Arc")) + .finish() + } +} + /// An error encountered during building a [`Node`]. /// /// [`Node`]: crate::Node @@ -278,6 +294,7 @@ pub struct NodeBuilder { liquidity_source_config: Option, log_writer_config: Option, async_payments_role: Option, + tier_store_config: Option, runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, @@ -296,6 +313,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let tier_store_config = None; let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; @@ -305,6 +323,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + tier_store_config, runtime_handle, async_payments_role: None, pathfinding_scores_sync_config, @@ -614,6 +633,36 @@ impl NodeBuilder { self } + /// Configures the backup store for local disaster recovery. + /// + /// When building with tiered storage, this store receives a second durable + /// copy of data written to the primary store. + /// + /// Writes and removals for primary-backed data only succeed once both the + /// primary and backup stores complete successfully. + /// + /// If not set, durable data will be stored only in the primary store. + #[allow(dead_code)] // Used by subsequent FFI/test integration commits. + pub fn set_backup_store(&mut self, backup_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.backup = Some(backup_store); + self + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + #[allow(dead_code)] // Used by subsequent FFI/test integration commits. + pub fn set_ephemeral_store(&mut self, ephemeral_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.ephemeral = Some(ephemeral_store); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -762,8 +811,23 @@ impl NodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a backup store for local disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_store`]. + /// + /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_backup_store`]: Self::set_backup_store pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, + ) -> Result { + let primary_store: Arc = Arc::new(DynStoreWrapper(kv_store)); + self.build_with_dynstore(node_entropy, primary_store) + } + + fn build_with_dynstore( + &self, node_entropy: NodeEntropy, primary_store: Arc, ) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; @@ -776,6 +840,13 @@ impl NodeBuilder { })?) }; + let ts_config = self.tier_store_config.as_ref(); + let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger)); + if let Some(config) = ts_config { + config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s))); + config.backup.as_ref().map(|s| tier_store.set_backup_store(Arc::clone(s))); + } + let seed_bytes = node_entropy.to_seed_bytes(); let config = Arc::new(self.config.clone()); @@ -790,7 +861,7 @@ impl NodeBuilder { seed_bytes, runtime, logger, - Arc::new(DynStoreWrapper(kv_store)), + Arc::new(DynStoreWrapper(tier_store)), ) } } diff --git a/src/event.rs b/src/event.rs index f06d701bc..ebaf89dac 100644 --- a/src/event.rs +++ b/src/event.rs @@ -22,7 +22,7 @@ use lightning::events::{ ReplayEvent, }; use lightning::impl_writeable_tlv_based_enum; -use lightning::ln::channelmanager::PaymentId; +use lightning::ln::channelmanager::{PaymentId, TrustedChannelFeatures}; use lightning::ln::types::ChannelId; use lightning::routing::gossip::NodeId; use lightning::sign::EntropySource; @@ -1285,10 +1285,11 @@ where } } let res = if allow_0conf { - self.channel_manager.accept_inbound_channel_from_trusted_peer_0conf( + self.channel_manager.accept_inbound_channel_from_trusted_peer( &temporary_channel_id, &counterparty_node_id, user_channel_id, + TrustedChannelFeatures::ZeroConf, channel_override_config, ) } else { diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 5a1420882..6fe95a2b3 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -25,6 +25,7 @@ pub use bitcoin::{Address, BlockHash, Network, OutPoint, ScriptBuf, Txid}; pub use lightning::chain::channelmonitor::BalanceSource; use lightning::events::PaidBolt12Invoice as LdkPaidBolt12Invoice; pub use lightning::events::{ClosureReason, PaymentFailureReason}; +use lightning::ln::channel_state::ChannelShutdownState; use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::DecodeError; pub use lightning::ln::types::ChannelId; @@ -1415,6 +1416,26 @@ uniffi::custom_type!(LSPSDateTime, String, { }, }); +/// The shutdown state of a channel as returned in [`ChannelDetails::channel_shutdown_state`]. +/// +/// [`ChannelDetails::channel_shutdown_state`]: crate::ChannelDetails::channel_shutdown_state +#[uniffi::remote(Enum)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ChannelShutdownState { + /// Channel has not sent or received a shutdown message. + NotShuttingDown, + /// Local node has sent a shutdown message for this channel. + ShutdownInitiated, + /// Shutdown message exchanges have concluded and the channels are in the midst of + /// resolving all existing open HTLCs before closing can continue. + ResolvingHTLCs, + /// All HTLCs have been resolved, nodes are currently negotiating channel close onchain fee rates. + NegotiatingClosingFee, + /// We've successfully negotiated a closing_signed dance. At this point `ChannelManager` is about + /// to drop the channel. + ShutdownComplete, +} + /// The reason the channel was closed. See individual variants for more details. #[uniffi::remote(Enum)] #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/src/io/mod.rs b/src/io/mod.rs index e080d39f7..bf6366c45 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -10,6 +10,7 @@ pub mod sqlite_store; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod tier_store; pub(crate) mod utils; pub mod vss_store; diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs new file mode 100644 index 000000000..70e9eb883 --- /dev/null +++ b/src/io/tier_store.rs @@ -0,0 +1,907 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. +#![allow(dead_code)] // TODO: Temporal warning silencer. Will be removed in later commit. + +use crate::io::utils::check_namespace_key_validity; +use crate::logger::{LdkLogger, Logger}; +use crate::types::DynStore; + +use lightning::util::persist::{ + KVStore, KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, +}; +use lightning::{io, log_error}; + +use std::future::Future; +use std::sync::Arc; + +/// A 3-tiered [`KVStoreSync`] implementation that routes data across storage +/// backends that may be local or remote: +/// - a primary store for durable, authoritative persistence, +/// - an optional backup store that maintains an additional durable copy of +/// primary-backed data, and +/// - an optional ephemeral store for non-critical, rebuildable cached data. +/// +/// When a backup store is configured, writes and removals for primary-backed data +/// are issued to the primary and backup stores concurrently and only succeed once +/// both stores complete successfully. +/// +/// Reads and lists do not consult the backup store during normal operation. +/// Ephemeral data is read from and written to the ephemeral store when configured. +/// +/// Note that dual-store writes and removals are not atomic across the primary and +/// backup stores. If one store succeeds and the other fails, the operation +/// returns an error even though one store may already reflect the change. +pub(crate) struct TierStore { + inner: Arc, + logger: Arc, +} + +impl TierStore { + pub fn new(primary_store: Arc, logger: Arc) -> Self { + let inner = Arc::new(TierStoreInner::new(primary_store, Arc::clone(&logger))); + + Self { inner, logger } + } + + /// Configures a backup store for primary-backed data. + /// + /// Once set, writes and removals targeting the primary tier succeed only if both + /// the primary and backup stores succeed. The two operations are issued + /// concurrently, and any failure is returned to the caller. + /// + /// Note: dual-store writes/removals are not atomic. An error may be returned + /// after the primary store has already been updated if the backup store fails. + /// + /// The backup store is not consulted for normal reads or lists. + pub fn set_backup_store(&mut self, backup: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.backup_store = Some(backup); + } + + /// Configures the ephemeral store for non-critical, rebuildable data. + /// + /// When configured, selected cache-like data is routed to this store instead of + /// the primary store. + pub fn set_ephemeral_store(&mut self, ephemeral: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.ephemeral_store = Some(ephemeral); + } +} + +impl KVStore for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.write_internal(primary_namespace, secondary_namespace, key, buf).await } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.remove_internal(primary_namespace, secondary_namespace, key, lazy).await } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + async move { inner.list_internal(primary_namespace, secondary_namespace).await } + } +} + +impl KVStoreSync for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.inner.read_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + ) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + self.inner.write_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + buf, + ) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + self.inner.remove_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + lazy, + ) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + self.inner + .list_internal_sync(primary_namespace.to_string(), secondary_namespace.to_string()) + } +} + +struct TierStoreInner { + /// The authoritative store for durable data. + primary_store: Arc, + /// The store used for non-critical, rebuildable cached data. + ephemeral_store: Option>, + /// An optional second durable store for primary-backed data. + backup_store: Option>, + logger: Arc, +} + +impl TierStoreInner { + /// Creates a tier store with the primary data store. + pub fn new(primary_store: Arc, logger: Arc) -> Self { + Self { primary_store, ephemeral_store: None, backup_store: None, logger } + } + + /// Reads from the primary data store. + async fn read_primary( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStore::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) + .await + { + Ok(data) => Ok(data), + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + fn read_primary_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStoreSync::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) { + Ok(data) => Ok(data), + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + /// Lists keys from the primary data store. + async fn list_primary( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + .await + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list from primary store for namespace {}/{}: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + fn list_primary_sync( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStoreSync::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list keys in namespace {}/{} from primary store: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + async fn write_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let primary_fut = KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::write( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + fn write_primary_backup_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + if let Some(backup_store) = self.backup_store.as_ref() { + let primary_res = KVStoreSync::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + let backup_res = KVStoreSync::write( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + KVStoreSync::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ) + } + } + + async fn remove_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let primary_fut = KVStore::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::remove( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + fn remove_primary_backup_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + if let Some(backup_store) = self.backup_store.as_ref() { + let primary_res = KVStoreSync::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + let backup_res = KVStoreSync::remove( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + KVStoreSync::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ) + } + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + // We don't retry ephemeral-store reads here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key).await + } else { + self.read_primary(&primary_namespace, &secondary_namespace, &key).await + } + } + + fn read_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key) + } else { + self.read_primary_sync(&primary_namespace, &secondary_namespace, &key) + } + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } else { + self.write_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } + } + + fn write_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if let Some(ephemeral_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::write( + ephemeral_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + } else { + self.write_primary_backup_sync( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + } + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } else { + self.remove_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } + } + + fn remove_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if let Some(ephemeral_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::remove( + ephemeral_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + } else { + self.remove_primary_backup_sync( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + } + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We don't retry ephemeral-store lists here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + .await + } else { + self.list_primary(&primary_namespace, &secondary_namespace).await + } + }, + _ => self.list_primary(&primary_namespace, &secondary_namespace).await, + } + } + + fn list_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(ephemeral_store) = self.ephemeral_store.as_ref() { + KVStoreSync::list( + ephemeral_store.as_ref(), + &primary_namespace, + &secondary_namespace, + ) + } else { + self.list_primary_sync(&primary_namespace, &secondary_namespace) + } + }, + _ => self.list_primary_sync(&primary_namespace, &secondary_namespace), + } + } + + fn ephemeral_store( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Option<&Arc> { + self.ephemeral_store + .as_ref() + .filter(|_s| is_ephemeral_cached_key(primary_namespace, secondary_namespace, key)) + } + + fn handle_primary_backup_results( + &self, op: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, + primary_res: io::Result<()>, backup_res: io::Result<()>, + ) -> io::Result<()> { + match (primary_res, backup_res) { + (Ok(()), Ok(())) => Ok(()), + (Err(primary_err), Ok(())) => Err(primary_err), + (Ok(()), Err(backup_err)) => Err(backup_err), + (Err(primary_err), Err(backup_err)) => { + log_error!( + self.logger, + "Primary and backup {}s both failed for key {}/{}/{}: primary={}, backup={}", + op, + primary_namespace, + secondary_namespace, + key, + primary_err, + backup_err + ); + Err(primary_err) + }, + } + } +} + +fn is_ephemeral_cached_key(pn: &str, sn: &str, key: &str) -> bool { + matches!( + (pn, sn, key), + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) + ) +} + +#[cfg(test)] +mod tests { + use std::panic::RefUnwindSafe; + use std::path::PathBuf; + use std::sync::Arc; + + use lightning::util::logger::Level; + use lightning::util::persist::{ + CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + }; + use lightning_persister::fs_store::v1::FilesystemStore; + + use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path}; + use crate::io::tier_store::TierStore; + use crate::logger::Logger; + use crate::types::DynStore; + use crate::types::DynStoreWrapper; + + use super::*; + + impl RefUnwindSafe for TierStore {} + + struct CleanupDir(PathBuf); + impl Drop for CleanupDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + fn setup_tier_store(primary_store: Arc, logger: Arc) -> TierStore { + TierStore::new(primary_store, logger) + } + + #[test] + fn write_read_list_remove() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let tier = setup_tier_store(primary_store, logger); + + do_read_write_remove_list_persist(&tier); + } + + #[test] + fn ephemeral_routing() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let ephemeral_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("ephemeral")))); + tier.set_ephemeral_store(Arc::clone(&ephemeral_store)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + let primary_read_ng = KVStoreSync::read( + &*primary_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + let ephemeral_read_ng = KVStoreSync::read( + &*ephemeral_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + + let primary_read_cm = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let ephemeral_read_cm = KVStoreSync::read( + &*ephemeral_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert!(primary_read_ng.is_err()); + assert_eq!(ephemeral_read_ng.unwrap(), data); + + assert!(ephemeral_read_cm.is_err()); + assert_eq!(primary_read_cm.unwrap(), data); + } + + #[test] + fn backup_write_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + let primary_read = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let backup_read = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert_eq!(primary_read.unwrap(), data); + assert_eq!(backup_read.unwrap(), data); + } + + #[test] + fn backup_remove_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data, + ) + .unwrap(); + + KVStoreSync::remove( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, + ) + .unwrap(); + + let primary_read = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + let backup_read = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + + assert!(primary_read.is_err()); + assert!(backup_read.is_err()); + } +} diff --git a/src/lib.rs b/src/lib.rs index 2ac4697e8..dbaffae1c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -148,7 +148,8 @@ pub use lightning; use lightning::chain::BestBlock; use lightning::impl_writeable_tlv_based; use lightning::ln::chan_utils::FUNDING_TRANSACTION_WITNESS_WEIGHT; -use lightning::ln::channel_state::{ChannelDetails as LdkChannelDetails, ChannelShutdownState}; +use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails; +pub use lightning::ln::channel_state::ChannelShutdownState; use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; @@ -1128,7 +1129,7 @@ impl Node { fn open_channel_inner( &self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: FundingAmount, push_to_counterparty_msat: Option, channel_config: Option, - announce_for_forwarding: bool, + announce_for_forwarding: bool, set_0reserve: bool, ) -> Result { if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); @@ -1196,25 +1197,46 @@ impl Node { self.keys_manager.get_secure_random_bytes()[..16].try_into().unwrap(), ); - match self.channel_manager.create_channel( - peer_info.node_id, - channel_amount_sats, - push_msat, - user_channel_id, - None, - Some(user_config), - ) { + let result = if set_0reserve { + self.channel_manager.create_channel_to_trusted_peer_0reserve( + peer_info.node_id, + channel_amount_sats, + push_msat, + user_channel_id, + None, + Some(user_config), + ) + } else { + self.channel_manager.create_channel( + peer_info.node_id, + channel_amount_sats, + push_msat, + user_channel_id, + None, + Some(user_config), + ) + }; + + let zero_reserve_string = if set_0reserve { "0reserve " } else { "" }; + + match result { Ok(_) => { log_info!( self.logger, - "Initiated channel creation with peer {}. ", + "Initiated {}channel creation with peer {}. ", + zero_reserve_string, peer_info.node_id ); self.peer_store.add_peer(peer_info)?; Ok(UserChannelId(user_channel_id)) }, Err(e) => { - log_error!(self.logger, "Failed to initiate channel creation: {:?}", e); + log_error!( + self.logger, + "Failed to initiate {}channel creation: {:?}", + zero_reserve_string, + e + ); Err(Error::ChannelCreationFailed) }, } @@ -1290,6 +1312,7 @@ impl Node { push_to_counterparty_msat, channel_config, false, + false, ) } @@ -1330,6 +1353,7 @@ impl Node { push_to_counterparty_msat, channel_config, true, + false, ) } @@ -1358,6 +1382,7 @@ impl Node { push_to_counterparty_msat, channel_config, false, + false, ) } @@ -1395,6 +1420,70 @@ impl Node { push_to_counterparty_msat, channel_config, true, + false, + ) + } + + /// Connect to a node and open a new unannounced channel, in which the target node can + /// spend its entire balance. + /// + /// This channel allows the target node to try to steal your funds with no financial + /// penalty, so this channel should only be opened to nodes you trust. + /// + /// Disconnects and reconnects are handled automatically. + /// + /// If `push_to_counterparty_msat` is set, the given value will be pushed (read: sent) to the + /// channel counterparty on channel open. This can be useful to start out with the balance not + /// entirely shifted to one side, therefore allowing to receive payments from the getgo. + /// + /// If Anchor channels are enabled, this will ensure the configured + /// [`AnchorChannelsConfig::per_channel_reserve_sats`] is available and will be retained before + /// opening the channel. + /// + /// Returns a [`UserChannelId`] allowing to locally keep track of the channel. + /// + /// [`AnchorChannelsConfig::per_channel_reserve_sats`]: crate::config::AnchorChannelsConfig::per_channel_reserve_sats + pub fn open_0reserve_channel( + &self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: u64, + push_to_counterparty_msat: Option, channel_config: Option, + ) -> Result { + self.open_channel_inner( + node_id, + address, + FundingAmount::Exact { amount_sats: channel_amount_sats }, + push_to_counterparty_msat, + channel_config, + false, + true, + ) + } + + /// Connect to a node and open a new unannounced channel, using all available on-chain funds + /// minus fees and anchor reserves. The target node will be able to spend its entire channel + /// balance. + /// + /// This channel allows the target node to try to steal your funds with no financial + /// penalty, so this channel should only be opened to nodes you trust. + /// + /// Disconnects and reconnects are handled automatically. + /// + /// If `push_to_counterparty_msat` is set, the given value will be pushed (read: sent) to the + /// channel counterparty on channel open. This can be useful to start out with the balance not + /// entirely shifted to one side, therefore allowing to receive payments from the getgo. + /// + /// Returns a [`UserChannelId`] allowing to locally keep track of the channel. + pub fn open_0reserve_channel_with_all( + &self, node_id: PublicKey, address: SocketAddress, push_to_counterparty_msat: Option, + channel_config: Option, + ) -> Result { + self.open_channel_inner( + node_id, + address, + FundingAmount::Max, + push_to_counterparty_msat, + channel_config, + false, + true, ) } @@ -1469,12 +1558,7 @@ impl Node { let funding_template = self .channel_manager - .splice_channel( - &channel_details.channel_id, - &counterparty_node_id, - min_feerate, - max_feerate, - ) + .splice_channel(&channel_details.channel_id, &counterparty_node_id) .map_err(|e| { log_error!(self.logger, "Failed to splice channel: {:?}", e); Error::ChannelSplicingFailed @@ -1482,12 +1566,14 @@ impl Node { let contribution = self .runtime - .block_on( - funding_template - .splice_in(Amount::from_sat(splice_amount_sats), Arc::clone(&self.wallet)), - ) - .map_err(|()| { - log_error!(self.logger, "Failed to splice channel: coin selection failed"); + .block_on(funding_template.splice_in( + Amount::from_sat(splice_amount_sats), + min_feerate, + max_feerate, + Arc::clone(&self.wallet), + )) + .map_err(|e| { + log_error!(self.logger, "Failed to splice channel: {}", e); Error::ChannelSplicingFailed })?; @@ -1585,12 +1671,7 @@ impl Node { let funding_template = self .channel_manager - .splice_channel( - &channel_details.channel_id, - &counterparty_node_id, - min_feerate, - max_feerate, - ) + .splice_channel(&channel_details.channel_id, &counterparty_node_id) .map_err(|e| { log_error!(self.logger, "Failed to splice channel: {:?}", e); Error::ChannelSplicingFailed @@ -1602,9 +1683,14 @@ impl Node { }]; let contribution = self .runtime - .block_on(funding_template.splice_out(outputs, Arc::clone(&self.wallet))) - .map_err(|()| { - log_error!(self.logger, "Failed to splice channel: coin selection failed"); + .block_on(funding_template.splice_out( + outputs, + min_feerate, + max_feerate, + Arc::clone(&self.wallet), + )) + .map_err(|e| { + log_error!(self.logger, "Failed to splice channel: {}", e); Error::ChannelSplicingFailed })?; diff --git a/src/liquidity.rs b/src/liquidity.rs index 485da941c..ca859286f 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -142,6 +142,10 @@ pub struct LSPS2ServiceConfig { /// /// [`bLIP-52`]: https://github.com/lightning/blips/blob/master/blip-0052.md#trust-models pub client_trusts_lsp: bool, + /// When set, clients that we open channels to will be allowed to spend their entire channel + /// balance. This allows clients to try to steal your funds with no financial penalty, so + /// this should only be set if you trust your clients. + pub allow_client_0reserve: bool, } pub(crate) struct LiquiditySourceBuilder @@ -786,22 +790,38 @@ where config.channel_config.forwarding_fee_base_msat = 0; config.channel_config.forwarding_fee_proportional_millionths = 0; - match self.channel_manager.create_channel( - their_network_key, - channel_amount_sats, - 0, - user_channel_id, - None, - Some(config), - ) { + let result = if service_config.allow_client_0reserve { + self.channel_manager.create_channel_to_trusted_peer_0reserve( + their_network_key, + channel_amount_sats, + 0, + user_channel_id, + None, + Some(config), + ) + } else { + self.channel_manager.create_channel( + their_network_key, + channel_amount_sats, + 0, + user_channel_id, + None, + Some(config), + ) + }; + + match result { Ok(_) => {}, Err(e) => { // TODO: We just silently fail here. Eventually we will need to remember // the pending requests and regularly retry opening the channel until we // succeed. + let zero_reserve_string = + if service_config.allow_client_0reserve { "0reserve " } else { "" }; log_error!( self.logger, - "Failed to open LSPS2 channel to {}: {:?}", + "Failed to open LSPS2 {}channel to {}: {:?}", + zero_reserve_string, their_network_key, e ); diff --git a/src/types.rs b/src/types.rs index dae315ae0..693c01d16 100644 --- a/src/types.rs +++ b/src/types.rs @@ -15,7 +15,7 @@ use bitcoin::{OutPoint, ScriptBuf}; use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; use lightning::chain::chainmonitor; use lightning::impl_writeable_tlv_based; -use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails; +use lightning::ln::channel_state::{ChannelDetails as LdkChannelDetails, ChannelShutdownState}; use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress}; use lightning::ln::peer_handler::IgnoringMessageHandler; use lightning::ln::types::ChannelId; @@ -51,7 +51,7 @@ where { } -pub(crate) trait DynStoreTrait: Send + Sync { +pub trait DynStoreTrait: Send + Sync { fn read_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; @@ -558,6 +558,10 @@ pub struct ChannelDetails { pub inbound_htlc_maximum_msat: Option, /// Set of configurable parameters that affect channel operation. pub config: ChannelConfig, + /// The current shutdown state of the channel, if any. + /// + /// Will be `None` for objects serialized with LDK Node v0.1 and earlier. + pub channel_shutdown_state: Option, } impl From for ChannelDetails { @@ -613,6 +617,7 @@ impl From for ChannelDetails { inbound_htlc_maximum_msat: value.inbound_htlc_maximum_msat, // unwrap safety: `config` is only `None` for LDK objects serialized prior to 0.0.109. config: value.config.map(|c| c.into()).unwrap(), + channel_shutdown_state: value.channel_shutdown_state, } } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 4f68f9825..a44aee174 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -32,8 +32,8 @@ use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; use ldk_node::{ - Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, - UserChannelId, + Builder, ChannelShutdownState, CustomTlvRecord, Event, LightningBalance, Node, NodeError, + PendingSweepBalance, UserChannelId, }; use lightning::io; use lightning::ln::msgs::SocketAddress; @@ -916,6 +916,12 @@ pub(crate) async fn do_channel_full_cycle( let user_channel_id_a = expect_channel_ready_event!(node_a, node_b.node_id()); let user_channel_id_b = expect_channel_ready_event!(node_b, node_a.node_id()); + // After channel_ready, no shutdown should be in progress. + assert!(node_a.list_channels().iter().all(|c| matches!( + c.channel_shutdown_state, + None | Some(ChannelShutdownState::NotShuttingDown) + ))); + println!("\nB receive"); let invoice_amount_1_msat = 2500_000; let invoice_description: Bolt11InvoiceDescription = @@ -1267,6 +1273,20 @@ pub(crate) async fn do_channel_full_cycle( node_a.force_close_channel(&user_channel_id_a, node_b.node_id(), None).unwrap(); } else { node_a.close_channel(&user_channel_id_a, node_b.node_id()).unwrap(); + // The cooperative shutdown may complete before we get to check, but if the channel + // is still visible it must already be in a shutdown state. + if let Some(channel) = + node_a.list_channels().into_iter().find(|c| c.user_channel_id == user_channel_id_a) + { + assert!( + !matches!( + channel.channel_shutdown_state, + None | Some(ChannelShutdownState::NotShuttingDown) + ), + "Expected shutdown in progress on node_a, got {:?}", + channel.channel_shutdown_state, + ); + } } expect_event!(node_a, ChannelClosed); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 413b2d44a..9d1f99bed 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1705,6 +1705,7 @@ async fn do_lsps2_client_service_integration(client_trusts_lsp: bool) { min_channel_opening_fee_msat: 0, max_client_to_self_delay: 1024, client_trusts_lsp, + allow_client_0reserve: false, }; let service_config = random_config(true); @@ -2023,6 +2024,7 @@ async fn lsps2_client_trusts_lsp() { min_channel_opening_fee_msat: 0, max_client_to_self_delay: 1024, client_trusts_lsp: true, + allow_client_0reserve: false, }; let service_config = random_config(true); @@ -2197,6 +2199,7 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { min_channel_opening_fee_msat: 0, max_client_to_self_delay: 1024, client_trusts_lsp: false, + allow_client_0reserve: false, }; let service_config = random_config(true);