diff --git a/DB_OPs/Nodeinfo/account_sync_worker.go b/DB_OPs/Nodeinfo/account_sync_worker.go index e7ac1c53..84926a5b 100644 --- a/DB_OPs/Nodeinfo/account_sync_worker.go +++ b/DB_OPs/Nodeinfo/account_sync_worker.go @@ -47,9 +47,10 @@ import ( "sync/atomic" "time" + "gossipnode/DB_OPs" + "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" "github.com/ethereum/go-ethereum/common" - "gossipnode/DB_OPs" ) // ─── dbEntry type alias ─────────────────────────────────────────────────────── @@ -397,7 +398,7 @@ func parseAccountsPayload(dataStr string) ([]dbEntry, error) { if err := json.Unmarshal([]byte(dataStr), &accs); err != nil { return nil, fmt.Errorf("unmarshal []*types.Account: %w", err) } - + // We might emit up to 2 entries per account (address: and did:) entries := make([]dbEntry, 0, len(accs)*2) for _, acc := range accs { @@ -409,6 +410,8 @@ func parseAccountsPayload(dataStr string) ([]dbEntry, error) { Address: acc.Address, Balance: acc.Balance, Nonce: acc.Nonce, + TxNonce: acc.TxNonce, + TxCountSent: acc.TxCountSent, AccountType: acc.AccountType, CreatedAt: acc.CreatedAt, UpdatedAt: acc.UpdatedAt, @@ -418,13 +421,13 @@ func parseAccountsPayload(dataStr string) ([]dbEntry, error) { if err != nil { return nil, fmt.Errorf("marshal DB_OPs.Account for address %s: %w", acc.Address.Hex(), err) } - + // 1. Emit the primary address key entries = append(entries, dbEntry{ Key: DB_OPs.Prefix + acc.Address.Hex(), Value: val, }) - + // 2. Emit the DID key so BatchRestoreAccounts creates the bound reference if acc.DIDAddress != "" { entries = append(entries, dbEntry{ diff --git a/DB_OPs/Nodeinfo/immudb_account_manager.go b/DB_OPs/Nodeinfo/immudb_account_manager.go index 7a6f9984..6033856a 100644 --- a/DB_OPs/Nodeinfo/immudb_account_manager.go +++ b/DB_OPs/Nodeinfo/immudb_account_manager.go @@ -371,7 +371,9 @@ func dbOpsToTypes(acc *DB_OPs.Account) *types.Account { DIDAddress: acc.DIDAddress, Address: acc.Address, Balance: acc.Balance, - Nonce: acc.Nonce, + Nonce: acc.Nonce, // MAP: Perfect Match + TxNonce: acc.TxNonce, // MAP: Perfect Match + TxCountSent: acc.TxCountSent, AccountType: acc.AccountType, CreatedAt: acc.CreatedAt, UpdatedAt: acc.UpdatedAt, diff --git a/DB_OPs/Tests/account_immuclient_test.go b/DB_OPs/Tests/account_immuclient_test.go index 7284f809..a1677fc0 100644 --- a/DB_OPs/Tests/account_immuclient_test.go +++ b/DB_OPs/Tests/account_immuclient_test.go @@ -279,54 +279,6 @@ func Test_ConnectionPool_WithNilConnection(t *testing.T) { fmt.Printf(" Address: %s\n", address.Hex()) } -func Test_Account_Nonce_Generation(t *testing.T) { - fmt.Printf("=== Testing Account Nonce Generation ===\n") - - // Test the nonce generation function - nonce1, err := DB_OPs.PutNonceofAccount() - if err != nil { - t.Fatalf("Failed to generate nonce 1: %v", err) - } - - // Wait a bit to ensure different timestamps - // time.Sleep(1 * time.Millisecond) - - nonce2, err := DB_OPs.PutNonceofAccount() - if err != nil { - t.Fatalf("Failed to generate nonce 2: %v", err) - } - - nonce3, err := DB_OPs.PutNonceofAccount() - if err != nil { - t.Fatalf("Failed to generate nonce 3: %v", err) - } - - time.Sleep(1 * time.Millisecond) - - nonce4, err := DB_OPs.PutNonceofAccount() - if err != nil { - t.Fatalf("Failed to generate nonce 4: %v", err) - } - - fmt.Printf("Generated nonces:\n") - fmt.Printf(" Nonce 1: %d\n", nonce1) - fmt.Printf(" Nonce 2: %d\n", nonce2) - fmt.Printf(" Nonce 3: %d\n", nonce3) - fmt.Printf(" Nonce 4: %d\n", nonce4) - // Verify nonces are different - if nonce1 == nonce2 { - t.Fatalf("Generated nonces should be different") - } - - // Verify nonces are reasonable (based on timestamp) - // Note: The nonce includes a counter in the lower bits, so it might be slightly larger than current timestamp - now := time.Now().UTC().UnixNano() - if nonce1 > uint64(now)+1000000 || nonce2 > uint64(now)+1000000 { - t.Fatalf("Generated nonces should be close to current timestamp") - } - - fmt.Printf("✅ Account nonce generation test passed!\n") -} func Test_Account_Database_Write_Read(t *testing.T) { fmt.Printf("=== Testing Account Database Write and Read ===\n") diff --git a/DB_OPs/account_immuclient.go b/DB_OPs/account_immuclient.go index a9f29c60..e6565ce8 100644 --- a/DB_OPs/account_immuclient.go +++ b/DB_OPs/account_immuclient.go @@ -5,11 +5,11 @@ import ( "encoding/json" "fmt" "strings" + "sync/atomic" "gossipnode/config" "gossipnode/config/settings" - "sync/atomic" "time" "github.com/JupiterMetaLabs/ion" @@ -29,9 +29,11 @@ type Account struct { DIDAddress string `json:"did,omitempty"` // New PublicKey based fields - Address common.Address `json:"address"` // Derived from PublicKey - Balance string `json:"balance,omitempty"` - Nonce uint64 `json:"nonce"` + Nonce uint64 `json:"nonce"` // Unique deterministic ID for Fastsync ART (migrated from old nonce) + Address common.Address `json:"address"` // Derived from PublicKey + Balance string `json:"balance,omitempty"` + TxNonce uint64 `json:"tx_nonce"` // Real Ethereum Nonce + TxCountSent uint64 `json:"tx_count_sent"` // Tracks actual analytical transactions sent // Account metadata AccountType string `json:"account_type"` // "did" or "publickey" @@ -56,38 +58,6 @@ func (s *AccountsSet) Add(address common.Address) { s.Accounts[address.Hex()] = nil } - -// lastNonce is used to guarantee monotonic nanosecond timestamps for PutNonceofAccount. -var lastNonce atomic.Uint64 - -// PutNonceofAccount generates a unique epoch ID for new accounts. -// -// HISTORICAL BUG (Fixed): Previously computed as `uint64(UnixNano) << 16 | counter`, -// which silently overflowed uint64 and corrupted the embedded timestamp. -// -// FIX (Option C): We now use a pure monotonic nanosecond counter. It returns -// exact UnixNano precision, gracefully bumping by +1ns on extreme collisions. -// -// LIFECYCLE WARNING: The `Nonce` field in the Account struct serves a dual purpose: -// 1. On creation: It stores this unique nanosecond timestamp ID. -// 2. Post-transaction: Reconciliation and consensus overwrite it with the account's -// highest transaction nonce (e.g., 0, 1, 2...). -// Do NOT rely on Account.Nonce remaining a timestamp if the account has sent transactions! -func PutNonceofAccount() (uint64, error) { - for { - ns := uint64(time.Now().UTC().UnixNano()) - prev := lastNonce.Load() - next := ns - if next <= prev { - next = prev + 1 // same-ns collision: bump forward - } - if lastNonce.CompareAndSwap(prev, next) { - return next, nil - } - // CAS lost race against another goroutine — retry - } -} - // Create Account from DID and Address and Store using StoreAccount func CreateAccount(PooledConnection *config.PooledConnection, DIDAddress string, Address common.Address, metadata map[string]interface{}) error { var err error @@ -135,29 +105,26 @@ func CreateAccount(PooledConnection *config.PooledConnection, DIDAddress string, }() } - // Create a Nonce First - Nonce, err := PutNonceofAccount() - if err != nil { - return err - } - // Create A CreatedAt and UpdatedAt CreatedAt := time.Now().UTC().UnixNano() UpdatedAt := time.Now().UTC().UnixNano() + ARTNonce := GenerateARTNonce() + // Create the account document AccountDoc = &Account{ + Nonce: ARTNonce, DIDAddress: DIDAddress, Address: Address, Balance: "0", - Nonce: Nonce, + TxNonce: 0, + TxCountSent: 0, AccountType: "user", CreatedAt: CreatedAt, UpdatedAt: UpdatedAt, Metadata: metadata, } - // Debugging - // fmt.Println("AccountDoc: ", AccountDoc) + // Store the account document err = storeAccount(PooledConnection, AccountDoc) if err != nil { @@ -223,10 +190,12 @@ func storeAccount(PooledConnection *config.PooledConnection, KeyDoc *Account) er // Create the account document AccountDoc = &Account{ + Nonce: KeyDoc.Nonce, DIDAddress: KeyDoc.DIDAddress, Address: KeyDoc.Address, Balance: KeyDoc.Balance, - Nonce: KeyDoc.Nonce, + TxNonce: KeyDoc.TxNonce, + TxCountSent: KeyDoc.TxCountSent, AccountType: KeyDoc.AccountType, CreatedAt: KeyDoc.CreatedAt, UpdatedAt: time.Now().UTC().UnixNano(), @@ -907,6 +876,7 @@ func UpdateAccountBalance(PooledConnection *config.PooledConnection, address com doc.Balance = newBalance doc.UpdatedAt = time.Now().UTC().UnixNano() + fmt.Printf("DEBUG: Updated account document - New balance: %s, New UpdatedAt: %d\n", doc.Balance, doc.UpdatedAt) // Safe Write to the DB with the same key @@ -943,6 +913,71 @@ func UpdateAccountBalance(PooledConnection *config.PooledConnection, address com return nil } +// UpdateAccountSenderState updates sender balance and nonce, incrementing TxCountSent. +func UpdateAccountSenderState(PooledConnection *config.PooledConnection, address common.Address, newBalance string, newNonce uint64) error { + fmt.Printf("=== DEBUG: UpdateAccountSenderState called for address %s with balance %s and nonce %d ===\n", address.Hex(), newBalance, newNonce) + + // Define Function wide context for timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var err error + var shouldReturnConnection = false + if PooledConnection == nil || PooledConnection.Client == nil { + fmt.Println("DEBUG: PooledConnection is nil, getting new connection from pool") + PooledConnection, err = GetAccountConnectionandPutBack(ctx) + if err != nil { + fmt.Printf("DEBUG: Failed to get connection from pool: %v\n", err) + return fmt.Errorf("failed to get connection from pool: %w - UpdateAccountSenderState", err) + } + shouldReturnConnection = true + } + + if shouldReturnConnection { + defer func() { + fmt.Println("DEBUG: Returning connection to pool") + PutAccountsConnection(PooledConnection) + }() + } + + // Ensure we're using the accounts database + if PooledConnection != nil { + if err := ensureAccountsDBSelected(PooledConnection); err != nil { + return fmt.Errorf("failed to ensure accounts database is selected: %w", err) + } + } + + doc, err := GetAccount(PooledConnection, address) + if err != nil { + return err + } + + doc.Balance = newBalance + doc.TxCountSent = doc.TxCountSent + 1 + doc.TxNonce = newNonce + doc.UpdatedAt = time.Now().UTC().UnixNano() + + // Safe Write to the DB with the same key + key := fmt.Sprintf("%s%s", Prefix, address) + err = SafeCreate(PooledConnection.Client, key, doc) + if err != nil { + loggerCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + PooledConnection.Client.Logger.Error(loggerCtx, "Failed to update account sender state", + err, + ion.String("account", address.String()), + ion.String("database", config.AccountsDBName), + ion.String("created_at", time.Now().UTC().Format(time.RFC3339)), + ion.String("log_file", LOG_FILE), + ion.String("topic", TOPIC), + ion.String("function", "DB_OPs.UpdateAccountSenderState")) + return err + } + + fmt.Printf("=== DEBUG: UpdateAccountSenderState completed successfully for address %s ===\n", address.Hex()) + return nil +} + // ListAllAccounts retrieves all Accounts with a limit func ListAllAccounts(PooledConnection *config.PooledConnection, limit int) ([]*Account, error) { var err error @@ -2299,3 +2334,47 @@ func CheckNonceAndGetLatest(PooledConnection *config.PooledConnection, fromAddr return hasDuplicate, latestNonce, foundLatestNonce, nil } + +// [AUDIT OK]: Connection lifecycle, determinism via addr bytes, and Immudb writes verified safe across 1 call site in BlockProcessing. +// [AUDIT OK]: Read-modify-write pattern verified safe; GetAccount validates existence; 3 call sites in BlockProcessing. +// [AUDIT OK]: State transition logic (TxCountSent++, Nonce update) and blockTimestamp propagation verified safe; 1 call site in BlockProcessing. +// [AUDIT OK]: Nil checks on account/address, connection pooling handling, and direct storage verified safe; 1 call site in DIDPropagation. +// StorePropagatedAccount securely stores an account received from the P2P network, +// perfectly preserving its ART Nonce and other properties to ensure Fastsync consensus. +func StorePropagatedAccount(PooledConnection *config.PooledConnection, account *Account) error { + var err error + var shouldReturnConnection = false + + if account == nil || account.Address == (common.Address{}) { + return fmt.Errorf("propagated account is invalid") + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if PooledConnection == nil || PooledConnection.Client == nil { + PooledConnection, err = GetAccountConnectionandPutBack(ctx) + if err != nil { + return fmt.Errorf("failed to get accounts connection: %w - StorePropagatedAccount", err) + } + shouldReturnConnection = true + } + + if shouldReturnConnection { + defer PutAccountsConnection(PooledConnection) + } + + return storeAccount(PooledConnection, account) +} + +var artNonceCounter uint64 + +// [AUDIT OK]: Atomic counter and bit shift mathematically proven safe against overflow (51 bits for micro + 12 for counter = 63 bits); 1 call site in CreateAccount. +// GenerateARTNonce generates a locally unique Nonce for Fastsync ART routing. +// This is strictly used when this node originates an account (e.g., manual DID creation). +// Accounts synced from the network MUST preserve the sender's ART Nonce. +func GenerateARTNonce() uint64 { + ts := uint64(time.Now().UTC().UnixMicro()) + c := atomic.AddUint64(&artNonceCounter, 1) + return (ts << 12) | (c & 0xFFF) +} diff --git a/DID/DID.go b/DID/DID.go index 3f32102e..0198d5d3 100644 --- a/DID/DID.go +++ b/DID/DID.go @@ -345,8 +345,8 @@ func (s *AccountServer) RegisterDID(ctx context.Context, req *pb.RegisterDIDRequ Did: req.Did, PublicKey: req.PublicKey, Balance: "0", - CreatedAt: time.Now().UTC().Unix(), - UpdatedAt: time.Now().UTC().Unix(), + CreatedAt: time.Now().UTC().UnixNano(), + UpdatedAt: time.Now().UTC().UnixNano(), }, }, nil } diff --git a/FastsyncV2/fastsyncv2.go b/FastsyncV2/fastsyncv2.go index 30b5c052..8e66eab5 100644 --- a/FastsyncV2/fastsyncv2.go +++ b/FastsyncV2/fastsyncv2.go @@ -431,6 +431,20 @@ func (fs *FastsyncV2) handleSyncInternal(targetPeer string, startBlock uint64) e } } if len(missingMap) > 0 { + // DataSync can run for several minutes on large syncs, causing the + // server-side AUTH_TTL (2 min) to expire. Re-run Availability to get + // a fresh token before calling FetchAccounts so we don't fail auth. + if refreshResp, refreshErr := fs.AvailRouter.SendAvailabilityRequest( + ctx, fs.PriorRouter.GetSyncVars(), *targetNodeInfo, startBlock, math.MaxUint64, + ); refreshErr != nil { + log.Printf("[FastsyncV2] Phase 4.5: auth refresh failed, proceeding with existing token: %v", refreshErr) + } else if refreshResp.IsAvailable && refreshResp.Auth != nil && refreshResp.Auth.UUID != "" { + log.Printf("[FastsyncV2] Phase 4.5: auth token refreshed (UUID=%s)", refreshResp.Auth.UUID) + availResp = refreshResp + // Rebuild the remotes slice so Phase 5/6 also use the fresh token. + remotes = []*availabilitypb.AvailabilityResponse{availResp} + } + log.Printf("[FastsyncV2] Phase 4.5: fetching %d missing tagged accounts", len(missingMap)) resp, err := fs.AccountSyncRouter.FetchAccounts(availResp, missingMap) if err != nil { diff --git a/Pubsub/Subscription/SubscriptionManager.go b/Pubsub/Subscription/SubscriptionManager.go index 277a7bc2..7571ba01 100644 --- a/Pubsub/Subscription/SubscriptionManager.go +++ b/Pubsub/Subscription/SubscriptionManager.go @@ -351,7 +351,7 @@ func (sm *SubscriptionManager) Unsubscribe(topic string) error { } // Close the topic to free resources if managed.pubsubTopic != nil { - if err := managed.pubsubTopic.Close(); err != nil { + if err := sm.gps.CloseTopic(topic); err != nil { logger().NamedLogger.Warn(trace_ctx, "SubscriptionManager: Failed to close topic", ion.String("topic", topic), ion.String("error", err.Error()), @@ -399,7 +399,7 @@ func (sm *SubscriptionManager) Shutdown() { } // Close the topic to free resources if managed.pubsubTopic != nil { - if err := managed.pubsubTopic.Close(); err != nil { + if err := sm.gps.CloseTopic(topic); err != nil { logger().NamedLogger.Warn(trace_ctx, "SubscriptionManager: Failed to close topic during shutdown", ion.String("topic", topic), ion.String("error", err.Error()), diff --git a/Scripts/patch_accounts.go b/Scripts/patch_accounts.go new file mode 100644 index 00000000..727a69b2 --- /dev/null +++ b/Scripts/patch_accounts.go @@ -0,0 +1,198 @@ +// Phase 2: Read replayed_accounts.json (produced by replay_blocks_and_export.go) and patch +// every account in accountsdb with the ground-truth Nonce and standardised timestamps. +// +// What gets patched per account: +// - Nonce : set to TxCount from replay (= "next expected nonce", i.e. max_nonce + 1 for +// well-behaved chains, or exactly TxCount when nonces are 0-indexed and gapless) +// - CreatedAt : if stored in seconds (< 1e15), multiply by 1e9 → nanoseconds +// - UpdatedAt : same normalisation as CreatedAt +// +// IMPORTANT: run replay_blocks_and_export.go first and review its output before running this. +// +// Usage: +// go run scripts/patch_accounts.go -host 127.0.0.1 -port 3322 -user immudb -pass immudb + +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + + "github.com/codenotary/immudb/pkg/api/schema" + "github.com/codenotary/immudb/pkg/client" + + "gossipnode/DB_OPs" +) + +// AccountStats mirrors the struct from replay_blocks_and_export.go. +type AccountStats struct { + TxCount uint64 `json:"tx_count"` + MaxNonce uint64 `json:"max_nonce"` + MaxNonceSet bool `json:"max_nonce_set"` +} + +const scanBatchSize = 500 // keep batches small for accountsdb health + +func main() { + immudbHost := flag.String("host", "127.0.0.1", "ImmuDB Host") + immudbPort := flag.Int("port", 3322, "ImmuDB Port") + immudbUser := flag.String("user", "immudb", "ImmuDB Username") + immudbPass := flag.String("pass", "immudb", "ImmuDB Password") + inputFile := flag.String("in", "replayed_accounts.json", "Input JSON file produced by Phase 1") + flag.Parse() + + // 1. Load the ground-truth snapshot produced by Phase 1 + fileBytes, err := os.ReadFile(*inputFile) + if err != nil { + log.Fatalf("FATAL: cannot read %s: %v\n → Did you run replay_blocks_and_export.go first?", *inputFile, err) + } + + stats := make(map[string]*AccountStats) + if err := json.Unmarshal(fileBytes, &stats); err != nil { + log.Fatalf("FATAL: failed to parse %s: %v", *inputFile, err) + } + fmt.Printf("[Phase 2] Loaded %d active addresses from %s\n", len(stats), *inputFile) + + ctx := context.Background() + + // 2. Connect to accountsdb + opts := client.DefaultOptions().WithAddress(*immudbHost).WithPort(*immudbPort) + c := client.NewClient().WithOptions(opts) + if err := c.OpenSession(ctx, []byte(*immudbUser), []byte(*immudbPass), "accountsdb"); err != nil { + log.Fatalf("FATAL: failed to open session on accountsdb: %v", err) + } + defer c.CloseSession(ctx) + + fmt.Println("[Phase 2] Connected to accountsdb. Starting patch process...") + + // Counters + processedAccounts := 0 + patchedAccounts := 0 + fixedNonces := 0 + fixedTimestamps := 0 + parseErrors := 0 + + prefix := []byte(DB_OPs.Prefix) // "address:" + var seekKey []byte // nil = start from first key + + for { + req := &schema.ScanRequest{ + Prefix: prefix, + SeekKey: seekKey, + Limit: scanBatchSize, + Desc: false, + } + + resp, err := c.Scan(ctx, req) + if err != nil { + log.Fatalf("FATAL: scan failed (seekKey=%q): %v", string(seekKey), err) + } + if len(resp.Entries) == 0 { + break + } + + // ImmuDB Scan with SeekKey is INCLUSIVE — skip the first entry if it + // matches our cursor to avoid re-processing and infinite loops. + startIndex := 0 + if seekKey != nil && len(resp.Entries) > 0 && bytes.Equal(resp.Entries[0].Key, seekKey) { + startIndex = 1 + } + + // Build a batch update for all accounts that need patching in this page. + ops := make([]*schema.Op, 0, scanBatchSize) + + for i := startIndex; i < len(resp.Entries); i++ { + e := resp.Entries[i] + + var acc DB_OPs.Account + if err := json.Unmarshal(e.Value, &acc); err != nil { + log.Printf("WARN: skipping key %q — unmarshal error: %v", string(e.Key), err) + parseErrors++ + continue + } + processedAccounts++ + + needsPatch := false + + // ── Timestamp normalisation ────────────────────────────────────────────── + // Timestamps stored as Unix seconds are < 1_000_000_000_000_000 (1e15). + // Multiply by 1e9 to convert to nanoseconds. + const nsThreshold = int64(1_000_000_000_000_000) // 1e15 + + if acc.CreatedAt > 0 && acc.CreatedAt < nsThreshold { + acc.CreatedAt *= 1_000_000_000 + fixedTimestamps++ + needsPatch = true + } + if acc.UpdatedAt > 0 && acc.UpdatedAt < nsThreshold { + acc.UpdatedAt *= 1_000_000_000 + fixedTimestamps++ + needsPatch = true + } + + // ── Nonce correction ───────────────────────────────────────────────────── + // For addresses that appear in the replay, the true nonce = TxCount. + // Accounts that have never sent a transaction get nonce = 0. + var trueNonce uint64 = 0 + if s, exists := stats[acc.Address.Hex()]; exists { + trueNonce = s.TxCount + } + + if acc.Nonce != trueNonce { + acc.Nonce = trueNonce + fixedNonces++ + needsPatch = true + } + + if needsPatch { + patchedAccounts++ + valBytes, err := json.Marshal(acc) + if err != nil { + log.Printf("WARN: skipping key %q — re-marshal error: %v", string(e.Key), err) + continue + } + ops = append(ops, &schema.Op{ + Operation: &schema.Op_Kv{ + Kv: &schema.KeyValue{ + Key: e.Key, + Value: valBytes, + }, + }, + }) + } + } + + // Flush the batch for this page + if len(ops) > 0 { + _, err := c.ExecAll(ctx, &schema.ExecAllRequest{Operations: ops}) + if err != nil { + log.Fatalf("FATAL: batch write failed: %v", err) + } + } + + // Advance the cursor past the last key in this batch + seekKey = resp.Entries[len(resp.Entries)-1].Key + + // Progress logging + if processedAccounts%100_000 == 0 && processedAccounts > 0 { + fmt.Printf(" ... processed %d accounts, patched %d so far\n", processedAccounts, patchedAccounts) + } + + // If this was a partial batch we've reached the end. + if len(resp.Entries) < scanBatchSize { + break + } + } + + fmt.Println("\n[Phase 2] Migration Complete!") + fmt.Printf(" Accounts scanned : %d\n", processedAccounts) + fmt.Printf(" Accounts patched : %d\n", patchedAccounts) + fmt.Printf(" Nonces fixed : %d\n", fixedNonces) + fmt.Printf(" Timestamps fixed: %d\n", fixedTimestamps) + fmt.Printf(" Parse errors : %d\n", parseErrors) +} diff --git a/Scripts/patch_accounts_v2.go b/Scripts/patch_accounts_v2.go new file mode 100644 index 00000000..50c449ee --- /dev/null +++ b/Scripts/patch_accounts_v2.go @@ -0,0 +1,204 @@ +// Phase 2: Read replayed_accounts.json (produced by replay_blocks_and_export.go) and patch +// every account in accountsdb with the ground-truth Nonce, TxCountSent, and standardised timestamps. +// +// What gets patched per account: +// - Nonce : set to MaxNonce + 1 to prevent replay attacks (since gapless nonces weren't strictly enforced before) +// - TxCountSent : set to TxCount to decouple analytical transaction count from cryptographic Nonce +// - CreatedAt : if stored in seconds (< 1e15), multiply by 1e9 → nanoseconds +// - UpdatedAt : same normalisation as CreatedAt +// +// IMPORTANT: run replay_blocks_and_export.go first and review its output before running this. +// +// Usage: +// go run scripts/patch_accounts_v2.go -host 127.0.0.1 -port 3323 -user immudb -pass immudb + +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + + "github.com/codenotary/immudb/pkg/api/schema" + "github.com/codenotary/immudb/pkg/client" + + "gossipnode/DB_OPs" +) + +// AccountStats mirrors the struct from replay_blocks_and_export.go. +type AccountStats struct { + TxCount uint64 `json:"tx_count"` + MaxNonce uint64 `json:"max_nonce"` + MaxNonceSet bool `json:"max_nonce_set"` +} + +const scanBatchSize = 500 // keep batches small for accountsdb health + +func main() { + immudbHost := flag.String("host", "127.0.0.1", "ImmuDB Host") + immudbPort := flag.Int("port", 3322, "ImmuDB Port") + immudbUser := flag.String("user", "immudb", "ImmuDB Username") + immudbPass := flag.String("pass", "immudb", "ImmuDB Password") + inputFile := flag.String("in", "replayed_accounts.json", "Input JSON file produced by Phase 1") + flag.Parse() + + // 1. Load the ground-truth snapshot produced by Phase 1 + fileBytes, err := os.ReadFile(*inputFile) + if err != nil { + log.Fatalf("FATAL: cannot read %s: %v\n → Did you run replay_blocks_and_export.go first?", *inputFile, err) + } + + stats := make(map[string]*AccountStats) + if err := json.Unmarshal(fileBytes, &stats); err != nil { + log.Fatalf("FATAL: failed to parse %s: %v", *inputFile, err) + } + fmt.Printf("[Phase 2] Loaded %d active addresses from %s\n", len(stats), *inputFile) + + ctx := context.Background() + + // 2. Connect to accountsdb + opts := client.DefaultOptions().WithAddress(*immudbHost).WithPort(*immudbPort) + c := client.NewClient().WithOptions(opts) + if err := c.OpenSession(ctx, []byte(*immudbUser), []byte(*immudbPass), "accountsdb"); err != nil { + log.Fatalf("FATAL: failed to open session on accountsdb: %v", err) + } + defer c.CloseSession(ctx) + + fmt.Println("[Phase 2] Connected to accountsdb. Starting patch process...") + + // Counters + processedAccounts := 0 + patchedAccounts := 0 + fixedNonces := 0 + fixedTimestamps := 0 + parseErrors := 0 + + prefix := []byte(DB_OPs.Prefix) // "address:" + var seekKey []byte // nil = start from first key + + for { + req := &schema.ScanRequest{ + Prefix: prefix, + SeekKey: seekKey, + Limit: scanBatchSize, + Desc: false, + } + + resp, err := c.Scan(ctx, req) + if err != nil { + log.Fatalf("FATAL: scan failed (seekKey=%q): %v", string(seekKey), err) + } + if len(resp.Entries) == 0 { + break + } + + // ImmuDB Scan with SeekKey is INCLUSIVE — skip the first entry if it + // matches our cursor to avoid re-processing and infinite loops. + startIndex := 0 + if seekKey != nil && len(resp.Entries) > 0 && bytes.Equal(resp.Entries[0].Key, seekKey) { + startIndex = 1 + } + + // Build a batch update for all accounts that need patching in this page. + ops := make([]*schema.Op, 0, scanBatchSize) + + for i := startIndex; i < len(resp.Entries); i++ { + e := resp.Entries[i] + + var acc DB_OPs.Account + if err := json.Unmarshal(e.Value, &acc); err != nil { + log.Printf("WARN: skipping key %q — unmarshal error: %v", string(e.Key), err) + parseErrors++ + continue + } + processedAccounts++ + + needsPatch := false + + // ── Timestamp normalisation ────────────────────────────────────────────── + // Timestamps stored as Unix seconds are < 1_000_000_000_000_000 (1e15). + // Multiply by 1e9 to convert to nanoseconds. + const nsThreshold = int64(1_000_000_000_000_000) // 1e15 + + if acc.CreatedAt > 0 && acc.CreatedAt < nsThreshold { + acc.CreatedAt *= 1_000_000_000 + fixedTimestamps++ + needsPatch = true + } + if acc.UpdatedAt > 0 && acc.UpdatedAt < nsThreshold { + acc.UpdatedAt *= 1_000_000_000 + fixedTimestamps++ + needsPatch = true + } + + // ── Nonce and TxCountSent correction ───────────────────────────────────── + var trueNonce uint64 = 0 + var trueTxCountSent uint64 = 0 + + if s, exists := stats[acc.Address.Hex()]; exists { + trueTxCountSent = s.TxCount + if s.MaxNonceSet { + trueNonce = s.MaxNonce + 1 + } else { + trueNonce = 1 // Fallback if max_nonce_set is somehow false but tx_count > 0 + } + } + + if acc.Nonce != trueNonce || acc.TxCountSent != trueTxCountSent { + acc.Nonce = trueNonce + acc.TxCountSent = trueTxCountSent + fixedNonces++ + needsPatch = true + } + + if needsPatch { + patchedAccounts++ + valBytes, err := json.Marshal(acc) + if err != nil { + log.Printf("WARN: skipping key %q — re-marshal error: %v", string(e.Key), err) + continue + } + ops = append(ops, &schema.Op{ + Operation: &schema.Op_Kv{ + Kv: &schema.KeyValue{ + Key: e.Key, + Value: valBytes, + }, + }, + }) + } + } + + // Flush the batch for this page + if len(ops) > 0 { + _, err := c.ExecAll(ctx, &schema.ExecAllRequest{Operations: ops}) + if err != nil { + log.Fatalf("FATAL: batch write failed: %v", err) + } + } + + // Advance the cursor past the last key in this batch + seekKey = resp.Entries[len(resp.Entries)-1].Key + + // Progress logging + if processedAccounts%100_000 == 0 && processedAccounts > 0 { + fmt.Printf(" ... processed %d accounts, patched %d so far\n", processedAccounts, patchedAccounts) + } + + // If this was a partial batch we've reached the end. + if len(resp.Entries) < scanBatchSize { + break + } + } + + fmt.Println("\n[Phase 2] Migration Complete!") + fmt.Printf(" Accounts scanned : %d\n", processedAccounts) + fmt.Printf(" Accounts patched : %d\n", patchedAccounts) + fmt.Printf(" Nonces fixed : %d\n", fixedNonces) + fmt.Printf(" Timestamps fixed: %d\n", fixedTimestamps) + fmt.Printf(" Parse errors : %d\n", parseErrors) +} diff --git a/Scripts/patch_accounts_v3.go b/Scripts/patch_accounts_v3.go new file mode 100644 index 00000000..71b5bd7d --- /dev/null +++ b/Scripts/patch_accounts_v3.go @@ -0,0 +1,172 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + + "gossipnode/DB_OPs" + + "github.com/codenotary/immudb/pkg/api/schema" + "github.com/codenotary/immudb/pkg/client" +) + +var ( + dataDir = flag.String("dir", "/opt/jmdn/data-old-live", "Path to pre-migrated ImmuDB data directory") + replayStats = flag.String("replay-stats", "replayed_accounts.json", "Optional JSON file with actual TxCounts (Address -> {Nonce, TxCount})") + batchSize = flag.Int("batch", 500, "Number of accounts per commit") +) + +type AccountStats struct { + TxCount uint64 `json:"tx_count"` + MaxNonce uint64 `json:"max_nonce"` + MaxNonceSet bool `json:"max_nonce_set"` +} + +func main() { + flag.Parse() + + log.Printf("Starting V3 Migration with Pagination...") + log.Printf("Data Directory: %s", *dataDir) + + // 1. Load Replay Stats (if available) + statsMap := make(map[string]AccountStats) + if data, err := os.ReadFile(*replayStats); err == nil { + if err := json.Unmarshal(data, &statsMap); err != nil { + log.Fatalf("Failed to parse %s: %v", *replayStats, err) + } + log.Printf("Loaded true transaction stats for %d active accounts", len(statsMap)) + } else { + log.Printf("No replay stats found at %s. Proceeding with TxNonce=0 for all accounts.", *replayStats) + } + + // 2. Initialize ImmuDB + opts := client.DefaultOptions(). + WithDir(*dataDir). + WithAddress("127.0.0.1"). + WithPort(3323) + + ctx := context.Background() + c := client.NewClient().WithOptions(opts) + err := c.OpenSession(ctx, []byte("immudb"), []byte("immudb"), "accountsdb") + if err != nil { + log.Fatalf("Failed to connect to accountsdb: %v", err) + } + defer c.CloseSession(ctx) + + log.Printf("Connected to accountsdb. Starting paginated patch process...") + + processedAccounts := 0 + patchedAccounts := 0 + parseErrors := 0 + + prefix := []byte(DB_OPs.Prefix) + var seekKey []byte + + for { + req := &schema.ScanRequest{ + Prefix: prefix, + SeekKey: seekKey, + Limit: uint64(*batchSize), + Desc: false, + } + + resp, err := c.Scan(ctx, req) + if err != nil { + log.Fatalf("FATAL: scan failed (seekKey=%q): %v", string(seekKey), err) + } + if len(resp.Entries) == 0 { + break + } + + // ImmuDB Scan with SeekKey is INCLUSIVE — skip the first entry if it + // matches our cursor to avoid re-processing and infinite loops. + startIndex := 0 + if seekKey != nil && len(resp.Entries) > 0 && bytes.Equal(resp.Entries[0].Key, seekKey) { + startIndex = 1 + } + + // Build a batch update for all accounts that need patching in this page. + ops := make([]*schema.Op, 0, *batchSize) + + for i := startIndex; i < len(resp.Entries); i++ { + e := resp.Entries[i] + + var acc DB_OPs.Account + if err := json.Unmarshal(e.Value, &acc); err != nil { + log.Printf("WARN: skipping key %q — unmarshal error: %v", string(e.Key), err) + parseErrors++ + continue + } + processedAccounts++ + + // ── V3 Migration Logic ────────────────────────────────────────────── + // 1. The old bad nonce is automatically loaded into acc.Nonce thanks to the `json:"nonce"` tag. + if acc.Nonce == 0 { + // Just a warning, not an error. + log.Printf("Warning: Account %s had 0 for old nonce!", acc.Address.Hex()) + } + + // 2. Assign the true Ethereum Nonce & TxCount + if stats, ok := statsMap[acc.Address.Hex()]; ok { + acc.TxCountSent = stats.TxCount + if stats.MaxNonceSet { + acc.TxNonce = stats.MaxNonce + 1 + } else { + acc.TxNonce = 1 // Fallback if max_nonce_set is somehow false but tx_count > 0 + } + } else { + acc.TxNonce = 0 + acc.TxCountSent = 0 + } + + // Re-marshal and prepare the operation + valBytes, err := json.Marshal(acc) + if err != nil { + log.Printf("WARN: skipping key %q — re-marshal error: %v", string(e.Key), err) + continue + } + + ops = append(ops, &schema.Op{ + Operation: &schema.Op_Kv{ + Kv: &schema.KeyValue{ + Key: e.Key, + Value: valBytes, + }, + }, + }) + patchedAccounts++ + } + + // Flush the batch for this page + if len(ops) > 0 { + _, err := c.ExecAll(ctx, &schema.ExecAllRequest{Operations: ops}) + if err != nil { + log.Fatalf("FATAL: batch write failed: %v", err) + } + } + + // Advance the cursor past the last key in this batch + seekKey = resp.Entries[len(resp.Entries)-1].Key + + // Progress logging + if processedAccounts%100_000 == 0 && processedAccounts > 0 { + fmt.Printf(" ... processed %d accounts, patched %d so far\n", processedAccounts, patchedAccounts) + } + + // If this was a partial batch we've reached the end. + if len(resp.Entries) < *batchSize { + break + } + } + + log.Printf("\n[Phase V3] Migration Complete!") + log.Printf(" Accounts scanned : %d", processedAccounts) + log.Printf(" Accounts patched : %d", patchedAccounts) + log.Printf(" Parse errors : %d", parseErrors) + log.Printf("Old bad nonces successfully preserved in ART Nonce, and TxNonce initialized.") +} diff --git a/Scripts/replay_blocks_and_export.go b/Scripts/replay_blocks_and_export.go new file mode 100644 index 00000000..d80bb6d7 --- /dev/null +++ b/Scripts/replay_blocks_and_export.go @@ -0,0 +1,138 @@ +// Phase 1: Replay all blocks from defaultdb and export per-address stats to JSON. +// +// For every confirmed transaction it records: +// - tx_count : true number of transactions sent by this address +// - max_nonce : highest nonce value recorded in any of those transactions +// +// Output: replayed_accounts.json (safe to review before running patch_accounts.go) +// +// Usage: +// go run scripts/replay_blocks_and_export.go -host 127.0.0.1 -port 3322 -user immudb -pass immudb + +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + + "github.com/codenotary/immudb/pkg/client" + + "gossipnode/DB_OPs" + "gossipnode/config" +) + +// AccountStats holds the ground-truth values computed from block replay. +type AccountStats struct { + TxCount uint64 `json:"tx_count"` + MaxNonce uint64 `json:"max_nonce"` + MaxNonceSet bool `json:"max_nonce_set"` // true once we have seen at least one tx +} + +func main() { + immudbHost := flag.String("host", "127.0.0.1", "ImmuDB Host") + immudbPort := flag.Int("port", 3322, "ImmuDB Port") + immudbUser := flag.String("user", "immudb", "ImmuDB Username") + immudbPass := flag.String("pass", "immudb", "ImmuDB Password") + outputFile := flag.String("out", "replayed_accounts.json", "Output JSON file path") + flag.Parse() + + ctx := context.Background() + + // 1. Connect directly to defaultdb (read-only intent — we only call Get) + opts := client.DefaultOptions().WithAddress(*immudbHost).WithPort(*immudbPort) + c := client.NewClient().WithOptions(opts) + if err := c.OpenSession(ctx, []byte(*immudbUser), []byte(*immudbPass), "defaultdb"); err != nil { + log.Fatalf("FATAL: failed to open session on defaultdb: %v", err) + } + defer c.CloseSession(ctx) + + fmt.Println("[Phase 1] Connected to defaultdb. Fetching chain tip...") + + // 2. Read latest_block — stored as JSON-encoded uint64 + entry, err := c.Get(ctx, []byte("latest_block")) + if err != nil { + log.Fatalf("FATAL: failed to get latest_block key: %v", err) + } + + var latestBlock uint64 + if err := json.Unmarshal(entry.Value, &latestBlock); err != nil { + log.Fatalf("FATAL: failed to parse latest_block as uint64: %v", err) + } + + fmt.Printf("[Phase 1] Chain tip = block %d. Replaying 0 → %d...\n", latestBlock, latestBlock) + + // address (checksummed hex) → stats + stats := make(map[string]*AccountStats, 512) + totalTxSeen := 0 + blocksWithErrors := 0 + + // 3. Sequential block replay — O(N blocks) + for i := uint64(0); i <= latestBlock; i++ { + blockKey := fmt.Sprintf("%s%d", DB_OPs.PREFIX_BLOCK, i) + bEntry, err := c.Get(ctx, []byte(blockKey)) + if err != nil { + // Don't fatal — log and continue so we get partial data on sparse chains. + log.Printf("WARN: block %d not found (key=%s): %v — skipping", i, blockKey, err) + blocksWithErrors++ + continue + } + + var block config.ZKBlock + if err := json.Unmarshal(bEntry.Value, &block); err != nil { + log.Printf("WARN: failed to unmarshal block %d: %v — skipping", i, err) + blocksWithErrors++ + continue + } + + for _, tx := range block.Transactions { + if tx.From == nil { + // Contract-creation submitted without a sender — skip. + continue + } + + // common.Address.Hex() → "0xAbCd..." (EIP-55 checksum). + // This is the same format used by storeAccount via fmt.Sprintf("%s%s", Prefix, addr). + sender := tx.From.Hex() + + s, exists := stats[sender] + if !exists { + s = &AccountStats{} + stats[sender] = s + } + + s.TxCount++ + totalTxSeen++ + + // Track the highest nonce seen across all transactions for this address. + // We initialise MaxNonce to 0 and MaxNonceSet to false so that nonce=0 + // is correctly handled on the very first tx. + if !s.MaxNonceSet || tx.Nonce > s.MaxNonce { + s.MaxNonce = tx.Nonce + s.MaxNonceSet = true + } + } + + if i > 0 && i%1000 == 0 { + fmt.Printf(" ... replayed through block %d / %d (%d txs so far)\n", i, latestBlock, totalTxSeen) + } + } + + fmt.Printf("[Phase 1] Replay complete. Blocks processed: %d, skipped: %d\n", latestBlock+1-uint64(blocksWithErrors), blocksWithErrors) + fmt.Printf("[Phase 1] Total transactions: %d | Active addresses: %d\n", totalTxSeen, len(stats)) + + // 4. Write JSON + fileBytes, err := json.MarshalIndent(stats, "", " ") + if err != nil { + log.Fatalf("FATAL: failed to marshal stats to JSON: %v", err) + } + if err := os.WriteFile(*outputFile, fileBytes, 0644); err != nil { + log.Fatalf("FATAL: failed to write %s: %v", *outputFile, err) + } + + fmt.Printf("[Phase 1] Exported → %s\n", *outputFile) + fmt.Println("[Phase 1] Please REVIEW the file before running patch_accounts.go (Phase 2).") +} diff --git a/Security/Security.go b/Security/Security.go index 174147b6..0eb456b5 100644 --- a/Security/Security.go +++ b/Security/Security.go @@ -513,47 +513,37 @@ func allChecksWithConn(tx *config.Transaction, security_cache *SecurityCache, ma // ------------------------------------------------------------ // 6. Nonce validation (USING CACHE) _, nonceSpan := tracer.Start(spanCtx, "Security.allChecksWithCache.validateNonce") - hasDuplicate, latestNonce, hasAnyTransactions, err := DB_OPs.CheckNonceAndGetLatest(mainDBConn, tx.From, tx.Nonce) - if err != nil { + + account := security_cache.GetAccount(*tx.From) + if account == nil { + err := errors.New("sender account not found in cache") nonceSpan.RecordError(err) nonceSpan.End() span.RecordError(err) - logger().Error(spanCtx, "Failed to check nonce", err, + logger().Error(spanCtx, "Failed to get account for nonce check", err, ion.String("function", "Security.allChecksWithCache")) - return false, fmt.Errorf("nonce check failed with error: %w", err) + return false, err } + expectedNonce := account.TxNonce nonceSpan.SetAttributes( - attribute.Bool("has_duplicate", hasDuplicate), - attribute.Int64("latest_nonce", int64(latestNonce)), + attribute.Int64("expected_nonce", int64(expectedNonce)), + attribute.Int64("submitted_nonce", int64(tx.Nonce)), ) - if hasDuplicate { - err := fmt.Errorf("transaction with same nonce already exists") + if tx.Nonce < expectedNonce { + err := fmt.Errorf("submitted nonce %d is too low, expected >= %d", tx.Nonce, expectedNonce) nonceSpan.RecordError(err) nonceSpan.End() span.RecordError(err) - logger().Error(spanCtx, "Duplicate nonce detected", err, + logger().Error(spanCtx, "Nonce is too low or duplicate", err, ion.String("function", "Security.allChecksWithCache")) return false, err } - var minAllowedNonce uint64 - if !hasAnyTransactions { - minAllowedNonce = 0 - } else { - minAllowedNonce = latestNonce + 1 - } + // Update cache so subsequent transactions from same sender see incremented nonce + security_cache.UpdateTxNonce(*tx.From, tx.Nonce+1) - if tx.Nonce < minAllowedNonce { - err := fmt.Errorf("submitted nonce %d is too low, must be >= %d", tx.Nonce, minAllowedNonce) - nonceSpan.RecordError(err) - nonceSpan.End() - span.RecordError(err) - logger().Error(spanCtx, "Nonce is too low", err, - ion.String("function", "Security.allChecksWithCache")) - return false, err - } nonceSpan.End() duration := time.Since(startTime).Seconds() diff --git a/Security/security_cache.go b/Security/security_cache.go index d17d514a..e1235a43 100644 --- a/Security/security_cache.go +++ b/Security/security_cache.go @@ -83,21 +83,21 @@ func (s *SecurityCache) SubBalance(address common.Address, wei *big.Int) { } } -func (s *SecurityCache) UpdateNonce(address common.Address, newNonce uint64) { +func (s *SecurityCache) UpdateTxNonce(address common.Address, newNonce uint64) { s.mu.Lock() defer s.mu.Unlock() account := s.accounts[address.Hex()] if account != nil { - account.Nonce = newNonce + account.TxNonce = newNonce } } -func (s *SecurityCache) GetNonce(address common.Address) uint64 { +func (s *SecurityCache) GetTxNonce(address common.Address) uint64 { s.mu.RLock() defer s.mu.RUnlock() account := s.accounts[address.Hex()] if account != nil { - return account.Nonce + return account.TxNonce } return 0 } diff --git a/gETH/Facade/Service/Service.go b/gETH/Facade/Service/Service.go index 6c8d7bb4..095e23b5 100644 --- a/gETH/Facade/Service/Service.go +++ b/gETH/Facade/Service/Service.go @@ -151,36 +151,28 @@ func (s *ServiceImpl) Balance(ctx context.Context, addr string, block *big.Int, if err != nil { fmt.Printf("DEBUG: GetAccount error: %v\n", err) fmt.Printf("DEBUG: Error type: %T\n", err) - // If account not found, create a new account with zero balance if strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "does not exist") { - // Convert address to common.Address using case-insensitive conversion - address := Utils.ConvertAddressCaseInsensitive(addr) - - // Create new account with zero balance - // We need to provide a DID address, so we'll use the address as DID for now - didAddress := fmt.Sprintf("%s%s:%s", DB_OPs.DIDPrefix, network, address.Hex()) - - // Create the Utils.DIDDoc - didDoc := Utils.DIDDoc{ - Address: address, + // Auto-create and propagate the account + didAddress := fmt.Sprintf("%s%s:%s", DB_OPs.DIDPrefix, "jmdn", convertedAddr.Hex()) + doc := Utils.DIDDoc{ + Address: convertedAddr, DIDAddress: didAddress, Metadata: nil, } - - // Create the account and propagate the DID - if err := Utils.CreateAccountandPropagateDID(didDoc); err != nil { - if logErr := Logger.LogData(opCtx, fmt.Sprintf("Balance failed to create account and propagate DID: %v", err), "Balance", -1); logErr != nil { - fmt.Printf("Failed to log Balance account creation and propagation error: %v\n", logErr) + if createErr := Utils.CreateAccountandPropagateDID(doc); createErr != nil { + if logErr := Logger.LogData(opCtx, fmt.Sprintf("Failed to auto-create and propagate DID %s: %v", convertedAddr.Hex(), createErr), "Balance", -1); logErr != nil { + fmt.Printf("Failed to log Balance error: %v\n", logErr) + } + } else { + if logErr := Logger.LogData(opCtx, fmt.Sprintf("Auto-created and propagated DID %s via eth_getBalance", convertedAddr.Hex()), "Balance", 1); logErr != nil { + fmt.Printf("Failed to log Balance success: %v\n", logErr) } - return nil, err } - // Log account creation - if logErr := Logger.LogData(opCtx, fmt.Sprintf("Balance created new account for address: %s", addr), "Balance", 1); logErr != nil { - fmt.Printf("Failed to log Balance account creation: %v\n", logErr) + // Log and return zero balance without writing to database + if logErr := Logger.LogData(opCtx, fmt.Sprintf("Balance returned zero for non-existent address: %s", addr), "Balance", 1); logErr != nil { + fmt.Printf("Failed to log Balance success: %v\n", logErr) } - - // Return zero balance for new account return big.NewInt(0), nil } diff --git a/go.mod b/go.mod index eb14307a..62ec747f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module gossipnode go 1.25.0 require ( - github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260601052219-40e74741de7c + github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260604113915-c1470ecc039d github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8 github.com/JupiterMetaLabs/goroutine-orchestrator v0.1.5 github.com/JupiterMetaLabs/ion v0.4.2 diff --git a/go.sum b/go.sum index 51b870f2..e5c2b4b7 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260601052219-40e74741de7c h1:2Kgkf8pb/FEkLllenyy48GsHda4501EvwHOSdEXabNY= -github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260601052219-40e74741de7c/go.mod h1:0erT7gGH4TYtitRik+Y3GfxSa5KGLacr9rJovV3vNB0= +github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260604113915-c1470ecc039d h1:DQ+APreEZ1rJtcYlj3ZOz4h4F1frZnYBupQkhD06SUQ= +github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260604113915-c1470ecc039d/go.mod h1:0erT7gGH4TYtitRik+Y3GfxSa5KGLacr9rJovV3vNB0= github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8 h1:yPrYb6g6NnqGsiCVqMf0zndEYTuelL3B03Fee+utLWA= github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8/go.mod h1:zM8F31G2SiPXzTo1WzbDFZ5iOOAkqrkuZjS0QVDW4ew= github.com/JupiterMetaLabs/goroutine-orchestrator v0.1.5 h1:S9+s6JeWSrGJ6ooYb4f8iRlJxwPUZ8X/EA4EgxKS3zc= diff --git a/messaging/BlockProcessing/Processing.go b/messaging/BlockProcessing/Processing.go index 8e132417..8d58f49f 100644 --- a/messaging/BlockProcessing/Processing.go +++ b/messaging/BlockProcessing/Processing.go @@ -172,7 +172,7 @@ func ProcessBlockTransactions(logger_ctx context.Context, block *config.ZKBlock, } // Process the transaction with span context - Process_err := processTransaction(span_ctx, tx, *block.CoinbaseAddr, *block.ZKVMAddr, accountsClient) + Process_err := processTransaction(span_ctx, tx, *block.CoinbaseAddr, *block.ZKVMAddr, accountsClient, block.Timestamp) if Process_err != nil { // ATOMICITY: If any transaction fails, roll back ALL affected accounts span.RecordError(Process_err) @@ -190,7 +190,7 @@ func ProcessBlockTransactions(logger_ctx context.Context, block *config.ZKBlock, ) // Rollback all balances to original state - rollbackError := rollbackBalances(span_ctx, originalBalances, accountsClient) + rollbackError := rollbackBalances(span_ctx, originalBalances, accountsClient, block.Timestamp) if rollbackError != nil { span.RecordError(rollbackError) logger().NamedLogger.Error(span_ctx, "Failed to rollback balances after transaction failure", @@ -259,7 +259,7 @@ func ProcessBlockTransactions(logger_ctx context.Context, block *config.ZKBlock, ion.String("function", "BlockProcessing.ProcessBlockTransactions"), ) // Rollback balances since transaction marking failed - rollbackBalances(span_ctx, originalBalances, accountsClient) + rollbackBalances(span_ctx, originalBalances, accountsClient, block.Timestamp) // Clean up processing markers (they weren't committed due to transaction failure) for _, txHash := range successfullyProcessedTxs { cleanupProcessingMarkers(span_ctx, accountsClient, txHash) @@ -355,7 +355,7 @@ func cleanupProcessingMarkers(span_ctx context.Context, accountsClient *config.P } // rollbackBalances restores original balances for all affected DIDs -func rollbackBalances(span_ctx context.Context, originalBalances map[common.Address]string, accountsClient *config.PooledConnection) error { +func rollbackBalances(span_ctx context.Context, originalBalances map[common.Address]string, accountsClient *config.PooledConnection, blockTimestamp int64) error { rollbackSpanCtx, rollbackSpan := logger().NamedLogger.Tracer("BlockProcessing").Start(span_ctx, "BlockProcessing.rollbackBalances") defer rollbackSpan.End() @@ -405,7 +405,7 @@ func rollbackBalances(span_ctx context.Context, originalBalances map[common.Addr } // ProcessTransaction handles a single transaction's balance updates -func processTransaction(span_ctx context.Context, tx config.Transaction, coinbaseAddr common.Address, zkvmAddr common.Address, accountsClient *config.PooledConnection) error { +func processTransaction(span_ctx context.Context, tx config.Transaction, coinbaseAddr common.Address, zkvmAddr common.Address, accountsClient *config.PooledConnection, blockTimestamp int64) error { // Record trace span and close it txSpanCtx, txSpan := logger().NamedLogger.Tracer("BlockProcessing").Start(span_ctx, "BlockProcessing.processTransaction") defer txSpan.End() @@ -655,7 +655,7 @@ func processTransaction(span_ctx context.Context, tx config.Transaction, coinbas } // 1. Deduct from sender - if err := deductFromSender(txSpanCtx, *tx.From, totalDeduction.String(), accountsClient); err != nil { + if err := deductFromSender(txSpanCtx, &tx, totalDeduction.String(), accountsClient, blockTimestamp); err != nil { txSpan.RecordError(err) txSpan.SetAttributes(attribute.String("status", "deduction_failed"), attribute.String("failed_step", "deduct_from_sender")) cleanupProcessingMarkers(txSpanCtx, accountsClient, tx.Hash.String()) @@ -676,7 +676,7 @@ func processTransaction(span_ctx context.Context, tx config.Transaction, coinbas txSpan.SetAttributes(attribute.String("deduction_step", "completed")) // 2. Add amount to recipient - if err := addToRecipient(txSpanCtx, *tx.To, parsedTx.ValueBig.String(), accountsClient); err != nil { + if err := addToRecipient(txSpanCtx, *tx.To, parsedTx.ValueBig.String(), accountsClient, blockTimestamp); err != nil { // Rollback sender deduction on failure txSpan.RecordError(err) txSpan.SetAttributes(attribute.String("status", "recipient_add_failed"), attribute.String("failed_step", "add_to_recipient")) @@ -710,13 +710,13 @@ func processTransaction(span_ctx context.Context, tx config.Transaction, coinbas txSpan.SetAttributes(attribute.String("recipient_add_step", "completed")) // 3. Split gas fee between coinbase and ZKVM - if err := addToRecipient(txSpanCtx, coinbaseAddr, coinbaseGasFee.String(), accountsClient); err != nil { + if err := addToRecipient(txSpanCtx, coinbaseAddr, coinbaseGasFee.String(), accountsClient, blockTimestamp); err != nil { // Rollback previous operations txSpan.RecordError(err) txSpan.SetAttributes(attribute.String("status", "coinbase_gas_fee_failed"), attribute.String("failed_step", "add_to_coinbase")) rollbackAccounts := []common.Address{*tx.From, *tx.To, coinbaseAddr, zkvmAddr} for _, accounts := range rollbackAccounts { - if rollbackErr := DB_OPs.UpdateAccountBalance(accountsClient, accounts, originalBalances[accounts]); rollbackErr != nil { + if rollbackErr := rollbackBalances(txSpanCtx, originalBalances, accountsClient, blockTimestamp); rollbackErr != nil { txSpan.RecordError(rollbackErr) logger().NamedLogger.Error(txSpanCtx, "Failed to rollback balance", rollbackErr, @@ -744,13 +744,13 @@ func processTransaction(span_ctx context.Context, tx config.Transaction, coinbas txSpan.SetAttributes(attribute.String("coinbase_gas_fee_step", "completed")) - if err := addToRecipient(txSpanCtx, zkvmAddr, zkvmGasFee.String(), accountsClient); err != nil { + if err := addToRecipient(txSpanCtx, zkvmAddr, zkvmGasFee.String(), accountsClient, blockTimestamp); err != nil { // Rollback previous operations txSpan.RecordError(err) txSpan.SetAttributes(attribute.String("status", "zkvm_gas_fee_failed"), attribute.String("failed_step", "add_to_zkvm")) rollbackAccounts := []common.Address{*tx.From, *tx.To, coinbaseAddr, zkvmAddr} for _, accounts := range rollbackAccounts { - if rollbackErr := DB_OPs.UpdateAccountBalance(accountsClient, accounts, originalBalances[accounts]); rollbackErr != nil { + if rollbackErr := rollbackBalances(txSpanCtx, originalBalances, accountsClient, blockTimestamp); rollbackErr != nil { txSpan.RecordError(rollbackErr) logger().NamedLogger.Error(txSpanCtx, "Failed to rollback balance", rollbackErr, @@ -907,7 +907,8 @@ func parseTransaction(tx config.Transaction) (*config.ParsedZKTransaction, error } // deductFromSender deducts an amount from a sender's DID account -func deductFromSender(span_ctx context.Context, fromDID common.Address, amount string, accountsClient *config.PooledConnection) error { +func deductFromSender(span_ctx context.Context, tx *config.Transaction, amount string, accountsClient *config.PooledConnection, blockTimestamp int64) error { + fromDID := *tx.From // Get the current DID document using the provided accounts client didDoc, err := DB_OPs.GetAccount(accountsClient, fromDID) if err != nil { @@ -920,6 +921,11 @@ func deductFromSender(span_ctx context.Context, fromDID common.Address, amount s return fmt.Errorf("invalid balance format for DID %s: %s", fromDID, didDoc.Balance) } + // Foolproof execution-time nonce check (prevents same-block replay attacks) + if tx.Nonce < didDoc.TxNonce { + return fmt.Errorf("execution rejected: submitted nonce %d is lower than account's current DB nonce %d (possible same-block replay attack)", tx.Nonce, didDoc.TxNonce) + } + // Parse amount to deduct deductAmount, ok := new(big.Int).SetString(amount, 10) if !ok { @@ -935,16 +941,17 @@ func deductFromSender(span_ctx context.Context, fromDID common.Address, amount s // Calculate new balance newBalance := new(big.Int).Sub(currentBalance, deductAmount) - // Update the balance in the database using the provided accounts client - if err := DB_OPs.UpdateAccountBalance(accountsClient, fromDID, newBalance.String()); err != nil { - return fmt.Errorf("failed to update sender balance: %w", err) + // Update the balance, tx count, and nonce in the database using the provided accounts client + if err := DB_OPs.UpdateAccountSenderState(accountsClient, fromDID, newBalance.String(), tx.Nonce+1); err != nil { + return fmt.Errorf("failed to update sender balance and state: %w", err) } - logger().NamedLogger.Debug(span_ctx, "Deducted amount from sender", + logger().NamedLogger.Debug(span_ctx, "Deducted amount from sender and updated state", ion.String("account", fromDID.String()), ion.String("amount", amount), ion.String("old_balance", currentBalance.String()), ion.String("new_balance", newBalance.String()), + ion.Uint64("new_nonce", tx.Nonce+1), ion.String("created_at", time.Now().UTC().Format(time.RFC3339)), ion.String("topic", TOPIC), ion.String("function", "BlockProcessing.deductFromSender"), @@ -954,12 +961,11 @@ func deductFromSender(span_ctx context.Context, fromDID common.Address, amount s } // addToRecipient adds an amount to a recipient's DID account -func addToRecipient(span_ctx context.Context, ToAddress common.Address, amount string, accountsClient *config.PooledConnection) error { +func addToRecipient(span_ctx context.Context, ToAddress common.Address, amount string, accountsClient *config.PooledConnection, blockTimestamp int64) error { // Get the current DID document using the provided accounts client didDoc, err := DB_OPs.GetAccount(accountsClient, ToAddress) if err != nil { - // If DID doesn't exist, - return fmt.Errorf("failed to retrieve recipient DID %s: %w", ToAddress, err) + return fmt.Errorf("failed to retrieve recipient DID %s (account must exist before transfer): %w", ToAddress, err) } // Parse current balance diff --git a/messaging/DIDPropagation.go b/messaging/DIDPropagation.go index 4ccad715..217a3edf 100644 --- a/messaging/DIDPropagation.go +++ b/messaging/DIDPropagation.go @@ -150,10 +150,10 @@ func storeAccountInDB(msg DIDMessage) { // UpdatedAt: time.Now().UTC().Unix(), // } - // Store Account document - err := DB_OPs.CreateAccount(client, msg.Account.DIDAddress, msg.Account.Address, nil) + // Store Account document preserving the sender's ART Nonce + err := DB_OPs.StorePropagatedAccount(client, msg.Account) if err != nil { - log.Error().Err(err).Str("Account", msg.Account.DIDAddress).Msg("Failed to store Account in database") + log.Error().Err(err).Str("Account", msg.Account.DIDAddress).Msg("Failed to store Propagated Account in database") return err }