diff --git a/common/messaging/messaging.go b/common/messaging/messaging.go index dc87e23..9925bbb 100644 --- a/common/messaging/messaging.go +++ b/common/messaging/messaging.go @@ -16,6 +16,7 @@ import ( "github.com/JupiterMetaLabs/JMDN-FastSync/logging" "github.com/JupiterMetaLabs/ion" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-multiaddr" @@ -670,3 +671,88 @@ type AccountSyncHandlers struct { // function returns. OnEnd func(*accountspb.AccountSyncEndOfStream) error } + +// streamReadWriter is the minimal interface WriteAccountPageAndReadACK needs. +// libp2p network.Stream and any types.AccountSyncStream implementation satisfy it. +type streamReadWriter interface { + io.ReadWriter + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error +} + +// OpenAccountsSyncDataStream connects to peerInfo and opens a stream on +// AccountsSyncDataProtocol without closing it — the caller owns the lifetime. +// +// Use this to open a persistent per-worker stream. Call WriteAccountPageAndReadACK +// in a loop on the returned stream, then Close() when the worker is done. 
+func OpenAccountsSyncDataStream( + ctx context.Context, + version uint16, + h host.Host, + peerInfo peer.AddrInfo, +) (network.Stream, error) { + if h == nil { + return nil, errors.New("host is nil") + } + if len(peerInfo.Addrs) == 0 { + return nil, errors.New("peer has no addresses") + } + + primaryAddr, fallbackAddr, err := SelectTransportAddrWithFallback(peerInfo.Addrs, version) + if err != nil { + return nil, fmt.Errorf("transport selection failed: %w", err) + } + + targetPeer := peer.AddrInfo{ID: peerInfo.ID, Addrs: []multiaddr.Multiaddr{primaryAddr}} + + connectCtx, cancel := context.WithTimeout(ctx, constants.StreamDeadline) + defer cancel() + + connectErr := h.Connect(connectCtx, targetPeer) + if connectErr != nil && version >= 2 && fallbackAddr != nil { + logging.Logger(logging.Transport).Warn(ctx, "primary transport failed, attempting TCP fallback", + ion.Err(connectErr)) + targetPeer.Addrs = []multiaddr.Multiaddr{fallbackAddr} + fallbackCtx, fallbackCancel := context.WithTimeout(ctx, constants.StreamDeadline) + defer fallbackCancel() + connectErr = h.Connect(fallbackCtx, targetPeer) + if connectErr != nil { + return nil, fmt.Errorf("failed to connect (QUIC and TCP fallback) to peer %s: %w", peerInfo.ID, connectErr) + } + } else if connectErr != nil { + return nil, fmt.Errorf("failed to connect to peer %s: %w", peerInfo.ID, connectErr) + } + + stream, err := h.NewStream(ctx, peerInfo.ID, constants.AccountsSyncDataProtocol) + if err != nil { + return nil, fmt.Errorf("open AccountsSyncDataProtocol stream: %w", err) + } + return stream, nil +} + +// WriteAccountPageAndReadACK writes one AccountSyncServerMessage to an open +// stream and reads back the client's AccountSyncServerMessage ACK. +// +// Does NOT open or close the stream — the caller owns its lifetime. +// deadline is applied independently to the write and read operations. 
+func WriteAccountPageAndReadACK( + stream streamReadWriter, + msg *accountspb.AccountSyncServerMessage, + deadline time.Duration, +) (*accountspb.AccountSyncServerMessage, error) { + if err := stream.SetWriteDeadline(time.Now().Add(deadline)); err != nil { + return nil, fmt.Errorf("set write deadline: %w", err) + } + if err := pbstream.WriteDelimited(stream, msg); err != nil { + return nil, fmt.Errorf("write account page: %w", err) + } + + if err := stream.SetReadDeadline(time.Now().Add(deadline)); err != nil { + return nil, fmt.Errorf("set read deadline: %w", err) + } + ack := &accountspb.AccountSyncServerMessage{} + if err := pbstream.ReadDelimited(stream, ack); err != nil { + return nil, fmt.Errorf("read account page ACK: %w", err) + } + return ack, nil +} diff --git a/common/types/constants/accounts_constants.go b/common/types/constants/accounts_constants.go index 699ee75..debc5f4 100644 --- a/common/types/constants/accounts_constants.go +++ b/common/types/constants/accounts_constants.go @@ -78,7 +78,7 @@ const ( // NonceBufferPauseThreshold is the soft upper limit on nonces buffered in // the dispatcher. Diff goroutines pause when this count is reached and // resume only when the count drops below NonceBufferResumePct % of this. - NonceBufferPauseThreshold = 100_000 + NonceBufferPauseThreshold = 200_000 // NonceBufferResumePct is the percentage of NonceBufferPauseThreshold at // which paused diff goroutines are allowed to resume. diff --git a/common/types/dispatcher.go b/common/types/dispatcher.go index 35b0de8..7370d3b 100644 --- a/common/types/dispatcher.go +++ b/common/types/dispatcher.go @@ -2,15 +2,20 @@ package types import ( "context" + "io" "time" ) // ─── AccountSync Dispatcher Types ───────────────────────────────────────────── -// -// These types define the configuration and observable outputs of the -// AccountSync async streaming dispatcher. 
They live in common/types so both -// the dispatcher implementation (core/protocol/router/helper/accounts) and -// the diff stage (accounts_tag.go) can reference them without import cycles. + +// AccountSyncStream is the minimal interface a persistent dispatch stream must +// satisfy. libp2p network.Stream satisfies this automatically — no adapter needed. +type AccountSyncStream interface { + io.ReadWriter + io.Closer + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error +} // DispatcherConfig is fully self-describing. After construction, no code path // reads the constants package — callers may override any field before passing @@ -76,28 +81,30 @@ type DispatchSummary struct { } // DispatcherCallbacks decouples the dispatcher from DB and networking details. -// FetchAccounts and SendPage are required; the On* callbacks are optional. // -// Using callbacks instead of direct dependencies keeps the dispatcher -// independently testable — pass lightweight mock functions in tests. +// FetchAccounts, OpenStream, and SendPageOnStream are required. +// The On* callbacks are optional (set to nil to skip). +// +// Change from single-stream-per-page to persistent-stream-per-worker: +// - OpenStream is called once per worker at startup. +// - SendPageOnStream writes one page to an already-open stream and reads the ACK. type DispatcherCallbacks struct { - // FetchAccounts fetches full account records for the given nonces from the - // DB. One call per page (~3000 nonces). The dispatcher sets balance="0" - // before sending; the DB value is intentionally ignored. - // - // Accounts not found for a given nonce are silently omitted. + // FetchAccounts fetches full account records for the given nonces. + // Called once per page. Implementations should use a connection pool + // to avoid creating a new DB session per call. 
FetchAccounts func(ctx context.Context, nonces []uint64) ([]*Account, error) - // SendPage delivers one page of accounts to the client and waits for the - // client's Ack. The caller handles proto conversion and network dialing. - // pageIndex is the 1-based sequence number for this page in the session. - SendPage func(ctx context.Context, pageIndex uint32, accounts []*Account) error + // OpenStream opens a persistent stream to the client for this worker. + // Called once per worker at startup (and once on reopen after a stream error). + OpenStream func(ctx context.Context) (AccountSyncStream, error) + + // SendPageOnStream delivers one page on an already-open stream and waits + // for the client's ACK. pageIndex is the 1-based sequence number. + SendPageOnStream func(ctx context.Context, stream AccountSyncStream, pageIndex uint32, accounts []*Account) error - // OnPageMetrics is called after every dispatch attempt (success or failure). - // Optional — set to nil to skip metrics. + // OnPageMetrics is called after every dispatch attempt. Optional. OnPageMetrics func(ctx context.Context, m DispatchPageMetrics) - // OnDeadLetter is called when a page exhausts all retries. - // Optional — set to nil to skip. + // OnDeadLetter is called when a page exhausts all retries. Optional. OnDeadLetter func(ctx context.Context, dead DeadLetterPage) } diff --git a/core/protocol/communication/communication.go b/core/protocol/communication/communication.go index ba5e1e3..3a1010e 100644 --- a/core/protocol/communication/communication.go +++ b/core/protocol/communication/communication.go @@ -55,6 +55,15 @@ type Communicator interface { // returns a single AccountSyncResponse (page_index=0). Used post-PoTS / // post-Reconciliation when specific missing accounts are identified by address or DID. 
FetchAccounts(ctx context.Context, peerNode types.Nodeinfo, req *accountspb.AccountSyncRequestAccounts) (*accountspb.AccountSyncResponse, error) + + // OpenAccountsDataStream connects to peerNode and opens a persistent stream on + // AccountsSyncDataProtocol without closing it. The caller owns the stream + // lifetime — call stream.Close() when the dispatch worker exits. + OpenAccountsDataStream(ctx context.Context, peerNode types.Nodeinfo) (types.AccountSyncStream, error) + + // SendAccountPageOnStream writes one AccountSyncServerMessage to an open + // stream and reads back the client's ACK. Does not open or close the stream. + SendAccountPageOnStream(ctx context.Context, stream types.AccountSyncStream, msg *accountspb.AccountSyncServerMessage) (*accountspb.AccountSyncServerMessage, error) } func NewCommunication(host host.Host, protocolVersion uint16) Communicator { @@ -387,6 +396,35 @@ func (c *communication) FetchAccounts( return resp, nil } +// OpenAccountsDataStream connects to peerNode and opens a persistent stream +// on AccountsSyncDataProtocol without closing it. The caller owns the stream. +func (c *communication) OpenAccountsDataStream( + ctx context.Context, + peerNode types.Nodeinfo, +) (types.AccountSyncStream, error) { + if c.host == nil { + return nil, errors.New("host is nil") + } + peerInfo := libp2p_peer.AddrInfo{ + ID: peerNode.PeerID, + Addrs: peerNode.Multiaddr, + } + stream, err := messaging.OpenAccountsSyncDataStream(ctx, c.protocolVersion, c.host, peerInfo) + if err != nil { + return nil, err + } + return stream, nil +} + +// SendAccountPageOnStream writes one page to an already-open stream and reads the ACK. 
+func (c *communication) SendAccountPageOnStream( + ctx context.Context, + stream types.AccountSyncStream, + msg *accountspb.AccountSyncServerMessage, +) (*accountspb.AccountSyncServerMessage, error) { + return messaging.WriteAccountPageAndReadACK(stream, msg, constants.DispatchACKTimeout) +} + func (c *communication) SendPoTSRequest( ctx context.Context, peerNode types.Nodeinfo, diff --git a/core/protocol/router/data_router.go b/core/protocol/router/data_router.go index 2db9627..3dbe793 100644 --- a/core/protocol/router/data_router.go +++ b/core/protocol/router/data_router.go @@ -1478,27 +1478,16 @@ func (router *Datarouter) FULL_SYNC(ctx context.Context, req *priorsyncpb.PriorS } } -// HandleAccountsSyncData stores one incoming account page received via the -// AccountsSyncDataProtocol dial-back and returns a BatchAck for the server. -func (router *Datarouter) HandleAccountsSyncData(ctx context.Context, resp *accountspb.AccountSyncResponse, remote *types.Nodeinfo) *accountspb.AccountSyncServerMessage { - if resp == nil { - return accountshelper.NewResultFactory(0).ErrBatchAck("nil response") +// WriteAccountsBatch persists all accounts accumulated from one dispatch worker's +// persistent stream in a single DB transaction. 
+func (router *Datarouter) WriteAccountsBatch(ctx context.Context, accounts []*accountspb.Account) error { + if len(accounts) == 0 { + return nil } - - f := accountshelper.NewResultFactory(resp.GetPageIndex()) - writer := clienthelper.NewClientWriter().SetSyncVars(ctx, *router.Nodeinfo, router.wal) defer writer.Close() - - status, err := writer.WriteAccounts(resp.GetAccounts()) - if err != nil { - return f.ErrBatchAck(err.Error()) - } - if !status { - return f.ErrBatchAck("write returned false with no error") - } - - return f.BatchAck() + _, err := writer.WriteAccounts(accounts) + return err } func (router *Datarouter) ACCOUNTS_SYNC(ctx context.Context, req *accountspb.AccountNonceSyncRequest, remote *types.Nodeinfo, sessionLockedART *accountshelper.LockedART) *accountspb.AccountSyncServerMessage { @@ -1542,7 +1531,14 @@ func (router *Datarouter) ACCOUNTS_SYNC(ctx context.Context, req *accountspb.Acc return f.ErrEndOfStream(err.Error()) } - callbacks := accountshelper.NewDispatcherCallbacks(router.Nodeinfo, *remote, router.Comm.StreamAccounts, req.Phase.Auth) + callbacks := accountshelper.NewDispatcherCallbacks( + router.Nodeinfo, + *remote, + router.Comm.OpenAccountsDataStream, + router.Comm.SendAccountPageOnStream, + constants.DispatchWorkers, + req.Phase.Auth, + ) d, dispErr := accountsdispatcher.New(accountsdispatcher.Default(), callbacks) if dispErr != nil { diff --git a/core/protocol/router/helper/accounts/DispatcherCallbacks.go b/core/protocol/router/helper/accounts/DispatcherCallbacks.go index 2716857..7740cfa 100644 --- a/core/protocol/router/helper/accounts/DispatcherCallbacks.go +++ b/core/protocol/router/helper/accounts/DispatcherCallbacks.go @@ -16,77 +16,95 @@ import ( "google.golang.org/protobuf/types/known/structpb" ) -// StreamAccountsFn is the dial-back function used to deliver one page to the client. -// The server opens a new AccountsSyncDataProtocol stream per call, sends the page, -// and reads the client's BatchAck. 
Each call is independent — no shared stream state. -type StreamAccountsFn func(ctx context.Context, remote types.Nodeinfo, msg *accountspb.AccountSyncServerMessage) (*accountspb.AccountSyncServerMessage, error) +// OpenStreamFn dials the client on AccountsSyncDataProtocol and returns an open +// persistent stream. Caller owns the stream lifetime. +type OpenStreamFn func(ctx context.Context, peerNode types.Nodeinfo) (types.AccountSyncStream, error) + +// SendPageOnStreamFn writes one AccountSyncServerMessage to an open stream and +// reads back the client's ACK. Does NOT open or close the stream. +type SendPageOnStreamFn func(ctx context.Context, stream types.AccountSyncStream, msg *accountspb.AccountSyncServerMessage) (*accountspb.AccountSyncServerMessage, error) // NewDispatcherCallbacks builds the types.DispatcherCallbacks for one AccountSync session. // // Parameters: -// - nodeinfo : provides BlockInfo → AccountManager → GetAccountsByNonces -// - remote : client's Nodeinfo used by streamAccounts to dial back per page -// - streamAccounts : dials the client on AccountsSyncDataProtocol, sends the page, reads BatchAck -// - auth : session auth token embedded in every AccountSyncResponse.Phase -// -// Each page is delivered on its own short-lived stream via streamAccounts — no shared -// stream mutex needed. DB fetches (FetchAccounts) and page deliveries run fully in -// parallel across all dispatch workers. +// - nodeinfo : provides BlockInfo → AccountManager pool for DB fetches +// - remote : client's Nodeinfo (bound into OpenStream closure) +// - openFn : comm.OpenAccountsDataStream — dials client, returns open stream +// - sendFn : comm.SendAccountPageOnStream — write+read on existing stream +// - numWorkers : number of AccountManagers to pre-create in the fetch pool +// - auth : session auth token embedded in every AccountSyncResponse.Phase // -// Time: O(1). Space: O(1). +// Each dispatch worker opens ONE stream at startup and reuses it for every page. 
+// DB fetches use a pool of numWorkers AccountManagers (one per worker) to avoid +// per-page session creation. func NewDispatcherCallbacks( nodeinfo *types.Nodeinfo, remote types.Nodeinfo, - streamAccounts StreamAccountsFn, + openFn OpenStreamFn, + sendFn SendPageOnStreamFn, + numWorkers int, auth *authpb.Auth, ) types.DispatcherCallbacks { return types.DispatcherCallbacks{ - FetchAccounts: buildFetchAccounts(nodeinfo), - SendPage: buildSendPage(remote, streamAccounts, auth), - OnPageMetrics: buildOnPageMetrics(), - OnDeadLetter: buildOnDeadLetter(nodeinfo, remote, streamAccounts, auth), + FetchAccounts: buildFetchAccountsWithPool(nodeinfo, numWorkers), + OpenStream: buildOpenStream(remote, openFn), + SendPageOnStream: buildSendPageOnStream(sendFn, auth), + OnPageMetrics: buildOnPageMetrics(), + OnDeadLetter: buildOnDeadLetter(nodeinfo, remote, openFn, sendFn, auth), } } // ─── FetchAccounts ──────────────────────────────────────────────────────────── -// buildFetchAccounts returns a closure that batch-fetches full account rows -// for up to NoncePageSize nonces via GetAccountsByNonces (single DB query). +// buildFetchAccountsWithPool pre-creates numWorkers AccountManagers and hands +// them out one-at-a-time via a buffered channel pool. // -// Time: O(n) where n = len(nonces). Space: O(n). -func buildFetchAccounts(nodeinfo *types.Nodeinfo) func(context.Context, []uint64) ([]*types.Account, error) { +// Each borrow creates an AccountNonceIterator for the nonce set, calls +// GetAccountsByNonces, closes the iterator, and returns the manager to the pool. +// No new AccountManager is created on the hot path — only numWorkers total are +// ever created for the lifetime of this session. 
+func buildFetchAccountsWithPool(nodeinfo *types.Nodeinfo, numWorkers int) func(context.Context, []uint64) ([]*types.Account, error) { + pool := make(chan types.AccountManager, numWorkers) + for i := 0; i < numWorkers; i++ { + pool <- nodeinfo.BlockInfo.NewAccountManager() + } return func(ctx context.Context, nonces []uint64) ([]*types.Account, error) { - iter := nodeinfo.BlockInfo.NewAccountManager().NewAccountNonceIterator(1) + var mgr types.AccountManager + select { + case mgr = <-pool: + case <-ctx.Done(): + return nil, ctx.Err() + } + defer func() { pool <- mgr }() + + iter := mgr.NewAccountNonceIterator(len(nonces)) defer iter.Close() return iter.GetAccountsByNonces(nonces) } } -// ─── SendPage ──────────────────────────────────────────────────────────────── +// ─── OpenStream ─────────────────────────────────────────────────────────────── -// buildSendPage returns a closure that dials the client and delivers one page. -// -// Steps: -// 1. Convert []*types.Account → proto with balance="0" -// 2. streamAccounts(ctx, remote, msg) → dials client on AccountsSyncDataProtocol, -// sends the AccountSyncResponse, reads back the client's BatchAck -// 3. Return error if the dial/send fails or BatchAck.Ok is false -// -// No mutex needed — each call opens its own stream so workers never share state. -// -// Time: O(n) where n = len(accounts) + one network round-trip. Space: O(n). -func buildSendPage( +// buildOpenStream returns a closure that opens a persistent stream to the client. +// Each dispatch worker calls this once at startup. 
+func buildOpenStream( remote types.Nodeinfo, - streamAccounts StreamAccountsFn, - auth *authpb.Auth, -) func(context.Context, uint32, []*types.Account) error { - return func(ctx context.Context, pageIndex uint32, accounts []*types.Account) error { - Log.Logger(Log.Sync).Info(ctx, "accountsync: sending page to client", - ion.Int("page_index", int(pageIndex)), - ion.Int("account_count", len(accounts)), - ion.String("to_peer", remote.PeerID.String()), - ) + openFn OpenStreamFn, +) func(context.Context) (types.AccountSyncStream, error) { + return func(ctx context.Context) (types.AccountSyncStream, error) { + return openFn(ctx, remote) + } +} +// ─── SendPageOnStream ───────────────────────────────────────────────────────── + +// buildSendPageOnStream returns a closure that serialises accounts to proto, +// writes the AccountSyncResponse on an existing stream, and reads the ACK. +func buildSendPageOnStream( + sendFn SendPageOnStreamFn, + auth *authpb.Auth, +) func(context.Context, types.AccountSyncStream, uint32, []*types.Account) error { + return func(ctx context.Context, stream types.AccountSyncStream, pageIndex uint32, accounts []*types.Account) error { msg := &accountspb.AccountSyncServerMessage{ Payload: &accountspb.AccountSyncServerMessage_Response{ Response: &accountspb.AccountSyncResponse{ @@ -103,7 +121,7 @@ func buildSendPage( }, } - ack, err := streamAccounts(ctx, remote, msg) + ack, err := sendFn(ctx, stream, msg) if err != nil { return fmt.Errorf("accountsync send page %d: %w", pageIndex, err) } @@ -120,7 +138,6 @@ func buildSendPage( Log.Logger(Log.Sync).Info(ctx, "accountsync: client ACKed page", ion.Int("page_index", int(pageIndex)), ion.Int("account_count", len(accounts)), - ion.String("from_peer", remote.PeerID.String()), ) return nil } @@ -128,9 +145,6 @@ func buildSendPage( // ─── OnPageMetrics ─────────────────────────────────────────────────────────── -// buildOnPageMetrics logs per-page delivery timings after every dispatch attempt. 
-// -// Time: O(1). Space: O(1). func buildOnPageMetrics() func(context.Context, types.DispatchPageMetrics) { return func(ctx context.Context, m types.DispatchPageMetrics) { if m.Success { @@ -156,20 +170,17 @@ func buildOnPageMetrics() func(context.Context, types.DispatchPageMetrics) { // ─── OnDeadLetter ──────────────────────────────────────────────────────────── // buildOnDeadLetter fires when a page exhausts all dispatcher retries. -// -// Makes one final synchronous attempt: -// 1. Re-fetches accounts from DB (transient errors may have cleared). -// 2. Calls streamAccounts — same dial-back contract as normal pages. -// PageIndex=0 signals a recovery page to the client. -// 3. On failure: logs and abandons. No further retries. -// -// Time: O(n). Space: O(n). +// Opens a one-off recovery stream, re-fetches accounts, makes one final send. func buildOnDeadLetter( nodeinfo *types.Nodeinfo, remote types.Nodeinfo, - streamAccounts StreamAccountsFn, + openFn OpenStreamFn, + sendFn SendPageOnStreamFn, auth *authpb.Auth, ) func(context.Context, types.DeadLetterPage) { + send := buildSendPageOnStream(sendFn, auth) + open := buildOpenStream(remote, openFn) + return func(ctx context.Context, dead types.DeadLetterPage) { Log.Logger(Log.Sync).Warn(ctx, "accountsync: dead-lettered — final recovery attempt", ion.Int("nonce_count", len(dead.Nonces)), @@ -177,38 +188,26 @@ func buildOnDeadLetter( ion.String("failure_hint", dead.FailureHint), ) - accounts, err := buildFetchAccounts(nodeinfo)(ctx, dead.Nonces) + iter := nodeinfo.BlockInfo.NewAccountManager().NewAccountNonceIterator(len(dead.Nonces)) + defer iter.Close() + accounts, err := iter.GetAccountsByNonces(dead.Nonces) if err != nil { Log.Logger(Log.Sync).Error(ctx, "accountsync: dead-letter fetch failed — permanently lost", err, ion.Int("nonce_count", len(dead.Nonces))) return } - msg := &accountspb.AccountSyncServerMessage{ - Payload: &accountspb.AccountSyncServerMessage_Response{ - Response: 
&accountspb.AccountSyncResponse{ - Accounts: ToProtoAccounts(accounts), - PageIndex: 0, - Ack: &ackpb.Ack{Ok: true}, - Phase: &phasepb.Phase{ - PresentPhase: constants.ACCOUNTS_SYNC_RESPONSE, - SuccessivePhase: constants.ACCOUNTS_SYNC_RESPONSE, - Success: true, - Auth: auth, - }, - }, - }, + stream, err := open(ctx) + if err != nil { + Log.Logger(Log.Sync).Error(ctx, "accountsync: dead-letter stream open failed — permanently lost", + err, ion.Int("nonce_count", len(dead.Nonces))) + return } + defer stream.Close() - ack, sendErr := streamAccounts(ctx, remote, msg) - if sendErr != nil { + if err := send(ctx, stream, 0, accounts); err != nil { Log.Logger(Log.Sync).Error(ctx, "accountsync: dead-letter send failed — permanently lost", - sendErr, ion.Int("nonce_count", len(dead.Nonces))) - return - } - if ack.GetBatchAck() != nil && !ack.GetBatchAck().GetAck().GetOk() { - Log.Logger(Log.Sync).Error(ctx, "accountsync: dead-letter ack rejected — permanently lost", - fmt.Errorf("%s", ack.GetBatchAck().GetAck().GetError()), ion.Int("nonce_count", len(dead.Nonces))) + err, ion.Int("nonce_count", len(dead.Nonces))) return } Log.Logger(Log.Sync).Info(ctx, "accountsync: dead-letter recovery succeeded", @@ -221,8 +220,6 @@ func buildOnDeadLetter( // ToProtoAccounts converts []*types.Account → []*accountspb.Account. // Balance is always "0" — Reconciliation fills actual balances post-DataSync. // Nil entries are silently skipped. -// -// Time: O(n). Space: O(n). 
func ToProtoAccounts(accounts []*types.Account) []*accountspb.Account { out := make([]*accountspb.Account, 0, len(accounts)) for _, acc := range accounts { diff --git a/core/protocol/router/helper/accounts/dispatcher/helper.go b/core/protocol/router/helper/accounts/dispatcher/helper.go index 523ca65..4a14234 100644 --- a/core/protocol/router/helper/accounts/dispatcher/helper.go +++ b/core/protocol/router/helper/accounts/dispatcher/helper.go @@ -47,8 +47,11 @@ func validateDispatcherConfig(cfg types.DispatcherConfig, callbacks types.Dispat if callbacks.FetchAccounts == nil { return errors.New("dispatcher: FetchAccounts callback is required") } - if callbacks.SendPage == nil { - return errors.New("dispatcher: SendPage callback is required") + if callbacks.OpenStream == nil { + return errors.New("dispatcher: OpenStream callback is required") + } + if callbacks.SendPageOnStream == nil { + return errors.New("dispatcher: SendPageOnStream callback is required") } return nil } \ No newline at end of file diff --git a/core/protocol/router/helper/accounts/dispatcher/run.go b/core/protocol/router/helper/accounts/dispatcher/run.go index ded5688..2b50bc0 100644 --- a/core/protocol/router/helper/accounts/dispatcher/run.go +++ b/core/protocol/router/helper/accounts/dispatcher/run.go @@ -84,16 +84,27 @@ func (d *AccountDispatcher) Run(ctx context.Context) (types.DispatchSummary, err // Time: O(1) per iteration overhead. // Space: O(NoncePageSize) per active page (one DB result set at a time). func (d *AccountDispatcher) dispatchWorker(ctx context.Context) { + stream, err := d.callbacks.OpenStream(ctx) + if err != nil { + return + } + defer stream.Close() + for { select { case <-d.done: - // All pages are finalised — nothing left to process. return case <-ctx.Done(): - // Session cancelled — exit immediately. 
return case page := <-d.nonceChan: - d.processPage(ctx, page) + streamOK := d.processPage(ctx, stream, page) + if !streamOK { + stream.Close() + stream, err = d.callbacks.OpenStream(ctx) + if err != nil { + return + } + } } } } @@ -103,13 +114,16 @@ func (d *AccountDispatcher) dispatchWorker(ctx context.Context) { // 1. Decrement nonceBufferCount (page left the channel) → signal resume to diff stage // 2. Assign a 1-based page index if this is the first delivery attempt // 3. Fetch full account records from the DB via FetchAccounts callback -// 4. Deliver to client via SendPage callback (with DispatchACKTimeout deadline) +// 4. Deliver to client via SendPageOnStream callback (with DispatchACKTimeout deadline) // 5. On success: record delivery, finalise page // 6. On failure: re-enqueue if retries remain; dead-letter if exhausted // +// Returns streamHealthy=false only when the send failed (stream may be broken). +// DB fetch failures return true — the stream itself is unaffected. +// // Time: O(n) where n = len(page.Nonces) — dominated by DB fetch. // Space: O(n) — one []*Account slice per call, freed after delivery. -func (d *AccountDispatcher) processPage(ctx context.Context, page noncePage) { +func (d *AccountDispatcher) processPage(ctx context.Context, stream types.AccountSyncStream, page noncePage) (streamHealthy bool) { // Step 1: page has left the channel — decrement buffer count and // potentially wake paused diff goroutines (hysteresis resume signal). 
newCount := d.nonceBufferCount.Add(-int64(len(page.Nonces))) @@ -126,15 +140,14 @@ func (d *AccountDispatcher) processPage(ctx context.Context, page noncePage) { dbMs := time.Since(dbStart).Milliseconds() if fetchErr != nil { - d.handleFailure(ctx, page, fetchErr, - types.DispatchPageMetrics{ - PageIndex: page.PageIndex, - NonceCount: len(page.Nonces), - DBFetchMS: dbMs, - Retries: page.Retries, - Success: false, - }) - return + d.handleFailure(ctx, page, fetchErr, types.DispatchPageMetrics{ + PageIndex: page.PageIndex, + NonceCount: len(page.Nonces), + DBFetchMS: dbMs, + Retries: page.Retries, + Success: false, + }) + return true // DB failure does not break the stream } // Step 4: deliver to client with a per-page ACK deadline. @@ -142,7 +155,7 @@ func (d *AccountDispatcher) processPage(ctx context.Context, page noncePage) { defer sendCancel() sendStart := time.Now() - sendErr := d.callbacks.SendPage(sendCtx, page.PageIndex, accounts) + sendErr := d.callbacks.SendPageOnStream(sendCtx, stream, page.PageIndex, accounts) sendMs := time.Since(sendStart).Milliseconds() metrics := types.DispatchPageMetrics{ @@ -156,7 +169,7 @@ func (d *AccountDispatcher) processPage(ctx context.Context, page noncePage) { if sendErr != nil { d.handleFailure(ctx, page, sendErr, metrics) - return + return false // stream may be broken } // Step 5: success — record delivery and finalise. @@ -169,6 +182,7 @@ func (d *AccountDispatcher) processPage(ctx context.Context, page noncePage) { d.inflight.Add(-1) d.tryCloseDone() + return true } // handleFailure decides whether to re-queue a failed page or dead-letter it. 
diff --git a/core/sync/sync_protocols.go b/core/sync/sync_protocols.go index 64e0418..32a46f0 100644 --- a/core/sync/sync_protocols.go +++ b/core/sync/sync_protocols.go @@ -666,21 +666,13 @@ func (s *Sync) HandleAccountsSync(ctx context.Context, node host.Host) error { func (s *Sync) HandleAccountsSyncData(ctx context.Context, node host.Host) error { node.SetStreamHandler(constants.AccountsSyncDataProtocol, func(str network.Stream) { defer str.Close() - + select { case <-ctx.Done(): return default: } - _ = str.SetReadDeadline(time.Now().Add(constants.StreamDeadline)) - defer str.SetReadDeadline(time.Time{}) - - page := &accountspb.AccountSyncServerMessage{} - if err := pbstream.ReadDelimited(str, page); err != nil { - return - } - remoteNodeInfo := &types.Nodeinfo{ PeerID: str.Conn().RemotePeer(), Multiaddr: []multiaddr.Multiaddr{str.Conn().RemoteMultiaddr()}, @@ -689,31 +681,54 @@ func (s *Sync) HandleAccountsSyncData(ctx context.Context, node host.Host) error s.Debug(ctx, constants.AccountsSyncDataProtocol, node, remoteNodeInfo) - _ = str.SetWriteDeadline(time.Now().Add(constants.StreamDeadline)) + var batch []*accountspb.Account + + defer str.SetReadDeadline(time.Time{}) defer str.SetWriteDeadline(time.Time{}) + for { + _ = str.SetReadDeadline(time.Now().Add(constants.DispatchACKTimeout)) + page := &accountspb.AccountSyncServerMessage{} + if err := pbstream.ReadDelimited(str, page); err != nil { + // EOF = server worker finished sending all its pages + break + } - resp := page.GetResponse() - if resp == nil { - _ = pbstream.WriteDelimited(str, &accountspb.AccountSyncServerMessage{ - Payload: &accountspb.AccountSyncServerMessage_BatchAck{ - BatchAck: &accountspb.AccountBatchAck{ - Ack: &ackpb.Ack{Ok: false, Error: "expected Response payload"}, + resp := page.GetResponse() + if resp == nil { + _ = str.SetWriteDeadline(time.Now().Add(constants.DispatchACKTimeout)) + _ = pbstream.WriteDelimited(str, &accountspb.AccountSyncServerMessage{ + Payload: 
&accountspb.AccountSyncServerMessage_BatchAck{ + BatchAck: &accountspb.AccountBatchAck{ + Ack: &ackpb.Ack{Ok: false, Error: "expected Response payload"}, + }, }, - }, - }) + }) + continue + } + + batch = append(batch, resp.GetAccounts()...) + + _ = str.SetWriteDeadline(time.Now().Add(constants.DispatchACKTimeout)) + ack := accountshelper.NewResultFactory(resp.GetPageIndex()).BatchAck() + _ = pbstream.WriteDelimited(str, ack) + } + + if len(batch) == 0 { return } - ack := s.Datarouter.HandleAccountsSyncData(ctx, resp, remoteNodeInfo) + if err := s.Datarouter.WriteAccountsBatch(ctx, batch); err != nil { + logging.Logger(logging.Sync).Error(ctx, "accountsync: batch write failed", err, + ion.Int("account_count", len(batch)), + ion.String("from_peer", remoteNodeInfo.PeerID.String()), + ) + return + } - logging.Logger(logging.Sync).Info(ctx, "accountsync: page received", - ion.Int("page_index", int(resp.GetPageIndex())), - ion.Int("account_count", len(resp.GetAccounts())), + logging.Logger(logging.Sync).Info(ctx, "accountsync: batch written to DB", + ion.Int("account_count", len(batch)), ion.String("from_peer", remoteNodeInfo.PeerID.String()), - ion.Bool("ok", ack.GetBatchAck().GetAck().GetOk()), ) - - _ = pbstream.WriteDelimited(str, ack) }) return nil } diff --git a/docs/PERF_BRANCH_REVIEW.md b/docs/PERF_BRANCH_REVIEW.md new file mode 100644 index 0000000..d613d47 --- /dev/null +++ b/docs/PERF_BRANCH_REVIEW.md @@ -0,0 +1,367 @@ +# AccountSync Performance Branch — Change Review + +**Branch**: `fix/accountsync/performance` +**Base**: `main` @ `a630fd8` +**Commits ahead**: +- `480040b` — types: add AccountSyncStream interface + DispatcherCallbacks refactor +- `00fd0eb` — WAL flushing error handling and logging + +--- + +## What problem does this branch solve? + +The original AccountSync dispatch had three hot-path costs that compounded at scale: + +1. 
**A new network stream was opened per page.** With 10 workers each handling hundreds of pages, that was hundreds of TCP/QUIC handshakes per sync session. +2. **A new `AccountManager` (DB session) was created per `FetchAccounts` call.** With 3,000 nonces per page and 10 workers running in parallel, that was continuous DB session churn. +3. **Each page was written to DB individually as it arrived on the client.** No batching — one DB transaction per page. + +--- + +## Change 1 — `common/types/dispatcher.go` + +### What changed +Added a new interface and rewired `DispatcherCallbacks`. + +```go +// NEW — AccountSyncStream interface +type AccountSyncStream interface { + io.ReadWriter + io.Closer + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error +} +``` + +`DispatcherCallbacks` struct — before vs after: + +| Field | Before | After | +|-------|--------|-------| +| (removed) | `SendPage func(ctx, pageIndex, accounts) error` | — | +| (added) | — | `OpenStream func(ctx) (AccountSyncStream, error)` | +| (added) | — | `SendPageOnStream func(ctx, stream, pageIndex, accounts) error` | +| unchanged | `FetchAccounts` | `FetchAccounts` | +| unchanged | `OnPageMetrics` | `OnPageMetrics` | +| unchanged | `OnDeadLetter` | `OnDeadLetter` | + +### Why +`SendPage` was a single callback that both opened a stream AND sent data. Splitting it into `OpenStream + SendPageOnStream` lets a worker open a stream once at startup and reuse it across all its pages. + +### Potential issues to look for +- `libp2p network.Stream` satisfies `AccountSyncStream` automatically (it implements all five methods: `Read`, `Write`, `Close`, `SetReadDeadline`, `SetWriteDeadline`) — but any mock or test double must now implement `SetReadDeadline` + `SetWriteDeadline` + `Close`, not just `Read`/`Write`. +- `OnDeadLetter` uses `OpenStream` internally to open a fresh one-off stream for recovery. If the client peer is already gone at dead-letter time, `OpenStream` will fail and the recovery attempt is abandoned after logging an error, with no retry. 
**Is that the desired behaviour, or should it retry?** + +--- + +## Change 2 — `core/protocol/router/helper/accounts/DispatcherCallbacks.go` + +### What changed + +#### 2a — `buildFetchAccounts` → `buildFetchAccountsWithPool` + +Before: +```go +// NEW AccountManager created on EVERY FetchAccounts call +func buildFetchAccounts(nodeinfo *types.Nodeinfo) func(...) { + return func(ctx context.Context, nonces []uint64) ([]*types.Account, error) { + iter := nodeinfo.BlockInfo.NewAccountManager().NewAccountNonceIterator(1) + defer iter.Close() + return iter.GetAccountsByNonces(nonces) + } +} +``` + +After: +```go +// Pool of numWorkers AccountManagers created ONCE at session start +func buildFetchAccountsWithPool(nodeinfo *types.Nodeinfo, numWorkers int) func(...) { + pool := make(chan types.AccountManager, numWorkers) + for i := 0; i < numWorkers; i++ { + pool <- nodeinfo.BlockInfo.NewAccountManager() + } + return func(ctx context.Context, nonces []uint64) ([]*types.Account, error) { + var mgr types.AccountManager + select { + case mgr = <-pool: + case <-ctx.Done(): + return nil, ctx.Err() + } + defer func() { pool <- mgr }() + + iter := mgr.NewAccountNonceIterator(len(nonces)) + defer iter.Close() + return iter.GetAccountsByNonces(nonces) + } +} +``` + +**Potential issues:** +- Pool size = `numWorkers` (= `DispatchWorkers = 10`). Pool is a channel, so if all 10 workers hit `FetchAccounts` simultaneously, they each get their own manager with no contention — correct. +- `AccountManager` from jmdn (`immudb_account_manager`) is NOT a thread-safe struct. Two goroutines never share one manager since pool enforces 1-at-a-time borrow — correct. +- Pool is **never closed**. Managers are created at `buildFetchAccountsWithPool` call time and live for the session. If `AccountManager` holds a persistent DB connection, those connections remain open for the full AccountSync session. 
**Check: does jmdn's `NewAccountManager()` open a connection immediately, or on first use?** +- `batchSize` passed to `NewAccountNonceIterator` changed: was hardcoded `1`, now `len(nonces)`. **Check: does jmdn's `immudbNonceIter` use `batchSize` for anything other than cursor-based pagination? If it pre-allocates memory based on batchSize, passing 3000 is fine; if it has any upper bound check, verify 3000 doesn't hit it.** + +#### 2b — `buildSendPage` → `buildOpenStream` + `buildSendPageOnStream` + +Before: one callback that dialled the client, sent the page, and read the ACK — all in one call. + +After: two callbacks: +- `buildOpenStream` — thin closure: calls `openFn(ctx, remote)`, returns stream. +- `buildSendPageOnStream` — builds the proto message, calls `sendFn(ctx, stream, msg)`, checks ACK. + +Removed from `buildSendPageOnStream` log line: +```go +// REMOVED: +ion.String("from_peer", remote.PeerID.String()) +``` + +**Potential issue:** The peer ID is no longer logged on the success ACK log line. May make debugging harder when tracing multi-peer sessions. Consider re-adding it or attaching it to context. + +#### 2c — `buildOnDeadLetter` refactored + +Before: used `buildFetchAccounts(nodeinfo)` (which creates a new AccountManager inline) + called `streamAccounts` directly. + +After: uses `buildOpenStream`/`buildSendPageOnStream` internally — opens a one-off recovery stream and calls the same send path as normal pages. + +```go +stream, err := open(ctx) // one-off stream for dead-letter recovery +if err != nil { ... return } // silently abandoned if stream open fails +defer stream.Close() + +if err := send(ctx, stream, 0, accounts); err != nil { ... return } +``` + +**Potential issues:** +- Page index `0` is used for dead-letter recovery pages. Client side must handle `pageIndex=0` as a special recovery page, not a normal sequence number. 
**Verify the client-side handler (`HandleAccountsSyncData`) doesn't skip or miscount index 0.** +- ACK check was removed from dead-letter path. Before, the code explicitly checked `ack.GetBatchAck().GetAck().GetOk()`. Now `buildSendPageOnStream` handles ACK internally but `send(ctx, stream, 0, accounts)` only returns an `error` — the ACK check is inside `buildSendPageOnStream`. Confirm ACK rejection is still surfaced as an error. + +--- + +## Change 3 — `core/protocol/router/helper/accounts/dispatcher/run.go` + +### What changed + +#### `dispatchWorker` — opens one stream at startup + +Before: no stream; called `callbacks.SendPage(ctx, pageIndex, accounts)` which opened/closed a stream internally per page. + +After: +```go +func (d *AccountDispatcher) dispatchWorker(ctx context.Context) { + stream, err := d.callbacks.OpenStream(ctx) + if err != nil { + return // ← worker exits silently if stream open fails + } + defer stream.Close() + + for { + select { + case <-d.done: return + case <-ctx.Done(): return + case page := <-d.nonceChan: + streamOK := d.processPage(ctx, stream, page) + if !streamOK { + stream.Close() + stream, err = d.callbacks.OpenStream(ctx) + if err != nil { + return // ← worker exits silently if reopen fails + } + } + } + } +} +``` + +**Potential issues:** +- If `OpenStream` fails at startup, the worker exits **silently** with no error recorded, no dead-letter, no inflight decrement. Pages assigned to this worker remain in `nonceChan` and will be drained by other workers — but if ALL workers fail to open streams, nonces sit in the channel until `ctx` times out. **Should a failed worker fire an error signal or at minimum log the failure?** Currently only the outer `Run()` ctx timeout will surface this. +- On stream reopen after a send failure: the failed page was already sent to `handleFailure` for re-queue/dead-letter. The stream is reopened on the NEXT iteration. This is correct — the page lifecycle is handled before the reopen. 
+- `stream.Close()` is called before reopen. If `Close()` errors (e.g. already half-closed by remote), that error is silently dropped. Fine for libp2p streams but worth noting. + +#### `processPage` — now returns `streamHealthy bool` + +```go +// DB fetch failure → return true (stream is fine) +// Send failure → return false (stream may be broken) +// Success → return true +``` + +**Potential issue:** A send failure causes `handleFailure` to re-queue the page (if retries remain) AND causes the worker to close/reopen the stream. On reopen, the re-queued page may be picked up by the SAME worker on the next iteration or by a DIFFERENT worker. Both are fine — pages are stateless. But verify that `handleFailure` re-queue path correctly restores `nonceBufferCount` before the stream reopen happens (it does — `handleFailure` runs before `processPage` returns `false`). + +--- + +## Change 4 — `core/protocol/communication/communication.go` + +### What changed +Two new methods added to the `Communicator` interface and the `communication` struct: + +```go +// NEW on Communicator interface +OpenAccountsDataStream(ctx context.Context, peerNode types.Nodeinfo) (types.AccountSyncStream, error) +SendAccountPageOnStream(ctx context.Context, stream types.AccountSyncStream, msg *accountspb.AccountSyncServerMessage) (*accountspb.AccountSyncServerMessage, error) +``` + +`OpenAccountsDataStream` — dials the client peer on `AccountsSyncDataProtocol`, returns the raw stream without closing it. Caller owns lifetime. + +`SendAccountPageOnStream` — delegates to `messaging.WriteAccountPageAndReadACK(stream, msg, constants.DispatchACKTimeout)`. + +**Potential issues:** +- Any existing mock/stub of `Communicator` interface (in tests) must now implement these two methods or it won't compile. **Check all test files that mock `Communicator`.** +- `DispatchACKTimeout = 10s` is hardcoded here. If a page has 3,000 accounts, proto serialisation + network round-trip must fit in 10s. 
**Is 10s sufficient for the expected account payload size on mainnet? `3000 × ~200 bytes ≈ 600KB` per page — should be fine on LAN/QUIC but verify on real network conditions.** + +--- + +## Change 5 — `common/messaging/messaging.go` + +### What changed +Two new functions added: + +```go +// Opens a persistent stream — caller owns Close() +func OpenAccountsSyncDataStream(ctx, version, host, peerInfo) (network.Stream, error) + +// Write one page + read ACK on an already-open stream +func WriteAccountPageAndReadACK(stream streamReadWriter, msg, deadline) (*AccountSyncServerMessage, error) +``` + +`WriteAccountPageAndReadACK` uses a local `streamReadWriter` interface (unexported) that `network.Stream` satisfies automatically. + +**Potential issues:** +- `OpenAccountsSyncDataStream` applies `constants.StreamDeadline` (15s) to the connect phase, but no deadline is set on the stream itself after open. Deadlines are applied per-write and per-read inside `WriteAccountPageAndReadACK`. This is intentional — the stream is reused across many pages. **Confirm there is no scenario where the stream sits idle long enough to trigger a libp2p-level timeout without an explicit deadline reset.** +- The connect timeout context is created with `connectCtx, cancel := context.WithTimeout(ctx, constants.StreamDeadline)` then `defer cancel()`. But `h.NewStream` is called with `ctx` (the outer context), not `connectCtx`. This means the `NewStream` call uses the full session lifetime context, not the 15s connect timeout. **Is that intentional? A hung `NewStream` won't be bounded by `StreamDeadline`.** + +--- + +## Change 6 — `core/sync/sync_protocols.go` — `HandleAccountsSyncData` + +This is the **client-side** stream handler. This is the biggest behavioural change. 
+ +### Before +``` +stream arrives → read ONE page → write to DB immediately → send ACK → stream closes +``` + +### After +``` +stream arrives → loop: + read page → ACK immediately → accumulate into batch[] +until EOF → WriteAccountsBatch(all pages) → single DB write → stream closes +``` + +Full new handler: +```go +var batch []*accountspb.Account + +for { + _ = str.SetReadDeadline(time.Now().Add(constants.DispatchACKTimeout)) + page := &accountspb.AccountSyncServerMessage{} + if err := pbstream.ReadDelimited(str, page); err != nil { + break // EOF = server worker done + } + + resp := page.GetResponse() + if resp == nil { + // send error ACK, continue + continue + } + + batch = append(batch, resp.GetAccounts()...) + + ack := accountshelper.NewResultFactory(resp.GetPageIndex()).BatchAck() + _ = pbstream.WriteDelimited(str, ack) +} + +if len(batch) == 0 { return } + +s.Datarouter.WriteAccountsBatch(ctx, batch) +``` + +**Potential issues — this section warrants the most scrutiny:** + +1. **ACK is sent before DB write.** The server receives the ACK and considers the page delivered. But the batch is only written to DB after the entire stream closes. If the process crashes between the last ACK and `WriteAccountsBatch`, those accounts are permanently lost with no error surfaced to the server. The server's summary would show them as delivered. + > **Question for review: is WAL involved here? Is there a WAL write on the client side before ACK?** If not, this is a durability gap compared to the old per-page write. + +2. **`batch` grows unbounded in memory.** With `DispatchWorkers=10` workers each sending up to `ceil(N/3000)` pages, one stream carries `ceil(N/10/3000)` pages. For N=10M accounts, one worker sends ~333 pages × 3000 accounts × ~200 bytes ≈ **200MB in a single `batch` slice**. **Is this acceptable memory usage on the receiving node?** + +3. **`SetReadDeadline` uses `DispatchACKTimeout = 10s` per page.** If the server worker is slow to send the next page (e.g. 
DB fetch takes >10s on the server), the client's read deadline fires first, `ReadDelimited` returns an error, and the loop breaks — treating the stream as complete even though the server has more pages. **Is 10s sufficient for the server's DB fetch + serialise + write cycle under load?** + +4. **On `WriteAccountsBatch` error**, the handler returns with only a log line. The server has already received all ACKs and thinks everything is delivered. There is no mechanism to signal the server that the client-side batch write failed. **Should there be an error channel or a final error frame on the stream before `str.Close()`?** + +5. **`defer str.SetReadDeadline(time.Time{})` and `defer str.SetWriteDeadline(time.Time{})` clear deadlines on exit.** This is fine — libp2p stream is being closed anyway (`defer str.Close()` at the top of the handler). But clearing deadlines before `Close()` is technically a no-op since the stream is about to be torn down. No issue, just noise. + +6. **`pageIndex=0` (dead-letter recovery pages)** — the batch accumulator does NOT distinguish between regular pages and recovery pages. They all go into the same `batch`. This is correct — index 0 is just a recovery signal for the server side, the client always just accumulates accounts. 
+ +--- + +## Change 7 — `core/protocol/router/data_router.go` + +### What changed + +`HandleAccountsSyncData` renamed to `WriteAccountsBatch`: + +```go +// Before: HandleAccountsSyncData — per-page write + ACK construction +func (router *Datarouter) HandleAccountsSyncData(ctx, resp, remote) *AccountSyncServerMessage + +// After: WriteAccountsBatch — single batch write, no ACK +func (router *Datarouter) WriteAccountsBatch(ctx context.Context, accounts []*accountspb.Account) error +``` + +`NewDispatcherCallbacks` call updated to pass `OpenAccountsDataStream` + `SendAccountPageOnStream` instead of `StreamAccounts`: + +```go +// Before +callbacks := accountshelper.NewDispatcherCallbacks( + router.Nodeinfo, *remote, router.Comm.StreamAccounts, req.Phase.Auth, +) + +// After +callbacks := accountshelper.NewDispatcherCallbacks( + router.Nodeinfo, *remote, + router.Comm.OpenAccountsDataStream, + router.Comm.SendAccountPageOnStream, + constants.DispatchWorkers, + req.Phase.Auth, +) +``` + +**Potential issues:** +- `WriteAccountsBatch` calls `clienthelper.NewClientWriter().SetSyncVars(ctx, *router.Nodeinfo, router.wal)` and then `writer.WriteAccounts(accounts)`. **Verify `WriteAccounts` can handle slices as large as a full worker's page set (potentially tens of thousands of accounts) atomically. Does the underlying immudb write have a batch size cap?** +- `Datarouter` is stateless (per memory rules), so no concern there. 
+ +--- + +## Change 8 — Tests (`tests/accountssync/`) + +Three new test files added: + +| File | What it tests | +|------|--------------| +| `dispatcher_types_test.go` | Compile-time check: `mockStream` satisfies `AccountSyncStream`; `DispatcherCallbacks` zero value has nil `OpenStream`/`SendPageOnStream` | +| `communication_stream_test.go` | Compile-time check: `communication.NewCommunication` returns a `Communicator` with the two new methods | +| `messaging_stream_test.go` | Integration test: `WriteAccountPageAndReadACK` over a `net.Pipe()` — writes a page, server reads + ACKs, caller verifies ACK ok=true | + +**Potential issues:** +- All three tests are **compile-time or minimal wire tests**. There are no tests for: + - Worker startup failure (`OpenStream` returns error → worker silent exit) + - Stream reopen path (send failure → `streamOK=false` → reopen) + - Client-side batch accumulation and `WriteAccountsBatch` + - Dead-letter recovery path with the new open+send pattern + - Memory bounds of the client batch under large account counts +- `TestCommunicatorHasStreamMethods` passes `nil` as host to `NewCommunication`. If `NewCommunication` ever does a nil-check and panics, this test would fail. Currently it doesn't — but it's fragile. + +--- + +## Summary — Questions for your review + +| # | Area | Question | +|---|------|----------| +| 1 | Client ACK before DB write | If the process crashes between last ACK and `WriteAccountsBatch`, accounts are lost. Is WAL protecting against this? | +| 2 | Client batch memory | ~200MB worst-case per worker stream for 10M accounts. Acceptable on the receiving node? | +| 3 | Read deadline on client loop | `DispatchACKTimeout=10s` per page. Is this enough for server DB fetch + send under load? | +| 4 | No error feedback to server after batch write failure | Server sees all ACKs as success. Should client send an error frame before closing the stream on `WriteAccountsBatch` failure? 
| +| 5 | Silent worker exit | If `OpenStream` fails at worker startup, the worker exits with no error recorded. Should it log + signal? | +| 6 | Pool connection lifetime | `numWorkers=10` AccountManagers are created at session start and never closed. If `NewAccountManager()` holds a DB connection, those stay open for the full session. Is that fine with jmdn's immudb client? | +| 7 | `NewStream` timeout | `h.NewStream` in `OpenAccountsSyncDataStream` uses the full parent `ctx`, not the 15s connect timeout. A hung stream open won't be bounded. Intentional? | +| 8 | Dead-letter ACK check removed | The explicit `ack.GetBatchAck().GetAck().GetOk()` check was removed from `buildOnDeadLetter`. ACK rejection is now surfaced only via `buildSendPageOnStream` error return. Confirm this still surfaces correctly. | +| 9 | `batchSize=len(nonces)` in pool | `NewAccountNonceIterator(len(nonces))` — was `1` before. Does jmdn's iterator use this value in a way that changes behaviour or memory allocation? | +| 10 | Missing peer ID in send log | `ion.String("from_peer", ...)` removed from `buildSendPageOnStream` success log. Acceptable for debugging? | diff --git a/tests/accountssync/communication_stream_test.go b/tests/accountssync/communication_stream_test.go new file mode 100644 index 0000000..cf86dce --- /dev/null +++ b/tests/accountssync/communication_stream_test.go @@ -0,0 +1,19 @@ +package accountssync_test + +import ( + "testing" + + "github.com/JupiterMetaLabs/JMDN-FastSync/core/protocol/communication" +) + +// TestCommunicatorHasStreamMethods verifies the two new methods exist on the +// Communicator interface. If either method is missing this file will not compile. +func TestCommunicatorHasStreamMethods(t *testing.T) { + // Compile-time check: communication.NewCommunication must return a Communicator + // that includes OpenAccountsDataStream and SendAccountPageOnStream. + // The zero-value nil check is just to reference the type without a live host. 
+ var c communication.Communicator = communication.NewCommunication(nil, 0) + if c == nil { + t.Fatal("expected non-nil communicator") + } +} diff --git a/tests/accountssync/dispatcher_types_test.go b/tests/accountssync/dispatcher_types_test.go new file mode 100644 index 0000000..548cf66 --- /dev/null +++ b/tests/accountssync/dispatcher_types_test.go @@ -0,0 +1,32 @@ +package accountssync_test + +import ( + "io" + "testing" + "time" + + "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" +) + +// mockStream satisfies types.AccountSyncStream for compile-time verification. +type mockStream struct{} + +func (m *mockStream) Read(p []byte) (int, error) { return 0, io.EOF } +func (m *mockStream) Write(p []byte) (int, error) { return len(p), nil } +func (m *mockStream) Close() error { return nil } +func (m *mockStream) SetReadDeadline(t time.Time) error { return nil } +func (m *mockStream) SetWriteDeadline(t time.Time) error { return nil } + +func TestAccountSyncStreamInterface(t *testing.T) { + var _ types.AccountSyncStream = &mockStream{} +} + +func TestDispatcherCallbacksHasOpenStream(t *testing.T) { + cb := types.DispatcherCallbacks{} + if cb.OpenStream != nil { + t.Error("OpenStream should be nil on zero value") + } + if cb.SendPageOnStream != nil { + t.Error("SendPageOnStream should be nil on zero value") + } +} diff --git a/tests/accountssync/messaging_stream_test.go b/tests/accountssync/messaging_stream_test.go new file mode 100644 index 0000000..e535f76 --- /dev/null +++ b/tests/accountssync/messaging_stream_test.go @@ -0,0 +1,63 @@ +package accountssync_test + +import ( + "fmt" + "net" + "testing" + "time" + + accountspb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/accounts" + ackpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/ack" + "github.com/JupiterMetaLabs/JMDN-FastSync/common/messaging" + "github.com/JupiterMetaLabs/JMDN-FastSync/internal/pbstream" +) + +// netPipeStream wraps net.Conn with deadline methods matching 
streamReadWriter. +type netPipeStream struct{ net.Conn } + +func (p *netPipeStream) SetReadDeadline(t time.Time) error { return p.Conn.SetDeadline(t) } +func (p *netPipeStream) SetWriteDeadline(t time.Time) error { return p.Conn.SetDeadline(t) } + +func TestWriteAccountPageAndReadACK(t *testing.T) { + serverConn, clientConn := net.Pipe() + defer serverConn.Close() + defer clientConn.Close() + + msg := &accountspb.AccountSyncServerMessage{ + Payload: &accountspb.AccountSyncServerMessage_Response{ + Response: &accountspb.AccountSyncResponse{PageIndex: 7}, + }, + } + wantAck := &accountspb.AccountSyncServerMessage{ + Payload: &accountspb.AccountSyncServerMessage_BatchAck{ + BatchAck: &accountspb.AccountBatchAck{ + Ack: &ackpb.Ack{Ok: true}, + }, + }, + } + + done := make(chan error, 1) + go func() { + got := &accountspb.AccountSyncServerMessage{} + if err := pbstream.ReadDelimited(serverConn, got); err != nil { + done <- err + return + } + if got.GetResponse().GetPageIndex() != 7 { + done <- fmt.Errorf("expected page index 7, got %d", got.GetResponse().GetPageIndex()) + return + } + done <- pbstream.WriteDelimited(serverConn, wantAck) + }() + + ack, err := messaging.WriteAccountPageAndReadACK(&netPipeStream{clientConn}, msg, 2*time.Second) + if err != nil { + t.Fatalf("WriteAccountPageAndReadACK error: %v", err) + } + if !ack.GetBatchAck().GetAck().GetOk() { + t.Fatal("expected ok=true in ACK") + } + if serverErr := <-done; serverErr != nil { + t.Fatalf("server error: %v", serverErr) + } +}