From c493f0c0a9c8af19cb81bdb726fe455f5b77c45c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 9 May 2025 16:14:43 -0700 Subject: [PATCH 1/2] peer+lnd: add new CLI option to control if we D/C on slow pongs In this commit, we add a new CLI option to control if we D/C on slow pongs or not. Due to the existence of head-of-the-line blocking at various levels of abstraction (app buffer, slow processing, TCP kernel buffers, etc), if there's a flurry of gossip messages (eg: 1K channel updates), then even with a reasonable processing latency, a peer may still not read our ping in time. To give users another option, we add a flag that allows users to disable this behavior. The default remains. --- config.go | 18 +++++-- peer/brontide.go | 32 ++++++++--- peer/ping_manager.go | 109 +++++++++++++++++++++++++++----------- peer/ping_manager_test.go | 88 +++++++++++++++++------------- sample-lnd.conf | 5 ++ server.go | 1 + 6 files changed, 175 insertions(+), 78 deletions(-) diff --git a/config.go b/config.go index 824bcc55f67..a0bfe02b782 100644 --- a/config.go +++ b/config.go @@ -251,6 +251,11 @@ const ( defaultPrunedNodeMaxPeers = 4 defaultNeutrinoMaxPeers = 8 + + // defaultNoDisconnectOnPongFailure is the default value for whether we + // should *not* disconnect from a peer if we don't receive a pong + // response in time after we send a ping. + defaultNoDisconnectOnPongFailure = false ) var ( @@ -527,6 +532,10 @@ type Config struct { // NumRestrictedSlots is the number of restricted slots we'll allocate // in the server. NumRestrictedSlots uint64 `long:"num-restricted-slots" description:"The number of restricted slots we'll allocate in the server."` + + // NoDisconnectOnPongFailure controls if we'll disconnect if a peer + // doesn't respond to a pong in time. + NoDisconnectOnPongFailure bool `long:"no-disconnect-on-pong-failure" description:"If true, a peer will *not* be disconnected if a pong is not received in time or is mismatched. Defaults to false, meaning peers *will* be disconnected on pong failure."` } // GRPCConfig holds the configuration options for the gRPC server. @@ -747,10 +756,11 @@ func DefaultConfig() Config { ServerPingTimeout: defaultGrpcServerPingTimeout, ClientPingMinWait: defaultGrpcClientPingMinWait, }, - LogConfig: build.DefaultLogConfig(), - WtClient: lncfg.DefaultWtClientCfg(), - HTTPHeaderTimeout: DefaultHTTPHeaderTimeout, - NumRestrictedSlots: DefaultNumRestrictedSlots, + LogConfig: build.DefaultLogConfig(), + WtClient: lncfg.DefaultWtClientCfg(), + HTTPHeaderTimeout: DefaultHTTPHeaderTimeout, + NumRestrictedSlots: DefaultNumRestrictedSlots, + NoDisconnectOnPongFailure: defaultNoDisconnectOnPongFailure, } } diff --git a/peer/brontide.go b/peer/brontide.go index bfc603ae8af..da4aa610a44 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -94,7 +94,7 @@ const ( torTimeoutMultiplier = 3 // msgStreamSize is the size of the message streams. - msgStreamSize = 5 + msgStreamSize = 50 ) var ( @@ -455,6 +455,10 @@ type Config struct { // experimental endorsement signals should be set. ShouldFwdExpEndorsement func() bool + // NoDisconnectOnPongFailure indicates whether the peer should *not* be + // disconnected if a pong is not received in time or is mismatched. + NoDisconnectOnPongFailure bool + // Quit is the server's quit channel. If this is closed, we halt operation. Quit chan struct{} } @@ -735,11 +739,27 @@ func NewBrontide(cfg Config) *Brontide { SendPing: func(ping *lnwire.Ping) { p.queueMsg(ping, nil) }, - OnPongFailure: func(err error) { - eStr := "pong response failure for %s: %v " + - "-- disconnecting" - p.log.Warnf(eStr, p, err) - go p.Disconnect(fmt.Errorf(eStr, p, err)) + OnPongFailure: func(reason error, + timeWaitedForPong time.Duration, + lastKnownRTT time.Duration) { + + logMsg := fmt.Sprintf("pong response "+ + "failure for %s: %v. Time waited for this "+ + "pong: %v. Last successful RTT: %v.", + p, reason, timeWaitedForPong, lastKnownRTT) + + // If NoDisconnectOnPongFailure is true, we don't + // disconnect. Otherwise (if it's false, the default), + // we disconnect. + if p.cfg.NoDisconnectOnPongFailure { + p.log.Warnf("%s -- not disconnecting "+ + "due to config", logMsg) + return + } + + p.log.Warnf("%s -- disconnecting", logMsg) + + go p.Disconnect(fmt.Errorf("pong failure: %w", reason)) }, }) diff --git a/peer/ping_manager.go b/peer/ping_manager.go index f5c6180be13..7686cbe8b01 100644 --- a/peer/ping_manager.go +++ b/peer/ping_manager.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/lnwire" ) @@ -36,7 +37,8 @@ type PingManagerConfig struct { // OnPongFailure is a closure that is responsible for executing the // logic when a Pong message is either late or does not match our // expectations for that Pong - OnPongFailure func(error) + OnPongFailure func(failureReason error, timeWaitedForPong time.Duration, + lastKnownRTT time.Duration) } // PingManager is a structure that is designed to manage the internal state @@ -108,6 +110,26 @@ func (m *PingManager) Start() error { return err } +// getLastRTT safely retrieves the last known RTT, returning 0 if none exists. +func (m *PingManager) getLastRTT() time.Duration { + rttPtr := m.pingTime.Load() + if rttPtr == nil { + return 0 + } + + return *rttPtr +} + +// pendingPingWait calculates the time waited since the last ping was sent. If +// no ping time is reported, None is returned. defaultDuration. +func (m *PingManager) pendingPingWait() fn.Option[time.Duration] { + if m.pingLastSend != nil { + return fn.Some(time.Since(*m.pingLastSend)) + } + + return fn.None[time.Duration]() +} + // pingHandler is the main goroutine responsible for enforcing the ping/pong // protocol. func (m *PingManager) pingHandler() { @@ -119,6 +141,10 @@ func (m *PingManager) pingHandler() { <-m.pingTimeout.C } + // Because we don't know if the OnPingFailure callback actually + // disconnects a peer (dependent on user config), we should never return + // from this loop unless the ping manager is stopped explicitly (which + // happens on disconnect). for { select { case <-m.pingTicker.C: @@ -127,12 +153,20 @@ func (m *PingManager) pingHandler() { // awaiting a pong response. This should never occur, // but if it does, it implies a timeout. if m.outstandingPongSize >= 0 { - e := errors.New("impossible: new ping" + - "in unclean state", + // Ping was outstanding, meaning it timed out by + // the arrival of the next ping interval. + timeWaited := m.pendingPingWait().UnwrapOr( + m.cfg.IntervalDuration, + ) + lastRTT := m.getLastRTT() + + m.cfg.OnPongFailure( + errors.New("ping timed "+ + "out by next interval"), + timeWaited, lastRTT, ) - m.cfg.OnPongFailure(e) - return + m.resetPingState() } pongSize := m.cfg.NewPongSize() @@ -143,53 +177,67 @@ func (m *PingManager) pingHandler() { // Set up our bookkeeping for the new Ping. if err := m.setPingState(pongSize); err != nil { - m.cfg.OnPongFailure(err) + // This is an internal error related to timer + // reset. Pass it to OnPongFailure as it's + // critical. Current and last RTT are not + // directly applicable here. + m.cfg.OnPongFailure(err, 0, 0) + + m.resetPingState() - return + continue } m.cfg.SendPing(ping) case <-m.pingTimeout.C: - m.resetPingState() - - e := errors.New("timeout while waiting for " + - "pong response", + timeWaited := m.pendingPingWait().UnwrapOr( + m.cfg.TimeoutDuration, ) + lastRTT := m.getLastRTT() - m.cfg.OnPongFailure(e) + m.cfg.OnPongFailure( + errors.New("timeout while waiting for "+ + "pong response"), + timeWaited, lastRTT, + ) - return + m.resetPingState() case pong := <-m.pongChan: pongSize := int32(len(pong.PongBytes)) - // Save off values we are about to override when we - // call resetPingState. + // Save off values we are about to override when we call + // resetPingState. expected := m.outstandingPongSize - lastPing := m.pingLastSend + lastPingTime := m.pingLastSend - m.resetPingState() + // This is an unexpected pong, we'll continue. + if lastPingTime == nil { + continue + } - // If the pong we receive doesn't match the ping we - // sent out, then we fail out. + actualRTT := time.Since(*lastPingTime) + + // If the pong we receive doesn't match the ping we sent + // out, then we fail out. if pongSize != expected { - e := errors.New("pong response does " + - "not match expected size", - ) + e := fmt.Errorf("pong response does not match "+ + "expected size. Expected: %d, Got: %d", + expected, pongSize) - m.cfg.OnPongFailure(e) + lastRTT := m.getLastRTT() + m.cfg.OnPongFailure(e, actualRTT, lastRTT) - return - } + m.resetPingState() - // Compute RTT of ping and save that for future - // querying. - if lastPing != nil { - rtt := time.Since(*lastPing) - m.pingTime.Store(&rtt) + continue } + // Pong is good, update RTT and reset state. + m.pingTime.Store(&actualRTT) + m.resetPingState() + case <-m.quit: return } @@ -231,6 +279,7 @@ func (m *PingManager) setPingState(pongSize uint16) error { func (m *PingManager) resetPingState() { m.pingLastSend = nil m.outstandingPongSize = -1 + if !m.pingTimeout.Stop() { select { case <-m.pingTimeout.C: diff --git a/peer/ping_manager_test.go b/peer/ping_manager_test.go index 0f4c3be4982..d0d96bb9432 100644 --- a/peer/ping_manager_test.go +++ b/peer/ping_manager_test.go @@ -1,6 +1,7 @@ package peer import ( + "sync" "testing" "time" @@ -23,19 +24,19 @@ func TestPingManager(t *testing.T) { result bool }{ { - name: "Happy Path", + name: "happy Path", delay: 0, pongSize: 4, result: true, }, { - name: "Bad Pong", + name: "bad Pong", delay: 0, pongSize: 3, result: false, }, { - name: "Timeout", + name: "timeout", delay: 2, pongSize: 4, result: false, @@ -44,45 +45,56 @@ func TestPingManager(t *testing.T) { payload := make([]byte, 4) for _, test := range testCases { - // Set up PingManager. - pingSent := make(chan struct{}) - disconnected := make(chan struct{}) - mgr := NewPingManager(&PingManagerConfig{ - NewPingPayload: func() []byte { - return payload - }, - NewPongSize: func() uint16 { - return 4 - }, - IntervalDuration: time.Second * 2, - TimeoutDuration: time.Second, - SendPing: func(ping *lnwire.Ping) { - close(pingSent) - }, - OnPongFailure: func(err error) { - close(disconnected) - }, - }) - require.NoError(t, mgr.Start(), "Could not start pingManager") + t.Run(test.name, func(t *testing.T) { + // Set up PingManager. + var pingOnce sync.Once + pingSent := make(chan struct{}) + disconnected := make(chan struct{}) + mgr := NewPingManager(&PingManagerConfig{ + NewPingPayload: func() []byte { + return payload + }, + NewPongSize: func() uint16 { + return 4 + }, + IntervalDuration: time.Second * 2, + TimeoutDuration: time.Second, + SendPing: func(ping *lnwire.Ping) { + pingOnce.Do(func() { + close(pingSent) + }) + }, + OnPongFailure: func(err error, + _ time.Duration, _ time.Duration) { + + close(disconnected) + }, + }) + require.NoError( + t, mgr.Start(), "Could not start pingManager", + ) - // Wait for initial Ping. - <-pingSent + // Wait for initial Ping. + <-pingSent - // Wait for pre-determined time before sending Pong response. - time.Sleep(time.Duration(test.delay) * time.Second) + // Wait for pre-determined time before sending Pong + // response. + time.Sleep(time.Duration(test.delay) * time.Second) - // Send Pong back. - res := lnwire.Pong{PongBytes: make([]byte, test.pongSize)} - mgr.ReceivedPong(&res) + // Send Pong back. + res := lnwire.Pong{ + PongBytes: make([]byte, test.pongSize), + } + mgr.ReceivedPong(&res) - // Evaluate result - select { - case <-time.NewTimer(time.Second / 2).C: - require.True(t, test.result) - case <-disconnected: - require.False(t, test.result) - } + select { + case <-time.NewTimer(time.Second / 2).C: + require.True(t, test.result) + case <-disconnected: + require.False(t, test.result) + } - mgr.Stop() + mgr.Stop() + }) } } diff --git a/sample-lnd.conf b/sample-lnd.conf index af4923846dc..ae929d8e6cf 100644 --- a/sample-lnd.conf +++ b/sample-lnd.conf @@ -569,6 +569,11 @@ ; The number of restricted slots the server will allocate for peers. ; num-restricted-slots=30 +; If true, a peer will *not* be disconnected if a pong is not received in time +; or is mismatched. Defaults to false, meaning peers *will* be disconnected on +; pong failure. +; no-disconnect-on-pong-failure=false + [fee] ; Optional URL for external fee estimation. If no URL is specified, the method diff --git a/server.go b/server.go index e816c3ca4cd..3a077a28f46 100644 --- a/server.go +++ b/server.go @@ -4493,6 +4493,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, EndorsementExperimentEnd, ) }, + NoDisconnectOnPongFailure: s.cfg.NoDisconnectOnPongFailure, } copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed()) From e2c56af5194b31d0ee42262d95416ac13114da27 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 9 May 2025 16:18:03 -0700 Subject: [PATCH 2/2] docs/release-notes: add release notes entry for pong default change --- docs/release-notes/release-notes-0.19.0.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index 3b66186925f..d44ef9f6ea0 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -305,6 +305,10 @@ when running LND with an aux component injected (custom channels). * [remove x/exp/maps dependency](https://github.com/lightningnetwork/lnd/pull/9621) +* [Add a new configuration option](https://github.com/lightningnetwork/lnd/pull/9801) + `--no-disconnect-on-pong-failure` (defaulting to false) to control whether a + peer is disconnected if a pong message is not received in time or is mismatched. + ## RPC Updates * Some RPCs that previously just returned an empty response message now at least