From d03d60889ae67cb45e1e83b8bd5ca8fc8bfa21e9 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 16 May 2025 17:19:23 -0700 Subject: [PATCH 1/4] peer: create new rbfCloseActor to decouple RPC RBF close bumps In this commit, we create a new rbfCloseActor wrapper struct. This will wrap the RPC operations to trigger a new RBF close bump within a new actor. In the next commit, we'll now register this actor, and clean up the call graph from the rpc server to this actor. --- peer/rbf_close_wrapper_actor.go | 189 +++++++++++++++++++++++++++ peer/rbf_close_wrapper_actor_test.go | 46 +++++++ 2 files changed, 235 insertions(+) create mode 100644 peer/rbf_close_wrapper_actor.go create mode 100644 peer/rbf_close_wrapper_actor_test.go diff --git a/peer/rbf_close_wrapper_actor.go b/peer/rbf_close_wrapper_actor.go new file mode 100644 index 0000000000..5200f64140 --- /dev/null +++ b/peer/rbf_close_wrapper_actor.go @@ -0,0 +1,189 @@ +package peer + +import ( + "context" + "fmt" + + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/actor" + "github.com/lightningnetwork/lnd/contractcourt" + "github.com/lightningnetwork/lnd/fn/v2" + "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/lnwallet/chainfee" + "github.com/lightningnetwork/lnd/lnwire" +) + +// rbfCloseMessage is a message type that is used to trigger a cooperative fee +// bump, or initiate a close for the first time. +type rbfCloseMessage struct { + actor.BaseMessage + + // Ctx is the context of the caller that initiated the RBF close. This + // is propagated to the underlying close request so that cancellation + // of the caller (e.g. RPC stream disconnect) tears down the associated + // observer goroutine. The caller's context is distinct from the + // actor's own lifecycle context that is passed to Receive. + Ctx context.Context //nolint:containedctx + + // ChanPoint is the channel point of the channel to be closed. 
+ ChanPoint wire.OutPoint + + // FeeRate is the fee rate to use for the transaction. + FeeRate chainfee.SatPerKWeight + + // DeliveryScript is the script to use for the transaction. + DeliveryScript lnwire.DeliveryAddress +} + +// MessageType returns the type of the message. +// +// NOTE: This is part of the actor.Message interface. +func (r rbfCloseMessage) MessageType() string { + return fmt.Sprintf("RbfCloseMessage(%v)", r.ChanPoint) +} + +// NewRbfBumpCloseMsg returns a message that can be sent to the RBF actor to +// initiate a new fee bump. +func NewRbfBumpCloseMsg(ctx context.Context, op wire.OutPoint, + feeRate chainfee.SatPerKWeight, + deliveryScript lnwire.DeliveryAddress) rbfCloseMessage { + + return rbfCloseMessage{ + Ctx: ctx, + ChanPoint: op, + FeeRate: feeRate, + DeliveryScript: deliveryScript, + } +} + +// RbfCloseActorServiceKey is a service key that can be used to reach an RBF +// chan closer. +type RbfCloseActorServiceKey = actor.ServiceKey[ + rbfCloseMessage, *CoopCloseUpdates, +] + +// NewRbfCloserPeerServiceKey returns a new service key that can be used to +// reach an RBF chan closer, via an active peer. +func NewRbfCloserPeerServiceKey(op wire.OutPoint) RbfCloseActorServiceKey { + opStr := op.String() + + // Just using the channel point here is enough, as we have a unique + // type here rbfCloseMessage which will handle the final actor + // selection. + actorKey := fmt.Sprintf("Peer(RbfChanCloser(%v))", opStr) + + return actor.NewServiceKey[rbfCloseMessage, *CoopCloseUpdates](actorKey) +} + +// rbfCloseActor is a wrapper around the Brontide peer to expose the internal +// RBF close state machine as an actor. This is intended for callers that need +// to obtain streaming close updates related to the RBF close process. +type rbfCloseActor struct { + chanPeer *Brontide + actorSystem *actor.ActorSystem + chanPoint wire.OutPoint +} + +// newRbfCloseActor creates a new instance of the RBF close wrapper actor. 
+func newRbfCloseActor(chanPoint wire.OutPoint, + chanPeer *Brontide, actorSystem *actor.ActorSystem) *rbfCloseActor { + + return &rbfCloseActor{ + chanPeer: chanPeer, + actorSystem: actorSystem, + chanPoint: chanPoint, + } +} + +// registerActor registers a new RBF close actor with the actor system. If an +// instance with the same service key and types are registered, we'll +// unregister before proceeding. +func (r *rbfCloseActor) registerActor() error { + // First, we'll make the service key of this RBF actor. This'll allow + // us to spawn the actor in the actor system. + actorKey := NewRbfCloserPeerServiceKey(r.chanPoint) + + // We only want to have a single actor instance for this rbf closer, + // so we'll now attempt to unregister any other instances. + actorKey.UnregisterAll(r.actorSystem) + + // Now that we know that no instances of the actor are present, let's + // register a new instance. We don't actually need the ref though, as + // any interested parties can look up the actor via the service key. + actorID := fmt.Sprintf( + "PeerWrapper(RbfChanCloser(%s))", r.chanPoint, + ) + if _, err := actorKey.Spawn(r.actorSystem, actorID, r); err != nil { + return fmt.Errorf("unable to spawn RBF close actor for "+ + "channel %v: %w", r.chanPoint, err) + } + + return nil +} + +// Receive implements the actor.ActorBehavior interface for the rbf closer +// wrapper. This allows us to expose our specific processes around the coop +// close flow as an actor. +// +// NOTE: This implements the actor.ActorBehavior interface. +func (r *rbfCloseActor) Receive(_ context.Context, + msg rbfCloseMessage) fn.Result[*CoopCloseUpdates] { + + type retType = *CoopCloseUpdates + + // If RBF coop close isn't permitted, then we'll return an error. 
+ if !r.chanPeer.rbfCoopCloseAllowed() { + return fn.Errf[retType]("rbf coop close not enabled for " + + "channel") + } + + closeUpdates := &CoopCloseUpdates{ + UpdateChan: make(chan interface{}, 1), + ErrChan: make(chan error, 1), + } + + // We'll re-use the existing switch struct here, even though we're + // bypassing the switch entirely. We use the caller's context from the + // message so that canceling the caller (e.g., RPC stream close) also + // tears down the observer goroutine. + closeReq := htlcswitch.ChanClose{ + CloseType: contractcourt.CloseRegular, + ChanPoint: &msg.ChanPoint, + TargetFeePerKw: msg.FeeRate, + DeliveryScript: msg.DeliveryScript, + Updates: closeUpdates.UpdateChan, + Err: closeUpdates.ErrChan, + Ctx: msg.Ctx, + } + + err := r.chanPeer.startRbfChanCloser( + newRPCShutdownInit(&closeReq), msg.ChanPoint, + ) + if err != nil { + peerLog.Errorf("unable to start RBF chan closer for "+ + "channel %v: %v", msg.ChanPoint, err) + + return fn.Errf[retType]("unable to start RBF chan "+ + "closer: %w", err) + } + + return fn.Ok(closeUpdates) +} + +// RbfChanCloseActor is a router that will route messages to the relevant RBF +// chan closer. +type RbfChanCloseActor = actor.Router[rbfCloseMessage, *CoopCloseUpdates] + +// RbfChanCloserRouter creates a new router that will route messages to the +// relevant RBF chan closer. 
+func RbfChanCloserRouter(actorSystem *actor.ActorSystem, + serviceKey RbfCloseActorServiceKey) *RbfChanCloseActor { + + strategy := actor.NewRoundRobinStrategy[ + rbfCloseMessage, *CoopCloseUpdates, + ]() + + return actor.NewRouter( + actorSystem.Receptionist(), serviceKey, strategy, nil, + ) +} diff --git a/peer/rbf_close_wrapper_actor_test.go b/peer/rbf_close_wrapper_actor_test.go new file mode 100644 index 0000000000..56fe3afcfe --- /dev/null +++ b/peer/rbf_close_wrapper_actor_test.go @@ -0,0 +1,46 @@ +package peer + +import ( + "testing" + + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/actor" + "github.com/stretchr/testify/require" +) + +// TestRbfCloseActorSingleton verifies that registering an RBF close actor for +// the same channel point twice results in only a single registered actor. The +// second call to registerActor should unregister the first actor before +// spawning a replacement. +func TestRbfCloseActorSingleton(t *testing.T) { + t.Parallel() + + actorSystem := actor.NewActorSystem() + t.Cleanup(func() { + require.NoError(t, actorSystem.Shutdown()) + }) + + chanPoint := wire.OutPoint{Index: 1} + serviceKey := NewRbfCloserPeerServiceKey(chanPoint) + + // Register the actor for the first time. + actor1 := newRbfCloseActor(chanPoint, nil, actorSystem) + require.NoError(t, actor1.registerActor()) + + // Verify exactly one actor is registered. + refs := actor.FindInReceptionist( + actorSystem.Receptionist(), serviceKey, + ) + require.Len(t, refs, 1) + + // Register the actor again for the same channel point. + actor2 := newRbfCloseActor(chanPoint, nil, actorSystem) + require.NoError(t, actor2.registerActor()) + + // Verify there is still exactly one actor registered (the second one + // replaced the first). 
+	refs = actor.FindInReceptionist( +		actorSystem.Receptionist(), serviceKey, +	) +	require.Len(t, refs, 1) +} From 9c59d4d33d902c4bdb4903f6e4d1b12248667130 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 16 May 2025 17:21:38 -0700 Subject: [PATCH 2/4] peer: register the rbfCloseActor, have RPC route fee bumps to it In this commit, we now register the rbfCloseActor when we create the rbf chan closer state machine. Now the RPC server no longer needs to traverse a series of maps and pointers (rpcServer -> server -> peer -> activeCloseMap -> rbf chan closer) to trigger a new fee bump. Instead, it just creates the service key that it knows that the closer can be reached at, and sends a message to it using the returned actorRef/router. We also hide additional details re the various methods in play, as we only care about the type of message we expect to send and receive. --- peer/brontide.go | 90 +++++++++++++++++++++++++++--------------------- rpcserver.go | 24 ++++++++++--- server.go | 74 ++++----------------------------------- 3 files changed, 76 insertions(+), 112 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 5ab4e8fa6b..0e8d58b20d 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -1762,6 +1762,10 @@ func (p *Brontide) Disconnect(reason error) { // Stop the onion peer actor if one was spawned. p.StopOnionActorIfExists() + // Unregister any RBF close actors registered for channels of this + // peer so we don't leave stale entries in the actor system. + p.unregisterRbfCloseActors() + // Ensure that the TCP connection is properly closed before continuing. p.cfg.Conn.Close() @@ -1793,6 +1797,33 @@ func (p *Brontide) StopOnionActorIfExists() { ) } +// unregisterRbfCloseActors removes any RBF close actors registered for this +// peer's active channels from the actor system. This should be called on +// disconnect so we don't leave stale RBF close actors for a peer that is no +// longer connected. 
This is idempotent and safe to call multiple times. +func (p *Brontide) unregisterRbfCloseActors() { + if p.cfg.ActorSystem == nil { + return + } + + p.activeChannels.Range(func(_ lnwire.ChannelID, + channel *lnwallet.LightningChannel) bool { + + // Pending channels are tracked with a nil value in the map, + // so skip those as they have no channel point to look up. + if channel == nil { + return true + } + + actorKey := NewRbfCloserPeerServiceKey( + channel.ChannelPoint(), + ) + actorKey.UnregisterAll(p.cfg.ActorSystem) + + return true + }) +} + // readNextMessage reads, and returns the next message on the wire along with // any additional raw payload. func (p *Brontide) readNextMessage() (lnwire.Message, error) { @@ -4233,8 +4264,28 @@ func (p *Brontide) initRbfChanCloser( "close: %w", err) } + // We store the closer first so that any lookups that race with actor + // registration will find the chan closer already in place. p.activeChanCloses.Store(chanID, makeRbfCloser(&chanCloser)) + // In addition to the message router, we'll register the state machine + // with the actor system. + if p.cfg.ActorSystem != nil { + p.log.Infof("Registering RBF actor for channel %v", + channel.ChannelPoint()) + + actorWrapper := newRbfCloseActor( + channel.ChannelPoint(), p, p.cfg.ActorSystem, + ) + if err := actorWrapper.registerActor(); err != nil { + chanCloser.Stop() + p.activeChanCloses.Delete(chanID) + + return nil, fmt.Errorf("unable to register RBF close "+ + "actor: %w", err) + } + } + // Now that we've created the rbf closer state machine, we'll launch a // new goroutine to eventually send in the ChannelFlushed event once // needed. @@ -5817,42 +5868,3 @@ func (p *Brontide) ChanHasRbfCoopCloser(chanPoint wire.OutPoint) bool { return chanCloser.IsRight() } - -// TriggerCoopCloseRbfBump given a chan ID, and the params needed to trigger a -// new RBF co-op close update, a bump is attempted. 
A channel used for updates, -// along with one used to o=communicate any errors is returned. If no chan -// closer is found, then false is returned for the second argument. -func (p *Brontide) TriggerCoopCloseRbfBump(ctx context.Context, - chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight, - deliveryScript lnwire.DeliveryAddress) (*CoopCloseUpdates, error) { - - // If RBF coop close isn't permitted, then we'll an error. - if !p.rbfCoopCloseAllowed() { - return nil, fmt.Errorf("rbf coop close not enabled for " + - "channel") - } - - closeUpdates := &CoopCloseUpdates{ - UpdateChan: make(chan interface{}, 1), - ErrChan: make(chan error, 1), - } - - // We'll re-use the existing switch struct here, even though we're - // bypassing the switch entirely. - closeReq := htlcswitch.ChanClose{ - CloseType: contractcourt.CloseRegular, - ChanPoint: &chanPoint, - TargetFeePerKw: feeRate, - DeliveryScript: deliveryScript, - Updates: closeUpdates.UpdateChan, - Err: closeUpdates.ErrChan, - Ctx: ctx, - } - - err := p.startRbfChanCloser(newRPCShutdownInit(&closeReq), chanPoint) - if err != nil { - return nil, err - } - - return closeUpdates, nil -} diff --git a/rpcserver.go b/rpcserver.go index e62de95fb1..d4a9ce51bb 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3018,13 +3018,27 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, rpcsLog.Infof("Bypassing Switch to do fee bump "+ "for ChannelPoint(%v)", chanPoint) - closeUpdates, err := r.server.AttemptRBFCloseUpdate( - updateStream.Context(), *chanPoint, feeRate, - deliveryScript, + // To perform this RBF bump, we'll send a bump message + // to the RBF close actor. We propagate the stream + // context so that cancellation of the RPC client also + // tears down the observer goroutine. 
+ ctx := updateStream.Context() + rbfBumpMsg := peer.NewRbfBumpCloseMsg( + ctx, *chanPoint, feeRate, deliveryScript, + ) + rbfActorKey := peer.NewRbfCloserPeerServiceKey( + *chanPoint, ) + rbfRouter := peer.RbfChanCloserRouter( + r.server.actorSystem, rbfActorKey, + ) + + closeUpdates, err := rbfRouter.Ask( + ctx, rbfBumpMsg, + ).Await(ctx).Unpack() if err != nil { - return fmt.Errorf("unable to do RBF close "+ - "update: %w", err) + return fmt.Errorf("unable to ask for RBF "+ + "close: %w", err) } updateChan = closeUpdates.UpdateChan diff --git a/server.go b/server.go index 9eddf92e69..68c917d2b5 100644 --- a/server.go +++ b/server.go @@ -2852,6 +2852,12 @@ func (s *server) Stop() error { s.sigPool.Stop() s.writePool.Stop() s.readPool.Stop() + + // Shut down the actor system last so any in-flight actor work + // triggered by the subsystems above has a chance to complete. + if err := s.actorSystem.Shutdown(); err != nil { + srvrLog.Warnf("failed to stop actor system: %v", err) + } }) return nil @@ -5622,74 +5628,6 @@ func (s *server) ChanHasRbfCoopCloser(peerPub *btcec.PublicKey, return targetPeer.ChanHasRbfCoopCloser(chanPoint) } -// attemptCoopRbfFeeBump attempts to look up the active chan closer for a -// channel given the outpoint. If found, we'll attempt to do a fee bump, -// returning channels used for updates. If the channel isn't currently active -// (p2p connection established), then his function will return an error. -func (s *server) attemptCoopRbfFeeBump(ctx context.Context, - chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight, - deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) { - - // First, we'll attempt to look up the channel based on it's - // ChannelPoint. - channel, err := s.chanStateDB.FetchChannel(chanPoint) - if err != nil { - return nil, fmt.Errorf("unable to fetch channel: %w", err) - } - - // From the channel, we can now get the pubkey of the peer, then use - // that to eventually get the chan closer. 
- peerPub := channel.IdentityPub.SerializeCompressed() - - // Now that we have the peer pub, we can look up the peer itself. - s.mu.RLock() - targetPeer, ok := s.peersByPub[string(peerPub)] - s.mu.RUnlock() - if !ok { - return nil, fmt.Errorf("peer for ChannelPoint(%v) is "+ - "not online", chanPoint) - } - - closeUpdates, err := targetPeer.TriggerCoopCloseRbfBump( - ctx, chanPoint, feeRate, deliveryScript, - ) - if err != nil { - return nil, fmt.Errorf("unable to trigger coop rbf fee bump: "+ - "%w", err) - } - - return closeUpdates, nil -} - -// AttemptRBFCloseUpdate attempts to trigger a new RBF iteration for a co-op -// close update. This route it to be used only if the target channel in question -// is no longer active in the link. This can happen when we restart while we -// already have done a single RBF co-op close iteration. -func (s *server) AttemptRBFCloseUpdate(ctx context.Context, - chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight, - deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) { - - // If the channel is present in the switch, then the request should flow - // through the switch instead. - chanID := lnwire.NewChanIDFromOutPoint(chanPoint) - if _, err := s.htlcSwitch.GetLink(chanID); err == nil { - return nil, fmt.Errorf("ChannelPoint(%v) is active in link, "+ - "invalid request", chanPoint) - } - - // At this point, we know that the channel isn't present in the link, so - // we'll check to see if we have an entry in the active chan closer map. - updates, err := s.attemptCoopRbfFeeBump( - ctx, chanPoint, feeRate, deliveryScript, - ) - if err != nil { - return nil, fmt.Errorf("unable to attempt coop rbf fee bump "+ - "ChannelPoint(%v)", chanPoint) - } - - return updates, nil -} - // calculateNodeAnnouncementTimestamp returns the timestamp to use for a node // announcement, ensuring it's at least one second after the previously // persisted timestamp. 
This ensures BOLT-07 compliance, which requires node From 1f3e547ff0280a7e81ff0d2e671490c20e21bf93 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 16 May 2025 17:23:06 -0700 Subject: [PATCH 3/4] protofsm: implement the actor.ActorBehavior interface for StateMachine In this commit, we implement the actor.ActorBehavior interface for StateMachine. This enables the state machine executor to be registered as an actor, and have messages be sent to it via a unique ServiceKey that a concrete instance will set. --- protofsm/actor_wrapper.go | 23 +++++++++++++++++++++++ protofsm/state_machine.go | 20 ++++++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100644 protofsm/actor_wrapper.go diff --git a/protofsm/actor_wrapper.go b/protofsm/actor_wrapper.go new file mode 100644 index 0000000000..b0be9a07eb --- /dev/null +++ b/protofsm/actor_wrapper.go @@ -0,0 +1,23 @@ +package protofsm + +import ( + "fmt" + + "github.com/lightningnetwork/lnd/actor" +) + +// ActorMessage wraps an Event, in order to create a new message that can be +// used with the actor package. +type ActorMessage[Event any] struct { + actor.BaseMessage + + // Event is the event that is being sent to the actor. + Event Event +} + +// MessageType returns the type of the message. +// +// NOTE: This implements the actor.Message interface. +func (a ActorMessage[Event]) MessageType() string { + return fmt.Sprintf("ActorMessage(%T)", a.Event) +} diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index b3e16f5fd3..b0c376c306 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -259,6 +259,26 @@ func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) { } } +// Receive processes a message and returns a Result. The provided context is the +// actor's internal context, which can be used to detect actor shutdown +// requests. +// +// NOTE: This implements the actor.ActorBehavior interface. 
+func (s *StateMachine[Event, Env]) Receive(ctx context.Context, + e ActorMessage[Event]) fn.Result[bool] { + + select { + case s.events <- e.Event: + return fn.Ok(true) + + case <-ctx.Done(): + return fn.Err[bool](ctx.Err()) + + case <-s.quit: + return fn.Err[bool](ErrStateMachineShutdown) + } +} + // CanHandle returns true if the target message can be routed to the state // machine. func (s *StateMachine[Event, Env]) CanHandle(msg msgmux.PeerMsg) bool { From 78d2b9f03889d9802d8e82eec5931360fb7dd351 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 16 May 2025 17:23:46 -0700 Subject: [PATCH 4/4] lnwallet/chancloser: create unique ServiceKey for the RBF chan closer This can be used to allow any system to send a message to the RBF chan closer if it knows the proper service key. In the future, we can use this to redo the msgmux.Router in terms of the new actor abstractions. --- lnwallet/chancloser/rbf_coop_states.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lnwallet/chancloser/rbf_coop_states.go b/lnwallet/chancloser/rbf_coop_states.go index 61f7718b5a..9319c1d271 100644 --- a/lnwallet/chancloser/rbf_coop_states.go +++ b/lnwallet/chancloser/rbf_coop_states.go @@ -1026,3 +1026,7 @@ type RbfEvent = protofsm.EmittedEvent[ProtocolEvent] // RbfStateSub is a type alias for the state subscription type of the RBF chan // closer. type RbfStateSub = protofsm.StateSubscriber[ProtocolEvent, *Environment] + +// ChanCloserActorMsg is an adapter to enable the state machine executor that +// runs this state machine to be passed around as an actor. +type ChanCloserActorMsg = protofsm.ActorMessage[ProtocolEvent]