192 changes: 182 additions & 10 deletions lightning-background-processor/src/lib.rs
@@ -171,6 +171,16 @@ const SWEEPER_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const SWEEPER_TIMER: Duration = Duration::from_secs(1);

#[cfg(not(test))]
const FIRST_ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(15);
#[cfg(test)]
const FIRST_ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::ZERO;

#[cfg(not(test))]
const ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(60 * 10);
#[cfg(test)]
const ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(1);

/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
const fn min_duration(a: Duration, b: Duration) -> Duration {
if a.as_nanos() < b.as_nanos() {
@@ -1018,8 +1028,10 @@ where
let mut last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
let mut last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
let mut last_sweeper_call = sleeper(SWEEPER_TIMER);
let mut last_archive_call = sleeper(FIRST_ARCHIVE_STALE_MONITORS_TIMER);
let mut have_pruned = false;
let mut have_decayed_scorer = false;
let mut have_archived = false;

let mut last_forwards_processing_call = sleeper(batch_delay.get());

@@ -1147,11 +1159,31 @@ where
log_trace!(logger, "Done persisting ChannelManager.");
}

// Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
// we prune after an initial sync completes.
// Note that we want to archive stale ChannelMonitors and run a network graph prune once
// not long after startup before falling back to their usual infrequent runs. This avoids
// short-lived clients never archiving stale ChannelMonitors or pruning their network
graph. For network graph pruning, in the case of RGS sync we run a prune immediately
after the initial sync completes; otherwise we do so on a timer which should be long
enough to give us a chance to get most of the network graph from our peers.
let archive_timer = if have_archived {
ARCHIVE_STALE_MONITORS_TIMER
} else {
FIRST_ARCHIVE_STALE_MONITORS_TIMER
};
let archive_timer_elapsed = {
match check_and_reset_sleeper(&mut last_archive_call, || sleeper(archive_timer)) {
Some(false) => true,
Some(true) => break,
None => false,
}
};
if archive_timer_elapsed {
log_trace!(logger, "Archiving stale ChannelMonitors.");
chain_monitor.archive_fully_resolved_channel_monitors();
have_archived = true;
Contributor:

I do wonder if we should have a more verbose return value and then use that to explicitly prune the corresponding OutputSweeper entries now, rather than just adding a buffer on top of the archival delay.

Collaborator (Author):

Possibly, yeah, though if we're still concerned about it, it's much easier to double the prune delay in the sweeper. Either way, this seems unrelated to this PR (which just makes sure the sweeper works properly).
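For illustration, a minimal sketch of what the suggested verbose return value could enable. All types and method signatures below are hypothetical stand-ins, not LDK's actual API (the diff above calls `archive_fully_resolved_channel_monitors` without using any return value):

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the outpoint that funded an archived channel.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct FundingOutpoint {
	txid: [u8; 32],
	index: u16,
}

struct ChainMonitor {
	fully_resolved: Vec<FundingOutpoint>,
}

impl ChainMonitor {
	/// Hypothetical variant that reports which monitors were archived,
	/// rather than returning `()`.
	fn archive_fully_resolved_channel_monitors(&mut self) -> Vec<FundingOutpoint> {
		self.fully_resolved.drain(..).collect()
	}
}

struct OutputSweeper {
	tracked: HashSet<FundingOutpoint>,
}

impl OutputSweeper {
	/// Hypothetical helper: drop sweeper entries for channels whose monitors
	/// were just archived, instead of waiting out an extra prune-delay buffer.
	fn prune_archived(&mut self, archived: &[FundingOutpoint]) {
		for outpoint in archived {
			self.tracked.remove(outpoint);
		}
	}
}

fn main() {
	let outpoint = FundingOutpoint { txid: [0u8; 32], index: 0 };
	let mut chain_monitor = ChainMonitor { fully_resolved: vec![outpoint] };
	let mut sweeper = OutputSweeper { tracked: HashSet::from([outpoint]) };

	// In the background processor loop, this would replace the plain call:
	let archived = chain_monitor.archive_fully_resolved_channel_monitors();
	sweeper.prune_archived(&archived);
	assert!(sweeper.tracked.is_empty());
}
```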

log_trace!(logger, "Archived stale ChannelMonitors.");
}

let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
NETWORK_PRUNE_TIMER
} else {
@@ -1601,8 +1633,10 @@ impl BackgroundProcessor {
let mut last_scorer_persist_call = Instant::now();
let mut last_rebroadcast_call = Instant::now();
let mut last_sweeper_call = Instant::now();
let mut last_archive_call = Instant::now();
let mut have_pruned = false;
let mut have_decayed_scorer = false;
let mut have_archived = false;

let mut cur_batch_delay = batch_delay.get();
let mut last_forwards_processing_call = Instant::now();
@@ -1691,11 +1725,26 @@ impl BackgroundProcessor {
});
}

// Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
// we prune after an initial sync completes.
// Note that we want to archive stale ChannelMonitors and run a network graph prune once
// not long after startup before falling back to their usual infrequent runs. This avoids
// short-lived clients never archiving stale ChannelMonitors or pruning their network
graph. For network graph pruning, in the case of RGS sync we run a prune immediately
after the initial sync completes; otherwise we do so on a timer which should be long
enough to give us a chance to get most of the network graph from our peers.
let archive_timer = if have_archived {
ARCHIVE_STALE_MONITORS_TIMER
} else {
FIRST_ARCHIVE_STALE_MONITORS_TIMER
};
let archive_timer_elapsed = last_archive_call.elapsed() > archive_timer;
if archive_timer_elapsed {
log_trace!(logger, "Archiving stale ChannelMonitors.");
chain_monitor.archive_fully_resolved_channel_monitors();
have_archived = true;
last_archive_call = Instant::now();
log_trace!(logger, "Archived stale ChannelMonitors.");
}

let prune_timer =
if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
let prune_timer_elapsed = last_prune_call.elapsed() > prune_timer;
@@ -3698,4 +3747,127 @@ mod tests {
exit_sender.send(()).unwrap();
t1.await.unwrap().unwrap();
}

#[test]
fn test_monitor_archive() {
let (persist_dir, nodes) = create_nodes(2, "test_monitor_archive");
// Open a channel, but don't confirm it so that it prunes immediately on FC.
open_channel!(nodes[0], nodes[1], 100000);

let data_dir = nodes[1].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: _| Ok(());
let bp = BackgroundProcessor::start(
persister,
event_handler,
Arc::clone(&nodes[1].chain_monitor),
Arc::clone(&nodes[1].node),
Some(Arc::clone(&nodes[1].messenger)),
nodes[1].p2p_gossip_sync(),
Arc::clone(&nodes[1].peer_manager),
Some(Arc::clone(&nodes[1].liquidity_manager)),
Some(Arc::clone(&nodes[1].sweeper)),
Arc::clone(&nodes[1].logger),
Some(Arc::clone(&nodes[1].scorer)),
);

let dir = format!("{}_persister_1/monitors", &persist_dir);
let mut mons = std::fs::read_dir(&dir).unwrap();
let mut mon = mons.next().unwrap().unwrap();
if mon.path().to_str().unwrap().ends_with(".tmp") {
mon = mons.next().unwrap().unwrap();
assert_eq!(mon.path().extension(), None);
}
assert!(mons.next().is_none());

// Because the channel wasn't funded, we'll archive the ChannelMonitor immediately after
// it's force-closed (at least on node B, which didn't put its money into it).
nodes[1].node.force_close_all_channels_broadcasting_latest_txn("".to_owned());
loop {
let mut mons = std::fs::read_dir(&dir).unwrap();
if let Some(new_mon) = mons.next() {
let mut new_mon = new_mon.unwrap();
if new_mon.path().to_str().unwrap().ends_with(".tmp") {
new_mon = mons.next().unwrap().unwrap();
assert_eq!(new_mon.path().extension(), None);
}
assert_eq!(new_mon.path(), mon.path());
assert!(mons.next().is_none());
} else {
break;
}
}

bp.stop().unwrap();
}

#[tokio::test]
#[cfg(not(c_bindings))]
async fn test_monitor_archive_async() {
let (persist_dir, nodes) = create_nodes(2, "test_monitor_archive_async");
// Open a channel, but don't confirm it so that it prunes immediately on FC.
open_channel!(nodes[0], nodes[1], 100000);

let kv_store = KVStoreSyncWrapper(Arc::clone(&nodes[1].kv_store));
let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
&*(nodes[1].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
as &'static OutputSweeper<_, _, _, _, _, _, _>
};
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
let bp_future = tokio::spawn(super::process_events_async(
kv_store,
move |_: Event| async move { Ok(()) },
Arc::clone(&nodes[1].chain_monitor),
Arc::clone(&nodes[1].node),
crate::NO_ONION_MESSENGER,
nodes[1].no_gossip_sync(),
Arc::clone(&nodes[1].peer_manager),
crate::NO_LIQUIDITY_MANAGER,
Some(sweeper_async),
Arc::clone(&nodes[1].logger),
Some(Arc::clone(&nodes[1].scorer)),
move |dur: Duration| {
let mut exit_receiver = exit_receiver.clone();
Box::pin(async move {
tokio::select! {
_ = tokio::time::sleep(dur) => false,
_ = exit_receiver.changed() => true,
}
})
},
false,
|| Some(Duration::ZERO),
));

let dir = format!("{}_persister_1/monitors", &persist_dir);
let mut mons = std::fs::read_dir(&dir).unwrap();
let mut mon = mons.next().unwrap().unwrap();
if mon.path().to_str().unwrap().ends_with(".tmp") {
mon = mons.next().unwrap().unwrap();
assert_eq!(mon.path().extension(), None);
}
assert!(mons.next().is_none());

// Because the channel wasn't funded, we'll archive the ChannelMonitor immediately after
// it's force-closed (at least on node B, which didn't put its money into it).
nodes[1].node.force_close_all_channels_broadcasting_latest_txn("".to_owned());
loop {
let mut mons = std::fs::read_dir(&dir).unwrap();
if let Some(new_mon) = mons.next() {
let mut new_mon = new_mon.unwrap();
if new_mon.path().to_str().unwrap().ends_with(".tmp") {
new_mon = mons.next().unwrap().unwrap();
assert_eq!(new_mon.path().extension(), None);
}
assert_eq!(new_mon.path(), mon.path());
assert!(mons.next().is_none());
} else {
break;
}
tokio::task::yield_now().await;
}

exit_sender.send(()).unwrap();
bp_future.await.unwrap().unwrap();
}
}