Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ Historical changes before commit `704b202e2088b91caeaf2290cef85e4a9a759542` are

Instructions on coordinated upgrades can be found [here](https://setup.valargroup.org/#coordinated-upgrade).

## Unreleased

### Changed

- Helper endpoints now return `503` when the local node has not produced a block for more than 3 minutes (based on local Comet `/status` `latest_block_time`), preventing share ingestion on stale nodes.

## v1.0.1

Operational migration script updates. Does not touch chain code.
Expand Down
67 changes: 67 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package app

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync/atomic"
"time"

dbm "github.com/cosmos/cosmos-db"

Expand Down Expand Up @@ -51,6 +54,11 @@ var (
_ servertypes.Application = (*SvoteApp)(nil)
)

const (
helperMaxBlockStaleness = 3 * time.Minute
helperStatusQueryTimeout = 3 * time.Second
)

// SvoteApp extends an ABCI application for the Shielded-Vote chain.
// Built from a stripped-down Cosmos SDK simapp with only the minimal
// modules needed for block production (auth, bank, staking, slashing,
Expand Down Expand Up @@ -369,6 +377,8 @@ func (app *SvoteApp) RegisterAPIRoutes(apiSvr *api.Server, apiConfig config.APIC
return false
}
return h.ExposeQueueSummary
}, func() bool {
return app.helperIngressAllowed()
}, func() helper.TreeReader {
h := app.GetHelper()
if h == nil {
Expand Down Expand Up @@ -426,6 +436,63 @@ func (app *SvoteApp) resolveAdminPIRServiceURL(_ context.Context) (string, error
return cfg.PIRServers[0].URL, nil
}

func (app *SvoteApp) helperIngressAllowed() bool {
healthy, err := app.localNodeProducedRecently(helperMaxBlockStaleness)
if err != nil {
app.Logger().Warn("helper ingress disabled: local Comet status check failed", "error", err)
return false
}
return healthy
}

func (app *SvoteApp) localNodeProducedRecently(maxStaleness time.Duration) (bool, error) {
cometRPC := app.cometRPC
if cometRPC == "" {
cometRPC = "http://localhost:26657"
} else if strings.HasPrefix(cometRPC, "tcp://") {
cometRPC = "http://" + strings.TrimPrefix(cometRPC, "tcp://")
}

statusURL := strings.TrimRight(cometRPC, "/") + "/status"
ctx, cancel := context.WithTimeout(context.Background(), helperStatusQueryTimeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, statusURL, nil)
if err != nil {
return false, fmt.Errorf("create status request: %w", err)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, fmt.Errorf("query status: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("status returned HTTP %d", resp.StatusCode)
}

var statusResp struct {
Result struct {
SyncInfo struct {
LatestBlockTime string `json:"latest_block_time"`
} `json:"sync_info"`
} `json:"result"`
}
if err := json.NewDecoder(resp.Body).Decode(&statusResp); err != nil {
return false, fmt.Errorf("decode status response: %w", err)
}
if statusResp.Result.SyncInfo.LatestBlockTime == "" {
return false, fmt.Errorf("status missing latest_block_time")
}

latestBlockTime, err := time.Parse(time.RFC3339Nano, statusResp.Result.SyncInfo.LatestBlockTime)
if err != nil {
return false, fmt.Errorf("parse latest_block_time: %w", err)
}

return time.Since(latestBlockTime) <= maxStaleness, nil
}

// SetHelper publishes the helper instance for concurrent readers.
func (app *SvoteApp) SetHelper(h *helper.Helper) {
app.helperRef.Store(h)
Expand Down
46 changes: 46 additions & 0 deletions app/helper_liveness_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package app

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestLocalNodeProducedRecently_HealthyWithinThreeMinutes(t *testing.T) {
latest := time.Now().Add(-2 * time.Minute).UTC().Format(time.RFC3339Nano)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/status", r.URL.Path)
_, _ = fmt.Fprintf(w, `{"result":{"sync_info":{"latest_block_time":"%s"}}}`, latest)
}))
t.Cleanup(server.Close)

app := &SvoteApp{cometRPC: server.URL}
healthy, err := app.localNodeProducedRecently(helperMaxBlockStaleness)
require.NoError(t, err)
require.True(t, healthy)
}

func TestLocalNodeProducedRecently_StaleAfterThreeMinutes(t *testing.T) {
latest := time.Now().Add(-(helperMaxBlockStaleness + time.Second)).UTC().Format(time.RFC3339Nano)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/status", r.URL.Path)
_, _ = fmt.Fprintf(w, `{"result":{"sync_info":{"latest_block_time":"%s"}}}`, latest)
}))
t.Cleanup(server.Close)

app := &SvoteApp{cometRPC: server.URL}
healthy, err := app.localNodeProducedRecently(helperMaxBlockStaleness)
require.NoError(t, err)
require.False(t, healthy)
}

func TestLocalNodeProducedRecently_StatusRPCFailure(t *testing.T) {
app := &SvoteApp{cometRPC: "http://127.0.0.1:1"}
healthy, err := app.localNodeProducedRecently(helperMaxBlockStaleness)
require.Error(t, err)
require.False(t, healthy)
}
28 changes: 27 additions & 1 deletion internal/helper/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func RegisterRoutes(router *mux.Router, store *ShareStore, logger log.Logger) {
func() *ShareStore { return store },
func() string { return "" },
func() bool { return false },
func() bool { return true },
nil,
nil,
nil,
Expand All @@ -37,7 +38,7 @@ func RegisterRoutes(router *mux.Router, store *ShareStore, logger log.Logger) {
// mux router, resolving the store at request time. This allows routes to be
// mounted before the helper is fully initialized.
func RegisterRoutesWithStoreGetter(router *mux.Router, getStore func() *ShareStore, logger log.Logger) {
RegisterRoutesWithGetters(router, getStore, func() string { return "" }, func() bool { return false }, nil, nil, nil, logger)
RegisterRoutesWithGetters(router, getStore, func() string { return "" }, func() bool { return false }, func() bool { return true }, nil, nil, nil, logger)
}

// ErrInvalidCommitment is returned when the share's recomputed vote commitment
Expand All @@ -51,6 +52,7 @@ func RegisterRoutesWithGetters(
getStore func() *ShareStore,
getAPIToken func() string,
getExposeQueueStatus func() bool,
getIngressAllowed func() bool,
getTree func() TreeReader,
getVCHash func() VCHashFunc,
getShareNullifier ShareNullifierCheckerGetter,
Expand All @@ -62,6 +64,7 @@ func RegisterRoutesWithGetters(
getAPIToken,
getExposeQueueStatus,
func() bool { return true },
getIngressAllowed,
getTree,
getVCHash,
getShareNullifier,
Expand All @@ -77,6 +80,7 @@ func RegisterRoutesWithQueueSummaryGetters(
getAPIToken func() string,
getExposeQueueStatus func() bool,
getExposeQueueSummary func() bool,
getIngressAllowed func() bool,
getTree func() TreeReader,
getVCHash func() VCHashFunc,
getShareNullifier ShareNullifierCheckerGetter,
Expand All @@ -87,6 +91,7 @@ func RegisterRoutesWithQueueSummaryGetters(
getAPIToken: getAPIToken,
getExposeQueueStatus: getExposeQueueStatus,
getExposeQueueSummary: getExposeQueueSummary,
getIngressAllowed: getIngressAllowed,
getTree: getTree,
getVCHash: getVCHash,
getShareNullifier: getShareNullifier,
Expand All @@ -108,6 +113,7 @@ type apiHandler struct {
getAPIToken func() string
getExposeQueueStatus func() bool
getExposeQueueSummary func() bool
getIngressAllowed func() bool
getTree func() TreeReader
getVCHash func() VCHashFunc
getShareNullifier ShareNullifierCheckerGetter
Expand All @@ -131,6 +137,9 @@ func jsonError(w http.ResponseWriter, msg string, code int) {
}

func (h *apiHandler) handleSubmitShare(w http.ResponseWriter, r *http.Request) {
if !h.ensureIngressAllowed(w) {
return
}
store := h.getStore()
if store == nil {
jsonError(w, "helper unavailable", http.StatusServiceUnavailable)
Expand Down Expand Up @@ -213,6 +222,9 @@ func (h *apiHandler) handleSubmitShare(w http.ResponseWriter, r *http.Request) {
}

func (h *apiHandler) handleShareStatus(w http.ResponseWriter, r *http.Request) {
if !h.ensureIngressAllowed(w) {
return
}
if h.getStore() == nil {
jsonError(w, "helper unavailable", http.StatusServiceUnavailable)
return
Expand Down Expand Up @@ -281,6 +293,9 @@ type statusResponse struct {
}

func (h *apiHandler) handleStatus(w http.ResponseWriter, r *http.Request) {
if !h.ensureIngressAllowed(w) {
return
}
store := h.getStore()
if store == nil {
jsonError(w, "helper unavailable", http.StatusServiceUnavailable)
Expand All @@ -296,6 +311,9 @@ func (h *apiHandler) handleStatus(w http.ResponseWriter, r *http.Request) {
}

func (h *apiHandler) handleQueueStatus(w http.ResponseWriter, r *http.Request) {
if !h.ensureIngressAllowed(w) {
return
}
if h.getExposeQueueStatus == nil || !h.getExposeQueueStatus() {
jsonError(w, "not found", http.StatusNotFound)
return
Expand Down Expand Up @@ -370,6 +388,14 @@ func (h *apiHandler) authorizeSubmit(r *http.Request) bool {
return subtle.ConstantTimeCompare([]byte(provided), []byte(token)) == 1
}

func (h *apiHandler) ensureIngressAllowed(w http.ResponseWriter) bool {
if h.getIngressAllowed == nil || h.getIngressAllowed() {
return true
}
jsonError(w, "helper ingress disabled: local node not producing recent blocks", http.StatusServiceUnavailable)
return false
}

// verifyCommitment recomputes the vote commitment Poseidon hash from the
// payload fields and compares it against the on-chain leaf at tree_position.
// Returns nil when the VC hash function or tree reader is unavailable
Expand Down
36 changes: 36 additions & 0 deletions internal/helper/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func newQueueStatusRouter(t *testing.T, token string) (*mux.Router, *ShareStore)
func() *ShareStore { return store },
func() string { return token },
func() bool { return true },
func() bool { return true },
nil,
nil,
nil,
Expand All @@ -52,6 +53,7 @@ func newQueueSummaryRouter(t *testing.T, store *ShareStore, token string, expose
func() string { return token },
func() bool { return false },
func() bool { return expose },
func() bool { return true },
nil,
nil,
nil,
Expand Down Expand Up @@ -115,6 +117,7 @@ func TestShareStatus_PendingThenConfirmed(t *testing.T) {
func() *ShareStore { return store },
func() string { return "" },
func() bool { return false },
func() bool { return true },
nil,
nil,
func() ShareNullifierChecker { return checker },
Expand Down Expand Up @@ -357,6 +360,36 @@ func TestRoutes_HelperUnavailable(t *testing.T) {
})
}

func TestRoutes_IngressDisabledReturnsServiceUnavailable(t *testing.T) {
store := newTestStore(t)
router := mux.NewRouter()
RegisterRoutesWithGetters(
router,
func() *ShareStore { return store },
func() string { return "" },
func() bool { return true },
func() bool { return false },
nil,
nil,
nil,
log.NewNopLogger(),
)

t.Run("shares returns 503", func(t *testing.T) {
req := httptest.NewRequest("POST", "/shielded-vote/v1/shares", strings.NewReader(validPayloadJSON()))
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusServiceUnavailable, w.Code)
})

t.Run("status returns 503", func(t *testing.T) {
req := httptest.NewRequest("GET", "/shielded-vote/v1/status", nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusServiceUnavailable, w.Code)
})
}

func TestRoutes_BecomeReadyAfterStoreSet(t *testing.T) {
router := mux.NewRouter()
var store *ShareStore
Expand Down Expand Up @@ -420,6 +453,7 @@ func TestSubmitShare_APITokenAuth(t *testing.T) {
func() *ShareStore { return store },
func() string { return "secret-token" },
func() bool { return false },
func() bool { return true },
nil,
nil,
nil,
Expand Down Expand Up @@ -478,6 +512,7 @@ func vcTestRouter(t *testing.T) (*mux.Router, *ShareStore, *vcMockTree) {
func() *ShareStore { return store },
func() string { return "" },
func() bool { return false },
func() bool { return true },
func() TreeReader { return tree },
func() VCHashFunc { return vcHash },
nil,
Expand Down Expand Up @@ -587,6 +622,7 @@ func TestSubmitShare_VCCrossCheck_GracefulDegradation(t *testing.T) {
func() *ShareStore { return store },
func() string { return "" },
func() bool { return false },
func() bool { return true },
nil,
nil,
nil,
Expand Down
1 change: 1 addition & 0 deletions internal/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (h *Helper) RegisterRoutes(router *mux.Router) {
func() string { return h.APIToken },
func() bool { return h.ExposeQueueStatus },
func() bool { return h.ExposeQueueSummary },
func() bool { return true },
func() TreeReader { return h.Processor.tree },
func() VCHashFunc { return h.VCHash },
func() ShareNullifierChecker { return h.ShareNullifierChecker },
Expand Down
Loading