From 94062cf309b95f720978c6447897595a58cb9204 Mon Sep 17 00:00:00 2001 From: roman Date: Fri, 5 Jun 2026 15:37:30 -0300 Subject: [PATCH] Gate helper ingress when local node is stale. Block helper ingest endpoints with 503 when local Comet status shows no recent blocks for 3 minutes, while keeping queue-summary available for diagnostics. --- CHANGELOG.md | 6 ++++ app/app.go | 67 +++++++++++++++++++++++++++++++++++++ app/helper_liveness_test.go | 46 +++++++++++++++++++++++++ internal/helper/api.go | 28 +++++++++++++++- internal/helper/api_test.go | 36 ++++++++++++++++++++ internal/helper/helper.go | 1 + 6 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 app/helper_liveness_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a9b9af86..8bf676eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ Historical changes before commit `704b202e2088b91caeaf2290cef85e4a9a759542` are Instructions on coordinated upgrades can be found [here](https://setup.valargroup.org/#coordinated-upgrade). +## Unreleased + +### Changed + +- Helper share-submission, share-status, status, and queue-status endpoints now return `503` when the local node's latest block is more than 3 minutes old (based on local Comet `/status` `latest_block_time`) or when that status query fails, preventing share ingestion on stale nodes. The queue-summary endpoint stays available for diagnostics. + ## v1.0.1 Operational migration script updates. Does not touch chain code. diff --git a/app/app.go b/app/app.go index 5ba751bb..ef72cc26 100644 --- a/app/app.go +++ b/app/app.go @@ -2,11 +2,14 @@ package app import ( "context" + "encoding/json" "fmt" "io" + "net/http" "os" "strings" "sync/atomic" + "time" dbm "github.com/cosmos/cosmos-db" @@ -51,6 +54,11 @@ var ( _ servertypes.Application = (*SvoteApp)(nil) ) +const ( + helperMaxBlockStaleness = 3 * time.Minute + helperStatusQueryTimeout = 3 * time.Second +) + // SvoteApp extends an ABCI application for the Shielded-Vote chain. 
// Built from a stripped-down Cosmos SDK simapp with only the minimal
 // modules needed for block production (auth, bank, staking, slashing,
@@ -369,6 +377,8 @@ func (app *SvoteApp) RegisterAPIRoutes(apiSvr *api.Server, apiConfig config.APIC
 			return false
 		}
 		return h.ExposeQueueSummary
+	}, func() bool {
+		return app.helperIngressAllowed()
 	}, func() helper.TreeReader {
 		h := app.GetHelper()
 		if h == nil {
@@ -426,6 +436,84 @@ func (app *SvoteApp) resolveAdminPIRServiceURL(_ context.Context) (string, error
 	return cfg.PIRServers[0].URL, nil
 }
 
+// helperIngressAllowed reports whether helper endpoints may serve requests,
+// based on whether the local Comet node reports a sufficiently recent block.
+// It fails closed: when the status check itself errors, the error is logged
+// and ingress is reported as not allowed.
+//
+// NOTE(review): the getter wired in RegisterAPIRoutes calls this on every gated
+// helper request, so each request performs a Comet RPC round trip bounded by
+// helperStatusQueryTimeout; consider caching the result for a few seconds.
+func (app *SvoteApp) helperIngressAllowed() bool {
+	healthy, err := app.localNodeProducedRecently(helperMaxBlockStaleness)
+	if err != nil {
+		app.Logger().Warn("helper ingress disabled: local Comet status check failed", "error", err)
+		return false
+	}
+	return healthy
+}
+
+// localNodeProducedRecently queries the local Comet RPC /status endpoint and
+// reports whether sync_info.latest_block_time is at most maxStaleness old.
+//
+// The endpoint is app.cometRPC, defaulting to http://localhost:26657 when it is
+// empty; a tcp:// prefix is rewritten to http://. The request is bounded by
+// helperStatusQueryTimeout. An error is returned when the request fails, the
+// response is not HTTP 200, the body cannot be decoded, or latest_block_time
+// is missing or cannot be parsed as RFC 3339.
+func (app *SvoteApp) localNodeProducedRecently(maxStaleness time.Duration) (bool, error) {
+	// 1 MiB is assumed to be ample for a /status payload; the cap keeps a
+	// misbehaving endpoint from streaming an unbounded body into the decoder.
+	const maxStatusBodyBytes = 1 << 20
+
+	cometRPC := app.cometRPC
+	if cometRPC == "" {
+		cometRPC = "http://localhost:26657"
+	} else if strings.HasPrefix(cometRPC, "tcp://") {
+		cometRPC = "http://" + strings.TrimPrefix(cometRPC, "tcp://")
+	}
+
+	statusURL := strings.TrimRight(cometRPC, "/") + "/status"
+	ctx, cancel := context.WithTimeout(context.Background(), helperStatusQueryTimeout)
+	defer cancel()
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, statusURL, nil)
+	if err != nil {
+		return false, fmt.Errorf("create status request: %w", err)
+	}
+
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return false, fmt.Errorf("query status: %w", err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return false, fmt.Errorf("status returned HTTP %d", resp.StatusCode)
+	}
+
+	var statusResp struct {
+		Result struct {
+			SyncInfo struct {
+				LatestBlockTime string `json:"latest_block_time"`
+			} `json:"sync_info"`
+		} `json:"result"`
+	}
+	body := io.LimitReader(resp.Body, maxStatusBodyBytes)
+	if err := json.NewDecoder(body).Decode(&statusResp); err != nil {
+		return false, fmt.Errorf("decode status response: %w", err)
+	}
+	if statusResp.Result.SyncInfo.LatestBlockTime == "" {
+		return false, fmt.Errorf("status missing latest_block_time")
+	}
+
+	latestBlockTime, err := time.Parse(time.RFC3339Nano, statusResp.Result.SyncInfo.LatestBlockTime)
+	if err != nil {
+		return false, fmt.Errorf("parse latest_block_time: %w", err)
+	}
+
+	return time.Since(latestBlockTime) <= maxStaleness, nil
+}
+
 // SetHelper publishes the helper instance for concurrent readers.
 func (app *SvoteApp) SetHelper(h *helper.Helper) {
 	app.helperRef.Store(h)
diff --git a/app/helper_liveness_test.go b/app/helper_liveness_test.go
new file mode 100644
index 00000000..9df90884
--- /dev/null
+++ b/app/helper_liveness_test.go
@@ -0,0 +1,54 @@
+package app
+
+import (
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+// newStatusServer starts a test server that answers /status with the given
+// latest_block_time. The handler runs on the server's goroutine, so it reports
+// problems with t.Errorf rather than require, which would call t.FailNow
+// outside the test goroutine.
+func newStatusServer(t *testing.T, latest string) *httptest.Server {
+	t.Helper()
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != "/status" {
+			t.Errorf("unexpected request path %q, want %q", r.URL.Path, "/status")
+		}
+		_, _ = fmt.Fprintf(w, `{"result":{"sync_info":{"latest_block_time":"%s"}}}`, latest)
+	}))
+	t.Cleanup(server.Close)
+	return server
+}
+
+func TestLocalNodeProducedRecently_HealthyWithinThreeMinutes(t *testing.T) {
+	latest := time.Now().Add(-2 * time.Minute).UTC().Format(time.RFC3339Nano)
+	server := newStatusServer(t, latest)
+
+	app := &SvoteApp{cometRPC: server.URL}
+	healthy, err := app.localNodeProducedRecently(helperMaxBlockStaleness)
+	require.NoError(t, err)
+	require.True(t, healthy)
+}
+
+func TestLocalNodeProducedRecently_StaleAfterThreeMinutes(t *testing.T) {
+	latest := time.Now().Add(-(helperMaxBlockStaleness + time.Second)).UTC().Format(time.RFC3339Nano)
+	server := newStatusServer(t, latest)
+
+	app := &SvoteApp{cometRPC: server.URL}
+	healthy, err := app.localNodeProducedRecently(helperMaxBlockStaleness)
+	require.NoError(t, err)
+	require.False(t, healthy)
+}
+
+func TestLocalNodeProducedRecently_StatusRPCFailure(t *testing.T) {
+	app := &SvoteApp{cometRPC: "http://127.0.0.1:1"}
+	healthy, err := app.localNodeProducedRecently(helperMaxBlockStaleness)
+	require.Error(t, err)
+	require.False(t, healthy)
+}
diff --git a/internal/helper/api.go b/internal/helper/api.go
index 3c18fec6..ae0fa67f 100644
--- a/internal/helper/api.go
+++ b/internal/helper/api.go
@@ -26,6 +26,7 @@ func RegisterRoutes(router *mux.Router, store *ShareStore, logger log.Logger) {
 		func() *ShareStore { return store },
 		func() string { return "" },
 		func() bool { return false },
+		func() bool { return true },
 		nil,
 		nil,
 		nil,
@@ -37,7 +38,7 @@ func RegisterRoutes(router *mux.Router, store *ShareStore, logger log.Logger) {
 // mux router, resolving the store at request time. This allows routes to be
 // mounted before the helper is fully initialized.
 func RegisterRoutesWithStoreGetter(router *mux.Router, getStore func() *ShareStore, logger log.Logger) {
-	RegisterRoutesWithGetters(router, getStore, func() string { return "" }, func() bool { return false }, nil, nil, nil, logger)
+	RegisterRoutesWithGetters(router, getStore, func() string { return "" }, func() bool { return false }, func() bool { return true }, nil, nil, nil, logger)
 }
 
 // ErrInvalidCommitment is returned when the share's recomputed vote commitment
@@ -51,6 +52,7 @@ func RegisterRoutesWithGetters(
 	getStore func() *ShareStore,
 	getAPIToken func() string,
 	getExposeQueueStatus func() bool,
+	getIngressAllowed func() bool,
 	getTree func() TreeReader,
 	getVCHash func() VCHashFunc,
 	getShareNullifier ShareNullifierCheckerGetter,
@@ -62,6 +64,7 @@ func RegisterRoutesWithGetters(
 		getAPIToken,
 		getExposeQueueStatus,
 		func() bool { return true },
+		getIngressAllowed,
 		getTree,
 		getVCHash,
 		getShareNullifier,
@@ -77,6 +80,7 @@ func RegisterRoutesWithQueueSummaryGetters(
 	getAPIToken func() string,
 	getExposeQueueStatus func() bool,
 	getExposeQueueSummary func() bool,
+
getIngressAllowed func() bool, getTree func() TreeReader, getVCHash func() VCHashFunc, getShareNullifier ShareNullifierCheckerGetter, @@ -87,6 +91,7 @@ func RegisterRoutesWithQueueSummaryGetters( getAPIToken: getAPIToken, getExposeQueueStatus: getExposeQueueStatus, getExposeQueueSummary: getExposeQueueSummary, + getIngressAllowed: getIngressAllowed, getTree: getTree, getVCHash: getVCHash, getShareNullifier: getShareNullifier, @@ -108,6 +113,7 @@ type apiHandler struct { getAPIToken func() string getExposeQueueStatus func() bool getExposeQueueSummary func() bool + getIngressAllowed func() bool getTree func() TreeReader getVCHash func() VCHashFunc getShareNullifier ShareNullifierCheckerGetter @@ -131,6 +137,9 @@ func jsonError(w http.ResponseWriter, msg string, code int) { } func (h *apiHandler) handleSubmitShare(w http.ResponseWriter, r *http.Request) { + if !h.ensureIngressAllowed(w) { + return + } store := h.getStore() if store == nil { jsonError(w, "helper unavailable", http.StatusServiceUnavailable) @@ -213,6 +222,9 @@ func (h *apiHandler) handleSubmitShare(w http.ResponseWriter, r *http.Request) { } func (h *apiHandler) handleShareStatus(w http.ResponseWriter, r *http.Request) { + if !h.ensureIngressAllowed(w) { + return + } if h.getStore() == nil { jsonError(w, "helper unavailable", http.StatusServiceUnavailable) return @@ -281,6 +293,9 @@ type statusResponse struct { } func (h *apiHandler) handleStatus(w http.ResponseWriter, r *http.Request) { + if !h.ensureIngressAllowed(w) { + return + } store := h.getStore() if store == nil { jsonError(w, "helper unavailable", http.StatusServiceUnavailable) @@ -296,6 +311,9 @@ func (h *apiHandler) handleStatus(w http.ResponseWriter, r *http.Request) { } func (h *apiHandler) handleQueueStatus(w http.ResponseWriter, r *http.Request) { + if !h.ensureIngressAllowed(w) { + return + } if h.getExposeQueueStatus == nil || !h.getExposeQueueStatus() { jsonError(w, "not found", http.StatusNotFound) return @@ -370,6 +388,14 @@ func (h 
*apiHandler) authorizeSubmit(r *http.Request) bool {
 	return subtle.ConstantTimeCompare([]byte(provided), []byte(token)) == 1
 }
 
+// ensureIngressAllowed reports whether the current request may proceed. A nil
+// getIngressAllowed getter counts as allowed. When ingress is not allowed it
+// reports a 503 through jsonError and returns false, so the calling handler
+// should return without doing further work.
+func (h *apiHandler) ensureIngressAllowed(w http.ResponseWriter) bool {
+	allowed := h.getIngressAllowed == nil || h.getIngressAllowed()
+	if !allowed {
+		jsonError(w, "helper ingress disabled: local node not producing recent blocks", http.StatusServiceUnavailable)
+	}
+	return allowed
+}
+
 // verifyCommitment recomputes the vote commitment Poseidon hash from the
 // payload fields and compares it against the on-chain leaf at tree_position.
 // Returns nil when the VC hash function or tree reader is unavailable
diff --git a/internal/helper/api_test.go b/internal/helper/api_test.go
index 82a6cf99..992e3a95 100644
--- a/internal/helper/api_test.go
+++ b/internal/helper/api_test.go
@@ -35,6 +35,7 @@ func newQueueStatusRouter(t *testing.T, token string) (*mux.Router, *ShareStore)
 		func() *ShareStore { return store },
 		func() string { return token },
 		func() bool { return true },
+		func() bool { return true },
 		nil,
 		nil,
 		nil,
@@ -52,6 +53,7 @@ func newQueueSummaryRouter(t *testing.T, store *ShareStore, token string, expose
 		func() string { return token },
 		func() bool { return false },
 		func() bool { return expose },
+		func() bool { return true },
 		nil,
 		nil,
 		nil,
@@ -115,6 +117,7 @@ func TestShareStatus_PendingThenConfirmed(t *testing.T) {
 		func() *ShareStore { return store },
 		func() string { return "" },
 		func() bool { return false },
+		func() bool { return true },
 		nil,
 		nil,
 		func() ShareNullifierChecker { return checker },
@@ -357,6 +360,48 @@ func TestRoutes_HelperUnavailable(t *testing.T) {
 	})
 }
 
+func TestRoutes_IngressDisabledReturnsServiceUnavailable(t *testing.T) {
+	store := newTestStore(t)
+	router := mux.NewRouter()
+	RegisterRoutesWithGetters(
+		router,
+		func() *ShareStore { return store },
+		func() string { return "" },
+		func() bool { return true },
+		func() bool { return false }, // ingress gate: always closed
+		nil,
+		nil,
+		nil,
+		log.NewNopLogger(),
+	)
+
+	// Each case builds its request inside the subtest, as the payload helper did before.
+	cases := []struct {
+		name   string
+		newReq func() *http.Request
+	}{
+		{
+			name: "shares returns 503",
+			newReq: func() *http.Request {
+				return httptest.NewRequest("POST", "/shielded-vote/v1/shares", strings.NewReader(validPayloadJSON()))
+			},
+		},
+		{
+			name: "status returns 503",
+			newReq: func() *http.Request {
+				return httptest.NewRequest("GET", "/shielded-vote/v1/status", nil)
+			},
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			rec := httptest.NewRecorder()
+			router.ServeHTTP(rec, tc.newReq())
+			assert.Equal(t, http.StatusServiceUnavailable, rec.Code)
+		})
+	}
+}
+
 func TestRoutes_BecomeReadyAfterStoreSet(t *testing.T) {
 	router := mux.NewRouter()
 	var store *ShareStore
@@ -420,6 +465,7 @@ func TestSubmitShare_APITokenAuth(t *testing.T) {
 		func() *ShareStore { return store },
 		func() string { return "secret-token" },
 		func() bool { return false },
+		func() bool { return true },
 		nil,
 		nil,
 		nil,
@@ -478,6 +524,7 @@ func vcTestRouter(t *testing.T) (*mux.Router, *ShareStore, *vcMockTree) {
 		func() *ShareStore { return store },
 		func() string { return "" },
 		func() bool { return false },
+		func() bool { return true },
 		func() TreeReader { return tree },
 		func() VCHashFunc { return vcHash },
 		nil,
@@ -587,6 +634,7 @@ func TestSubmitShare_VCCrossCheck_GracefulDegradation(t *testing.T) {
 		func() *ShareStore { return store },
 		func() string { return "" },
 		func() bool { return false },
+		func() bool { return true },
 		nil,
 		nil,
 		nil,
diff --git a/internal/helper/helper.go b/internal/helper/helper.go
index a10f28a1..ab7f0280 100644
--- a/internal/helper/helper.go
+++ b/internal/helper/helper.go
@@ -105,6 +105,7 @@ func (h *Helper) RegisterRoutes(router *mux.Router) {
 		func() string { return h.APIToken },
 		func() bool { return h.ExposeQueueStatus },
 		func() bool { return h.ExposeQueueSummary },
+		func() bool { return true },
 		func() TreeReader { return h.Processor.tree },
 		func() VCHashFunc { return h.VCHash },
 		func() ShareNullifierChecker { return h.ShareNullifierChecker },