diff --git a/DB_OPs/Nodeinfo/account_sync_enqueue_test.go b/DB_OPs/Nodeinfo/account_sync_enqueue_test.go new file mode 100644 index 00000000..5ed9bf61 --- /dev/null +++ b/DB_OPs/Nodeinfo/account_sync_enqueue_test.go @@ -0,0 +1,136 @@ +// White-box test for the bounded-enqueue chunking logic (enqueueRecordsChunked). +// Lives in package NodeInfo because the helper, the RedisStreamer constants, and the +// payload-type tags are unexported. No live Redis/ImmuDB needed — a recording mock +// streamer captures every XADD so we can assert chunk boundaries. +// +// NOTE: craftcode Phase 6 prefers tests under a tests/ tree; Go package-internal +// visibility forces this same-dir _test.go. Matches the repo convention in +// DB_OPs/sqlops/sqlops_test.go. +package NodeInfo + +import ( + "context" + "encoding/json" + "errors" + "testing" + "time" +) + +// recordingStreamer captures Enqueue payloads and optionally fails selected chunks. +// Only Enqueue is exercised; the rest satisfy RedisStreamer with inert returns. +type recordingStreamer struct { + messages []map[string]any + calls int + failEach int // if >0, every Nth Enqueue call returns an error +} + +func (r *recordingStreamer) Enqueue(_ context.Context, _ string, values map[string]any) (string, error) { + r.calls++ + if r.failEach > 0 && r.calls%r.failEach == 0 { + return "", errors.New("simulated XADD failure") + } + r.messages = append(r.messages, values) + return "id", nil +} + +func (r *recordingStreamer) EnsureConsumerGroup(context.Context, string, string) error { return nil } +func (r *recordingStreamer) ReadGroup(context.Context, string, string, string, int64, time.Duration) ([]StreamEntry, error) { + return nil, nil +} +func (r *recordingStreamer) Ack(context.Context, string, string, ...string) error { return nil } +func (r *recordingStreamer) Delete(context.Context, string, ...string) error { return nil } +func (r *recordingStreamer) AutoClaim(context.Context, string, string, string, time.Duration, string, int64) ([]StreamEntry, string, error) { + return nil, "0-0", nil +} +func (r *recordingStreamer) Len(context.Context, string) (int64, error) { return 0, nil } +func (r *recordingStreamer) PendingCount(context.Context, string, string) (int64, error) { return 0, nil } + +// decodeCount returns how many records a recorded message's "data" field holds. +func decodeCount(t *testing.T, msg map[string]any) int { + t.Helper() + data, ok := msg["data"].(string) + if !ok { + t.Fatalf("message missing string data field: %#v", msg) + } + var recs []json.RawMessage + if err := json.Unmarshal([]byte(data), &recs); err != nil { + t.Fatalf("data is not a JSON array: %v", err) + } + return len(recs) +} + +func TestEnqueueRecordsChunked_Boundaries(t *testing.T) { + cases := []struct { + name string + n int + wantMsgs int + }{ + {"empty", 0, 0}, + {"single", 1, 1}, + {"under_one_chunk", 499, 1}, + {"exactly_one_chunk", 500, 1}, + {"one_over", 501, 2}, + {"two_chunks", 1000, 2}, + {"uneven", 2500, 5}, + {"uneven_remainder", 2501, 6}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + items := make([]int, tc.n) + for i := range items { + items[i] = i + } + rs := &recordingStreamer{} + err := enqueueRecordsChunked(context.Background(), rs, payloadTypeAccounts, items) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(rs.messages) != tc.wantMsgs { + t.Fatalf("message count = %d, want %d", len(rs.messages), tc.wantMsgs) + } + total := 0 + for _, msg := range rs.messages { + if tag, _ := msg["type"].(string); tag != string(payloadTypeAccounts) { + t.Fatalf("type tag = %q, want %q", tag, payloadTypeAccounts) + } + c := decodeCount(t, msg) + if c > maxRecordsPerMessage { + t.Fatalf("chunk holds %d records, exceeds cap %d", c, maxRecordsPerMessage) + } + total += c + } + if total != tc.n { + t.Fatalf("total records across messages = %d, want %d", total, tc.n) + } + }) + } +} + +// TestEnqueueRecordsChunked_BestEffort verifies that a transient failure on one chunk +// does not drop the others: the helper attempts every chunk, returns an aggregated +// error, yet the successful chunks are still enqueued. +func TestEnqueueRecordsChunked_BestEffort(t *testing.T) { + const n = 2500 // 5 chunks of 500 + items := make([]int, n) + rs := &recordingStreamer{failEach: 3} // fail the 3rd Enqueue call + + err := enqueueRecordsChunked(context.Background(), rs, payloadTypeAccounts, items) + if err == nil { + t.Fatal("expected aggregated error from failed chunk, got nil") + } + if rs.calls != 5 { + t.Fatalf("Enqueue attempted %d times, want 5 (all chunks attempted despite failure)", rs.calls) + } + if len(rs.messages) != 4 { + t.Fatalf("recorded %d successful messages, want 4 (one chunk failed)", len(rs.messages)) + } +} + +func TestChunkCount(t *testing.T) { + cases := map[int]int{0: 0, 1: 1, 499: 1, 500: 1, 501: 2, 1000: 2, 2500: 5} + for n, want := range cases { + if got := chunkCount(n); got != want { + t.Errorf("chunkCount(%d) = %d, want %d", n, got, want) + } + } +} diff --git a/DB_OPs/Nodeinfo/account_sync_redis.go b/DB_OPs/Nodeinfo/account_sync_redis.go new file mode 100644 index 00000000..d00edfe1 --- /dev/null +++ b/DB_OPs/Nodeinfo/account_sync_redis.go @@ -0,0 +1,254 @@ +// MODULE: DB_OPs/Nodeinfo/account_sync_redis +// PURPOSE: Define the Redis stream transport abstraction (RedisStreamer interface) and +// adapt *redis.Client to it. Owns zero DB or business logic — pure transport. +// +// CORE DATA STRUCTURES: +// - StreamEntry: ephemeral; one per stream message read. Count per ReadGroup call +// is bounded by AccountSyncWorkerConfig.MaxDrainItems at the call site. +// - pkgAccountStreamer / pkgWorkerManager (package-level): set once by InstallAccountQueue. +// Read by every WriteAccounts / BatchUpdateAccounts call. Never replaced after set. +// +// TO MODIFY BEHAVIOR: +// - Change stream backend: implement RedisStreamer → pass to StartAccountSyncWorker. +// - Change stream key / consumer group name: update constants below; no logic changes. +// - Add a new stream key: define a new constant; add a corresponding Enqueue call in +// immudb_account_manager.go and a new case in processBatch. +// +// DO NOT: +// - Import *redis.Client outside redisStreamerAdapter — it is the only concrete import. +// - Store request-scoped state on redisStreamerAdapter (stateless wrapper by design). +// - Replace pkgAccountStreamer with a per-call parameter — types.AccountManager interface +// signatures are fixed by the external JMDN-FastSync module and cannot be changed. +// +// EXTENSION POINT: new queue backends → implement RedisStreamer; inject via StartAccountSyncWorker. +// +// CHANGE SCENARIOS: +// Swap Redis client lib: rewrite redisStreamerAdapter methods — interface unchanged. +// Add new stream key: add constant + Enqueue call in account_manager — this file unchanged. +// Change group/consumer: edit constants — no logic change required. + +package NodeInfo + +import ( + "context" + "strings" + "sync" + "time" + + "github.com/redis/go-redis/v9" +) + +// ─── Stream constants ───────────────────────────────────────────────────────── + +const ( + // accountSyncStream is the Redis stream key for all account sync payloads. + accountSyncStream = "accountsync:accounts" + // accountSyncGroup is the consumer group name. One group = one logical processor. + accountSyncGroup = "accountsync-workers" + // accountSyncConsumer is the consumer name within the group. Single worker model. + accountSyncConsumer = "worker-0" +) + +// syncPayloadType discriminates between WriteAccounts and BatchUpdateAccounts payloads +// stored in the same stream. +type syncPayloadType string + +const ( + payloadTypeAccounts syncPayloadType = "accounts" // payload: []*types.Account (JSON) + payloadTypeUpdates syncPayloadType = "updates" // payload: []accountUpdateWire (JSON) +) + +// ─── Domain types ───────────────────────────────────────────────────────────── + +// StreamEntry is a single Redis stream message with its assigned stream ID. +// ID is used for XACK after successful DB write. +// Values contains the raw message fields as returned by go-redis. +type StreamEntry struct { + ID string + Values map[string]any +} + +// ─── RedisStreamer interface ────────────────────────────────────────────────── + +// RedisStreamer is the minimal Redis stream surface required by the account sync worker. +// It uses only domain-level types — no go-redis types leak through the interface. +// The concrete implementation is redisStreamerAdapter (wraps *redis.Client). +// Tests may substitute a mock implementing this interface. +type RedisStreamer interface { + // Enqueue appends a message to the named stream. Returns the assigned message ID. + // Time: O(1) — single XADD round trip. + Enqueue(ctx context.Context, stream string, values map[string]any) (string, error) + + // EnsureConsumerGroup creates the consumer group on the stream, creating the stream + // itself if it does not exist. Idempotent: no-op if the group already exists. + // Time: O(1) — single XGROUP CREATE round trip. + EnsureConsumerGroup(ctx context.Context, stream, group string) error + + // ReadGroup performs a blocking read from the stream under the given consumer group. + // Reads at most count new (undelivered) entries; blocks up to blockDur waiting for data. + // Returns nil, nil on timeout (no data within blockDur). + // Read entries move to the Pending Entries List (PEL) until ACKed. + // Time: O(count) — single XREADGROUP round trip. + ReadGroup(ctx context.Context, stream, group, consumer string, count int64, blockDur time.Duration) ([]StreamEntry, error) + + // Ack acknowledges the given message IDs, removing them from the PEL. + // Only call after the DB write succeeds — unACKed entries are replayed via AutoClaim. + // Time: O(|ids|) — single XACK round trip. + Ack(ctx context.Context, stream, group string, ids ...string) error + + // Delete removes message IDs from the stream body (XDEL), reclaiming memory. + // Call in a pipeline with Ack after every successful DB commit. XACK alone leaves + // the payload resident in the stream; XDEL is required to reclaim that space. + // Time: O(|ids|) — single XDEL round trip. + Delete(ctx context.Context, stream string, ids ...string) error + + // AutoClaim reclaims pending entries that have been idle longer than minIdle. + // start is the minimum PEL cursor ID ("0-0" to scan from the beginning). + // Returns reclaimed entries and the next cursor ID. + // "0-0" as the returned cursor means the full PEL was scanned. + // Time: O(count) — single XAUTOCLAIM round trip. + AutoClaim(ctx context.Context, stream, group, consumer string, minIdle time.Duration, start string, count int64) ([]StreamEntry, string, error) + + // Len returns the total number of messages currently in the stream (XLEN). + // Time: O(1). + Len(ctx context.Context, stream string) (int64, error) + + // PendingCount returns the count of unacked messages in the PEL for the given group. + // Time: O(1) — single XPENDING round trip. + PendingCount(ctx context.Context, stream, group string) (int64, error) +} + +// ─── Concrete adapter ───────────────────────────────────────────────────────── + +// redisStreamerAdapter adapts *redis.Client to the RedisStreamer interface. +// It is the ONLY place in DB_OPs/Nodeinfo that imports a concrete Redis type. +type redisStreamerAdapter struct { + client *redis.Client +} + +// NewRedisStreamer wraps a *redis.Client as a RedisStreamer. +// Construct in main.go and pass the result to StartAccountSyncWorker. +// +// Time: O(1) +func NewRedisStreamer(client *redis.Client) RedisStreamer { + return &redisStreamerAdapter{client: client} +} + +// Time: O(1) — single XADD round trip +func (r *redisStreamerAdapter) Enqueue(ctx context.Context, stream string, values map[string]any) (string, error) { + return r.client.XAdd(ctx, &redis.XAddArgs{ + Stream: stream, + Values: values, + }).Result() +} + +// Time: O(1) — single XGROUP CREATECONSUMER or XGROUP CREATE round trip. +// BUSYGROUP error means the group already exists; treated as success. +func (r *redisStreamerAdapter) EnsureConsumerGroup(ctx context.Context, stream, group string) error { + err := r.client.XGroupCreateMkStream(ctx, stream, group, "0").Err() + if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") { + return err + } + return nil +} + +// Time: O(count) — XREADGROUP COUNT count BLOCK blockDur ms +// Redis.Nil is returned on timeout; mapped to (nil, nil) so callers don't treat it as an error. +func (r *redisStreamerAdapter) ReadGroup(ctx context.Context, stream, group, consumer string, count int64, blockDur time.Duration) ([]StreamEntry, error) { + result, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: group, + Consumer: consumer, + Streams: []string{stream, ">"}, + Count: count, + Block: blockDur, + NoAck: false, + }).Result() + if err != nil { + if err == redis.Nil { + return nil, nil // timeout — no data; caller loops + } + return nil, err + } + var entries []StreamEntry + for _, s := range result { + for _, msg := range s.Messages { + entries = append(entries, StreamEntry{ID: msg.ID, Values: msg.Values}) + } + } + return entries, nil +} + +// Time: O(|ids|) — single XACK round trip +func (r *redisStreamerAdapter) Ack(ctx context.Context, stream, group string, ids ...string) error { + return r.client.XAck(ctx, stream, group, ids...).Err() +} + +// Time: O(|ids|) — single XDEL round trip +func (r *redisStreamerAdapter) Delete(ctx context.Context, stream string, ids ...string) error { + if len(ids) == 0 { + return nil + } + return r.client.XDel(ctx, stream, ids...).Err() +} + +// Time: O(count) — single XAUTOCLAIM round trip +// go-redis v9 XAutoClaimCmd.Result() returns ([]XMessage, string, error) — three values. +func (r *redisStreamerAdapter) AutoClaim(ctx context.Context, stream, group, consumer string, minIdle time.Duration, start string, count int64) ([]StreamEntry, string, error) { + messages, next, err := r.client.XAutoClaim(ctx, &redis.XAutoClaimArgs{ + Stream: stream, + Group: group, + Consumer: consumer, + MinIdle: minIdle, + Start: start, + Count: count, + }).Result() + if err != nil { + return nil, "0-0", err + } + var entries []StreamEntry + for _, msg := range messages { + entries = append(entries, StreamEntry{ID: msg.ID, Values: msg.Values}) + } + return entries, next, nil +} + +func (r *redisStreamerAdapter) Len(ctx context.Context, stream string) (int64, error) { + return r.client.XLen(ctx, stream).Result() +} + +func (r *redisStreamerAdapter) PendingCount(ctx context.Context, stream, group string) (int64, error) { + info, err := r.client.XPending(ctx, stream, group).Result() + if err != nil { + return 0, err + } + return info.Count, nil +} + +// ─── Package-level queue singleton ─────────────────────────────────────────── + +// pkgAccountStreamer and pkgWorkerManager are set once by InstallAccountQueue. +// Read by every WriteAccounts / BatchUpdateAccounts call. types.AccountManager +// interface signatures are fixed externally — package-level injection is the only path. +var ( + pkgAccountStreamer RedisStreamer + pkgWorkerManager *WorkerManager + pkgAccountQueueMu sync.RWMutex +) + +// InstallAccountQueue stores the streamer and manager together. +// Called once from StartAccountSyncWorker during node startup. +func InstallAccountQueue(s RedisStreamer, m *WorkerManager) { + pkgAccountQueueMu.Lock() + pkgAccountStreamer = s + pkgWorkerManager = m + pkgAccountQueueMu.Unlock() +} + +// getAccountQueue returns the package-level streamer and worker manager. +// Both are nil if InstallAccountQueue has not yet been called. +// Time: O(1) +func getAccountQueue() (RedisStreamer, *WorkerManager) { + pkgAccountQueueMu.RLock() + defer pkgAccountQueueMu.RUnlock() + return pkgAccountStreamer, pkgWorkerManager +} diff --git a/DB_OPs/Nodeinfo/account_sync_worker.go b/DB_OPs/Nodeinfo/account_sync_worker.go new file mode 100644 index 00000000..e7ac1c53 --- /dev/null +++ b/DB_OPs/Nodeinfo/account_sync_worker.go @@ -0,0 +1,475 @@ +// MODULE: DB_OPs/Nodeinfo/account_sync_worker +// PURPOSE: Drain the accountsync Redis stream and write account batches to ImmuDB. +// Owns the at-least-once delivery contract: ACK only after successful DB write. +// +// CORE DATA STRUCTURES: +// - []StreamEntry: ephemeral per runWorker iteration. +// Bounded by AccountSyncWorkerConfig.MaxDrainItems (default 100). +// - []dbEntry: ephemeral per processBatch call. +// Bounded by MaxDrainItems × maxRecordsPerMessage (producer caps each message at +// maxRecordsPerMessage records — see immudb_account_manager.go). DID refs may add +// up to one extra entry per account. +// Sub-batched into chunks of MaxAccountsPerBatch before each BatchRestoreAccounts call. +// - PEL (Redis-side, not in-process): unacked entries in flight. +// Evicted by AutoClaim after PendingIdleTimeout; no in-process growth. +// +// TO MODIFY BEHAVIOR: +// - Tuning (batch size, timeouts): change AccountSyncWorkerConfig fields — no code change. +// - Add new payload type: add case in processBatch switch + enqueue helper in +// immudb_account_manager.go. This file changes only at the switch statement. +// - Change DB write path: edit processBatch — impacts ACK semantics and batch split. +// +// DO NOT: +// - Start this worker from a constructor. StartAccountSyncWorker is the only entry point. +// - ACK entries before BatchRestoreAccounts succeeds — breaks at-least-once guarantee. +// - Acquire the DB connection via GetAccountConnectionandPutBack — its auto-return +// goroutine fires on the scoped ctx deadline and can recycle the connection mid-write +// (data race). Use GetAccountsConnections + defer PutAccountsConnection, and thread the +// scoped writeCtx into BatchRestoreAccounts so the deadline bounds the DB ops directly. +// - Replace []dbEntry with a map — sequential append + slice-of-chunks is the right +// access pattern for BatchRestoreAccounts (ordered, fixed-size sub-batches). +// +// EXTENSION POINT: new payload types → add case in processBatch switch; add parse helper. +// +// CHANGE SCENARIOS: +// Add payload type: add case in processBatch switch + parse helper + enqueue in account_manager +// Change batch limits: edit DefaultWorkerConfig or pass custom AccountSyncWorkerConfig +// Change DB write: edit processBatch; ACK block is the only invariant that must not move + +package NodeInfo + +import ( + "context" + "encoding/json" + "fmt" + "log" + "math/big" + "sync/atomic" + "time" + + "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" + "github.com/ethereum/go-ethereum/common" + "gossipnode/DB_OPs" +) + +// ─── dbEntry type alias ─────────────────────────────────────────────────────── + +// dbEntry is a type alias for the anonymous struct expected by DB_OPs.BatchRestoreAccounts. +// Using a type alias (=) ensures []dbEntry is assignment-compatible with the parameter type +// without a conversion loop. Access pattern: sequential append, read-once for sub-batching. +// Growth bound: MaxDrainItems × avg-accounts-per-payload (ephemeral per processBatch call). +type dbEntry = struct { + Key string + Value []byte +} + +// ─── Wire type for BatchUpdateAccounts payloads ─────────────────────────────── + +// accountUpdateWire is the stable JSON representation of types.AccountUpdate used +// in the stream payload. Explicit wire type prevents big.Int JSON serialization +// surprises (math/big.Int marshals as a quoted decimal string, but that behaviour +// is implementation-defined and not guaranteed across versions). +// +// Stored in the stream as: {"address":"0x...","new_balance":"1000000","nonce":42} +type accountUpdateWire struct { + Address string `json:"address"` + NewBalance string `json:"new_balance"` // decimal string from big.Int.String() + Nonce uint64 `json:"nonce"` +} + +// ─── Configuration ──────────────────────────────────────────────────────────── + +// AccountSyncWorkerConfig holds tuning parameters for the account sync worker. +// All fields have safe production defaults; use DefaultWorkerConfig() to get them. +type AccountSyncWorkerConfig struct { + // MaxDrainItems is the maximum number of stream entries read per XREADGROUP call. + // Higher values coalesce more work per ImmuDB commit but increase per-batch memory. + // Default: 100. + MaxDrainItems int64 + + // MaxAccountsPerBatch is the maximum number of accounts per single BatchRestoreAccounts call. + // Prevents oversized ImmuDB writes. If a coalesced batch exceeds this, it is split into chunks. + // Default: 500. + MaxAccountsPerBatch int + + // BlockTimeout is the XREADGROUP BLOCK duration. + // The worker goroutine sleeps inside Redis until data arrives or this duration elapses. + // Must be short enough to allow clean ctx cancellation. Default: 5s. + BlockTimeout time.Duration + + // PendingIdleTimeout is the minimum idle duration before XAUTOCLAIM reclaims a PEL entry. + // Entries stuck in the PEL longer than this (due to worker crash/restart) are replayed. + // Must exceed the worst-case BatchRestoreAccounts latency to avoid spurious reclaims. + // Default: 30s. + PendingIdleTimeout time.Duration + + // DBWriteTimeout bounds each GetAccountConnectionandPutBack + BatchRestoreAccounts call. + // Must exceed the observed worst-case ImmuDB commit latency (~15 s). Default: 60s. + DBWriteTimeout time.Duration +} + +// DefaultWorkerConfig returns production-tuned defaults. +// Time: O(1) +func DefaultWorkerConfig() AccountSyncWorkerConfig { + return AccountSyncWorkerConfig{ + MaxDrainItems: 100, + MaxAccountsPerBatch: 500, + BlockTimeout: 30 * time.Second, + PendingIdleTimeout: 30 * time.Second, + DBWriteTimeout: 60 * time.Second, + } +} + +// ─── WorkerManager — atomic lifecycle ──────────────────────────────────────── + +// WorkerManager manages the drain goroutine lifecycle with lock-free atomics. +// The worker starts lazily on the first WriteAccounts call and shuts down after +// BlockTimeout of idle time. Producers restart it automatically via EnsureActive. +type WorkerManager struct { + isOnline atomic.Bool // true = drain goroutine is running + resetInflight atomic.Bool // true = a lastActivity-reset goroutine is in flight + lastActivity atomic.Int64 // UnixNano — last successful commit or explicit reset + + streamer RedisStreamer + cfg AccountSyncWorkerConfig +} + +// EnsureActive is called by WriteAccounts before every XADD. +// If the worker is offline it wins a CAS to start it; if it is near its idle +// deadline it wins a CAS to extend lastActivity. Always returns immediately. +// Hot-path cost (online + healthy): two atomic loads + subtract + compare ≈ single-digit ns. +func (wm *WorkerManager) EnsureActive() { + if !wm.isOnline.Load() { + if wm.isOnline.CompareAndSwap(false, true) { + wm.lastActivity.Store(time.Now().UnixNano()) + log.Printf("[accountqueue] worker offline — restarting") + go wm.runWorker() + } + // CAS loss = another caller already claimed the spawn; worker is starting. + return + } + + // Online — check remaining idle budget. Refresh if under 50%. + elapsed := time.Since(time.Unix(0, wm.lastActivity.Load())) + if wm.cfg.BlockTimeout-elapsed < wm.cfg.BlockTimeout/2 { + if wm.resetInflight.CompareAndSwap(false, true) { + go func() { + defer wm.resetInflight.Store(false) + wm.lastActivity.Store(time.Now().UnixNano()) + }() + } + } +} + +// ─── Lifecycle ──────────────────────────────────────────────────────────────── + +// StartAccountSyncWorker creates a WorkerManager, installs it as the package-level +// queue, and returns. The drain goroutine starts lazily on the first WriteAccounts call. +// +// MUST be called exactly once from main.go before any WriteAccounts or BatchUpdateAccounts. +// If not called, both methods log an error and skip the enqueue (no write occurs). +// +// Time: O(1) — no Redis round trip; EnsureConsumerGroup is deferred to the first runWorker call. +func StartAccountSyncWorker(streamer RedisStreamer, cfg AccountSyncWorkerConfig) *WorkerManager { + m := &WorkerManager{streamer: streamer, cfg: cfg} + InstallAccountQueue(streamer, m) + return m +} + +// ─── Worker loop ───────────────────────────────────────────────────────────── + +// runWorker is the drain loop running as a method on WorkerManager. +// It exits when BlockTimeout elapses with no data AND lastActivity is stale. +// defer sets isOnline=false so even a panic marks the worker offline. +func (wm *WorkerManager) runWorker() { + defer wm.isOnline.Store(false) + log.Printf("[accountqueue] worker started (stream=%s group=%s consumer=%s)", + accountSyncStream, accountSyncGroup, accountSyncConsumer) + defer log.Printf("[accountqueue] worker stopped") + + if err := wm.streamer.EnsureConsumerGroup(context.Background(), accountSyncStream, accountSyncGroup); err != nil { + log.Printf("[accountqueue] ERROR: EnsureConsumerGroup: %v — worker exiting", err) + return + } + + // Reclaim any entries left unACKed by a prior worker run. + if err := reclaimPending(wm.streamer, wm.cfg); err != nil { + log.Printf("[accountqueue] WARN: startup reclaimPending error: %v", err) + } + + for { + entries, err := wm.streamer.ReadGroup( + context.Background(), + accountSyncStream, accountSyncGroup, accountSyncConsumer, + wm.cfg.MaxDrainItems, + wm.cfg.BlockTimeout, + ) + if err != nil { + log.Printf("[accountqueue] ReadGroup error: %v — retrying in 1s", err) + time.Sleep(time.Second) + continue + } + if entries == nil { + // BlockTimeout elapsed with no data — check idle window. + if time.Since(time.Unix(0, wm.lastActivity.Load())) >= wm.cfg.BlockTimeout { + log.Printf("[accountqueue] worker idle for %s — going offline", wm.cfg.BlockTimeout) + return + } + // lastActivity was refreshed by a concurrent EnsureActive reset; keep going. + continue + } + + if err := processBatch(wm.streamer, entries, wm.cfg); err != nil { + // Do NOT ACK. Entries remain in PEL and are replayed by reclaimPending on next start. + // BatchRestoreAccounts is LWW-idempotent — replays are safe. + log.Printf("[accountqueue] processBatch error: %v — %d entries remain in PEL for retry", + err, len(entries)) + } else { + wm.lastActivity.Store(time.Now().UnixNano()) + } + } +} + +// reclaimPending reclaims and processes all PEL entries whose idle time exceeds +// cfg.PendingIdleTimeout. Called once on worker startup to replay entries left +// unACKed by a previous crash. +// +// Iterates via cursor until the full PEL is scanned ("0-0" returned as next cursor). +// Each DB op uses context.Background() with cfg.DBWriteTimeout — no external cancellation. +// +// Time: O(PEL size / MaxDrainItems) XAUTOCLAIM round trips + processBatch cost per page. +func reclaimPending(s RedisStreamer, cfg AccountSyncWorkerConfig) error { + cursor := "0-0" + for { + entries, next, err := s.AutoClaim( + context.Background(), + accountSyncStream, accountSyncGroup, accountSyncConsumer, + cfg.PendingIdleTimeout, + cursor, + cfg.MaxDrainItems, + ) + if err != nil { + return fmt.Errorf("XAUTOCLAIM cursor=%s: %w", cursor, err) + } + + if len(entries) > 0 { + log.Printf("[accountqueue] reclaiming %d pending entries (cursor=%s)", len(entries), cursor) + if err := processBatch(s, entries, cfg); err != nil { + return fmt.Errorf("process reclaimed entries at cursor=%s: %w", cursor, err) + } + } + + // "0-0" means the full PEL was scanned — no more pending entries. + if next == "0-0" || next == "" { + break + } + cursor = next + } + return nil +} + +// ─── Batch processor ───────────────────────────────────────────────────────── + +// processBatch deserializes all stream entries, merges their accounts into a flat +// list, writes to ImmuDB in sub-batches of MaxAccountsPerBatch, and ACKs all +// entries only after every sub-batch succeeds. +// +// Poison pill handling: entries with undecodable payloads (parse error or unknown type) +// are ACKed immediately and discarded. They will never succeed and must not block the queue. +// +// At-least-once guarantee: +// - goodIDs are ACKed only after BatchRestoreAccounts succeeds for all chunks. +// - If any chunk fails, goodIDs are not ACKed → entries stay in PEL → replayed on restart. +// - Replay safety: BatchRestoreAccounts uses LWW (UpdatedAt timestamp) — duplicate writes +// overwrite with the same data and do not corrupt state. +// +// Time: O(N/MaxAccountsPerBatch) BatchRestoreAccounts round trips, where N = total accounts. +// Space: O(N) — ephemeral []dbEntry freed after ACK. +func processBatch(s RedisStreamer, entries []StreamEntry, cfg AccountSyncWorkerConfig) error { + var ( + writeEntries []dbEntry // accounts to persist to ImmuDB + goodIDs []string // stream IDs to ACK+XDEL after successful DB write + poisonIDs []string // stream IDs to ACK+XDEL immediately (unrecoverable) + ) + + for _, entry := range entries { + payloadType, _ := entry.Values["type"].(string) + dataStr, _ := entry.Values["data"].(string) + + switch syncPayloadType(payloadType) { + case payloadTypeAccounts: + parsed, err := parseAccountsPayload(dataStr) + if err != nil { + log.Printf("[accountqueue] WARN: poison pill — undecodable accounts entry %s: %v", entry.ID, err) + poisonIDs = append(poisonIDs, entry.ID) + continue + } + writeEntries = append(writeEntries, parsed...) + goodIDs = append(goodIDs, entry.ID) + + case payloadTypeUpdates: + parsed, err := parseUpdatesPayload(dataStr) + if err != nil { + log.Printf("[accountqueue] WARN: poison pill — undecodable updates entry %s: %v", entry.ID, err) + poisonIDs = append(poisonIDs, entry.ID) + continue + } + writeEntries = append(writeEntries, parsed...) + goodIDs = append(goodIDs, entry.ID) + + default: + log.Printf("[accountqueue] WARN: poison pill — unknown payload type %q in entry %s", payloadType, entry.ID) + poisonIDs = append(poisonIDs, entry.ID) + } + } + + // ACK + XDEL poison pills immediately — unrecoverable, must not block the PEL. + if len(poisonIDs) > 0 { + ackCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := s.Ack(ackCtx, accountSyncStream, accountSyncGroup, poisonIDs...); err != nil { + log.Printf("[accountqueue] WARN: failed to ACK %d poison pills: %v", len(poisonIDs), err) + } else if err := s.Delete(ackCtx, accountSyncStream, poisonIDs...); err != nil { + log.Printf("[accountqueue] WARN: failed to XDEL %d poison pills: %v", len(poisonIDs), err) + } + cancel() + } + + if len(writeEntries) == 0 { + return nil + } + + // Scope a timeout to this DB write. writeCtx bounds connection acquisition AND + // (threaded into BatchRestoreAccounts) every GetAll/ExecAll inside the write. + writeCtx, writeCancel := context.WithTimeout(context.Background(), cfg.DBWriteTimeout) + defer writeCancel() + + // Acquire explicitly and return on processBatch exit — NOT via + // GetAccountConnectionandPutBack. That helper's auto-return goroutine fires when + // writeCtx hits its deadline, which can recycle the connection back into the pool + // while a multi-chunk BatchRestoreAccounts is still issuing gRPC on it (data race). + conn, err := DB_OPs.GetAccountsConnections(writeCtx) + if err != nil { + return fmt.Errorf("get account DB connection: %w", err) + } + defer DB_OPs.PutAccountsConnection(conn) + + // Write in sub-batches to bound individual ImmuDB commit size. + // All chunks must succeed before any ACK is issued. + start := time.Now() + for i := 0; i < len(writeEntries); i += cfg.MaxAccountsPerBatch { + end := i + cfg.MaxAccountsPerBatch + if end > len(writeEntries) { + end = len(writeEntries) + } + if err := DB_OPs.BatchRestoreAccounts(writeCtx, conn, writeEntries[i:end]); err != nil { + return fmt.Errorf("BatchRestoreAccounts chunk [%d:%d] of %d: %w", i, end, len(writeEntries), err) + } + } + commitDur := time.Since(start) + + // All sub-batches succeeded — ACK + XDEL in one pipeline round-trip. + // XACK removes entries from the PEL; XDEL removes the payload from the stream body. + // Without XDEL, ACKed entries accumulate in the stream indefinitely. + // Replay safety: BatchRestoreAccounts is LWW-idempotent if ACK fails and entries replay. + ackCtx, ackCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer ackCancel() + if err := s.Ack(ackCtx, accountSyncStream, accountSyncGroup, goodIDs...); err != nil { + log.Printf("[accountqueue] WARN: ACK failed for %d entries after successful DB write: %v — will be reclaimed and re-written (safe, LWW)", len(goodIDs), err) + } else if err := s.Delete(ackCtx, accountSyncStream, goodIDs...); err != nil { + log.Printf("[accountqueue] WARN: XDEL failed for %d entries after ACK: %v", len(goodIDs), err) + } else { + log.Printf("[accountqueue] wrote %d accounts from %d entries in %s; ACKed + XDELed", + len(writeEntries), len(goodIDs), commitDur.Round(time.Millisecond)) + } + + return nil +} + +// ─── Payload parsers ───────────────────────────────────────────────────────── + +// parseAccountsPayload deserializes a payloadTypeAccounts JSON blob into a flat +// list of DB write entries ready for BatchRestoreAccounts. +// +// Time: O(N) where N = number of accounts in the payload. +// Space: O(N) — one dbEntry per account. +func parseAccountsPayload(dataStr string) ([]dbEntry, error) { + var accs []*types.Account + if err := json.Unmarshal([]byte(dataStr), &accs); err != nil { + return nil, fmt.Errorf("unmarshal []*types.Account: %w", err) + } + + // We might emit up to 2 entries per account (address: and did:) + entries := make([]dbEntry, 0, len(accs)*2) + for _, acc := range accs { + if acc == nil { + continue + } + dbAcc := &DB_OPs.Account{ + DIDAddress: acc.DIDAddress, + Address: acc.Address, + Balance: acc.Balance, + Nonce: acc.Nonce, + AccountType: acc.AccountType, + CreatedAt: acc.CreatedAt, + UpdatedAt: acc.UpdatedAt, + Metadata: acc.Metadata, + } + val, err := json.Marshal(dbAcc) + if err != nil { + return nil, fmt.Errorf("marshal DB_OPs.Account for address %s: %w", acc.Address.Hex(), err) + } + + // 1. Emit the primary address key + entries = append(entries, dbEntry{ + Key: DB_OPs.Prefix + acc.Address.Hex(), + Value: val, + }) + + // 2. Emit the DID key so BatchRestoreAccounts creates the bound reference + if acc.DIDAddress != "" { + entries = append(entries, dbEntry{ + Key: DB_OPs.DIDPrefix + acc.DIDAddress, + Value: val, + }) + } + } + return entries, nil +} + +// parseUpdatesPayload deserializes a payloadTypeUpdates JSON blob into a flat list +// of DB write entries ready for BatchRestoreAccounts. +// Reads accountUpdateWire (not types.AccountUpdate) to avoid big.Int JSON ambiguity. +// +// Time: O(N) where N = number of updates in the payload. +// Space: O(N) — one dbEntry per update. +func parseUpdatesPayload(dataStr string) ([]dbEntry, error) { + var wires []accountUpdateWire + if err := json.Unmarshal([]byte(dataStr), &wires); err != nil { + return nil, fmt.Errorf("unmarshal []accountUpdateWire: %w", err) + } + entries := make([]dbEntry, 0, len(wires)) + for _, w := range wires { + balance := new(big.Int) + if _, ok := balance.SetString(w.NewBalance, 10); !ok { + return nil, fmt.Errorf("invalid decimal balance %q for address %s", w.NewBalance, w.Address) + } + addr := common.HexToAddress(w.Address) + dbAcc := &DB_OPs.Account{ + DIDAddress: w.Address, + Address: addr, + Balance: balance.String(), + Nonce: w.Nonce, + AccountType: "user", + UpdatedAt: time.Now().UTC().UnixNano(), + } + val, err := json.Marshal(dbAcc) + if err != nil { + return nil, fmt.Errorf("marshal DB_OPs.Account for address %s: %w", w.Address, err) + } + entries = append(entries, dbEntry{ + Key: DB_OPs.Prefix + addr.Hex(), + Value: val, + }) + } + return entries, nil +} diff --git a/DB_OPs/Nodeinfo/immudb_account_manager.go b/DB_OPs/Nodeinfo/immudb_account_manager.go index 373d52bb..7a6f9984 100644 --- a/DB_OPs/Nodeinfo/immudb_account_manager.go +++ b/DB_OPs/Nodeinfo/immudb_account_manager.go @@ -3,20 +3,85 @@ package NodeInfo import ( "context" "encoding/json" + "errors" "fmt" "math/big" "sort" "strings" - "sync" "time" "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" "github.com/ethereum/go-ethereum/common" "gossipnode/DB_OPs" + "gossipnode/config" ) type account_manager struct{} +// ─── Bounded enqueue (producer side) ────────────────────────────────────────── +// +// The library's AccountSync receive path (sync_protocols.go HandleAccountsSyncData) +// accumulates every page of a sync session and calls WriteAccounts ONCE at EOF with +// the whole batch — potentially millions of records. Packing that into a single XADD +// risks exceeding Redis proto-max-bulk-len (512 MiB) and stalls/fails the enqueue; a +// failed enqueue at EOF (after all pages were ACKed) collapses the session and drives +// the dispatcher into a retry→dead-letter storm. We split into fixed-size messages so +// every XADD is small and fast, and the worker's per-drain memory stays bounded. + +// maxRecordsPerMessage caps how many account/update records are packed into one Redis +// stream message (one XADD). 500 mirrors AccountSyncWorkerConfig.MaxAccountsPerBatch so +// a single message maps to roughly one ImmuDB sub-batch; at ~300 B/record a message is +// ~150 KB — three orders of magnitude under Redis's 512 MiB bulk limit. +const maxRecordsPerMessage = 500 + +// enqueueTimeout scales the enqueue deadline with chunk count: a 10 s base plus 5 ms per +// chunk covers large syncs (e.g. 2000 chunks → ~20 s) without an unbounded wait. The +// server is not blocked on this enqueue (pages were already ACKed), so a generous, +// bounded budget is safe. +// +// Time: O(1) +func enqueueTimeout(chunks int) time.Duration { + return 10*time.Second + time.Duration(chunks)*5*time.Millisecond +} + +// enqueueRecordsChunked splits items into chunks of at most maxRecordsPerMessage, +// marshals each chunk to JSON, and XADDs it to the account sync stream tagged ptype. +// Best-effort: every chunk is attempted and errors are aggregated (errors.Join), so a +// single transient XADD failure does not drop the remaining chunks. Any chunk that +// fails to enqueue is backfilled by the worker's LWW write on a later sync / +// reconciliation — strictly safer than the previous all-or-nothing single message. +// +// Time: O(N) marshal + O(ceil(N/maxRecordsPerMessage)) XADD round trips, N = len(items). +// Space: O(maxRecordsPerMessage) per message — never the whole batch at once. +// DS: input []T re-sliced in place into fixed-size windows; no intermediate copy. +func enqueueRecordsChunked[T any](ctx context.Context, s RedisStreamer, ptype syncPayloadType, items []T) error { + var errs []error + for start := 0; start < len(items); start += maxRecordsPerMessage { + end := start + maxRecordsPerMessage + if end > len(items) { + end = len(items) + } + data, err := json.Marshal(items[start:end]) + if err != nil { + errs = append(errs, fmt.Errorf("marshal chunk [%d:%d]: %w", start, end, err)) + continue + } + if _, err := s.Enqueue(ctx, accountSyncStream, map[string]any{ + "type": string(ptype), + "data": string(data), + }); err != nil { + errs = append(errs, fmt.Errorf("enqueue chunk [%d:%d]: %w", start, end, err)) + } + } + return errors.Join(errs...) +} + +// chunkCount returns the number of messages len(n) records split into maxRecordsPerMessage. +// Time: O(1) +func chunkCount(n int) int { + return (n + maxRecordsPerMessage - 1) / maxRecordsPerMessage +} + // Time Complexity: O(N) where N is the total number of transactions scanned or retrieved func (am *account_manager) GetTransactionsForAccount(accountAddress string) ([]types.DBTransaction, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -33,18 +98,9 @@ func (am *account_manager) GetTransactionsForAccount(accountAddress string) ([]t return nil, fmt.Errorf("failed to get transactions by account: %w", err) } - // Serialize and deserialize to map config.Transaction to types.DBTransaction. - // The JSON tags match between config.Transaction and types.Transaction (embedded in DBTransaction), - // so core fields are preserved. DB-specific fields (BlockNumber, TxIndex, CreatedAt) will be zero-valued. - var result []types.DBTransaction + result := make([]types.DBTransaction, 0, len(cfgTxs)) for _, tx := range cfgTxs { - b, err := json.Marshal(tx) - if err == nil { - var dbTx types.DBTransaction - if json.Unmarshal(b, &dbTx) == nil { - result = append(result, dbTx) - } - } + result = append(result, configTxToDBTx(tx)) } return result, nil } @@ -167,83 +223,81 @@ func (am *account_manager) GetAccountByAddress(accountAddress string) (*types.Ac return dbOpsToTypes(acc), nil } -// Time Complexity: O(N) where N is the number of accounts +// WriteAccounts enqueues accounts to the Redis stream for async DB write, split into +// fixed-size messages of at most maxRecordsPerMessage (see enqueueRecordsChunked). +// Returns immediately after the enqueue — the caller gets an ACK without waiting for +// the ImmuDB commit (which can take up to 15 s under load). +// +// The library hands this the entire end-of-stream batch (up to millions of accounts); +// chunking keeps each XADD small so it never exceeds Redis's bulk-string limit and the +// enqueue cannot fail the whole session. Enqueue is best-effort across chunks: a +// partial failure returns an aggregated error but does not drop successful chunks; the +// worker's LWW write backfills the rest on a later sync. +// +// StartAccountSyncWorker must be called before WriteAccounts or this returns an error. +// At-least-once delivery is guaranteed by the worker via PEL + XAUTOCLAIM. +// +// Time: O(N) serialization + O(ceil(N/maxRecordsPerMessage)) XADD round trips, N = len(accounts). func (am *account_manager) WriteAccounts(accounts []*types.Account) error { if len(accounts) == 0 { return nil } - - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - - conn, err := DB_OPs.GetAccountConnectionandPutBack(ctx) - if err != nil { - return fmt.Errorf("failed to get account DB connection: %w", err) + s, mgr := getAccountQueue() + if s == nil { + return fmt.Errorf("WriteAccounts: account queue not initialized; call StartAccountSyncWorker before use") } + mgr.EnsureActive() - entries := make([]struct { - Key string - Value []byte - }, 0, len(accounts)) - - for _, acc := range accounts { - dbAcc := &DB_OPs.Account{ - DIDAddress: acc.DIDAddress, - Address: acc.Address, - Balance: acc.Balance, - Nonce: acc.Nonce, - AccountType: acc.AccountType, - CreatedAt: acc.CreatedAt, - UpdatedAt: acc.UpdatedAt, - Metadata: acc.Metadata, - } - val, err := json.Marshal(dbAcc) - if err != nil { - return fmt.Errorf("marshal account %s: %w", acc.Address.Hex(), err) - } - entries = append(entries, struct { - Key string - Value []byte - }{ - Key: DB_OPs.Prefix + acc.Address.Hex(), - Value: val, - }) + chunks := chunkCount(len(accounts)) + ctx, cancel := context.WithTimeout(context.Background(), enqueueTimeout(chunks)) + defer cancel() + if err := enqueueRecordsChunked(ctx, s, payloadTypeAccounts, accounts); err != nil { + return fmt.Errorf("WriteAccounts: enqueue %d accounts in %d messages: %w", len(accounts), chunks, err) } - - return DB_OPs.BatchRestoreAccounts(conn, entries) + return nil } -// NewAccountNonceIterator returns an iterator that pages through all accounts -// using ListAccountsPaginated, sorted by nonce within each batch. -// The in-memory nonce→account cache supports GetAccountsByNonces lookups. +// NewAccountNonceIterator returns a cursor-based iterator over all accounts. +// Each NextBatch call advances a seekKey cursor — O(N) total scan across all batches. func (am *account_manager) NewAccountNonceIterator(batchSize int) types.AccountNonceIterator { return &immudbNonceIter{ - batchSize: batchSize, - nonceToAccount: make(map[uint64]*types.Account), + batchSize: batchSize, } } // ─── immudbNonceIter ───────────────────────────────────────────────────────── +// MODULE: DB_OPs/Nodeinfo (immudbNonceIter) +// PURPOSE: cursor-based iterator that pages all accounts from ImmuDB in ascending key order. +// +// CORE DATA STRUCTURES: +// - lastKey []byte: scan cursor — key of the last returned account; nil = start of DB. +// Fixed size (one key). Threaded across NextBatch calls so each call resumes where the +// previous left off instead of restarting from key 0. +// +// DO NOT: +// - Replace lastKey with an offset int — that restarts the scan from key 0 each call (O(N²)). +// - Add an in-memory account cache on this struct — 2.7M entries exhaust heap during sync. + type immudbNonceIter struct { - batchSize int - offset int - done bool - mu sync.Mutex - nonceToAccount map[uint64]*types.Account + batchSize int + lastKey []byte // scan cursor: key of last returned account, nil = start + done bool } +// Time: O(1) func (it *immudbNonceIter) TotalAccounts() (uint64, error) { count, err := DB_OPs.CountAccounts(nil) return uint64(count), err } +// Time: O(batchSize) ImmuDB entries; Space: O(batchSize) func (it *immudbNonceIter) NextBatch() ([]*types.Account, error) { if it.done { return nil, nil } - accs, err := DB_OPs.ListAccountsPaginated(nil, it.batchSize, it.offset, "") + accs, lastKey, err := DB_OPs.ListAccountsPaginatedFrom(nil, it.batchSize, it.lastKey, "") if err != nil { return nil, fmt.Errorf("account nonce iterator: %w", err) } @@ -253,44 +307,38 @@ func (it *immudbNonceIter) NextBatch() ([]*types.Account, error) { } result := make([]*types.Account, len(accs)) - it.mu.Lock() for i, acc := range accs { - ta := dbOpsToTypes(acc) - result[i] = ta - it.nonceToAccount[ta.Nonce] = ta + result[i] = dbOpsToTypes(acc) } - it.mu.Unlock() sort.Slice(result, func(i, j int) bool { return result[i].Nonce < result[j].Nonce }) - it.offset += len(accs) + it.lastKey = lastKey if len(accs) < it.batchSize { it.done = true } return result, nil } -// GetAccountsByNonces scans the DB to find accounts matching the given nonces. -// The dispatcher calls this on a fresh iterator (no prior NextBatch), so we -// cannot rely on the in-memory cache — we scan paginated until all nonces are found. +// GetAccountsByNonces scans all accounts once via cursor to find those matching the given nonces. +// Time: O(N) where N = total accounts; Space: O(|nonces|) func (it *immudbNonceIter) GetAccountsByNonces(nonces []uint64) ([]*types.Account, error) { if len(nonces) == 0 { return nil, nil } - nonceSet := make(map[uint64]bool, len(nonces)) + nonceSet := make(map[uint64]struct{}, len(nonces)) for _, n := range nonces { - nonceSet[n] = true + nonceSet[n] = struct{}{} } result := make([]*types.Account, 0, len(nonces)) - const scanBatch = 1000 - offset := 0 + var seekKey []byte for { - accs, err := DB_OPs.ListAccountsPaginated(nil, scanBatch, offset, "") + accs, lastKey, err := DB_OPs.ListAccountsPaginatedFrom(nil, 1000, seekKey, "") if err != nil { return nil, fmt.Errorf("GetAccountsByNonces scan: %w", err) } @@ -299,26 +347,22 @@ func (it *immudbNonceIter) GetAccountsByNonces(nonces []uint64) ([]*types.Accoun } for _, acc := range accs { ta := dbOpsToTypes(acc) - if nonceSet[ta.Nonce] { + if _, ok := nonceSet[ta.Nonce]; ok { result = append(result, ta) if len(result) == len(nonces) { return result, nil } } } - offset += len(accs) - if len(accs) < scanBatch { + if lastKey == nil || len(accs) < 1000 { break } + seekKey = lastKey } return result, nil } -func (it *immudbNonceIter) Close() { - it.mu.Lock() - it.nonceToAccount = nil - it.mu.Unlock() -} +func (it *immudbNonceIter) Close() {} // ─── helpers ───────────────────────────────────────────────────────────────── @@ -335,44 +379,81 @@ func dbOpsToTypes(acc *DB_OPs.Account) *types.Account { } } -// Time Complexity: O(N) where N is the number of updates +// BatchUpdateAccounts enqueues account balance/nonce updates to the Redis stream for +// async DB write, split into fixed-size messages of at most maxRecordsPerMessage. +// Returns immediately after the enqueue. Best-effort across chunks (see WriteAccounts). +// +// StartAccountSyncWorker must be called before BatchUpdateAccounts or this returns an error. +// At-least-once delivery is guaranteed by the worker via PEL + XAUTOCLAIM. +// +// Time: O(N) serialization + O(ceil(N/maxRecordsPerMessage)) XADD round trips, N = len(updates). func (am *account_manager) BatchUpdateAccounts(updates []types.AccountUpdate) error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - conn, err := DB_OPs.GetAccountConnectionandPutBack(ctx) - if err != nil { - return fmt.Errorf("failed to get account DB connection: %w", err) + if len(updates) == 0 { + return nil + } + s, mgr := getAccountQueue() + if s == nil { + return fmt.Errorf("BatchUpdateAccounts: account queue not initialized; call StartAccountSyncWorker before use") + } + mgr.EnsureActive() + // Convert to wire type for stable JSON serialization. + // big.Int.String() produces a decimal string; accountUpdateWire makes the format explicit. + wires := make([]accountUpdateWire, len(updates)) + for i, u := range updates { + wires[i] = accountUpdateWire{ + Address: u.Address, + NewBalance: u.NewBalance.String(), + Nonce: u.Nonce, + } } - var entries []struct { - Key string - Value []byte + chunks := chunkCount(len(wires)) + ctx, cancel := context.WithTimeout(context.Background(), enqueueTimeout(chunks)) + defer cancel() + if err := enqueueRecordsChunked(ctx, s, payloadTypeUpdates, wires); err != nil { + return fmt.Errorf("BatchUpdateAccounts: enqueue %d updates in %d messages: %w", len(updates), chunks, err) } + return nil +} - for _, u := range updates { - addr := common.HexToAddress(u.Address) - acc := &DB_OPs.Account{ - DIDAddress: u.Address, - Address: addr, - Balance: u.NewBalance.String(), - Nonce: u.Nonce, - AccountType: "user", - UpdatedAt: time.Now().UTC().UnixNano(), - } +// configTxToDBTx converts a config.Transaction to types.DBTransaction via direct field copy. +// DB-specific fields (BlockNumber, TxIndex, CreatedAt) are zero-valued — not available from config.Transaction. +func configTxToDBTx(tx *config.Transaction) types.DBTransaction { + return types.DBTransaction{ + Transaction: types.Transaction{ + Hash: tx.Hash, + From: tx.From, + To: tx.To, + Value: tx.Value, + Type: tx.Type, + Timestamp: tx.Timestamp, + ChainID: tx.ChainID, + Nonce: tx.Nonce, + GasLimit: tx.GasLimit, + GasPrice: tx.GasPrice, + MaxFee: tx.MaxFee, + MaxPriorityFee: tx.MaxPriorityFee, + Data: tx.Data, + AccessList: configAccessListToTypes(tx.AccessList), + V: tx.V, + R: tx.R, + S: tx.S, + }, + } +} - val, err := json.Marshal(acc) - if err != nil { - return fmt.Errorf("failed to marshal account %s: %w", u.Address, err) +// configAccessListToTypes converts config.AccessList to types.AccessList. +// Both are structurally identical but defined in separate packages. +func configAccessListToTypes(al config.AccessList) types.AccessList { + if len(al) == 0 { + return nil + } + result := make(types.AccessList, len(al)) + for i, t := range al { + result[i] = types.AccessTuple{ + Address: t.Address, + StorageKeys: t.StorageKeys, } - entries = append(entries, struct { - Key string - Value []byte - }{ - Key: DB_OPs.Prefix + addr.Hex(), - Value: val, - }) } - - return DB_OPs.BatchRestoreAccounts(conn, entries) + return result } diff --git a/DB_OPs/Nodeinfo/immudb_block_nonheaders.go b/DB_OPs/Nodeinfo/immudb_block_nonheaders.go index 2e498119..a9b57c81 100644 --- a/DB_OPs/Nodeinfo/immudb_block_nonheaders.go +++ b/DB_OPs/Nodeinfo/immudb_block_nonheaders.go @@ -94,6 +94,9 @@ func convertZKBlockToNonHeaders(b *config.ZKBlock) *blockpb.NonHeaders { if tx.Value != nil { pbTx.Value = tx.Value.Bytes() } + if tx.ChainID != nil { + pbTx.ChainId = tx.ChainID.Bytes() + } if tx.GasPrice != nil { pbTx.GasPrice = tx.GasPrice.Bytes() } @@ -103,6 +106,15 @@ func convertZKBlockToNonHeaders(b *config.ZKBlock) *blockpb.NonHeaders { if tx.MaxPriorityFee != nil { pbTx.MaxPriorityFee = tx.MaxPriorityFee.Bytes() } + for _, at := range tx.AccessList { + pbAT := &blockpb.AccessTuple{ + Address: at.Address[:], + } + for _, sk := range at.StorageKeys { + pbAT.StorageKeys = append(pbAT.StorageKeys, sk[:]) + } + pbTx.AccessList = append(pbTx.AccessList, pbAT) + } if tx.V != nil { pbTx.V = tx.V.Bytes() } @@ -113,6 +125,21 @@ func convertZKBlockToNonHeaders(b *config.ZKBlock) *blockpb.NonHeaders { pbTx.S = tx.S.Bytes() } + if tx.ChainID != nil { + pbTx.ChainId = tx.ChainID.Bytes() + } + if len(tx.AccessList) > 0 { + for _, al := range tx.AccessList { + pbAl := &blockpb.AccessTuple{ + Address: al.Address[:], + } + for _, sk := range al.StorageKeys { + pbAl.StorageKeys = append(pbAl.StorageKeys, sk[:]) + } + pbTx.AccessList = append(pbTx.AccessList, pbAl) + } + } + nh.Transactions = append(nh.Transactions, &blockpb.DBTransaction{ Tx: pbTx, TxIndex: uint32(idx), diff --git a/DB_OPs/Nodeinfo/immudb_blockheader_iterator.go b/DB_OPs/Nodeinfo/immudb_blockheader_iterator.go index 036b6cc5..21ce6c34 100644 --- a/DB_OPs/Nodeinfo/immudb_blockheader_iterator.go +++ b/DB_OPs/Nodeinfo/immudb_blockheader_iterator.go @@ -45,6 +45,7 @@ func (i *dbBlockHeaderIterator) GetBlockHeaders(blocknumbers []uint64) ([]*block GasLimit: b.GasLimit, GasUsed: b.GasUsed, BlockNumber: b.BlockNumber, + LogsBloom: b.LogsBloom, } if b.CoinbaseAddr != nil { h.CoinbaseAddr = b.CoinbaseAddr[:] @@ -87,6 +88,7 @@ func (i *dbBlockHeaderIterator) GetBlockHeadersRange(start, end uint64) ([]*bloc GasLimit: b.GasLimit, GasUsed: b.GasUsed, BlockNumber: b.BlockNumber, + LogsBloom: b.LogsBloom, } if b.CoinbaseAddr != nil { h.CoinbaseAddr = b.CoinbaseAddr[:] diff --git a/DB_OPs/Nodeinfo/immudb_data_writer.go b/DB_OPs/Nodeinfo/immudb_data_writer.go index 53efaa41..d5a04b97 100644 --- a/DB_OPs/Nodeinfo/immudb_data_writer.go +++ b/DB_OPs/Nodeinfo/immudb_data_writer.go @@ -89,6 +89,9 @@ func (dw *DataWriter) WriteData(data []*blockpb.NonHeaders) error { if len(tx.Value) > 0 { cfgTx.Value = new(big.Int).SetBytes(tx.Value) } + if len(tx.ChainId) > 0 { + cfgTx.ChainID = new(big.Int).SetBytes(tx.ChainId) + } if len(tx.GasPrice) > 0 { cfgTx.GasPrice = new(big.Int).SetBytes(tx.GasPrice) } @@ -98,6 +101,18 @@ func (dw *DataWriter) WriteData(data []*blockpb.NonHeaders) error { if len(tx.MaxPriorityFee) > 0 { cfgTx.MaxPriorityFee = new(big.Int).SetBytes(tx.MaxPriorityFee) } + if len(tx.AccessList) > 0 { + cfgTx.AccessList = make(config.AccessList, 0, len(tx.AccessList)) + for _, pbAT := range tx.AccessList { + at := config.AccessTuple{ + Address: common.BytesToAddress(pbAT.Address), + } + for _, sk := range pbAT.StorageKeys { + at.StorageKeys = append(at.StorageKeys, common.BytesToHash(sk)) + } + cfgTx.AccessList = append(cfgTx.AccessList, at) + } + } if len(tx.V) > 0 { cfgTx.V = new(big.Int).SetBytes(tx.V) } @@ -107,6 +122,20 @@ func (dw *DataWriter) WriteData(data []*blockpb.NonHeaders) error { if len(tx.S) > 0 { cfgTx.S = new(big.Int).SetBytes(tx.S) } + if len(tx.ChainId) > 0 { + cfgTx.ChainID = new(big.Int).SetBytes(tx.ChainId) + } + if len(tx.AccessList) > 0 { + for _, al := range tx.AccessList { + cfgAl := config.AccessTuple{ + Address: common.BytesToAddress(al.Address), + } + for _, sk := range al.StorageKeys { + cfgAl.StorageKeys = append(cfgAl.StorageKeys, common.BytesToHash(sk)) + } + cfgTx.AccessList = append(cfgTx.AccessList, cfgAl) + } + } txs = append(txs, cfgTx) } diff --git a/DB_OPs/Nodeinfo/immudb_headers_writer.go b/DB_OPs/Nodeinfo/immudb_headers_writer.go index a05a5f47..16ee0631 100644 --- a/DB_OPs/Nodeinfo/immudb_headers_writer.go +++ b/DB_OPs/Nodeinfo/immudb_headers_writer.go @@ -51,6 +51,7 @@ func (hw *HeadersWriter) WriteHeaders(headers []*block.Header) error { ExtraData: h.ExtraData, GasLimit: h.GasLimit, GasUsed: h.GasUsed, + LogsBloom: h.LogsBloom, } if len(h.StateRoot) > 0 { diff --git a/DB_OPs/account_immuclient.go b/DB_OPs/account_immuclient.go index 294978f5..a9f29c60 100644 --- a/DB_OPs/account_immuclient.go +++ b/DB_OPs/account_immuclient.go @@ -56,13 +56,36 @@ func (s *AccountsSet) Add(address common.Address) { s.Accounts[address.Hex()] = nil } -// Get the Nonce of a account - NTF -var counter uint64 +// lastNonce is used to guarantee monotonic nanosecond timestamps for PutNonceofAccount. +var lastNonce atomic.Uint64 + +// PutNonceofAccount generates a unique epoch ID for new accounts. +// +// HISTORICAL BUG (Fixed): Previously computed as `uint64(UnixNano) << 16 | counter`, +// which silently overflowed uint64 and corrupted the embedded timestamp. +// +// FIX (Option C): We now use a pure monotonic nanosecond counter. It returns +// exact UnixNano precision, gracefully bumping by +1ns on extreme collisions. +// +// LIFECYCLE WARNING: The `Nonce` field in the Account struct serves a dual purpose: +// 1. On creation: It stores this unique nanosecond timestamp ID. +// 2. Post-transaction: Reconciliation and consensus overwrite it with the account's +// highest transaction nonce (e.g., 0, 1, 2...). +// Do NOT rely on Account.Nonce remaining a timestamp if the account has sent transactions! func PutNonceofAccount() (uint64, error) { - ts := uint64(time.Now().UTC().UnixNano()) - c := atomic.AddUint64(&counter, 1) - return ts<<16 | (c & 0xFFFF), nil // embed counter in low bits + for { + ns := uint64(time.Now().UTC().UnixNano()) + prev := lastNonce.Load() + next := ns + if next <= prev { + next = prev + 1 // same-ns collision: bump forward + } + if lastNonce.CompareAndSwap(prev, next) { + return next, nil + } + // CAS lost race against another goroutine — retry + } } // Create Account from DID and Address and Store using StoreAccount @@ -335,7 +358,7 @@ func BatchCreateAccountsOrdered(PooledConnection *config.PooledConnection, entri // BatchRestoreAccounts applies a batch of entries into accountsdb. // For address: keys it writes KV. For did: it creates a bound reference to the corresponding address key. -func BatchRestoreAccounts(PooledConnection *config.PooledConnection, entries []struct { +func BatchRestoreAccounts(ctx context.Context, PooledConnection *config.PooledConnection, entries []struct { Key string Value []byte }) error { @@ -345,12 +368,6 @@ func BatchRestoreAccounts(PooledConnection *config.PooledConnection, entries []s var err error var shouldReturnConnection bool - // Define Function wide context for timeout - ctx := context.Background() - - // End the context.Background() - defer ctx.Done() - if PooledConnection == nil || PooledConnection.Client == nil { PooledConnection, err = GetAccountConnectionandPutBack(ctx) if err != nil { @@ -456,7 +473,7 @@ func BatchRestoreAccounts(PooledConnection *config.PooledConnection, entries []s } } if len(prefetchKeys) > 0 { - fetchCtx, fetchCancel := context.WithTimeout(context.Background(), 30*time.Second) + fetchCtx, fetchCancel := context.WithTimeout(ctx, 30*time.Second) entriesList, getAllErr := PooledConnection.Client.Client.GetAll(fetchCtx, prefetchKeys) fetchCancel() if getAllErr == nil && entriesList != nil { @@ -526,6 +543,31 @@ func BatchRestoreAccounts(PooledConnection *config.PooledConnection, entries []s ion.String("topic", TOPIC), ion.String("function", "DB_OPs.BatchRestoreAccounts")) } + + // FIELD MERGING: Prevent partial updates (e.g. from Reconciliation) from wiping out account metadata + if shouldWrite { + // 1. Preserve DIDAddress if incoming DID is empty or mistakenly set to the hex address + if incoming.DIDAddress == "" || incoming.DIDAddress == incoming.Address.Hex() { + incoming.DIDAddress = existing.DIDAddress + } + // 2. Preserve CreatedAt + if incoming.CreatedAt == 0 { + incoming.CreatedAt = existing.CreatedAt + } + // 3. Preserve AccountType + if incoming.AccountType == "user" && existing.AccountType != "" { + incoming.AccountType = existing.AccountType + } + // 4. Preserve Metadata + if incoming.Metadata == nil { + incoming.Metadata = existing.Metadata + } + + // Re-serialize the merged account object to overwrite e.Value + if mergedVal, err := json.Marshal(incoming); err == nil { + e.Value = mergedVal + } + } } } else { loggerCtx, cancel := context.WithCancel(context.Background()) @@ -612,7 +654,7 @@ func BatchRestoreAccounts(PooledConnection *config.PooledConnection, entries []s if end > len(ops) { end = len(ops) } - chunkCtx, chunkCancel := context.WithTimeout(context.Background(), 30*time.Second) + chunkCtx, chunkCancel := context.WithTimeout(ctx, 30*time.Second) _, err = PooledConnection.Client.Client.ExecAll(chunkCtx, &schema.ExecAllRequest{Operations: ops[chunkStart:end]}) chunkCancel() if err != nil { @@ -1082,8 +1124,8 @@ func ListAccountsPaginated(PooledConnection *config.PooledConnection, limit, off Desc: true, // latest accounts first } ReadCtx, ReadCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer ReadCancel() scanResult, err := ic.Client.Scan(ReadCtx, scanReq) + ReadCancel() if err != nil { loggerCtx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1118,7 +1160,6 @@ func ListAccountsPaginated(PooledConnection *config.PooledConnection, limit, off var acc Account if err := json.Unmarshal(entry.Value, &acc); err != nil { loggerCtx, cancel := context.WithCancel(context.Background()) - defer cancel() PooledConnection.Client.Logger.Warn(loggerCtx, "Skipping account due to unmarshal error", ion.String("error", err.Error()), ion.String("key", string(entry.Key)), @@ -1127,6 +1168,7 @@ func ListAccountsPaginated(PooledConnection *config.PooledConnection, limit, off ion.String("log_file", LOG_FILE), ion.String("topic", TOPIC), ion.String("function", "DB_OPs.ListAccountsPaginated")) + cancel() continue } @@ -1167,6 +1209,114 @@ func ListAccountsPaginated(PooledConnection *config.PooledConnection, limit, off return accounts, nil } +// ListAccountsPaginatedFrom retrieves up to limit accounts starting after seekKey in ascending key order. +// seekKey=nil starts from the first address: entry. Returns the accounts and the scan cursor +// (key of the last accepted account); pass it as seekKey on the next call to continue without rescanning. +// +// Time: O(limit) ImmuDB entries read; Space: O(limit) +// DS: ImmuDB ascending Scan with SeekKey cursor — no offset restart across calls. +func ListAccountsPaginatedFrom(PooledConnection *config.PooledConnection, limit int, seekKey []byte, extendedPrefix string) ([]*Account, []byte, error) { + var err error + var shouldReturnConnection = false + + ctx := context.Background() + + if PooledConnection == nil || PooledConnection.Client == nil { + PooledConnection, err = GetAccountConnectionandPutBack(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to get connection from pool: %w - ListAccountsPaginatedFrom", err) + } + shouldReturnConnection = true + } + if shouldReturnConnection { + defer func() { + PutAccountsConnection(PooledConnection) + }() + } + + ic := PooledConnection.Client + if err := ensureAccountsDBSelected(PooledConnection); err != nil { + return nil, nil, fmt.Errorf("failed to ensure accounts database is selected: %w - ListAccountsPaginatedFrom", err) + } + + prefix := []byte(Prefix) + var accounts []*Account + var lastKey []byte + const internalBatch = 1000 + currentSeek := seekKey + + for len(accounts) < limit { + scanReq := &schema.ScanRequest{ + Prefix: prefix, + Limit: uint64(internalBatch), + SeekKey: currentSeek, + Desc: false, + } + + scanCtx, scanCancel := context.WithTimeout(context.Background(), 10*time.Second) + scanResult, scanErr := ic.Client.Scan(scanCtx, scanReq) + scanCancel() + + if scanErr != nil { + loggerCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + ic.Logger.Error(loggerCtx, "Failed to scan for accounts", + scanErr, + ion.String("database", config.AccountsDBName), + ion.String("created_at", time.Now().UTC().Format(time.RFC3339)), + ion.String("log_file", LOG_FILE), + ion.String("topic", TOPIC), + ion.String("function", "DB_OPs.ListAccountsPaginatedFrom")) + return nil, nil, fmt.Errorf("failed to scan for accounts: %w - ListAccountsPaginatedFrom", scanErr) + } + + if len(scanResult.Entries) == 0 { + break + } + + // ImmuDB Scan is inclusive on SeekKey — skip the first entry if it is the cursor itself. + startIndex := 0 + if currentSeek != nil && string(scanResult.Entries[0].Key) == string(currentSeek) { + startIndex = 1 + } + + for i := startIndex; i < len(scanResult.Entries) && len(accounts) < limit; i++ { + entry := scanResult.Entries[i] + + var acc Account + if jsonErr := json.Unmarshal(entry.Value, &acc); jsonErr != nil { + loggerCtx, cancel := context.WithCancel(context.Background()) + ic.Logger.Warn(loggerCtx, "Skipping account due to unmarshal error", + ion.String("error", jsonErr.Error()), + ion.String("key", string(entry.Key)), + ion.String("database", config.AccountsDBName), + ion.String("created_at", time.Now().UTC().Format(time.RFC3339)), + ion.String("log_file", LOG_FILE), + ion.String("topic", TOPIC), + ion.String("function", "DB_OPs.ListAccountsPaginatedFrom")) + cancel() + continue + } + + if extendedPrefix != "" && !strings.HasPrefix(acc.DIDAddress, extendedPrefix) { + continue + } + + accounts = append(accounts, &acc) + lastKey = entry.Key + } + + if len(accounts) >= limit || len(scanResult.Entries) < internalBatch { + break + } + + // Advance cursor to the end of this scan batch. + currentSeek = scanResult.Entries[len(scanResult.Entries)-1].Key + } + + return accounts, lastKey, nil +} + // CountAccounts returns the total number of Accounts in the database. // This implementation scans keys without loading them all into memory. func CountAccounts(PooledConnection *config.PooledConnection) (int, error) { @@ -1249,10 +1399,9 @@ func GetTransactionsByAccount(PooledConnection *config.PooledConnection, account // Process current batch of blocks for i := startBlock; i <= endBlock; i++ { - block, err := GetZKBlockByNumber(PooledConnection, i) + block, err := GetZKBlockByNumberFast(PooledConnection, i) if err != nil { loggerCtx, cancel := context.WithCancel(context.Background()) - defer cancel() ic.Logger.Warn(loggerCtx, "Error retrieving block, skipping", ion.String("error", err.Error()), ion.Uint64("block_number", i), @@ -1261,6 +1410,7 @@ func GetTransactionsByAccount(PooledConnection *config.PooledConnection, account ion.String("log_file", LOG_FILE), ion.String("topic", TOPIC), ion.String("function", "DB_OPs.GetTransactionsByAccount")) + cancel() continue } @@ -1573,10 +1723,9 @@ func GetTransactionsByAccountPaginated(PooledConnection *config.PooledConnection // Process current batch of blocks (in reverse order) for i := currentBlock; i >= startBlock && len(allMatchingTxs) < transactionsNeeded; i-- { - block, err := GetZKBlockByNumber(PooledConnection, i) + block, err := GetZKBlockByNumberFast(PooledConnection, i) if err != nil { loggerCtx, cancel := context.WithCancel(context.Background()) - defer cancel() ic.Logger.Warn(loggerCtx, "Error retrieving block, skipping", ion.String("error", err.Error()), ion.Uint64("block_number", i), @@ -1585,6 +1734,7 @@ func GetTransactionsByAccountPaginated(PooledConnection *config.PooledConnection ion.String("log_file", LOG_FILE), ion.String("topic", TOPIC), ion.String("function", "DB_OPs.GetTransactionsByAccountPaginated")) + cancel() continue } diff --git a/DB_OPs/immuclient.go b/DB_OPs/immuclient.go index b58eadeb..2dc775f8 100644 --- a/DB_OPs/immuclient.go +++ b/DB_OPs/immuclient.go @@ -2084,6 +2084,46 @@ func GetZKBlockByNumber(mainDBClient *config.PooledConnection, blockNumber uint6 return block, nil } +// GetZKBlockByNumberFast retrieves a ZK block by number using plain Get (no proof generation). +// Use for sync/reconciliation paths where tamper-proof guarantees are not required. +// 5–10× faster than GetZKBlockByNumber for bulk reads. +// +// Time: O(1); Space: O(block size) +func GetZKBlockByNumberFast(mainDBClient *config.PooledConnection, blockNumber uint64) (*config.ZKBlock, error) { + var shouldReturnConnection = false + var err error + blockKey := fmt.Sprintf("%s%d", PREFIX_BLOCK, blockNumber) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + block := new(config.ZKBlock) + if mainDBClient == nil { + mainDBClient, err = GetMainDBConnectionandPutBack(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get main DB connection: %w - GetZKBlockByNumberFast", err) + } + shouldReturnConnection = true + } + + if shouldReturnConnection { + defer func() { + PutMainDBConnection(mainDBClient) + }() + } + + entry, err := mainDBClient.Client.Client.Get(ctx, []byte(blockKey)) + if err != nil { + return nil, fmt.Errorf("failed to retrieve block %d: %w", blockNumber, err) + } + + if err := json.Unmarshal(entry.Value, block); err != nil { + return nil, fmt.Errorf("failed to unmarshal block %d: %w", blockNumber, err) + } + + return block, nil +} + // GetZKBlockByHash retrieves a ZK block by its hash (UNCHANGED) func GetZKBlockByHash(mainDBClient *config.PooledConnection, blockHash string) (*config.ZKBlock, error) { // First get the block number from the hash diff --git a/FastsyncV2/fastsyncv2.go b/FastsyncV2/fastsyncv2.go index c0fbebb8..30b5c052 100644 --- a/FastsyncV2/fastsyncv2.go +++ b/FastsyncV2/fastsyncv2.go @@ -678,6 +678,7 @@ func zkBlockToProtoHeader(b *types.ZKBlock) *blockpb.Header { GasLimit: b.GasLimit, GasUsed: b.GasUsed, BlockNumber: b.BlockNumber, + LogsBloom: b.LogsBloom, } if b.CoinbaseAddr != nil { h.CoinbaseAddr = b.CoinbaseAddr[:] @@ -725,6 +726,9 @@ func zkBlockToProtoNonHeaders(b *types.ZKBlock) *blockpb.NonHeaders { if tx.Value != nil { pbTx.Value = tx.Value.Bytes() } + if tx.ChainID != nil { + pbTx.ChainId = tx.ChainID.Bytes() + } if tx.GasPrice != nil { pbTx.GasPrice = tx.GasPrice.Bytes() } @@ -734,6 +738,15 @@ func zkBlockToProtoNonHeaders(b *types.ZKBlock) *blockpb.NonHeaders { if tx.MaxPriorityFee != nil { pbTx.MaxPriorityFee = tx.MaxPriorityFee.Bytes() } + for _, at := range tx.AccessList { + pbAT := &blockpb.AccessTuple{ + Address: at.Address[:], + } + for _, sk := range at.StorageKeys { + pbAT.StorageKeys = append(pbAT.StorageKeys, sk[:]) + } + pbTx.AccessList = append(pbTx.AccessList, pbAT) + } if tx.V != nil { pbTx.V = tx.V.Bytes() } @@ -744,6 +757,21 @@ func zkBlockToProtoNonHeaders(b *types.ZKBlock) *blockpb.NonHeaders { pbTx.S = tx.S.Bytes() } + if tx.ChainID != nil { + pbTx.ChainId = tx.ChainID.Bytes() + } + if len(tx.AccessList) > 0 { + for _, al := range tx.AccessList { + pbAl := &blockpb.AccessTuple{ + Address: al.Address[:], + } + for _, sk := range al.StorageKeys { + pbAl.StorageKeys = append(pbAl.StorageKeys, sk[:]) + } + pbTx.AccessList = append(pbTx.AccessList, pbAl) + } + } + nh.Transactions = append(nh.Transactions, &blockpb.DBTransaction{ Tx: pbTx, TxIndex: uint32(idx), diff --git a/config/settings/config.go b/config/settings/config.go index 7dcab426..61b3825b 100644 --- a/config/settings/config.go +++ b/config/settings/config.go @@ -63,12 +63,23 @@ type BindSettings struct { Profiler string `mapstructure:"profiler" yaml:"profiler"` } -// DatabaseSettings controls ImmuDB connection parameters. -type DatabaseSettings struct { - Username string `mapstructure:"username" yaml:"username"` +// RedisSettings controls the Redis connection used by the account sync worker. +// The worker uses a Redis Stream (XADD/XREADGROUP/XACK) to decouple the +// WriteAccounts / BatchUpdateAccounts callers from the ~15 s ImmuDB commit latency. +// URL format: "host:port" (e.g. "localhost:6379"). +// Env override: JMDN_DATABASE_REDIS_URL, JMDN_DATABASE_REDIS_PASSWORD +type RedisSettings struct { + URL string `mapstructure:"url" yaml:"url"` Password string `mapstructure:"password" yaml:"password"` } +// DatabaseSettings controls ImmuDB and Redis connection parameters. +type DatabaseSettings struct { + Username string `mapstructure:"username" yaml:"username"` + Password string `mapstructure:"password" yaml:"password"` + Redis RedisSettings `mapstructure:"redis" yaml:"redis"` +} + // LoggingSettings mirrors Ion's Config struct so jmdn.yaml can fully configure // the logger (console, file, OTEL, tracing, metrics) in one place. // This replaces the old otelconfig.LogConfig and scattered env vars. @@ -103,14 +114,15 @@ type LogFileSettings struct { // LogOTELSettings configures OpenTelemetry log/trace export. type LogOTELSettings struct { - Enabled bool `mapstructure:"enabled" yaml:"enabled"` - Endpoint string `mapstructure:"endpoint" yaml:"endpoint"` - Protocol string `mapstructure:"protocol" yaml:"protocol"` // grpc or http - Insecure bool `mapstructure:"insecure" yaml:"insecure"` - Username string `mapstructure:"username" yaml:"username"` - Password string `mapstructure:"password" yaml:"password"` - BatchSize int `mapstructure:"batch_size" yaml:"batch_size"` - ExportInterval time.Duration `mapstructure:"export_interval" yaml:"export_interval"` + Enabled bool `mapstructure:"enabled" yaml:"enabled"` + Endpoint string `mapstructure:"endpoint" yaml:"endpoint"` + Protocol string `mapstructure:"protocol" yaml:"protocol"` // grpc or http + Insecure bool `mapstructure:"insecure" yaml:"insecure"` + Headers map[string]string `mapstructure:"headers" yaml:"headers"` + Username string `mapstructure:"username" yaml:"username"` + Password string `mapstructure:"password" yaml:"password"` + BatchSize int `mapstructure:"batch_size" yaml:"batch_size"` + ExportInterval time.Duration `mapstructure:"export_interval" yaml:"export_interval"` } // LogTracingSettings configures distributed tracing. diff --git a/config/settings/defaults.go b/config/settings/defaults.go index 1c070565..60ae4cac 100644 --- a/config/settings/defaults.go +++ b/config/settings/defaults.go @@ -42,6 +42,10 @@ func DefaultConfig() NodeConfig { Database: DatabaseSettings{ Username: "", Password: "", + Redis: RedisSettings{ + URL: "127.0.0.1:6379", // required for account sync worker; set via jmdn.yaml or JMDN_DATABASE_REDIS_URL + Password: "jmdnredissync", // optional: set if Redis requires authentication + }, }, Logging: LoggingSettings{ Level: "warn", @@ -64,6 +68,7 @@ func DefaultConfig() NodeConfig { Enabled: false, Protocol: "grpc", Insecure: false, + Headers: map[string]string{}, BatchSize: 512, ExportInterval: 5 * time.Second, }, @@ -80,8 +85,8 @@ func DefaultConfig() NodeConfig { Enabled: true, EnablePulling: true, PullOnStartup: true, - SyncTimeout: 10 * time.Minute, - AllowedPeers: []string{}, + SyncTimeout: 10 * time.Minute, + AllowedPeers: []string{}, }, Security: DefaultSecurityConfig(), Alerts: DefaultAlertsConfig(), diff --git a/config/settings/loader.go b/config/settings/loader.go index ecc10b6a..3c60233a 100644 --- a/config/settings/loader.go +++ b/config/settings/loader.go @@ -123,6 +123,8 @@ func setDefaults(v *viper.Viper) { // Database v.SetDefault("database.username", d.Database.Username) v.SetDefault("database.password", d.Database.Password) + v.SetDefault("database.redis.url", d.Database.Redis.URL) + v.SetDefault("database.redis.password", d.Database.Redis.Password) // Logging v.SetDefault("logging.level", d.Logging.Level) @@ -148,6 +150,7 @@ func setDefaults(v *viper.Viper) { v.SetDefault("logging.otel.endpoint", d.Logging.OTEL.Endpoint) v.SetDefault("logging.otel.protocol", d.Logging.OTEL.Protocol) v.SetDefault("logging.otel.insecure", d.Logging.OTEL.Insecure) + v.SetDefault("logging.otel.headers", d.Logging.OTEL.Headers) v.SetDefault("logging.otel.username", d.Logging.OTEL.Username) v.SetDefault("logging.otel.password", d.Logging.OTEL.Password) v.SetDefault("logging.otel.batch_size", d.Logging.OTEL.BatchSize) @@ -169,9 +172,30 @@ func setDefaults(v *viper.Viper) { v.SetDefault("fastsync.allowed_peers", d.FastSync.AllowedPeers) // Security + v.SetDefault("security.enabled", d.Security.Enabled) + v.SetDefault("security.cert_dir", d.Security.CertDir) + v.SetDefault("security.ip_cache_size", d.Security.IPCacheSize) + v.SetDefault("security.global_rate_limit", d.Security.GlobalRateLimit) + v.SetDefault("security.global_burst", d.Security.GlobalBurst) + v.SetDefault("security.trust_forwarded_headers", d.Security.TrustForwardedHeaders) + v.SetDefault("security.trusted_proxies", d.Security.TrustedProxies) + v.SetDefault("security.trusted_clients", d.Security.TrustedClients) v.SetDefault("security.explorer_api_key", d.Security.ExplorerAPIKey) v.SetDefault("security.jwt_secret", d.Security.JWTSecret) + // Register defaults for all predefined Security Services so Viper can pick up ENV overrides + for svcName, policy := range d.Security.Services { + prefix := "security.services." + svcName + "." + v.SetDefault(prefix+"tls", policy.TLS) + v.SetDefault(prefix+"auth_type", string(policy.AuthType)) + v.SetDefault(prefix+"token_env", policy.TokenEnv) + v.SetDefault(prefix+"rate_limit", policy.RateLimit) + v.SetDefault(prefix+"burst", policy.Burst) + v.SetDefault(prefix+"cert_file", policy.CertFile) + v.SetDefault(prefix+"key_file", policy.KeyFile) + v.SetDefault(prefix+"ca_file", policy.CAFile) + } + // Alerts v.SetDefault("alerts.url", d.Alerts.URL) v.SetDefault("alerts.api_key", d.Alerts.APIKey) diff --git a/docs/phases/account-enqueue-chunking.md b/docs/phases/account-enqueue-chunking.md new file mode 100644 index 00000000..cdb0a1aa --- /dev/null +++ b/docs/phases/account-enqueue-chunking.md @@ -0,0 +1,67 @@ +# Bounded Account Enqueue (Chunking) — Implementation Phases + +Mode: **Prod**. Scope: **accounts only**, consumer-side (`DB_OPs/Nodeinfo`). No library change, no `go.mod` bump. + +## Problem (evidence) +- Library client receive handler `core/sync/sync_protocols.go:666 HandleAccountsSyncData` ACKs each page (`:713`, before any DB) then accumulates **all** pages into one in-memory `batch` and calls `WriteAccountsBatch` **once at EOF** (`:720`). +- That single call hands `account_manager.WriteAccounts` (`immudb_account_manager.go:170`) the entire batch (up to millions). It does one `json.Marshal` + one `XADD` → a single huge message. Redis caps a bulk string at `proto-max-bulk-len` (512 MiB default): the message stalls or is rejected → EOF write fails *after* all pages were ACKed → server session fails → dispatcher retries the range (`DispatchACKTimeout=10s`, `DispatchMaxRetries=3`) → dead-letter storm → sync never converges. +- Consumer enqueue path is already async (returns before ImmuDB commit); the worker drain/ACK/XDEL contract is correct. The ONLY defect is the unbounded single message. + +## SOLID Gates +- **S** — Invariant owned by the producer methods: "deliver an account/update batch to the stream as bounded, individually-valid messages." (Worker owns the separate "ACK only after commit" invariant — unchanged.) +- **O** — New record kinds extend via the existing `syncPayloadType` tag + a `processBatch` case; the generic helper accepts any `[]T`. No switch edit needed to change chunk size (const) or add a caller. +- **I** — Helper depends only on `RedisStreamer.Enqueue` (1 method of the 8-method interface) — minimal surface; no new interface added. +- **D** — Helper depends on the `RedisStreamer` interface, not `*redis.Client`. No new concrete cross-package import. (Pre-existing `DB_OPs` import in the worker is out of scope.) + +## Pattern Selection +Primary pattern: **none new** — bounded iteration inside the established Producer (account_manager) → Adapter (RedisStreamer) structure. +Why: the fix is a loop + size bound, not a new abstraction. Adding a Strategy/Builder would be ceremony. +Trade-off: chunk size is a const, not injected — promote to config later via the documented extension point if needed. +Anti-pattern avoided: a "MessageChunker" service object / new interface for a 12-line helper. + +## Phase 1.0: Bounded enqueue helper + const + timeout +- What: add `maxRecordsPerMessage` const (500), `enqueueTimeout(chunks)` helper, and generic `enqueueRecordsChunked[T any](ctx, s RedisStreamer, ptype syncPayloadType, items []T) error` in `immudb_account_manager.go`. Best-effort over chunks, `errors.Join` aggregation. +- Data structures: input `[]T` (sequential, read-once, O(1) re-slicing into fixed chunks — no map, no copy). Bound: each marshalled message holds ≤ `maxRecordsPerMessage` records. +- Inputs: none (uses existing `RedisStreamer`, stream constants). +- Done when: helper compiles; never marshals more than `maxRecordsPerMessage` records into one message. +- Status: [x] + +## Phase 1.1: Rewire WriteAccounts + BatchUpdateAccounts +- Trigger: 1.0 helper exists; both producers must use it instead of the single `json.Marshal`+`XADD`. +- What: replace the one-shot marshal/enqueue in `WriteAccounts` (`:170`) and `BatchUpdateAccounts` (`:324`) with chunk-count computation + `enqueueRecordsChunked`. Sized context via `enqueueTimeout`. +- Done when: both methods enqueue N records as `ceil(N/500)` messages; error wraps record + message counts. +- Status: [x] + +## Phase 1.2: Docs — module headers / function docs +- Trigger: behavior of the two interface methods changed; worker `writeEntries` bound is now finite. +- What: update `WriteAccounts`/`BatchUpdateAccounts` doc comments (chunking + best-effort semantics); update `account_sync_worker.go` module header `[]dbEntry` growth-bound note (now `MaxDrainItems × maxRecordsPerMessage`, previously unbounded). +- Done when: doc comments reflect chunking; worker header bound corrected. +- Status: [x] + +## Phase 1.3: White-box test +- Trigger: helper is unexported; needs same-package test. +- What: `account_sync_enqueue_test.go` (package NodeInfo). Mock `RedisStreamer` records `Enqueue` payloads. Table: 0,1,499,500,501,1000,2500 → assert message count = ceil(n/500), each decoded chunk ≤ 500, sum == n, correct type tag. Failure case: every 3rd chunk errors → `errors.Join` non-nil AND remaining chunks still enqueued (best-effort). +- Deviation (documented): craftcode Phase 6 wants tests under `tests/`; Go package-internal visibility forces a same-dir `_test.go`. Matches existing repo convention (`DB_OPs/sqlops/sqlops_test.go`). +- Done when: `go test ./DB_OPs/Nodeinfo/ -run Enqueue` passes (no live infra needed — mock streamer). +- Status: [x] + +## Phase 2.0: Library durable-before-ACK (scope B — JMDN-FastSync) +Repo: `../JMDN-FastSync` (NOT this repo). Files: +- `common/types/constants/accounts_constants.go` — added `AccountReceiveFlushThreshold = 20_000` (documents the bound; the per-page rewrite makes peak receive memory one page/stream). +- `core/sync/sync_protocols.go HandleAccountsSyncData` — rewrote the receive loop from "accumulate whole stream → one `WriteAccountsBatch` at EOF" to **per-page: read → `WriteAccountsBatch` (WAL + Redis enqueue) → ACK**. Success = `BatchAck` (Ok); persist failure = `ErrBatchAck` (Ok=false) → dispatcher retries page → dead-letter on repeated failure. +- What it fixes: the client previously buffered the entire diff range (up to ~2.7M accounts, ~10 streams) in one slice → OOM. The server's 200k nonce-buffer cap does NOT bound the client. Now receive memory = one page (~3k) per stream. +- Server impact: **none** — stateless dispatcher unchanged; still one-ack-per-page contract; NAK rides the pre-existing retry→DLQ path (`DispatcherCallbacks.go:134`, `run.go handleFailure`). ACK now reflects true durability. +- Done when: `JMDN-FastSync` builds + vets clean; `jmdn` builds end-to-end against it. +- Status: [x] (code) / [ ] (published — see Integration) + +## Phase 2.1: Integration / publish +- Local verify (done): `go mod edit -replace github.com/JupiterMetaLabs/JMDN-FastSync=../JMDN-FastSync` in `jmdn/go.mod`; `CGO_ENABLED=1 go build ./...` → exit 0. **This replace is DEV-ONLY — must be reverted before merge.** +- Production path (NOT yet done — requires user): commit + push `JMDN-FastSync` (branch `fix/accountsync/performance`), obtain the new pseudo-version, then `go get github.com/JupiterMetaLabs/JMDN-FastSync@` in `jmdn` and remove the `replace`. Until then the library change is not in any published artifact. +- Status: [ ] + +## Non-goals (explicit) +- No fix to `parseUpdatesPayload` AccountType/DIDAddress behavior (separate concern). +- No worker config or drain-logic change. +- No change to the server dispatcher (statelessness preserved). + + diff --git a/docs/phases/accountsync-cursor-pagination.md b/docs/phases/accountsync-cursor-pagination.md new file mode 100644 index 00000000..8f73b234 --- /dev/null +++ b/docs/phases/accountsync-cursor-pagination.md @@ -0,0 +1,73 @@ +# AccountSync Performance Fixes — Implementation Phases + +## Context +AccountSync wall-clock >2 days on 10k blocks + 2.7M accounts. +All issues below are in this repo (`jmdn`). Issues in `JMDN-FastSync` library are tracked separately. + +## SOLID Gates +**S:** Each fix owns one invariant (scan, block read, type conversion). +**O:** New scan behaviour → pass `extendedPrefix`; no existing code modified. +**I:** No fat interfaces introduced. +**D:** No new cross-package concrete imports. + +## Pattern Selection +Iterator (Behavioral) for pagination; Facade (Structural) for fast block read variant. + +--- + +## Phase 1: Cursor-based pagination — DONE +- What: Replace `offset int` with `seekKey []byte` cursor in `immudbNonceIter`. + Add `ListAccountsPaginatedFrom` (ascending, cursor-based) in `account_immuclient.go`. + Remove dead `nonceToAccount map` + `sync.Mutex` from iterator. +- Impact: ~365M ImmuDB scan entries → ~2.7M. O(N²) → O(N). +- Files: `DB_OPs/account_immuclient.go`, `DB_OPs/Nodeinfo/immudb_account_manager.go` +- Done when: build passes, `offset` field gone from `immudbNonceIter`. ✅ + +--- + +## Phase 2: Fix `defer ReadCancel()` inside loop in `ListAccountsPaginated` — DONE +- What: Line 1085 — `defer ReadCancel()` is inside a `for` loop. Each iteration + schedules a cancel that only fires on function return, not on loop iteration end. + All cancel funcs accumulate for the function lifetime → goroutine/context leak. + Fix: call `ReadCancel()` immediately after the `Scan` call (not deferred). +- Files: `DB_OPs/account_immuclient.go` +- Done when: no `defer` inside the scan loop of `ListAccountsPaginated`. + +--- + +## Phase 3: Add `GetZKBlockByNumberFast` (plain Get, no proof generation) — DONE +- What: `GetZKBlockByNumber` uses `VerifiedGet` — generates a cryptographic Merkle + proof per read (5–10× slower than plain `Get`). Sync/reconciliation paths do not + need tamper-proof guarantees. Add `GetZKBlockByNumberFast` using `ic.Client.Get`. + Keep `GetZKBlockByNumber` (VerifiedGet) for client-facing verified queries. +- Data structures: none new; same `*config.ZKBlock` return type. +- Files: `DB_OPs/immuclient.go` +- Done when: `GetZKBlockByNumberFast` exported, compiles, uses plain `Get`. + +--- + +## Phase 4: `GetTransactionsByAccount` uses `GetZKBlockByNumberFast` — DONE +- What: `GetTransactionsByAccount` (line 1293) loops every block 0→latestBlock, + calling `GetZKBlockByNumber` (VerifiedGet) per block. This is called per tagged + account during reconciliation → O(accounts × blocks) VerifiedGet calls. + Switch to `GetZKBlockByNumberFast`. Also fix `GetTransactionsByAccountPaginated` + (line 1576) which has the same issue. +- Data structures: none new. +- Files: `DB_OPs/account_immuclient.go` +- Done when: both functions call `GetZKBlockByNumberFast`, no `GetZKBlockByNumber` + call remains inside a block-scan loop. + +--- + +## Phase 5: Remove JSON round-trip in `GetTransactionsForAccount` (#15) — DONE +- What: `immudb_account_manager.go:40-48` marshals each `config.Transaction` to JSON + then unmarshals into `types.DBTransaction` just to convert types. Direct field copy + eliminates two allocs + two reflect traversals per transaction. +- Files: `DB_OPs/Nodeinfo/immudb_account_manager.go` +- Done when: no `json.Marshal` / `json.Unmarshal` in the tx conversion loop. + +--- + +## Phase 6: Build verification — DONE +- What: `go build ./...` — zero errors, zero new import cycles. +- Done when: clean build across all changed packages. diff --git a/docs/phases/redis-accountsync-queue.md b/docs/phases/redis-accountsync-queue.md new file mode 100644 index 00000000..89728a74 --- /dev/null +++ b/docs/phases/redis-accountsync-queue.md @@ -0,0 +1,113 @@ +# Redis AccountSync Queue — Implementation Phases + +## Context + +**Problem:** `BatchRestoreAccounts` (ImmuDB commit) takes ~15 s. AccountSync callers time +out waiting, push to DLQ, retry, and waste throughput. + +**Solution:** `WriteAccounts` and `BatchUpdateAccounts` enqueue payloads to a Redis Stream +and return an immediate ACK. A single background worker (`XREADGROUP` + `XAUTOCLAIM`) +drains the stream, coalesces batches, and writes to ImmuDB asynchronously. + +## Design Decisions (locked) + +| Decision | Choice | Rationale | +|---|---|---| +| Interface contract | Unchanged (`types.AccountManager`) | External module; signatures fixed | +| Redis unavailable | Fail fast | Caller already has DLQ/retry; B degrades to 15 s latency | +| Worker lifecycle | Explicit `StartAccountSyncWorker(ctx, streamer, cfg)` from main.go | main.go owns all infra lifecycles | +| Queue mechanism | Redis Streams (`XADD`/`XREADGROUP`/`XACK`/`XAUTOCLAIM`) | Built-in PEL, ACK semantics, crash recovery | +| Batch coalescing | Drain `MaxDrainItems` entries per `XREADGROUP`; write in `MaxAccountsPerBatch` sub-batches | Reduces DB round trips under burst | +| ACK semantics | ACK only after `BatchRestoreAccounts` succeeds | At-least-once; `BatchRestoreAccounts` is LWW-idempotent | +| Redis client injection | Interface `RedisStreamer` injected via `StartAccountSyncWorker`; `NewRedisStreamer(*redis.Client)` adapter in package | DIP; no concrete cross-package import | + +## SOLID Gates + +**S — Single Responsibility** +- `account_sync_redis.go`: owns "define the Redis stream transport abstraction" +- `account_sync_worker.go`: owns "drain Redis stream → write to ImmuDB (at-least-once)" +- `immudb_account_manager.go`: owns "enqueue account sync payloads and return ACK immediately" + +**O — Open/Closed** +Extension point: new payload types (e.g., DID sync) → add `case` in `processBatch` switch + +new `enqueue*` helper in `immudb_account_manager.go`. Worker loop and stream infra untouched. + +**I — Interface Segregation** +`RedisStreamer` has exactly 5 methods: `Enqueue`, `EnsureConsumerGroup`, `ReadGroup`, `Ack`, +`AutoClaim`. All 5 are used by the worker. No caller sees unused methods. + +**D — Dependency Inversion** +Worker and account_manager both depend on `RedisStreamer` (interface in this package). +Only `redisStreamerAdapter` imports `*redis.Client` (concrete, local to the adapter). +No concrete cross-package import anywhere else in `DB_OPs/Nodeinfo`. + +## Pattern Selection + +**Primary pattern: Adapter** (Structural) +`redisStreamerAdapter` adapts the concrete `*redis.Client` API to the domain `RedisStreamer` +interface. Callers depend on the interface; the adapter is the only concrete import. + +**Secondary: Command** (Behavioral) +Each stream entry is a serialized command (account write operation) consumed by the worker. +Enables at-least-once replay via PEL without reissuing the original RPC. + +**Anti-pattern avoided:** Direct concrete dependency on `*redis.Client` throughout +`DB_OPs/Nodeinfo` — would couple the package to a specific Redis client library forever. + +--- + +## Phase 1.0: RedisStreamer interface + adapter +- **What:** New file `account_sync_redis.go`. + - `StreamEntry` struct + - `RedisStreamer` interface (5 methods; no go-redis types exposed) + - `redisStreamerAdapter` wrapping `*redis.Client` + - `NewRedisStreamer(*redis.Client) RedisStreamer` factory + - Package-level `pkgStreamer`/`pkgStreamerMu` + `setStreamer`/`getStreamer` + - Stream constants: `accountSyncStream`, `accountSyncGroup`, `accountSyncConsumer` + - Payload type constants: `payloadTypeAccounts`, `payloadTypeUpdates` +- **Data structures:** + - `StreamEntry`: ephemeral per read; unbounded count, capped by `MaxDrainItems` at call site. + - `pkgStreamer`: singleton reference; set once by Phase 2's `StartAccountSyncWorker`. +- **Inputs:** none +- **Done when:** package compiles; `NewRedisStreamer` returns a non-nil `RedisStreamer` +- **Status:** [x] + +## Phase 2.0: Worker — `account_sync_worker.go` +- **What:** New file with: + - `AccountSyncWorkerConfig` struct + `DefaultWorkerConfig()` + - `StartAccountSyncWorker(ctx, streamer, cfg) error` + - `runWorker` (XREADGROUP BLOCK loop, ctx-aware exit) + - `reclaimPending` (XAUTOCLAIM on startup for crash recovery) + - `processBatch` (parse → coalesce → sub-batch write → ACK; poison pill handling) + - `parseAccountsPayload` / `parseUpdatesPayload` + - `accountUpdateWire` (stable JSON wire type for `types.AccountUpdate`) + - `dbEntry` type alias for `struct { Key string; Value []byte }` +- **Data structures:** + - `[]StreamEntry`: ephemeral per `runWorker` iteration; bounded by `MaxDrainItems` (100) + - `[]dbEntry`: ephemeral per `processBatch`; bounded by `MaxDrainItems × avg-accounts-per-payload`; sub-batched by `MaxAccountsPerBatch` (500) + - PEL (Redis-side): unbounded count of unacked entries; evicted by XAUTOCLAIM after `PendingIdleTimeout` (30 s) +- **Inputs:** Phase 1.0 complete +- **Done when:** `StartAccountSyncWorker` compiles; worker exits cleanly on ctx cancel +- **Status:** [x] + +## Phase 3.0: Modify `immudb_account_manager.go` +- **What:** + - `WriteAccounts` → `getStreamer()` → `json.Marshal(accounts)` → `s.Enqueue(...)` → return + - `BatchUpdateAccounts` → convert to `[]accountUpdateWire` → `json.Marshal` → `s.Enqueue(...)` → return + - Remove: direct `DB_OPs.GetAccountConnectionandPutBack` + `DB_OPs.BatchRestoreAccounts` calls from these two methods +- **Data structures:** none introduced; removes ephemeral `[]struct{Key,Value}` from both methods +- **Inputs:** Phase 2.0 complete (`accountUpdateWire` defined there; same package) +- **Done when:** `go build ./DB_OPs/Nodeinfo/...` succeeds; both methods no longer block on ImmuDB +- **Status:** [x] + +## Phase 4.0: main.go wiring (caller's responsibility) +- **What:** In main.go (or lifecycle coordinator), after Redis client is initialized: + ```go + streamer := NodeInfo.NewRedisStreamer(redisClient) + if err := NodeInfo.StartAccountSyncWorker(rootCtx, streamer, NodeInfo.DefaultWorkerConfig()); err != nil { + log.Fatalf("account sync worker: %v", err) + } + ``` +- **Inputs:** Phase 3.0 complete +- **Done when:** node boots, worker log line appears, WriteAccounts returns in < 100 ms +- **Status:** [ ] — caller's responsibility diff --git a/fastsync/fastsync.go b/fastsync/fastsync.go index c3514460..bfefb709 100644 --- a/fastsync/fastsync.go +++ b/fastsync/fastsync.go @@ -1110,7 +1110,7 @@ func (fs *FastSync) batchCreateOrderedWithRetry(entries []struct { } case AccountsDB: fmt.Printf(">>> [DB] Calling BatchRestoreAccounts for AccountsDB with %d entries...\n", len(entries)) - err = DB_OPs.BatchRestoreAccounts(dbClient, entries) + err = DB_OPs.BatchRestoreAccounts(context.Background(), dbClient, entries) if err != nil { fmt.Printf(">>> [DB] ERROR: BatchRestoreAccounts failed for AccountsDB: %v\n", err) } else { diff --git a/go.mod b/go.mod index cbf285bc..eb14307a 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module gossipnode go 1.25.0 require ( - github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260520134039-a630fd87e742 + github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260601052219-40e74741de7c github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8 github.com/JupiterMetaLabs/goroutine-orchestrator v0.1.5 - github.com/JupiterMetaLabs/ion v0.3.5 + github.com/JupiterMetaLabs/ion v0.4.2 github.com/bits-and-blooms/bloom/v3 v3.7.1 github.com/codenotary/immudb v1.10.0 github.com/ethereum/go-ethereum v1.17.0 @@ -25,6 +25,7 @@ require ( github.com/olekukonko/tablewriter v0.0.5 github.com/prometheus/client_golang v1.23.2 github.com/prometheus/client_model v0.6.2 + github.com/redis/go-redis/v9 v9.19.0 github.com/rs/zerolog v1.34.0 github.com/spf13/viper v1.21.0 github.com/stretchr/testify v1.11.1 @@ -184,6 +185,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.42.0 // indirect go.opentelemetry.io/otel/trace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.10.0 // indirect + go.uber.org/atomic v1.11.0 // indirect go.uber.org/dig v1.19.0 // indirect go.uber.org/fx v1.24.0 // indirect go.uber.org/mock v0.6.0 // indirect @@ -210,3 +212,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.4.1 // indirect ) + +replace github.com/JupiterMetaLabs/JMDN-FastSync => ../JMDN-FastSync diff --git a/go.sum b/go.sum index 062c9ff3..51b870f2 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,14 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260520134039-a630fd87e742 h1:HN0VLBOrfKRdPt/j+ALhvfhOJKOTILAJIS469h2RgAs= -github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260520134039-a630fd87e742/go.mod h1:0erT7gGH4TYtitRik+Y3GfxSa5KGLacr9rJovV3vNB0= +github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260601052219-40e74741de7c h1:2Kgkf8pb/FEkLllenyy48GsHda4501EvwHOSdEXabNY= +github.com/JupiterMetaLabs/JMDN-FastSync v0.0.0-20260601052219-40e74741de7c/go.mod h1:0erT7gGH4TYtitRik+Y3GfxSa5KGLacr9rJovV3vNB0= github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8 h1:yPrYb6g6NnqGsiCVqMf0zndEYTuelL3B03Fee+utLWA= github.com/JupiterMetaLabs/JMDN_Merkletree v0.0.0-20260413092720-b819e61566f8/go.mod h1:zM8F31G2SiPXzTo1WzbDFZ5iOOAkqrkuZjS0QVDW4ew= github.com/JupiterMetaLabs/goroutine-orchestrator v0.1.5 h1:S9+s6JeWSrGJ6ooYb4f8iRlJxwPUZ8X/EA4EgxKS3zc= github.com/JupiterMetaLabs/goroutine-orchestrator v0.1.5/go.mod h1:SNkJRVlUwZM7Lt5ZhojWaimBljLg/pV6IKgn8oyViOA= -github.com/JupiterMetaLabs/ion v0.3.5 h1:L5xg2rSuyxaMjY/y0uxQfNc5lg/hEHofVUec5Bok1Ik= -github.com/JupiterMetaLabs/ion v0.3.5/go.mod h1:R64AKOZ4AFLSr/Hp9eBBK1rwvQwuIUx5Ebhqerq63RU= +github.com/JupiterMetaLabs/ion v0.4.2 h1:hogqCgUAQuy6yvLUdXoFOtJlvczFVaRvHGB7NgnFFfc= +github.com/JupiterMetaLabs/ion v0.4.2/go.mod h1:7RPjP/Zo+qJ+PC/yhfz0/I7/i6rHDuopistQivoY8jc= github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251001021608-1fe7b43fc4d6 h1:1zYrtlhrZ6/b6SAjLSfKzWtdgqK0U+HtH/VcBWh1BaU= github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251001021608-1fe7b43fc4d6/go.mod h1:ioLG6R+5bUSO1oeGSDxOV3FADARuMoytZCSX6MEMQkI= github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da h1:KjTM2ks9d14ZYCvmHS9iAKVt9AyzRSqNU1qabPih5BY= @@ -27,6 +27,10 @@ github.com/bits-and-blooms/bitset v1.24.2 h1:M7/NzVbsytmtfHbumG+K2bremQPMJuqv1JD github.com/bits-and-blooms/bitset v1.24.2/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bits-and-blooms/bloom/v3 v3.7.1 h1:WXovk4TRKZttAMJfoQx6K2DM0zNIt8w+c67UqO+etV0= github.com/bits-and-blooms/bloom/v3 v3.7.1/go.mod h1:rZzYLLje2dfzXfAkJNxQQHsKurAyK55KUnL43Euk0hU= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ= github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA= github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= @@ -347,6 +351,8 @@ github.com/quic-go/quic-go v0.59.0 h1:OLJkp1Mlm/aS7dpKgTc6cnpynnD2Xg7C1pwL6vy/SA github.com/quic-go/quic-go v0.59.0/go.mod h1:upnsH4Ju1YkqpLXC305eW3yDZ4NfnNbmQRCMWS58IKU= github.com/quic-go/webtransport-go v0.10.0 h1:LqXXPOXuETY5Xe8ITdGisBzTYmUOy5eSj+9n4hLTjHI= github.com/quic-go/webtransport-go v0.10.0/go.mod h1:LeGIXr5BQKE3UsynwVBeQrU1TPrbh73MGoC6jd+V7ow= +github.com/redis/go-redis/v9 v9.19.0 h1:XPVaaPSnG6RhYf7p+rmSa9zZfeVAnWsH5h3lxthOm/k= +github.com/redis/go-redis/v9 v9.19.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -428,6 +434,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= +github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= go.dedis.ch/dela v0.2.0 h1:ZwMvLzMBeVfl2LDIB4gQNsrRFIGPAuSLX2TwCz9zQas= go.dedis.ch/dela v0.2.0/go.mod h1:2qkjZawF0II6GCPFC8LnP6XaxHoq/IEbuLvcsM4wT8o= go.dedis.ch/fixbuf v1.0.3 h1:hGcV9Cd/znUxlusJ64eAlExS+5cJDIyTyEG+otu5wQs= @@ -481,6 +489,8 @@ go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhn go.opentelemetry.io/proto/otlp v1.10.0 h1:IQRWgT5srOCYfiWnpqUYz9CVmbO8bFmKcwYxpuCSL2g= go.opentelemetry.io/proto/otlp v1.10.0/go.mod h1:/CV4QoCR/S9yaPj8utp3lvQPoqMtxXdzn7ozvvozVqk= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4= go.uber.org/dig v1.19.0/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= go.uber.org/fx v1.24.0 h1:wE8mruvpg2kiiL1Vqd0CC+tr0/24XIB10Iwp2lLWzkg= diff --git a/jmdn_default.yaml b/jmdn_default.yaml index e11c698b..ddc82a58 100644 --- a/jmdn_default.yaml +++ b/jmdn_default.yaml @@ -41,6 +41,9 @@ binds: database: username: "" password: "" + redis: + url: "127.0.0.1:6379" + password: "" # ── Logging (Ion) ──────────────────────────────────────── # Maps directly to Ion's config struct. All env vars like @@ -69,6 +72,7 @@ logging: endpoint: "" # e.g. "collector.example.com:4317" protocol: "grpc" insecure: false + headers: {} # e.g. {"Authorization": "Bearer "} username: "" # Prefer env: JMDN_LOGGING_OTEL_USERNAME password: "" # Prefer env: JMDN_LOGGING_OTEL_PASSWORD batch_size: 512 diff --git a/logging/otelsetup/setup.go b/logging/otelsetup/setup.go index 4509ada8..31a5db3d 100644 --- a/logging/otelsetup/setup.go +++ b/logging/otelsetup/setup.go @@ -66,16 +66,15 @@ func Setup(logDir string, logFileName string) (*ion.Ion, []ion.Warning, error) { // OTEL export if logCfg.OTEL.Enabled && logCfg.OTEL.Endpoint != "" { - cfg.OTEL = ion.OTELConfig{ - Enabled: true, - Endpoint: logCfg.OTEL.Endpoint, - Protocol: logCfg.OTEL.Protocol, - Insecure: logCfg.OTEL.Insecure, - Username: logCfg.OTEL.Username, - Password: logCfg.OTEL.Password, - BatchSize: logCfg.OTEL.BatchSize, - ExportInterval: logCfg.OTEL.ExportInterval, - } + cfg.OTEL.Enabled = true + cfg.OTEL.Endpoint = logCfg.OTEL.Endpoint + cfg.OTEL.Protocol = logCfg.OTEL.Protocol + cfg.OTEL.Insecure = logCfg.OTEL.Insecure + cfg.OTEL.Headers = logCfg.OTEL.Headers + cfg.OTEL.Username = logCfg.OTEL.Username + cfg.OTEL.Password = logCfg.OTEL.Password + cfg.OTEL.BatchSize = logCfg.OTEL.BatchSize + cfg.OTEL.ExportInterval = logCfg.OTEL.ExportInterval // Tracing (inherits OTEL endpoint) cfg.Tracing = ion.TracingConfig{ diff --git a/main.go b/main.go index 74a52ea1..a2008d14 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,7 @@ import ( "gossipnode/CA/ImmuDB_CA" cli "gossipnode/CLI" "gossipnode/DB_OPs" + NodeInfo "gossipnode/DB_OPs/Nodeinfo" "gossipnode/DID" "gossipnode/Pubsub" "gossipnode/Security" @@ -51,6 +52,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" _ "github.com/mattn/go-sqlite3" + "github.com/redis/go-redis/v9" "github.com/rs/zerolog/log" ) @@ -862,6 +864,24 @@ func main() { log.Fatal().Err(err).Msg("Failed to initialize accounts database pool") } + // ── Account Sync Worker (Redis Stream) ─────────────────────────────────── + // WriteAccounts and BatchUpdateAccounts enqueue to a Redis Stream and return + // immediately, decoupling callers from the ~15 s ImmuDB commit latency. + // The worker drains the stream and writes batches to ImmuDB asynchronously. + // Required before FastsyncV2 starts — it calls WriteAccounts during sync. + if cfg.Database.Redis.URL == "" { + log.Warn().Msg("[AccountSyncWorker] database.redis.url not configured — WriteAccounts will fail; set url in jmdn.yaml or JMDN_DATABASE_REDIS_URL") + } else { + redisClient := redis.NewClient(&redis.Options{ + Addr: cfg.Database.Redis.URL, + Password: cfg.Database.Redis.Password, + }) + accountStreamer := NodeInfo.NewRedisStreamer(redisClient) + NodeInfo.StartAccountSyncWorker(accountStreamer, NodeInfo.DefaultWorkerConfig()) + log.Info().Str("redis_url", cfg.Database.Redis.URL).Msg("[accountqueue] installed — WriteAccounts is now async, worker starts lazily") + fmt.Println("✅ Account sync worker started (Redis Stream → ImmuDB async)") + } + // Discover Yggdrasil address BEFORE creating the node fmt.Println("Discovering Yggdrasil address...") ipv6, err := helper.GetTun0GlobalIPv6()