Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lnwallet/chancloser/rbf_coop_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,3 +1026,7 @@ type RbfEvent = protofsm.EmittedEvent[ProtocolEvent]
// RbfStateSub is a type alias for the state subscription type of the RBF chan
// closer.
type RbfStateSub = protofsm.StateSubscriber[ProtocolEvent, *Environment]

// ChanCloserActorMsg is an adapter to enable the state machine executor that
// runs this state machine to be passed around as an actor.
//
// NOTE(review): this alias is not referenced anywhere in the visible diff —
// confirm it is actually needed before keeping it.
type ChanCloserActorMsg = protofsm.ActorMessage[ProtocolEvent]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't used anywhere. The last two commits are dead code: 1f3e547 and 78d2b9f don't reference anything from the first two commits (d03d608 and 9c59d4d).

I'd drop them, the PR still works exactly the same.

90 changes: 51 additions & 39 deletions peer/brontide.go
Original file line number Diff line number Diff line change
Expand Up @@ -1762,6 +1762,10 @@ func (p *Brontide) Disconnect(reason error) {
// Stop the onion peer actor if one was spawned.
p.StopOnionActorIfExists()

// Unregister any RBF close actors registered for channels of this
// peer so we don't leave stale entries in the actor system.
p.unregisterRbfCloseActors()

// Ensure that the TCP connection is properly closed before continuing.
p.cfg.Conn.Close()

Expand Down Expand Up @@ -1793,6 +1797,33 @@ func (p *Brontide) StopOnionActorIfExists() {
)
}

// unregisterRbfCloseActors removes any RBF close actors registered for this
// peer's active channels from the actor system. This should be called on
// disconnect so we don't leave stale RBF close actors for a peer that is no
// longer connected. This is idempotent and safe to call multiple times.
func (p *Brontide) unregisterRbfCloseActors() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if a single channel close completes (confirmed on-chain) while the peer stays connected, the actor remains registered in the system even though the closer is gone. A subsequent Ask to that actor would call
startRbfChanCloser which would fail to find the closer in activeChanCloses.

Two of the activeChanCloses.Delete call sites should also unregister the actor for that channel point. Something like:

if p.cfg.ActorSystem != nil {
        actorKey := NewRbfCloserPeerServiceKey(chanPoint)
        actorKey.UnregisterAll(p.cfg.ActorSystem)
}

The bulk cleanup in unregisterRbfCloseActors on disconnect is still useful as a safety net, but individual cleanup at those Delete sites is needed to avoid stale actors accumulating during normal operation.

I've considered all 10 call sites, and I think the below two qualify for unregistering the actor for that channel point:

  • L3989 -- close request context cancelled. The RBF closer was active, gets deleted.
  • L4722 -- finalizeChanClosure / WipeChannel. Channel is done, fully closed on-chain.

if p.cfg.ActorSystem == nil {
return
}

p.activeChannels.Range(func(_ lnwire.ChannelID,
channel *lnwallet.LightningChannel) bool {

// Pending channels are tracked with a nil value in the map,
// so skip those as they have no channel point to look up.
if channel == nil {
return true
}

actorKey := NewRbfCloserPeerServiceKey(
channel.ChannelPoint(),
)
actorKey.UnregisterAll(p.cfg.ActorSystem)

return true
})
}

// readNextMessage reads, and returns the next message on the wire along with
// any additional raw payload.
func (p *Brontide) readNextMessage() (lnwire.Message, error) {
Expand Down Expand Up @@ -4233,8 +4264,28 @@ func (p *Brontide) initRbfChanCloser(
"close: %w", err)
}

// We store the closer first so that any lookups that race with actor
// registration will find the chan closer already in place.
p.activeChanCloses.Store(chanID, makeRbfCloser(&chanCloser))

// In addition to the message router, we'll register the state machine
// with the actor system.
if p.cfg.ActorSystem != nil {
p.log.Infof("Registering RBF actor for channel %v",
channel.ChannelPoint())

actorWrapper := newRbfCloseActor(
Comment thread
Roasbeef marked this conversation as resolved.
channel.ChannelPoint(), p, p.cfg.ActorSystem,
)
if err := actorWrapper.registerActor(); err != nil {
chanCloser.Stop()
p.activeChanCloses.Delete(chanID)

return nil, fmt.Errorf("unable to register RBF close "+
"actor: %w", err)
}
}

// Now that we've created the rbf closer state machine, we'll launch a
// new goroutine to eventually send in the ChannelFlushed event once
// needed.
Expand Down Expand Up @@ -5817,42 +5868,3 @@ func (p *Brontide) ChanHasRbfCoopCloser(chanPoint wire.OutPoint) bool {

return chanCloser.IsRight()
}

// TriggerCoopCloseRbfBump attempts a new RBF co-op close update (a fee bump)
// for the channel identified by the given channel point. On success, a
// CoopCloseUpdates struct is returned that carries a channel used for close
// updates, along with one used to communicate any errors. A non-nil error is
// returned if RBF coop close isn't enabled, or the chan closer can't be
// started.
func (p *Brontide) TriggerCoopCloseRbfBump(ctx context.Context,
chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight,
deliveryScript lnwire.DeliveryAddress) (*CoopCloseUpdates, error) {

// If RBF coop close isn't permitted, then we'll return an error.
if !p.rbfCoopCloseAllowed() {
return nil, fmt.Errorf("rbf coop close not enabled for " +
"channel")
}

// Buffered channels so the first update/error can be delivered without
// blocking the sender.
closeUpdates := &CoopCloseUpdates{
UpdateChan: make(chan interface{}, 1),
ErrChan: make(chan error, 1),
}

// We'll re-use the existing switch struct here, even though we're
// bypassing the switch entirely.
closeReq := htlcswitch.ChanClose{
CloseType: contractcourt.CloseRegular,
ChanPoint: &chanPoint,
TargetFeePerKw: feeRate,
DeliveryScript: deliveryScript,
Updates: closeUpdates.UpdateChan,
Err: closeUpdates.ErrChan,
Ctx: ctx,
}

err := p.startRbfChanCloser(newRPCShutdownInit(&closeReq), chanPoint)
if err != nil {
return nil, err
}

return closeUpdates, nil
}
189 changes: 189 additions & 0 deletions peer/rbf_close_wrapper_actor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package peer

import (
"context"
"fmt"

"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/actor"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
)

// rbfCloseMessage is a message type that is used to trigger a cooperative fee
// bump, or initiate a close for the first time.
type rbfCloseMessage struct {
	actor.BaseMessage

	// Ctx is the context of the caller that initiated the RBF close. This
	// is propagated to the underlying close request so that cancellation
	// of the caller (e.g. RPC stream disconnect) tears down the associated
	// observer goroutine. The caller's context is distinct from the
	// actor's own lifecycle context that is passed to Receive.
	Ctx context.Context //nolint:containedctx

	// ChanPoint is the channel point of the channel to be closed.
	ChanPoint wire.OutPoint

	// FeeRate is the fee rate to use for the transaction.
	FeeRate chainfee.SatPerKWeight

	// DeliveryScript is the script to use for the transaction.
	DeliveryScript lnwire.DeliveryAddress
}

// MessageType returns the type of the message, tagged with the channel point
// the message targets.
//
// NOTE: This is part of the actor.Message interface.
func (r rbfCloseMessage) MessageType() string {
	return "RbfCloseMessage(" + r.ChanPoint.String() + ")"
}

// NewRbfBumpCloseMsg returns a message that can be sent to the RBF actor to
// initiate a new fee bump for the channel identified by op.
func NewRbfBumpCloseMsg(ctx context.Context, op wire.OutPoint,
	feeRate chainfee.SatPerKWeight,
	deliveryScript lnwire.DeliveryAddress) rbfCloseMessage {

	msg := rbfCloseMessage{
		Ctx:            ctx,
		ChanPoint:      op,
		FeeRate:        feeRate,
		DeliveryScript: deliveryScript,
	}

	return msg
}

// RbfCloseActorServiceKey is a service key that can be used to reach an RBF
// chan closer via the actor system's receptionist.
type RbfCloseActorServiceKey = actor.ServiceKey[
	rbfCloseMessage, *CoopCloseUpdates,
]

// NewRbfCloserPeerServiceKey returns a new service key that can be used to
Comment thread
Roasbeef marked this conversation as resolved.
// reach an RBF chan closer, via an active peer.
func NewRbfCloserPeerServiceKey(op wire.OutPoint) RbfCloseActorServiceKey {
opStr := op.String()

// Just using the channel point here is enough, as we have a unique
// type here rbfCloseMessage which will handle the final actor
// selection.
actorKey := fmt.Sprintf("Peer(RbfChanCloser(%v))", opStr)

return actor.NewServiceKey[rbfCloseMessage, *CoopCloseUpdates](actorKey)
}

// rbfCloseActor is a wrapper around the Brontide peer to expose the internal
// RBF close state machine as an actor. This is intended for callers that need
// to obtain streaming close updates related to the RBF close process.
type rbfCloseActor struct {
	// chanPeer is the peer that owns the underlying RBF chan closer.
	chanPeer *Brontide

	// actorSystem is the actor system this wrapper registers itself with.
	actorSystem *actor.ActorSystem

	// chanPoint is the channel point of the channel to be closed.
	chanPoint wire.OutPoint
}

// newRbfCloseActor creates a new instance of the RBF close wrapper actor for
// the given channel point, backed by the given peer and actor system.
func newRbfCloseActor(chanPoint wire.OutPoint,
	chanPeer *Brontide, actorSystem *actor.ActorSystem) *rbfCloseActor {

	closeActor := &rbfCloseActor{
		chanPoint:   chanPoint,
		chanPeer:    chanPeer,
		actorSystem: actorSystem,
	}

	return closeActor
}

// registerActor registers a new RBF close actor with the actor system. If an
// instance with the same service key and types is already registered, it is
// unregistered first so that at most one actor exists per channel point.
func (r *rbfCloseActor) registerActor() error {
	// Derive the service key for this channel's RBF closer. This is how
	// interested parties will locate the actor later on.
	key := NewRbfCloserPeerServiceKey(r.chanPoint)

	// Enforce the singleton property: drop any previously registered
	// instances before spawning a replacement.
	key.UnregisterAll(r.actorSystem)

	// Spawn the new instance. The returned ref is intentionally
	// discarded, as callers can always look the actor up via the service
	// key.
	actorID := fmt.Sprintf("PeerWrapper(RbfChanCloser(%s))", r.chanPoint)
	if _, err := key.Spawn(r.actorSystem, actorID, r); err != nil {
		return fmt.Errorf("unable to spawn RBF close actor for "+
			"channel %v: %w", r.chanPoint, err)
	}

	return nil
}

// Receive implements the actor.ActorBehavior interface for the rbf closer
// wrapper. This allows us to expose our specific processes around the coop
// close flow as an actor.
//
// NOTE: This implements the actor.ActorBehavior interface.
func (r *rbfCloseActor) Receive(_ context.Context,
msg rbfCloseMessage) fn.Result[*CoopCloseUpdates] {

type retType = *CoopCloseUpdates

// If RBF coop close isn't permitted, then we'll return an error.
if !r.chanPeer.rbfCoopCloseAllowed() {
return fn.Errf[retType]("rbf coop close not enabled for " +
"channel")
}

closeUpdates := &CoopCloseUpdates{
UpdateChan: make(chan interface{}, 1),
ErrChan: make(chan error, 1),
}

// We'll re-use the existing switch struct here, even though we're
// bypassing the switch entirely. We use the caller's context from the
// message so that canceling the caller (e.g., RPC stream close) also
// tears down the observer goroutine.
closeReq := htlcswitch.ChanClose{
CloseType: contractcourt.CloseRegular,
ChanPoint: &msg.ChanPoint,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check r.chanPoint != msg.ChanPoint? otherwise it just forwards blindly.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean? The only way to send to this would be to make a service key, which itself uses the channel point of the incoming message to find the actor.

TargetFeePerKw: msg.FeeRate,
DeliveryScript: msg.DeliveryScript,
Updates: closeUpdates.UpdateChan,
Err: closeUpdates.ErrChan,
Ctx: msg.Ctx,
}

err := r.chanPeer.startRbfChanCloser(
newRPCShutdownInit(&closeReq), msg.ChanPoint,
)
if err != nil {
peerLog.Errorf("unable to start RBF chan closer for "+
"channel %v: %v", msg.ChanPoint, err)

return fn.Errf[retType]("unable to start RBF chan "+
"closer: %w", err)
}

return fn.Ok(closeUpdates)
}

// RbfChanCloseActor is a router that will route messages to the relevant RBF
// chan closer.
type RbfChanCloseActor = actor.Router[rbfCloseMessage, *CoopCloseUpdates]

// RbfChanCloserRouter creates a new router that will route messages to the
// relevant RBF chan closer, selecting among registered instances with a
// round-robin strategy.
func RbfChanCloserRouter(actorSystem *actor.ActorSystem,
	serviceKey RbfCloseActorServiceKey) *RbfChanCloseActor {

	return actor.NewRouter(
		actorSystem.Receptionist(), serviceKey,
		actor.NewRoundRobinStrategy[
			rbfCloseMessage, *CoopCloseUpdates,
		](),
		nil,
	)
}
46 changes: 46 additions & 0 deletions peer/rbf_close_wrapper_actor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package peer

import (
"testing"

"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/actor"
"github.com/stretchr/testify/require"
)

// TestRbfCloseActorSingleton verifies that registering an RBF close actor for
// the same channel point twice results in only a single registered actor: the
// second registerActor call replaces the first instance instead of adding a
// second one.
func TestRbfCloseActorSingleton(t *testing.T) {
	t.Parallel()

	system := actor.NewActorSystem()
	t.Cleanup(func() {
		require.NoError(t, system.Shutdown())
	})

	chanPoint := wire.OutPoint{Index: 1}
	key := NewRbfCloserPeerServiceKey(chanPoint)

	// numRegistered reports how many actors are currently registered
	// under the service key.
	numRegistered := func() int {
		return len(actor.FindInReceptionist(
			system.Receptionist(), key,
		))
	}

	// The first registration should leave exactly one actor behind.
	require.NoError(
		t, newRbfCloseActor(chanPoint, nil, system).registerActor(),
	)
	require.Equal(t, 1, numRegistered())

	// Registering again for the same channel point should replace the
	// existing instance, keeping the count at one.
	require.NoError(
		t, newRbfCloseActor(chanPoint, nil, system).registerActor(),
	)
	require.Equal(t, 1, numRegistered())
}
23 changes: 23 additions & 0 deletions protofsm/actor_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package protofsm

import (
"fmt"

"github.com/lightningnetwork/lnd/actor"
)

// ActorMessage wraps an Event, in order to create a new message that can be
// used with the actor package.
type ActorMessage[Event any] struct {
	actor.BaseMessage

	// Event is the event that is being sent to the actor.
	Event Event
}

// MessageType returns a string tag describing this message, parameterized by
// the dynamic type of the wrapped event.
//
// NOTE: This implements the actor.Message interface.
func (a ActorMessage[Event]) MessageType() string {
	msgType := fmt.Sprintf("ActorMessage(%T)", a.Event)

	return msgType
}
Loading
Loading