From 6971df6eedefc41ab4c3d189206f46c21f516e05 Mon Sep 17 00:00:00 2001 From: Sharath Rajasekar Date: Tue, 5 May 2026 13:30:08 -0700 Subject: [PATCH 1/8] fix: tighten attestation re-verify guard to catch partial-failure retries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `AttestationService.VerifyAttestation` runs three writes serially without a wrapping DB transaction: 1. credentialSvc.IssueCredential — inserts issued_credentials row. 2. identitySvc.UpdateIdentity — flips identities.trust_level. 3. repo.Update — sets attestation_records.is_verified. Today's `if record.IsVerified` guard at the top of the function only catches retries after a fully-successful run. It does NOT catch the partial-failure window where Step 1 succeeded but Step 2 or Step 3 failed: in that state, IsVerified is still false, but AttestationRecord.CredentialID is already populated. A retry from the caller would therefore re-run Step 1 and mint a SECOND credential from the same proof. The proper fix is the full DB-transaction refactor proposed in #98 — threading bun.IDB through CredentialService.IssueCredential, IdentityService.UpdateIdentity, and AttestationRepository.Update. That is a real cross-service surface change (~7 IssueCredential call sites plus repo signature changes) and is left as the follow-up. This PR does the smaller mitigation the issue itself proposes as a fallback: extend the guard to ALSO trip when CredentialID != "". That closes the double-credential window without touching service-layer signatures. The remaining gap — Step 2 succeeding but Step 3 failing leaves trust_level promoted on a record that's not marked verified — is unchanged, but it's a strictly weaker problem (no double issuance, just an audit-trail inconsistency) and disappears entirely once the transaction work lands. ## Test plan New regression test `TestAttestationVerifyGuardCatchesPartialFailureRetry`: - runs a clean verify (ends with IsVerified=true, CredentialID set) - surgically rewinds is_verified to false in the DB to simulate the Step 2/3 failure window - asserts the retry returns 409 Conflict instead of minting a second credential Without the new condition the test fails at the second verify (returns 200 + new access token). With it, second verify is rejected as expected. Existing TestAttestationDoubleVerifyIsRejected continues to pass — the new condition is OR'd with the old one; post-success guard behavior is unchanged. Issue #98 stays open to track the full transaction refactor. --- internal/service/attestation.go | 9 +++- tests/integration/attestation_oidc_test.go | 48 ++++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/internal/service/attestation.go b/internal/service/attestation.go index a71be0e..27dd684 100644 --- a/internal/service/attestation.go +++ b/internal/service/attestation.go @@ -140,7 +140,14 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI if err != nil { return nil, err } - if record.IsVerified { + // IsVerified is the post-success guard. CredentialID != "" catches the + // in-progress retry window: if Step 1 (IssueCredential) succeeded on a + // prior call but Step 2 or 3 failed, the credential link is set but + // IsVerified is still false. Without this second condition, retries in + // that window mint a second credential from the same proof. Closes one + // of the two failure windows tracked by #98; the full DB-transaction + // refactor closes the rest. + if record.IsVerified || record.CredentialID != "" { return nil, fmt.Errorf("%w: record %s", ErrAttestationAlreadyVerified, record.ID) } diff --git a/tests/integration/attestation_oidc_test.go b/tests/integration/attestation_oidc_test.go index bebb6b4..9f0a471 100644 --- a/tests/integration/attestation_oidc_test.go +++ b/tests/integration/attestation_oidc_test.go @@ -2,6 +2,7 @@ package integration_test import ( "bytes" + "context" "crypto/rand" "crypto/rsa" "encoding/base64" @@ -327,6 +328,53 @@ func TestAttestationDoubleVerifyIsRejected(t *testing.T) { assertErrorBodyContains(t, second, "already verified") } +// TestAttestationVerifyGuardCatchesPartialFailureRetry pins the second +// half of the ErrAttestationAlreadyVerified guard introduced by #98: +// when CredentialID is set but IsVerified is false (the state left +// behind by a failed UpdateIdentity / repo.Update after a successful +// IssueCredential), a retry must be rejected so a second credential +// is not minted from the same proof. +// +// The partial-failure state isn't reachable through the normal HTTP flow +// — it only appears if Step 2 or Step 3 of VerifyAttestation crashes. +// We simulate it by running a clean verify, then surgically flipping +// is_verified back to false in the DB (leaving credential_id intact). +// Without the CredentialID-guard, the retry returns 200 and issues a +// second credential. +func TestAttestationVerifyGuardCatchesPartialFailureRetry(t *testing.T) { + iss := newOIDCIssuer(t) + defer iss.close() + + reg := registerAgent(t, uid("attest-partial")) + upsertOIDCPolicy(t, map[string]any{ + "issuers": []map[string]any{{"url": iss.URL}}, + }) + + token := iss.sign(map[string]any{"sub": "ci-job-partial"}) + id := submitAttestation(t, reg.AgentID, "oidc_token", token) + + first := verifyAttestation(t, id) + require.Equal(t, http.StatusOK, first.StatusCode, "first verify expected 200") + _ = first.Body.Close() + + // Simulate Step 2 / Step 3 failure: rewind IsVerified to false but + // leave CredentialID intact. This is the exact state the guard is + // meant to catch — a retry would mint a second credential without it. + _, err := testDB.NewUpdate(). + Table("attestation_records"). + Set("is_verified = false"). + Set("verified_at = NULL"). + Where("id = ?", id). + Exec(context.Background()) + require.NoError(t, err, "failed to plant partial-failure state") + + second := verifyAttestation(t, id) + defer func() { _ = second.Body.Close() }() + assert.Equal(t, http.StatusConflict, second.StatusCode, + "retry on a record with credential_id set must be rejected even when is_verified=false") + assertErrorBodyContains(t, second, "already verified") +} + // TestAttestationPolicyUpsertReactivatesDisabled verifies the upsert-against- // inactive-row bug is fixed: disabling a policy via is_active=false and then // PUTting a fresh config must update the row in place, not violate the From f7b3447264b280d7951c706efa12a596af0d121c Mon Sep 17 00:00:00 2001 From: Sharath Rajasekar Date: Tue, 5 May 2026 13:54:09 -0700 Subject: [PATCH 2/8] fix: wrap attestation verification writes in a single transaction (closes #98) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit VerifyAttestation does three independent writes: 1. credentialSvc.IssueCredential — inserts issued_credentials row. 2. identitySvc.UpdateIdentity — flips identities.trust_level. 3. repo.Update — sets attestation_records.is_verified and links credential_id. Before this change each ran in its own auto-commit transaction. A failure between steps left partial state — most painfully, Step 1 succeeding then Step 2 or Step 3 failing meant a retry would mint a second credential from the same proof, since the IsVerified guard hadn't been set yet. ## Approach Context-based tx propagation rather than threading bun.IDB through every service signature. New helper `internal/store/postgres/tx.go`: - `WithTx(ctx, tx)` attaches an in-flight tx to ctx. - `dbOrTx(ctx, fallback)` (unexported) returns the tx if ctx has one, falls back to the repo's *bun.DB handle. Three repo methods now route through dbOrTx: - postgres.CredentialRepository.Create - postgres.IdentityRepository.Update - postgres.AttestationRepository.Update Repos that don't see WithTx behave exactly as before (auto-commit per statement), so this is a strict superset of the prior behavior — every existing caller is untouched, no signatures change. VerifyAttestation now wraps the three writes: s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { ctx = postgres.WithTx(ctx, tx) // IssueCredential / UpdateIdentity / repo.Update }) AttestationService gains a `db *bun.DB` field so it can open the tx; NewAttestationService picks up one new parameter, wired in server.go. ## Why context-based and not signature-based Threading `bun.IDB` through CredentialService.IssueCredential and IdentityService.UpdateIdentity would have changed ~15 call sites across oauth.go, agent.go, handler/identity.go, handler/credential.go. The context-based pattern keeps the change local: only the repo methods that actually need to participate in a tx pick up the helper, and every other caller of those services keeps the existing signature. ## Defense in depth The CredentialID-based check in the ErrAttestationAlreadyVerified guard stays in place as belt-and-suspenders. With the transaction it's unreachable through the normal flow — partial state cannot exist. But it still catches direct DB manipulation, a future code path that bypasses the verify flow, or a transaction-commits-but-error-returned bug. Test pinning the guard remains. ## Test plan - [x] `go vet ./...` clean (root + pkg/authjwt) - [x] Full integration suite green ~10s, including: - TestAttestationOIDCVerifierHappyPath (proves the tx commits and all three writes land for a normal success path) - TestAttestationDoubleVerifyIsRejected (post-success guard) - TestAttestationVerifyGuardCatchesPartialFailureRetry (manually plants partial state to exercise the defense-in-depth guard) Closes #98. --- internal/service/attestation.go | 93 ++++++++++++++-------- internal/store/postgres/attestation.go | 5 +- internal/store/postgres/credential.go | 7 +- internal/store/postgres/identity.go | 7 +- internal/store/postgres/tx.go | 41 ++++++++++ server.go | 2 +- tests/integration/attestation_oidc_test.go | 23 +++--- 7 files changed, 124 insertions(+), 54 deletions(-) create mode 100644 internal/store/postgres/tx.go diff --git a/internal/service/attestation.go b/internal/service/attestation.go index 27dd684..5d6230d 100644 --- a/internal/service/attestation.go +++ b/internal/service/attestation.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "github.com/rs/zerolog/log" + "github.com/uptrace/bun" "github.com/highflame-ai/zeroid/domain" "github.com/highflame-ai/zeroid/internal/attestation" @@ -40,6 +41,10 @@ type AttestationService struct { identitySvc *IdentityService verifiers *attestation.Registry policySvc *attestation.PolicyService + // db is the *bun.DB handle used to open transactions in + // VerifyAttestation. The repo methods themselves participate via + // postgres.WithTx(ctx, tx); the service owns the tx lifecycle. + db *bun.DB // permissive is the runtime-mutable form of cfg.Attestation.AllowUnsafeDevStub. // Stored as int32 so SetPermissive can flip it without a mutex — @@ -50,13 +55,16 @@ type AttestationService struct { // NewAttestationService creates a new AttestationService. verifiers and // policySvc are required: VerifyAttestation fails closed when no verifier // is registered for a proof type or no tenant policy exists, unless -// allowUnsafeDevStub is true (transitional bypass). +// allowUnsafeDevStub is true (transitional bypass). db is required so the +// three writes in VerifyAttestation (issue credential, promote trust, +// mark record verified) can be wrapped in a single transaction. func NewAttestationService( repo *postgres.AttestationRepository, credentialSvc *CredentialService, identitySvc *IdentityService, verifiers *attestation.Registry, policySvc *attestation.PolicyService, + db *bun.DB, allowUnsafeDevStub bool, ) *AttestationService { s := &AttestationService{ @@ -65,6 +73,7 @@ func NewAttestationService( identitySvc: identitySvc, verifiers: verifiers, policySvc: policySvc, + db: db, } s.permissive.Store(allowUnsafeDevStub) return s @@ -129,12 +138,14 @@ var ErrAttestationAlreadyVerified = errors.New("attestation already verified") // - Verifier.Verify returns an error → ErrAttestationRejected. // - Record already verified → ErrAttestationAlreadyVerified (rejects retries). // -// Write ordering rationale: credential issuance runs BEFORE identity trust -// promotion, so the most common failure (IssueCredential) leaves nothing -// committed. Trust promotion and record update run last, in that order, so -// a failure between them leaves trust promoted (harmless — backed by a -// valid proof) with the record unmarked. The re-verify guard prevents a -// second IssueCredential call in that retry window. +// Atomicity: the three writes (issue credential → promote trust → +// mark record verified) run inside a single bun.RunInTx. Repos that +// touch DB (CredentialRepository.Create, IdentityRepository.Update, +// AttestationRepository.Update) read the in-flight tx from ctx via +// postgres.WithTx so every statement participates. Any failure rolls +// the lot back, so retries always see a clean slate. The +// CredentialID-based guard at the top stays as defense-in-depth in case +// a future code path manipulates the record outside this flow. func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountID, projectID string) (*VerifyAttestationResult, error) { record, err := s.repo.GetByID(ctx, id, accountID, projectID) if err != nil { @@ -201,9 +212,11 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI return nil, fmt.Errorf("failed to load identity for verified attestation: %w", err) } - // Step 1: issue the credential. This is the most likely failure point - // (policy checks, scope derivation, signing). Running it first means - // a failure leaves no partial state behind. + // Run the three writes atomically. RunInTx commits on a nil return, + // rolls back on any non-nil error or panic. The repo methods called + // below pick the tx up from ctx via postgres.WithTx; their bun.IDB + // resolver uses the tx instead of the auto-commit DB handle, so every + // statement lands inside the same transaction. // // GrantType is fixed to client_credentials regardless of how the // identity will subsequently authenticate. Verified attestation is a @@ -212,33 +225,43 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI // returned token represents that boot-time trust, not a user-driven // session. Downstream flows can still token-exchange / jwt-bearer // against this credential; the bootstrap shape just doesn't change. - accessToken, cred, err := s.credentialSvc.IssueCredential(ctx, IssueRequest{ - Identity: identity, - GrantType: domain.GrantTypeClientCredentials, - }) - if err != nil { - return nil, fmt.Errorf("failed to issue post-attestation credential: %w", err) - } + var ( + accessToken *domain.AccessToken + cred *domain.IssuedCredential + ) + txErr := s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + ctx = postgres.WithTx(ctx, tx) - // Step 2: promote trust level. Backed by the just-verified proof. - promotedTrust := trustLevelForAttestation(record.Level) - if _, err := s.identitySvc.UpdateIdentity(ctx, record.IdentityID, accountID, projectID, UpdateIdentityRequest{ - TrustLevel: promotedTrust, - }); err != nil { - return nil, fmt.Errorf("failed to promote identity trust level: %w", err) - } + var err error + accessToken, cred, err = s.credentialSvc.IssueCredential(ctx, IssueRequest{ + Identity: identity, + GrantType: domain.GrantTypeClientCredentials, + }) + if err != nil { + return fmt.Errorf("failed to issue post-attestation credential: %w", err) + } - // Step 3: commit the record with verified flag, audit fields, and - // credential link in a single write. - now := time.Now() - record.IsVerified = true - record.VerifiedAt = &now - if result.ExpiresAt != nil { - record.ExpiresAt = result.ExpiresAt - } - record.CredentialID = cred.ID - if err := s.repo.Update(ctx, record); err != nil { - return nil, fmt.Errorf("failed to update attestation record: %w", err) + promotedTrust := trustLevelForAttestation(record.Level) + if _, err := s.identitySvc.UpdateIdentity(ctx, record.IdentityID, accountID, projectID, UpdateIdentityRequest{ + TrustLevel: promotedTrust, + }); err != nil { + return fmt.Errorf("failed to promote identity trust level: %w", err) + } + + now := time.Now() + record.IsVerified = true + record.VerifiedAt = &now + if result.ExpiresAt != nil { + record.ExpiresAt = result.ExpiresAt + } + record.CredentialID = cred.ID + if err := s.repo.Update(ctx, record); err != nil { + return fmt.Errorf("failed to update attestation record: %w", err) + } + return nil + }) + if txErr != nil { + return nil, txErr } return &VerifyAttestationResult{ diff --git a/internal/store/postgres/attestation.go b/internal/store/postgres/attestation.go index 1f22edf..627d071 100644 --- a/internal/store/postgres/attestation.go +++ b/internal/store/postgres/attestation.go @@ -66,8 +66,11 @@ func (r *AttestationRepository) GetHighestVerifiedLevel(ctx context.Context, ide } // Update saves changes to an attestation record (e.g., mark as verified). +// Participates in a caller-provided transaction via postgres.WithTx(ctx, tx); +// falls through to a single auto-commit update otherwise. func (r *AttestationRepository) Update(ctx context.Context, record *domain.AttestationRecord) error { - _, err := r.db.NewUpdate().Model(record). + db := dbOrTx(ctx, r.db) + _, err := db.NewUpdate().Model(record). Where("id = ?", record.ID). Exec(ctx) if err != nil { diff --git a/internal/store/postgres/credential.go b/internal/store/postgres/credential.go index 2c7a770..40e3155 100644 --- a/internal/store/postgres/credential.go +++ b/internal/store/postgres/credential.go @@ -20,9 +20,12 @@ func NewCredentialRepository(db *bun.DB) *CredentialRepository { return &CredentialRepository{db: db} } -// Create inserts a new issued credential. +// Create inserts a new issued credential. Participates in a caller-provided +// transaction via postgres.WithTx(ctx, tx); falls through to a single +// auto-commit insert otherwise. func (r *CredentialRepository) Create(ctx context.Context, cred *domain.IssuedCredential) error { - _, err := r.db.NewInsert().Model(cred).Exec(ctx) + db := dbOrTx(ctx, r.db) + _, err := db.NewInsert().Model(cred).Exec(ctx) if err != nil { return fmt.Errorf("failed to create credential: %w", err) } diff --git a/internal/store/postgres/identity.go b/internal/store/postgres/identity.go index 021d4b0..15e3e32 100644 --- a/internal/store/postgres/identity.go +++ b/internal/store/postgres/identity.go @@ -132,10 +132,13 @@ func (r *IdentityRepository) List(ctx context.Context, accountID, projectID stri return identities, total, nil } -// Update saves changes to an existing identity. +// Update saves changes to an existing identity. Participates in a caller- +// provided transaction via postgres.WithTx(ctx, tx); falls through to a +// single auto-commit update otherwise. func (r *IdentityRepository) Update(ctx context.Context, identity *domain.Identity) error { identity.ModifiedBy = middleware.GetCallerName(ctx) - _, err := r.db.NewUpdate().Model(identity). + db := dbOrTx(ctx, r.db) + _, err := db.NewUpdate().Model(identity). Where("id = ? AND account_id = ? AND project_id = ?", identity.ID, identity.AccountID, identity.ProjectID). Exec(ctx) if err != nil { diff --git a/internal/store/postgres/tx.go b/internal/store/postgres/tx.go new file mode 100644 index 0000000..5f96c80 --- /dev/null +++ b/internal/store/postgres/tx.go @@ -0,0 +1,41 @@ +package postgres + +import ( + "context" + + "github.com/uptrace/bun" +) + +// txKey is the context-value key for an in-flight transaction. Unexported +// + struct{}-typed so external packages can't collide with us in the same +// context tree. +type txKey struct{} + +// WithTx attaches a transaction to ctx. Repo methods that call +// dbOrTx pick the transaction up automatically. Use this together with +// bun.DB.RunInTx in services that need to coordinate writes across multiple +// repos atomically: +// +// err := s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { +// ctx = postgres.WithTx(ctx, tx) +// if err := s.fooRepo.Create(ctx, foo); err != nil { return err } +// if err := s.barRepo.Update(ctx, bar); err != nil { return err } +// return nil +// }) +// +// Repo callers that don't open a transaction get the repo's own *bun.DB +// handle and behave as before (auto-commit per statement). +func WithTx(ctx context.Context, tx bun.Tx) context.Context { + return context.WithValue(ctx, txKey{}, tx) +} + +// dbOrTx returns the in-flight transaction from ctx if one is set, falling +// back to the repo's default DB handle. Repo methods that participate in +// transactions call this once at the top and use the returned bun.IDB for +// every statement in the method. +func dbOrTx(ctx context.Context, fallback bun.IDB) bun.IDB { + if tx, ok := ctx.Value(txKey{}).(bun.Tx); ok { + return tx + } + return fallback +} diff --git a/server.go b/server.go index 7712b27..7f1996f 100644 --- a/server.go +++ b/server.go @@ -183,7 +183,7 @@ func NewServer(cfg Config) (*Server, error) { signalSvc := service.NewSignalService(signalRepo, credentialRepo, identityRepo) identitySvc := service.NewIdentityService(identityRepo, credentialPolicySvc, apiKeyRepo, credentialSvc, signalSvc, cfg.WIMSEDomain) attestationPolicySvc := attestation.NewPolicyService(attestationPolicyRepo, attestationVerifiers) - attestationSvc := service.NewAttestationService(attestationRepo, credentialSvc, identitySvc, attestationVerifiers, attestationPolicySvc, cfg.Attestation.AllowUnsafeDevStub) + attestationSvc := service.NewAttestationService(attestationRepo, credentialSvc, identitySvc, attestationVerifiers, attestationPolicySvc, db, cfg.Attestation.AllowUnsafeDevStub) oauthClientSvc := service.NewOAuthClientService(oauthClientRepo) apiKeySvc := service.NewAPIKeyService(apiKeyRepo, credentialPolicySvc, identitySvc) refreshTokenSvc := service.NewRefreshTokenService(refreshTokenRepo, db) diff --git a/tests/integration/attestation_oidc_test.go b/tests/integration/attestation_oidc_test.go index 9f0a471..2e8bdb5 100644 --- a/tests/integration/attestation_oidc_test.go +++ b/tests/integration/attestation_oidc_test.go @@ -328,19 +328,16 @@ func TestAttestationDoubleVerifyIsRejected(t *testing.T) { assertErrorBodyContains(t, second, "already verified") } -// TestAttestationVerifyGuardCatchesPartialFailureRetry pins the second -// half of the ErrAttestationAlreadyVerified guard introduced by #98: -// when CredentialID is set but IsVerified is false (the state left -// behind by a failed UpdateIdentity / repo.Update after a successful -// IssueCredential), a retry must be rejected so a second credential -// is not minted from the same proof. -// -// The partial-failure state isn't reachable through the normal HTTP flow -// — it only appears if Step 2 or Step 3 of VerifyAttestation crashes. -// We simulate it by running a clean verify, then surgically flipping -// is_verified back to false in the DB (leaving credential_id intact). -// Without the CredentialID-guard, the retry returns 200 and issues a -// second credential. +// TestAttestationVerifyGuardCatchesPartialFailureRetry pins the +// defense-in-depth guard added alongside the transaction wrap in #98: +// the three writes in VerifyAttestation now run in a single bun.RunInTx, +// so a partial-state record (CredentialID set, IsVerified=false) is +// no longer reachable through the normal flow. But the guard at the +// top of the function still trips on CredentialID != "" so a record +// in that state — reachable only via direct DB manipulation, a future +// code path that bypasses the verify flow, or a hypothetical +// commit-then-error-return bug — does not get a second credential +// minted. We plant the state by hand to exercise the guard. func TestAttestationVerifyGuardCatchesPartialFailureRetry(t *testing.T) { iss := newOIDCIssuer(t) defer iss.close() From aa07997c1ed1aee63f969cd81dabbc695314fff3 Mon Sep 17 00:00:00 2001 From: Sharath Rajasekar Date: Tue, 5 May 2026 14:34:11 -0700 Subject: [PATCH 3/8] fix: address self-review findings on the attestation tx wrap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three follow-ups on the transaction-wrap PR: R1. Update the stale ErrAttestationAlreadyVerified comment. The original guard-tighten commit said "Closes one of the two failure windows tracked by #98; the full DB-transaction refactor closes the rest." After the tx wrap landed in the previous commit on this branch, the full refactor IS this PR — the comment misled a future reader about what was load-bearing. Rewritten to make clear the guard is now strictly defense- in-depth: the transaction wrap closes both failure windows; the guard only fires for direct DB manipulation, future code paths that bypass VerifyAttestation, or commit-then-error-return. R2. Route every read in the three touched repos through dbOrTx. Previously only Create / Update participated in transactions. Reads (GetByID, GetByJTI, ListByIdentity, GetHighestVerifiedLevel, RevokeAllActiveForIdentity, Revoke, GetByExternalID, GetByWIMSEURI, List, Delete) used r.db directly. In today's flow that's harmless — IssueCredential's reads don't touch tables we write to in the same tx — but it's a latent stale-read footgun: a future refactor that adds a read-after-write inside the closure would silently see pre-tx state. Now every method in postgres/{credential,identity,attestation}.go opens through dbOrTx, so the contract is uniform: if a tx is attached to ctx, the statement participates; otherwise the repo's *bun.DB falls through. No call site sees a behavior change. Other repos (apikey.go, refresh_token.go, etc.) intentionally not touched — they're not in the attestation flow and the consistent pattern can be extended on demand. R3. Add direct regression tests for the WithTx + dbOrTx mechanism. tests/integration/postgres_tx_test.go: - TestPostgresWithTxRollbackPersistsNothing: opens a tx via testDB, inserts an Identity + an IssuedCredential through the repos with WithTx attached, returns a non-nil error from the closure, and asserts neither row survives. Pins the foundational invariant behind the attestation atomicity claim — without it, every existing test in the suite could pass while the rollback semantics are silently broken. - TestPostgresWithoutTxFallsBackToAutoCommit: confirms the no-tx-in- ctx path still auto-commits per statement, so every existing call site of these repos (which doesn't open a tx) is behaviorally unchanged. ## Test plan - [x] go vet ./... clean - [x] Full integration suite green ~10s, including the two new tests - [x] Existing TestAttestationVerifyGuardCatchesPartialFailureRetry still pins the defense-in-depth guard behavior The defense-in-depth claim and the rollback claim are now each backed by their own test. --- internal/service/attestation.go | 15 +-- internal/store/postgres/attestation.go | 9 +- internal/store/postgres/credential.go | 15 ++- internal/store/postgres/identity.go | 20 ++-- tests/integration/postgres_tx_test.go | 144 +++++++++++++++++++++++++ 5 files changed, 181 insertions(+), 22 deletions(-) create mode 100644 tests/integration/postgres_tx_test.go diff --git a/internal/service/attestation.go b/internal/service/attestation.go index 5d6230d..943bf5e 100644 --- a/internal/service/attestation.go +++ b/internal/service/attestation.go @@ -151,13 +151,14 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI if err != nil { return nil, err } - // IsVerified is the post-success guard. CredentialID != "" catches the - // in-progress retry window: if Step 1 (IssueCredential) succeeded on a - // prior call but Step 2 or 3 failed, the credential link is set but - // IsVerified is still false. Without this second condition, retries in - // that window mint a second credential from the same proof. Closes one - // of the two failure windows tracked by #98; the full DB-transaction - // refactor closes the rest. + // Idempotency guard. The transaction wrap below already makes partial + // state unreachable through this flow — if any of the three writes + // fails, the tx rolls back and IsVerified / CredentialID stay zero. + // This guard is therefore defense-in-depth: it catches direct DB + // manipulation, a future code path that bypasses VerifyAttestation but + // mutates the record, or the rare commit-then-error-return failure + // mode where the tx committed but RunInTx still surfaces an error. + // Cheap belt next to the suspenders. if record.IsVerified || record.CredentialID != "" { return nil, fmt.Errorf("%w: record %s", ErrAttestationAlreadyVerified, record.ID) } diff --git a/internal/store/postgres/attestation.go b/internal/store/postgres/attestation.go index 627d071..e386f6a 100644 --- a/internal/store/postgres/attestation.go +++ b/internal/store/postgres/attestation.go @@ -21,7 +21,8 @@ func NewAttestationRepository(db *bun.DB) *AttestationRepository { // Create inserts a new attestation record. func (r *AttestationRepository) Create(ctx context.Context, record *domain.AttestationRecord) error { - _, err := r.db.NewInsert().Model(record).Exec(ctx) + db := dbOrTx(ctx, r.db) + _, err := db.NewInsert().Model(record).Exec(ctx) if err != nil { return fmt.Errorf("failed to create attestation record: %w", err) } @@ -31,7 +32,8 @@ func (r *AttestationRepository) Create(ctx context.Context, record *domain.Attes // GetByID retrieves an attestation record by its UUID. func (r *AttestationRepository) GetByID(ctx context.Context, id, accountID, projectID string) (*domain.AttestationRecord, error) { record := &domain.AttestationRecord{} - err := r.db.NewSelect().Model(record). + db := dbOrTx(ctx, r.db) + err := db.NewSelect().Model(record). Where("id = ?", id). Where("account_id = ?", accountID). Where("project_id = ?", projectID). @@ -46,7 +48,8 @@ func (r *AttestationRepository) GetByID(ctx context.Context, id, accountID, proj // Returns an empty string if no verified attestation exists. func (r *AttestationRepository) GetHighestVerifiedLevel(ctx context.Context, identityID string) (string, error) { var level string - err := r.db.NewSelect(). + db := dbOrTx(ctx, r.db) + err := db.NewSelect(). TableExpr("attestation_records"). ColumnExpr("level"). Where("identity_id = ?", identityID). diff --git a/internal/store/postgres/credential.go b/internal/store/postgres/credential.go index 40e3155..c9cf488 100644 --- a/internal/store/postgres/credential.go +++ b/internal/store/postgres/credential.go @@ -35,7 +35,8 @@ func (r *CredentialRepository) Create(ctx context.Context, cred *domain.IssuedCr // GetByID retrieves a credential by its UUID. func (r *CredentialRepository) GetByID(ctx context.Context, id, accountID, projectID string) (*domain.IssuedCredential, error) { cred := &domain.IssuedCredential{} - err := r.db.NewSelect().Model(cred). + db := dbOrTx(ctx, r.db) + err := db.NewSelect().Model(cred). Where("id = ?", id). Where("account_id = ?", accountID). Where("project_id = ?", projectID). @@ -49,7 +50,8 @@ func (r *CredentialRepository) GetByID(ctx context.Context, id, accountID, proje // GetByJTI retrieves a credential by its JWT ID (jti claim). func (r *CredentialRepository) GetByJTI(ctx context.Context, jti string) (*domain.IssuedCredential, error) { cred := &domain.IssuedCredential{} - err := r.db.NewSelect().Model(cred). + db := dbOrTx(ctx, r.db) + err := db.NewSelect().Model(cred). Where("jti = ?", jti). Scan(ctx) if err != nil { @@ -61,7 +63,8 @@ func (r *CredentialRepository) GetByJTI(ctx context.Context, jti string) (*domai // ListByIdentity returns all credentials for a given identity. func (r *CredentialRepository) ListByIdentity(ctx context.Context, identityID, accountID, projectID string) ([]*domain.IssuedCredential, error) { var creds []*domain.IssuedCredential - err := r.db.NewSelect().Model(&creds). + db := dbOrTx(ctx, r.db) + err := db.NewSelect().Model(&creds). Where("identity_id = ?", identityID). Where("account_id = ?", accountID). Where("project_id = ?", projectID). @@ -82,7 +85,8 @@ func (r *CredentialRepository) ListByIdentity(ctx context.Context, identityID, a func (r *CredentialRepository) RevokeAllActiveForIdentity(ctx context.Context, identityID, reason string) (int64, error) { now := time.Now() var count int64 - if err := r.db.NewRaw( + db := dbOrTx(ctx, r.db) + if err := db.NewRaw( "SELECT revoke_credentials_cascade(?, ?, ?)", identityID, now, reason, ).Scan(ctx, &count); err != nil { @@ -98,7 +102,8 @@ func (r *CredentialRepository) RevokeAllActiveForIdentity(ctx context.Context, i func (r *CredentialRepository) Revoke(ctx context.Context, id, accountID, projectID, reason string) error { now := time.Now() var count int64 - if err := r.db.NewRaw( + db := dbOrTx(ctx, r.db) + if err := db.NewRaw( "SELECT revoke_credential_cascade(?, ?, ?, ?, ?)", id, accountID, projectID, now, reason, ).Scan(ctx, &count); err != nil { diff --git a/internal/store/postgres/identity.go b/internal/store/postgres/identity.go index 15e3e32..7218039 100644 --- a/internal/store/postgres/identity.go +++ b/internal/store/postgres/identity.go @@ -25,7 +25,8 @@ func NewIdentityRepository(db *bun.DB) *IdentityRepository { // Create inserts a new identity. func (r *IdentityRepository) Create(ctx context.Context, identity *domain.Identity) error { - _, err := r.db.NewInsert().Model(identity).Exec(ctx) + db := dbOrTx(ctx, r.db) + _, err := db.NewInsert().Model(identity).Exec(ctx) if err != nil { return fmt.Errorf("failed to create identity: %w", err) } @@ -35,7 +36,8 @@ func (r *IdentityRepository) Create(ctx context.Context, identity *domain.Identi // GetByID retrieves an identity by its UUID, scoped to account + project. func (r *IdentityRepository) GetByID(ctx context.Context, id, accountID, projectID string) (*domain.Identity, error) { identity := &domain.Identity{} - err := r.db.NewSelect().Model(identity). + db := dbOrTx(ctx, r.db) + err := db.NewSelect().Model(identity). Where("id = ?", id). Where("account_id = ?", accountID). Where("project_id = ?", projectID). @@ -49,7 +51,8 @@ func (r *IdentityRepository) GetByID(ctx context.Context, id, accountID, project // GetByExternalID retrieves an identity by external ID within a tenant. func (r *IdentityRepository) GetByExternalID(ctx context.Context, externalID, accountID, projectID string) (*domain.Identity, error) { identity := &domain.Identity{} - err := r.db.NewSelect().Model(identity). + db := dbOrTx(ctx, r.db) + err := db.NewSelect().Model(identity). Where("external_id = ?", externalID). Where("account_id = ?", accountID). Where("project_id = ?", projectID). @@ -63,7 +66,8 @@ func (r *IdentityRepository) GetByExternalID(ctx context.Context, externalID, ac // GetByWIMSEURI retrieves an identity by its WIMSE URI, scoped to tenant. func (r *IdentityRepository) GetByWIMSEURI(ctx context.Context, wimseURI, accountID, projectID string) (*domain.Identity, error) { identity := &domain.Identity{} - err := r.db.NewSelect().Model(identity). + db := dbOrTx(ctx, r.db) + err := db.NewSelect().Model(identity). Where("wimse_uri = ?", wimseURI). Where("account_id = ?", accountID). Where("project_id = ?", projectID). @@ -79,7 +83,8 @@ func (r *IdentityRepository) GetByWIMSEURI(ctx context.Context, wimseURI, accoun // and filters using JSONB containment: labels @> {"key": "value"}. func (r *IdentityRepository) List(ctx context.Context, accountID, projectID string, identityTypes []string, label, trustLevel, isActive, search string, limit, offset int) ([]*domain.Identity, int, error) { var identities []*domain.Identity - q := r.db.NewSelect().Model(&identities). + db := dbOrTx(ctx, r.db) + q := db.NewSelect().Model(&identities). Where("account_id = ?", accountID). Where("project_id = ?", projectID). OrderExpr("created_at DESC") @@ -149,15 +154,16 @@ func (r *IdentityRepository) Update(ctx context.Context, identity *domain.Identi // Delete removes an identity. func (r *IdentityRepository) Delete(ctx context.Context, id, accountID, projectID string) error { + db := dbOrTx(ctx, r.db) // Pre-stamp modified_by so the AFTER DELETE trigger can read the actor from OLD.modified_by. if callerID := middleware.GetCallerName(ctx); callerID != "" { - _, _ = r.db.NewUpdate(). + _, _ = db.NewUpdate(). TableExpr("identities"). Set("modified_by = ?", callerID). Where("id = ? AND account_id = ? AND project_id = ?", id, accountID, projectID). Exec(ctx) } - _, err := r.db.NewDelete(). + _, err := db.NewDelete(). TableExpr("identities"). Where("id = ?", id). Where("account_id = ?", accountID). diff --git a/tests/integration/postgres_tx_test.go b/tests/integration/postgres_tx_test.go new file mode 100644 index 0000000..d9c7977 --- /dev/null +++ b/tests/integration/postgres_tx_test.go @@ -0,0 +1,144 @@ +package integration_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uptrace/bun" + + "github.com/highflame-ai/zeroid/domain" + "github.com/highflame-ai/zeroid/internal/store/postgres" +) + +// TestPostgresWithTxRollbackPersistsNothing pins the foundational invariant +// behind the #98 transaction wrap in AttestationService.VerifyAttestation: +// when a closure passed to bun.RunInTx returns a non-nil error, every +// participating repo write rolls back. Without this guarantee, the +// attestation atomicity claim collapses — Step 1's IssueCredential could +// commit even when Step 2 or 3 fails inside the closure. +// +// This test exercises the WithTx + dbOrTx mechanism end-to-end against +// the real Postgres testcontainer, separately from the higher-level +// VerifyAttestation flow that uses it. We run an Identity insert + an +// IssuedCredential insert inside a tx, force an error after both writes, +// and confirm neither row landed in the DB. +func TestPostgresWithTxRollbackPersistsNothing(t *testing.T) { + ctx := context.Background() + identityRepo := postgres.NewIdentityRepository(testDB) + credRepo := postgres.NewCredentialRepository(testDB) + + identityID := uuid.NewString() + credID := uuid.NewString() + jti := "tx-rollback-" + uuid.NewString() + + rollbackErr := errors.New("force rollback") + + txErr := testDB.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { + ctx = postgres.WithTx(ctx, tx) + + identity := &domain.Identity{ + ID: identityID, + AccountID: testAccountID, + ProjectID: testProjectID, + ExternalID: "tx-rollback-id-" + identityID[:8], + Name: "tx rollback fixture", + IdentityType: domain.IdentityTypeAgent, + TrustLevel: domain.TrustLevelUnverified, + Status: domain.IdentityStatusActive, + WIMSEURI: "spiffe://test/" + identityID, + AllowedScopes: []string{}, + Capabilities: []byte("{}"), + Labels: []byte("{}"), + Metadata: []byte("{}"), + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + if err := identityRepo.Create(ctx, identity); err != nil { + return err + } + + cred := &domain.IssuedCredential{ + ID: credID, + IdentityID: &identityID, + AccountID: testAccountID, + ProjectID: testProjectID, + JTI: jti, + Subject: identity.WIMSEURI, + Scopes: []string{}, + IssuedAt: time.Now(), + ExpiresAt: time.Now().Add(time.Hour), + TTLSeconds: 3600, + GrantType: domain.GrantTypeClientCredentials, + } + if err := credRepo.Create(ctx, cred); err != nil { + return err + } + + return rollbackErr + }) + + require.Error(t, txErr) + require.ErrorIs(t, txErr, rollbackErr, + "the closure's error must surface to the caller — RunInTx should not swallow it") + + // Foundational claim: nothing committed. + identityCount, err := testDB.NewSelect(). + Table("identities"). + Where("id = ?", identityID). + Count(ctx) + require.NoError(t, err) + assert.Zero(t, identityCount, "identity row must not exist after rollback") + + credCount, err := testDB.NewSelect(). + Table("issued_credentials"). + Where("id = ?", credID). + Count(ctx) + require.NoError(t, err) + assert.Zero(t, credCount, "credential row must not exist after rollback") +} + +// TestPostgresWithoutTxFallsBackToAutoCommit pins the other half of the +// dbOrTx contract: when no tx is attached to ctx, repo writes use the +// repo's default *bun.DB handle and auto-commit per statement, exactly +// as before the transaction work was introduced. Without this property, +// every existing call site of these repos would have changed behavior. +func TestPostgresWithoutTxFallsBackToAutoCommit(t *testing.T) { + ctx := context.Background() + identityRepo := postgres.NewIdentityRepository(testDB) + + identityID := uuid.NewString() + identity := &domain.Identity{ + ID: identityID, + AccountID: testAccountID, + ProjectID: testProjectID, + ExternalID: "tx-fallback-id-" + identityID[:8], + Name: "tx fallback fixture", + IdentityType: domain.IdentityTypeAgent, + TrustLevel: domain.TrustLevelUnverified, + Status: domain.IdentityStatusActive, + WIMSEURI: "spiffe://test-fallback/" + identityID, + AllowedScopes: []string{}, + Capabilities: []byte("{}"), + Labels: []byte("{}"), + Metadata: []byte("{}"), + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + require.NoError(t, identityRepo.Create(ctx, identity), + "Create with no tx in ctx must succeed via auto-commit") + + count, err := testDB.NewSelect(). + Table("identities"). + Where("id = ?", identityID). + Count(ctx) + require.NoError(t, err) + assert.Equal(t, 1, count, "identity row must be persisted by the auto-commit path") + + // Tidy. No tx so this also auto-commits. + require.NoError(t, identityRepo.Delete(ctx, identityID, testAccountID, testProjectID)) +} From 0d6db59a0a86712bf2daf086b2f289666bcbc1d6 Mon Sep 17 00:00:00 2001 From: Sharath Rajasekar Date: Tue, 5 May 2026 14:39:06 -0700 Subject: [PATCH 4/8] fix: serialize concurrent attestation verifies + tighten tx scope MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the remaining items from the self-review on the #98 tx wrap: W1. Concurrent-verify race. Two simultaneous /verify calls on the same record could each pass the pre-tx IsVerified guard, each enter RunInTx, each call IssueCredential, and leave the DB with two credentials minted from one proof. The tx wrap by itself doesn't fix this — both txs would commit successfully, just at different times. Fix: SELECT ... FOR UPDATE on the attestation record as the FIRST statement inside the tx (new repo method GetByIDForUpdate). The second verify queues on the row lock; when it acquires, the first has already written CredentialID and the in-tx guard re-check rejects it with ErrAttestationAlreadyVerified. Pinned by TestAttestationConcurrentVerifyMintsExactlyOneCredential: fires N=8 concurrent /verify calls on the same record and asserts exactly one returns 200, the other seven return 409, and the DB ends with exactly one credential row for the identity. W2. Identity read moved inside the tx. GetIdentity used to run before RunInTx, so a concurrent UpdateIdentity that deactivated the identity between the read and the tx could mint a credential against a stale view. Now reads happen inside the closure, under the same READ COMMITTED snapshot as the writes. W4. Isolation level documented. Comment on the RunInTx call notes that nil TxOptions = READ COMMITTED and that the row lock provides serialization; we don't need REPEATABLE READ across the whole tx. N1. Outer return values assigned only on success. The previous version captured `record`/`accessToken`/`cred` as outer vars and wrote into `record` mid-closure. On rollback the outer `record` was left holding "verified-looking" fields (no observable bug because of the early txErr return, but a footgun for future code that reads after the error check). Now: closure-scoped locals (`locked`, `issued`, `issuedCred`) hold in-progress state, and the three outer vars are assigned exactly once at the end of the closure. A rollback path never assigns them, so the caller can't observe partial state by accident. N3. WithTx nesting documented. Added a paragraph to WithTx's doc comment noting that re-attaching on a ctx that already carries a tx replaces it (no savepoints, no merge), and that the right pattern is "open one tx at the outermost service call, attach once." ## Test plan - [x] go vet ./... clean - [x] go build ./... clean - [x] Full integration suite green ~10s, including: - TestAttestationConcurrentVerifyMintsExactlyOneCredential (NEW — pins the row lock against an N=8 concurrent attack) - TestAttestationOIDCVerifierHappyPath (regression — happy path still commits all three writes) - TestAttestationDoubleVerifyIsRejected (post-success guard) - TestAttestationVerifyGuardCatchesPartialFailureRetry (defense-in-depth guard against direct DB manipulation) - TestPostgresWithTxRollbackPersistsNothing / TestPostgresWithoutTxFallsBackToAutoCommit (foundational WithTx + dbOrTx mechanism) W3 (cryptographic signing inside the tx) is intentionally left as-is. It's a per-attestation cost, not a hot-path concern, and refactoring would mean either pre-computing the signature outside the tx (loses ability to use ctx-derived state) or splitting IssueCredential into sign + persist phases (broader surface change with no measurable gain). --- internal/service/attestation.go | 102 +++++++++++++-------- internal/store/postgres/attestation.go | 27 ++++++ internal/store/postgres/tx.go | 7 ++ tests/integration/attestation_oidc_test.go | 66 +++++++++++++ 4 files changed, 163 insertions(+), 39 deletions(-) diff --git a/internal/service/attestation.go b/internal/service/attestation.go index 943bf5e..c7c1d2b 100644 --- a/internal/service/attestation.go +++ b/internal/service/attestation.go @@ -138,12 +138,17 @@ var ErrAttestationAlreadyVerified = errors.New("attestation already verified") // - Verifier.Verify returns an error → ErrAttestationRejected. // - Record already verified → ErrAttestationAlreadyVerified (rejects retries). // -// Atomicity: the three writes (issue credential → promote trust → -// mark record verified) run inside a single bun.RunInTx. Repos that -// touch DB (CredentialRepository.Create, IdentityRepository.Update, -// AttestationRepository.Update) read the in-flight tx from ctx via -// postgres.WithTx so every statement participates. Any failure rolls -// the lot back, so retries always see a clean slate. The +// Atomicity + serialization: the three writes (issue credential → promote +// trust → mark record verified) run inside a single bun.RunInTx, and the +// transaction starts by re-fetching the attestation record with +// SELECT ... FOR UPDATE so two concurrent /verify calls on the same record +// serialize. Without the lock both callers would otherwise pass the +// pre-tx guard, both enter the closure, and both INSERT a credential — +// leaving two credentials minted from one proof. Repos that touch the DB +// (CredentialRepository.Create, IdentityRepository.Update, +// AttestationRepository.Update / .GetByIDForUpdate) read the in-flight tx +// from ctx via postgres.WithTx so every statement participates. Any +// failure rolls the lot back, so retries always see a clean slate. The // CredentialID-based guard at the top stays as defense-in-depth in case // a future code path manipulates the record outside this flow. func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountID, projectID string) (*VerifyAttestationResult, error) { @@ -151,14 +156,10 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI if err != nil { return nil, err } - // Idempotency guard. The transaction wrap below already makes partial - // state unreachable through this flow — if any of the three writes - // fails, the tx rolls back and IsVerified / CredentialID stay zero. - // This guard is therefore defense-in-depth: it catches direct DB - // manipulation, a future code path that bypasses VerifyAttestation but - // mutates the record, or the rare commit-then-error-return failure - // mode where the tx committed but RunInTx still surfaces an error. - // Cheap belt next to the suspenders. + // Pre-tx fast-fail. Cheap rejection before we open a connection, + // fetch JWKS, run the verifier, etc. The authoritative check happens + // inside the tx with the row locked; this one just shaves work off + // the obvious-already-done case. if record.IsVerified || record.CredentialID != "" { return nil, fmt.Errorf("%w: record %s", ErrAttestationAlreadyVerified, record.ID) } @@ -205,19 +206,15 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI } } - // Load the identity without promoting yet — IssueCredential needs a - // valid, non-nil, usable identity and re-fetching guarantees we see - // the current state (another request might have deactivated it). - identity, err := s.identitySvc.GetIdentity(ctx, record.IdentityID, accountID, projectID) - if err != nil { - return nil, fmt.Errorf("failed to load identity for verified attestation: %w", err) - } - - // Run the three writes atomically. RunInTx commits on a nil return, - // rolls back on any non-nil error or panic. The repo methods called - // below pick the tx up from ctx via postgres.WithTx; their bun.IDB - // resolver uses the tx instead of the auto-commit DB handle, so every - // statement lands inside the same transaction. + // All side-effects below happen inside RunInTx. Local closure-scoped + // vars hold the result; the outer return values are assigned exactly + // once on commit, so a rollback can't leave a partially-mutated + // record visible to the caller. + // + // Isolation: nil TxOptions means Postgres default (READ COMMITTED). + // That's sufficient because we serialize the only contended row with + // SELECT ... FOR UPDATE; we don't need REPEATABLE READ semantics + // across the whole transaction. // // GrantType is fixed to client_credentials regardless of how the // identity will subsequently authenticate. Verified attestation is a @@ -227,14 +224,34 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI // session. Downstream flows can still token-exchange / jwt-bearer // against this credential; the bootstrap shape just doesn't change. var ( - accessToken *domain.AccessToken - cred *domain.IssuedCredential + accessToken *domain.AccessToken + cred *domain.IssuedCredential + updatedRecord *domain.AttestationRecord ) txErr := s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { ctx = postgres.WithTx(ctx, tx) - var err error - accessToken, cred, err = s.credentialSvc.IssueCredential(ctx, IssueRequest{ + // Re-fetch with row lock. Concurrent verifies on the same record + // queue here; the second one re-reads the row after the first + // commits, sees CredentialID set, and bails out below. + locked, err := s.repo.GetByIDForUpdate(ctx, id, accountID, projectID) + if err != nil { + return fmt.Errorf("failed to lock attestation record: %w", err) + } + if locked.IsVerified || locked.CredentialID != "" { + return fmt.Errorf("%w: record %s", ErrAttestationAlreadyVerified, locked.ID) + } + + // Re-load the identity inside the tx so we don't act on a stale + // snapshot from before someone deactivated it. With READ COMMITTED + // this read sees committed state as of the statement, so a + // concurrent UpdateIdentity that already finished will be visible. + identity, err := s.identitySvc.GetIdentity(ctx, locked.IdentityID, accountID, projectID) + if err != nil { + return fmt.Errorf("failed to load identity for verified attestation: %w", err) + } + + issued, issuedCred, err := s.credentialSvc.IssueCredential(ctx, IssueRequest{ Identity: identity, GrantType: domain.GrantTypeClientCredentials, }) @@ -242,23 +259,30 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI return fmt.Errorf("failed to issue post-attestation credential: %w", err) } - promotedTrust := trustLevelForAttestation(record.Level) - if _, err := s.identitySvc.UpdateIdentity(ctx, record.IdentityID, accountID, projectID, UpdateIdentityRequest{ + promotedTrust := trustLevelForAttestation(locked.Level) + if _, err := s.identitySvc.UpdateIdentity(ctx, locked.IdentityID, accountID, projectID, UpdateIdentityRequest{ TrustLevel: promotedTrust, }); err != nil { return fmt.Errorf("failed to promote identity trust level: %w", err) } now := time.Now() - record.IsVerified = true - record.VerifiedAt = &now + locked.IsVerified = true + locked.VerifiedAt = &now if result.ExpiresAt != nil { - record.ExpiresAt = result.ExpiresAt + locked.ExpiresAt = result.ExpiresAt } - record.CredentialID = cred.ID - if err := s.repo.Update(ctx, record); err != nil { + locked.CredentialID = issuedCred.ID + if err := s.repo.Update(ctx, locked); err != nil { return fmt.Errorf("failed to update attestation record: %w", err) } + + // Promote local-success values to the outer scope. Last step in + // the closure so a return-error path above never assigns a + // partial result. + accessToken = issued + cred = issuedCred + updatedRecord = locked return nil }) if txErr != nil { @@ -266,7 +290,7 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI } return &VerifyAttestationResult{ - Record: record, + Record: updatedRecord, AccessToken: accessToken, Credential: cred, }, nil diff --git a/internal/store/postgres/attestation.go b/internal/store/postgres/attestation.go index e386f6a..a02755d 100644 --- a/internal/store/postgres/attestation.go +++ b/internal/store/postgres/attestation.go @@ -44,6 +44,33 @@ func (r *AttestationRepository) GetByID(ctx context.Context, id, accountID, proj return record, nil } +// GetByIDForUpdate retrieves an attestation record by its UUID and acquires +// a row-level write lock (Postgres SELECT ... FOR UPDATE). The lock is held +// until the surrounding transaction commits or rolls back, so this method +// MUST be called inside a transaction (postgres.WithTx) — outside one, +// FOR UPDATE on Postgres is a no-op (no tx, no lock to release into) and +// the call still succeeds, which would mask the missing serialization. +// +// Use this in flows where the same attestation must serialize against +// concurrent writers — most notably AttestationService.VerifyAttestation, +// where two simultaneous /verify calls on the same record could otherwise +// each pass the IsVerified guard, each issue a credential, and leave the +// DB with two credentials from one proof. +func (r *AttestationRepository) GetByIDForUpdate(ctx context.Context, id, accountID, projectID string) (*domain.AttestationRecord, error) { + record := &domain.AttestationRecord{} + db := dbOrTx(ctx, r.db) + err := db.NewSelect().Model(record). + Where("id = ?", id). + Where("account_id = ?", accountID). + Where("project_id = ?", projectID). + For("UPDATE"). + Scan(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get attestation record for update: %w", err) + } + return record, nil +} + // GetHighestVerifiedLevel returns the highest verified attestation level for an identity. // Returns an empty string if no verified attestation exists. func (r *AttestationRepository) GetHighestVerifiedLevel(ctx context.Context, identityID string) (string, error) { diff --git a/internal/store/postgres/tx.go b/internal/store/postgres/tx.go index 5f96c80..416ccae 100644 --- a/internal/store/postgres/tx.go +++ b/internal/store/postgres/tx.go @@ -25,6 +25,13 @@ type txKey struct{} // // Repo callers that don't open a transaction get the repo's own *bun.DB // handle and behave as before (auto-commit per statement). +// +// Nesting: calling WithTx on a context that already carries a tx +// REPLACES the parent tx for any descendant ctx — there is no automatic +// savepoint or merge. Postgres doesn't support truly nested transactions +// anyway, and bun.RunInTx returns the existing tx when called recursively. +// For most flows the right pattern is "open one tx at the outermost +// service call, attach it once, do all the work inside" — don't nest. func WithTx(ctx context.Context, tx bun.Tx) context.Context { return context.WithValue(ctx, txKey{}, tx) } diff --git a/tests/integration/attestation_oidc_test.go b/tests/integration/attestation_oidc_test.go index 2e8bdb5..0508b83 100644 --- a/tests/integration/attestation_oidc_test.go +++ b/tests/integration/attestation_oidc_test.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "net/http/httptest" + "sync" "testing" "time" @@ -328,6 +329,71 @@ func TestAttestationDoubleVerifyIsRejected(t *testing.T) { assertErrorBodyContains(t, second, "already verified") } +// TestAttestationConcurrentVerifyMintsExactlyOneCredential pins the +// row-lock added in the #98 transaction wrap. Without SELECT ... FOR +// UPDATE on the attestation record, two simultaneous /verify calls +// could each pass the IsVerified guard, each enter the tx, each +// IssueCredential, and leave the DB with two credentials minted from +// one proof. The lock serializes the second verify behind the first; +// when it acquires the lock the record already has CredentialID set and +// it bails out via ErrAttestationAlreadyVerified. +func TestAttestationConcurrentVerifyMintsExactlyOneCredential(t *testing.T) { + iss := newOIDCIssuer(t) + defer iss.close() + + reg := registerAgent(t, uid("attest-race")) + upsertOIDCPolicy(t, map[string]any{ + "issuers": []map[string]any{{"url": iss.URL}}, + }) + + token := iss.sign(map[string]any{"sub": "ci-job-race"}) + id := submitAttestation(t, reg.AgentID, "oidc_token", token) + + const N = 8 + var ( + wg sync.WaitGroup + mu sync.Mutex + statusCodes []int + ) + wg.Add(N) + for range N { + go func() { + defer wg.Done() + resp := verifyAttestation(t, id) + defer func() { _ = resp.Body.Close() }() + mu.Lock() + statusCodes = append(statusCodes, resp.StatusCode) + mu.Unlock() + }() + } + wg.Wait() + + // Exactly one verify wins, the rest see ErrAttestationAlreadyVerified + // once they acquire the row lock and re-check the guard. + var ok, conflict int + for _, c := range statusCodes { + switch c { + case http.StatusOK: + ok++ + case http.StatusConflict: + conflict++ + default: + t.Errorf("unexpected status %d (want 200 or 409)", c) + } + } + assert.Equal(t, 1, ok, "exactly one concurrent verify must succeed") + assert.Equal(t, N-1, conflict, "all other concurrent verifies must be rejected as already-verified") + + // Hard guarantee: the credential count for the identity is exactly 1. + count, err := testDB.NewSelect(). + Table("issued_credentials"). + Where("identity_id = ?", reg.AgentID). + Count(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, count, + "exactly one credential must be persisted for the attested identity — the row lock must serialize concurrent verifies") +} + // TestAttestationVerifyGuardCatchesPartialFailureRetry pins the // defense-in-depth guard added alongside the transaction wrap in #98: // the three writes in VerifyAttestation now run in a single bun.RunInTx, From cc67d526d8996860a8dc1db515d6ff9c62c85896 Mon Sep 17 00:00:00 2001 From: Sharath Rajasekar Date: Tue, 5 May 2026 14:45:31 -0700 Subject: [PATCH 5/8] fix: address review-round-3 findings on the attestation tx wrap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit R1. Fix misleading "no-op" claim on GetByIDForUpdate. The doc previously said "outside one [tx], FOR UPDATE on Postgres is a no-op." Technically wrong — Postgres wraps every statement in an implicit transaction, so the SELECT does acquire the lock; it just releases it on the implicit commit when the statement returns. The practical effect (no useful serialization) is the same, but the wording would mislead a reader trying to reason about lock lifetime. Rewrote to describe the real behavior. R2. Enforce that GetByIDForUpdate is called inside a tx. The doc said "MUST be called inside a transaction" but the code didn't check. A future caller that forgets postgres.WithTx would silently downgrade to a per-statement lock with no useful serialization — exactly the failure mode the lock is meant to prevent, made invisible. Added a `hasTx(ctx)` helper alongside dbOrTx and a one-line guard: "GetByIDForUpdate must be called inside a postgres.WithTx context." Misuse now fails fast instead of producing a race that only shows up under concurrent load. R3. Stop calling require.* from goroutines in the race test. The Go testing contract says t.FailNow / require.* MUST be called from the goroutine running the test, not from workers. The previous race test fired N goroutines that each called the verifyAttestation helper, which routes through doRequest → require.NoError. In practice this often works, but it's officially undefined behaviour: a worker that fails won't actually halt the test, and on some Go versions the failure is silently lost. Refactored the goroutines to do raw http.NewRequest + http.DefaultClient.Do, write (status, err) to a per-slot result struct, and have the main test goroutine assert post-Wait. Same coverage, no testing-API misuse. `go test -race` confirms no data races; conflict goroutines record durations of ~23-27ms (vs ~13ms for the lock holder), independent evidence the row lock is actually serializing the requests. ## Test plan - [x] go vet ./... clean - [x] go test -race targeted suite — clean, no race warnings - [x] Full integration suite green ~10s - [x] Race test still observes 1 OK / 7 conflict / 1 credential --- internal/store/postgres/attestation.go | 14 ++++-- internal/store/postgres/tx.go | 9 ++++ tests/integration/attestation_oidc_test.go | 57 ++++++++++++++++------ 3 files changed, 61 insertions(+), 19 deletions(-) diff --git a/internal/store/postgres/attestation.go b/internal/store/postgres/attestation.go index a02755d..30b7d10 100644 --- a/internal/store/postgres/attestation.go +++ b/internal/store/postgres/attestation.go @@ -47,9 +47,14 @@ func (r *AttestationRepository) GetByID(ctx context.Context, id, accountID, proj // GetByIDForUpdate retrieves an attestation record by its UUID and acquires // a row-level write lock (Postgres SELECT ... FOR UPDATE). The lock is held // until the surrounding transaction commits or rolls back, so this method -// MUST be called inside a transaction (postgres.WithTx) — outside one, -// FOR UPDATE on Postgres is a no-op (no tx, no lock to release into) and -// the call still succeeds, which would mask the missing serialization. +// MUST be called inside a transaction (postgres.WithTx). +// +// Outside an explicit transaction Postgres still executes the SELECT +// successfully, but the implicit per-statement transaction commits as +// soon as the statement returns and the lock is released — concurrent +// callers see no useful serialization. We fail fast here rather than +// downgrade silently: misuse should surface as a loud error, not a +// race that only manifests under load. // // Use this in flows where the same attestation must serialize against // concurrent writers — most notably AttestationService.VerifyAttestation, @@ -57,6 +62,9 @@ func (r *AttestationRepository) GetByID(ctx context.Context, id, accountID, proj // each pass the IsVerified guard, each issue a credential, and leave the // DB with two credentials from one proof. func (r *AttestationRepository) GetByIDForUpdate(ctx context.Context, id, accountID, projectID string) (*domain.AttestationRecord, error) { + if !hasTx(ctx) { + return nil, fmt.Errorf("GetByIDForUpdate must be called inside a postgres.WithTx context — the row lock is meaningless without one") + } record := &domain.AttestationRecord{} db := dbOrTx(ctx, r.db) err := db.NewSelect().Model(record). diff --git a/internal/store/postgres/tx.go b/internal/store/postgres/tx.go index 416ccae..152800c 100644 --- a/internal/store/postgres/tx.go +++ b/internal/store/postgres/tx.go @@ -46,3 +46,12 @@ func dbOrTx(ctx context.Context, fallback bun.IDB) bun.IDB { } return fallback } + +// hasTx reports whether ctx carries an in-flight transaction attached +// with WithTx. Repo methods that ONLY make sense inside a transaction +// (e.g., SELECT ... FOR UPDATE callers) use this to fail fast on misuse +// instead of silently downgrading to a per-statement implicit tx. +func hasTx(ctx context.Context) bool { + _, ok := ctx.Value(txKey{}).(bun.Tx) + return ok +} diff --git a/tests/integration/attestation_oidc_test.go b/tests/integration/attestation_oidc_test.go index 0508b83..52b5e41 100644 --- a/tests/integration/attestation_oidc_test.go +++ b/tests/integration/attestation_oidc_test.go @@ -337,6 +337,13 @@ func TestAttestationDoubleVerifyIsRejected(t *testing.T) { // one proof. The lock serializes the second verify behind the first; // when it acquires the lock the record already has CredentialID set and // it bails out via ErrAttestationAlreadyVerified. +// +// The goroutines below intentionally use raw http.NewRequest / +// http.DefaultClient.Do rather than the verifyAttestation helper. That +// helper's call chain ends in require.NoError, which calls t.FailNow — +// per the testing package contract FailNow MUST be called from the +// goroutine running the test, not workers. Doing the HTTP call inline +// keeps every assertion on the main goroutine after wg.Wait(). func TestAttestationConcurrentVerifyMintsExactlyOneCredential(t *testing.T) { iss := newOIDCIssuer(t) defer iss.close() @@ -349,36 +356,54 @@ func TestAttestationConcurrentVerifyMintsExactlyOneCredential(t *testing.T) { token := iss.sign(map[string]any{"sub": "ci-job-race"}) id := submitAttestation(t, reg.AgentID, "oidc_token", token) + body, err := json.Marshal(map[string]any{"attestation_id": id}) + require.NoError(t, err) + url := testServer.URL + adminPath("/attestation/verify") + headers := adminHeaders() + const N = 8 - var ( - wg sync.WaitGroup - mu sync.Mutex - statusCodes []int - ) + results := make([]struct { + status int + err error + }, N) + + var wg sync.WaitGroup wg.Add(N) - for range N { - go func() { + for i := range N { + go func(slot int) { defer wg.Done() - resp := verifyAttestation(t, id) - defer func() { _ = resp.Body.Close() }() - mu.Lock() - statusCodes = append(statusCodes, resp.StatusCode) - mu.Unlock() - }() + req, reqErr := http.NewRequest(http.MethodPost, url, bytes.NewReader(body)) + if reqErr != nil { + results[slot].err = reqErr + return + } + req.Header.Set("Content-Type", "application/json") + for k, v := range headers { + req.Header.Set(k, v) + } + resp, doErr := http.DefaultClient.Do(req) + if doErr != nil { + results[slot].err = doErr + return + } + results[slot].status = resp.StatusCode + _ = resp.Body.Close() + }(i) } wg.Wait() // Exactly one verify wins, the rest see ErrAttestationAlreadyVerified // once they acquire the row lock and re-check the guard. var ok, conflict int - for _, c := range statusCodes { - switch c { + for slot, r := range results { + require.NoErrorf(t, r.err, "goroutine %d failed transport-level: %v", slot, r.err) + switch r.status { case http.StatusOK: ok++ case http.StatusConflict: conflict++ default: - t.Errorf("unexpected status %d (want 200 or 409)", c) + t.Errorf("goroutine %d: unexpected status %d (want 200 or 409)", slot, r.status) } } assert.Equal(t, 1, ok, "exactly one concurrent verify must succeed") From b1935d4bd64589b2c1183cf2c4b7738c7f9af986 Mon Sep 17 00:00:00 2001 From: Sharath Rajasekar Date: Tue, 5 May 2026 14:48:49 -0700 Subject: [PATCH 6/8] fix: address review-round-3 W/N findings on the attestation tx wrap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three small follow-ups from the latest self-review pass: W1. Document lock ordering. The tx now takes the attestation_records row lock (FOR UPDATE) first, then implicitly takes the identities row lock when UpdateIdentity runs. A future code path that does the inverse — identity lock first, then attestation lock — could deadlock with this one. Postgres detects deadlocks (40P01) and aborts one tx, so it's not a correctness bug, just a transient retry concern. Added a comment on the RunInTx site documenting the ordering so a future contributor doesn't introduce the inverse path silently. N1. Rename `updatedRecord` to `verifiedRecord`. The variable holds the post-success attestation record. "Updated" is generic; "verified" is the specific thing that makes this record worth holding onto. One-name swap, no behavior change. N2. Tighten the pre-tx fast-fail comment. The previous wording said the pre-tx guard "shaves work off the obvious-already-done case" — accurate but understated. The real saving on retry storms is avoiding the verifier run, the second DB connection for the tx, and the BEGIN/SELECT FOR UPDATE/ROLLBACK round-trip that the in-tx guard would otherwise require. Reworded. Skipped: W2 (signing inside the lock) — a real refactor of CredentialService into "build+sign" + "persist" phases for ~10ms of lock contention savings per attestation verify; not worth the surface change. W3 (sanity-check that conflict goroutines actually waited) — timing-based assertion would be flaky on heterogeneous CI hardware. ## Test plan - [x] go vet ./... clean - [x] Full integration suite green ~10s, no regressions --- internal/service/attestation.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/internal/service/attestation.go b/internal/service/attestation.go index c7c1d2b..cfbe8c4 100644 --- a/internal/service/attestation.go +++ b/internal/service/attestation.go @@ -156,10 +156,12 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI if err != nil { return nil, err } - // Pre-tx fast-fail. Cheap rejection before we open a connection, - // fetch JWKS, run the verifier, etc. The authoritative check happens - // inside the tx with the row locked; this one just shaves work off - // the obvious-already-done case. + // Pre-tx fast-fail. The authoritative check happens inside the tx + // with the row locked; this one rejects the obvious already-done + // case before we run the verifier, open another DB connection for + // the tx, etc. Real saving on retry storms — failing here is one + // SELECT, failing inside the tx is BEGIN + SELECT FOR UPDATE + + // ROLLBACK plus everything between this point and the lock. if record.IsVerified || record.CredentialID != "" { return nil, fmt.Errorf("%w: record %s", ErrAttestationAlreadyVerified, record.ID) } @@ -224,10 +226,17 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI // session. Downstream flows can still token-exchange / jwt-bearer // against this credential; the bootstrap shape just doesn't change. var ( - accessToken *domain.AccessToken - cred *domain.IssuedCredential - updatedRecord *domain.AttestationRecord + accessToken *domain.AccessToken + cred *domain.IssuedCredential + verifiedRecord *domain.AttestationRecord ) + // Lock-order note: this tx takes the attestation_records row lock + // (SELECT FOR UPDATE) FIRST, then implicitly the identities row lock + // (via UpdateIdentity's UPDATE). Any future code path that needs to + // hold both locks should acquire them in the same order to avoid + // deadlocks. Postgres detects deadlocks (40P01) and aborts one tx, + // so the worst case is a transient retry, not silent corruption — + // but cleaner is to keep the order consistent. txErr := s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { ctx = postgres.WithTx(ctx, tx) @@ -282,7 +291,7 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI // partial result. accessToken = issued cred = issuedCred - updatedRecord = locked + verifiedRecord = locked return nil }) if txErr != nil { @@ -290,7 +299,7 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI } return &VerifyAttestationResult{ - Record: updatedRecord, + Record: verifiedRecord, AccessToken: accessToken, Credential: cred, }, nil From 02ccc2da6bec81d59d02d90673102c31dc6c1f8d Mon Sep 17 00:00:00 2001 From: Sharath Rajasekar Date: Tue, 5 May 2026 14:55:44 -0700 Subject: [PATCH 7/8] fix: address review-round-4 R/W/N findings on the attestation tx wrap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit R1. Add regression test for the WithTx contract on GetByIDForUpdate. The hasTx guard added in cc67d52 protects against a future caller forgetting postgres.WithTx around GetByIDForUpdate — silent downgrade to a per-statement implicit tx would acquire the row lock and release it immediately, providing no serialization. Without a test, the guard itself could be removed by a future contributor thinking it's redundant, and the regression would only surface under concurrent load. New TestGetByIDForUpdateRequiresTx in postgres_tx_test.go calls GetByIDForUpdate with context.Background() and asserts the contract error fires. W1. Drop the misleading "failed to lock attestation record" wrap. When GetByIDForUpdate's hasTx guard fires, the previous wrap chain read "failed to lock attestation record: GetByIDForUpdate must be called inside a postgres.WithTx context — ..." Misleading: an operator chasing the message would assume a runtime DB problem when the cause is actually a programming mistake. Removed the outer wrap; the repo method's error already names what failed (whether runtime or contract violation), and the inline closure comment explains why. W2. Trim the docstring redundancy. The function-level docstring previously described the atomicity + serialization invariant in detail, then the inline comments inside the closure repeated the same content at finer grain. Trimmed the docstring to a one-paragraph summary that points at the inline commentary; the inline comments remain authoritative. N1. Share the txKey type assertion via txFromContext helper. dbOrTx and hasTx both did the same `ctx.Value(txKey{}).(bun.Tx)` assertion. Extracted into a single txFromContext returning (tx, ok) so the key + type live in exactly one place. N2. Move the lock-order comment inside the closure. The "Lock-order note" comment previously sat above RunInTx but described what happens INSIDE the closure. Moved next to the GetByIDForUpdate call where the read-the-comment-then-look-at-the- code distance is zero. ## Test plan - [x] go vet ./... clean - [x] go test ./... full suite green ~10s - [x] TestGetByIDForUpdateRequiresTx pins the contract --- internal/service/attestation.go | 41 +++++++++++++-------------- internal/store/postgres/tx.go | 13 +++++++-- tests/integration/postgres_tx_test.go | 16 +++++++++++ 3 files changed, 47 insertions(+), 23 deletions(-) diff --git a/internal/service/attestation.go b/internal/service/attestation.go index cfbe8c4..fee7730 100644 --- a/internal/service/attestation.go +++ b/internal/service/attestation.go @@ -138,19 +138,12 @@ var ErrAttestationAlreadyVerified = errors.New("attestation already verified") // - Verifier.Verify returns an error → ErrAttestationRejected. // - Record already verified → ErrAttestationAlreadyVerified (rejects retries). // -// Atomicity + serialization: the three writes (issue credential → promote -// trust → mark record verified) run inside a single bun.RunInTx, and the -// transaction starts by re-fetching the attestation record with -// SELECT ... FOR UPDATE so two concurrent /verify calls on the same record -// serialize. Without the lock both callers would otherwise pass the -// pre-tx guard, both enter the closure, and both INSERT a credential — -// leaving two credentials minted from one proof. Repos that touch the DB -// (CredentialRepository.Create, IdentityRepository.Update, -// AttestationRepository.Update / .GetByIDForUpdate) read the in-flight tx -// from ctx via postgres.WithTx so every statement participates. Any -// failure rolls the lot back, so retries always see a clean slate. The -// CredentialID-based guard at the top stays as defense-in-depth in case -// a future code path manipulates the record outside this flow. +// Atomicity + serialization: the three side-effecting writes run inside a +// single bun.RunInTx whose first statement is SELECT ... FOR UPDATE on the +// attestation row, so concurrent /verify calls on the same record +// serialize. See the inline comments at the RunInTx site for the full +// commentary on lock order, the in-tx guard, and why the closure-scoped +// locals are assigned to the outer return values exactly once on success. func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountID, projectID string) (*VerifyAttestationResult, error) { record, err := s.repo.GetByID(ctx, id, accountID, projectID) if err != nil { @@ -230,22 +223,28 @@ func (s *AttestationService) VerifyAttestation(ctx context.Context, id, accountI cred *domain.IssuedCredential verifiedRecord *domain.AttestationRecord ) - // Lock-order note: this tx takes the attestation_records row lock - // (SELECT FOR UPDATE) FIRST, then implicitly the identities row lock - // (via UpdateIdentity's UPDATE). Any future code path that needs to - // hold both locks should acquire them in the same order to avoid - // deadlocks. Postgres detects deadlocks (40P01) and aborts one tx, - // so the worst case is a transient retry, not silent corruption — - // but cleaner is to keep the order consistent. txErr := s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { ctx = postgres.WithTx(ctx, tx) // Re-fetch with row lock. Concurrent verifies on the same record // queue here; the second one re-reads the row after the first // commits, sees CredentialID set, and bails out below. + // + // Lock-order note: this acquires the attestation_records row + // lock FIRST, then implicitly the identities row lock (via + // UpdateIdentity's UPDATE below). Any future code path that + // needs to hold both locks should acquire them in the same order + // to avoid deadlocks. Postgres detects deadlocks (40P01) and + // aborts one tx, so the worst case is a transient retry, not + // silent corruption — but cleaner to keep the order consistent. + // + // The repo method's error already names what failed; no outer + // wrap so the operator-visible message stays accurate even when + // the inner error is the WithTx contract violation rather than + // a runtime DB problem. locked, err := s.repo.GetByIDForUpdate(ctx, id, accountID, projectID) if err != nil { - return fmt.Errorf("failed to lock attestation record: %w", err) + return err } if locked.IsVerified || locked.CredentialID != "" { return fmt.Errorf("%w: record %s", ErrAttestationAlreadyVerified, locked.ID) diff --git a/internal/store/postgres/tx.go b/internal/store/postgres/tx.go index 152800c..644dfba 100644 --- a/internal/store/postgres/tx.go +++ b/internal/store/postgres/tx.go @@ -36,12 +36,21 @@ func WithTx(ctx context.Context, tx bun.Tx) context.Context { return context.WithValue(ctx, txKey{}, tx) } +// txFromContext extracts the in-flight transaction attached by WithTx. +// Returns the tx and true when one is set; the zero bun.Tx and false +// otherwise. Used as the shared core of dbOrTx and hasTx so the +// type-assertion key and shape live in exactly one place. +func txFromContext(ctx context.Context) (bun.Tx, bool) { + tx, ok := ctx.Value(txKey{}).(bun.Tx) + return tx, ok +} + // dbOrTx returns the in-flight transaction from ctx if one is set, falling // back to the repo's default DB handle. Repo methods that participate in // transactions call this once at the top and use the returned bun.IDB for // every statement in the method. func dbOrTx(ctx context.Context, fallback bun.IDB) bun.IDB { - if tx, ok := ctx.Value(txKey{}).(bun.Tx); ok { + if tx, ok := txFromContext(ctx); ok { return tx } return fallback @@ -52,6 +61,6 @@ func dbOrTx(ctx context.Context, fallback bun.IDB) bun.IDB { // (e.g., SELECT ... FOR UPDATE callers) use this to fail fast on misuse // instead of silently downgrading to a per-statement implicit tx. func hasTx(ctx context.Context) bool { - _, ok := ctx.Value(txKey{}).(bun.Tx) + _, ok := txFromContext(ctx) return ok } diff --git a/tests/integration/postgres_tx_test.go b/tests/integration/postgres_tx_test.go index d9c7977..15a262e 100644 --- a/tests/integration/postgres_tx_test.go +++ b/tests/integration/postgres_tx_test.go @@ -102,6 +102,22 @@ func TestPostgresWithTxRollbackPersistsNothing(t *testing.T) { assert.Zero(t, credCount, "credential row must not exist after rollback") } +// TestGetByIDForUpdateRequiresTx pins the contract that GetByIDForUpdate +// fails fast when called without a postgres.WithTx context. Without this +// guard, a future caller that forgets to open a transaction would +// silently downgrade to a per-statement implicit tx — the SELECT FOR +// UPDATE acquires the lock and immediately releases it on the implicit +// commit, providing no useful serialization. The bug only manifests +// under concurrent load. Loud failure here = caught at code-review time +// instead of in production. +func TestGetByIDForUpdateRequiresTx(t *testing.T) { + repo := postgres.NewAttestationRepository(testDB) + _, err := repo.GetByIDForUpdate(context.Background(), uuid.NewString(), testAccountID, testProjectID) + require.Error(t, err) + assert.Contains(t, err.Error(), "must be called inside", + "the contract violation message must name the missing WithTx so a future debugger sees the cause") +} + // TestPostgresWithoutTxFallsBackToAutoCommit pins the other half of the // dbOrTx contract: when no tx is attached to ctx, repo writes use the // repo's default *bun.DB handle and auto-commit per statement, exactly From 733b575614bd6425b1b1766419d2a7d93123e474 Mon Sep 17 00:00:00 2001 From: Sharath Rajasekar Date: Tue, 5 May 2026 15:25:36 -0700 Subject: [PATCH 8/8] fix: propagate pre-delete UPDATE error in IdentityRepository.Delete MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pre-DELETE UPDATE that stamps modified_by previously discarded its error via `_, _ = db.NewUpdate()...Exec(ctx)`. Outside a transaction this was harmless: the subsequent DELETE either succeeded (best case) or surfaced its own error (e.g., row not found). Inside a transaction this was a real bug. Postgres aborts a transaction the moment any statement fails, so a failed pre-stamp would mean the DELETE that follows fails with the generic "current transaction is aborted, commands ignored until end of transaction block" — losing the original cause and making the failure mode hard to diagnose. The Delete repo method was changed to participate in WithTx alongside Create / Update in commit aa07997. Now that callers can route Delete through a tx via dbOrTx, the swallow pattern is no longer safe. Fix is the obvious one: propagate the UPDATE error. Behavior change outside a tx is "loud failure on a benign UPDATE problem" instead of "silent audit gap" — strictly safer, and aligns the audit trail with the same atomicity guarantees the rest of the repo provides under tx. Flagged by Gemini on PR #119. --- internal/store/postgres/identity.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/store/postgres/identity.go b/internal/store/postgres/identity.go index 7218039..0ef8509 100644 --- a/internal/store/postgres/identity.go +++ b/internal/store/postgres/identity.go @@ -153,15 +153,24 @@ func (r *IdentityRepository) Update(ctx context.Context, identity *domain.Identi } // Delete removes an identity. +// +// The pre-DELETE UPDATE stamps modified_by so the AFTER DELETE trigger can +// read the actor from OLD.modified_by. Its error is propagated rather than +// swallowed: in Postgres, a failed statement inside a transaction aborts +// the whole tx, so the subsequent DELETE would fail with a generic +// "current transaction is aborted" message that loses the original cause. +// Outside a tx the same propagation just makes a benign-looking failure +// loud — that's still preferable to silently triggering audit gaps. func (r *IdentityRepository) Delete(ctx context.Context, id, accountID, projectID string) error { db := dbOrTx(ctx, r.db) - // Pre-stamp modified_by so the AFTER DELETE trigger can read the actor from OLD.modified_by. if callerID := middleware.GetCallerName(ctx); callerID != "" { - _, _ = db.NewUpdate(). + if _, err := db.NewUpdate(). TableExpr("identities"). Set("modified_by = ?", callerID). Where("id = ? AND account_id = ? AND project_id = ?", id, accountID, projectID). - Exec(ctx) + Exec(ctx); err != nil { + return fmt.Errorf("failed to stamp modified_by before delete: %w", err) + } } _, err := db.NewDelete(). TableExpr("identities").