diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index aae738ab1c1..c794e4663c9 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -171,6 +171,16 @@ const SWEEPER_TIMER: Duration = Duration::from_secs(30);
 #[cfg(test)]
 const SWEEPER_TIMER: Duration = Duration::from_secs(1);
 
+#[cfg(not(test))]
+const FIRST_ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(15);
+#[cfg(test)]
+const FIRST_ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::ZERO;
+
+#[cfg(not(test))]
+const ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(60 * 10);
+#[cfg(test)]
+const ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(1);
+
 /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
 const fn min_duration(a: Duration, b: Duration) -> Duration {
 	if a.as_nanos() < b.as_nanos() {
@@ -1018,8 +1028,10 @@ where
 	let mut last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
 	let mut last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
 	let mut last_sweeper_call = sleeper(SWEEPER_TIMER);
+	let mut last_archive_call = sleeper(FIRST_ARCHIVE_STALE_MONITORS_TIMER);
 	let mut have_pruned = false;
 	let mut have_decayed_scorer = false;
+	let mut have_archived = false;
 
 	let mut last_forwards_processing_call = sleeper(batch_delay.get());
 
@@ -1147,11 +1159,31 @@ where
 			log_trace!(logger, "Done persisting ChannelManager.");
 		}
 
-		// Note that we want to run a graph prune once not long after startup before
-		// falling back to our usual hourly prunes. This avoids short-lived clients never
-		// pruning their network graph. We run once 60 seconds after startup before
-		// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
-		// we prune after an initial sync completes.
+		// Note that we want to archive stale ChannelMonitors and run a network graph prune once
+		// not long after startup before falling back to their usual infrequent runs. This avoids
+		// short-lived clients never archiving stale ChannelMonitors or pruning their network
+		// graph. For network graph pruning, in the case of RGS sync, we run a prune immediately
+		// after initial sync completes, otherwise we do so on a timer which should be long enough
+		// to give us a chance to get most of the network graph from our peers.
+		let archive_timer = if have_archived {
+			ARCHIVE_STALE_MONITORS_TIMER
+		} else {
+			FIRST_ARCHIVE_STALE_MONITORS_TIMER
+		};
+		let archive_timer_elapsed = {
+			match check_and_reset_sleeper(&mut last_archive_call, || sleeper(archive_timer)) {
+				Some(false) => true,
+				Some(true) => break,
+				None => false,
+			}
+		};
+		if archive_timer_elapsed {
+			log_trace!(logger, "Archiving stale ChannelMonitors.");
+			chain_monitor.archive_fully_resolved_channel_monitors();
+			have_archived = true;
+			log_trace!(logger, "Archived stale ChannelMonitors.");
+		}
+
 		let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
 			NETWORK_PRUNE_TIMER
 		} else {
@@ -1601,8 +1633,10 @@ impl BackgroundProcessor {
 		let mut last_scorer_persist_call = Instant::now();
 		let mut last_rebroadcast_call = Instant::now();
 		let mut last_sweeper_call = Instant::now();
+		let mut last_archive_call = Instant::now();
 		let mut have_pruned = false;
 		let mut have_decayed_scorer = false;
+		let mut have_archived = false;
 
 		let mut cur_batch_delay = batch_delay.get();
 		let mut last_forwards_processing_call = Instant::now();
@@ -1691,11 +1725,26 @@ impl BackgroundProcessor {
 				});
 			}
 
-			// Note that we want to run a graph prune once not long after startup before
-			// falling back to our usual hourly prunes. This avoids short-lived clients never
-			// pruning their network graph. We run once 60 seconds after startup before
-			// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
-			// we prune after an initial sync completes.
+			// Note that we want to archive stale ChannelMonitors and run a network graph prune once
+			// not long after startup before falling back to their usual infrequent runs. This avoids
+			// short-lived clients never archiving stale ChannelMonitors or pruning their network
+			// graph. For network graph pruning, in the case of RGS sync, we run a prune immediately
+			// after initial sync completes, otherwise we do so on a timer which should be long enough
+			// to give us a chance to get most of the network graph from our peers.
+			let archive_timer = if have_archived {
+				ARCHIVE_STALE_MONITORS_TIMER
+			} else {
+				FIRST_ARCHIVE_STALE_MONITORS_TIMER
+			};
+			let archive_timer_elapsed = last_archive_call.elapsed() > archive_timer;
+			if archive_timer_elapsed {
+				log_trace!(logger, "Archiving stale ChannelMonitors.");
+				chain_monitor.archive_fully_resolved_channel_monitors();
+				have_archived = true;
+				last_archive_call = Instant::now();
+				log_trace!(logger, "Archived stale ChannelMonitors.");
+			}
+
 			let prune_timer =
 				if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
 			let prune_timer_elapsed = last_prune_call.elapsed() > prune_timer;
@@ -3698,4 +3747,127 @@ mod tests {
 		exit_sender.send(()).unwrap();
 		t1.await.unwrap().unwrap();
 	}
+
+	#[test]
+	fn test_monitor_archive() {
+		let (persist_dir, nodes) = create_nodes(2, "test_monitor_archive");
+		// Open a channel, but don't confirm it so that it prunes immediately on FC.
+		open_channel!(nodes[0], nodes[1], 100000);
+
+		let data_dir = nodes[1].kv_store.get_data_dir();
+		let persister = Arc::new(Persister::new(data_dir));
+		let event_handler = |_: _| Ok(());
+		let bp = BackgroundProcessor::start(
+			persister,
+			event_handler,
+			Arc::clone(&nodes[1].chain_monitor),
+			Arc::clone(&nodes[1].node),
+			Some(Arc::clone(&nodes[1].messenger)),
+			nodes[1].p2p_gossip_sync(),
+			Arc::clone(&nodes[1].peer_manager),
+			Some(Arc::clone(&nodes[1].liquidity_manager)),
+			Some(Arc::clone(&nodes[1].sweeper)),
+			Arc::clone(&nodes[1].logger),
+			Some(Arc::clone(&nodes[1].scorer)),
+		);
+
+		let dir = format!("{}_persister_1/monitors", &persist_dir);
+		let mut mons = std::fs::read_dir(&dir).unwrap();
+		let mut mon = mons.next().unwrap().unwrap();
+		if mon.path().to_str().unwrap().ends_with(".tmp") {
+			mon = mons.next().unwrap().unwrap();
+			assert_eq!(mon.path().extension(), None);
+		}
+		assert!(mons.next().is_none());
+
+		// Because the channel wasn't funded, we'll archive the ChannelMonitor immediately after
+		// it's force-closed (at least on node B, which didn't put their money into it).
+		nodes[1].node.force_close_all_channels_broadcasting_latest_txn("".to_owned());
+		loop {
+			let mut mons = std::fs::read_dir(&dir).unwrap();
+			if let Some(new_mon) = mons.next() {
+				let mut new_mon = new_mon.unwrap();
+				if new_mon.path().to_str().unwrap().ends_with(".tmp") {
+					new_mon = mons.next().unwrap().unwrap();
+					assert_eq!(new_mon.path().extension(), None);
+				}
+				assert_eq!(new_mon.path(), mon.path());
+				assert!(mons.next().is_none());
+			} else {
+				break;
+			}
+		}
+
+		bp.stop().unwrap();
+	}
+
+	#[tokio::test]
+	#[cfg(not(c_bindings))]
+	async fn test_monitor_archive_async() {
+		let (persist_dir, nodes) = create_nodes(2, "test_monitor_archive_async");
+		// Open a channel, but don't confirm it so that it prunes immediately on FC.
+		open_channel!(nodes[0], nodes[1], 100000);
+
+		let kv_store = KVStoreSyncWrapper(Arc::clone(&nodes[0].kv_store));
+		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
+			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
+				as &'static OutputSweeper<_, _, _, _, _, _, _>
+		};
+		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
+		let bp_future = tokio::spawn(super::process_events_async(
+			kv_store,
+			move |_: Event| async move { Ok(()) },
+			Arc::clone(&nodes[1].chain_monitor),
+			Arc::clone(&nodes[1].node),
+			crate::NO_ONION_MESSENGER,
+			nodes[1].no_gossip_sync(),
+			Arc::clone(&nodes[1].peer_manager),
+			crate::NO_LIQUIDITY_MANAGER,
+			Some(sweeper_async),
+			Arc::clone(&nodes[1].logger),
+			Some(Arc::clone(&nodes[1].scorer)),
+			move |dur: Duration| {
+				let mut exit_receiver = exit_receiver.clone();
+				Box::pin(async move {
+					tokio::select! {
+						_ = tokio::time::sleep(dur) => false,
+						_ = exit_receiver.changed() => true,
+					}
+				})
+			},
+			false,
+			|| Some(Duration::ZERO),
+		));
+
+		let dir = format!("{}_persister_1/monitors", &persist_dir);
+		let mut mons = std::fs::read_dir(&dir).unwrap();
+		let mut mon = mons.next().unwrap().unwrap();
+		if mon.path().to_str().unwrap().ends_with(".tmp") {
+			mon = mons.next().unwrap().unwrap();
+			assert_eq!(mon.path().extension(), None);
+		}
+		assert!(mons.next().is_none());
+
+		// Because the channel wasn't funded, we'll archive the ChannelMonitor immediately after
+		// it's force-closed (at least on node B, which didn't put their money into it).
+		nodes[1].node.force_close_all_channels_broadcasting_latest_txn("".to_owned());
+		loop {
+			let mut mons = std::fs::read_dir(&dir).unwrap();
+			if let Some(new_mon) = mons.next() {
+				let mut new_mon = new_mon.unwrap();
+				if new_mon.path().to_str().unwrap().ends_with(".tmp") {
+					new_mon = mons.next().unwrap().unwrap();
+					assert_eq!(new_mon.path().extension(), None);
+				}
+				assert_eq!(new_mon.path(), mon.path());
+				assert!(mons.next().is_none());
+			} else {
+				break;
+			}
+			tokio::task::yield_now().await;
+		}
+
+		exit_sender.send(()).unwrap();
+		bp_future.await.unwrap().unwrap();
+	}
 }
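For context on the timer pattern this diff adds: the background processor pairs a per-task timestamp (or sleeper) with a `have_archived` flag so the first archive pass runs shortly after startup and later passes fall back to a long steady-state cadence, mirroring the existing graph-prune logic. Below is a minimal standalone sketch of that pattern; `ArchivePass`, `FIRST_PASS_TIMER`, and `STEADY_STATE_TIMER` are illustrative stand-ins rather than LDK API, with the durations shortened for the demo (the diff uses 15 seconds and 10 minutes).

```rust
// Minimal sketch of the dual-cadence timer pattern from the diff: a short
// "first pass" timer after startup, then a long steady-state timer.
use std::time::{Duration, Instant};

const FIRST_PASS_TIMER: Duration = Duration::from_secs(2); // 15s in the diff
const STEADY_STATE_TIMER: Duration = Duration::from_secs(5); // 10min in the diff

struct ArchivePass {
	last_call: Instant,
	have_archived: bool,
}

impl ArchivePass {
	fn new() -> Self {
		ArchivePass { last_call: Instant::now(), have_archived: false }
	}

	/// Returns true iff an archive pass should run on this loop iteration.
	fn should_run(&mut self) -> bool {
		// Until the first pass has happened, use the short startup timer so
		// that short-lived processes still get one pass in.
		let timer = if self.have_archived { STEADY_STATE_TIMER } else { FIRST_PASS_TIMER };
		if self.last_call.elapsed() > timer {
			self.have_archived = true;
			self.last_call = Instant::now();
			true
		} else {
			false
		}
	}
}

fn main() {
	let mut pass = ArchivePass::new();
	for _ in 0..10 {
		if pass.should_run() {
			// In the real code this is where
			// chain_monitor.archive_fully_resolved_channel_monitors() runs.
			println!("archive pass");
		}
		std::thread::sleep(Duration::from_secs(1));
	}
}
```

The sync `BackgroundProcessor` loop in the diff follows this `Instant`-based shape directly, while the async path in `process_events_async` achieves the same behavior by resetting the `last_archive_call` sleeper future via `check_and_reset_sleeper` instead of comparing `Instant`s.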