diff --git a/lnwallet/chancloser/rbf_coop_states.go b/lnwallet/chancloser/rbf_coop_states.go index 61f7718b5a6..9319c1d271e 100644 --- a/lnwallet/chancloser/rbf_coop_states.go +++ b/lnwallet/chancloser/rbf_coop_states.go @@ -1026,3 +1026,7 @@ type RbfEvent = protofsm.EmittedEvent[ProtocolEvent] // RbfStateSub is a type alias for the state subscription type of the RBF chan // closer. type RbfStateSub = protofsm.StateSubscriber[ProtocolEvent, *Environment] + +// ChanCloserActorMsg is an adapter to enable the state machine executor that +// runs this state machine to be passed around as an actor. +type ChanCloserActorMsg = protofsm.ActorMessage[ProtocolEvent] diff --git a/peer/brontide.go b/peer/brontide.go index 5ab4e8fa6bf..0e8d58b20d4 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -1762,6 +1762,10 @@ func (p *Brontide) Disconnect(reason error) { // Stop the onion peer actor if one was spawned. p.StopOnionActorIfExists() + // Unregister any RBF close actors registered for channels of this + // peer so we don't leave stale entries in the actor system. + p.unregisterRbfCloseActors() + // Ensure that the TCP connection is properly closed before continuing. p.cfg.Conn.Close() @@ -1793,6 +1797,33 @@ func (p *Brontide) StopOnionActorIfExists() { ) } +// unregisterRbfCloseActors removes any RBF close actors registered for this +// peer's active channels from the actor system. This should be called on +// disconnect so we don't leave stale RBF close actors for a peer that is no +// longer connected. This is idempotent and safe to call multiple times. +func (p *Brontide) unregisterRbfCloseActors() { + if p.cfg.ActorSystem == nil { + return + } + + p.activeChannels.Range(func(_ lnwire.ChannelID, + channel *lnwallet.LightningChannel) bool { + + // Pending channels are tracked with a nil value in the map, + // so skip those as they have no channel point to look up. + if channel == nil { + return true + } + + actorKey := NewRbfCloserPeerServiceKey( + channel.ChannelPoint(), + ) + actorKey.UnregisterAll(p.cfg.ActorSystem) + + return true + }) +} + // readNextMessage reads, and returns the next message on the wire along with // any additional raw payload. func (p *Brontide) readNextMessage() (lnwire.Message, error) { @@ -4233,8 +4264,28 @@ func (p *Brontide) initRbfChanCloser( "close: %w", err) } + // We store the closer first so that any lookups that race with actor + // registration will find the chan closer already in place. p.activeChanCloses.Store(chanID, makeRbfCloser(&chanCloser)) + // In addition to the message router, we'll register the state machine + // with the actor system. + if p.cfg.ActorSystem != nil { + p.log.Infof("Registering RBF actor for channel %v", + channel.ChannelPoint()) + + actorWrapper := newRbfCloseActor( + channel.ChannelPoint(), p, p.cfg.ActorSystem, + ) + if err := actorWrapper.registerActor(); err != nil { + chanCloser.Stop() + p.activeChanCloses.Delete(chanID) + + return nil, fmt.Errorf("unable to register RBF close "+ + "actor: %w", err) + } + } + // Now that we've created the rbf closer state machine, we'll launch a // new goroutine to eventually send in the ChannelFlushed event once // needed. @@ -5817,42 +5868,3 @@ func (p *Brontide) ChanHasRbfCoopCloser(chanPoint wire.OutPoint) bool { return chanCloser.IsRight() } - -// TriggerCoopCloseRbfBump given a chan ID, and the params needed to trigger a -// new RBF co-op close update, a bump is attempted. A channel used for updates, -// along with one used to o=communicate any errors is returned. If no chan -// closer is found, then false is returned for the second argument. -func (p *Brontide) TriggerCoopCloseRbfBump(ctx context.Context, - chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight, - deliveryScript lnwire.DeliveryAddress) (*CoopCloseUpdates, error) { - - // If RBF coop close isn't permitted, then we'll an error. - if !p.rbfCoopCloseAllowed() { - return nil, fmt.Errorf("rbf coop close not enabled for " + - "channel") - } - - closeUpdates := &CoopCloseUpdates{ - UpdateChan: make(chan interface{}, 1), - ErrChan: make(chan error, 1), - } - - // We'll re-use the existing switch struct here, even though we're - // bypassing the switch entirely. - closeReq := htlcswitch.ChanClose{ - CloseType: contractcourt.CloseRegular, - ChanPoint: &chanPoint, - TargetFeePerKw: feeRate, - DeliveryScript: deliveryScript, - Updates: closeUpdates.UpdateChan, - Err: closeUpdates.ErrChan, - Ctx: ctx, - } - - err := p.startRbfChanCloser(newRPCShutdownInit(&closeReq), chanPoint) - if err != nil { - return nil, err - } - - return closeUpdates, nil -} diff --git a/peer/rbf_close_wrapper_actor.go b/peer/rbf_close_wrapper_actor.go new file mode 100644 index 00000000000..5200f64140c --- /dev/null +++ b/peer/rbf_close_wrapper_actor.go @@ -0,0 +1,189 @@ +package peer + +import ( + "context" + "fmt" + + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/actor" + "github.com/lightningnetwork/lnd/contractcourt" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/lnwallet/chainfee" + "github.com/lightningnetwork/lnd/lnwire" +) + +// rbfCloseMessage is a message type that is used to trigger a cooperative fee +// bump, or initiate a close for the first time. +type rbfCloseMessage struct { + actor.BaseMessage + + // Ctx is the context of the caller that initiated the RBF close. This + // is propagated to the underlying close request so that cancellation + // of the caller (e.g. RPC stream disconnect) tears down the associated + // observer goroutine. The caller's context is distinct from the + // actor's own lifecycle context that is passed to Receive. + Ctx context.Context //nolint:containedctx + + // ChanPoint is the channel point of the channel to be closed. + ChanPoint wire.OutPoint + + // FeeRate is the fee rate to use for the transaction. + FeeRate chainfee.SatPerKWeight + + // DeliveryScript is the script to use for the transaction. + DeliveryScript lnwire.DeliveryAddress +} + +// MessageType returns the type of the message. +// +// NOTE: This is part of the actor.Message interface. +func (r rbfCloseMessage) MessageType() string { + return fmt.Sprintf("RbfCloseMessage(%v)", r.ChanPoint) +} + +// NewRbfBumpCloseMsg returns a message that can be sent to the RBF actor to +// initiate a new fee bump. +func NewRbfBumpCloseMsg(ctx context.Context, op wire.OutPoint, + feeRate chainfee.SatPerKWeight, + deliveryScript lnwire.DeliveryAddress) rbfCloseMessage { + + return rbfCloseMessage{ + Ctx: ctx, + ChanPoint: op, + FeeRate: feeRate, + DeliveryScript: deliveryScript, + } +} + +// RbfCloseActorServiceKey is a service key that can be used to reach an RBF +// chan closer. +type RbfCloseActorServiceKey = actor.ServiceKey[ + rbfCloseMessage, *CoopCloseUpdates, +] + +// NewRbfCloserPeerServiceKey returns a new service key that can be used to +// reach an RBF chan closer, via an active peer. +func NewRbfCloserPeerServiceKey(op wire.OutPoint) RbfCloseActorServiceKey { + opStr := op.String() + + // Just using the channel point here is enough, as we have a unique + // type here rbfCloseMessage which will handle the final actor + // selection. + actorKey := fmt.Sprintf("Peer(RbfChanCloser(%v))", opStr) + + return actor.NewServiceKey[rbfCloseMessage, *CoopCloseUpdates](actorKey) +} + +// rbfCloseActor is a wrapper around the Brontide peer to expose the internal +// RBF close state machine as an actor. This is intended for callers that need +// to obtain streaming close updates related to the RBF close process. +type rbfCloseActor struct { + chanPeer *Brontide + actorSystem *actor.ActorSystem + chanPoint wire.OutPoint +} + +// newRbfCloseActor creates a new instance of the RBF close wrapper actor. +func newRbfCloseActor(chanPoint wire.OutPoint, + chanPeer *Brontide, actorSystem *actor.ActorSystem) *rbfCloseActor { + + return &rbfCloseActor{ + chanPeer: chanPeer, + actorSystem: actorSystem, + chanPoint: chanPoint, + } +} + +// registerActor registers a new RBF close actor with the actor system. If an +// instance with the same service key and types are registered, we'll +// unregister before proceeding. +func (r *rbfCloseActor) registerActor() error { + // First, we'll make the service key of this RBF actor. This'll allow + // us to spawn the actor in the actor system. + actorKey := NewRbfCloserPeerServiceKey(r.chanPoint) + + // We only want to have a single actor instance for this rbf closer, + // so we'll now attempt to unregister any other instances. + actorKey.UnregisterAll(r.actorSystem) + + // Now that we know that no instances of the actor are present, let's + // register a new instance. We don't actually need the ref though, as + // any interested parties can look up the actor via the service key. + actorID := fmt.Sprintf( + "PeerWrapper(RbfChanCloser(%s))", r.chanPoint, + ) + if _, err := actorKey.Spawn(r.actorSystem, actorID, r); err != nil { + return fmt.Errorf("unable to spawn RBF close actor for "+ + "channel %v: %w", r.chanPoint, err) + } + + return nil +} + +// Receive implements the actor.ActorBehavior interface for the rbf closer +// wrapper. This allows us to expose our specific processes around the coop +// close flow as an actor. +// +// NOTE: This implements the actor.ActorBehavior interface. +func (r *rbfCloseActor) Receive(_ context.Context, + msg rbfCloseMessage) fn.Result[*CoopCloseUpdates] { + + type retType = *CoopCloseUpdates + + // If RBF coop close isn't permitted, then we'll return an error. + if !r.chanPeer.rbfCoopCloseAllowed() { + return fn.Errf[retType]("rbf coop close not enabled for " + + "channel") + } + + closeUpdates := &CoopCloseUpdates{ + UpdateChan: make(chan interface{}, 1), + ErrChan: make(chan error, 1), + } + + // We'll re-use the existing switch struct here, even though we're + // bypassing the switch entirely. We use the caller's context from the + // message so that canceling the caller (e.g., RPC stream close) also + // tears down the observer goroutine. + closeReq := htlcswitch.ChanClose{ + CloseType: contractcourt.CloseRegular, + ChanPoint: &msg.ChanPoint, + TargetFeePerKw: msg.FeeRate, + DeliveryScript: msg.DeliveryScript, + Updates: closeUpdates.UpdateChan, + Err: closeUpdates.ErrChan, + Ctx: msg.Ctx, + } + + err := r.chanPeer.startRbfChanCloser( + newRPCShutdownInit(&closeReq), msg.ChanPoint, + ) + if err != nil { + peerLog.Errorf("unable to start RBF chan closer for "+ + "channel %v: %v", msg.ChanPoint, err) + + return fn.Errf[retType]("unable to start RBF chan "+ + "closer: %w", err) + } + + return fn.Ok(closeUpdates) +} + +// RbfChanCloseActor is a router that will route messages to the relevant RBF +// chan closer. +type RbfChanCloseActor = actor.Router[rbfCloseMessage, *CoopCloseUpdates] + +// RbfChanCloserRouter creates a new router that will route messages to the +// relevant RBF chan closer. +func RbfChanCloserRouter(actorSystem *actor.ActorSystem, + serviceKey RbfCloseActorServiceKey) *RbfChanCloseActor { + + strategy := actor.NewRoundRobinStrategy[ + rbfCloseMessage, *CoopCloseUpdates, + ]() + + return actor.NewRouter( + actorSystem.Receptionist(), serviceKey, strategy, nil, + ) +} diff --git a/peer/rbf_close_wrapper_actor_test.go b/peer/rbf_close_wrapper_actor_test.go new file mode 100644 index 00000000000..56fe3afcfe9 --- /dev/null +++ b/peer/rbf_close_wrapper_actor_test.go @@ -0,0 +1,46 @@ +package peer + +import ( + "testing" + + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/actor" + "github.com/stretchr/testify/require" +) + +// TestRbfCloseActorSingleton verifies that registering an RBF close actor for +// the same channel point twice results in only a single registered actor. The +// second call to registerActor should unregister the first actor before +// spawning a replacement. +func TestRbfCloseActorSingleton(t *testing.T) { + t.Parallel() + + actorSystem := actor.NewActorSystem() + t.Cleanup(func() { + require.NoError(t, actorSystem.Shutdown()) + }) + + chanPoint := wire.OutPoint{Index: 1} + serviceKey := NewRbfCloserPeerServiceKey(chanPoint) + + // Register the actor for the first time. + actor1 := newRbfCloseActor(chanPoint, nil, actorSystem) + require.NoError(t, actor1.registerActor()) + + // Verify exactly one actor is registered. + refs := actor.FindInReceptionist( + actorSystem.Receptionist(), serviceKey, + ) + require.Len(t, refs, 1) + + // Register the actor again for the same channel point. + actor2 := newRbfCloseActor(chanPoint, nil, actorSystem) + require.NoError(t, actor2.registerActor()) + + // Verify there is still exactly one actor registered (the second one + // replaced the first). + refs = actor.FindInReceptionist( + actorSystem.Receptionist(), serviceKey, + ) + require.Len(t, refs, 1) +} diff --git a/protofsm/actor_wrapper.go b/protofsm/actor_wrapper.go new file mode 100644 index 00000000000..b0be9a07eb4 --- /dev/null +++ b/protofsm/actor_wrapper.go @@ -0,0 +1,23 @@ +package protofsm + +import ( + "fmt" + + "github.com/lightningnetwork/lnd/actor" +) + +// ActorMessage wraps an Event, in order to create a new message that can be +// used with the actor package. +type ActorMessage[Event any] struct { + actor.BaseMessage + + // Event is the event that is being sent to the actor. + Event Event +} + +// MessageType returns the type of the message. +// +// NOTE: This implements the actor.Message interface. +func (a ActorMessage[Event]) MessageType() string { + return fmt.Sprintf("ActorMessage(%T)", a.Event) +} diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index b3e16f5fd35..b0c376c3066 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -259,6 +259,26 @@ func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) { } } +// Receive processes a message and returns a Result. The provided context is the +// actor's internal context, which can be used to detect actor shutdown +// requests. +// +// NOTE: This implements the actor.ActorBehavior interface. +func (s *StateMachine[Event, Env]) Receive(ctx context.Context, + e ActorMessage[Event]) fn.Result[bool] { + + select { + case s.events <- e.Event: + return fn.Ok(true) + + case <-ctx.Done(): + return fn.Err[bool](ctx.Err()) + + case <-s.quit: + return fn.Err[bool](ErrStateMachineShutdown) + } +} + // CanHandle returns true if the target message can be routed to the state // machine. func (s *StateMachine[Event, Env]) CanHandle(msg msgmux.PeerMsg) bool { diff --git a/rpcserver.go b/rpcserver.go index e62de95fb1d..d4a9ce51bb5 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3018,13 +3018,27 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, rpcsLog.Infof("Bypassing Switch to do fee bump "+ "for ChannelPoint(%v)", chanPoint) - closeUpdates, err := r.server.AttemptRBFCloseUpdate( - updateStream.Context(), *chanPoint, feeRate, - deliveryScript, + // To perform this RBF bump, we'll send a bump message + // to the RBF close actor. We propagate the stream + // context so that cancellation of the RPC client also + // tears down the observer goroutine. + ctx := updateStream.Context() + rbfBumpMsg := peer.NewRbfBumpCloseMsg( + ctx, *chanPoint, feeRate, deliveryScript, + ) + rbfActorKey := peer.NewRbfCloserPeerServiceKey( + *chanPoint, ) + rbfRouter := peer.RbfChanCloserRouter( + r.server.actorSystem, rbfActorKey, + ) + + closeUpdates, err := rbfRouter.Ask( + ctx, rbfBumpMsg, + ).Await(ctx).Unpack() if err != nil { - return fmt.Errorf("unable to do RBF close "+ - "update: %w", err) + return fmt.Errorf("unable to ask for RBF "+ + "close: %w", err) } updateChan = closeUpdates.UpdateChan diff --git a/server.go b/server.go index 9eddf92e691..68c917d2b5d 100644 --- a/server.go +++ b/server.go @@ -2852,6 +2852,12 @@ func (s *server) Stop() error { s.sigPool.Stop() s.writePool.Stop() s.readPool.Stop() + + // Shut down the actor system last so any in-flight actor work + // triggered by the subsystems above has a chance to complete. + if err := s.actorSystem.Shutdown(); err != nil { + srvrLog.Warnf("failed to stop actor system: %v", err) + } }) return nil @@ -5622,74 +5628,6 @@ func (s *server) ChanHasRbfCoopCloser(peerPub *btcec.PublicKey, return targetPeer.ChanHasRbfCoopCloser(chanPoint) } -// attemptCoopRbfFeeBump attempts to look up the active chan closer for a -// channel given the outpoint. If found, we'll attempt to do a fee bump, -// returning channels used for updates. If the channel isn't currently active -// (p2p connection established), then his function will return an error. -func (s *server) attemptCoopRbfFeeBump(ctx context.Context, - chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight, - deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) { - - // First, we'll attempt to look up the channel based on it's - // ChannelPoint. - channel, err := s.chanStateDB.FetchChannel(chanPoint) - if err != nil { - return nil, fmt.Errorf("unable to fetch channel: %w", err) - } - - // From the channel, we can now get the pubkey of the peer, then use - // that to eventually get the chan closer. - peerPub := channel.IdentityPub.SerializeCompressed() - - // Now that we have the peer pub, we can look up the peer itself. - s.mu.RLock() - targetPeer, ok := s.peersByPub[string(peerPub)] - s.mu.RUnlock() - if !ok { - return nil, fmt.Errorf("peer for ChannelPoint(%v) is "+ - "not online", chanPoint) - } - - closeUpdates, err := targetPeer.TriggerCoopCloseRbfBump( - ctx, chanPoint, feeRate, deliveryScript, - ) - if err != nil { - return nil, fmt.Errorf("unable to trigger coop rbf fee bump: "+ - "%w", err) - } - - return closeUpdates, nil -} - -// AttemptRBFCloseUpdate attempts to trigger a new RBF iteration for a co-op -// close update. This route it to be used only if the target channel in question -// is no longer active in the link. This can happen when we restart while we -// already have done a single RBF co-op close iteration. -func (s *server) AttemptRBFCloseUpdate(ctx context.Context, - chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight, - deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) { - - // If the channel is present in the switch, then the request should flow - // through the switch instead. - chanID := lnwire.NewChanIDFromOutPoint(chanPoint) - if _, err := s.htlcSwitch.GetLink(chanID); err == nil { - return nil, fmt.Errorf("ChannelPoint(%v) is active in link, "+ - "invalid request", chanPoint) - } - - // At this point, we know that the channel isn't present in the link, so - // we'll check to see if we have an entry in the active chan closer map. - updates, err := s.attemptCoopRbfFeeBump( - ctx, chanPoint, feeRate, deliveryScript, - ) - if err != nil { - return nil, fmt.Errorf("unable to attempt coop rbf fee bump "+ - "ChannelPoint(%v)", chanPoint) - } - - return updates, nil -} - // calculateNodeAnnouncementTimestamp returns the timestamp to use for a node // announcement, ensuring it's at least one second after the previously // persisted timestamp. This ensures BOLT-07 compliance, which requires node