From b741919c6c7ac07f327655a33affadfd86b82c31 Mon Sep 17 00:00:00 2001 From: Krish-vemula Date: Tue, 17 Feb 2026 14:00:28 -0800 Subject: [PATCH 1/2] add pollHealthChecker interface for optional RPC health checks Add optional interface for chain-specific RPC clients to run extra health checks during alive-loop polling. Failures count toward poll failure threshold. Enables chain integrations to detect issues like missing historical state. --- multinode/mock_rpc_client_test.go | 18 ++++++++++++++++++ multinode/node_lifecycle.go | 13 +++++++++++++ multinode/node_lifecycle_test.go | 25 +++++++++++++++++++++++++ 3 files changed, 56 insertions(+) diff --git a/multinode/mock_rpc_client_test.go b/multinode/mock_rpc_client_test.go index a90063e..a0ae372 100644 --- a/multinode/mock_rpc_client_test.go +++ b/multinode/mock_rpc_client_test.go @@ -296,6 +296,24 @@ func (_m *mockRPCClient[CHAIN_ID, HEAD]) IsSyncing(ctx context.Context) (bool, e return r0, r1 } +// PollHealthCheck provides a mock function with given fields: _a0 +func (_m *mockRPCClient[CHAIN_ID, HEAD]) PollHealthCheck(_a0 context.Context) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for PollHealthCheck") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // mockRPCClient_IsSyncing_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsSyncing' type mockRPCClient_IsSyncing_Call[CHAIN_ID ID, HEAD Head] struct { *mock.Call diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index e2974c0..0e92cb7 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -29,6 +29,12 @@ const ( msgDegradedState = "Chainlink is now operating in a degraded state and urgent action is required to resolve the issue" ) +// pollHealthChecker is an optional RPC capability for running extra liveness checks during alive-loop 
polling. +// If implemented by a chain RPC client, failures are treated the same as ClientVersion polling failures. +type pollHealthChecker interface { + PollHealthCheck(context.Context) error +} + // Node is a FSM // Each state has a loop that goes with it, which monitors the node and moves it into another state as necessary. // Only one loop must run at a time. @@ -111,6 +117,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { lggr.Tracew("Pinging RPC", "nodeState", n.State(), "pollFailures", pollFailures) pollCtx, cancel := context.WithTimeout(ctx, pollInterval) version, pingErr := n.RPC().ClientVersion(pollCtx) + if pingErr == nil { + if healthChecker, ok := any(n.RPC()).(pollHealthChecker); ok { + if err := healthChecker.PollHealthCheck(pollCtx); err != nil { + pingErr = fmt.Errorf("poll health check failed: %w", err) + } + } + } cancel() if pingErr != nil { // prevent overflow diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 684d0c7..e86261b 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -176,6 +176,31 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return nodeStateUnreachable == node.State() }) }) + t.Run("optional poll health check failure counts as poll failure and transitions to unreachable", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + node := newSubscribedNode(t, testNodeOpts{ + config: testNodeConfig{ + pollFailureThreshold: 1, + pollInterval: tests.TestInterval, + }, + rpc: rpc, + lggr: lggr, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("ClientVersion", mock.Anything).Return("mock-version", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(errors.New("health check failed")) + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() + + 
node.declareAlive() + tests.AssertLogEventually(t, observedLogs, "poll health check failed: health check failed") + tests.AssertEventually(t, func() bool { + return nodeStateUnreachable == node.State() + }) + }) t.Run("with threshold poll failures, but we are the last node alive, forcibly keeps it alive", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) From a81093e56a4172b03cf42fad8c760f2321c1d58a Mon Sep 17 00:00:00 2001 From: Krish-vemula Date: Wed, 18 Feb 2026 11:46:57 -0800 Subject: [PATCH 2/2] make PollHealthCheck a required RPCClient method with a no-op default in RPCClientBase; fix build and lint --- multinode/mock_rpc_client_test.go | 54 +++++++++++++++++++++++-------- multinode/node_lifecycle.go | 12 ++----- multinode/node_lifecycle_test.go | 8 ++++- multinode/rpc_client_base.go | 7 ++++ multinode/types.go | 4 +++ 5 files changed, 61 insertions(+), 24 deletions(-) diff --git a/multinode/mock_rpc_client_test.go b/multinode/mock_rpc_client_test.go index a0ae372..6e129e3 100644 --- a/multinode/mock_rpc_client_test.go +++ b/multinode/mock_rpc_client_test.go @@ -296,9 +296,37 @@ func (_m *mockRPCClient[CHAIN_ID, HEAD]) IsSyncing(ctx context.Context) (bool, e return r0, r1 } -// PollHealthCheck provides a mock function with given fields: _a0 -func (_m *mockRPCClient[CHAIN_ID, HEAD]) PollHealthCheck(_a0 context.Context) error { - ret := _m.Called(_a0) +// mockRPCClient_IsSyncing_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsSyncing' +type mockRPCClient_IsSyncing_Call[CHAIN_ID ID, HEAD Head] struct { + *mock.Call +} + +// IsSyncing is a helper method to define mock.On call +// - ctx context.Context +func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) IsSyncing(ctx interface{}) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { + return &mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("IsSyncing", ctx)} +} + +func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context)) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { + 
_c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) Return(_a0 bool, _a1 error) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context) (bool, error)) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { + _c.Call.Return(run) + return _c +} + +// PollHealthCheck provides a mock function with given fields: ctx +func (_m *mockRPCClient[CHAIN_ID, HEAD]) PollHealthCheck(ctx context.Context) error { + ret := _m.Called(ctx) if len(ret) == 0 { panic("no return value specified for PollHealthCheck") @@ -306,7 +334,7 @@ func (_m *mockRPCClient[CHAIN_ID, HEAD]) PollHealthCheck(_a0 context.Context) er var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(_a0) + r0 = rf(ctx) } else { r0 = ret.Error(0) } @@ -314,30 +342,30 @@ func (_m *mockRPCClient[CHAIN_ID, HEAD]) PollHealthCheck(_a0 context.Context) er return r0 } -// mockRPCClient_IsSyncing_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsSyncing' -type mockRPCClient_IsSyncing_Call[CHAIN_ID ID, HEAD Head] struct { +// mockRPCClient_PollHealthCheck_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PollHealthCheck' +type mockRPCClient_PollHealthCheck_Call[CHAIN_ID ID, HEAD Head] struct { *mock.Call } -// IsSyncing is a helper method to define mock.On call +// PollHealthCheck is a helper method to define mock.On call // - ctx context.Context -func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) IsSyncing(ctx interface{}) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { - return &mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("IsSyncing", ctx)} +func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) PollHealthCheck(ctx interface{}) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { + 
return &mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("PollHealthCheck", ctx)} } -func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context)) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { +func (_c *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context)) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context)) }) return _c } -func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) Return(_a0 bool, _a1 error) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { - _c.Call.Return(_a0, _a1) +func (_c *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]) Return(_a0 error) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { + _c.Call.Return(_a0) return _c } -func (_c *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context) (bool, error)) *mockRPCClient_IsSyncing_Call[CHAIN_ID, HEAD] { +func (_c *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context) error) *mockRPCClient_PollHealthCheck_Call[CHAIN_ID, HEAD] { _c.Call.Return(run) return _c } diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index 0e92cb7..dffdcf3 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -29,12 +29,6 @@ const ( msgDegradedState = "Chainlink is now operating in a degraded state and urgent action is required to resolve the issue" ) -// pollHealthChecker is an optional RPC capability for running extra liveness checks during alive-loop polling. -// If implemented by a chain RPC client, failures are treated the same as ClientVersion polling failures. -type pollHealthChecker interface { - PollHealthCheck(context.Context) error -} - // Node is a FSM // Each state has a loop that goes with it, which monitors the node and moves it into another state as necessary. // Only one loop must run at a time. 
@@ -118,10 +112,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { pollCtx, cancel := context.WithTimeout(ctx, pollInterval) version, pingErr := n.RPC().ClientVersion(pollCtx) if pingErr == nil { - if healthChecker, ok := any(n.RPC()).(pollHealthChecker); ok { - if err := healthChecker.PollHealthCheck(pollCtx); err != nil { - pingErr = fmt.Errorf("poll health check failed: %w", err) - } + if healthErr := n.RPC().PollHealthCheck(pollCtx); healthErr != nil { + pingErr = fmt.Errorf("poll health check failed: %w", healthErr) } } cancel() diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index e86261b..dca2531 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -147,6 +147,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }).Once() // redundant call to stay in alive state rpc.On("ClientVersion", mock.Anything).Return("", nil) + // PollHealthCheck is called after successful ClientVersion - return nil to pass + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), pollFailureThreshold) tests.AssertLogCountEventually(t, observedLogs, "Ping successful", 2) @@ -196,7 +198,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() node.declareAlive() - tests.AssertLogEventually(t, observedLogs, "poll health check failed: health check failed") + tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), 1) tests.AssertEventually(t, func() bool { return nodeStateUnreachable == node.State() }) @@ -272,6 +274,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + 
rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) @@ -307,6 +310,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) @@ -335,6 +339,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice() poolInfo := newMockPoolChainInfoProvider(t) @@ -369,6 +374,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) node.declareAlive() diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index b4a886c..a06200d 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -296,3 +296,10 @@ func (m *RPCClientBase[HEAD]) GetInterceptedChainInfo() (latest, highestUserObse defer m.chainInfoLock.RUnlock() return m.latestChainInfo, m.highestUserObservations } + +// PollHealthCheck provides a default no-op implementation for the RPCClient interface. 
+// Chain-specific RPC clients that embed RPCClientBase can shadow it with their own PollHealthCheck to run additional +// health checks during polling (e.g., verifying historical state availability). +func (m *RPCClientBase[HEAD]) PollHealthCheck(ctx context.Context) error { + return nil +} diff --git a/multinode/types.go b/multinode/types.go index b31c6ca..e9aa954 100644 --- a/multinode/types.go +++ b/multinode/types.go @@ -77,6 +77,10 @@ type RPCClient[ // Ensure implementation does not have a race condition when values are reset before request completion and as // a result latest ChainInfo contains information from the previous cycle. GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) + // PollHealthCheck - performs an additional chain-specific health check (e.g., historical state availability) during polling. + // The alive loop calls it after a successful ClientVersion, passing the same poll-timeout context; a non-nil error counts as a poll failure. + // Return nil if the check passes or is not applicable, or an error if the check fails. + PollHealthCheck(ctx context.Context) error } // Head is the interface required by the NodeClient