diff --git a/extensions/tn_digest/engine_ops_integration_test.go b/extensions/tn_digest/engine_ops_integration_test.go index b54ed7a49..5cbf818b0 100644 --- a/extensions/tn_digest/engine_ops_integration_test.go +++ b/extensions/tn_digest/engine_ops_integration_test.go @@ -41,7 +41,7 @@ func TestBuildAndBroadcastAutoDigestTx_VerifiesTxBuildSignAndDBEffect(t *testing require.NoError(t, err) // Prepare EngineOperations - ops := internal.NewEngineOperations(platform.Engine, platform.DB, accts, log.New()) + ops := internal.NewEngineOperations(platform.Engine, platform.DB, nil, accts, log.New()) // Generate a node signer (secp256k1) priv, _, err := crypto.GenerateSecp256k1Key(nil) diff --git a/extensions/tn_digest/internal/engine_ops.go b/extensions/tn_digest/internal/engine_ops.go index 5a00468f8..092ec08ca 100644 --- a/extensions/tn_digest/internal/engine_ops.go +++ b/extensions/tn_digest/internal/engine_ops.go @@ -26,11 +26,29 @@ type EngineOperations struct { engine common.Engine logger log.Logger db sql.DB + dbPool sql.DelayedReadTxMaker // For fresh read transactions in background jobs accounts common.Accounts } -func NewEngineOperations(engine common.Engine, db sql.DB, accounts common.Accounts, logger log.Logger) *EngineOperations { - return &EngineOperations{engine: engine, db: db, accounts: accounts, logger: logger.New("ops")} +func NewEngineOperations(engine common.Engine, db sql.DB, dbPool sql.DelayedReadTxMaker, accounts common.Accounts, logger log.Logger) *EngineOperations { + return &EngineOperations{engine: engine, db: db, dbPool: dbPool, accounts: accounts, logger: logger.New("ops")} +} + +// getFreshReadTx returns a fresh database connection for read operations. +// This is critical for background scheduler operations where e.db may be stale/closed. +// Returns: (db, cleanup function, error) +// The cleanup function MUST be called (via defer) to rollback the read transaction. 
+func (e *EngineOperations) getFreshReadTx(ctx context.Context) (sql.DB, func(), error) { + if e.dbPool != nil { + readTx := e.dbPool.BeginDelayedReadTx() + cleanup := func() { + readTx.Rollback(ctx) + } + return readTx, cleanup, nil + } + // Fallback to stored db (may fail if tx is closed, but better than nothing) + e.logger.Warn("dbPool is nil, falling back to stored db connection (may be stale)") + return e.db, func() {}, nil } // LoadDigestConfig reads the single-row digest configuration. @@ -41,8 +59,17 @@ func (e *EngineOperations) LoadDigestConfig(ctx context.Context) (bool, string, schedule string found bool ) + + // Use fresh read transaction from pool to avoid stale connection issues + // in background scheduler contexts where e.db may be closed + db, cleanup, err := e.getFreshReadTx(ctx) + if err != nil { + return false, "", fmt.Errorf("get fresh read tx: %w", err) + } + defer cleanup() + // Read using engine without engine ctx (owner-level read) - err := e.engine.ExecuteWithoutEngineCtx(ctx, e.db, + err = e.engine.ExecuteWithoutEngineCtx(ctx, db, `SELECT enabled, digest_schedule FROM main.digest_config WHERE id = 1`, nil, func(row *common.Row) error { if len(row.Values) >= 2 { @@ -79,9 +106,16 @@ func (e *EngineOperations) BuildAndBroadcastAutoDigestTx(ctx context.Context, ch return fmt.Errorf("failed to get signer account: %w", err) } + // Use fresh read transaction from pool to avoid stale connection issues + db, cleanup, err := e.getFreshReadTx(ctx) + if err != nil { + return fmt.Errorf("get fresh read tx: %w", err) + } + defer cleanup() + // Get account information using the accounts service directly on database // DB interface embeds Executor, so we can use it directly - account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID) + account, err := e.accounts.GetAccount(ctx, db, signerAccountID) var nextNonce uint64 if err != nil { // Account doesn't exist yet - use nonce 1 for first transaction @@ -148,11 +182,18 @@ func (e *EngineOperations) 
BroadcastAutoDigestWithArgsAndParse( return nil, fmt.Errorf("failed to get signer account: %w", err) } + // Use fresh read transaction from pool to avoid stale connection issues + db, cleanup, err := e.getFreshReadTx(ctx) + if err != nil { + return nil, fmt.Errorf("get fresh read tx: %w", err) + } + defer cleanup() + // Get account information using the accounts service directly on database - account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID) + account, err := e.accounts.GetAccount(ctx, db, signerAccountID) var nextNonce uint64 if err != nil { - // Only treat “not found” / “no rows” as missing-account; fail fast on any other error + // Only treat "not found" / "no rows" as missing-account; fail fast on any other error msg := strings.ToLower(err.Error()) if !strings.Contains(msg, "not found") && !strings.Contains(msg, "no rows") { return nil, fmt.Errorf("get account: %w", err) @@ -310,8 +351,15 @@ func (e *EngineOperations) broadcastAutoDigestWithFreshNonce( return nil, ktypes.Hash{}, fmt.Errorf("get signer account: %w", err) } + // Use fresh read transaction from pool to avoid stale connection issues + db, cleanup, err := e.getFreshReadTx(ctx) + if err != nil { + return nil, ktypes.Hash{}, fmt.Errorf("get fresh read tx: %w", err) + } + defer cleanup() + // ALWAYS query database for current nonce (fresh state) - account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID) + account, err := e.accounts.GetAccount(ctx, db, signerAccountID) var nextNonce uint64 if err != nil { // Only treat "not found" / "no rows" as missing-account diff --git a/extensions/tn_digest/leader_reload_test.go b/extensions/tn_digest/leader_reload_test.go index 2c6eeea82..9c8329ae0 100644 --- a/extensions/tn_digest/leader_reload_test.go +++ b/extensions/tn_digest/leader_reload_test.go @@ -150,7 +150,7 @@ func resetExtensionForTest() *Extension { ext.SetNodeSigner(mockSigner{}) ext.SetBroadcaster(mockBroadcaster{}) // minimal engine ops - 
ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, &fakeDB{}, &fakeAccounts{}, log.New())) + ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, &fakeDB{}, nil, &fakeAccounts{}, log.New())) SetExtension(ext) return ext } @@ -217,7 +217,7 @@ func TestDigest_Reload_EnablesAndStarts_WhenBecomesEnabled(t *testing.T) { // attach EngineOps with fake DB that returns enabled on reload BEFORE first hook fdb := &fakeDB{enabled: true, schedule: "*/5 * * * *"} - ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, &fakeAccounts{}, log.New())) + ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, nil, &fakeAccounts{}, log.New())) // leader at height 1: disabled, no scheduler digestLeaderAcquire(context.Background(), app, makeBlock(1, identity)) @@ -245,7 +245,7 @@ func TestDigest_Reload_DisablesAndStops_WhenBecomesDisabled(t *testing.T) { // reload returns disabled fdb := &fakeDB{enabled: false, schedule: "*/5 * * * *"} - ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, &fakeAccounts{}, log.New())) + ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, nil, &fakeAccounts{}, log.New())) digestLeaderEndBlock(context.Background(), app, makeBlock(2, identity)) // stop should be idempotent @@ -311,7 +311,7 @@ func TestDigest_Reload_TransientFailure_SucceedsAfterRetry(t *testing.T) { schedule: "0 9 * * *", failCount: 2, } - ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, &fakeAccounts{}, log.New())) + ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, nil, &fakeAccounts{}, log.New())) // Reload at height 2 - first attempt fails, triggers background retry digestLeaderEndBlock(context.Background(), app, makeBlock(2, identity)) @@ -356,7 +356,7 @@ func TestDigest_Reload_AllRetriesFail_KeepsCurrentConfig(t *testing.T) { schedule: "0 9 * * *", failCount: 20, // More than max retries (15) } - 
ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, &fakeAccounts{}, log.New())) + ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, nil, &fakeAccounts{}, log.New())) // Reload at height 2 - first attempt fails, triggers background retry digestLeaderEndBlock(context.Background(), app, makeBlock(2, identity)) @@ -396,7 +396,7 @@ func TestDigest_Reload_ContextCancellation_ExitsGracefully(t *testing.T) { schedule: "0 9 * * *", failCount: 10, } - ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, &fakeAccounts{}, log.New())) + ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, nil, &fakeAccounts{}, log.New())) // Reload triggers background retry digestLeaderEndBlock(context.Background(), app, makeBlock(2, identity)) diff --git a/extensions/tn_digest/tn_digest.go b/extensions/tn_digest/tn_digest.go index df82b9111..eb32564c9 100644 --- a/extensions/tn_digest/tn_digest.go +++ b/extensions/tn_digest/tn_digest.go @@ -53,7 +53,7 @@ func InitializeExtensionWithNodeCapabilities(_ TxBroadcaster, _ auth.Signer) { I func engineReadyHook(ctx context.Context, app *common.App) error { logger := app.Service.Logger.New(ExtensionName) - // Use app.DB for read/query; if a separate RO pool is needed in future, wire it here. 
+ // Use app.DB for read/query var db sql.DB db = app.DB if db == nil { @@ -61,7 +61,8 @@ func engineReadyHook(ctx context.Context, app *common.App) error { } // Build engine operations wrapper - engOps := internal.NewEngineOperations(app.Engine, db, app.Accounts, app.Service.Logger) + // Pass DBPool for fresh read transactions in background jobs + engOps := internal.NewEngineOperations(app.Engine, db, app.Service.DBPool, app.Accounts, app.Service.Logger) // Load schedule from config; fall back to default if absent enabled, schedule, _ := engOps.LoadDigestConfig(ctx) diff --git a/extensions/tn_settlement/extension.go b/extensions/tn_settlement/extension.go index 367f714e3..8fd2f255c 100644 --- a/extensions/tn_settlement/extension.go +++ b/extensions/tn_settlement/extension.go @@ -51,6 +51,10 @@ type Extension struct { retrySignal chan struct{} // signal to trigger retry worker retryMu sync.Mutex // protects retry state + // shutdown context for graceful termination + shutdownCtx context.Context + shutdownCancel context.CancelFunc + // tx submission wiring broadcaster TxBroadcaster nodeSigner auth.Signer @@ -165,6 +169,18 @@ func (e *Extension) Broadcaster() TxBroadcaster { return e.broadcaster } func (e *Extension) SetNodeSigner(s auth.Signer) { e.nodeSigner = s } func (e *Extension) NodeSigner() auth.Signer { return e.nodeSigner } +// ShutdownContext returns the extension's shutdown context, creating it if needed. +// This context is cancelled when Close() is called, allowing graceful termination +// of long-running operations like the scheduler. 
+func (e *Extension) ShutdownContext() context.Context { + e.mu.Lock() + defer e.mu.Unlock() + if e.shutdownCtx == nil { + e.shutdownCtx, e.shutdownCancel = context.WithCancel(context.Background()) + } + return e.shutdownCtx +} + // startRetryWorker starts the background config reload retry worker func (e *Extension) startRetryWorker() { e.retryMu.Lock() @@ -297,7 +313,8 @@ func (e *Extension) applyConfigChangeWithLock(ctx context.Context, enabled bool, e.Logger().Debug("tn_settlement: prerequisites missing; deferring (re)start after config update") } else if e.Scheduler() != nil { e.stopSchedulerIfRunning() - if err := e.startScheduler(ctx); err != nil { + // Use extension's shutdown context for scheduler - ensures graceful shutdown + if err := e.startScheduler(e.ShutdownContext()); err != nil { e.Logger().Warn("failed to (re)start tn_settlement scheduler after config update", "error", err) } else { e.Logger().Info("tn_settlement (re)started with new schedule", "schedule", e.Schedule()) @@ -309,10 +326,17 @@ func (e *Extension) applyConfigChangeWithLock(ctx context.Context, enabled bool, } } -// Close stops background jobs +// Close stops background jobs and cancels the shutdown context func (e *Extension) Close() { e.stopRetryWorker() if e.scheduler != nil { _ = e.scheduler.Stop() } + // Cancel shutdown context to signal any operations using it + e.mu.Lock() + if e.shutdownCancel != nil { + e.shutdownCancel() + e.shutdownCancel = nil + } + e.mu.Unlock() } diff --git a/extensions/tn_settlement/internal/engine_ops.go b/extensions/tn_settlement/internal/engine_ops.go index ac09d9079..a6a70b7d0 100644 --- a/extensions/tn_settlement/internal/engine_ops.go +++ b/extensions/tn_settlement/internal/engine_ops.go @@ -6,6 +6,8 @@ import ( "strings" "time" + gethAbi "github.com/ethereum/go-ethereum/accounts/abi" + gethCommon "github.com/ethereum/go-ethereum/common" "github.com/trufnetwork/kwil-db/common" "github.com/trufnetwork/kwil-db/core/crypto/auth" 
"github.com/trufnetwork/kwil-db/core/log" @@ -13,11 +15,20 @@ import ( "github.com/trufnetwork/kwil-db/node/types/sql" ) +// QueryComponents holds decoded ABI-encoded query components from a market +type QueryComponents struct { + DataProvider string + StreamID string + ActionName string + ArgsBytes []byte +} + // EngineOperations wraps engine calls needed by the settlement extension type EngineOperations struct { engine common.Engine logger log.Logger db sql.DB + dbPool sql.DelayedReadTxMaker // For fresh read transactions in background jobs accounts common.Accounts } @@ -28,10 +39,11 @@ type UnsettledMarket struct { SettleTime int64 // Unix timestamp } -func NewEngineOperations(engine common.Engine, db sql.DB, accounts common.Accounts, logger log.Logger) *EngineOperations { +func NewEngineOperations(engine common.Engine, db sql.DB, dbPool sql.DelayedReadTxMaker, accounts common.Accounts, logger log.Logger) *EngineOperations { return &EngineOperations{ engine: engine, db: db, + dbPool: dbPool, accounts: accounts, logger: logger.New("settlement_ops"), } @@ -47,6 +59,23 @@ func isAccountNotFoundError(err error) bool { return strings.Contains(msg, "not found") || strings.Contains(msg, "no rows") } +// getFreshReadTx returns a fresh database connection for read operations. +// This is critical for background scheduler operations where e.db may be stale/closed. +// Returns: (db, cleanup function, error) +// The cleanup function MUST be called (via defer) to rollback the read transaction. 
+func (e *EngineOperations) getFreshReadTx(ctx context.Context) (sql.DB, func(), error) { + if e.dbPool != nil { + readTx := e.dbPool.BeginDelayedReadTx() + cleanup := func() { + readTx.Rollback(ctx) + } + return readTx, cleanup, nil + } + // Fallback to stored db (may fail if tx is closed, but better than nothing) + e.logger.Warn("dbPool is nil, falling back to stored db connection (may be stale)") + return e.db, func() {}, nil +} + // LoadSettlementConfig reads the single-row settlement configuration // Returns (enabled, schedule, maxMarketsPerRun, retryAttempts) // If table/row missing, returns false, "", 10, 3 and no error @@ -59,8 +88,16 @@ func (e *EngineOperations) LoadSettlementConfig(ctx context.Context) (bool, stri found bool ) + // Use fresh read transaction from pool to avoid stale connection issues + // in background scheduler contexts where e.db may be closed + db, cleanup, err := e.getFreshReadTx(ctx) + if err != nil { + return false, "", 10, 3, fmt.Errorf("get fresh read tx: %w", err) + } + defer cleanup() + // Read using engine without engine ctx (owner-level read) - err := e.engine.ExecuteWithoutEngineCtx(ctx, e.db, + err = e.engine.ExecuteWithoutEngineCtx(ctx, db, `SELECT enabled, settlement_schedule, max_markets_per_run, retry_attempts FROM main.settlement_config WHERE id = 1`, nil, func(row *common.Row) error { @@ -125,7 +162,14 @@ func (e *EngineOperations) FindUnsettledMarkets(ctx context.Context, limit int) LIMIT $limit ` - err := e.engine.ExecuteWithoutEngineCtx(ctx, e.db, query, + // Use fresh read transaction from pool to avoid stale connection issues + db, cleanup, err := e.getFreshReadTx(ctx) + if err != nil { + return nil, fmt.Errorf("get fresh read tx: %w", err) + } + defer cleanup() + + err = e.engine.ExecuteWithoutEngineCtx(ctx, db, query, map[string]any{ "current_time": currentTime, "limit": int64(limit), @@ -193,7 +237,14 @@ func (e *EngineOperations) AttestationExists(ctx context.Context, marketHash []b LIMIT 1 ` - err := 
e.engine.ExecuteWithoutEngineCtx(ctx, e.db, query, + // Use fresh read transaction from pool to avoid stale connection issues + db, cleanup, err := e.getFreshReadTx(ctx) + if err != nil { + return false, fmt.Errorf("get fresh read tx: %w", err) + } + defer cleanup() + + err = e.engine.ExecuteWithoutEngineCtx(ctx, db, query, map[string]any{"hash": marketHash}, func(row *common.Row) error { exists = true @@ -279,22 +330,44 @@ func (e *EngineOperations) broadcastSettleMarketWithFreshNonce( return ktypes.Hash{}, fmt.Errorf("get signer account: %w", err) } - // Fetch fresh nonce from database - account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID) + // Fetch fresh nonce from database using a fresh read transaction var nextNonce uint64 - if err != nil { - if !isAccountNotFoundError(err) { - return ktypes.Hash{}, fmt.Errorf("get account: %w", err) + if e.dbPool != nil { + readTx := e.dbPool.BeginDelayedReadTx() + defer readTx.Rollback(ctx) + + account, err := e.accounts.GetAccount(ctx, readTx, signerAccountID) + if err != nil { + if !isAccountNotFoundError(err) { + return ktypes.Hash{}, fmt.Errorf("get account: %w", err) + } + nextNonce = 1 + e.logger.Info("account not found, using nonce 1", + "account", fmt.Sprintf("%x", signerAccountID.Identifier)) + } else { + nextNonce = uint64(account.Nonce + 1) + e.logger.Info("fresh nonce from database", + "account", fmt.Sprintf("%x", signerAccountID.Identifier), + "db_nonce", account.Nonce, + "next_nonce", nextNonce) } - nextNonce = 1 - e.logger.Info("account not found, using nonce 1", - "account", fmt.Sprintf("%x", signerAccountID.Identifier)) } else { - nextNonce = uint64(account.Nonce + 1) - e.logger.Info("fresh nonce from database", - "account", fmt.Sprintf("%x", signerAccountID.Identifier), - "db_nonce", account.Nonce, - "next_nonce", nextNonce) + // Fallback to stored db (may fail if tx is closed) + account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID) + if err != nil { + if 
!isAccountNotFoundError(err) { + return ktypes.Hash{}, fmt.Errorf("get account: %w", err) + } + nextNonce = 1 + e.logger.Info("account not found, using nonce 1", + "account", fmt.Sprintf("%x", signerAccountID.Identifier)) + } else { + nextNonce = uint64(account.Nonce + 1) + e.logger.Info("fresh nonce from database", + "account", fmt.Sprintf("%x", signerAccountID.Identifier), + "db_nonce", account.Nonce, + "next_nonce", nextNonce) + } } // Encode query_id argument @@ -340,3 +413,242 @@ func (e *EngineOperations) broadcastSettleMarketWithFreshNonce( return hash, nil } + +// GetMarketQueryComponents fetches and decodes query_components for a market +func (e *EngineOperations) GetMarketQueryComponents(ctx context.Context, queryID int) (*QueryComponents, error) { + var queryComponentsBytes []byte + var foundRow bool + var foundData bool + + // Use fresh read transaction from pool to avoid stale connection issues + db, cleanup, err := e.getFreshReadTx(ctx) + if err != nil { + return nil, fmt.Errorf("get fresh read tx: %w", err) + } + defer cleanup() + + err = e.engine.ExecuteWithoutEngineCtx(ctx, db, + `SELECT query_components FROM ob_queries WHERE id = $query_id`, + map[string]any{"query_id": int64(queryID)}, + func(row *common.Row) error { + if len(row.Values) >= 1 { + foundRow = true + if row.Values[0] == nil { + return fmt.Errorf("query_components is NULL for query_id=%d", queryID) + } + bytes, ok := row.Values[0].([]byte) + if !ok { + return fmt.Errorf("unexpected query_components type: %T", row.Values[0]) + } + queryComponentsBytes = bytes + foundData = true + } + return nil + }) + + if err != nil { + return nil, fmt.Errorf("fetch query_components: %w", err) + } + if !foundRow { + return nil, fmt.Errorf("market not found: query_id=%d", queryID) + } + if !foundData { + return nil, fmt.Errorf("query_components missing or invalid for query_id=%d", queryID) + } + + return decodeQueryComponents(queryComponentsBytes) +} + +// decodeQueryComponents decodes ABI-encoded 
query components (address, bytes32, string, bytes) +func decodeQueryComponents(data []byte) (*QueryComponents, error) { + if len(data) == 0 { + return nil, fmt.Errorf("query_components is empty") + } + + addressType, err := gethAbi.NewType("address", "", nil) + if err != nil { + return nil, fmt.Errorf("create address type: %w", err) + } + bytes32Type, err := gethAbi.NewType("bytes32", "", nil) + if err != nil { + return nil, fmt.Errorf("create bytes32 type: %w", err) + } + stringType, err := gethAbi.NewType("string", "", nil) + if err != nil { + return nil, fmt.Errorf("create string type: %w", err) + } + bytesType, err := gethAbi.NewType("bytes", "", nil) + if err != nil { + return nil, fmt.Errorf("create bytes type: %w", err) + } + + args := gethAbi.Arguments{ + {Type: addressType}, + {Type: bytes32Type}, + {Type: stringType}, + {Type: bytesType}, + } + + decoded, err := args.Unpack(data) + if err != nil { + return nil, fmt.Errorf("unpack query_components: %w", err) + } + + if len(decoded) != 4 { + return nil, fmt.Errorf("expected 4 components, got %d", len(decoded)) + } + + // Extract data provider address + dataProvider, ok := decoded[0].(gethCommon.Address) + if !ok { + return nil, fmt.Errorf("invalid data_provider type: %T", decoded[0]) + } + + // Extract stream ID (bytes32 -> string, trim null padding) + streamIDBytes, ok := decoded[1].([32]byte) + if !ok { + return nil, fmt.Errorf("invalid stream_id type: %T", decoded[1]) + } + streamID := strings.TrimRight(string(streamIDBytes[:]), "\x00") + + // Extract action name + actionName, ok := decoded[2].(string) + if !ok { + return nil, fmt.Errorf("invalid action_name type: %T", decoded[2]) + } + + // Extract args bytes + argsBytes, ok := decoded[3].([]byte) + if !ok { + return nil, fmt.Errorf("invalid args_bytes type: %T", decoded[3]) + } + + return &QueryComponents{ + DataProvider: strings.ToLower(dataProvider.Hex()), + StreamID: streamID, + ActionName: actionName, + ArgsBytes: argsBytes, + }, nil +} + +// 
RequestAttestationForMarket broadcasts a request_attestation transaction for a market +func (e *EngineOperations) RequestAttestationForMarket( + ctx context.Context, + chainID string, + signer auth.Signer, + broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), + market *UnsettledMarket, +) error { + // Get query components from market + components, err := e.GetMarketQueryComponents(ctx, market.ID) + if err != nil { + return fmt.Errorf("get query components: %w", err) + } + + // Get signer account ID + signerAccountID, err := ktypes.GetSignerAccount(signer) + if err != nil { + return fmt.Errorf("get signer account: %w", err) + } + + // Fetch fresh nonce from database using a fresh read transaction + var nextNonce uint64 + if e.dbPool != nil { + readTx := e.dbPool.BeginDelayedReadTx() + defer readTx.Rollback(ctx) + + account, err := e.accounts.GetAccount(ctx, readTx, signerAccountID) + if err != nil { + if !isAccountNotFoundError(err) { + return fmt.Errorf("get account: %w", err) + } + nextNonce = 1 + } else { + nextNonce = uint64(account.Nonce + 1) + } + } else { + // Fallback to stored db (may fail if tx is closed) + account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID) + if err != nil { + if !isAccountNotFoundError(err) { + return fmt.Errorf("get account: %w", err) + } + nextNonce = 1 + } else { + nextNonce = uint64(account.Nonce + 1) + } + } + + // Encode arguments for request_attestation action + // Parameters: data_provider TEXT, stream_id TEXT, action_name TEXT, args_bytes BYTEA, encrypt_sig BOOL, max_fee NUMERIC + dataProviderArg, err := ktypes.EncodeValue(components.DataProvider) + if err != nil { + return fmt.Errorf("encode data_provider: %w", err) + } + streamIDArg, err := ktypes.EncodeValue(components.StreamID) + if err != nil { + return fmt.Errorf("encode stream_id: %w", err) + } + actionNameArg, err := ktypes.EncodeValue(components.ActionName) + if err != nil { + return fmt.Errorf("encode 
action_name: %w", err) + } + argsBytesArg, err := ktypes.EncodeValue(components.ArgsBytes) + if err != nil { + return fmt.Errorf("encode args_bytes: %w", err) + } + encryptSigArg, err := ktypes.EncodeValue(false) + if err != nil { + return fmt.Errorf("encode encrypt_sig: %w", err) + } + // max_fee is NULL for network_writer role (exempt from fees) + maxFeeArg, err := ktypes.EncodeValue(nil) + if err != nil { + return fmt.Errorf("encode max_fee: %w", err) + } + + // Build ActionExecution payload + payload := &ktypes.ActionExecution{ + Namespace: "main", + Action: "request_attestation", + Arguments: [][]*ktypes.EncodedValue{{ + dataProviderArg, + streamIDArg, + actionNameArg, + argsBytesArg, + encryptSigArg, + maxFeeArg, + }}, + } + + // Create transaction + tx, err := ktypes.CreateNodeTransaction(payload, chainID, nextNonce) + if err != nil { + return fmt.Errorf("create tx: %w", err) + } + + // Sign transaction + if err := tx.Sign(signer); err != nil { + return fmt.Errorf("sign tx: %w", err) + } + + // Broadcast (sync mode = WaitCommit) + hash, txResult, err := broadcaster(ctx, tx, 1) + if err != nil { + return fmt.Errorf("broadcast tx: %w", err) + } + + // Check transaction result + if txResult.Code != uint32(ktypes.CodeOk) { + return fmt.Errorf("transaction failed with code %d: %s", txResult.Code, txResult.Log) + } + + e.logger.Info("request_attestation broadcast succeeded", + "query_id", market.ID, + "tx_hash", hash.String(), + "data_provider", components.DataProvider, + "stream_id", components.StreamID, + "action_name", components.ActionName) + + return nil +} diff --git a/extensions/tn_settlement/internal/engine_ops_test.go b/extensions/tn_settlement/internal/engine_ops_test.go index 1778e7a29..eb5909f63 100644 --- a/extensions/tn_settlement/internal/engine_ops_test.go +++ b/extensions/tn_settlement/internal/engine_ops_test.go @@ -1,13 +1,18 @@ package internal import ( + "bytes" "context" "errors" + "fmt" "math/big" "strings" "testing" "time" + gethAbi 
"github.com/ethereum/go-ethereum/accounts/abi" + gethCommon "github.com/ethereum/go-ethereum/common" + "github.com/trufnetwork/kwil-db/common" "github.com/trufnetwork/kwil-db/core/crypto" "github.com/trufnetwork/kwil-db/core/crypto/auth" "github.com/trufnetwork/kwil-db/core/log" @@ -83,7 +88,7 @@ func TestBroadcastSettleMarketWithRetry_ImmediateSuccess(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - signer := auth.GetNodeSigner(priv) + signer := auth.GetUserSigner(priv) ops := &EngineOperations{ logger: log.New(), @@ -127,7 +132,7 @@ func TestBroadcastSettleMarketWithRetry_RetriesOnError(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - signer := auth.GetNodeSigner(priv) + signer := auth.GetUserSigner(priv) ops := &EngineOperations{ logger: log.New(), @@ -173,7 +178,7 @@ func TestBroadcastSettleMarketWithRetry_MaxRetriesExceeded(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - signer := auth.GetNodeSigner(priv) + signer := auth.GetUserSigner(priv) ops := &EngineOperations{ logger: log.New(), @@ -218,7 +223,7 @@ func TestBroadcastSettleMarketWithRetry_FreshNonceEachAttempt(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - signer := auth.GetNodeSigner(priv) + signer := auth.GetUserSigner(priv) ops := &EngineOperations{ logger: log.New(), @@ -265,7 +270,7 @@ func TestBroadcastSettleMarketWithRetry_ContextCancellation(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - signer := auth.GetNodeSigner(priv) + signer := auth.GetUserSigner(priv) ops := &EngineOperations{ logger: log.New(), @@ -308,7 +313,7 @@ func TestBroadcastSettleMarketWithRetry_TransactionFailure(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - signer := auth.GetNodeSigner(priv) + signer := auth.GetUserSigner(priv) ops := &EngineOperations{ logger: log.New(), @@ -353,7 +358,7 @@ func 
TestBroadcastSettleMarketWithRetry_ExponentialBackoff(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - signer := auth.GetNodeSigner(priv) + signer := auth.GetUserSigner(priv) ops := &EngineOperations{ logger: log.New(), @@ -416,7 +421,7 @@ func TestBroadcastSettleMarketWithRetry_VerifyTransactionStructure(t *testing.T) if err != nil { t.Fatalf("Failed to generate key: %v", err) } - signer := auth.GetNodeSigner(priv) + signer := auth.GetUserSigner(priv) ops := &EngineOperations{ logger: log.New(), @@ -465,3 +470,209 @@ func TestBroadcastSettleMarketWithRetry_VerifyTransactionStructure(t *testing.T) t.Logf("Successfully verified transaction structure for settle_market(query_id=%d)", queryID) } + +// ============================================================================= +// Test: Decode Query Components +// ============================================================================= + +func TestDecodeQueryComponents_ValidInput(t *testing.T) { + // Create valid ABI-encoded query components + dataProvider := "0xe5252596672cd0208a881bdb67c9df429916ba92" + streamID := "st9bc3cf61c3a88aa17f4ea5f1bad7b2" + actionName := "price_above_threshold" + argsBytes := []byte{0x01, 0x02, 0x03, 0x04} + + encoded, err := encodeQueryComponentsForTest(dataProvider, streamID, actionName, argsBytes) + if err != nil { + t.Fatalf("Failed to encode query components: %v", err) + } + + decoded, err := decodeQueryComponents(encoded) + if err != nil { + t.Fatalf("Failed to decode query components: %v", err) + } + + if decoded.DataProvider != strings.ToLower(dataProvider) { + t.Errorf("DataProvider mismatch: expected %s, got %s", strings.ToLower(dataProvider), decoded.DataProvider) + } + + if decoded.StreamID != streamID { + t.Errorf("StreamID mismatch: expected %s, got %s", streamID, decoded.StreamID) + } + + if decoded.ActionName != actionName { + t.Errorf("ActionName mismatch: expected %s, got %s", actionName, decoded.ActionName) + } + + if 
!bytes.Equal(decoded.ArgsBytes, argsBytes) { + t.Errorf("ArgsBytes mismatch: expected %v, got %v", argsBytes, decoded.ArgsBytes) + } +} + +func TestDecodeQueryComponents_EmptyInput(t *testing.T) { + _, err := decodeQueryComponents(nil) + if err == nil { + t.Fatal("Expected error for nil input") + } + if !strings.Contains(err.Error(), "empty") { + t.Errorf("Expected empty error, got: %v", err) + } + + _, err = decodeQueryComponents([]byte{}) + if err == nil { + t.Fatal("Expected error for empty input") + } +} + +func TestDecodeQueryComponents_InvalidInput(t *testing.T) { + // Invalid ABI data + _, err := decodeQueryComponents([]byte{0x01, 0x02, 0x03}) + if err == nil { + t.Fatal("Expected error for invalid ABI data") + } + if !strings.Contains(err.Error(), "unpack") { + t.Errorf("Expected unpack error, got: %v", err) + } +} + +// ============================================================================= +// Test: Request Attestation For Market +// ============================================================================= + +func TestRequestAttestationForMarket_VerifyTransactionStructure(t *testing.T) { + accounts := &mockAccounts{} + + var capturedTx *ktypes.Transaction + capturingBroadcaster := func(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) { + capturedTx = tx + return ktypes.Hash{1, 2, 3}, &ktypes.TxResult{ + Code: uint32(ktypes.CodeOk), + Log: "Attestation requested", + }, nil + } + + priv, _, err := crypto.GenerateSecp256k1Key(nil) + if err != nil { + t.Fatalf("Failed to generate key: %v", err) + } + signer := auth.GetUserSigner(priv) + + // Create mock engine that returns query_components + dataProvider := "0xe5252596672cd0208a881bdb67c9df429916ba92" + streamID := "st9bc3cf61c3a88aa17f4ea5f1bad7b2" + actionName := "price_above_threshold" + argsBytes := []byte{0x01, 0x02, 0x03, 0x04} + + queryComponents, err := encodeQueryComponentsForTest(dataProvider, streamID, actionName, argsBytes) + if err != nil { 
+ t.Fatalf("Failed to encode query components: %v", err) + } + + mockEngine := &mockEngineForQueryComponents{ + queryComponents: queryComponents, + } + + ops := &EngineOperations{ + logger: log.New(), + accounts: accounts, + engine: mockEngine, + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + market := &UnsettledMarket{ + ID: 1, + Hash: []byte{0xab, 0xcd}, + SettleTime: 1234567890, + } + + err = ops.RequestAttestationForMarket(ctx, "test-chain", signer, capturingBroadcaster, market) + if err != nil { + t.Fatalf("Expected success, got error: %v", err) + } + + // Verify transaction was created + if capturedTx == nil { + t.Fatal("Transaction was not captured") + } + + // Verify transaction has signature + if capturedTx.Signature == nil || capturedTx.Signature.Data == nil { + t.Error("Transaction is not signed") + } + + // Verify payload type is ActionExecution + if capturedTx.Body.PayloadType != ktypes.PayloadTypeExecute { + t.Errorf("Expected PayloadType Execute, got %s", capturedTx.Body.PayloadType) + } + + t.Log("Successfully verified transaction structure for request_attestation") +} + +// ============================================================================= +// Helper: Encode Query Components for Testing +// ============================================================================= + +func encodeQueryComponentsForTest(dataProvider, streamID, actionName string, argsBytes []byte) ([]byte, error) { + if len(streamID) > 32 { + return nil, fmt.Errorf("streamID must be <= 32 bytes, got %d", len(streamID)) + } + addressType, err := gethAbi.NewType("address", "", nil) + if err != nil { + return nil, err + } + bytes32Type, err := gethAbi.NewType("bytes32", "", nil) + if err != nil { + return nil, err + } + stringType, err := gethAbi.NewType("string", "", nil) + if err != nil { + return nil, err + } + bytesType, err := gethAbi.NewType("bytes", "", nil) + if err != nil { + return nil, err + } + + args := 
gethAbi.Arguments{ + {Type: addressType}, + {Type: bytes32Type}, + {Type: stringType}, + {Type: bytesType}, + } + + addr := gethCommon.HexToAddress(dataProvider) + + var streamIDBytes32 [32]byte + copy(streamIDBytes32[:], []byte(streamID)) + + return args.Pack(addr, streamIDBytes32, actionName, argsBytes) +} + +// Mock engine for query components test +type mockEngineForQueryComponents struct { + queryComponents []byte +} + +// Compile-time check that mockEngineForQueryComponents implements common.Engine +var _ common.Engine = (*mockEngineForQueryComponents)(nil) + +func (m *mockEngineForQueryComponents) ExecuteWithoutEngineCtx(ctx context.Context, db sql.DB, stmt string, params map[string]any, fn func(*common.Row) error) error { + // Return mock query_components + row := &common.Row{ + Values: []any{m.queryComponents}, + } + return fn(row) +} + +// Satisfy the Engine interface - these are not used in our tests +func (m *mockEngineForQueryComponents) Call(ctx *common.EngineContext, db sql.DB, namespace, action string, args []any, fn func(*common.Row) error) (*common.CallResult, error) { + return nil, nil +} +func (m *mockEngineForQueryComponents) CallWithoutEngineCtx(ctx context.Context, db sql.DB, namespace, action string, args []any, fn func(*common.Row) error) (*common.CallResult, error) { + return nil, nil +} +func (m *mockEngineForQueryComponents) Execute(ctx *common.EngineContext, db sql.DB, stmt string, params map[string]any, fn func(*common.Row) error) error { + return nil +} diff --git a/extensions/tn_settlement/scheduler/scheduler.go b/extensions/tn_settlement/scheduler/scheduler.go index c51b70539..d4e367ace 100644 --- a/extensions/tn_settlement/scheduler/scheduler.go +++ b/extensions/tn_settlement/scheduler/scheduler.go @@ -20,10 +20,19 @@ type txBroadcaster interface { BroadcastTx(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error) } +// EngineOps defines the operations needed by the settlement scheduler. 
+// This interface allows for mocking in tests. +type EngineOps interface { + FindUnsettledMarkets(ctx context.Context, limit int) ([]*internal.UnsettledMarket, error) + AttestationExists(ctx context.Context, marketHash []byte) (bool, error) + RequestAttestationForMarket(ctx context.Context, chainID string, signer auth.Signer, broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), market *internal.UnsettledMarket) error + BroadcastSettleMarketWithRetry(ctx context.Context, chainID string, signer auth.Signer, broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), queryID int, maxRetries int) error +} + type SettlementScheduler struct { kwilService *common.Service logger log.Logger - engineOps *internal.EngineOperations + engineOps EngineOps cron *gocron.Scheduler ctx context.Context cancel context.CancelFunc @@ -39,7 +48,7 @@ type SettlementScheduler struct { type NewSettlementSchedulerParams struct { Service *common.Service Logger log.Logger - EngineOps *internal.EngineOperations + EngineOps EngineOps Signer auth.Signer Tx txBroadcaster MaxMarketsPerRun int @@ -83,7 +92,10 @@ func (s *SettlementScheduler) Start(ctx context.Context, cronExpr string) error if s.cancel != nil { s.cancel() } - s.ctx, s.cancel = context.WithCancel(ctx) + // Use Background context instead of the passed-in context to ensure + // the scheduler's jobs continue running even if the caller's context + // (e.g., a block processing context) is canceled + s.ctx, s.cancel = context.WithCancel(context.Background()) // Clear any existing jobs to avoid duplicates on (re)start s.cron.Clear() @@ -158,11 +170,22 @@ func (s *SettlementScheduler) Start(ctx context.Context, cronExpr string) error continue } if !hasAttestation { - s.logger.Debug("attestation not yet available, skipping market", + // Request attestation for this market + s.logger.Info("attestation not available, requesting attestation", "query_id", 
market.ID, "settle_time", market.SettleTime) - skipped++ - continue // Not an error, just not ready yet + + if err := engineOps.RequestAttestationForMarket(jobCtx, chainID, signer, broadcaster.BroadcastTx, market); err != nil { + s.logger.Warn("failed to request attestation", + "query_id", market.ID, + "error", err) + failed++ + } else { + s.logger.Info("attestation requested successfully", + "query_id", market.ID) + skipped++ // Will settle on next run after signing + } + continue } // Broadcast settle_market transaction with retry @@ -245,7 +268,19 @@ func (s *SettlementScheduler) RunOnce(ctx context.Context) error { for _, market := range markets { hasAttestation, err := engineOps.AttestationExists(ctx, market.Hash) - if err != nil || !hasAttestation { + if err != nil { + s.logger.Error("failed to check attestation existence", + "query_id", market.ID, + "error", err) + return fmt.Errorf("check attestation for market %d: %w", market.ID, err) + } + if !hasAttestation { + // Request attestation for this market + if err := engineOps.RequestAttestationForMarket(ctx, chainID, signer, broadcaster.BroadcastTx, market); err != nil { + s.logger.Warn("failed to request attestation in RunOnce", + "query_id", market.ID, + "error", err) + } continue } diff --git a/extensions/tn_settlement/scheduler/scheduler_test.go b/extensions/tn_settlement/scheduler/scheduler_test.go index 3f4abc544..6ae31dcc5 100644 --- a/extensions/tn_settlement/scheduler/scheduler_test.go +++ b/extensions/tn_settlement/scheduler/scheduler_test.go @@ -6,10 +6,13 @@ import ( "time" "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/config" "github.com/trufnetwork/kwil-db/core/crypto" "github.com/trufnetwork/kwil-db/core/crypto/auth" "github.com/trufnetwork/kwil-db/core/log" ktypes "github.com/trufnetwork/kwil-db/core/types" + + "github.com/trufnetwork/node/extensions/tn_settlement/internal" ) // ============================================================================= @@ -87,6 
+90,42 @@ func (m *mockSigner) PubKey() crypto.PublicKey { return &mockPublicKey{data: mockPubKeyData} } +// mockEngineOps implements EngineOps interface for testing. +// It signals when methods are called to verify job execution. +// It also checks context cancellation to ensure the scheduler uses its own context. +type mockEngineOps struct { + t *testing.T + onFindUnsettledMarkets func() +} + +func (m *mockEngineOps) FindUnsettledMarkets(ctx context.Context, limit int) ([]*internal.UnsettledMarket, error) { + // A canceled ctx means the scheduler passed the parent context instead of its own (regression). + // Report with Errorf, not Fatalf: this runs on the cron goroutine, where FailNow must not be called. + if ctx.Err() != nil { + if m.t != nil { + m.t.Errorf("FindUnsettledMarkets called with canceled context: %v - scheduler should use its own internal context", ctx.Err()) + } + return nil, ctx.Err() + } + if m.onFindUnsettledMarkets != nil { + m.onFindUnsettledMarkets() + } + // Return empty list - no markets to settle + return []*internal.UnsettledMarket{}, nil +} + +func (m *mockEngineOps) AttestationExists(ctx context.Context, marketHash []byte) (bool, error) { + return false, nil +} + +func (m *mockEngineOps) RequestAttestationForMarket(ctx context.Context, chainID string, signer auth.Signer, broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), market *internal.UnsettledMarket) error { + return nil +} + +func (m *mockEngineOps) BroadcastSettleMarketWithRetry(ctx context.Context, chainID string, signer auth.Signer, broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error), queryID int, maxRetries int) error { + return nil +} + // ============================================================================= // Test: Scheduler Start/Stop // ============================================================================= @@ -362,6 +401,80 @@ func TestSchedulerSetSigner(t *testing.T) { t.Log("Scheduler handled signer change
correctly (thread-safe)") } +// ============================================================================= +// Test: Scheduler Job Runs After Parent Context Canceled +// This tests the bug fix where block context was passed to Start() and got +// canceled before cron jobs could run. +// ============================================================================= + +func TestSchedulerJobRunsAfterParentContextCanceled(t *testing.T) { + // Create service with GenesisConfig to pass prerequisites check + service := &common.Service{ + Logger: log.New(), + GenesisConfig: &config.GenesisConfig{ + ChainID: "test-chain", + }, + } + + broadcaster := &mockTxBroadcaster{} + signer := &mockSigner{} + + // Track if job executed - use buffered channel to avoid blocking + jobExecuted := make(chan struct{}, 1) + + // Create mock EngineOps that signals when FindUnsettledMarkets is called + // The mock also checks that ctx is not canceled - if it is, the test fails + // because the scheduler should use its own internal context, not the parent context + mockOps := &mockEngineOps{ + t: t, + onFindUnsettledMarkets: func() { + // Signal that the job ran (non-blocking send) + select { + case jobExecuted <- struct{}{}: + default: + } + }, + } + + scheduler := NewSettlementScheduler(NewSettlementSchedulerParams{ + Service: service, + Logger: log.New(), + EngineOps: mockOps, // Mock that signals when FindUnsettledMarkets is called + Tx: broadcaster, + Signer: signer, + MaxMarketsPerRun: 10, + RetryAttempts: 3, + }) + + // Simulate the bug: create a context that will be canceled immediately + // (like a block processing context) + blockCtx, cancelBlock := context.WithCancel(context.Background()) + + // Start scheduler - with our fix, it should use its own internal context + err := scheduler.Start(blockCtx, "* * * * * *") // Every second + if err != nil { + t.Fatalf("Failed to start scheduler: %v", err) + } + + // Immediately cancel the "block" context (simulating block processing 
completion) + cancelBlock() + + // Wait for the job to execute with a timeout + // The old bug would cause "context canceled" errors and the job would never run + select { + case <-jobExecuted: + t.Log("Scheduler job executed successfully even after parent context was canceled - bug fix verified!") + case <-time.After(3 * time.Second): + t.Fatal("Timeout: scheduler job never executed after parent context was canceled - the bug fix may have regressed") + } + + // Clean up + err = scheduler.Stop() + if err != nil { + t.Fatalf("Failed to stop scheduler: %v", err) + } +} + // ============================================================================= // Test: Scheduler Parameter Validation // ============================================================================= diff --git a/extensions/tn_settlement/scheduler_lifecycle.go b/extensions/tn_settlement/scheduler_lifecycle.go index 78b2ec401..5bcfb4648 100644 --- a/extensions/tn_settlement/scheduler_lifecycle.go +++ b/extensions/tn_settlement/scheduler_lifecycle.go @@ -72,7 +72,7 @@ func wireSignerAndBroadcaster(app *common.App, ext *Extension) { if pk, err := key.LoadNodeKey(keyPath); err != nil { ext.Logger().Warn("failed to load node key for signer; tn_settlement disabled until available", "path", keyPath, "error", err) } else { - ext.SetNodeSigner(auth.GetNodeSigner(pk)) + ext.SetNodeSigner(auth.GetUserSigner(pk)) } } } @@ -124,7 +124,9 @@ func settlementLeaderAcquire(ctx context.Context, app *common.App, block *common ext.Logger().Debug("tn_settlement: prerequisites missing; deferring start until broadcaster/signer/engine/service are available") return } - if err := ext.startScheduler(ctx); err != nil { + // Use extension's shutdown context - intended to stop the scheduler on node shutdown even if leaderwatch callbacks don't fire. + // NOTE(review): SettlementScheduler.Start now derives s.ctx from context.Background(), not its ctx argument; confirm startScheduler still honors this ctx. + if err := ext.startScheduler(ext.ShutdownContext()); err != nil { ext.Logger().Warn("failed to start tn_settlement scheduler on leader acquire", "error", err) } else {
ext.Logger().Info("tn_settlement started (leader)", "schedule", ext.Schedule()) diff --git a/extensions/tn_settlement/settlement_integration_test.go b/extensions/tn_settlement/settlement_integration_test.go index 6f68b1d2c..7db8b69a3 100644 --- a/extensions/tn_settlement/settlement_integration_test.go +++ b/extensions/tn_settlement/settlement_integration_test.go @@ -129,7 +129,7 @@ func testFindUnsettledMarkets(t *testing.T) func(context.Context, *kwilTesting.P // Test FindUnsettledMarkets accts, err := accounts.InitializeAccountStore(ctx, platform.DB, log.New()) require.NoError(t, err) - ops := internal.NewEngineOperations(platform.Engine, platform.DB, accts, log.New()) + ops := internal.NewEngineOperations(platform.Engine, platform.DB, nil, accts, log.New()) markets, err := ops.FindUnsettledMarkets(ctx, 10) require.NoError(t, err) @@ -199,7 +199,7 @@ func testAttestationExists(t *testing.T) func(context.Context, *kwilTesting.Plat // Test AttestationExists accts, err := accounts.InitializeAccountStore(ctx, platform.DB, log.New()) require.NoError(t, err) - ops := internal.NewEngineOperations(platform.Engine, platform.DB, accts, log.New()) + ops := internal.NewEngineOperations(platform.Engine, platform.DB, nil, accts, log.New()) exists, err := ops.AttestationExists(ctx, attestationHash) require.NoError(t, err) @@ -308,12 +308,12 @@ func testLoadSettlementConfig(t *testing.T) func(context.Context, *kwilTesting.P // Test LoadSettlementConfig (table exists from migration with seeded defaults) accts, err := accounts.InitializeAccountStore(ctx, platform.DB, log.New()) require.NoError(t, err) - ops := internal.NewEngineOperations(platform.Engine, platform.DB, accts, log.New()) + ops := internal.NewEngineOperations(platform.Engine, platform.DB, nil, accts, log.New()) enabled, schedule, maxMarkets, retries, err := ops.LoadSettlementConfig(ctx) require.NoError(t, err) - require.False(t, enabled, "should be false (default disabled for safety)") - require.Equal(t, "0 * * * *", 
schedule, "should be hourly schedule from migration") + require.True(t, enabled, "should be true (enabled by migration 041)") + require.Equal(t, "0,30 * * * *", schedule, "should be 30-minute schedule from migration 041") require.Equal(t, 10, maxMarkets) require.Equal(t, 3, retries) @@ -385,7 +385,7 @@ func testSkipMarketWithoutAttestation(t *testing.T) func(context.Context, *kwilT // Test AttestationExists should return false accts, err := accounts.InitializeAccountStore(ctx, platform.DB, log.New()) require.NoError(t, err) - ops := internal.NewEngineOperations(platform.Engine, platform.DB, accts, log.New()) + ops := internal.NewEngineOperations(platform.Engine, platform.DB, nil, accts, log.New()) exists, err := ops.AttestationExists(ctx, attestationHash) require.NoError(t, err) @@ -469,7 +469,7 @@ func testMultipleMarketsProcessing(t *testing.T) func(context.Context, *kwilTest // Test FindUnsettledMarkets with limit accts, err := accounts.InitializeAccountStore(ctx, platform.DB, log.New()) require.NoError(t, err) - ops := internal.NewEngineOperations(platform.Engine, platform.DB, accts, log.New()) + ops := internal.NewEngineOperations(platform.Engine, platform.DB, nil, accts, log.New()) markets, err := ops.FindUnsettledMarkets(ctx, 2) require.NoError(t, err) diff --git a/extensions/tn_settlement/tn_settlement.go b/extensions/tn_settlement/tn_settlement.go index 33189498a..f986edaee 100644 --- a/extensions/tn_settlement/tn_settlement.go +++ b/extensions/tn_settlement/tn_settlement.go @@ -59,7 +59,8 @@ func engineReadyHook(ctx context.Context, app *common.App) error { } // Build engine operations wrapper - engOps := internal.NewEngineOperations(app.Engine, db, app.Accounts, app.Service.Logger) + // Pass DBPool for fresh read transactions in background jobs + engOps := internal.NewEngineOperations(app.Engine, db, app.Service.DBPool, app.Accounts, app.Service.Logger) // Load schedule from config; fall back to defaults if absent enabled, schedule, maxMarkets, retries, 
_ := engOps.LoadSettlementConfig(ctx) diff --git a/internal/migrations/037-order-book-validation.sql b/internal/migrations/037-order-book-validation.sql index 2fef33b05..5eedc686f 100644 --- a/internal/migrations/037-order-book-validation.sql +++ b/internal/migrations/037-order-book-validation.sql @@ -10,6 +10,10 @@ * * Returns diagnostic information for debugging accounting issues. * + * NOTE: Vault balance is GLOBAL across all markets using the same bridge. + * The validation sums expected collateral across ALL unsettled markets + * to correctly validate multi-market scenarios. + * * Dependencies: * - Migration 030: ob_positions table (share positions) * - Migration 031: vault operations (lock/unlock) @@ -25,16 +29,16 @@ * * Validates market integrity by checking: * 1. Binary token parity: equal TRUE/FALSE shares (no orphan shares) - * 2. Vault collateral: balance matches obligations + * 2. Vault collateral: balance matches total obligations across ALL unsettled markets * * Returns: - * - valid_token_binaries: TRUE if total_true = total_false - * - valid_collateral: TRUE if vault balance = expected collateral - * - total_true: Count of TRUE shares (holdings + open sells) - * - total_false: Count of FALSE shares (holdings + open sells) + * - valid_token_binaries: TRUE if total_true = total_false for this market + * - valid_collateral: TRUE if vault balance = total expected collateral (all markets) + * - total_true: Count of TRUE shares for THIS market (holdings + open sells) + * - total_false: Count of FALSE shares for THIS market (holdings + open sells) * - vault_balance: Current network ownedBalance from ethereum_bridge - * - expected_collateral: Calculated expected balance - * - open_buys_value: Total escrowed buy order collateral (in cents) + * - expected_collateral: Total expected collateral across ALL unsettled markets + * - open_buys_value: Total escrowed buy order collateral for THIS market (in cents) * * Usage: * kwil-cli database call --action 
validate_market_collateral \ @@ -54,7 +58,7 @@ PUBLIC VIEW RETURNS ( expected_collateral NUMERIC(78, 0), open_buys_value BIGINT ) { - -- Step 1: Count TRUE shares in circulation (holdings + open sells) + -- Step 1: Count TRUE shares in circulation for THIS market (holdings + open sells) $total_true BIGINT := 0; for $row in SELECT COALESCE(SUM(amount)::BIGINT, 0::BIGINT) as total @@ -66,7 +70,7 @@ PUBLIC VIEW RETURNS ( $total_true := $row.total; } - -- Step 2: Count FALSE shares in circulation (holdings + open sells) + -- Step 2: Count FALSE shares in circulation for THIS market (holdings + open sells) $total_false BIGINT := 0; for $row in SELECT COALESCE(SUM(amount)::BIGINT, 0::BIGINT) as total @@ -78,7 +82,7 @@ PUBLIC VIEW RETURNS ( $total_false := $row.total; } - -- Step 3: Calculate open buy collateral obligations (in cents) + -- Step 3: Calculate open buy collateral obligations for THIS market (in cents) -- Buy orders: price is negative (stored in cents: -1 to -99) -- Collateral per buy order = |price| * amount / 100 (converted to dollars) -- We return the value in cents for precision @@ -92,14 +96,7 @@ PUBLIC VIEW RETURNS ( $open_buys_value := $row.total_value; } - -- Step 4: Calculate expected vault collateral (in wei, 18 decimals) - -- Total share pairs × $1.00 (10^18 wei) + open buy collateral (converted from cents) - $expected_collateral NUMERIC(78, 0); - $shares_collateral NUMERIC(78, 0) := $total_true::NUMERIC(78, 0) * '1000000000000000000'::NUMERIC(78, 0); - $buys_collateral NUMERIC(78, 0) := ($open_buys_value::NUMERIC(78, 0) * '1000000000000000000'::NUMERIC(78, 0)) / 100::NUMERIC(78, 0); - $expected_collateral := ($shares_collateral + $buys_collateral)::NUMERIC(78, 0); - - -- Step 5: Get market's bridge and retrieve actual vault balance + -- Step 4: Get market's bridge $bridge TEXT; for $row in SELECT bridge FROM ob_queries WHERE id = $query_id { $bridge := $row.bridge; @@ -108,7 +105,43 @@ PUBLIC VIEW RETURNS ( ERROR('Market not found for query_id: ' 
|| $query_id::TEXT); } - -- The bridge.info() precompile returns network ownedBalance + -- Step 5: Calculate TOTAL expected collateral across ALL unsettled markets using same bridge + -- This fixes the multi-market validation issue where vault holds collateral for all markets + $total_shares_all_markets BIGINT := 0; + $total_buys_cents_all_markets BIGINT := 0; + + -- Sum TRUE shares (holdings + sells) across all unsettled markets with same bridge + for $row in + SELECT COALESCE(SUM(p.amount)::BIGINT, 0::BIGINT) as total + FROM ob_positions p + JOIN ob_queries q ON p.query_id = q.id + WHERE q.settled = FALSE + AND q.bridge = $bridge + AND p.outcome = TRUE + AND p.price >= 0 + { + $total_shares_all_markets := $row.total; + } + + -- Sum open buy orders (in cents) across all unsettled markets with same bridge + for $row in + SELECT COALESCE(SUM(ABS(p.price) * p.amount)::BIGINT, 0::BIGINT) as total_value + FROM ob_positions p + JOIN ob_queries q ON p.query_id = q.id + WHERE q.settled = FALSE + AND q.bridge = $bridge + AND p.price < 0 + { + $total_buys_cents_all_markets := $row.total_value; + } + + -- Calculate total expected collateral in wei (18 decimals) + $expected_collateral NUMERIC(78, 0); + $shares_collateral NUMERIC(78, 0) := $total_shares_all_markets::NUMERIC(78, 0) * '1000000000000000000'::NUMERIC(78, 0); + $buys_collateral NUMERIC(78, 0) := ($total_buys_cents_all_markets::NUMERIC(78, 0) * '1000000000000000000'::NUMERIC(78, 0)) / 100::NUMERIC(78, 0); + $expected_collateral := ($shares_collateral + $buys_collateral)::NUMERIC(78, 0); + + -- Step 6: Get actual vault balance from bridge $vault_balance NUMERIC(78, 0) := 0::NUMERIC(78, 0); $row_count INT := 0; @@ -136,7 +169,7 @@ PUBLIC VIEW RETURNS ( ERROR('Cannot validate collateral: bridge.info() returned no data. 
Bridge may be unavailable or not initialized.'); } - -- Step 6: Validate binary token parity + -- Step 7: Validate binary token parity for THIS market $valid_token_binaries BOOL; if $total_true = $total_false { $valid_token_binaries := TRUE; @@ -144,11 +177,8 @@ PUBLIC VIEW RETURNS ( $valid_token_binaries := FALSE; } - -- Step 7: Validate collateral balance - -- NOTE: Multi-market limitation - vault_balance is GLOBAL (all markets combined), - -- but expected_collateral is per-market. In single-market scenarios this check - -- is accurate. In multi-market scenarios, valid_collateral will be FALSE since - -- vault_balance > expected_collateral. See testMultipleMarketsIsolation for details. + -- Step 8: Validate collateral balance + -- Now compares vault balance against TOTAL expected collateral from ALL unsettled markets $valid_collateral BOOL; if $vault_balance = $expected_collateral { $valid_collateral := TRUE; @@ -156,7 +186,7 @@ PUBLIC VIEW RETURNS ( $valid_collateral := FALSE; } - -- Step 8: Return diagnostics + -- Step 9: Return diagnostics RETURN $valid_token_binaries, $valid_collateral, diff --git a/internal/migrations/041-settlement-config-actions.sql b/internal/migrations/041-settlement-config-actions.sql new file mode 100644 index 000000000..bf68491b1 --- /dev/null +++ b/internal/migrations/041-settlement-config-actions.sql @@ -0,0 +1,120 @@ +-- MIGRATION 041: SETTLEMENT CONFIG ACTIONS +-- +-- Enables automatic settlement and adds actions to manage settlement configuration. +-- Uses existing network_writer role for access control. 
+-- +-- Changes: +-- - Enables settlement with 30-minute schedule by default +-- - Adds update_settlement_config action (role-gated to network_writer) +-- - Adds get_settlement_config action (public view) +-- +-- Dependencies: +-- - Migration 039: settlement_config table must exist +-- - Migration 015: system:network_writer role must exist + +-- ============================================================================= +-- ENABLE SETTLEMENT BY DEFAULT (every 30 minutes: at 0 and 30 past the hour) +-- ============================================================================= +UPDATE settlement_config +SET + enabled = true, + settlement_schedule = '0,30 * * * *', + max_markets_per_run = 10, + retry_attempts = 3, + updated_at = 0 +WHERE id = 1; + +-- ============================================================================= +-- ACTION: update_settlement_config +-- ============================================================================= +-- Updates the settlement extension configuration. +-- Requires caller to be a member of system:network_writer role. 
+-- +-- Parameters: +-- - $enabled: Enable/disable automatic settlement (BOOL) +-- - $schedule: Cron schedule for settlement checks (TEXT) +-- - $max_markets_per_run: Max markets to process per job run (INT, 1-100) +-- - $retry_attempts: Number of retry attempts for failed settlements (INT, 1-10) +-- +-- Usage: +-- kwil-cli call-action update_settlement_config bool:true text:'0,30 * * * *' int:10 int:3 +CREATE OR REPLACE ACTION update_settlement_config( + $enabled BOOL, + $schedule TEXT, + $max_markets_per_run INT, + $retry_attempts INT +) PUBLIC { + -- Validate caller has network_writer role + $caller_addr TEXT := LOWER(@caller); + $has_role BOOL := false; + + for $row in SELECT 1 FROM role_members + WHERE role_owner = 'system' + AND role_name = 'network_writer' + AND member_address = $caller_addr { + $has_role := true; + } + + if NOT $has_role { + ERROR('caller must be a member of system:network_writer role'); + } + + -- Validate inputs + if $enabled IS NULL { + ERROR('enabled cannot be NULL'); + } + + if $schedule IS NULL OR $schedule = '' { + ERROR('schedule cannot be empty'); + } + + if $max_markets_per_run IS NULL OR $max_markets_per_run < 1 OR $max_markets_per_run > 100 { + ERROR('max_markets_per_run must be between 1 and 100'); + } + + if $retry_attempts IS NULL OR $retry_attempts < 1 OR $retry_attempts > 10 { + ERROR('retry_attempts must be between 1 and 10'); + } + + -- Update configuration + UPDATE settlement_config + SET + enabled = $enabled, + settlement_schedule = $schedule, + max_markets_per_run = $max_markets_per_run, + retry_attempts = $retry_attempts, + updated_at = @height + WHERE id = 1; +}; + +-- ============================================================================= +-- ACTION: get_settlement_config +-- ============================================================================= +-- Returns the current settlement configuration. +-- Public view action - anyone can read the config. 
+-- +-- Returns: +-- - enabled: Whether automatic settlement is enabled +-- - settlement_schedule: Cron schedule for settlement checks +-- - max_markets_per_run: Max markets processed per job run +-- - retry_attempts: Retry attempts for failed settlements +-- - updated_at: Block height when config was last updated +CREATE OR REPLACE ACTION get_settlement_config() +PUBLIC VIEW RETURNS TABLE( + enabled BOOL, + settlement_schedule TEXT, + max_markets_per_run INT, + retry_attempts INT, + updated_at INT8 +) { + for $row in SELECT + enabled, + settlement_schedule, + max_markets_per_run, + retry_attempts, + updated_at + FROM settlement_config + WHERE id = 1 { + RETURN NEXT $row.enabled, $row.settlement_schedule, $row.max_markets_per_run, $row.retry_attempts, $row.updated_at; + } +}; diff --git a/tests/streams/hoodi_bridge_coexistence_test.go b/tests/streams/hoodi_bridge_coexistence_test.go index 5df64930f..d64be8cc1 100644 --- a/tests/streams/hoodi_bridge_coexistence_test.go +++ b/tests/streams/hoodi_bridge_coexistence_test.go @@ -29,17 +29,20 @@ const ( testTTExtensionName = "hoodi_tt" // hoodi_tt2 bridge (second bridge) + // Must match the escrow address in migrations/erc20-bridge/000-extension.sql testTT2Chain = "hoodi" - testTT2Escrow = "0x9BD843A3ce718FE639e9968860B933b026784687" + testTT2Escrow = "0x80D9B3b6941367917816d36748C88B303f7F1415" testTT2ERC20 = "0x1591DeAa21710E0BA6CC1b15F49620C9F65B2dEd" testTT2ExtensionName = "hoodi_tt2" ) var ( - ttPointCounter int64 = 7000 // Start from 7000 for TT bridge - tt2PointCounter int64 = 8000 // Start from 8000 for TT2 bridge - ttPrevPoint *int64 // Track previous point for TT deposits - tt2PrevPoint *int64 // Track previous point for TT2 deposits + // Separate counters and prev pointers for each bridge + // Each bridge has its own ordered-sync topic (per escrow address) + ttPointCounter int64 = 7000 + tt2PointCounter int64 = 8000 + ttPrevPoint *int64 + tt2PrevPoint *int64 ) // TestHoodiBridgeCoexistence verifies that 
hoodi_tt and hoodi_tt2 can co-exist without interfering @@ -60,7 +63,7 @@ func TestHoodiBridgeCoexistence(t *testing.T) { // setupBridgeCoexistenceEnvironment sets up both Hoodi bridge instances func setupBridgeCoexistenceEnvironment(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error { return func(ctx context.Context, platform *kwilTesting.Platform) error { - // Reset point trackers + // Reset point trackers for fresh test ttPrevPoint = nil tt2PrevPoint = nil @@ -107,6 +110,10 @@ func setupBridgeCoexistenceEnvironment(t *testing.T) func(ctx context.Context, p // Test 1: Deposit to TT only affects TT balance func testTTDepositOnlyAffectsTTBalance(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error { return func(ctx context.Context, platform *kwilTesting.Platform) error { + // Reset point trackers - each test runs in isolated container with fresh DB + ttPrevPoint = nil + tt2PrevPoint = nil + err := erc20shim.ForTestingInitializeExtension(ctx, platform) require.NoError(t, err, "failed to re-initialize extension") @@ -135,6 +142,10 @@ func testTTDepositOnlyAffectsTTBalance(t *testing.T) func(ctx context.Context, p // Test 2: Deposit to TT2 only affects TT2 balance func testTT2DepositOnlyAffectsTT2Balance(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error { return func(ctx context.Context, platform *kwilTesting.Platform) error { + // Reset point trackers - each test runs in isolated container with fresh DB + ttPrevPoint = nil + tt2PrevPoint = nil + err := erc20shim.ForTestingInitializeExtension(ctx, platform) require.NoError(t, err, "failed to re-initialize extension") @@ -163,16 +174,16 @@ func testTT2DepositOnlyAffectsTT2Balance(t *testing.T) func(ctx context.Context, // Test 3: Withdrawals are isolated between bridges func testWithdrawalsAreIsolated(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error { return func(ctx context.Context, platform 
*kwilTesting.Platform) error { + // Reset point trackers - each test runs in isolated container with fresh DB + ttPrevPoint = nil + tt2PrevPoint = nil + err := erc20shim.ForTestingInitializeExtension(ctx, platform) require.NoError(t, err, "failed to re-initialize extension") userAddrVal := util.Unsafe_NewEthereumAddressFromString("0xc777777777777777777777777777777777777777") userAddr := &userAddrVal - // Reset deposit chains - ttPrevPoint = nil - tt2PrevPoint = nil - // Give user 100 TT and 100 TT2 err = giveTTBalance(ctx, platform, userAddr.Address(), "100000000000000000000") require.NoError(t, err, "failed to give TT balance") @@ -225,6 +236,10 @@ func testWithdrawalsAreIsolated(t *testing.T) func(ctx context.Context, platform // Test 4: Cross-contamination test - operations on one bridge don't affect the other func testCrossContamination(t *testing.T) func(ctx context.Context, platform *kwilTesting.Platform) error { return func(ctx context.Context, platform *kwilTesting.Platform) error { + // Reset point trackers - each test runs in isolated container with fresh DB + ttPrevPoint = nil + tt2PrevPoint = nil + err := erc20shim.ForTestingInitializeExtension(ctx, platform) require.NoError(t, err, "failed to re-initialize extension") @@ -234,10 +249,6 @@ func testCrossContamination(t *testing.T) func(ctx context.Context, platform *kw user2AddrVal := util.Unsafe_NewEthereumAddressFromString("0xc999999999999999999999999999999999999999") user2Addr := &user2AddrVal - // Reset deposit chains - ttPrevPoint = nil - tt2PrevPoint = nil - // User1: deposit 50 TT err = giveTTBalance(ctx, platform, user1Addr.Address(), "50000000000000000000") require.NoError(t, err) diff --git a/tests/streams/utils/erc20/helper.go b/tests/streams/utils/erc20/helper.go index f8edcea19..21fc50b27 100644 --- a/tests/streams/utils/erc20/helper.go +++ b/tests/streams/utils/erc20/helper.go @@ -65,7 +65,7 @@ func GetUserBalance(ctx context.Context, platform *kwilTesting.Platform, extensi escrowAddr = 
"0x80D9B3b6941367917816d36748C88B303f7F1415" case "sepolia_bridge": chainName = "sepolia" - escrowAddr = "0x80d9b3b6941367917816d36748c88b303f7f1415" + escrowAddr = "0x502430eD0BbE0f230215870c9C2853e126eE5Ae3" case "erc20_bridge_test": // Test-only alias used in tests/extensions/erc20/ tests chainName = "sepolia"