Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions multinode/mock_rpc_client_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions multinode/node_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
lggr.Tracew("Pinging RPC", "nodeState", n.State(), "pollFailures", pollFailures)
pollCtx, cancel := context.WithTimeout(ctx, pollInterval)
version, pingErr := n.RPC().ClientVersion(pollCtx)
if pingErr == nil {
if healthErr := n.RPC().PollHealthCheck(pollCtx); healthErr != nil {
pingErr = fmt.Errorf("poll health check failed: %w", healthErr)
}
}
cancel()
if pingErr != nil {
// prevent overflow
Expand Down
31 changes: 31 additions & 0 deletions multinode/node_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
}).Once()
// redundant call to stay in alive state
rpc.On("ClientVersion", mock.Anything).Return("", nil)
// PollHealthCheck is called after successful ClientVersion - return nil to pass
rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe()
node.declareAlive()
tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), pollFailureThreshold)
tests.AssertLogCountEventually(t, observedLogs, "Ping successful", 2)
Expand Down Expand Up @@ -176,6 +178,31 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
return nodeStateUnreachable == node.State()
})
})
t.Run("optional poll health check failure counts as poll failure and transitions to unreachable", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{
pollFailureThreshold: 1,
pollInterval: tests.TestInterval,
},
rpc: rpc,
lggr: lggr,
})
defer func() { assert.NoError(t, node.close()) }()

rpc.On("ClientVersion", mock.Anything).Return("mock-version", nil)
rpc.On("PollHealthCheck", mock.Anything).Return(errors.New("health check failed"))
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()

node.declareAlive()
tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), 1)
tests.AssertEventually(t, func() bool {
return nodeStateUnreachable == node.State()
})
})
t.Run("with threshold poll failures, but we are the last node alive, forcibly keeps it alive", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
Expand Down Expand Up @@ -247,6 +274,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
rpc.On("ClientVersion", mock.Anything).Return("", nil)
rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe()
const mostRecentBlock = 20
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30})
poolInfo := newMockPoolChainInfoProvider(t)
Expand Down Expand Up @@ -282,6 +310,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
rpc.On("ClientVersion", mock.Anything).Return("", nil)
rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe()
const mostRecentBlock = 20
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30})
poolInfo := newMockPoolChainInfoProvider(t)
Expand Down Expand Up @@ -310,6 +339,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
rpc.On("ClientVersion", mock.Anything).Return("", nil)
rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe()
const mostRecentBlock = 20
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice()
poolInfo := newMockPoolChainInfoProvider(t)
Expand Down Expand Up @@ -344,6 +374,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
rpc.On("ClientVersion", mock.Anything).Return("", nil)
rpc.On("PollHealthCheck", mock.Anything).Return(nil).Maybe()
const mostRecentBlock = 20
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30})
node.declareAlive()
Expand Down
7 changes: 7 additions & 0 deletions multinode/rpc_client_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,10 @@ func (m *RPCClientBase[HEAD]) GetInterceptedChainInfo() (latest, highestUserObse
defer m.chainInfoLock.RUnlock()
return m.latestChainInfo, m.highestUserObservations
}

// PollHealthCheck provides a default no-op implementation for the RPCClient interface.
// Chain-specific RPC clients can override this method to perform additional health checks
// during polling (e.g., verifying historical state availability).
func (m *RPCClientBase[HEAD]) PollHealthCheck(ctx context.Context) error {
return nil
}
4 changes: 4 additions & 0 deletions multinode/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ type RPCClient[
// Ensure implementation does not have a race condition when values are reset before request completion and as
// a result latest ChainInfo contains information from the previous cycle.
GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo)
// PollHealthCheck - performs an optional additional health check during polling.
// Implementations can use this for chain-specific health verification (e.g., historical state availability).
// Return nil if the check passes or is not applicable, or an error if the check fails.
PollHealthCheck(ctx context.Context) error
}

// Head is the interface required by the NodeClient
Expand Down
Loading