diff --git a/common/proto/accounts/accounts.pb.go b/common/proto/accounts/accounts.pb.go index 101afb8..d2d9493 100644 --- a/common/proto/accounts/accounts.pb.go +++ b/common/proto/accounts/accounts.pb.go @@ -237,6 +237,62 @@ func (x *AccountNonceSyncRequest) GetChecksum() *checksum.Checksum { return nil } +// AccountSyncRequestAccounts is a stateless targeted fetch request. +// map gives set semantics at the wire level — duplicate addresses +// are structurally impossible (map keys are unique in protobuf). +// Used post-PoTS / post-Reconciliation when tagging identifies missing accounts. +type AccountSyncRequestAccounts struct { + state protoimpl.MessageState `protogen:"open.v1"` + Addresses map[string]bool `protobuf:"bytes,1,rep,name=addresses,proto3" json:"addresses,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` // key = address hex or DID string + Phase *phase.Phase `protobuf:"bytes,2,opt,name=phase,proto3" json:"phase,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AccountSyncRequestAccounts) Reset() { + *x = AccountSyncRequestAccounts{} + mi := &file_accounts_accounts_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AccountSyncRequestAccounts) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AccountSyncRequestAccounts) ProtoMessage() {} + +func (x *AccountSyncRequestAccounts) ProtoReflect() protoreflect.Message { + mi := &file_accounts_accounts_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AccountSyncRequestAccounts.ProtoReflect.Descriptor instead. +func (*AccountSyncRequestAccounts) Descriptor() ([]byte, []int) { + return file_accounts_accounts_proto_rawDescGZIP(), []int{2} +} + +func (x *AccountSyncRequestAccounts) GetAddresses() map[string]bool { + if x != nil { + return x.Addresses + } + return nil +} + +func (x *AccountSyncRequestAccounts) GetPhase() *phase.Phase { + if x != nil { + return x.Phase + } + return nil +} + // AccountBatchAck acknowledges receipt of one ART batch. // Client waits for this before sending the next batch. type AccountBatchAck struct { @@ -249,7 +305,7 @@ type AccountBatchAck struct { func (x *AccountBatchAck) Reset() { *x = AccountBatchAck{} - mi := &file_accounts_accounts_proto_msgTypes[2] + mi := &file_accounts_accounts_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -261,7 +317,7 @@ func (x *AccountBatchAck) String() string { func (*AccountBatchAck) ProtoMessage() {} func (x *AccountBatchAck) ProtoReflect() protoreflect.Message { - mi := &file_accounts_accounts_proto_msgTypes[2] + mi := &file_accounts_accounts_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -274,7 +330,7 @@ func (x *AccountBatchAck) ProtoReflect() protoreflect.Message { // Deprecated: Use AccountBatchAck.ProtoReflect.Descriptor instead. func (*AccountBatchAck) Descriptor() ([]byte, []int) { - return file_accounts_accounts_proto_rawDescGZIP(), []int{2} + return file_accounts_accounts_proto_rawDescGZIP(), []int{3} } func (x *AccountBatchAck) GetBatchIndex() uint32 { @@ -303,7 +359,7 @@ type AccountSyncHeartbeat struct { func (x *AccountSyncHeartbeat) Reset() { *x = AccountSyncHeartbeat{} - mi := &file_accounts_accounts_proto_msgTypes[3] + mi := &file_accounts_accounts_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -315,7 +371,7 @@ func (x *AccountSyncHeartbeat) String() string { func (*AccountSyncHeartbeat) ProtoMessage() {} func (x *AccountSyncHeartbeat) ProtoReflect() protoreflect.Message { - mi := &file_accounts_accounts_proto_msgTypes[3] + mi := &file_accounts_accounts_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -328,7 +384,7 @@ func (x *AccountSyncHeartbeat) ProtoReflect() protoreflect.Message { // Deprecated: Use AccountSyncHeartbeat.ProtoReflect.Descriptor instead. func (*AccountSyncHeartbeat) Descriptor() ([]byte, []int) { - return file_accounts_accounts_proto_rawDescGZIP(), []int{3} + return file_accounts_accounts_proto_rawDescGZIP(), []int{4} } func (x *AccountSyncHeartbeat) GetTimestamp() int64 { @@ -356,7 +412,7 @@ type AccountSyncResponse struct { func (x *AccountSyncResponse) Reset() { *x = AccountSyncResponse{} - mi := &file_accounts_accounts_proto_msgTypes[4] + mi := &file_accounts_accounts_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -368,7 +424,7 @@ func (x *AccountSyncResponse) String() string { func (*AccountSyncResponse) ProtoMessage() {} func (x *AccountSyncResponse) ProtoReflect() protoreflect.Message { - mi := &file_accounts_accounts_proto_msgTypes[4] + mi := &file_accounts_accounts_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -381,7 +437,7 @@ func (x *AccountSyncResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use AccountSyncResponse.ProtoReflect.Descriptor instead. func (*AccountSyncResponse) Descriptor() ([]byte, []int) { - return file_accounts_accounts_proto_rawDescGZIP(), []int{4} + return file_accounts_accounts_proto_rawDescGZIP(), []int{5} } func (x *AccountSyncResponse) GetAccounts() []*Account { @@ -438,7 +494,7 @@ type AccountSyncEndOfStream struct { func (x *AccountSyncEndOfStream) Reset() { *x = AccountSyncEndOfStream{} - mi := &file_accounts_accounts_proto_msgTypes[5] + mi := &file_accounts_accounts_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -450,7 +506,7 @@ func (x *AccountSyncEndOfStream) String() string { func (*AccountSyncEndOfStream) ProtoMessage() {} func (x *AccountSyncEndOfStream) ProtoReflect() protoreflect.Message { - mi := &file_accounts_accounts_proto_msgTypes[5] + mi := &file_accounts_accounts_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -463,7 +519,7 @@ func (x *AccountSyncEndOfStream) ProtoReflect() protoreflect.Message { // Deprecated: Use AccountSyncEndOfStream.ProtoReflect.Descriptor instead. func (*AccountSyncEndOfStream) Descriptor() ([]byte, []int) { - return file_accounts_accounts_proto_rawDescGZIP(), []int{5} + return file_accounts_accounts_proto_rawDescGZIP(), []int{6} } func (x *AccountSyncEndOfStream) GetTotalPages() uint32 { @@ -515,7 +571,7 @@ type AccountSyncServerMessage struct { func (x *AccountSyncServerMessage) Reset() { *x = AccountSyncServerMessage{} - mi := &file_accounts_accounts_proto_msgTypes[6] + mi := &file_accounts_accounts_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -527,7 +583,7 @@ func (x *AccountSyncServerMessage) String() string { func (*AccountSyncServerMessage) ProtoMessage() {} func (x *AccountSyncServerMessage) ProtoReflect() protoreflect.Message { - mi := &file_accounts_accounts_proto_msgTypes[6] + mi := &file_accounts_accounts_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -540,7 +596,7 @@ func (x *AccountSyncServerMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use AccountSyncServerMessage.ProtoReflect.Descriptor instead. func (*AccountSyncServerMessage) Descriptor() ([]byte, []int) { - return file_accounts_accounts_proto_rawDescGZIP(), []int{6} + return file_accounts_accounts_proto_rawDescGZIP(), []int{7} } func (x *AccountSyncServerMessage) GetPayload() isAccountSyncServerMessage_Payload { @@ -641,7 +697,13 @@ const file_accounts_accounts_proto_rawDesc = "" + "batchIndex\x12\x17\n" + "\ais_last\x18\x06 \x01(\bR\x06isLast\x12\"\n" + "\x05phase\x18\a \x01(\v2\f.phase.PhaseR\x05phase\x12.\n" + - "\bchecksum\x18\b \x01(\v2\x12.checksum.ChecksumR\bchecksum\"N\n" + + "\bchecksum\x18\b \x01(\v2\x12.checksum.ChecksumR\bchecksum\"\xd1\x01\n" + + "\x1aAccountSyncRequestAccounts\x12Q\n" + + "\taddresses\x18\x01 \x03(\v23.accounts.AccountSyncRequestAccounts.AddressesEntryR\taddresses\x12\"\n" + + "\x05phase\x18\x02 \x01(\v2\f.phase.PhaseR\x05phase\x1a<\n" + + "\x0eAddressesEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\bR\x05value:\x028\x01\"N\n" + "\x0fAccountBatchAck\x12\x1f\n" + "\vbatch_index\x18\x01 \x01(\rR\n" + "batchIndex\x12\x1a\n" + @@ -679,41 +741,45 @@ func file_accounts_accounts_proto_rawDescGZIP() []byte { return file_accounts_accounts_proto_rawDescData } -var file_accounts_accounts_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_accounts_accounts_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_accounts_accounts_proto_goTypes = []any{ - (*Account)(nil), // 0: accounts.Account - (*AccountNonceSyncRequest)(nil), // 1: accounts.AccountNonceSyncRequest - (*AccountBatchAck)(nil), // 2: accounts.AccountBatchAck - (*AccountSyncHeartbeat)(nil), // 3: accounts.AccountSyncHeartbeat - (*AccountSyncResponse)(nil), // 4: accounts.AccountSyncResponse - (*AccountSyncEndOfStream)(nil), // 5: accounts.AccountSyncEndOfStream - (*AccountSyncServerMessage)(nil), // 6: accounts.AccountSyncServerMessage - (*structpb.Struct)(nil), // 7: google.protobuf.Struct - (*tagging.RangeTag)(nil), // 8: tagging.RangeTag - (*phase.Phase)(nil), // 9: phase.Phase - (*checksum.Checksum)(nil), // 10: checksum.Checksum - (*ack.Ack)(nil), // 11: ack.Ack + (*Account)(nil), // 0: accounts.Account + (*AccountNonceSyncRequest)(nil), // 1: accounts.AccountNonceSyncRequest + (*AccountSyncRequestAccounts)(nil), // 2: accounts.AccountSyncRequestAccounts + (*AccountBatchAck)(nil), // 3: accounts.AccountBatchAck + (*AccountSyncHeartbeat)(nil), // 4: accounts.AccountSyncHeartbeat + (*AccountSyncResponse)(nil), // 5: accounts.AccountSyncResponse + (*AccountSyncEndOfStream)(nil), // 6: accounts.AccountSyncEndOfStream + (*AccountSyncServerMessage)(nil), // 7: accounts.AccountSyncServerMessage + nil, // 8: accounts.AccountSyncRequestAccounts.AddressesEntry + (*structpb.Struct)(nil), // 9: google.protobuf.Struct + (*tagging.RangeTag)(nil), // 10: tagging.RangeTag + (*phase.Phase)(nil), // 11: phase.Phase + (*checksum.Checksum)(nil), // 12: checksum.Checksum + (*ack.Ack)(nil), // 13: ack.Ack } var file_accounts_accounts_proto_depIdxs = []int32{ - 7, // 0: accounts.Account.metadata:type_name -> google.protobuf.Struct - 8, // 1: accounts.AccountNonceSyncRequest.keys_range:type_name -> tagging.RangeTag - 9, // 2: accounts.AccountNonceSyncRequest.phase:type_name -> phase.Phase - 10, // 3: accounts.AccountNonceSyncRequest.checksum:type_name -> checksum.Checksum - 11, // 4: accounts.AccountBatchAck.ack:type_name -> ack.Ack - 0, // 5: accounts.AccountSyncResponse.accounts:type_name -> accounts.Account - 11, // 6: accounts.AccountSyncResponse.ack:type_name -> ack.Ack - 9, // 7: accounts.AccountSyncResponse.phase:type_name -> phase.Phase - 11, // 8: accounts.AccountSyncEndOfStream.ack:type_name -> ack.Ack - 9, // 9: accounts.AccountSyncEndOfStream.phase:type_name -> phase.Phase - 2, // 10: accounts.AccountSyncServerMessage.batch_ack:type_name -> accounts.AccountBatchAck - 3, // 11: accounts.AccountSyncServerMessage.heartbeat:type_name -> accounts.AccountSyncHeartbeat - 4, // 12: accounts.AccountSyncServerMessage.response:type_name -> accounts.AccountSyncResponse - 5, // 13: accounts.AccountSyncServerMessage.end:type_name -> accounts.AccountSyncEndOfStream - 14, // [14:14] is the sub-list for method output_type - 14, // [14:14] is the sub-list for method input_type - 14, // [14:14] is the sub-list for extension type_name - 14, // [14:14] is the sub-list for extension extendee - 0, // [0:14] is the sub-list for field type_name + 9, // 0: accounts.Account.metadata:type_name -> google.protobuf.Struct + 10, // 1: accounts.AccountNonceSyncRequest.keys_range:type_name -> tagging.RangeTag + 11, // 2: accounts.AccountNonceSyncRequest.phase:type_name -> phase.Phase + 12, // 3: accounts.AccountNonceSyncRequest.checksum:type_name -> checksum.Checksum + 8, // 4: accounts.AccountSyncRequestAccounts.addresses:type_name -> accounts.AccountSyncRequestAccounts.AddressesEntry + 11, // 5: accounts.AccountSyncRequestAccounts.phase:type_name -> phase.Phase + 13, // 6: accounts.AccountBatchAck.ack:type_name -> ack.Ack + 0, // 7: accounts.AccountSyncResponse.accounts:type_name -> accounts.Account + 13, // 8: accounts.AccountSyncResponse.ack:type_name -> ack.Ack + 11, // 9: accounts.AccountSyncResponse.phase:type_name -> phase.Phase + 13, // 10: accounts.AccountSyncEndOfStream.ack:type_name -> ack.Ack + 11, // 11: accounts.AccountSyncEndOfStream.phase:type_name -> phase.Phase + 3, // 12: accounts.AccountSyncServerMessage.batch_ack:type_name -> accounts.AccountBatchAck + 4, // 13: accounts.AccountSyncServerMessage.heartbeat:type_name -> accounts.AccountSyncHeartbeat + 5, // 14: accounts.AccountSyncServerMessage.response:type_name -> accounts.AccountSyncResponse + 6, // 15: accounts.AccountSyncServerMessage.end:type_name -> accounts.AccountSyncEndOfStream + 16, // [16:16] is the sub-list for method output_type + 16, // [16:16] is the sub-list for method input_type + 16, // [16:16] is the sub-list for extension type_name + 16, // [16:16] is the sub-list for extension extendee + 0, // [0:16] is the sub-list for field type_name } func init() { file_accounts_accounts_proto_init() } @@ -721,7 +787,7 @@ func file_accounts_accounts_proto_init() { if File_accounts_accounts_proto != nil { return } - file_accounts_accounts_proto_msgTypes[6].OneofWrappers = []any{ + file_accounts_accounts_proto_msgTypes[7].OneofWrappers = []any{ (*AccountSyncServerMessage_BatchAck)(nil), (*AccountSyncServerMessage_Heartbeat)(nil), (*AccountSyncServerMessage_Response)(nil), @@ -733,7 +799,7 @@ func file_accounts_accounts_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_accounts_accounts_proto_rawDesc), len(file_accounts_accounts_proto_rawDesc)), NumEnums: 0, - NumMessages: 7, + NumMessages: 9, NumExtensions: 0, NumServices: 0, }, diff --git a/common/proto/accounts/accounts.proto b/common/proto/accounts/accounts.proto index 4f4b22d..c8048bf 100644 --- a/common/proto/accounts/accounts.proto +++ b/common/proto/accounts/accounts.proto @@ -63,6 +63,15 @@ message AccountNonceSyncRequest { checksum.Checksum checksum = 8; // optional: versioned digest (shared checksum.proto) } +// AccountSyncRequestAccounts is a stateless targeted fetch request. +// map gives set semantics at the wire level — duplicate addresses +// are structurally impossible (map keys are unique in protobuf). +// Used post-PoTS / post-Reconciliation when tagging identifies missing accounts. +message AccountSyncRequestAccounts { + map addresses = 1; // key = address hex or DID string + phase.Phase phase = 2; +} + // ================================================================ // Server-side stream envelope (server → client) // ================================================================ diff --git a/common/types/constants/constants.go b/common/types/constants/constants.go index 30c1bdc..684781b 100644 --- a/common/types/constants/constants.go +++ b/common/types/constants/constants.go @@ -41,6 +41,10 @@ const ( ACCOUNTS_SYNC_REQUEST_END = "state:accountssync:accountssyncrequest:end" ACCOUNTS_SYNC_REQUEST = "state:accountssync:accountssyncrequest" ACCOUNTS_SYNC_RESPONSE = "state:accountssync:accountssyncresponse" + + // Take the array of accounts required from the client as request, and send the array of accounts to the server. + ACCOUNTS_SYNC_REQUEST_ACCOUNTS = "state:accountssync:accountssyncrequest:accounts" + ACCOUNTS_SYNC_REQUEST_ACCOUNTS_RESPONSE = "state:accountssync:accountssyncrequest:accounts:response" ) const ( diff --git a/common/types/constants/protocols.go b/common/types/constants/protocols.go index a3d7ced..b9f8739 100644 --- a/common/types/constants/protocols.go +++ b/common/types/constants/protocols.go @@ -11,6 +11,7 @@ const ( DataSyncProtocol protocol.ID = "/fastsync/v1/datasync" AccountsSyncProtocol protocol.ID = "/fastsync/v1/accountssync" AccountsSyncDataProtocol protocol.ID = "/fastsync/v1/accountssync/data" + AccountsSyncFetchProtocol protocol.ID = "/fastsync/v1/accountssync/fetch" PoTSProtocol protocol.ID = "/fastsync/v1/pots" ) diff --git a/core/accountsync/accountsync.go b/core/accountsync/accountsync.go index 800a399..a990621 100644 --- a/core/accountsync/accountsync.go +++ b/core/accountsync/accountsync.go @@ -128,6 +128,58 @@ func (as *AccountSync) AccountSync(remote *availabilitypb.AvailabilityResponse) return totalAccounts, nil } +// FetchAccounts sends a targeted AccountSyncRequestAccounts to the server on +// AccountsSyncFetchProtocol and returns the single-page AccountSyncResponse. +// Empty address maps short-circuit without a network round-trip. +func (as *AccountSync) FetchAccounts(remote *availabilitypb.AvailabilityResponse, addresses map[string]bool) (*accountspb.AccountSyncResponse, error) { + if remote == nil || remote.Nodeinfo == nil { + return nil, fmt.Errorf("accountsfetch: remote is nil") + } + if remote.Auth == nil || remote.Auth.UUID == "" { + return nil, fmt.Errorf("accountsfetch: remote has no auth UUID") + } + if as.Comm == nil { + return nil, fmt.Errorf("accountsfetch: communicator not set — call SetSyncVars first") + } + if len(addresses) == 0 { + return &accountspb.AccountSyncResponse{PageIndex: 0}, nil + } + + ctx := as.SyncVars.Ctx + + remoteNodeInfo, err := routerhelper.NewNodeInfoHelper().ToNodeinfo(remote.Nodeinfo) + if err != nil { + return nil, fmt.Errorf("accountsfetch: parse remote nodeinfo: %w", err) + } + + req := &accountspb.AccountSyncRequestAccounts{ + Addresses: addresses, + Phase: &phasepb.Phase{ + PresentPhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS, + SuccessivePhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS, + Auth: remote.Auth, + }, + } + + resp, err := as.Comm.FetchAccounts(ctx, *remoteNodeInfo, req) + if err != nil { + return nil, fmt.Errorf("accountsfetch: %w", err) + } + if resp == nil || !resp.GetAck().GetOk() { + errMsg := "unknown error" + if resp != nil { + errMsg = resp.GetAck().GetError() + } + return nil, fmt.Errorf("accountsfetch: server error: %s", errMsg) + } + + Log.Logger(Log.Sync).Info(ctx, "accountsfetch: complete", + ion.Int("requested", len(addresses)), + ion.Int("received", len(resp.GetAccounts()))) + + return resp, nil +} + func (as *AccountSync) Close() { if as.SyncVars != nil { as.SyncVars = nil diff --git a/core/accountsync/interface.go b/core/accountsync/interface.go index e60398a..fae909c 100644 --- a/core/accountsync/interface.go +++ b/core/accountsync/interface.go @@ -4,6 +4,7 @@ import ( "context" "github.com/JupiterMetaLabs/JMDN-FastSync/common/WAL" + accountspb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/accounts" availabilitypb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/availability" "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" "github.com/libp2p/go-libp2p/core/host" @@ -48,6 +49,17 @@ type AccountSync_router interface { // diff, or stream delivery fails. AccountSync(remote *availabilitypb.AvailabilityResponse) (uint64, error) + // FetchAccounts sends a targeted request for specific accounts (by hex address or DID + // string) to the server. The server strips DIDs and looks up only plain accounts in + // its DB, returning them as a single AccountSyncResponse with page_index=0. + // + // Used post-PoTS / post-Reconciliation when tagging identifies accounts the client + // is missing that are not covered by the streaming AccountSync diff. + // + // Returns the server response. The caller is responsible for writing the returned + // accounts to the local DB. + FetchAccounts(remote *availabilitypb.AvailabilityResponse, addresses map[string]bool) (*accountspb.AccountSyncResponse, error) + // Close releases resources held by the router. Close() } diff --git a/core/priorsync/priorsync.go b/core/priorsync/priorsync.go index d4140fa..a077c3f 100644 --- a/core/priorsync/priorsync.go +++ b/core/priorsync/priorsync.go @@ -101,6 +101,7 @@ func (ps *PriorSync) SetupNetworkHandlers(debug bool) error { ps.SyncVars.Node.RemoveStreamHandler(constants.BlocksPUBSUB) ps.SyncVars.Node.RemoveStreamHandler(constants.AccountsSyncProtocol) ps.SyncVars.Node.RemoveStreamHandler(constants.AccountsSyncDataProtocol) + ps.SyncVars.Node.RemoveStreamHandler(constants.AccountsSyncFetchProtocol) } if ps.cancel != nil { @@ -149,6 +150,10 @@ func (ps *PriorSync) SetupNetworkHandlers(debug bool) error { return err } + if err := syncHandler.HandleAccountsFetch(ctx, ps.SyncVars.Node); err != nil { + return err + } + // Block until parent or Close() cancels <-ctx.Done() @@ -263,5 +268,6 @@ func (ps *PriorSync) Close() { node.RemoveStreamHandler(constants.BlocksPUBSUB) node.RemoveStreamHandler(constants.AccountsSyncProtocol) node.RemoveStreamHandler(constants.AccountsSyncDataProtocol) + node.RemoveStreamHandler(constants.AccountsSyncFetchProtocol) } } diff --git a/core/protocol/communication/communication.go b/core/protocol/communication/communication.go index 99d6fa3..ba5e1e3 100644 --- a/core/protocol/communication/communication.go +++ b/core/protocol/communication/communication.go @@ -50,6 +50,11 @@ type Communicator interface { // StreamAccounts delivers one AccountSyncResponse page from the server to the // client by dialling AccountsSyncDataProtocol (dial-back). Returns the client ack. StreamAccounts(ctx context.Context, peerNode types.Nodeinfo, resp *accountspb.AccountSyncServerMessage) (*accountspb.AccountSyncServerMessage, error) + + // FetchAccounts sends a targeted AccountSyncRequestAccounts to the server and + // returns a single AccountSyncResponse (page_index=0). Used post-PoTS / + // post-Reconciliation when specific missing accounts are identified by address or DID. + FetchAccounts(ctx context.Context, peerNode types.Nodeinfo, req *accountspb.AccountSyncRequestAccounts) (*accountspb.AccountSyncResponse, error) } func NewCommunication(host host.Host, protocolVersion uint16) Communicator { @@ -349,6 +354,39 @@ func (c *communication) StreamAccounts( return ack, nil } +// FetchAccounts sends an AccountSyncRequestAccounts to the server on +// AccountsSyncFetchProtocol and returns the single-page AccountSyncResponse. +func (c *communication) FetchAccounts( + ctx context.Context, + peerNode types.Nodeinfo, + req *accountspb.AccountSyncRequestAccounts, +) (*accountspb.AccountSyncResponse, error) { + if c.host == nil { + return nil, errors.New("host is nil") + } + + peerInfo := libp2p_peer.AddrInfo{ + ID: peerNode.PeerID, + Addrs: peerNode.Multiaddr, + } + + resp := &accountspb.AccountSyncResponse{} + + if err := messaging.SendProtoDelimited( + ctx, + c.protocolVersion, + c.host, + peerInfo, + constants.AccountsSyncFetchProtocol, + req, + resp, + ); err != nil { + return nil, errors.New("failed to send accounts fetch request: " + err.Error()) + } + + return resp, nil +} + func (c *communication) SendPoTSRequest( ctx context.Context, peerNode types.Nodeinfo, diff --git a/core/protocol/router/data_router.go b/core/protocol/router/data_router.go index 8663627..6450e44 100644 --- a/core/protocol/router/data_router.go +++ b/core/protocol/router/data_router.go @@ -1586,4 +1586,77 @@ func (router *Datarouter) ACCOUNTS_SYNC(ctx context.Context, req *accountspb.Acc }, }, } +} + +func (router *Datarouter) HandleAccountsFetch(ctx context.Context, req *accountspb.AccountSyncRequestAccounts, remote *types.Nodeinfo) *accountspb.AccountSyncResponse { + if req.Phase == nil || req.Phase.Auth == nil || req.Phase.Auth.UUID == "" { + return &accountspb.AccountSyncResponse{ + Ack: &ackpb.Ack{Ok: false, Error: errors.AuthRequired.Error()}, + Phase: &phasepb.Phase{ + PresentPhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS, + SuccessivePhase: constants.FAILURE, + Success: false, + Error: errors.AuthRequired.Error(), + }, + } + } + + authenticated, err := router.Authenticate(ctx, req.Phase.Auth, remote) + if err != nil || !authenticated { + return &accountspb.AccountSyncResponse{ + Ack: &ackpb.Ack{Ok: false, Error: errors.AuthenticationFailed.Error()}, + Phase: &phasepb.Phase{ + PresentPhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS, + SuccessivePhase: constants.FAILURE, + Success: false, + Error: errors.AuthenticationFailed.Error(), + Auth: req.Phase.Auth, + }, + } + } + + defer func() { + if resetErr := router.ResetTTL(ctx, req.Phase.Auth, remote); resetErr != nil { + Log.Logger(namedlogger).Error(ctx, "Failed to reset TTL", resetErr) + } + }() + + switch req.Phase.PresentPhase { + case constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS: + protoAccounts, fetchErr := accountshelper.FetchAccountsByAddresses(ctx, router.Nodeinfo.BlockInfo, req.GetAddresses()) + if fetchErr != nil { + return &accountspb.AccountSyncResponse{ + Ack: &ackpb.Ack{Ok: false, Error: fetchErr.Error()}, + Phase: &phasepb.Phase{ + PresentPhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS, + SuccessivePhase: constants.FAILURE, + Success: false, + Error: fetchErr.Error(), + Auth: req.Phase.Auth, + }, + } + } + return &accountspb.AccountSyncResponse{ + Accounts: protoAccounts, + PageIndex: 0, + Ack: &ackpb.Ack{Ok: true}, + Phase: &phasepb.Phase{ + PresentPhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS_RESPONSE, + SuccessivePhase: constants.SUCCESS, + Success: true, + Auth: req.Phase.Auth, + }, + } + default: + return &accountspb.AccountSyncResponse{ + Ack: &ackpb.Ack{Ok: false, Error: "unknown state: " + req.Phase.PresentPhase}, + Phase: &phasepb.Phase{ + PresentPhase: req.Phase.PresentPhase, + SuccessivePhase: constants.FAILURE, + Success: false, + Error: "unknown state: " + req.Phase.PresentPhase, + Auth: req.Phase.Auth, + }, + } + } } \ No newline at end of file diff --git a/core/protocol/router/helper/accounts/accounts_fetch_helper.go b/core/protocol/router/helper/accounts/accounts_fetch_helper.go new file mode 100644 index 0000000..0907c66 --- /dev/null +++ b/core/protocol/router/helper/accounts/accounts_fetch_helper.go @@ -0,0 +1,64 @@ +package accounts + +import ( + "context" + "strings" + + accountspb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/accounts" + "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" + Log "github.com/JupiterMetaLabs/JMDN-FastSync/logging" + "github.com/JupiterMetaLabs/ion" +) + +const fetchLogger = "log:accountsfetch" + +// lookupKeyForFetch returns the on-chain account address string used for DB lookup. +// Keys may be raw addresses or DID strings ("did:...:address"); the last ':' segment +// of a DID is treated as the address. +func lookupKeyForFetch(addr string) string { + if strings.HasPrefix(addr, "did:") { + parts := strings.Split(addr, ":") + if len(parts) > 0 { + return parts[len(parts)-1] + } + } + return addr +} + +// FetchAccountsByAddresses looks up each address key in the provided set, +// converts found records to proto, and returns the slice. +// Not-found addresses are silently skipped (not an error). +func FetchAccountsByAddresses(ctx context.Context, blockInfo types.BlockInfo, addresses map[string]bool) ([]*accountspb.Account, error) { + accountMgr := blockInfo.NewAccountManager() + protoAccounts := make([]*accountspb.Account, 0, len(addresses)) + + for addr := range addresses { + lookup := lookupKeyForFetch(addr) + acc, err := accountMgr.GetAccountByAddress(lookup) + if err != nil { + Log.Logger(fetchLogger).Warn(ctx, "AccountsFetch: lookup failed", + ion.String("address", addr), + ion.String("lookup", lookup), + ion.Err(err)) + continue + } + if acc == nil { + continue + } + protoAccounts = append(protoAccounts, &accountspb.Account{ + DidAddress: acc.DIDAddress, + Address: acc.Address.Bytes(), + Balance: "0", // Server should only send the zero balance during fetch accounts steps + Nonce: acc.Nonce, + AccountType: acc.AccountType, + CreatedAt: acc.CreatedAt, + UpdatedAt: acc.UpdatedAt, + }) + } + + Log.Logger(fetchLogger).Info(ctx, "AccountsFetch complete", + ion.Int("requested", len(addresses)), + ion.Int("found", len(protoAccounts))) + + return protoAccounts, nil +} diff --git a/core/reconsillation/interface.go b/core/reconsillation/interface.go index 836bd03..129c61a 100644 --- a/core/reconsillation/interface.go +++ b/core/reconsillation/interface.go @@ -4,6 +4,7 @@ import ( "context" "github.com/JupiterMetaLabs/JMDN-FastSync/common/WAL" + availabilitypb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/availability" "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/tagging" "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" ) @@ -24,7 +25,7 @@ type Reconciliation_router interface { // Reconcile calculates updated balances for all tagged accounts by querying // their transactions and updates the accounts table in the database. // Returns the number of accounts successfully reconciled and list of failed accounts. - Reconcile(taggedAccounts *tagging.TaggedAccounts) (int, []string, error) + Reconcile(taggedAccounts *tagging.TaggedAccounts, remote *availabilitypb.AvailabilityResponse) (int, []string, error) // Close releases resources and cleans up. Close() diff --git a/core/reconsillation/reconsillation.go b/core/reconsillation/reconsillation.go index b83bdf6..f8fab90 100644 --- a/core/reconsillation/reconsillation.go +++ b/core/reconsillation/reconsillation.go @@ -11,9 +11,11 @@ import ( "github.com/JupiterMetaLabs/JMDN-FastSync/common/WAL" blockpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/block" "github.com/ethereum/go-ethereum/common" + availabilitypb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/availability" taggingpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/tagging" "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" "github.com/JupiterMetaLabs/JMDN-FastSync/common/types/constants" + "github.com/JupiterMetaLabs/JMDN-FastSync/core/accountsync" "github.com/JupiterMetaLabs/JMDN-FastSync/core/reconsillation/LRUCache" "github.com/JupiterMetaLabs/JMDN-FastSync/core/reconsillation/helper" Log "github.com/JupiterMetaLabs/JMDN-FastSync/logging" @@ -72,6 +74,7 @@ func (r *Reconciliation) GetLRUCache() LRUCache.LRUCacheInterface { return r.headerCache } + // GetSyncVars returns the current sync configuration. func (r *Reconciliation) GetSyncVars() *types.Syncvars { return r.SyncVars @@ -123,7 +126,7 @@ func (r *Reconciliation) GetBlockFromLRUCache(blockNumber uint64) (*blockpb.Head // // Returns the number of accounts reconciled and the list of any accounts that failed // during the computation phase. A non-nil error always means the DB was NOT mutated. -func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts) (int, []string, error) { +func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts, remote *availabilitypb.AvailabilityResponse) (int, []string, error) { if taggedAccounts == nil || len(taggedAccounts.Accounts) == 0 { Log.Logger(namedlogger).Info(r.SyncVars.Ctx, "No tagged accounts to reconcile") return 0, nil, nil @@ -209,6 +212,61 @@ func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts) (in Log.Logger(namedlogger).Info(ctx, "Phase 1 complete — all account states ready", ion.Int("accounts_ready", len(updates))) + // ---------------------------------------------------------------- + // Phase 1.5: Pre-create accounts missing from local DB. + // AccountSync covers ~99% of zero-balance accounts before reconciliation; + // this handles stragglers. Fetching from the remote and writing them with + // zero balance lets Phase 3 issue an UPDATE instead of a CREATE, keeping + // the flow uniform. Accounts the remote also doesn't know about remain + // IsNewAccount=true and are handled by BatchUpdateAccounts as before. + // ---------------------------------------------------------------- + if remote != nil { + missingAddrs := make(map[string]bool) + missingIdx := make(map[string]int) + for updateIdx, update := range updates { + if update.IsNewAccount { + missingAddrs[update.Address] = true + missingIdx[update.Address] = updateIdx + } + } + + if len(missingAddrs) > 0 { + Log.Logger(namedlogger).Debug(ctx, "Phase 1.5: fetching missing accounts from remote", + ion.Int("missing_count", len(missingAddrs))) + + syncVars := r.GetSyncVars() + acctSync := accountsync.NewAccountSync().SetSyncVars(syncVars.Ctx, syncVars.Version, syncVars.NodeInfo, syncVars.Node, syncVars.WAL) + resp, err := acctSync.FetchAccounts(remote, missingAddrs) + if err != nil { + Log.Logger(namedlogger).Warn(ctx, "Phase 1.5: remote fetch failed — BatchUpdateAccounts will create them", + ion.Err(err)) + } else if resp != nil { + created := 0 + for _, acc := range resp.GetAccounts() { + addrBytes := acc.GetAddress() + if len(addrBytes) == 0 { + continue + } + addr := strings.ToLower(common.BytesToAddress(addrBytes).Hex()) + idx, ok := missingIdx[addr] + if !ok { + continue + } + if err := accountManager.CreateAccount(addr, big.NewInt(0), acc.GetNonce()); err != nil { + Log.Logger(namedlogger).Warn(ctx, "Phase 1.5: failed to pre-create account", + ion.String("address", addr), ion.Err(err)) + continue + } + updates[idx].IsNewAccount = false + created++ + } + Log.Logger(namedlogger).Info(ctx, "Phase 1.5 complete — pre-created missing accounts from remote", + ion.Int("requested", len(missingAddrs)), + ion.Int("created", created)) + } + } + } + // ---------------------------------------------------------------- // Phase 2: Write all planned changes to WAL as a single batch event. // Writing one ReconciliationBatchEvent instead of N individual events diff --git a/core/sync/sync_protocols.go b/core/sync/sync_protocols.go index 316cedc..64e0418 100644 --- a/core/sync/sync_protocols.go +++ b/core/sync/sync_protocols.go @@ -60,6 +60,7 @@ type sync_interface interface { HandlePubsub(ctx context.Context, node host.Host) error HandleAccountsSync(ctx context.Context, node host.Host) error HandleAccountsSyncData(ctx context.Context, node host.Host) error + HandleAccountsFetch(ctx context.Context, node host.Host) error Debug(ctx context.Context, protocol protocol.ID, node host.Host, remote *types.Nodeinfo) } @@ -717,6 +718,45 @@ func (s *Sync) HandleAccountsSyncData(ctx context.Context, node host.Host) error return nil } +// HandleAccountsFetch registers the server-side handler for AccountsSyncFetchProtocol. +// The client sends one AccountSyncRequestAccounts; the server resolves the addresses, +// strips DIDs, and returns a single AccountSyncResponse (page_index=0). +// Pattern: stateless read → route → write (no heartbeat — targeted DB lookup). +func (s *Sync) HandleAccountsFetch(ctx context.Context, node host.Host) error { + node.SetStreamHandler(constants.AccountsSyncFetchProtocol, func(str network.Stream) { + defer str.Close() + + select { + case <-ctx.Done(): + return + default: + } + + _ = str.SetReadDeadline(time.Now().Add(constants.StreamDeadline)) + defer str.SetReadDeadline(time.Time{}) + + req := &accountspb.AccountSyncRequestAccounts{} + if err := pbstream.ReadDelimited(str, req); err != nil { + return + } + + remoteNodeInfo := &types.Nodeinfo{ + PeerID: str.Conn().RemotePeer(), + Multiaddr: []multiaddr.Multiaddr{str.Conn().RemoteMultiaddr()}, + Version: s.nodeinfo.Version, + } + + resp := s.Datarouter.HandleAccountsFetch(ctx, req, remoteNodeInfo) + s.Debug(ctx, constants.AccountsSyncFetchProtocol, node, remoteNodeInfo) + + _ = str.SetWriteDeadline(time.Now().Add(constants.StreamDeadline)) + defer str.SetWriteDeadline(time.Time{}) + + _ = pbstream.WriteDelimited(str, resp) + }) + return nil +} + func (s *Sync) Debug(ctx context.Context, protocol protocol.ID, node host.Host, remote *types.Nodeinfo) { if s.debug && remote != nil { logging.Logger(logging.Sync).Debug(ctx, "Sync Protocols Debug",