Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions common/messaging/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/JupiterMetaLabs/JMDN-FastSync/logging"
"github.com/JupiterMetaLabs/ion"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -670,3 +671,88 @@ type AccountSyncHandlers struct {
// function returns.
OnEnd func(*accountspb.AccountSyncEndOfStream) error
}

// streamReadWriter is the minimal interface WriteAccountPageAndReadACK needs.
// libp2p network.Stream and any types.AccountSyncStream implementation satisfy it.
type streamReadWriter interface {
io.ReadWriter
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}

// OpenAccountsSyncDataStream connects to peerInfo and opens a stream on
// AccountsSyncDataProtocol without closing it — the caller owns the lifetime.
//
// Use this to open a persistent per-worker stream. Call WriteAccountPageAndReadACK
// in a loop on the returned stream, then Close() when the worker is done.
func OpenAccountsSyncDataStream(
ctx context.Context,
version uint16,
h host.Host,
peerInfo peer.AddrInfo,
) (network.Stream, error) {
if h == nil {
return nil, errors.New("host is nil")
}
if len(peerInfo.Addrs) == 0 {
return nil, errors.New("peer has no addresses")
}

primaryAddr, fallbackAddr, err := SelectTransportAddrWithFallback(peerInfo.Addrs, version)
if err != nil {
return nil, fmt.Errorf("transport selection failed: %w", err)
}

targetPeer := peer.AddrInfo{ID: peerInfo.ID, Addrs: []multiaddr.Multiaddr{primaryAddr}}

connectCtx, cancel := context.WithTimeout(ctx, constants.StreamDeadline)
defer cancel()

connectErr := h.Connect(connectCtx, targetPeer)
if connectErr != nil && version >= 2 && fallbackAddr != nil {
logging.Logger(logging.Transport).Warn(ctx, "primary transport failed, attempting TCP fallback",
ion.Err(connectErr))
targetPeer.Addrs = []multiaddr.Multiaddr{fallbackAddr}
fallbackCtx, fallbackCancel := context.WithTimeout(ctx, constants.StreamDeadline)
defer fallbackCancel()
connectErr = h.Connect(fallbackCtx, targetPeer)
if connectErr != nil {
return nil, fmt.Errorf("failed to connect (QUIC and TCP fallback) to peer %s: %w", peerInfo.ID, connectErr)
}
} else if connectErr != nil {
return nil, fmt.Errorf("failed to connect to peer %s: %w", peerInfo.ID, connectErr)
}

stream, err := h.NewStream(ctx, peerInfo.ID, constants.AccountsSyncDataProtocol)
if err != nil {
return nil, fmt.Errorf("open AccountsSyncDataProtocol stream: %w", err)
}
return stream, nil
}

// WriteAccountPageAndReadACK writes one AccountSyncServerMessage to an open
// stream and reads back the client's AccountSyncServerMessage ACK.
//
// Does NOT open or close the stream — the caller owns its lifetime.
// deadline is applied independently to the write and read operations.
func WriteAccountPageAndReadACK(
stream streamReadWriter,
msg *accountspb.AccountSyncServerMessage,
deadline time.Duration,
) (*accountspb.AccountSyncServerMessage, error) {
if err := stream.SetWriteDeadline(time.Now().Add(deadline)); err != nil {
return nil, fmt.Errorf("set write deadline: %w", err)
}
if err := pbstream.WriteDelimited(stream, msg); err != nil {
return nil, fmt.Errorf("write account page: %w", err)
}

if err := stream.SetReadDeadline(time.Now().Add(deadline)); err != nil {
return nil, fmt.Errorf("set read deadline: %w", err)
}
ack := &accountspb.AccountSyncServerMessage{}
if err := pbstream.ReadDelimited(stream, ack); err != nil {
return nil, fmt.Errorf("read account page ACK: %w", err)
}
return ack, nil
}
2 changes: 1 addition & 1 deletion common/types/constants/accounts_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const (
// NonceBufferPauseThreshold is the soft upper limit on nonces buffered in
// the dispatcher. Diff goroutines pause when this count is reached and
// resume only when the count drops below NonceBufferResumePct % of this.
NonceBufferPauseThreshold = 100_000
NonceBufferPauseThreshold = 200_000

// NonceBufferResumePct is the percentage of NonceBufferPauseThreshold at
// which paused diff goroutines are allowed to resume.
Expand Down
49 changes: 28 additions & 21 deletions common/types/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package types

import (
"context"
"io"
"time"
)

// ─── AccountSync Dispatcher Types ─────────────────────────────────────────────
//
// These types define the configuration and observable outputs of the
// AccountSync async streaming dispatcher. They live in common/types so both
// the dispatcher implementation (core/protocol/router/helper/accounts) and
// the diff stage (accounts_tag.go) can reference them without import cycles.

// AccountSyncStream is the minimal interface a persistent dispatch stream must
// satisfy. libp2p network.Stream satisfies this automatically — no adapter needed.
type AccountSyncStream interface {
io.ReadWriter
io.Closer
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
}

// DispatcherConfig is fully self-describing. After construction, no code path
// reads the constants package — callers may override any field before passing
Expand Down Expand Up @@ -76,28 +81,30 @@ type DispatchSummary struct {
}

// DispatcherCallbacks decouples the dispatcher from DB and networking details.
// FetchAccounts and SendPage are required; the On* callbacks are optional.
//
// Using callbacks instead of direct dependencies keeps the dispatcher
// independently testable — pass lightweight mock functions in tests.
// FetchAccounts, OpenStream, and SendPageOnStream are required.
// The On* callbacks are optional (set to nil to skip).
//
// Change from single-stream-per-page to persistent-stream-per-worker:
// - OpenStream is called once per worker at startup.
// - SendPageOnStream writes one page to an already-open stream and reads the ACK.
type DispatcherCallbacks struct {
// FetchAccounts fetches full account records for the given nonces from the
// DB. One call per page (~3000 nonces). The dispatcher sets balance="0"
// before sending; the DB value is intentionally ignored.
//
// Accounts not found for a given nonce are silently omitted.
// FetchAccounts fetches full account records for the given nonces.
// Called once per page. Implementations should use a connection pool
// to avoid creating a new DB session per call.
FetchAccounts func(ctx context.Context, nonces []uint64) ([]*Account, error)

// SendPage delivers one page of accounts to the client and waits for the
// client's Ack. The caller handles proto conversion and network dialing.
// pageIndex is the 1-based sequence number for this page in the session.
SendPage func(ctx context.Context, pageIndex uint32, accounts []*Account) error
// OpenStream opens a persistent stream to the client for this worker.
// Called once per worker at startup (and once on reopen after a stream error).
OpenStream func(ctx context.Context) (AccountSyncStream, error)

// SendPageOnStream delivers one page on an already-open stream and waits
// for the client's ACK. pageIndex is the 1-based sequence number.
SendPageOnStream func(ctx context.Context, stream AccountSyncStream, pageIndex uint32, accounts []*Account) error

// OnPageMetrics is called after every dispatch attempt (success or failure).
// Optional — set to nil to skip metrics.
// OnPageMetrics is called after every dispatch attempt. Optional.
OnPageMetrics func(ctx context.Context, m DispatchPageMetrics)

// OnDeadLetter is called when a page exhausts all retries.
// Optional — set to nil to skip.
// OnDeadLetter is called when a page exhausts all retries. Optional.
OnDeadLetter func(ctx context.Context, dead DeadLetterPage)
}
38 changes: 38 additions & 0 deletions core/protocol/communication/communication.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ type Communicator interface {
// returns a single AccountSyncResponse (page_index=0). Used post-PoTS /
// post-Reconciliation when specific missing accounts are identified by address or DID.
FetchAccounts(ctx context.Context, peerNode types.Nodeinfo, req *accountspb.AccountSyncRequestAccounts) (*accountspb.AccountSyncResponse, error)

// OpenAccountsDataStream connects to peerNode and opens a persistent stream on
// AccountsSyncDataProtocol without closing it. The caller owns the stream
// lifetime — call stream.Close() when the dispatch worker exits.
OpenAccountsDataStream(ctx context.Context, peerNode types.Nodeinfo) (types.AccountSyncStream, error)

// SendAccountPageOnStream writes one AccountSyncServerMessage to an open
// stream and reads back the client's ACK. Does not open or close the stream.
SendAccountPageOnStream(ctx context.Context, stream types.AccountSyncStream, msg *accountspb.AccountSyncServerMessage) (*accountspb.AccountSyncServerMessage, error)
}

func NewCommunication(host host.Host, protocolVersion uint16) Communicator {
Expand Down Expand Up @@ -387,6 +396,35 @@ func (c *communication) FetchAccounts(
return resp, nil
}

// OpenAccountsDataStream connects to peerNode and opens a persistent stream
// on AccountsSyncDataProtocol without closing it. The caller owns the stream.
func (c *communication) OpenAccountsDataStream(
ctx context.Context,
peerNode types.Nodeinfo,
) (types.AccountSyncStream, error) {
if c.host == nil {
return nil, errors.New("host is nil")
}
peerInfo := libp2p_peer.AddrInfo{
ID: peerNode.PeerID,
Addrs: peerNode.Multiaddr,
}
stream, err := messaging.OpenAccountsSyncDataStream(ctx, c.protocolVersion, c.host, peerInfo)
if err != nil {
return nil, err
}
return stream, nil
}

// SendAccountPageOnStream writes one page to an already-open stream and reads the ACK.
func (c *communication) SendAccountPageOnStream(
ctx context.Context,
stream types.AccountSyncStream,
msg *accountspb.AccountSyncServerMessage,
) (*accountspb.AccountSyncServerMessage, error) {
return messaging.WriteAccountPageAndReadACK(stream, msg, constants.DispatchACKTimeout)
}

func (c *communication) SendPoTSRequest(
ctx context.Context,
peerNode types.Nodeinfo,
Expand Down
34 changes: 15 additions & 19 deletions core/protocol/router/data_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,27 +1478,16 @@ func (router *Datarouter) FULL_SYNC(ctx context.Context, req *priorsyncpb.PriorS
}
}

// HandleAccountsSyncData stores one incoming account page received via the
// AccountsSyncDataProtocol dial-back and returns a BatchAck for the server.
func (router *Datarouter) HandleAccountsSyncData(ctx context.Context, resp *accountspb.AccountSyncResponse, remote *types.Nodeinfo) *accountspb.AccountSyncServerMessage {
if resp == nil {
return accountshelper.NewResultFactory(0).ErrBatchAck("nil response")
// WriteAccountsBatch persists all accounts accumulated from one dispatch worker's
// persistent stream in a single DB transaction.
func (router *Datarouter) WriteAccountsBatch(ctx context.Context, accounts []*accountspb.Account) error {
if len(accounts) == 0 {
return nil
}

f := accountshelper.NewResultFactory(resp.GetPageIndex())

writer := clienthelper.NewClientWriter().SetSyncVars(ctx, *router.Nodeinfo, router.wal)
defer writer.Close()

status, err := writer.WriteAccounts(resp.GetAccounts())
if err != nil {
return f.ErrBatchAck(err.Error())
}
if !status {
return f.ErrBatchAck("write returned false with no error")
}

return f.BatchAck()
_, err := writer.WriteAccounts(accounts)
return err
}

func (router *Datarouter) ACCOUNTS_SYNC(ctx context.Context, req *accountspb.AccountNonceSyncRequest, remote *types.Nodeinfo, sessionLockedART *accountshelper.LockedART) *accountspb.AccountSyncServerMessage {
Expand Down Expand Up @@ -1542,7 +1531,14 @@ func (router *Datarouter) ACCOUNTS_SYNC(ctx context.Context, req *accountspb.Acc
return f.ErrEndOfStream(err.Error())
}

callbacks := accountshelper.NewDispatcherCallbacks(router.Nodeinfo, *remote, router.Comm.StreamAccounts, req.Phase.Auth)
callbacks := accountshelper.NewDispatcherCallbacks(
router.Nodeinfo,
*remote,
router.Comm.OpenAccountsDataStream,
router.Comm.SendAccountPageOnStream,
constants.DispatchWorkers,
req.Phase.Auth,
)

d, dispErr := accountsdispatcher.New(accountsdispatcher.Default(), callbacks)
if dispErr != nil {
Expand Down
Loading
Loading