extensions/tn_digest/engine_ops_integration_test.go (1 addition & 1 deletion)

@@ -41,7 +41,7 @@ func TestBuildAndBroadcastAutoDigestTx_VerifiesTxBuildSignAndDBEffect(t *testing
 	require.NoError(t, err)
 
 	// Prepare EngineOperations
-	ops := internal.NewEngineOperations(platform.Engine, platform.DB, accts, log.New())
+	ops := internal.NewEngineOperations(platform.Engine, platform.DB, nil, accts, log.New())
 
 	// Generate a node signer (secp256k1)
 	priv, _, err := crypto.GenerateSecp256k1Key(nil)
extensions/tn_digest/internal/engine_ops.go (55 additions & 7 deletions)

@@ -26,11 +26,29 @@ type EngineOperations struct {
 	engine   common.Engine
 	logger   log.Logger
 	db       sql.DB
+	dbPool   sql.DelayedReadTxMaker // For fresh read transactions in background jobs
 	accounts common.Accounts
 }
 
-func NewEngineOperations(engine common.Engine, db sql.DB, accounts common.Accounts, logger log.Logger) *EngineOperations {
-	return &EngineOperations{engine: engine, db: db, accounts: accounts, logger: logger.New("ops")}
+func NewEngineOperations(engine common.Engine, db sql.DB, dbPool sql.DelayedReadTxMaker, accounts common.Accounts, logger log.Logger) *EngineOperations {
+	return &EngineOperations{engine: engine, db: db, dbPool: dbPool, accounts: accounts, logger: logger.New("ops")}
 }
 
+// getFreshReadTx returns a fresh database connection for read operations.
+// This is critical for background scheduler operations where e.db may be stale/closed.
+// Returns: (db, cleanup function, error)
+// The cleanup function MUST be called (via defer) to rollback the read transaction.
+func (e *EngineOperations) getFreshReadTx(ctx context.Context) (sql.DB, func(), error) {
+	if e.dbPool != nil {
+		readTx := e.dbPool.BeginDelayedReadTx()
+		cleanup := func() {
+			readTx.Rollback(ctx)
+		}
+		return readTx, cleanup, nil
+	}
+	// Fallback to stored db (may fail if tx is closed, but better than nothing)
+	e.logger.Warn("dbPool is nil, falling back to stored db connection (may be stale)")
+	return e.db, func() {}, nil
+}
+
 // LoadDigestConfig reads the single-row digest configuration.
@@ -41,8 +59,17 @@ func (e *EngineOperations) LoadDigestConfig(ctx context.Context) (bool, string,
 		schedule string
 		found    bool
 	)
+
+	// Use fresh read transaction from pool to avoid stale connection issues
+	// in background scheduler contexts where e.db may be closed
+	db, cleanup, err := e.getFreshReadTx(ctx)
+	if err != nil {
+		return false, "", fmt.Errorf("get fresh read tx: %w", err)
+	}
+	defer cleanup()
+
 	// Read using engine without engine ctx (owner-level read)
-	err := e.engine.ExecuteWithoutEngineCtx(ctx, e.db,
+	err = e.engine.ExecuteWithoutEngineCtx(ctx, db,
 		`SELECT enabled, digest_schedule FROM main.digest_config WHERE id = 1`, nil,
 		func(row *common.Row) error {
 			if len(row.Values) >= 2 {
@@ -79,9 +106,16 @@ func (e *EngineOperations) BuildAndBroadcastAutoDigestTx(ctx context.Context, ch
 		return fmt.Errorf("failed to get signer account: %w", err)
 	}
 
+	// Use fresh read transaction from pool to avoid stale connection issues
+	db, cleanup, err := e.getFreshReadTx(ctx)
+	if err != nil {
+		return fmt.Errorf("get fresh read tx: %w", err)
+	}
+	defer cleanup()
+
 	// Get account information using the accounts service directly on database
 	// DB interface embeds Executor, so we can use it directly
-	account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID)
+	account, err := e.accounts.GetAccount(ctx, db, signerAccountID)
 	var nextNonce uint64
 	if err != nil {
 		// Account doesn't exist yet - use nonce 1 for first transaction
@@ -148,11 +182,18 @@ func (e *EngineOperations) BroadcastAutoDigestWithArgsAndParse(
 		return nil, fmt.Errorf("failed to get signer account: %w", err)
 	}
 
+	// Use fresh read transaction from pool to avoid stale connection issues
+	db, cleanup, err := e.getFreshReadTx(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("get fresh read tx: %w", err)
+	}
+	defer cleanup()
+
 	// Get account information using the accounts service directly on database
-	account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID)
+	account, err := e.accounts.GetAccount(ctx, db, signerAccountID)
 	var nextNonce uint64
 	if err != nil {
-		// Only treat not found / no rows as missing-account; fail fast on any other error
+		// Only treat "not found" / "no rows" as missing-account; fail fast on any other error
 		msg := strings.ToLower(err.Error())
 		if !strings.Contains(msg, "not found") && !strings.Contains(msg, "no rows") {
 			return nil, fmt.Errorf("get account: %w", err)
@@ -310,8 +351,15 @@ func (e *EngineOperations) broadcastAutoDigestWithFreshNonce(
 		return nil, ktypes.Hash{}, fmt.Errorf("get signer account: %w", err)
 	}
 
+	// Use fresh read transaction from pool to avoid stale connection issues
+	db, cleanup, err := e.getFreshReadTx(ctx)
+	if err != nil {
+		return nil, ktypes.Hash{}, fmt.Errorf("get fresh read tx: %w", err)
+	}
+	defer cleanup()
+
 	// ALWAYS query database for current nonce (fresh state)
-	account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID)
+	account, err := e.accounts.GetAccount(ctx, db, signerAccountID)
 	var nextNonce uint64
 	if err != nil {
 		// Only treat "not found" / "no rows" as missing-account
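For orientation, the sketch below shows how a background caller goes through the new pool-backed read path: LoadDigestConfig (and the broadcast helpers) now obtain their read connection from getFreshReadTx on every call, so scheduler ticks no longer depend on the sql.DB handle captured at startup. The runDigestTick helper and its placement in package internal are illustrative assumptions, not part of this change; only the LoadDigestConfig call shape is taken from the diff.

package internal

import "context"

// runDigestTick is a hypothetical scheduler callback (not part of this PR)
// illustrating the pool-backed read path: LoadDigestConfig internally calls
// getFreshReadTx, so every tick reads through a fresh delayed read tx rather
// than the db handle captured when the extension started.
func runDigestTick(ctx context.Context, ops *EngineOperations) {
	// Same call shape as engineReadyHook in tn_digest.go; the third return
	// value (ignored there as well) is not inspected in this sketch.
	enabled, schedule, _ := ops.LoadDigestConfig(ctx)
	if !enabled {
		return // digesting disabled; nothing to broadcast this tick
	}
	_ = schedule // a real job would consult the schedule before broadcasting
	// ops.BuildAndBroadcastAutoDigestTx(ctx, ...) would follow in the real job.
}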
extensions/tn_digest/leader_reload_test.go (6 additions & 6 deletions)

@@ -150,7 +150,7 @@ func resetExtensionForTest() *Extension {
 	ext.SetNodeSigner(mockSigner{})
 	ext.SetBroadcaster(mockBroadcaster{})
 	// minimal engine ops
-	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, &fakeDB{}, &fakeAccounts{}, log.New()))
+	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, &fakeDB{}, nil, &fakeAccounts{}, log.New()))
 	SetExtension(ext)
 	return ext
 }
@@ -217,7 +217,7 @@ func TestDigest_Reload_EnablesAndStarts_WhenBecomesEnabled(t *testing.T) {
 
 	// attach EngineOps with fake DB that returns enabled on reload BEFORE first hook
 	fdb := &fakeDB{enabled: true, schedule: "*/5 * * * *"}
-	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, &fakeAccounts{}, log.New()))
+	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, nil, &fakeAccounts{}, log.New()))
 
 	// leader at height 1: disabled, no scheduler
 	digestLeaderAcquire(context.Background(), app, makeBlock(1, identity))
@@ -245,7 +245,7 @@ func TestDigest_Reload_DisablesAndStops_WhenBecomesDisabled(t *testing.T) {
 
 	// reload returns disabled
 	fdb := &fakeDB{enabled: false, schedule: "*/5 * * * *"}
-	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, &fakeAccounts{}, log.New()))
+	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, nil, &fakeAccounts{}, log.New()))
 	digestLeaderEndBlock(context.Background(), app, makeBlock(2, identity))
 
 	// stop should be idempotent
@@ -311,7 +311,7 @@ func TestDigest_Reload_TransientFailure_SucceedsAfterRetry(t *testing.T) {
 		schedule:  "0 9 * * *",
 		failCount: 2,
 	}
-	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, &fakeAccounts{}, log.New()))
+	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, nil, &fakeAccounts{}, log.New()))
 
 	// Reload at height 2 - first attempt fails, triggers background retry
 	digestLeaderEndBlock(context.Background(), app, makeBlock(2, identity))
@@ -356,7 +356,7 @@ func TestDigest_Reload_AllRetriesFail_KeepsCurrentConfig(t *testing.T) {
 		schedule:  "0 9 * * *",
 		failCount: 20, // More than max retries (15)
 	}
-	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, &fakeAccounts{}, log.New()))
+	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, nil, &fakeAccounts{}, log.New()))
 
 	// Reload at height 2 - first attempt fails, triggers background retry
 	digestLeaderEndBlock(context.Background(), app, makeBlock(2, identity))
@@ -396,7 +396,7 @@ func TestDigest_Reload_ContextCancellation_ExitsGracefully(t *testing.T) {
 		schedule:  "0 9 * * *",
 		failCount: 10,
 	}
-	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, &fakeAccounts{}, log.New()))
+	ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, nil, &fakeAccounts{}, log.New()))
 
 	// Reload triggers background retry
 	digestLeaderEndBlock(context.Background(), app, makeBlock(2, identity))
extensions/tn_digest/tn_digest.go (3 additions & 2 deletions)

@@ -53,15 +53,16 @@ func InitializeExtensionWithNodeCapabilities(_ TxBroadcaster, _ auth.Signer) { I
 func engineReadyHook(ctx context.Context, app *common.App) error {
 	logger := app.Service.Logger.New(ExtensionName)
 
-	// Use app.DB for read/query; if a separate RO pool is needed in future, wire it here.
+	// Use app.DB for read/query
 	var db sql.DB
 	db = app.DB
 	if db == nil {
 		logger.Warn("app.DB is nil; digest extension may not be fully operational")
 	}
 
 	// Build engine operations wrapper
-	engOps := internal.NewEngineOperations(app.Engine, db, app.Accounts, app.Service.Logger)
+	// Pass DBPool for fresh read transactions in background jobs
+	engOps := internal.NewEngineOperations(app.Engine, db, app.Service.DBPool, app.Accounts, app.Service.Logger)
 
 	// Load schedule from config; fall back to default if absent
 	enabled, schedule, _ := engOps.LoadDigestConfig(ctx)
extensions/tn_settlement/extension.go (26 additions & 2 deletions)

@@ -51,6 +51,10 @@ type Extension struct {
 	retrySignal chan struct{} // signal to trigger retry worker
 	retryMu     sync.Mutex    // protects retry state
 
+	// shutdown context for graceful termination
+	shutdownCtx    context.Context
+	shutdownCancel context.CancelFunc
+
 	// tx submission wiring
 	broadcaster TxBroadcaster
 	nodeSigner  auth.Signer
@@ -165,6 +169,18 @@ func (e *Extension) Broadcaster() TxBroadcaster { return e.broadcaster }
 func (e *Extension) SetNodeSigner(s auth.Signer) { e.nodeSigner = s }
 func (e *Extension) NodeSigner() auth.Signer { return e.nodeSigner }
 
+// ShutdownContext returns the extension's shutdown context, creating it if needed.
+// This context is cancelled when Close() is called, allowing graceful termination
+// of long-running operations like the scheduler.
+func (e *Extension) ShutdownContext() context.Context {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	if e.shutdownCtx == nil {
+		e.shutdownCtx, e.shutdownCancel = context.WithCancel(context.Background())
+	}
+	return e.shutdownCtx
+}
+
 // startRetryWorker starts the background config reload retry worker
 func (e *Extension) startRetryWorker() {
 	e.retryMu.Lock()
@@ -297,7 +313,8 @@ func (e *Extension) applyConfigChangeWithLock(ctx context.Context, enabled bool,
 		e.Logger().Debug("tn_settlement: prerequisites missing; deferring (re)start after config update")
 	} else if e.Scheduler() != nil {
 		e.stopSchedulerIfRunning()
-		if err := e.startScheduler(ctx); err != nil {
+		// Use extension's shutdown context for scheduler - ensures graceful shutdown
+		if err := e.startScheduler(e.ShutdownContext()); err != nil {
 			e.Logger().Warn("failed to (re)start tn_settlement scheduler after config update", "error", err)
 		} else {
 			e.Logger().Info("tn_settlement (re)started with new schedule", "schedule", e.Schedule())
@@ -309,10 +326,17 @@
 	}
 }
 
-// Close stops background jobs
+// Close stops background jobs and cancels the shutdown context
 func (e *Extension) Close() {
 	e.stopRetryWorker()
	if e.scheduler != nil {
 		_ = e.scheduler.Stop()
 	}
+	// Cancel shutdown context to signal any operations using it
+	e.mu.Lock()
+	if e.shutdownCancel != nil {
+		e.shutdownCancel()
+		e.shutdownCancel = nil
+	}
+	e.mu.Unlock()
 }
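To make the shutdown wiring concrete, here is a minimal sketch of a worker loop driven by ShutdownContext(). The runUntilShutdown name, the ticker loop, and the package clause are illustrative assumptions rather than the extension's actual scheduler; they only show the intended contract from the diff: Close() cancels the context and the loop exits on ctx.Done().

package tn_settlement // package name assumed from the directory

import (
	"context"
	"time"
)

// runUntilShutdown is a hypothetical worker loop (not part of this PR) showing
// how ShutdownContext cooperates with Close(): Close cancels the context, and
// the loop exits cleanly on ctx.Done() instead of leaking a goroutine.
func runUntilShutdown(e *Extension, tick time.Duration, work func(context.Context)) {
	ctx := e.ShutdownContext() // cancelled when Close() is called
	t := time.NewTicker(tick)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return // graceful exit after Close()
		case <-t.C:
			work(ctx)
		}
	}
}

A caller would typically launch this in its own goroutine once prerequisites are wired, for example: go runUntilShutdown(ext, time.Minute, job).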