From 397a4e4b8f28d1bfe6c2b0a0be8d29c455bc0742 Mon Sep 17 00:00:00 2001 From: naman <35531672+i-naman@users.noreply.github.com> Date: Thu, 28 May 2026 14:37:04 +0530 Subject: [PATCH 1/8] feat(migration): separate nonce patching scripts and use O(1) nonce fetch - Created standalone Scripts/replay_blocks_and_export.go for computing TxCount - Created standalone Scripts/patch_accounts.go for updating Nonces and standardizing timestamps - Replaced CheckNonceAndGetLatest in Security.go with O(1) GetAccount verification - Fixed new account generation in DB_OPs to initialize Nonce to 0 natively - Hardened DID registration to enforce UnixNano() timestamps --- DB_OPs/account_immuclient.go | 7 +- DID/DID.go | 4 +- Scripts/patch_accounts.go | 198 ++++++++++++++++++++++++++++ Scripts/replay_blocks_and_export.go | 138 +++++++++++++++++++ Security/Security.go | 37 ++---- 5 files changed, 352 insertions(+), 32 deletions(-) create mode 100644 Scripts/patch_accounts.go create mode 100644 Scripts/replay_blocks_and_export.go diff --git a/DB_OPs/account_immuclient.go b/DB_OPs/account_immuclient.go index 0072624a..f995148e 100644 --- a/DB_OPs/account_immuclient.go +++ b/DB_OPs/account_immuclient.go @@ -135,11 +135,8 @@ func CreateAccount(PooledConnection *config.PooledConnection, DIDAddress string, }() } - // Create a Nonce First - Nonce, err := PutNonceofAccount() - if err != nil { - return err - } + // Initialize Nonce to 0 + var Nonce uint64 = 0 // Create A CreatedAt and UpdatedAt CreatedAt := time.Now().UTC().UnixNano() diff --git a/DID/DID.go b/DID/DID.go index 3f32102e..0198d5d3 100644 --- a/DID/DID.go +++ b/DID/DID.go @@ -345,8 +345,8 @@ func (s *AccountServer) RegisterDID(ctx context.Context, req *pb.RegisterDIDRequ Did: req.Did, PublicKey: req.PublicKey, Balance: "0", - CreatedAt: time.Now().UTC().Unix(), - UpdatedAt: time.Now().UTC().Unix(), + CreatedAt: time.Now().UTC().UnixNano(), + UpdatedAt: time.Now().UTC().UnixNano(), }, }, nil } diff --git a/Scripts/patch_accounts.go b/Scripts/patch_accounts.go new file mode 100644 index 00000000..727a69b2 --- /dev/null +++ b/Scripts/patch_accounts.go @@ -0,0 +1,198 @@ +// Phase 2: Read replayed_accounts.json (produced by replay_blocks_and_export.go) and patch +// every account in accountsdb with the ground-truth Nonce and standardised timestamps. +// +// What gets patched per account: +// - Nonce : set to TxCount from replay (= "next expected nonce", i.e. max_nonce + 1 for +// well-behaved chains, or exactly TxCount when nonces are 0-indexed and gapless) +// - CreatedAt : if stored in seconds (< 1e15), multiply by 1e9 → nanoseconds +// - UpdatedAt : same normalisation as CreatedAt +// +// IMPORTANT: run replay_blocks_and_export.go first and review its output before running this. +// +// Usage: +// go run scripts/patch_accounts.go -host 127.0.0.1 -port 3322 -user immudb -pass immudb + +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + + "github.com/codenotary/immudb/pkg/api/schema" + "github.com/codenotary/immudb/pkg/client" + + "gossipnode/DB_OPs" +) + +// AccountStats mirrors the struct from replay_blocks_and_export.go. +type AccountStats struct { + TxCount uint64 `json:"tx_count"` + MaxNonce uint64 `json:"max_nonce"` + MaxNonceSet bool `json:"max_nonce_set"` +} + +const scanBatchSize = 500 // keep batches small for accountsdb health + +func main() { + immudbHost := flag.String("host", "127.0.0.1", "ImmuDB Host") + immudbPort := flag.Int("port", 3322, "ImmuDB Port") + immudbUser := flag.String("user", "immudb", "ImmuDB Username") + immudbPass := flag.String("pass", "immudb", "ImmuDB Password") + inputFile := flag.String("in", "replayed_accounts.json", "Input JSON file produced by Phase 1") + flag.Parse() + + // 1. Load the ground-truth snapshot produced by Phase 1 + fileBytes, err := os.ReadFile(*inputFile) + if err != nil { + log.Fatalf("FATAL: cannot read %s: %v\n → Did you run replay_blocks_and_export.go first?", *inputFile, err) + } + + stats := make(map[string]*AccountStats) + if err := json.Unmarshal(fileBytes, &stats); err != nil { + log.Fatalf("FATAL: failed to parse %s: %v", *inputFile, err) + } + fmt.Printf("[Phase 2] Loaded %d active addresses from %s\n", len(stats), *inputFile) + + ctx := context.Background() + + // 2. Connect to accountsdb + opts := client.DefaultOptions().WithAddress(*immudbHost).WithPort(*immudbPort) + c := client.NewClient().WithOptions(opts) + if err := c.OpenSession(ctx, []byte(*immudbUser), []byte(*immudbPass), "accountsdb"); err != nil { + log.Fatalf("FATAL: failed to open session on accountsdb: %v", err) + } + defer c.CloseSession(ctx) + + fmt.Println("[Phase 2] Connected to accountsdb. Starting patch process...") + + // Counters + processedAccounts := 0 + patchedAccounts := 0 + fixedNonces := 0 + fixedTimestamps := 0 + parseErrors := 0 + + prefix := []byte(DB_OPs.Prefix) // "address:" + var seekKey []byte // nil = start from first key + + for { + req := &schema.ScanRequest{ + Prefix: prefix, + SeekKey: seekKey, + Limit: scanBatchSize, + Desc: false, + } + + resp, err := c.Scan(ctx, req) + if err != nil { + log.Fatalf("FATAL: scan failed (seekKey=%q): %v", string(seekKey), err) + } + if len(resp.Entries) == 0 { + break + } + + // ImmuDB Scan with SeekKey is INCLUSIVE — skip the first entry if it + // matches our cursor to avoid re-processing and infinite loops. + startIndex := 0 + if seekKey != nil && len(resp.Entries) > 0 && bytes.Equal(resp.Entries[0].Key, seekKey) { + startIndex = 1 + } + + // Build a batch update for all accounts that need patching in this page. + ops := make([]*schema.Op, 0, scanBatchSize) + + for i := startIndex; i < len(resp.Entries); i++ { + e := resp.Entries[i] + + var acc DB_OPs.Account + if err := json.Unmarshal(e.Value, &acc); err != nil { + log.Printf("WARN: skipping key %q — unmarshal error: %v", string(e.Key), err) + parseErrors++ + continue + } + processedAccounts++ + + needsPatch := false + + // ── Timestamp normalisation ────────────────────────────────────────────── + // Timestamps stored as Unix seconds are < 1_000_000_000_000_000 (1e15). + // Multiply by 1e9 to convert to nanoseconds. + const nsThreshold = int64(1_000_000_000_000_000) // 1e15 + + if acc.CreatedAt > 0 && acc.CreatedAt < nsThreshold { + acc.CreatedAt *= 1_000_000_000 + fixedTimestamps++ + needsPatch = true + } + if acc.UpdatedAt > 0 && acc.UpdatedAt < nsThreshold { + acc.UpdatedAt *= 1_000_000_000 + fixedTimestamps++ + needsPatch = true + } + + // ── Nonce correction ───────────────────────────────────────────────────── + // For addresses that appear in the replay, the true nonce = TxCount. + // Accounts that have never sent a transaction get nonce = 0. + var trueNonce uint64 = 0 + if s, exists := stats[acc.Address.Hex()]; exists { + trueNonce = s.TxCount + } + + if acc.Nonce != trueNonce { + acc.Nonce = trueNonce + fixedNonces++ + needsPatch = true + } + + if needsPatch { + patchedAccounts++ + valBytes, err := json.Marshal(acc) + if err != nil { + log.Printf("WARN: skipping key %q — re-marshal error: %v", string(e.Key), err) + continue + } + ops = append(ops, &schema.Op{ + Operation: &schema.Op_Kv{ + Kv: &schema.KeyValue{ + Key: e.Key, + Value: valBytes, + }, + }, + }) + } + } + + // Flush the batch for this page + if len(ops) > 0 { + _, err := c.ExecAll(ctx, &schema.ExecAllRequest{Operations: ops}) + if err != nil { + log.Fatalf("FATAL: batch write failed: %v", err) + } + } + + // Advance the cursor past the last key in this batch + seekKey = resp.Entries[len(resp.Entries)-1].Key + + // Progress logging + if processedAccounts%100_000 == 0 && processedAccounts > 0 { + fmt.Printf(" ... processed %d accounts, patched %d so far\n", processedAccounts, patchedAccounts) + } + + // If this was a partial batch we've reached the end. + if len(resp.Entries) < scanBatchSize { + break + } + } + + fmt.Println("\n[Phase 2] Migration Complete!") + fmt.Printf(" Accounts scanned : %d\n", processedAccounts) + fmt.Printf(" Accounts patched : %d\n", patchedAccounts) + fmt.Printf(" Nonces fixed : %d\n", fixedNonces) + fmt.Printf(" Timestamps fixed: %d\n", fixedTimestamps) + fmt.Printf(" Parse errors : %d\n", parseErrors) +} diff --git a/Scripts/replay_blocks_and_export.go b/Scripts/replay_blocks_and_export.go new file mode 100644 index 00000000..d80bb6d7 --- /dev/null +++ b/Scripts/replay_blocks_and_export.go @@ -0,0 +1,138 @@ +// Phase 1: Replay all blocks from defaultdb and export per-address stats to JSON. +// +// For every confirmed transaction it records: +// - tx_count : true number of transactions sent by this address +// - max_nonce : highest nonce value recorded in any of those transactions +// +// Output: replayed_accounts.json (safe to review before running patch_accounts.go) +// +// Usage: +// go run scripts/replay_blocks_and_export.go -host 127.0.0.1 -port 3322 -user immudb -pass immudb + +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + + "github.com/codenotary/immudb/pkg/client" + + "gossipnode/DB_OPs" + "gossipnode/config" +) + +// AccountStats holds the ground-truth values computed from block replay. +type AccountStats struct { + TxCount uint64 `json:"tx_count"` + MaxNonce uint64 `json:"max_nonce"` + MaxNonceSet bool `json:"max_nonce_set"` // true once we have seen at least one tx +} + +func main() { + immudbHost := flag.String("host", "127.0.0.1", "ImmuDB Host") + immudbPort := flag.Int("port", 3322, "ImmuDB Port") + immudbUser := flag.String("user", "immudb", "ImmuDB Username") + immudbPass := flag.String("pass", "immudb", "ImmuDB Password") + outputFile := flag.String("out", "replayed_accounts.json", "Output JSON file path") + flag.Parse() + + ctx := context.Background() + + // 1. Connect directly to defaultdb (read-only intent — we only call Get) + opts := client.DefaultOptions().WithAddress(*immudbHost).WithPort(*immudbPort) + c := client.NewClient().WithOptions(opts) + if err := c.OpenSession(ctx, []byte(*immudbUser), []byte(*immudbPass), "defaultdb"); err != nil { + log.Fatalf("FATAL: failed to open session on defaultdb: %v", err) + } + defer c.CloseSession(ctx) + + fmt.Println("[Phase 1] Connected to defaultdb. Fetching chain tip...") + + // 2. Read latest_block — stored as JSON-encoded uint64 + entry, err := c.Get(ctx, []byte("latest_block")) + if err != nil { + log.Fatalf("FATAL: failed to get latest_block key: %v", err) + } + + var latestBlock uint64 + if err := json.Unmarshal(entry.Value, &latestBlock); err != nil { + log.Fatalf("FATAL: failed to parse latest_block as uint64: %v", err) + } + + fmt.Printf("[Phase 1] Chain tip = block %d. Replaying 0 → %d...\n", latestBlock, latestBlock) + + // address (checksummed hex) → stats + stats := make(map[string]*AccountStats, 512) + totalTxSeen := 0 + blocksWithErrors := 0 + + // 3. Sequential block replay — O(N blocks) + for i := uint64(0); i <= latestBlock; i++ { + blockKey := fmt.Sprintf("%s%d", DB_OPs.PREFIX_BLOCK, i) + bEntry, err := c.Get(ctx, []byte(blockKey)) + if err != nil { + // Don't fatal — log and continue so we get partial data on sparse chains. + log.Printf("WARN: block %d not found (key=%s): %v — skipping", i, blockKey, err) + blocksWithErrors++ + continue + } + + var block config.ZKBlock + if err := json.Unmarshal(bEntry.Value, &block); err != nil { + log.Printf("WARN: failed to unmarshal block %d: %v — skipping", i, err) + blocksWithErrors++ + continue + } + + for _, tx := range block.Transactions { + if tx.From == nil { + // Contract-creation submitted without a sender — skip. + continue + } + + // common.Address.Hex() → "0xAbCd..." (EIP-55 checksum). + // This is the same format used by storeAccount via fmt.Sprintf("%s%s", Prefix, addr). + sender := tx.From.Hex() + + s, exists := stats[sender] + if !exists { + s = &AccountStats{} + stats[sender] = s + } + + s.TxCount++ + totalTxSeen++ + + // Track the highest nonce seen across all transactions for this address. + // We initialise MaxNonce to 0 and MaxNonceSet to false so that nonce=0 + // is correctly handled on the very first tx. + if !s.MaxNonceSet || tx.Nonce > s.MaxNonce { + s.MaxNonce = tx.Nonce + s.MaxNonceSet = true + } + } + + if i > 0 && i%1000 == 0 { + fmt.Printf(" ... replayed through block %d / %d (%d txs so far)\n", i, latestBlock, totalTxSeen) + } + } + + fmt.Printf("[Phase 1] Replay complete. Blocks processed: %d, skipped: %d\n", latestBlock+1-uint64(blocksWithErrors), blocksWithErrors) + fmt.Printf("[Phase 1] Total transactions: %d | Active addresses: %d\n", totalTxSeen, len(stats)) + + // 4. Write JSON + fileBytes, err := json.MarshalIndent(stats, "", " ") + if err != nil { + log.Fatalf("FATAL: failed to marshal stats to JSON: %v", err) + } + if err := os.WriteFile(*outputFile, fileBytes, 0644); err != nil { + log.Fatalf("FATAL: failed to write %s: %v", *outputFile, err) + } + + fmt.Printf("[Phase 1] Exported → %s\n", *outputFile) + fmt.Println("[Phase 1] Please REVIEW the file before running patch_accounts.go (Phase 2).") +} diff --git a/Security/Security.go b/Security/Security.go index 174147b6..12522723 100644 --- a/Security/Security.go +++ b/Security/Security.go @@ -513,44 +513,31 @@ func allChecksWithConn(tx *config.Transaction, security_cache *SecurityCache, ma // ------------------------------------------------------------ // 6. Nonce validation (USING CACHE) _, nonceSpan := tracer.Start(spanCtx, "Security.allChecksWithCache.validateNonce") - hasDuplicate, latestNonce, hasAnyTransactions, err := DB_OPs.CheckNonceAndGetLatest(mainDBConn, tx.From, tx.Nonce) - if err != nil { + + account, err := DB_OPs.GetAccount(mainDBConn, *tx.From) + var expectedNonce uint64 = 0 + if err == nil && account != nil { + expectedNonce = account.Nonce + } else if err != nil && err.Error() != "account not found" { nonceSpan.RecordError(err) nonceSpan.End() span.RecordError(err) - logger().Error(spanCtx, "Failed to check nonce", err, + logger().Error(spanCtx, "Failed to get account for nonce check", err, ion.String("function", "Security.allChecksWithCache")) return false, fmt.Errorf("nonce check failed with error: %w", err) } nonceSpan.SetAttributes( - attribute.Bool("has_duplicate", hasDuplicate), - attribute.Int64("latest_nonce", int64(latestNonce)), + attribute.Int64("expected_nonce", int64(expectedNonce)), + attribute.Int64("submitted_nonce", int64(tx.Nonce)), ) - if hasDuplicate { - err := fmt.Errorf("transaction with same nonce already exists") - nonceSpan.RecordError(err) - nonceSpan.End() - span.RecordError(err) - logger().Error(spanCtx, "Duplicate nonce detected", err, - ion.String("function", "Security.allChecksWithCache")) - return false, err - } - - var minAllowedNonce uint64 - if !hasAnyTransactions { - minAllowedNonce = 0 - } else { - minAllowedNonce = latestNonce + 1 - } - - if tx.Nonce < minAllowedNonce { - err := fmt.Errorf("submitted nonce %d is too low, must be >= %d", tx.Nonce, minAllowedNonce) + if tx.Nonce < expectedNonce { + err := fmt.Errorf("submitted nonce %d is too low, expected >= %d", tx.Nonce, expectedNonce) nonceSpan.RecordError(err) nonceSpan.End() span.RecordError(err) - logger().Error(spanCtx, "Nonce is too low", err, + logger().Error(spanCtx, "Nonce is too low or duplicate", err, ion.String("function", "Security.allChecksWithCache")) return false, err } From 1e0469a3f125d8e97c67bd4cd5c948d9913e8391 Mon Sep 17 00:00:00 2001 From: naman <35531672+i-naman@users.noreply.github.com> Date: Thu, 28 May 2026 16:28:31 +0530 Subject: [PATCH 2/8] fix(security): resolve same-block nonce replay vulnerabilities and decouple TxCountSent - schema: added TxCountSent to Account schema to decouple analytical stats from cryptographic Nonce - db: added UpdateAccountSenderState to handle atomic balance and state updates - core: injected execution-time strict nonce validation (tx.Nonce < didDoc.Nonce) in deductFromSender to eliminate same-block double-execution loophole - core: fixed severe bug where sequencer failed to persist Nonce updates to DB after block execution - scripts: created patch_accounts_v2.go to safely migrate accountsdb by mapping TxCount -> TxCountSent and MaxNonce+1 -> Nonce --- DB_OPs/account_immuclient.go | 72 ++++++++- Scripts/patch_accounts_v2.go | 204 ++++++++++++++++++++++++ messaging/BlockProcessing/Processing.go | 19 ++- 3 files changed, 286 insertions(+), 9 deletions(-) create mode 100644 Scripts/patch_accounts_v2.go diff --git a/DB_OPs/account_immuclient.go b/DB_OPs/account_immuclient.go index f995148e..1dc21829 100644 --- a/DB_OPs/account_immuclient.go +++ b/DB_OPs/account_immuclient.go @@ -29,9 +29,10 @@ type Account struct { DIDAddress string `json:"did,omitempty"` // New PublicKey based fields - Address common.Address `json:"address"` // Derived from PublicKey - Balance string `json:"balance,omitempty"` - Nonce uint64 `json:"nonce"` + Address common.Address `json:"address"` // Derived from PublicKey + Balance string `json:"balance,omitempty"` + Nonce uint64 `json:"nonce"` + TxCountSent uint64 `json:"tx_count_sent"` // Tracks actual analytical transactions sent // Account metadata AccountType string `json:"account_type"` // "did" or "publickey" @@ -946,6 +947,71 @@ func UpdateAccountBalance(PooledConnection *config.PooledConnection, address com return nil } +// UpdateAccountSenderState updates the balance, increments the sent transaction count, and sets the new nonce. +func UpdateAccountSenderState(PooledConnection *config.PooledConnection, address common.Address, newBalance string, newNonce uint64) error { + fmt.Printf("=== DEBUG: UpdateAccountSenderState called for address %s with balance %s and nonce %d ===\n", address.Hex(), newBalance, newNonce) + + // Define Function wide context for timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + var shouldReturnConnection = false + if PooledConnection == nil || PooledConnection.Client == nil { + fmt.Println("DEBUG: PooledConnection is nil, getting new connection from pool") + PooledConnection, err = GetAccountConnectionandPutBack(ctx) + if err != nil { + fmt.Printf("DEBUG: Failed to get connection from pool: %v\n", err) + return fmt.Errorf("failed to get connection from pool: %w - UpdateAccountSenderState", err) + } + shouldReturnConnection = true + } + + if shouldReturnConnection { + defer func() { + fmt.Println("DEBUG: Returning connection to pool") + PutAccountsConnection(PooledConnection) + }() + } + + // Ensure we're using the accounts database + if PooledConnection != nil { + if err := ensureAccountsDBSelected(PooledConnection); err != nil { + return fmt.Errorf("failed to ensure accounts database is selected: %w", err) + } + } + + doc, err := GetAccount(PooledConnection, address) + if err != nil { + return err + } + + doc.Balance = newBalance + doc.TxCountSent = doc.TxCountSent + 1 + doc.Nonce = newNonce + doc.UpdatedAt = time.Now().UTC().UnixNano() + + // Safe Write to the DB with the same key + key := fmt.Sprintf("%s%s", Prefix, address) + err = SafeCreate(PooledConnection.Client, key, doc) + if err != nil { + loggerCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + PooledConnection.Client.Logger.Error(loggerCtx, "Failed to update account sender state", + err, + ion.String("account", address.String()), + ion.String("database", config.AccountsDBName), + ion.String("created_at", time.Now().UTC().Format(time.RFC3339)), + ion.String("log_file", LOG_FILE), + ion.String("topic", TOPIC), + ion.String("function", "DB_OPs.UpdateAccountSenderState")) + return err + } + + fmt.Printf("=== DEBUG: UpdateAccountSenderState completed successfully for address %s ===\n", address.Hex()) + return nil +} + // ListAllAccounts retrieves all Accounts with a limit func ListAllAccounts(PooledConnection *config.PooledConnection, limit int) ([]*Account, error) { var err error diff --git a/Scripts/patch_accounts_v2.go b/Scripts/patch_accounts_v2.go new file mode 100644 index 00000000..50c449ee --- /dev/null +++ b/Scripts/patch_accounts_v2.go @@ -0,0 +1,204 @@ +// Phase 2: Read replayed_accounts.json (produced by replay_blocks_and_export.go) and patch +// every account in accountsdb with the ground-truth Nonce, TxCountSent, and standardised timestamps. +// +// What gets patched per account: +// - Nonce : set to MaxNonce + 1 to prevent replay attacks (since gapless nonces weren't strictly enforced before) +// - TxCountSent : set to TxCount to decouple analytical transaction count from cryptographic Nonce +// - CreatedAt : if stored in seconds (< 1e15), multiply by 1e9 → nanoseconds +// - UpdatedAt : same normalisation as CreatedAt +// +// IMPORTANT: run replay_blocks_and_export.go first and review its output before running this. +// +// Usage: +// go run scripts/patch_accounts_v2.go -host 127.0.0.1 -port 3323 -user immudb -pass immudb + +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + + "github.com/codenotary/immudb/pkg/api/schema" + "github.com/codenotary/immudb/pkg/client" + + "gossipnode/DB_OPs" +) + +// AccountStats mirrors the struct from replay_blocks_and_export.go. +type AccountStats struct { + TxCount uint64 `json:"tx_count"` + MaxNonce uint64 `json:"max_nonce"` + MaxNonceSet bool `json:"max_nonce_set"` +} + +const scanBatchSize = 500 // keep batches small for accountsdb health + +func main() { + immudbHost := flag.String("host", "127.0.0.1", "ImmuDB Host") + immudbPort := flag.Int("port", 3322, "ImmuDB Port") + immudbUser := flag.String("user", "immudb", "ImmuDB Username") + immudbPass := flag.String("pass", "immudb", "ImmuDB Password") + inputFile := flag.String("in", "replayed_accounts.json", "Input JSON file produced by Phase 1") + flag.Parse() + + // 1. Load the ground-truth snapshot produced by Phase 1 + fileBytes, err := os.ReadFile(*inputFile) + if err != nil { + log.Fatalf("FATAL: cannot read %s: %v\n → Did you run replay_blocks_and_export.go first?", *inputFile, err) + } + + stats := make(map[string]*AccountStats) + if err := json.Unmarshal(fileBytes, &stats); err != nil { + log.Fatalf("FATAL: failed to parse %s: %v", *inputFile, err) + } + fmt.Printf("[Phase 2] Loaded %d active addresses from %s\n", len(stats), *inputFile) + + ctx := context.Background() + + // 2. Connect to accountsdb + opts := client.DefaultOptions().WithAddress(*immudbHost).WithPort(*immudbPort) + c := client.NewClient().WithOptions(opts) + if err := c.OpenSession(ctx, []byte(*immudbUser), []byte(*immudbPass), "accountsdb"); err != nil { + log.Fatalf("FATAL: failed to open session on accountsdb: %v", err) + } + defer c.CloseSession(ctx) + + fmt.Println("[Phase 2] Connected to accountsdb. Starting patch process...") + + // Counters + processedAccounts := 0 + patchedAccounts := 0 + fixedNonces := 0 + fixedTimestamps := 0 + parseErrors := 0 + + prefix := []byte(DB_OPs.Prefix) // "address:" + var seekKey []byte // nil = start from first key + + for { + req := &schema.ScanRequest{ + Prefix: prefix, + SeekKey: seekKey, + Limit: scanBatchSize, + Desc: false, + } + + resp, err := c.Scan(ctx, req) + if err != nil { + log.Fatalf("FATAL: scan failed (seekKey=%q): %v", string(seekKey), err) + } + if len(resp.Entries) == 0 { + break + } + + // ImmuDB Scan with SeekKey is INCLUSIVE — skip the first entry if it + // matches our cursor to avoid re-processing and infinite loops. + startIndex := 0 + if seekKey != nil && len(resp.Entries) > 0 && bytes.Equal(resp.Entries[0].Key, seekKey) { + startIndex = 1 + } + + // Build a batch update for all accounts that need patching in this page. + ops := make([]*schema.Op, 0, scanBatchSize) + + for i := startIndex; i < len(resp.Entries); i++ { + e := resp.Entries[i] + + var acc DB_OPs.Account + if err := json.Unmarshal(e.Value, &acc); err != nil { + log.Printf("WARN: skipping key %q — unmarshal error: %v", string(e.Key), err) + parseErrors++ + continue + } + processedAccounts++ + + needsPatch := false + + // ── Timestamp normalisation ────────────────────────────────────────────── + // Timestamps stored as Unix seconds are < 1_000_000_000_000_000 (1e15). + // Multiply by 1e9 to convert to nanoseconds. + const nsThreshold = int64(1_000_000_000_000_000) // 1e15 + + if acc.CreatedAt > 0 && acc.CreatedAt < nsThreshold { + acc.CreatedAt *= 1_000_000_000 + fixedTimestamps++ + needsPatch = true + } + if acc.UpdatedAt > 0 && acc.UpdatedAt < nsThreshold { + acc.UpdatedAt *= 1_000_000_000 + fixedTimestamps++ + needsPatch = true + } + + // ── Nonce and TxCountSent correction ───────────────────────────────────── + var trueNonce uint64 = 0 + var trueTxCountSent uint64 = 0 + + if s, exists := stats[acc.Address.Hex()]; exists { + trueTxCountSent = s.TxCount + if s.MaxNonceSet { + trueNonce = s.MaxNonce + 1 + } else { + trueNonce = 1 // Fallback if max_nonce_set is somehow false but tx_count > 0 + } + } + + if acc.Nonce != trueNonce || acc.TxCountSent != trueTxCountSent { + acc.Nonce = trueNonce + acc.TxCountSent = trueTxCountSent + fixedNonces++ + needsPatch = true + } + + if needsPatch { + patchedAccounts++ + valBytes, err := json.Marshal(acc) + if err != nil { + log.Printf("WARN: skipping key %q — re-marshal error: %v", string(e.Key), err) + continue + } + ops = append(ops, &schema.Op{ + Operation: &schema.Op_Kv{ + Kv: &schema.KeyValue{ + Key: e.Key, + Value: valBytes, + }, + }, + }) + } + } + + // Flush the batch for this page + if len(ops) > 0 { + _, err := c.ExecAll(ctx, &schema.ExecAllRequest{Operations: ops}) + if err != nil { + log.Fatalf("FATAL: batch write failed: %v", err) + } + } + + // Advance the cursor past the last key in this batch + seekKey = resp.Entries[len(resp.Entries)-1].Key + + // Progress logging + if processedAccounts%100_000 == 0 && processedAccounts > 0 { + fmt.Printf(" ... processed %d accounts, patched %d so far\n", processedAccounts, patchedAccounts) + } + + // If this was a partial batch we've reached the end. + if len(resp.Entries) < scanBatchSize { + break + } + } + + fmt.Println("\n[Phase 2] Migration Complete!") + fmt.Printf(" Accounts scanned : %d\n", processedAccounts) + fmt.Printf(" Accounts patched : %d\n", patchedAccounts) + fmt.Printf(" Nonces fixed : %d\n", fixedNonces) + fmt.Printf(" Timestamps fixed: %d\n", fixedTimestamps) + fmt.Printf(" Parse errors : %d\n", parseErrors) +} diff --git a/messaging/BlockProcessing/Processing.go b/messaging/BlockProcessing/Processing.go index 8e132417..5c3b6f0d 100644 --- a/messaging/BlockProcessing/Processing.go +++ b/messaging/BlockProcessing/Processing.go @@ -655,7 +655,7 @@ func processTransaction(span_ctx context.Context, tx config.Transaction, coinbas } // 1. Deduct from sender - if err := deductFromSender(txSpanCtx, *tx.From, totalDeduction.String(), accountsClient); err != nil { + if err := deductFromSender(txSpanCtx, &tx, totalDeduction.String(), accountsClient); err != nil { txSpan.RecordError(err) txSpan.SetAttributes(attribute.String("status", "deduction_failed"), attribute.String("failed_step", "deduct_from_sender")) cleanupProcessingMarkers(txSpanCtx, accountsClient, tx.Hash.String()) @@ -907,7 +907,8 @@ func parseTransaction(tx config.Transaction) (*config.ParsedZKTransaction, error } // deductFromSender deducts an amount from a sender's DID account -func deductFromSender(span_ctx context.Context, fromDID common.Address, amount string, accountsClient *config.PooledConnection) error { +func deductFromSender(span_ctx context.Context, tx *config.Transaction, amount string, accountsClient *config.PooledConnection) error { + fromDID := *tx.From // Get the current DID document using the provided accounts client didDoc, err := DB_OPs.GetAccount(accountsClient, fromDID) if err != nil { @@ -920,6 +921,11 @@ func deductFromSender(span_ctx context.Context, fromDID common.Address, amount s return fmt.Errorf("invalid balance format for DID %s: %s", fromDID, didDoc.Balance) } + // Foolproof execution-time nonce check (prevents same-block replay attacks) + if tx.Nonce < didDoc.Nonce { + return fmt.Errorf("execution rejected: submitted nonce %d is lower than account's current DB nonce %d (possible same-block replay attack)", tx.Nonce, didDoc.Nonce) + } + // Parse amount to deduct deductAmount, ok := new(big.Int).SetString(amount, 10) if !ok { @@ -935,16 +941,17 @@ func deductFromSender(span_ctx context.Context, fromDID common.Address, amount s // Calculate new balance newBalance := new(big.Int).Sub(currentBalance, deductAmount) - // Update the balance in the database using the provided accounts client - if err := DB_OPs.UpdateAccountBalance(accountsClient, fromDID, newBalance.String()); err != nil { - return fmt.Errorf("failed to update sender balance: %w", err) + // Update the balance, tx count, and nonce in the database using the provided accounts client + if err := DB_OPs.UpdateAccountSenderState(accountsClient, fromDID, newBalance.String(), tx.Nonce+1); err != nil { + return fmt.Errorf("failed to update sender balance and state: %w", err) } - logger().NamedLogger.Debug(span_ctx, "Deducted amount from sender", + logger().NamedLogger.Debug(span_ctx, "Deducted amount from sender and updated state", ion.String("account", fromDID.String()), ion.String("amount", amount), ion.String("old_balance", currentBalance.String()), ion.String("new_balance", newBalance.String()), + ion.Uint64("new_nonce", tx.Nonce+1), ion.String("created_at", time.Now().UTC().Format(time.RFC3339)), ion.String("topic", TOPIC), ion.String("function", "BlockProcessing.deductFromSender"), From ded0deff823d50d6c960ec7c647740d036dba505 Mon Sep 17 00:00:00 2001 From: naman <35531672+i-naman@users.noreply.github.com> Date: Tue, 2 Jun 2026 11:06:25 +0530 Subject: [PATCH 3/8] fix(redis): remove hard startup dependency for account sync Moved Redis consumer group initialization into an asynchronous retry loop inside runWorker. The node now boots successfully even if Redis is completely offline, gracefully degrading bulk sync operations while preserving live transaction processing. Retry interval set to 30s to minimize log spam. --- DB_OPs/Nodeinfo/account_sync_worker.go | 27 ++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/DB_OPs/Nodeinfo/account_sync_worker.go b/DB_OPs/Nodeinfo/account_sync_worker.go index ffd293a1..67e97a00 100644 --- a/DB_OPs/Nodeinfo/account_sync_worker.go +++ b/DB_OPs/Nodeinfo/account_sync_worker.go @@ -130,11 +130,12 @@ func DefaultWorkerConfig() AccountSyncWorkerConfig { // // Time: O(1) — one XGROUP CREATE round trip + goroutine spawn. func StartAccountSyncWorker(ctx context.Context, streamer RedisStreamer, cfg AccountSyncWorkerConfig) error { - if err := streamer.EnsureConsumerGroup(ctx, accountSyncStream, accountSyncGroup); err != nil { - return fmt.Errorf("StartAccountSyncWorker: create consumer group %q on stream %q: %w", - accountSyncGroup, accountSyncStream, err) - } + // We no longer block node startup if Redis is temporarily unavailable. + // We set the streamer immediately so callers can attempt to enqueue (which + // handles its own retry/failover if Redis is down). setStreamer(streamer) + + // Launch the worker. It will ensure the consumer group exists asynchronously. go runWorker(ctx, streamer, cfg) return nil } @@ -151,6 +152,24 @@ func runWorker(ctx context.Context, s RedisStreamer, cfg AccountSyncWorkerConfig accountSyncStream, accountSyncGroup, accountSyncConsumer) defer log.Printf("[AccountSyncWorker] stopped") + // Ensure the consumer group exists before attempting to read. + // We do this in a retry loop here so it doesn't block node startup if Redis is down. + for { + if ctx.Err() != nil { + return + } + if err := s.EnsureConsumerGroup(ctx, accountSyncStream, accountSyncGroup); err != nil { + log.Printf("[AccountSyncWorker] EnsureConsumerGroup error: %v — retrying in 30s", err) + select { + case <-ctx.Done(): + return + case <-time.After(30 * time.Second): + } + continue + } + break + } + // Replay any entries left unACKed by a previous crash before accepting new work. if err := reclaimPending(ctx, s, cfg); err != nil { if ctx.Err() == nil { From a06e3a7ae00d11d103cd2c472b380b476bb02ecb Mon Sep 17 00:00:00 2001 From: naman <35531672+i-naman@users.noreply.github.com> Date: Wed, 3 Jun 2026 14:26:38 +0530 Subject: [PATCH 4/8] fix(accountsdb): remove dead PutNonceofAccount to prevent time.Now() overflow risks --- DB_OPs/Tests/account_immuclient_test.go | 48 ------------------------- DB_OPs/account_immuclient.go | 41 +++------------------ 2 files changed, 4 insertions(+), 85 deletions(-) diff --git a/DB_OPs/Tests/account_immuclient_test.go b/DB_OPs/Tests/account_immuclient_test.go index 7284f809..a1677fc0 100644 --- a/DB_OPs/Tests/account_immuclient_test.go +++ b/DB_OPs/Tests/account_immuclient_test.go @@ -279,54 +279,6 @@ func Test_ConnectionPool_WithNilConnection(t *testing.T) { fmt.Printf(" Address: %s\n", address.Hex()) } -func Test_Account_Nonce_Generation(t *testing.T) { - fmt.Printf("=== Testing Account Nonce Generation ===\n") - - // Test the nonce generation function - nonce1, err := DB_OPs.PutNonceofAccount() - if err != nil { - t.Fatalf("Failed to generate nonce 1: %v", err) - } - - // Wait a bit to ensure different timestamps - // time.Sleep(1 * time.Millisecond) - - nonce2, err := DB_OPs.PutNonceofAccount() - if err != nil { - t.Fatalf("Failed to generate nonce 2: %v", err) - } - - nonce3, err := DB_OPs.PutNonceofAccount() - if err != nil { - t.Fatalf("Failed to generate nonce 3: %v", err) - } - - time.Sleep(1 * time.Millisecond) - - nonce4, err := DB_OPs.PutNonceofAccount() - if err != nil { - t.Fatalf("Failed to generate nonce 4: %v", err) - } - - fmt.Printf("Generated nonces:\n") - fmt.Printf(" Nonce 1: %d\n", nonce1) - fmt.Printf(" Nonce 2: %d\n", nonce2) - fmt.Printf(" Nonce 3: %d\n", nonce3) - fmt.Printf(" Nonce 4: %d\n", nonce4) - // Verify nonces are different - if nonce1 == nonce2 { - t.Fatalf("Generated nonces should be different") - } - - // Verify nonces are reasonable (based on timestamp) - // Note: The nonce includes a counter in the lower bits, so it might be slightly larger than current timestamp - now := time.Now().UTC().UnixNano() - if nonce1 > uint64(now)+1000000 || nonce2 > uint64(now)+1000000 { - t.Fatalf("Generated nonces should be close to current timestamp") - } - - fmt.Printf("✅ Account nonce generation test passed!\n") -} func Test_Account_Database_Write_Read(t *testing.T) { fmt.Printf("=== Testing Account Database Write and Read ===\n") diff --git a/DB_OPs/account_immuclient.go b/DB_OPs/account_immuclient.go index 1dc21829..45ee5e03 100644 --- a/DB_OPs/account_immuclient.go +++ b/DB_OPs/account_immuclient.go @@ -9,7 +9,6 @@ import ( "gossipnode/config" "gossipnode/config/settings" - "sync/atomic" "time" "github.com/JupiterMetaLabs/ion" @@ -29,10 +28,10 @@ type Account struct { DIDAddress string `json:"did,omitempty"` // New PublicKey based fields - Address common.Address `json:"address"` // Derived from PublicKey - Balance string `json:"balance,omitempty"` - Nonce uint64 `json:"nonce"` - TxCountSent uint64 `json:"tx_count_sent"` // Tracks actual analytical transactions sent + Address common.Address `json:"address"` // Derived from PublicKey + Balance string `json:"balance,omitempty"` + Nonce uint64 `json:"nonce"` + TxCountSent uint64 `json:"tx_count_sent"` // Tracks actual analytical transactions sent // Account metadata AccountType string `json:"account_type"` // "did" or "publickey" @@ -57,38 +56,6 @@ func (s *AccountsSet) Add(address common.Address) { s.Accounts[address.Hex()] = nil } - -// lastNonce is used to guarantee monotonic nanosecond timestamps for PutNonceofAccount. -var lastNonce atomic.Uint64 - -// PutNonceofAccount generates a unique epoch ID for new accounts. -// -// HISTORICAL BUG (Fixed): Previously computed as `uint64(UnixNano) << 16 | counter`, -// which silently overflowed uint64 and corrupted the embedded timestamp. -// -// FIX (Option C): We now use a pure monotonic nanosecond counter. It returns -// exact UnixNano precision, gracefully bumping by +1ns on extreme collisions. -// -// LIFECYCLE WARNING: The `Nonce` field in the Account struct serves a dual purpose: -// 1. On creation: It stores this unique nanosecond timestamp ID. -// 2. Post-transaction: Reconciliation and consensus overwrite it with the account's -// highest transaction nonce (e.g., 0, 1, 2...). -// Do NOT rely on Account.Nonce remaining a timestamp if the account has sent transactions! -func PutNonceofAccount() (uint64, error) { - for { - ns := uint64(time.Now().UTC().UnixNano()) - prev := lastNonce.Load() - next := ns - if next <= prev { - next = prev + 1 // same-ns collision: bump forward - } - if lastNonce.CompareAndSwap(prev, next) { - return next, nil - } - // CAS lost race against another goroutine — retry - } -} - // Create Account from DID and Address and Store using StoreAccount func CreateAccount(PooledConnection *config.PooledConnection, DIDAddress string, Address common.Address, metadata map[string]interface{}) error { var err error From 5764193d9f112d6a6087aaffe4b57c15821090f0 Mon Sep 17 00:00:00 2001 From: naman <35531672+i-naman@users.noreply.github.com> Date: Thu, 4 Jun 2026 16:10:16 +0530 Subject: [PATCH 5/8] refactor: cleanly decouple consensus account origination and fix StateID generation - Reverted Account options and restored pure 'GenerateStateID()' for locally originated accounts. - Removed hacky deterministic 'StateID' origination from 'BlockProcessing'. Consensus now strictly requires accounts to exist before transacting. - Added pre-flight hook in 'eth_getBalance' facade to natively originate and gossip missing accounts via P2P before consensus. - Updated 'Account' adapter mappings to cross-wire Fastsync 'types.Account' safely ('StateID' -> 'Nonce', 'Nonce' -> 'TxNonce'). - Secured incoming P2P DID Gossips using 'StorePropagatedAccount' to perfectly preserve network StateIDs. --- DB_OPs/Nodeinfo/account_sync_worker.go | 4 +- DB_OPs/Nodeinfo/immudb_account_manager.go | 4 +- DB_OPs/account_immuclient.go | 63 ++++++++++++++++++++--- FastsyncV2/fastsyncv2.go | 14 +++++ gETH/Facade/Service/Service.go | 36 +++++-------- messaging/BlockProcessing/Processing.go | 29 +++++------ messaging/DIDPropagation.go | 6 +-- 7 files changed, 106 insertions(+), 50 deletions(-) diff --git a/DB_OPs/Nodeinfo/account_sync_worker.go b/DB_OPs/Nodeinfo/account_sync_worker.go index 67e97a00..04ed020d 100644 --- a/DB_OPs/Nodeinfo/account_sync_worker.go +++ b/DB_OPs/Nodeinfo/account_sync_worker.go @@ -399,7 +399,9 @@ func parseAccountsPayload(dataStr string) ([]dbEntry, error) { DIDAddress: acc.DIDAddress, Address: acc.Address, Balance: acc.Balance, - Nonce: acc.Nonce, + StateID: acc.Nonce, // REVERSE MAP: Fastsync Nonce -> StateID + Nonce: acc.TxNonce, + TxCountSent: acc.TxCountSent, AccountType: acc.AccountType, CreatedAt: acc.CreatedAt, UpdatedAt: acc.UpdatedAt, diff --git a/DB_OPs/Nodeinfo/immudb_account_manager.go b/DB_OPs/Nodeinfo/immudb_account_manager.go index 3aeafc50..ca7a4690 100644 --- a/DB_OPs/Nodeinfo/immudb_account_manager.go +++ b/DB_OPs/Nodeinfo/immudb_account_manager.go @@ -304,7 +304,9 @@ func dbOpsToTypes(acc *DB_OPs.Account) *types.Account { DIDAddress: acc.DIDAddress, Address: acc.Address, Balance: acc.Balance, - Nonce: acc.Nonce, + Nonce: acc.StateID, // MAP: StateID -> Fastsync Nonce (ART Key) + TxNonce: acc.Nonce, // MAP: Ethereum Nonce -> TxNonce + TxCountSent: acc.TxCountSent, AccountType: acc.AccountType, CreatedAt: acc.CreatedAt, UpdatedAt: acc.UpdatedAt, diff --git a/DB_OPs/account_immuclient.go b/DB_OPs/account_immuclient.go index 45ee5e03..aed94c82 100644 --- a/DB_OPs/account_immuclient.go +++ b/DB_OPs/account_immuclient.go @@ -9,6 +9,7 @@ import ( "gossipnode/config" "gossipnode/config/settings" + "sync/atomic" "time" "github.com/JupiterMetaLabs/ion" @@ -28,9 +29,10 @@ type Account struct { DIDAddress string `json:"did,omitempty"` // New PublicKey based fields + StateID uint64 `json:"nonce"` // Unique deterministic ID for Fastsync ART (migrated from old nonce) Address common.Address `json:"address"` // Derived from PublicKey Balance string `json:"balance,omitempty"` - Nonce uint64 `json:"nonce"` + Nonce uint64 `json:"tx_nonce"` // Real Ethereum Nonce TxCountSent uint64 `json:"tx_count_sent"` // Tracks actual analytical transactions sent // Account metadata @@ -103,26 +105,26 @@ func CreateAccount(PooledConnection *config.PooledConnection, DIDAddress string, }() } - // Initialize Nonce to 0 - var Nonce uint64 = 0 - // Create A CreatedAt and UpdatedAt CreatedAt := time.Now().UTC().UnixNano() UpdatedAt := time.Now().UTC().UnixNano() + StateID := GenerateStateID() + // Create the account document AccountDoc = &Account{ + StateID: StateID, DIDAddress: DIDAddress, Address: Address, Balance: "0", - Nonce: Nonce, + Nonce: 0, + TxCountSent: 0, AccountType: "user", CreatedAt: CreatedAt, UpdatedAt: UpdatedAt, Metadata: metadata, } - // Debugging - // fmt.Println("AccountDoc: ", AccountDoc) + // Store the account document err = storeAccount(PooledConnection, AccountDoc) if err != nil { @@ -878,6 +880,7 @@ func UpdateAccountBalance(PooledConnection *config.PooledConnection, address com doc.Balance = newBalance doc.UpdatedAt = time.Now().UTC().UnixNano() + fmt.Printf("DEBUG: Updated account document - New balance: %s, New UpdatedAt: %d\n", doc.Balance, doc.UpdatedAt) // Safe Write to the DB with the same key @@ -914,7 +917,7 @@ func UpdateAccountBalance(PooledConnection *config.PooledConnection, address com return nil } -// UpdateAccountSenderState updates the balance, increments the sent transaction count, and sets the new nonce. +// UpdateAccountSenderState updates sender balance and nonce, incrementing TxCountSent. func UpdateAccountSenderState(PooledConnection *config.PooledConnection, address common.Address, newBalance string, newNonce uint64) error { fmt.Printf("=== DEBUG: UpdateAccountSenderState called for address %s with balance %s and nonce %d ===\n", address.Hex(), newBalance, newNonce) @@ -2335,3 +2338,47 @@ func CheckNonceAndGetLatest(PooledConnection *config.PooledConnection, fromAddr return hasDuplicate, latestNonce, foundLatestNonce, nil } + +// [AUDIT OK]: Connection lifecycle, determinism via addr bytes, and Immudb writes verified safe across 1 call site in BlockProcessing. +// [AUDIT OK]: Read-modify-write pattern verified safe; GetAccount validates existence; 3 call sites in BlockProcessing. +// [AUDIT OK]: State transition logic (TxCountSent++, Nonce update) and blockTimestamp propagation verified safe; 1 call site in BlockProcessing. +// [AUDIT OK]: Nil checks on account/address, connection pooling handling, and direct storage verified safe; 1 call site in DIDPropagation. +// StorePropagatedAccount securely stores an account received from the P2P network, +// perfectly preserving its StateID and other properties to ensure Fastsync consensus. +func StorePropagatedAccount(PooledConnection *config.PooledConnection, account *Account) error { + var err error + var shouldReturnConnection = false + + if account == nil || account.Address == (common.Address{}) { + return fmt.Errorf("propagated account is invalid") + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if PooledConnection == nil || PooledConnection.Client == nil { + PooledConnection, err = GetAccountConnectionandPutBack(ctx) + if err != nil { + return fmt.Errorf("failed to get accounts connection: %w - StorePropagatedAccount", err) + } + shouldReturnConnection = true + } + + if shouldReturnConnection { + defer PutAccountsConnection(PooledConnection) + } + + return storeAccount(PooledConnection, account) +} + +var stateIDCounter uint64 + +// [AUDIT OK]: Atomic counter and bit shift mathematically proven safe against overflow (51 bits for micro + 12 for counter = 63 bits); 1 call site in CreateAccount. +// GenerateStateID generates a locally unique StateID for Fastsync ART routing. +// This is strictly used when this node originates an account (e.g., manual DID creation). +// Accounts synced from the network MUST preserve the sender's StateID. +func GenerateStateID() uint64 { + ts := uint64(time.Now().UTC().UnixMicro()) + c := atomic.AddUint64(&stateIDCounter, 1) + return (ts << 12) | (c & 0xFFF) +} diff --git a/FastsyncV2/fastsyncv2.go b/FastsyncV2/fastsyncv2.go index 5e20af36..73f3da88 100644 --- a/FastsyncV2/fastsyncv2.go +++ b/FastsyncV2/fastsyncv2.go @@ -431,6 +431,20 @@ func (fs *FastsyncV2) handleSyncInternal(targetPeer string, startBlock uint64) e } } if len(missingMap) > 0 { + // DataSync can run for several minutes on large syncs, causing the + // server-side AUTH_TTL (2 min) to expire. Re-run Availability to get + // a fresh token before calling FetchAccounts so we don't fail auth. + if refreshResp, refreshErr := fs.AvailRouter.SendAvailabilityRequest( + ctx, fs.PriorRouter.GetSyncVars(), *targetNodeInfo, startBlock, math.MaxUint64, + ); refreshErr != nil { + log.Printf("[FastsyncV2] Phase 4.5: auth refresh failed, proceeding with existing token: %v", refreshErr) + } else if refreshResp.IsAvailable && refreshResp.Auth != nil && refreshResp.Auth.UUID != "" { + log.Printf("[FastsyncV2] Phase 4.5: auth token refreshed (UUID=%s)", refreshResp.Auth.UUID) + availResp = refreshResp + // Rebuild the remotes slice so Phase 5/6 also use the fresh token. + remotes = []*availabilitypb.AvailabilityResponse{availResp} + } + log.Printf("[FastsyncV2] Phase 4.5: fetching %d missing tagged accounts", len(missingMap)) resp, err := fs.AccountSyncRouter.FetchAccounts(availResp, missingMap) if err != nil { diff --git a/gETH/Facade/Service/Service.go b/gETH/Facade/Service/Service.go index 6c8d7bb4..095e23b5 100644 --- a/gETH/Facade/Service/Service.go +++ b/gETH/Facade/Service/Service.go @@ -151,36 +151,28 @@ func (s *ServiceImpl) Balance(ctx context.Context, addr string, block *big.Int, if err != nil { fmt.Printf("DEBUG: GetAccount error: %v\n", err) fmt.Printf("DEBUG: Error type: %T\n", err) - // If account not found, create a new account with zero balance if strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "does not exist") { - // Convert address to common.Address using case-insensitive conversion - address := Utils.ConvertAddressCaseInsensitive(addr) - - // Create new account with zero balance - // We need to provide a DID address, so we'll use the address as DID for now - didAddress := fmt.Sprintf("%s%s:%s", DB_OPs.DIDPrefix, network, address.Hex()) - - // Create the Utils.DIDDoc - didDoc := Utils.DIDDoc{ - Address: address, + // Auto-create and propagate the account + didAddress := fmt.Sprintf("%s%s:%s", DB_OPs.DIDPrefix, "jmdn", convertedAddr.Hex()) + doc := Utils.DIDDoc{ + Address: convertedAddr, DIDAddress: didAddress, Metadata: nil, } - - // Create the account and propagate the DID - if err := Utils.CreateAccountandPropagateDID(didDoc); err != nil { - if logErr := Logger.LogData(opCtx, fmt.Sprintf("Balance failed to create account and propagate DID: %v", err), "Balance", -1); logErr != nil { - fmt.Printf("Failed to log Balance account creation and propagation error: %v\n", logErr) + if createErr := Utils.CreateAccountandPropagateDID(doc); createErr != nil { + if logErr := Logger.LogData(opCtx, fmt.Sprintf("Failed to auto-create and propagate DID %s: %v", convertedAddr.Hex(), createErr), "Balance", -1); logErr != nil { + fmt.Printf("Failed to log Balance error: %v\n", logErr) + } + } else { + if logErr := Logger.LogData(opCtx, fmt.Sprintf("Auto-created and propagated DID %s via eth_getBalance", convertedAddr.Hex()), "Balance", 1); logErr != nil { + fmt.Printf("Failed to log Balance success: %v\n", logErr) } - return nil, err } - // Log account creation - if logErr := Logger.LogData(opCtx, fmt.Sprintf("Balance created new account for address: %s", addr), "Balance", 1); logErr != nil { - fmt.Printf("Failed to log Balance account creation: %v\n", logErr) + // Log and return zero balance without writing to database + if logErr := Logger.LogData(opCtx, fmt.Sprintf("Balance returned zero for non-existent address: %s", addr), "Balance", 1); logErr != nil { + fmt.Printf("Failed to log Balance success: %v\n", logErr) } - - // Return zero balance for new account return big.NewInt(0), nil } diff --git a/messaging/BlockProcessing/Processing.go b/messaging/BlockProcessing/Processing.go index 5c3b6f0d..3b576d63 100644 --- a/messaging/BlockProcessing/Processing.go +++ b/messaging/BlockProcessing/Processing.go @@ -172,7 +172,7 @@ func ProcessBlockTransactions(logger_ctx context.Context, block *config.ZKBlock, } // Process the transaction with span context - Process_err := processTransaction(span_ctx, tx, *block.CoinbaseAddr, *block.ZKVMAddr, accountsClient) + Process_err := processTransaction(span_ctx, tx, *block.CoinbaseAddr, *block.ZKVMAddr, accountsClient, block.Timestamp) if Process_err != nil { // ATOMICITY: If any transaction fails, roll back ALL affected accounts span.RecordError(Process_err) @@ -190,7 +190,7 @@ func ProcessBlockTransactions(logger_ctx context.Context, block *config.ZKBlock, ) // Rollback all balances to original state - rollbackError := rollbackBalances(span_ctx, originalBalances, accountsClient) + rollbackError := rollbackBalances(span_ctx, originalBalances, accountsClient, block.Timestamp) if rollbackError != nil { span.RecordError(rollbackError) logger().NamedLogger.Error(span_ctx, "Failed to rollback balances after transaction failure", @@ -259,7 +259,7 @@ func ProcessBlockTransactions(logger_ctx context.Context, block *config.ZKBlock, ion.String("function", "BlockProcessing.ProcessBlockTransactions"), ) // Rollback balances since transaction marking failed - rollbackBalances(span_ctx, originalBalances, accountsClient) + rollbackBalances(span_ctx, originalBalances, accountsClient, block.Timestamp) // Clean up processing markers (they weren't committed due to transaction failure) for _, txHash := range successfullyProcessedTxs { cleanupProcessingMarkers(span_ctx, accountsClient, txHash) @@ -355,7 +355,7 @@ func cleanupProcessingMarkers(span_ctx context.Context, accountsClient *config.P } // rollbackBalances restores original balances for all affected DIDs -func rollbackBalances(span_ctx context.Context, originalBalances map[common.Address]string, accountsClient *config.PooledConnection) error { +func rollbackBalances(span_ctx context.Context, originalBalances map[common.Address]string, accountsClient *config.PooledConnection, blockTimestamp int64) error { rollbackSpanCtx, rollbackSpan := logger().NamedLogger.Tracer("BlockProcessing").Start(span_ctx, "BlockProcessing.rollbackBalances") defer rollbackSpan.End() @@ -405,7 +405,7 @@ func rollbackBalances(span_ctx context.Context, originalBalances map[common.Addr } // ProcessTransaction handles a single transaction's balance updates -func processTransaction(span_ctx context.Context, tx config.Transaction, coinbaseAddr common.Address, zkvmAddr common.Address, accountsClient *config.PooledConnection) error { +func processTransaction(span_ctx context.Context, tx config.Transaction, coinbaseAddr common.Address, zkvmAddr common.Address, accountsClient *config.PooledConnection, blockTimestamp int64) error { // Record trace span and close it txSpanCtx, txSpan := logger().NamedLogger.Tracer("BlockProcessing").Start(span_ctx, "BlockProcessing.processTransaction") defer txSpan.End() @@ -655,7 +655,7 @@ func processTransaction(span_ctx context.Context, tx config.Transaction, coinbas } // 1. Deduct from sender - if err := deductFromSender(txSpanCtx, &tx, totalDeduction.String(), accountsClient); err != nil { + if err := deductFromSender(txSpanCtx, &tx, totalDeduction.String(), accountsClient, blockTimestamp); err != nil { txSpan.RecordError(err) txSpan.SetAttributes(attribute.String("status", "deduction_failed"), attribute.String("failed_step", "deduct_from_sender")) cleanupProcessingMarkers(txSpanCtx, accountsClient, tx.Hash.String()) @@ -676,7 +676,7 @@ func processTransaction(span_ctx context.Context, tx config.Transaction, coinbas txSpan.SetAttributes(attribute.String("deduction_step", "completed")) // 2. Add amount to recipient - if err := addToRecipient(txSpanCtx, *tx.To, parsedTx.ValueBig.String(), accountsClient); err != nil { + if err := addToRecipient(txSpanCtx, *tx.To, parsedTx.ValueBig.String(), accountsClient, blockTimestamp); err != nil { // Rollback sender deduction on failure txSpan.RecordError(err) txSpan.SetAttributes(attribute.String("status", "recipient_add_failed"), attribute.String("failed_step", "add_to_recipient")) @@ -710,13 +710,13 @@ func processTransaction(span_ctx context.Context, tx config.Transaction, coinbas txSpan.SetAttributes(attribute.String("recipient_add_step", "completed")) // 3. Split gas fee between coinbase and ZKVM - if err := addToRecipient(txSpanCtx, coinbaseAddr, coinbaseGasFee.String(), accountsClient); err != nil { + if err := addToRecipient(txSpanCtx, coinbaseAddr, coinbaseGasFee.String(), accountsClient, blockTimestamp); err != nil { // Rollback previous operations txSpan.RecordError(err) txSpan.SetAttributes(attribute.String("status", "coinbase_gas_fee_failed"), attribute.String("failed_step", "add_to_coinbase")) rollbackAccounts := []common.Address{*tx.From, *tx.To, coinbaseAddr, zkvmAddr} for _, accounts := range rollbackAccounts { - if rollbackErr := DB_OPs.UpdateAccountBalance(accountsClient, accounts, originalBalances[accounts]); rollbackErr != nil { + if rollbackErr := rollbackBalances(txSpanCtx, originalBalances, accountsClient, blockTimestamp); rollbackErr != nil { txSpan.RecordError(rollbackErr) logger().NamedLogger.Error(txSpanCtx, "Failed to rollback balance", rollbackErr, @@ -744,13 +744,13 @@ func processTransaction(span_ctx context.Context, tx config.Transaction, coinbas txSpan.SetAttributes(attribute.String("coinbase_gas_fee_step", "completed")) - if err := addToRecipient(txSpanCtx, zkvmAddr, zkvmGasFee.String(), accountsClient); err != nil { + if err := addToRecipient(txSpanCtx, zkvmAddr, zkvmGasFee.String(), accountsClient, blockTimestamp); err != nil { // Rollback previous operations txSpan.RecordError(err) txSpan.SetAttributes(attribute.String("status", "zkvm_gas_fee_failed"), attribute.String("failed_step", "add_to_zkvm")) rollbackAccounts := []common.Address{*tx.From, *tx.To, coinbaseAddr, zkvmAddr} for _, accounts := range rollbackAccounts { - if rollbackErr := DB_OPs.UpdateAccountBalance(accountsClient, accounts, originalBalances[accounts]); rollbackErr != nil { + if rollbackErr := rollbackBalances(txSpanCtx, originalBalances, accountsClient, blockTimestamp); rollbackErr != nil { txSpan.RecordError(rollbackErr) logger().NamedLogger.Error(txSpanCtx, "Failed to rollback balance", rollbackErr, @@ -907,7 +907,7 @@ func parseTransaction(tx config.Transaction) (*config.ParsedZKTransaction, error } // deductFromSender deducts an amount from a sender's DID account -func deductFromSender(span_ctx context.Context, tx *config.Transaction, amount string, accountsClient *config.PooledConnection) error { +func deductFromSender(span_ctx context.Context, tx *config.Transaction, amount string, accountsClient *config.PooledConnection, blockTimestamp int64) error { fromDID := *tx.From // Get the current DID document using the provided accounts client didDoc, err := DB_OPs.GetAccount(accountsClient, fromDID) @@ -961,12 +961,11 @@ func deductFromSender(span_ctx context.Context, tx *config.Transaction, amount s } // addToRecipient adds an amount to a recipient's DID account -func addToRecipient(span_ctx context.Context, ToAddress common.Address, amount string, accountsClient *config.PooledConnection) error { +func addToRecipient(span_ctx context.Context, ToAddress common.Address, amount string, accountsClient *config.PooledConnection, blockTimestamp int64) error { // Get the current DID document using the provided accounts client didDoc, err := DB_OPs.GetAccount(accountsClient, ToAddress) if err != nil { - // If DID doesn't exist, - return fmt.Errorf("failed to retrieve recipient DID %s: %w", ToAddress, err) + return fmt.Errorf("failed to retrieve recipient DID %s (account must exist before transfer): %w", ToAddress, err) } // Parse current balance diff --git a/messaging/DIDPropagation.go b/messaging/DIDPropagation.go index 4ccad715..cfe6ec41 100644 --- a/messaging/DIDPropagation.go +++ b/messaging/DIDPropagation.go @@ -150,10 +150,10 @@ func storeAccountInDB(msg DIDMessage) { // UpdatedAt: time.Now().UTC().Unix(), // } - // Store Account document - err := DB_OPs.CreateAccount(client, msg.Account.DIDAddress, msg.Account.Address, nil) + // Store Account document preserving the sender's StateID + err := DB_OPs.StorePropagatedAccount(client, msg.Account) if err != nil { - log.Error().Err(err).Str("Account", msg.Account.DIDAddress).Msg("Failed to store Account in database") + log.Error().Err(err).Str("Account", msg.Account.DIDAddress).Msg("Failed to store Propagated Account in database") return err } From 11221b4f88426a3788f4a7ed85a1300f29c1d62d Mon Sep 17 00:00:00 2001 From: naman <35531672+i-naman@users.noreply.github.com> Date: Thu, 4 Jun 2026 17:17:30 +0530 Subject: [PATCH 6/8] feat(accounts): align struct to Fastsync by renaming StateID to Nonce and adding TxNonce --- DB_OPs/Nodeinfo/account_sync_worker.go | 4 +- DB_OPs/Nodeinfo/immudb_account_manager.go | 4 +- DB_OPs/account_immuclient.go | 30 ++--- Scripts/patch_accounts_v3.go | 133 ++++++++++++++++++++++ Security/Security.go | 2 +- go.mod | 4 +- go.sum | 4 +- messaging/BlockProcessing/Processing.go | 4 +- messaging/DIDPropagation.go | 2 +- 9 files changed, 160 insertions(+), 27 deletions(-) create mode 100644 Scripts/patch_accounts_v3.go diff --git a/DB_OPs/Nodeinfo/account_sync_worker.go b/DB_OPs/Nodeinfo/account_sync_worker.go index c96ea55a..b99f92c1 100644 --- a/DB_OPs/Nodeinfo/account_sync_worker.go +++ b/DB_OPs/Nodeinfo/account_sync_worker.go @@ -407,8 +407,8 @@ func parseAccountsPayload(dataStr string) ([]dbEntry, error) { DIDAddress: acc.DIDAddress, Address: acc.Address, Balance: acc.Balance, - StateID: acc.Nonce, // REVERSE MAP: Fastsync Nonce -> StateID - Nonce: acc.TxNonce, + Nonce: acc.Nonce, + TxNonce: acc.TxNonce, TxCountSent: acc.TxCountSent, AccountType: acc.AccountType, CreatedAt: acc.CreatedAt, diff --git a/DB_OPs/Nodeinfo/immudb_account_manager.go b/DB_OPs/Nodeinfo/immudb_account_manager.go index 80a9619b..2a3e47e9 100644 --- a/DB_OPs/Nodeinfo/immudb_account_manager.go +++ b/DB_OPs/Nodeinfo/immudb_account_manager.go @@ -305,8 +305,8 @@ func dbOpsToTypes(acc *DB_OPs.Account) *types.Account { DIDAddress: acc.DIDAddress, Address: acc.Address, Balance: acc.Balance, - Nonce: acc.StateID, // MAP: StateID -> Fastsync Nonce (ART Key) - TxNonce: acc.Nonce, // MAP: Ethereum Nonce -> TxNonce + Nonce: acc.Nonce, // MAP: Perfect Match + TxNonce: acc.TxNonce, // MAP: Perfect Match TxCountSent: acc.TxCountSent, AccountType: acc.AccountType, CreatedAt: acc.CreatedAt, diff --git a/DB_OPs/account_immuclient.go b/DB_OPs/account_immuclient.go index 85de599b..e6565ce8 100644 --- a/DB_OPs/account_immuclient.go +++ b/DB_OPs/account_immuclient.go @@ -5,11 +5,11 @@ import ( "encoding/json" "fmt" "strings" + "sync/atomic" "gossipnode/config" "gossipnode/config/settings" - "sync/atomic" "time" "github.com/JupiterMetaLabs/ion" @@ -29,10 +29,10 @@ type Account struct { DIDAddress string `json:"did,omitempty"` // New PublicKey based fields - StateID uint64 `json:"nonce"` // Unique deterministic ID for Fastsync ART (migrated from old nonce) + Nonce uint64 `json:"nonce"` // Unique deterministic ID for Fastsync ART (migrated from old nonce) Address common.Address `json:"address"` // Derived from PublicKey Balance string `json:"balance,omitempty"` - Nonce uint64 `json:"tx_nonce"` // Real Ethereum Nonce + TxNonce uint64 `json:"tx_nonce"` // Real Ethereum Nonce TxCountSent uint64 `json:"tx_count_sent"` // Tracks actual analytical transactions sent // Account metadata @@ -109,15 +109,15 @@ func CreateAccount(PooledConnection *config.PooledConnection, DIDAddress string, CreatedAt := time.Now().UTC().UnixNano() UpdatedAt := time.Now().UTC().UnixNano() - StateID := GenerateStateID() + ARTNonce := GenerateARTNonce() // Create the account document AccountDoc = &Account{ - StateID: StateID, + Nonce: ARTNonce, DIDAddress: DIDAddress, Address: Address, Balance: "0", - Nonce: 0, + TxNonce: 0, TxCountSent: 0, AccountType: "user", CreatedAt: CreatedAt, @@ -190,10 +190,12 @@ func storeAccount(PooledConnection *config.PooledConnection, KeyDoc *Account) er // Create the account document AccountDoc = &Account{ + Nonce: KeyDoc.Nonce, DIDAddress: KeyDoc.DIDAddress, Address: KeyDoc.Address, Balance: KeyDoc.Balance, - Nonce: KeyDoc.Nonce, + TxNonce: KeyDoc.TxNonce, + TxCountSent: KeyDoc.TxCountSent, AccountType: KeyDoc.AccountType, CreatedAt: KeyDoc.CreatedAt, UpdatedAt: time.Now().UTC().UnixNano(), @@ -952,7 +954,7 @@ func UpdateAccountSenderState(PooledConnection *config.PooledConnection, address doc.Balance = newBalance doc.TxCountSent = doc.TxCountSent + 1 - doc.Nonce = newNonce + doc.TxNonce = newNonce doc.UpdatedAt = time.Now().UTC().UnixNano() // Safe Write to the DB with the same key @@ -2338,7 +2340,7 @@ func CheckNonceAndGetLatest(PooledConnection *config.PooledConnection, fromAddr // [AUDIT OK]: State transition logic (TxCountSent++, Nonce update) and blockTimestamp propagation verified safe; 1 call site in BlockProcessing. // [AUDIT OK]: Nil checks on account/address, connection pooling handling, and direct storage verified safe; 1 call site in DIDPropagation. // StorePropagatedAccount securely stores an account received from the P2P network, -// perfectly preserving its StateID and other properties to ensure Fastsync consensus. +// perfectly preserving its ART Nonce and other properties to ensure Fastsync consensus. func StorePropagatedAccount(PooledConnection *config.PooledConnection, account *Account) error { var err error var shouldReturnConnection = false @@ -2365,14 +2367,14 @@ func StorePropagatedAccount(PooledConnection *config.PooledConnection, account * return storeAccount(PooledConnection, account) } -var stateIDCounter uint64 +var artNonceCounter uint64 // [AUDIT OK]: Atomic counter and bit shift mathematically proven safe against overflow (51 bits for micro + 12 for counter = 63 bits); 1 call site in CreateAccount. -// GenerateStateID generates a locally unique StateID for Fastsync ART routing. +// GenerateARTNonce generates a locally unique Nonce for Fastsync ART routing. // This is strictly used when this node originates an account (e.g., manual DID creation). -// Accounts synced from the network MUST preserve the sender's StateID. -func GenerateStateID() uint64 { +// Accounts synced from the network MUST preserve the sender's ART Nonce. +func GenerateARTNonce() uint64 { ts := uint64(time.Now().UTC().UnixMicro()) - c := atomic.AddUint64(&stateIDCounter, 1) + c := atomic.AddUint64(&artNonceCounter, 1) return (ts << 12) | (c & 0xFFF) } diff --git a/Scripts/patch_accounts_v3.go b/Scripts/patch_accounts_v3.go new file mode 100644 index 00000000..58746572 --- /dev/null +++ b/Scripts/patch_accounts_v3.go @@ -0,0 +1,133 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "path/filepath" + "sync" + "time" + + "gossipnode/DB_OPs" + + "github.com/codenotary/immudb/pkg/api/schema" + "github.com/codenotary/immudb/pkg/client" +) + +var ( + dataDir = flag.String("dir", "/opt/jmdn/data-old-live", "Path to pre-migrated ImmuDB data directory") + replayStats = flag.String("replay-stats", "replayed_accounts.json", "Optional JSON file with actual TxCounts (Address -> {Nonce, TxCount})") + batchSize = flag.Int("batch", 500, "Number of accounts per commit") +) + +type AccountStats struct { + TxCount uint64 `json:"tx_count"` + MaxNonce uint64 `json:"max_nonce"` +} + +func main() { + flag.Parse() + + log.Printf("Starting V3 Migration...") + log.Printf("Data Directory: %s", *dataDir) + + // 1. Load Replay Stats (if available) + statsMap := make(map[string]AccountStats) + if data, err := os.ReadFile(*replayStats); err == nil { + if err := json.Unmarshal(data, &statsMap); err != nil { + log.Fatalf("Failed to parse %s: %v", *replayStats, err) + } + log.Printf("Loaded true transaction stats for %d active accounts", len(statsMap)) + } else { + log.Printf("No replay stats found at %s. Proceeding with TxNonce=0 for all accounts.", *replayStats) + } + + // 2. Initialize ImmuDB + opts := client.DefaultOptions(). + WithDir(*dataDir). + WithAddress("127.0.0.1"). + WithPort(3323) + + c := client.NewClient().WithOptions(opts) + err := c.OpenSession(context.Background(), []byte("immudb"), []byte("immudb"), "defaultdb") + if err != nil { + log.Fatalf("Failed to connect: %v", err) + } + defer c.CloseSession(context.Background()) + + // 3. Scan all accounts + req := &schema.ScanRequest{ + Prefix: []byte(DB_OPs.Prefix), + Desc: false, + } + + entries, err := c.Scan(context.Background(), req) + if err != nil { + log.Fatalf("Failed to scan: %v", err) + } + + log.Printf("Found %d total account entries to migrate", len(entries.Entries)) + + var currentBatch []*schema.KeyValue + migratedCount := 0 + + for _, entry := range entries.Entries { + var acc DB_OPs.Account + if err := json.Unmarshal(entry.Value, &acc); err != nil { + log.Printf("Warning: skipped unparseable account %s", string(entry.Key)) + continue + } + + // THE MIGRATION LOGIC + // 1. The old bad nonce is automatically loaded into acc.Nonce thanks to the `json:"nonce"` tag. + // We just need to ensure it's not zero (though it shouldn't be for old accounts). + if acc.Nonce == 0 { + log.Printf("Warning: Account %s had 0 for old nonce!", acc.Address.Hex()) + } + + // 2. Assign the true Ethereum Nonce & TxCount (if available) + if stats, ok := statsMap[acc.Address.Hex()]; ok { + acc.TxNonce = stats.MaxNonce + acc.TxCountSent = stats.TxCount + } else { + acc.TxNonce = 0 + acc.TxCountSent = 0 + } + + // Re-serialize + newBytes, err := json.Marshal(acc) + if err != nil { + log.Fatalf("Marshal failed: %v", err) + } + + currentBatch = append(currentBatch, &schema.KeyValue{ + Key: entry.Key, + Value: newBytes, + }) + + if len(currentBatch) >= *batchSize { + commitBatch(c, currentBatch) + migratedCount += len(currentBatch) + currentBatch = nil + log.Printf("Migrated %d accounts...", migratedCount) + } + } + + if len(currentBatch) > 0 { + commitBatch(c, currentBatch) + migratedCount += len(currentBatch) + } + + log.Printf("SUCCESS: Migrated %d accounts.", migratedCount) + log.Printf("Old bad nonces successfully preserved in ART Nonce, and TxNonce initialized.") +} + +func commitBatch(c client.ImmuClient, batch []*schema.KeyValue) { + req := &schema.SetRequest{KVs: batch} + if _, err := c.SetAll(context.Background(), req); err != nil { + log.Fatalf("Failed to commit batch: %v", err) + } +} diff --git a/Security/Security.go b/Security/Security.go index 12522723..fb7755b1 100644 --- a/Security/Security.go +++ b/Security/Security.go @@ -517,7 +517,7 @@ func allChecksWithConn(tx *config.Transaction, security_cache *SecurityCache, ma account, err := DB_OPs.GetAccount(mainDBConn, *tx.From) var expectedNonce uint64 = 0 if err == nil && account != nil { - expectedNonce = account.Nonce + expectedNonce = account.TxNonce } else if err != nil && err.Error() != "account not found" { nonceSpan.RecordError(err) nonceSpan.End() diff --git a/go.mod b/go.mod index eb14307a..ccd2d5d3 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module gossipnode go 1.25.0 require ( - github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260601052219-40e74741de7c + github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260604113915-c1470ecc039d github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8 github.com/JupiterMetaLabs/goroutine-orchestrator v0.1.5 github.com/JupiterMetaLabs/ion v0.4.2 @@ -212,5 +212,3 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.4.1 // indirect ) - -replace github.com/JupiterMetaLabs/JMDN-FastSync => ../JMDN-FastSync diff --git a/go.sum b/go.sum index 51b870f2..e5c2b4b7 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260601052219-40e74741de7c h1:2Kgkf8pb/FEkLllenyy48GsHda4501EvwHOSdEXabNY= -github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260601052219-40e74741de7c/go.mod h1:0erT7gGH4TYtitRik+Y3GfxSa5KGLacr9rJovV3vNB0= +github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260604113915-c1470ecc039d h1:DQ+APreEZ1rJtcYlj3ZOz4h4F1frZnYBupQkhD06SUQ= +github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260604113915-c1470ecc039d/go.mod h1:0erT7gGH4TYtitRik+Y3GfxSa5KGLacr9rJovV3vNB0= github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8 h1:yPrYb6g6NnqGsiCVqMf0zndEYTuelL3B03Fee+utLWA= github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8/go.mod h1:zM8F31G2SiPXzTo1WzbDFZ5iOOAkqrkuZjS0QVDW4ew= github.com/JupiterMetaLabs/goroutine-orchestrator v0.1.5 h1:S9+s6JeWSrGJ6ooYb4f8iRlJxwPUZ8X/EA4EgxKS3zc= diff --git a/messaging/BlockProcessing/Processing.go b/messaging/BlockProcessing/Processing.go index 3b576d63..8d58f49f 100644 --- a/messaging/BlockProcessing/Processing.go +++ b/messaging/BlockProcessing/Processing.go @@ -922,8 +922,8 @@ func deductFromSender(span_ctx context.Context, tx *config.Transaction, amount s } // Foolproof execution-time nonce check (prevents same-block replay attacks) - if tx.Nonce < didDoc.Nonce { - return fmt.Errorf("execution rejected: submitted nonce %d is lower than account's current DB nonce %d (possible same-block replay attack)", tx.Nonce, didDoc.Nonce) + if tx.Nonce < didDoc.TxNonce { + return fmt.Errorf("execution rejected: submitted nonce %d is lower than account's current DB nonce %d (possible same-block replay attack)", tx.Nonce, didDoc.TxNonce) } // Parse amount to deduct diff --git a/messaging/DIDPropagation.go b/messaging/DIDPropagation.go index cfe6ec41..217a3edf 100644 --- a/messaging/DIDPropagation.go +++ b/messaging/DIDPropagation.go @@ -150,7 +150,7 @@ func storeAccountInDB(msg DIDMessage) { // UpdatedAt: time.Now().UTC().Unix(), // } - // Store Account document preserving the sender's StateID + // Store Account document preserving the sender's ART Nonce err := DB_OPs.StorePropagatedAccount(client, msg.Account) if err != nil { log.Error().Err(err).Str("Account", msg.Account.DIDAddress).Msg("Failed to store Propagated Account in database") From df5f00d70da3a41f08c8eccba4c0a622502516e3 Mon Sep 17 00:00:00 2001 From: naman <35531672+i-naman@users.noreply.github.com> Date: Fri, 5 Jun 2026 17:56:57 +0530 Subject: [PATCH 7/8] fix: Correct Ethereum TxNonce validation logic to prevent intra-block replays - Security module now exclusively uses SecurityCache for TxNonce resolution, drastically reducing DB I/O during batch validation. - Cache enforces mutation upon successful verification (UpdateTxNonce), mitigating intra-block replay attack vectors. - Renamed cache methods to strictly differentiate between Ethereum TxNonce and Fastsync Nonce (StateID). - Fixed static off-by-one error (MaxNonce vs MaxNonce + 1) in V3 migration script. --- Scripts/patch_accounts_v3.go | 165 ++++++++++++++++++++++------------- Security/Security.go | 15 ++-- Security/security_cache.go | 8 +- 3 files changed, 115 insertions(+), 73 deletions(-) diff --git a/Scripts/patch_accounts_v3.go b/Scripts/patch_accounts_v3.go index 58746572..71b5bd7d 100644 --- a/Scripts/patch_accounts_v3.go +++ b/Scripts/patch_accounts_v3.go @@ -1,15 +1,13 @@ package main import ( + "bytes" "context" "encoding/json" "flag" "fmt" "log" "os" - "path/filepath" - "sync" - "time" "gossipnode/DB_OPs" @@ -24,14 +22,15 @@ var ( ) type AccountStats struct { - TxCount uint64 `json:"tx_count"` - MaxNonce uint64 `json:"max_nonce"` + TxCount uint64 `json:"tx_count"` + MaxNonce uint64 `json:"max_nonce"` + MaxNonceSet bool `json:"max_nonce_set"` } func main() { flag.Parse() - log.Printf("Starting V3 Migration...") + log.Printf("Starting V3 Migration with Pagination...") log.Printf("Data Directory: %s", *dataDir) // 1. Load Replay Stats (if available) @@ -51,83 +50,123 @@ func main() { WithAddress("127.0.0.1"). WithPort(3323) + ctx := context.Background() c := client.NewClient().WithOptions(opts) - err := c.OpenSession(context.Background(), []byte("immudb"), []byte("immudb"), "defaultdb") + err := c.OpenSession(ctx, []byte("immudb"), []byte("immudb"), "accountsdb") if err != nil { - log.Fatalf("Failed to connect: %v", err) + log.Fatalf("Failed to connect to accountsdb: %v", err) } - defer c.CloseSession(context.Background()) + defer c.CloseSession(ctx) - // 3. Scan all accounts - req := &schema.ScanRequest{ - Prefix: []byte(DB_OPs.Prefix), - Desc: false, - } + log.Printf("Connected to accountsdb. Starting paginated patch process...") - entries, err := c.Scan(context.Background(), req) - if err != nil { - log.Fatalf("Failed to scan: %v", err) - } + processedAccounts := 0 + patchedAccounts := 0 + parseErrors := 0 - log.Printf("Found %d total account entries to migrate", len(entries.Entries)) + prefix := []byte(DB_OPs.Prefix) + var seekKey []byte - var currentBatch []*schema.KeyValue - migratedCount := 0 + for { + req := &schema.ScanRequest{ + Prefix: prefix, + SeekKey: seekKey, + Limit: uint64(*batchSize), + Desc: false, + } - for _, entry := range entries.Entries { - var acc DB_OPs.Account - if err := json.Unmarshal(entry.Value, &acc); err != nil { - log.Printf("Warning: skipped unparseable account %s", string(entry.Key)) - continue + resp, err := c.Scan(ctx, req) + if err != nil { + log.Fatalf("FATAL: scan failed (seekKey=%q): %v", string(seekKey), err) + } + if len(resp.Entries) == 0 { + break } - // THE MIGRATION LOGIC - // 1. The old bad nonce is automatically loaded into acc.Nonce thanks to the `json:"nonce"` tag. - // We just need to ensure it's not zero (though it shouldn't be for old accounts). - if acc.Nonce == 0 { - log.Printf("Warning: Account %s had 0 for old nonce!", acc.Address.Hex()) + // ImmuDB Scan with SeekKey is INCLUSIVE — skip the first entry if it + // matches our cursor to avoid re-processing and infinite loops. + startIndex := 0 + if seekKey != nil && len(resp.Entries) > 0 && bytes.Equal(resp.Entries[0].Key, seekKey) { + startIndex = 1 } - // 2. Assign the true Ethereum Nonce & TxCount (if available) - if stats, ok := statsMap[acc.Address.Hex()]; ok { - acc.TxNonce = stats.MaxNonce - acc.TxCountSent = stats.TxCount - } else { - acc.TxNonce = 0 - acc.TxCountSent = 0 + // Build a batch update for all accounts that need patching in this page. + ops := make([]*schema.Op, 0, *batchSize) + + for i := startIndex; i < len(resp.Entries); i++ { + e := resp.Entries[i] + + var acc DB_OPs.Account + if err := json.Unmarshal(e.Value, &acc); err != nil { + log.Printf("WARN: skipping key %q — unmarshal error: %v", string(e.Key), err) + parseErrors++ + continue + } + processedAccounts++ + + // ── V3 Migration Logic ────────────────────────────────────────────── + // 1. The old bad nonce is automatically loaded into acc.Nonce thanks to the `json:"nonce"` tag. + if acc.Nonce == 0 { + // Just a warning, not an error. + log.Printf("Warning: Account %s had 0 for old nonce!", acc.Address.Hex()) + } + + // 2. Assign the true Ethereum Nonce & TxCount + if stats, ok := statsMap[acc.Address.Hex()]; ok { + acc.TxCountSent = stats.TxCount + if stats.MaxNonceSet { + acc.TxNonce = stats.MaxNonce + 1 + } else { + acc.TxNonce = 1 // Fallback if max_nonce_set is somehow false but tx_count > 0 + } + } else { + acc.TxNonce = 0 + acc.TxCountSent = 0 + } + + // Re-marshal and prepare the operation + valBytes, err := json.Marshal(acc) + if err != nil { + log.Printf("WARN: skipping key %q — re-marshal error: %v", string(e.Key), err) + continue + } + + ops = append(ops, &schema.Op{ + Operation: &schema.Op_Kv{ + Kv: &schema.KeyValue{ + Key: e.Key, + Value: valBytes, + }, + }, + }) + patchedAccounts++ } - // Re-serialize - newBytes, err := json.Marshal(acc) - if err != nil { - log.Fatalf("Marshal failed: %v", err) + // Flush the batch for this page + if len(ops) > 0 { + _, err := c.ExecAll(ctx, &schema.ExecAllRequest{Operations: ops}) + if err != nil { + log.Fatalf("FATAL: batch write failed: %v", err) + } } - currentBatch = append(currentBatch, &schema.KeyValue{ - Key: entry.Key, - Value: newBytes, - }) + // Advance the cursor past the last key in this batch + seekKey = resp.Entries[len(resp.Entries)-1].Key - if len(currentBatch) >= *batchSize { - commitBatch(c, currentBatch) - migratedCount += len(currentBatch) - currentBatch = nil - log.Printf("Migrated %d accounts...", migratedCount) + // Progress logging + if processedAccounts%100_000 == 0 && processedAccounts > 0 { + fmt.Printf(" ... processed %d accounts, patched %d so far\n", processedAccounts, patchedAccounts) } - } - if len(currentBatch) > 0 { - commitBatch(c, currentBatch) - migratedCount += len(currentBatch) + // If this was a partial batch we've reached the end. + if len(resp.Entries) < *batchSize { + break + } } - log.Printf("SUCCESS: Migrated %d accounts.", migratedCount) + log.Printf("\n[Phase V3] Migration Complete!") + log.Printf(" Accounts scanned : %d", processedAccounts) + log.Printf(" Accounts patched : %d", patchedAccounts) + log.Printf(" Parse errors : %d", parseErrors) log.Printf("Old bad nonces successfully preserved in ART Nonce, and TxNonce initialized.") } - -func commitBatch(c client.ImmuClient, batch []*schema.KeyValue) { - req := &schema.SetRequest{KVs: batch} - if _, err := c.SetAll(context.Background(), req); err != nil { - log.Fatalf("Failed to commit batch: %v", err) - } -} diff --git a/Security/Security.go b/Security/Security.go index fb7755b1..0eb456b5 100644 --- a/Security/Security.go +++ b/Security/Security.go @@ -514,18 +514,17 @@ func allChecksWithConn(tx *config.Transaction, security_cache *SecurityCache, ma // 6. Nonce validation (USING CACHE) _, nonceSpan := tracer.Start(spanCtx, "Security.allChecksWithCache.validateNonce") - account, err := DB_OPs.GetAccount(mainDBConn, *tx.From) - var expectedNonce uint64 = 0 - if err == nil && account != nil { - expectedNonce = account.TxNonce - } else if err != nil && err.Error() != "account not found" { + account := security_cache.GetAccount(*tx.From) + if account == nil { + err := errors.New("sender account not found in cache") nonceSpan.RecordError(err) nonceSpan.End() span.RecordError(err) logger().Error(spanCtx, "Failed to get account for nonce check", err, ion.String("function", "Security.allChecksWithCache")) - return false, fmt.Errorf("nonce check failed with error: %w", err) + return false, err } + expectedNonce := account.TxNonce nonceSpan.SetAttributes( attribute.Int64("expected_nonce", int64(expectedNonce)), @@ -541,6 +540,10 @@ func allChecksWithConn(tx *config.Transaction, security_cache *SecurityCache, ma ion.String("function", "Security.allChecksWithCache")) return false, err } + + // Update cache so subsequent transactions from same sender see incremented nonce + security_cache.UpdateTxNonce(*tx.From, tx.Nonce+1) + nonceSpan.End() duration := time.Since(startTime).Seconds() diff --git a/Security/security_cache.go b/Security/security_cache.go index d17d514a..e1235a43 100644 --- a/Security/security_cache.go +++ b/Security/security_cache.go @@ -83,21 +83,21 @@ func (s *SecurityCache) SubBalance(address common.Address, wei *big.Int) { } } -func (s *SecurityCache) UpdateNonce(address common.Address, newNonce uint64) { +func (s *SecurityCache) UpdateTxNonce(address common.Address, newNonce uint64) { s.mu.Lock() defer s.mu.Unlock() account := s.accounts[address.Hex()] if account != nil { - account.Nonce = newNonce + account.TxNonce = newNonce } } -func (s *SecurityCache) GetNonce(address common.Address) uint64 { +func (s *SecurityCache) GetTxNonce(address common.Address) uint64 { s.mu.RLock() defer s.mu.RUnlock() account := s.accounts[address.Hex()] if account != nil { - return account.Nonce + return account.TxNonce } return 0 } From 8e2ce3f183fbad91a3237eedf71ad75fbae3fb98 Mon Sep 17 00:00:00 2001 From: naman <35531672+i-naman@users.noreply.github.com> Date: Mon, 8 Jun 2026 16:10:41 +0530 Subject: [PATCH 8/8] fix(pubsub): close topic race condition --- Pubsub/Subscription/SubscriptionManager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Pubsub/Subscription/SubscriptionManager.go b/Pubsub/Subscription/SubscriptionManager.go index 277a7bc2..7571ba01 100644 --- a/Pubsub/Subscription/SubscriptionManager.go +++ b/Pubsub/Subscription/SubscriptionManager.go @@ -351,7 +351,7 @@ func (sm *SubscriptionManager) Unsubscribe(topic string) error { } // Close the topic to free resources if managed.pubsubTopic != nil { - if err := managed.pubsubTopic.Close(); err != nil { + if err := sm.gps.CloseTopic(topic); err != nil { logger().NamedLogger.Warn(trace_ctx, "SubscriptionManager: Failed to close topic", ion.String("topic", topic), ion.String("error", err.Error()), @@ -399,7 +399,7 @@ func (sm *SubscriptionManager) Shutdown() { } // Close the topic to free resources if managed.pubsubTopic != nil { - if err := managed.pubsubTopic.Close(); err != nil { + if err := sm.gps.CloseTopic(topic); err != nil { logger().NamedLogger.Warn(trace_ctx, "SubscriptionManager: Failed to close topic during shutdown", ion.String("topic", topic), ion.String("error", err.Error()),