180 changes: 88 additions & 92 deletions lightning/src/ln/channelmanager.rs
@@ -3370,42 +3370,10 @@ macro_rules! handle_monitor_update_completion {
}};
}

/// Returns whether the monitor update is completed, `false` if the update is in-progress.
fn handle_monitor_update_res<CM: AChannelManager, LG: Logger>(
cm: &CM, update_res: ChannelMonitorUpdateStatus, logger: LG,
) -> bool {
debug_assert!(cm.get_cm().background_events_processed_since_startup.load(Ordering::Acquire));
match update_res {
ChannelMonitorUpdateStatus::UnrecoverableError => {
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!(logger, "{}", err_str);
panic!("{}", err_str);
},
ChannelMonitorUpdateStatus::InProgress => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if cm.get_cm().monitor_update_type.swap(1, Ordering::Relaxed) == 2 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
log_debug!(
logger,
"ChannelMonitor update in flight, holding messages until the update completes.",
);
false
},
ChannelMonitorUpdateStatus::Completed => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if cm.get_cm().monitor_update_type.swap(2, Ordering::Relaxed) == 1 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
true
},
}
}

macro_rules! handle_initial_monitor {
($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => {
let logger = WithChannelContext::from(&$self.logger, &$chan.context, None);
let update_completed = handle_monitor_update_res($self, $update_res, logger);
let update_completed = $self.handle_monitor_update_res($update_res, logger);
if update_completed {
handle_monitor_update_completion!(
$self,
@@ -3418,69 +3386,17 @@ macro_rules! handle_initial_monitor {
};
}
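
The change in this hunk is mechanical: the free functions generic over `CM: AChannelManager` go away and the macros call an inherent method on `$self` instead, so the logger no longer has to be threaded through every call site. A minimal sketch of that shape, using hypothetical stand-in types rather than LDK's real ones:

```rust
// Hypothetical stand-in type; not LDK's ChannelManager.
struct Manager {
    monitor_ok: bool, // stand-in for the real persistence state
}

// Before: a free function that every call site had to pass the manager (and a logger) into.
fn handle_update_free(mgr: &Manager) -> bool {
    mgr.monitor_ok
}

impl Manager {
    // After: the same logic as an inherent method; call sites shrink to `self.handle_update()`.
    fn handle_update(&self) -> bool {
        self.monitor_ok
    }
}

fn main() {
    let mgr = Manager { monitor_ok: true };
    assert_eq!(handle_update_free(&mgr), mgr.handle_update());
    println!("both paths agree: {}", mgr.handle_update());
}
```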

fn handle_new_monitor_update_internal<CM: AChannelManager, LG: Logger>(
cm: &CM,
in_flight_monitor_updates: &mut BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
channel_id: ChannelId, funding_txo: OutPoint, counterparty_node_id: PublicKey,
new_update: ChannelMonitorUpdate, logger: LG,
) -> (bool, bool) {
let in_flight_updates = &mut in_flight_monitor_updates
.entry(channel_id)
.or_insert_with(|| (funding_txo, Vec::new()))
.1;
// During startup, we push monitor updates as background events through to here in
// order to replay updates that were in-flight when we shut down. Thus, we have to
// filter for uniqueness here.
let update_idx =
in_flight_updates.iter().position(|upd| upd == &new_update).unwrap_or_else(|| {
in_flight_updates.push(new_update);
in_flight_updates.len() - 1
});

if cm.get_cm().background_events_processed_since_startup.load(Ordering::Acquire) {
let update_res =
cm.get_cm().chain_monitor.update_channel(channel_id, &in_flight_updates[update_idx]);
let update_completed = handle_monitor_update_res(cm, update_res, logger);
if update_completed {
let _ = in_flight_updates.remove(update_idx);
}
(update_completed, update_completed && in_flight_updates.is_empty())
} else {
// We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we
// fail to persist it. This is a fairly safe assumption, however, since anything we do
// during the startup sequence should be replayed exactly if we immediately crash.
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id,
funding_txo,
channel_id,
update: in_flight_updates[update_idx].clone(),
};
// We want to track the in-flight update both in `in_flight_monitor_updates` and in
// `pending_background_events` to avoid a race condition during
// `pending_background_events` processing where we complete one
// `ChannelMonitorUpdate` (but there are more pending as background events) but we
// conclude that all pending `ChannelMonitorUpdate`s have completed and its safe to
// run post-completion actions.
// We could work around that with some effort, but its simpler to just track updates
// twice.
cm.get_cm().pending_background_events.lock().unwrap().push(event);
(false, false)
}
}

macro_rules! handle_post_close_monitor_update {
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr,
$per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr
) => {{
let (update_completed, all_updates_complete) = handle_new_monitor_update_internal(
$self,
let (update_completed, all_updates_complete) = $self.handle_new_monitor_update_internal(
&mut $peer_state.in_flight_monitor_updates,
$channel_id,
$funding_txo,
$counterparty_node_id,
$update,
WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None),
);
if all_updates_complete {
let update_actions = $peer_state
@@ -3510,14 +3426,12 @@ macro_rules! handle_new_monitor_update_locked_actions_handled_by_caller {
(
$self: ident, $funding_txo: expr, $update: expr, $in_flight_monitor_updates: expr, $chan_context: expr
) => {{
let (update_completed, _all_updates_complete) = handle_new_monitor_update_internal(
$self,
let (update_completed, _all_updates_complete) = $self.handle_new_monitor_update_internal(
$in_flight_monitor_updates,
$chan_context.channel_id(),
$funding_txo,
$chan_context.get_counterparty_node_id(),
$update,
WithChannelContext::from(&$self.logger, &$chan_context, None),
);
update_completed
}};
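
Both `handle_new_monitor_update!` variants destructure the same `(update_completed, all_updates_complete)` pair; this `_locked_actions_handled_by_caller` variant deliberately ignores the second flag because the caller runs the post-update actions itself. The sketch below, with illustrative names that are not LDK API, shows why the two flags can disagree: a single update may persist immediately while older updates are still in flight.

```rust
// Illustrative only: `update_completed` means "this particular update persisted
// synchronously"; `all_updates_complete` means "nothing remains in flight".
fn apply_update(in_flight: &mut Vec<u64>, update: u64, persisted_now: bool) -> (bool, bool) {
    in_flight.push(update);
    if persisted_now {
        // Remove only the update we just persisted; earlier ones may remain.
        if let Some(idx) = in_flight.iter().position(|u| *u == update) {
            in_flight.remove(idx);
        }
        (true, in_flight.is_empty())
    } else {
        (false, false)
    }
}

fn main() {
    let mut in_flight = vec![1]; // an older update is still pending
    // The new update completes, but post-completion actions must wait.
    assert_eq!(apply_update(&mut in_flight, 2, true), (true, false));
    in_flight.clear();
    // With nothing else pending, both flags are true.
    assert_eq!(apply_update(&mut in_flight, 3, true), (true, true));
}
```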
@@ -3528,14 +3442,12 @@ macro_rules! handle_new_monitor_update {
$self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr,
$per_peer_state_lock: expr, $chan: expr
) => {{
let (update_completed, all_updates_complete) = handle_new_monitor_update_internal(
$self,
let (update_completed, all_updates_complete) = $self.handle_new_monitor_update_internal(
&mut $peer_state.in_flight_monitor_updates,
$chan.context.channel_id(),
$funding_txo,
$chan.context.get_counterparty_node_id(),
$update,
WithChannelContext::from(&$self.logger, &$chan.context, None),
);
if all_updates_complete {
handle_monitor_update_completion!(
@@ -9795,6 +9707,90 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
}
}

fn handle_new_monitor_update_internal(
&self,
in_flight_monitor_updates: &mut BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
channel_id: ChannelId, funding_txo: OutPoint, counterparty_node_id: PublicKey,
new_update: ChannelMonitorUpdate,
) -> (bool, bool) {
let in_flight_updates = &mut in_flight_monitor_updates
.entry(channel_id)
.or_insert_with(|| (funding_txo, Vec::new()))
.1;
// During startup, we push monitor updates as background events through to here in
// order to replay updates that were in-flight when we shut down. Thus, we have to
// filter for uniqueness here.
let update_idx =
in_flight_updates.iter().position(|upd| upd == &new_update).unwrap_or_else(|| {
in_flight_updates.push(new_update);
in_flight_updates.len() - 1
});

if self.background_events_processed_since_startup.load(Ordering::Acquire) {
let update_res =
self.chain_monitor.update_channel(channel_id, &in_flight_updates[update_idx]);
let logger =
WithContext::from(&self.logger, Some(counterparty_node_id), Some(channel_id), None);
let update_completed = self.handle_monitor_update_res(update_res, logger);
if update_completed {
let _ = in_flight_updates.remove(update_idx);
}
(update_completed, update_completed && in_flight_updates.is_empty())
} else {
// We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we
// fail to persist it. This is a fairly safe assumption, however, since anything we do
// during the startup sequence should be replayed exactly if we immediately crash.
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id,
funding_txo,
channel_id,
update: in_flight_updates[update_idx].clone(),
};
// We want to track the in-flight update both in `in_flight_monitor_updates` and in
// `pending_background_events` to avoid a race condition during
// `pending_background_events` processing where we complete one
// `ChannelMonitorUpdate` (but there are more pending as background events) but we
// conclude that all pending `ChannelMonitorUpdate`s have completed and it's safe to
// run post-completion actions.
// We could work around that with some effort, but it's simpler to just track updates
// twice.
self.pending_background_events.lock().unwrap().push(event);
(false, false)
}
}
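
The `position(..).unwrap_or_else(..)` idiom above is what keeps replayed startup updates from being tracked twice: an update already in `in_flight_updates` resolves to its existing index, and only a genuinely new update is pushed. A small self-contained sketch of the same idiom, with simplified stand-in types:

```rust
// Returns the index of `new_update` in `in_flight`, inserting it only if it is
// not already tracked (e.g. because startup replay handed us a duplicate).
fn index_of_or_insert(in_flight: &mut Vec<String>, new_update: String) -> usize {
    in_flight.iter().position(|upd| upd == &new_update).unwrap_or_else(|| {
        in_flight.push(new_update);
        in_flight.len() - 1
    })
}

fn main() {
    let mut in_flight = vec!["update-a".to_string()];
    // A replayed duplicate maps back to the existing slot...
    assert_eq!(index_of_or_insert(&mut in_flight, "update-a".to_string()), 0);
    // ...while a genuinely new update is appended.
    assert_eq!(index_of_or_insert(&mut in_flight, "update-b".to_string()), 1);
    assert_eq!(in_flight.len(), 2);
}
```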

/// Returns `true` if the monitor update has completed, `false` if the update is still in-progress.
fn handle_monitor_update_res<LG: Logger>(
&self, update_res: ChannelMonitorUpdateStatus, logger: LG,
) -> bool {
debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire));
match update_res {
ChannelMonitorUpdateStatus::UnrecoverableError => {
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!(logger, "{}", err_str);
panic!("{}", err_str);
},
ChannelMonitorUpdateStatus::InProgress => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if self.monitor_update_type.swap(1, Ordering::Relaxed) == 2 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
log_debug!(
logger,
"ChannelMonitor update in flight, holding messages until the update completes.",
);
false
},
ChannelMonitorUpdateStatus::Completed => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if self.monitor_update_type.swap(2, Ordering::Relaxed) == 1 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
true
},
}
}
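
The cfg-gated checks above use a single atomic as a sticky mode marker: `swap` records which `ChannelMonitorUpdateStatus` variant was just observed and returns the previously recorded one, so seeing the opposite variant means the two persistence styles were mixed in one run. A hedged sketch of that guard, assuming the marker starts at 0 and uses 1 for `InProgress` and 2 for `Completed` as the diff suggests:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical wrapper; in LDK the atomic lives on the ChannelManager and the
// checks are compiled out in test builds.
struct ModeGuard {
    monitor_update_type: AtomicUsize, // 0 = unset, 1 = InProgress seen, 2 = Completed seen
}

impl ModeGuard {
    fn note_in_progress(&self) {
        // Record InProgress; panic if Completed was recorded earlier in this run.
        if self.monitor_update_type.swap(1, Ordering::Relaxed) == 2 {
            panic!("Cannot mix InProgress and Completed without restart");
        }
    }
    fn note_completed(&self) {
        // Record Completed; panic if InProgress was recorded earlier in this run.
        if self.monitor_update_type.swap(2, Ordering::Relaxed) == 1 {
            panic!("Cannot mix InProgress and Completed without restart");
        }
    }
}

fn main() {
    let guard = ModeGuard { monitor_update_type: AtomicUsize::new(0) };
    guard.note_completed();
    guard.note_completed(); // repeating the same mode is fine
    // guard.note_in_progress(); // would panic: modes were mixed
    println!("final mode marker: {}", guard.monitor_update_type.load(Ordering::Relaxed));
}
```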

/// Handles a channel reentering a functional state, either due to reconnect or a monitor
/// update completion.
#[rustfmt::skip]