Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions core/protocol/router/helper/accounts/dispatcher/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,29 @@ func (d *AccountDispatcher) Run(ctx context.Context) (types.DispatchSummary, err
//
// Workers never close nonceChan — they only read from it.
//
// Stream lifecycle: the per-worker stream is opened LAZILY, on the first page
// the worker actually has to send (and re-opened the same way after a send
// failure). This is deliberate. libp2p uses lazy multistream negotiation once
// the protocol is in the peer's peerstore (via Identify), so the negotiation
// handshake is deferred until the first Write on the stream. If a worker opened
// its stream up-front and then blocked on nonceChan for longer than the remote's
// negotiation timeout (DefaultNegotiationTimeout = 10s), that remote (the client)
// would reset the un-negotiated stream with StreamProtocolNegotiationFailed (0x1001), and
// the worker's eventual first write would fail instantly with "stream reset by
// remote, error code: 4097". Opening on the first page guarantees the first
// write immediately follows the open, so negotiation always completes before any
// idle window. Once negotiated, idling on nonceChan between pages is harmless
// (yamux keepalive holds the stream open).
//
// Time: O(1) per iteration overhead.
// Space: O(NoncePageSize) per active page (one DB result set at a time).
func (d *AccountDispatcher) dispatchWorker(ctx context.Context) {
stream, err := d.callbacks.OpenStream(ctx)
if err != nil {
return
}
defer stream.Close()
var stream types.AccountSyncStream
defer func() {
if stream != nil {
stream.Close()
}
}()

for {
select {
Expand All @@ -97,13 +112,30 @@ func (d *AccountDispatcher) dispatchWorker(ctx context.Context) {
case <-ctx.Done():
return
case page := <-d.nonceChan:
// Open lazily: on the first page, or on the next page after a failed send
// set stream back to nil, so the first write immediately follows the open.
if stream == nil {
s, err := d.callbacks.OpenStream(ctx)
if err != nil {
// Could not open a stream — fail this page (re-queue or
// dead-letter). The stream itself never existed, so there
// is nothing to close; the next page will retry the open.
d.handleFailure(ctx, page, fmt.Errorf("open dispatch stream: %w", err),
types.DispatchPageMetrics{
PageIndex: page.PageIndex,
NonceCount: len(page.Nonces),
Retries: page.Retries,
Success: false,
})
continue
}
stream = s
}

streamOK := d.processPage(ctx, stream, page)
if !streamOK {
stream.Close()
stream, err = d.callbacks.OpenStream(ctx)
if err != nil {
return
}
stream = nil // reopen lazily on the next page
}
}
}
Expand Down
Loading