diff --git a/core/protocol/router/helper/accounts/dispatcher/run.go b/core/protocol/router/helper/accounts/dispatcher/run.go index 2b50bc0..73b24db 100644 --- a/core/protocol/router/helper/accounts/dispatcher/run.go +++ b/core/protocol/router/helper/accounts/dispatcher/run.go @@ -81,14 +81,29 @@ func (d *AccountDispatcher) Run(ctx context.Context) (types.DispatchSummary, err // // Workers never close nonceChan — they only read from it. // +// Stream lifecycle: the per-worker stream is opened LAZILY, on the first page +// the worker actually has to send (and re-opened the same way after a send +// failure). This is deliberate. libp2p uses lazy multistream negotiation once +// the protocol is in the peer's peerstore (via Identify), so the negotiation +// handshake is deferred until the first Write on the stream. If a worker opened +// its stream up-front and then blocked on nonceChan for longer than the remote's +// negotiation timeout (DefaultNegotiationTimeout = 10s), the client would reset +// the un-negotiated stream with StreamProtocolNegotiationFailed (0x1001), and +// the worker's eventual first write would fail instantly with "stream reset by +// remote, error code: 4097". Opening on the first page guarantees the first +// write immediately follows the open, so negotiation always completes before any +// idle window. Once negotiated, idling on nonceChan between pages is harmless +// (yamux keepalive holds the stream open). +// // Time: O(1) per iteration overhead. // Space: O(NoncePageSize) per active page (one DB result set at a time). func (d *AccountDispatcher) dispatchWorker(ctx context.Context) { - stream, err := d.callbacks.OpenStream(ctx) - if err != nil { - return - } - defer stream.Close() + var stream types.AccountSyncStream + defer func() { + if stream != nil { + stream.Close() + } + }() for { select { @@ -97,13 +112,30 @@ func (d *AccountDispatcher) dispatchWorker(ctx context.Context) { case <-ctx.Done(): return case page := <-d.nonceChan: + // Open lazily on the first page (or after a prior failure reset + // stream to nil) so the first write immediately follows the open. + if stream == nil { + s, err := d.callbacks.OpenStream(ctx) + if err != nil { + // Could not open a stream — fail this page (re-queue or + // dead-letter). The stream itself never existed, so there + // is nothing to close; the next page will retry the open. + d.handleFailure(ctx, page, fmt.Errorf("open dispatch stream: %w", err), + types.DispatchPageMetrics{ + PageIndex: page.PageIndex, + NonceCount: len(page.Nonces), + Retries: page.Retries, + Success: false, + }) + continue + } + stream = s + } + streamOK := d.processPage(ctx, stream, page) if !streamOK { stream.Close() - stream, err = d.callbacks.OpenStream(ctx) - if err != nil { - return - } + stream = nil // reopen lazily on the next page } } }