From 49232a1a132e8fadef80d2e1cf61257b588ce9e8 Mon Sep 17 00:00:00 2001 From: Neeraj Chowdary <57310710+neerajchowdary889@users.noreply.github.com> Date: Fri, 15 May 2026 14:35:26 +0530 Subject: [PATCH 1/4] Add AccountSyncRequestAccounts message for targeted account fetching Introduce the AccountSyncRequestAccounts message to facilitate stateless targeted fetch requests for accounts. This message includes a map of addresses and a phase field, enhancing the account synchronization process. Update related protobuf definitions to maintain consistency across the protocol. --- common/proto/accounts/accounts.pb.go | 166 +++++++++++++++++++-------- common/proto/accounts/accounts.proto | 9 ++ 2 files changed, 125 insertions(+), 50 deletions(-) diff --git a/common/proto/accounts/accounts.pb.go b/common/proto/accounts/accounts.pb.go index 101afb8..d2d9493 100644 --- a/common/proto/accounts/accounts.pb.go +++ b/common/proto/accounts/accounts.pb.go @@ -237,6 +237,62 @@ func (x *AccountNonceSyncRequest) GetChecksum() *checksum.Checksum { return nil } +// AccountSyncRequestAccounts is a stateless targeted fetch request. +// map gives set semantics at the wire level — duplicate addresses +// are structurally impossible (map keys are unique in protobuf). +// Used post-PoTS / post-Reconciliation when tagging identifies missing accounts. +type AccountSyncRequestAccounts struct { + state protoimpl.MessageState `protogen:"open.v1"` + Addresses map[string]bool `protobuf:"bytes,1,rep,name=addresses,proto3" json:"addresses,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` // key = address hex or DID string + Phase *phase.Phase `protobuf:"bytes,2,opt,name=phase,proto3" json:"phase,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AccountSyncRequestAccounts) Reset() { + *x = AccountSyncRequestAccounts{} + mi := &file_accounts_accounts_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AccountSyncRequestAccounts) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AccountSyncRequestAccounts) ProtoMessage() {} + +func (x *AccountSyncRequestAccounts) ProtoReflect() protoreflect.Message { + mi := &file_accounts_accounts_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AccountSyncRequestAccounts.ProtoReflect.Descriptor instead. +func (*AccountSyncRequestAccounts) Descriptor() ([]byte, []int) { + return file_accounts_accounts_proto_rawDescGZIP(), []int{2} +} + +func (x *AccountSyncRequestAccounts) GetAddresses() map[string]bool { + if x != nil { + return x.Addresses + } + return nil +} + +func (x *AccountSyncRequestAccounts) GetPhase() *phase.Phase { + if x != nil { + return x.Phase + } + return nil +} + // AccountBatchAck acknowledges receipt of one ART batch. // Client waits for this before sending the next batch. type AccountBatchAck struct { @@ -249,7 +305,7 @@ type AccountBatchAck struct { func (x *AccountBatchAck) Reset() { *x = AccountBatchAck{} - mi := &file_accounts_accounts_proto_msgTypes[2] + mi := &file_accounts_accounts_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -261,7 +317,7 @@ func (x *AccountBatchAck) String() string { func (*AccountBatchAck) ProtoMessage() {} func (x *AccountBatchAck) ProtoReflect() protoreflect.Message { - mi := &file_accounts_accounts_proto_msgTypes[2] + mi := &file_accounts_accounts_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -274,7 +330,7 @@ func (x *AccountBatchAck) ProtoReflect() protoreflect.Message { // Deprecated: Use AccountBatchAck.ProtoReflect.Descriptor instead. func (*AccountBatchAck) Descriptor() ([]byte, []int) { - return file_accounts_accounts_proto_rawDescGZIP(), []int{2} + return file_accounts_accounts_proto_rawDescGZIP(), []int{3} } func (x *AccountBatchAck) GetBatchIndex() uint32 { @@ -303,7 +359,7 @@ type AccountSyncHeartbeat struct { func (x *AccountSyncHeartbeat) Reset() { *x = AccountSyncHeartbeat{} - mi := &file_accounts_accounts_proto_msgTypes[3] + mi := &file_accounts_accounts_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -315,7 +371,7 @@ func (x *AccountSyncHeartbeat) String() string { func (*AccountSyncHeartbeat) ProtoMessage() {} func (x *AccountSyncHeartbeat) ProtoReflect() protoreflect.Message { - mi := &file_accounts_accounts_proto_msgTypes[3] + mi := &file_accounts_accounts_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -328,7 +384,7 @@ func (x *AccountSyncHeartbeat) ProtoReflect() protoreflect.Message { // Deprecated: Use AccountSyncHeartbeat.ProtoReflect.Descriptor instead. func (*AccountSyncHeartbeat) Descriptor() ([]byte, []int) { - return file_accounts_accounts_proto_rawDescGZIP(), []int{3} + return file_accounts_accounts_proto_rawDescGZIP(), []int{4} } func (x *AccountSyncHeartbeat) GetTimestamp() int64 { @@ -356,7 +412,7 @@ type AccountSyncResponse struct { func (x *AccountSyncResponse) Reset() { *x = AccountSyncResponse{} - mi := &file_accounts_accounts_proto_msgTypes[4] + mi := &file_accounts_accounts_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -368,7 +424,7 @@ func (x *AccountSyncResponse) String() string { func (*AccountSyncResponse) ProtoMessage() {} func (x *AccountSyncResponse) ProtoReflect() protoreflect.Message { - mi := &file_accounts_accounts_proto_msgTypes[4] + mi := &file_accounts_accounts_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -381,7 +437,7 @@ func (x *AccountSyncResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use AccountSyncResponse.ProtoReflect.Descriptor instead. func (*AccountSyncResponse) Descriptor() ([]byte, []int) { - return file_accounts_accounts_proto_rawDescGZIP(), []int{4} + return file_accounts_accounts_proto_rawDescGZIP(), []int{5} } func (x *AccountSyncResponse) GetAccounts() []*Account { @@ -438,7 +494,7 @@ type AccountSyncEndOfStream struct { func (x *AccountSyncEndOfStream) Reset() { *x = AccountSyncEndOfStream{} - mi := &file_accounts_accounts_proto_msgTypes[5] + mi := &file_accounts_accounts_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -450,7 +506,7 @@ func (x *AccountSyncEndOfStream) String() string { func (*AccountSyncEndOfStream) ProtoMessage() {} func (x *AccountSyncEndOfStream) ProtoReflect() protoreflect.Message { - mi := &file_accounts_accounts_proto_msgTypes[5] + mi := &file_accounts_accounts_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -463,7 +519,7 @@ func (x *AccountSyncEndOfStream) ProtoReflect() protoreflect.Message { // Deprecated: Use AccountSyncEndOfStream.ProtoReflect.Descriptor instead. func (*AccountSyncEndOfStream) Descriptor() ([]byte, []int) { - return file_accounts_accounts_proto_rawDescGZIP(), []int{5} + return file_accounts_accounts_proto_rawDescGZIP(), []int{6} } func (x *AccountSyncEndOfStream) GetTotalPages() uint32 { @@ -515,7 +571,7 @@ type AccountSyncServerMessage struct { func (x *AccountSyncServerMessage) Reset() { *x = AccountSyncServerMessage{} - mi := &file_accounts_accounts_proto_msgTypes[6] + mi := &file_accounts_accounts_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -527,7 +583,7 @@ func (x *AccountSyncServerMessage) String() string { func (*AccountSyncServerMessage) ProtoMessage() {} func (x *AccountSyncServerMessage) ProtoReflect() protoreflect.Message { - mi := &file_accounts_accounts_proto_msgTypes[6] + mi := &file_accounts_accounts_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -540,7 +596,7 @@ func (x *AccountSyncServerMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use AccountSyncServerMessage.ProtoReflect.Descriptor instead. func (*AccountSyncServerMessage) Descriptor() ([]byte, []int) { - return file_accounts_accounts_proto_rawDescGZIP(), []int{6} + return file_accounts_accounts_proto_rawDescGZIP(), []int{7} } func (x *AccountSyncServerMessage) GetPayload() isAccountSyncServerMessage_Payload { @@ -641,7 +697,13 @@ const file_accounts_accounts_proto_rawDesc = "" + "batchIndex\x12\x17\n" + "\ais_last\x18\x06 \x01(\bR\x06isLast\x12\"\n" + "\x05phase\x18\a \x01(\v2\f.phase.PhaseR\x05phase\x12.\n" + - "\bchecksum\x18\b \x01(\v2\x12.checksum.ChecksumR\bchecksum\"N\n" + + "\bchecksum\x18\b \x01(\v2\x12.checksum.ChecksumR\bchecksum\"\xd1\x01\n" + + "\x1aAccountSyncRequestAccounts\x12Q\n" + + "\taddresses\x18\x01 \x03(\v23.accounts.AccountSyncRequestAccounts.AddressesEntryR\taddresses\x12\"\n" + + "\x05phase\x18\x02 \x01(\v2\f.phase.PhaseR\x05phase\x1a<\n" + + "\x0eAddressesEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\bR\x05value:\x028\x01\"N\n" + "\x0fAccountBatchAck\x12\x1f\n" + "\vbatch_index\x18\x01 \x01(\rR\n" + "batchIndex\x12\x1a\n" + @@ -679,41 +741,45 @@ func file_accounts_accounts_proto_rawDescGZIP() []byte { return file_accounts_accounts_proto_rawDescData } -var file_accounts_accounts_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_accounts_accounts_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_accounts_accounts_proto_goTypes = []any{ - (*Account)(nil), // 0: accounts.Account - (*AccountNonceSyncRequest)(nil), // 1: accounts.AccountNonceSyncRequest - (*AccountBatchAck)(nil), // 2: accounts.AccountBatchAck - (*AccountSyncHeartbeat)(nil), // 3: accounts.AccountSyncHeartbeat - (*AccountSyncResponse)(nil), // 4: accounts.AccountSyncResponse - (*AccountSyncEndOfStream)(nil), // 5: accounts.AccountSyncEndOfStream - (*AccountSyncServerMessage)(nil), // 6: accounts.AccountSyncServerMessage - (*structpb.Struct)(nil), // 7: google.protobuf.Struct - (*tagging.RangeTag)(nil), // 8: tagging.RangeTag - (*phase.Phase)(nil), // 9: phase.Phase - (*checksum.Checksum)(nil), // 10: checksum.Checksum - (*ack.Ack)(nil), // 11: ack.Ack + (*Account)(nil), // 0: accounts.Account + (*AccountNonceSyncRequest)(nil), // 1: accounts.AccountNonceSyncRequest + (*AccountSyncRequestAccounts)(nil), // 2: accounts.AccountSyncRequestAccounts + (*AccountBatchAck)(nil), // 3: accounts.AccountBatchAck + (*AccountSyncHeartbeat)(nil), // 4: accounts.AccountSyncHeartbeat + (*AccountSyncResponse)(nil), // 5: accounts.AccountSyncResponse + (*AccountSyncEndOfStream)(nil), // 6: accounts.AccountSyncEndOfStream + (*AccountSyncServerMessage)(nil), // 7: accounts.AccountSyncServerMessage + nil, // 8: accounts.AccountSyncRequestAccounts.AddressesEntry + (*structpb.Struct)(nil), // 9: google.protobuf.Struct + (*tagging.RangeTag)(nil), // 10: tagging.RangeTag + (*phase.Phase)(nil), // 11: phase.Phase + (*checksum.Checksum)(nil), // 12: checksum.Checksum + (*ack.Ack)(nil), // 13: ack.Ack } var file_accounts_accounts_proto_depIdxs = []int32{ - 7, // 0: accounts.Account.metadata:type_name -> google.protobuf.Struct - 8, // 1: accounts.AccountNonceSyncRequest.keys_range:type_name -> tagging.RangeTag - 9, // 2: accounts.AccountNonceSyncRequest.phase:type_name -> phase.Phase - 10, // 3: accounts.AccountNonceSyncRequest.checksum:type_name -> checksum.Checksum - 11, // 4: accounts.AccountBatchAck.ack:type_name -> ack.Ack - 0, // 5: accounts.AccountSyncResponse.accounts:type_name -> accounts.Account - 11, // 6: accounts.AccountSyncResponse.ack:type_name -> ack.Ack - 9, // 7: accounts.AccountSyncResponse.phase:type_name -> phase.Phase - 11, // 8: accounts.AccountSyncEndOfStream.ack:type_name -> ack.Ack - 9, // 9: accounts.AccountSyncEndOfStream.phase:type_name -> phase.Phase - 2, // 10: accounts.AccountSyncServerMessage.batch_ack:type_name -> accounts.AccountBatchAck - 3, // 11: accounts.AccountSyncServerMessage.heartbeat:type_name -> accounts.AccountSyncHeartbeat - 4, // 12: accounts.AccountSyncServerMessage.response:type_name -> accounts.AccountSyncResponse - 5, // 13: accounts.AccountSyncServerMessage.end:type_name -> accounts.AccountSyncEndOfStream - 14, // [14:14] is the sub-list for method output_type - 14, // [14:14] is the sub-list for method input_type - 14, // [14:14] is the sub-list for extension type_name - 14, // [14:14] is the sub-list for extension extendee - 0, // [0:14] is the sub-list for field type_name + 9, // 0: accounts.Account.metadata:type_name -> google.protobuf.Struct + 10, // 1: accounts.AccountNonceSyncRequest.keys_range:type_name -> tagging.RangeTag + 11, // 2: accounts.AccountNonceSyncRequest.phase:type_name -> phase.Phase + 12, // 3: accounts.AccountNonceSyncRequest.checksum:type_name -> checksum.Checksum + 8, // 4: accounts.AccountSyncRequestAccounts.addresses:type_name -> accounts.AccountSyncRequestAccounts.AddressesEntry + 11, // 5: accounts.AccountSyncRequestAccounts.phase:type_name -> phase.Phase + 13, // 6: accounts.AccountBatchAck.ack:type_name -> ack.Ack + 0, // 7: accounts.AccountSyncResponse.accounts:type_name -> accounts.Account + 13, // 8: accounts.AccountSyncResponse.ack:type_name -> ack.Ack + 11, // 9: accounts.AccountSyncResponse.phase:type_name -> phase.Phase + 13, // 10: accounts.AccountSyncEndOfStream.ack:type_name -> ack.Ack + 11, // 11: accounts.AccountSyncEndOfStream.phase:type_name -> phase.Phase + 3, // 12: accounts.AccountSyncServerMessage.batch_ack:type_name -> accounts.AccountBatchAck + 4, // 13: accounts.AccountSyncServerMessage.heartbeat:type_name -> accounts.AccountSyncHeartbeat + 5, // 14: accounts.AccountSyncServerMessage.response:type_name -> accounts.AccountSyncResponse + 6, // 15: accounts.AccountSyncServerMessage.end:type_name -> accounts.AccountSyncEndOfStream + 16, // [16:16] is the sub-list for method output_type + 16, // [16:16] is the sub-list for method input_type + 16, // [16:16] is the sub-list for extension type_name + 16, // [16:16] is the sub-list for extension extendee + 0, // [0:16] is the sub-list for field type_name } func init() { file_accounts_accounts_proto_init() } @@ -721,7 +787,7 @@ func file_accounts_accounts_proto_init() { if File_accounts_accounts_proto != nil { return } - file_accounts_accounts_proto_msgTypes[6].OneofWrappers = []any{ + file_accounts_accounts_proto_msgTypes[7].OneofWrappers = []any{ (*AccountSyncServerMessage_BatchAck)(nil), (*AccountSyncServerMessage_Heartbeat)(nil), (*AccountSyncServerMessage_Response)(nil), @@ -733,7 +799,7 @@ func file_accounts_accounts_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_accounts_accounts_proto_rawDesc), len(file_accounts_accounts_proto_rawDesc)), NumEnums: 0, - NumMessages: 7, + NumMessages: 9, NumExtensions: 0, NumServices: 0, }, diff --git a/common/proto/accounts/accounts.proto b/common/proto/accounts/accounts.proto index 4f4b22d..c8048bf 100644 --- a/common/proto/accounts/accounts.proto +++ b/common/proto/accounts/accounts.proto @@ -63,6 +63,15 @@ message AccountNonceSyncRequest { checksum.Checksum checksum = 8; // optional: versioned digest (shared checksum.proto) } +// AccountSyncRequestAccounts is a stateless targeted fetch request. +// map gives set semantics at the wire level — duplicate addresses +// are structurally impossible (map keys are unique in protobuf). +// Used post-PoTS / post-Reconciliation when tagging identifies missing accounts. +message AccountSyncRequestAccounts { + map addresses = 1; // key = address hex or DID string + phase.Phase phase = 2; +} + // ================================================================ // Server-side stream envelope (server → client) // ================================================================ From ea86ce747ebf7653f6414a5fffb04f3e65e376b6 Mon Sep 17 00:00:00 2001 From: Neeraj Chowdary <57310710+neerajchowdary889@users.noreply.github.com> Date: Fri, 15 May 2026 14:36:05 +0530 Subject: [PATCH 2/4] Add account fetching functionality and related constants Implement the HandleAccountsFetch method in the Datarouter to process account synchronization requests for specific addresses. Introduce a new helper function, FetchAccountsByAddresses, to retrieve account data based on provided addresses. Update constants to include new request and response phases for account synchronization. This enhances the account synchronization process by allowing targeted fetching of account information. --- common/types/constants/constants.go | 4 + core/protocol/router/data_router.go | 73 +++++++++++++++++++ .../helper/accounts/accounts_fetch_helper.go | 64 ++++++++++++++++ 3 files changed, 141 insertions(+) create mode 100644 core/protocol/router/helper/accounts/accounts_fetch_helper.go diff --git a/common/types/constants/constants.go b/common/types/constants/constants.go index 30c1bdc..684781b 100644 --- a/common/types/constants/constants.go +++ b/common/types/constants/constants.go @@ -41,6 +41,10 @@ const ( ACCOUNTS_SYNC_REQUEST_END = "state:accountssync:accountssyncrequest:end" ACCOUNTS_SYNC_REQUEST = "state:accountssync:accountssyncrequest" ACCOUNTS_SYNC_RESPONSE = "state:accountssync:accountssyncresponse" + + // Take the array of accounts required from the client as request, and send the array of accounts to the server. + ACCOUNTS_SYNC_REQUEST_ACCOUNTS = "state:accountssync:accountssyncrequest:accounts" + ACCOUNTS_SYNC_REQUEST_ACCOUNTS_RESPONSE = "state:accountssync:accountssyncrequest:accounts:response" ) const ( diff --git a/core/protocol/router/data_router.go b/core/protocol/router/data_router.go index 8663627..6450e44 100644 --- a/core/protocol/router/data_router.go +++ b/core/protocol/router/data_router.go @@ -1586,4 +1586,77 @@ func (router *Datarouter) ACCOUNTS_SYNC(ctx context.Context, req *accountspb.Acc }, }, } +} + +func (router *Datarouter) HandleAccountsFetch(ctx context.Context, req *accountspb.AccountSyncRequestAccounts, remote *types.Nodeinfo) *accountspb.AccountSyncResponse { + if req.Phase == nil || req.Phase.Auth == nil || req.Phase.Auth.UUID == "" { + return &accountspb.AccountSyncResponse{ + Ack: &ackpb.Ack{Ok: false, Error: errors.AuthRequired.Error()}, + Phase: &phasepb.Phase{ + PresentPhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS, + SuccessivePhase: constants.FAILURE, + Success: false, + Error: errors.AuthRequired.Error(), + }, + } + } + + authenticated, err := router.Authenticate(ctx, req.Phase.Auth, remote) + if err != nil || !authenticated { + return &accountspb.AccountSyncResponse{ + Ack: &ackpb.Ack{Ok: false, Error: errors.AuthenticationFailed.Error()}, + Phase: &phasepb.Phase{ + PresentPhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS, + SuccessivePhase: constants.FAILURE, + Success: false, + Error: errors.AuthenticationFailed.Error(), + Auth: req.Phase.Auth, + }, + } + } + + defer func() { + if resetErr := router.ResetTTL(ctx, req.Phase.Auth, remote); resetErr != nil { + Log.Logger(namedlogger).Error(ctx, "Failed to reset TTL", resetErr) + } + }() + + switch req.Phase.PresentPhase { + case constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS: + protoAccounts, fetchErr := accountshelper.FetchAccountsByAddresses(ctx, router.Nodeinfo.BlockInfo, req.GetAddresses()) + if fetchErr != nil { + return &accountspb.AccountSyncResponse{ + Ack: &ackpb.Ack{Ok: false, Error: fetchErr.Error()}, + Phase: &phasepb.Phase{ + PresentPhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS, + SuccessivePhase: constants.FAILURE, + Success: false, + Error: fetchErr.Error(), + Auth: req.Phase.Auth, + }, + } + } + return &accountspb.AccountSyncResponse{ + Accounts: protoAccounts, + PageIndex: 0, + Ack: &ackpb.Ack{Ok: true}, + Phase: &phasepb.Phase{ + PresentPhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS_RESPONSE, + SuccessivePhase: constants.SUCCESS, + Success: true, + Auth: req.Phase.Auth, + }, + } + default: + return &accountspb.AccountSyncResponse{ + Ack: &ackpb.Ack{Ok: false, Error: "unknown state: " + req.Phase.PresentPhase}, + Phase: &phasepb.Phase{ + PresentPhase: req.Phase.PresentPhase, + SuccessivePhase: constants.FAILURE, + Success: false, + Error: "unknown state: " + req.Phase.PresentPhase, + Auth: req.Phase.Auth, + }, + } + } } \ No newline at end of file diff --git a/core/protocol/router/helper/accounts/accounts_fetch_helper.go b/core/protocol/router/helper/accounts/accounts_fetch_helper.go new file mode 100644 index 0000000..db678e2 --- /dev/null +++ b/core/protocol/router/helper/accounts/accounts_fetch_helper.go @@ -0,0 +1,64 @@ +package accounts + +import ( + "context" + "strings" + + accountspb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/accounts" + "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" + Log "github.com/JupiterMetaLabs/JMDN-FastSync/logging" + "github.com/JupiterMetaLabs/ion" +) + +const fetchLogger = "log:accountsfetch" + +// lookupKeyForFetch returns the on-chain account address string used for DB lookup. +// Keys may be raw addresses or DID strings ("did:...:address"); the last ':' segment +// of a DID is treated as the address. +func lookupKeyForFetch(addr string) string { + if strings.HasPrefix(addr, "did:") { + parts := strings.Split(addr, ":") + if len(parts) > 0 { + return parts[len(parts)-1] + } + } + return addr +} + +// FetchAccountsByAddresses looks up each address key in the provided set, +// converts found records to proto, and returns the slice. +// Not-found addresses are silently skipped (not an error). +func FetchAccountsByAddresses(ctx context.Context, blockInfo types.BlockInfo, addresses map[string]bool) ([]*accountspb.Account, error) { + accountMgr := blockInfo.NewAccountManager() + protoAccounts := make([]*accountspb.Account, 0, len(addresses)) + + for addr := range addresses { + lookup := lookupKeyForFetch(addr) + acc, err := accountMgr.GetAccountByAddress(lookup) + if err != nil { + Log.Logger(fetchLogger).Warn(ctx, "AccountsFetch: lookup failed", + ion.String("address", addr), + ion.String("lookup", lookup), + ion.Err(err)) + continue + } + if acc == nil { + continue + } + protoAccounts = append(protoAccounts, &accountspb.Account{ + DidAddress: acc.DIDAddress, + Address: acc.Address.Bytes(), + Balance: acc.Balance, + Nonce: acc.Nonce, + AccountType: acc.AccountType, + CreatedAt: acc.CreatedAt, + UpdatedAt: acc.UpdatedAt, + }) + } + + Log.Logger(fetchLogger).Info(ctx, "AccountsFetch complete", + ion.Int("requested", len(addresses)), + ion.Int("found", len(protoAccounts))) + + return protoAccounts, nil +} From dc178ebfeec799da90cb2f34bdb083dcd4059db4 Mon Sep 17 00:00:00 2001 From: Neeraj Chowdary <57310710+neerajchowdary889@users.noreply.github.com> Date: Fri, 15 May 2026 16:21:27 +0530 Subject: [PATCH 3/4] Add FetchAccounts functionality and related protocol updates Implement the FetchAccounts method in AccountSync to send targeted requests for specific accounts using the new AccountsSyncFetchProtocol. Update the Communicator interface and add necessary handlers in the Sync protocol to support this functionality. Enhance the accounts fetching process by ensuring proper error handling and logging. This update improves the overall account synchronization capabilities. --- common/types/constants/protocols.go | 1 + core/accountsync/accountsync.go | 52 +++++++++++++++++++ core/accountsync/interface.go | 12 +++++ core/priorsync/priorsync.go | 6 +++ core/protocol/communication/communication.go | 38 ++++++++++++++ .../helper/accounts/accounts_fetch_helper.go | 2 +- core/sync/sync_protocols.go | 40 ++++++++++++++ 7 files changed, 150 insertions(+), 1 deletion(-) diff --git a/common/types/constants/protocols.go b/common/types/constants/protocols.go index a3d7ced..b9f8739 100644 --- a/common/types/constants/protocols.go +++ b/common/types/constants/protocols.go @@ -11,6 +11,7 @@ const ( DataSyncProtocol protocol.ID = "/fastsync/v1/datasync" AccountsSyncProtocol protocol.ID = "/fastsync/v1/accountssync" AccountsSyncDataProtocol protocol.ID = "/fastsync/v1/accountssync/data" + AccountsSyncFetchProtocol protocol.ID = "/fastsync/v1/accountssync/fetch" PoTSProtocol protocol.ID = "/fastsync/v1/pots" ) diff --git a/core/accountsync/accountsync.go b/core/accountsync/accountsync.go index 800a399..a990621 100644 --- a/core/accountsync/accountsync.go +++ b/core/accountsync/accountsync.go @@ -128,6 +128,58 @@ func (as *AccountSync) AccountSync(remote *availabilitypb.AvailabilityResponse) return totalAccounts, nil } +// FetchAccounts sends a targeted AccountSyncRequestAccounts to the server on +// AccountsSyncFetchProtocol and returns the single-page AccountSyncResponse. +// Empty address maps short-circuit without a network round-trip. +func (as *AccountSync) FetchAccounts(remote *availabilitypb.AvailabilityResponse, addresses map[string]bool) (*accountspb.AccountSyncResponse, error) { + if remote == nil || remote.Nodeinfo == nil { + return nil, fmt.Errorf("accountsfetch: remote is nil") + } + if remote.Auth == nil || remote.Auth.UUID == "" { + return nil, fmt.Errorf("accountsfetch: remote has no auth UUID") + } + if as.Comm == nil { + return nil, fmt.Errorf("accountsfetch: communicator not set — call SetSyncVars first") + } + if len(addresses) == 0 { + return &accountspb.AccountSyncResponse{PageIndex: 0}, nil + } + + ctx := as.SyncVars.Ctx + + remoteNodeInfo, err := routerhelper.NewNodeInfoHelper().ToNodeinfo(remote.Nodeinfo) + if err != nil { + return nil, fmt.Errorf("accountsfetch: parse remote nodeinfo: %w", err) + } + + req := &accountspb.AccountSyncRequestAccounts{ + Addresses: addresses, + Phase: &phasepb.Phase{ + PresentPhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS, + SuccessivePhase: constants.ACCOUNTS_SYNC_REQUEST_ACCOUNTS, + Auth: remote.Auth, + }, + } + + resp, err := as.Comm.FetchAccounts(ctx, *remoteNodeInfo, req) + if err != nil { + return nil, fmt.Errorf("accountsfetch: %w", err) + } + if resp == nil || !resp.GetAck().GetOk() { + errMsg := "unknown error" + if resp != nil { + errMsg = resp.GetAck().GetError() + } + return nil, fmt.Errorf("accountsfetch: server error: %s", errMsg) + } + + Log.Logger(Log.Sync).Info(ctx, "accountsfetch: complete", + ion.Int("requested", len(addresses)), + ion.Int("received", len(resp.GetAccounts()))) + + return resp, nil +} + func (as *AccountSync) Close() { if as.SyncVars != nil { as.SyncVars = nil diff --git a/core/accountsync/interface.go b/core/accountsync/interface.go index e60398a..fae909c 100644 --- a/core/accountsync/interface.go +++ b/core/accountsync/interface.go @@ -4,6 +4,7 @@ import ( "context" "github.com/JupiterMetaLabs/JMDN-FastSync/common/WAL" + accountspb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/accounts" availabilitypb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/availability" "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" "github.com/libp2p/go-libp2p/core/host" @@ -48,6 +49,17 @@ type AccountSync_router interface { // diff, or stream delivery fails. AccountSync(remote *availabilitypb.AvailabilityResponse) (uint64, error) + // FetchAccounts sends a targeted request for specific accounts (by hex address or DID + // string) to the server. The server strips DIDs and looks up only plain accounts in + // its DB, returning them as a single AccountSyncResponse with page_index=0. + // + // Used post-PoTS / post-Reconciliation when tagging identifies accounts the client + // is missing that are not covered by the streaming AccountSync diff. + // + // Returns the server response. The caller is responsible for writing the returned + // accounts to the local DB. + FetchAccounts(remote *availabilitypb.AvailabilityResponse, addresses map[string]bool) (*accountspb.AccountSyncResponse, error) + // Close releases resources held by the router. Close() } diff --git a/core/priorsync/priorsync.go b/core/priorsync/priorsync.go index d4140fa..a077c3f 100644 --- a/core/priorsync/priorsync.go +++ b/core/priorsync/priorsync.go @@ -101,6 +101,7 @@ func (ps *PriorSync) SetupNetworkHandlers(debug bool) error { ps.SyncVars.Node.RemoveStreamHandler(constants.BlocksPUBSUB) ps.SyncVars.Node.RemoveStreamHandler(constants.AccountsSyncProtocol) ps.SyncVars.Node.RemoveStreamHandler(constants.AccountsSyncDataProtocol) + ps.SyncVars.Node.RemoveStreamHandler(constants.AccountsSyncFetchProtocol) } if ps.cancel != nil { @@ -149,6 +150,10 @@ func (ps *PriorSync) SetupNetworkHandlers(debug bool) error { return err } + if err := syncHandler.HandleAccountsFetch(ctx, ps.SyncVars.Node); err != nil { + return err + } + // Block until parent or Close() cancels <-ctx.Done() @@ -263,5 +268,6 @@ func (ps *PriorSync) Close() { node.RemoveStreamHandler(constants.BlocksPUBSUB) node.RemoveStreamHandler(constants.AccountsSyncProtocol) node.RemoveStreamHandler(constants.AccountsSyncDataProtocol) + node.RemoveStreamHandler(constants.AccountsSyncFetchProtocol) } } diff --git a/core/protocol/communication/communication.go b/core/protocol/communication/communication.go index 99d6fa3..ba5e1e3 100644 --- a/core/protocol/communication/communication.go +++ b/core/protocol/communication/communication.go @@ -50,6 +50,11 @@ type Communicator interface { // StreamAccounts delivers one AccountSyncResponse page from the server to the // client by dialling AccountsSyncDataProtocol (dial-back). Returns the client ack. StreamAccounts(ctx context.Context, peerNode types.Nodeinfo, resp *accountspb.AccountSyncServerMessage) (*accountspb.AccountSyncServerMessage, error) + + // FetchAccounts sends a targeted AccountSyncRequestAccounts to the server and + // returns a single AccountSyncResponse (page_index=0). Used post-PoTS / + // post-Reconciliation when specific missing accounts are identified by address or DID. + FetchAccounts(ctx context.Context, peerNode types.Nodeinfo, req *accountspb.AccountSyncRequestAccounts) (*accountspb.AccountSyncResponse, error) } func NewCommunication(host host.Host, protocolVersion uint16) Communicator { @@ -349,6 +354,39 @@ func (c *communication) StreamAccounts( return ack, nil } +// FetchAccounts sends an AccountSyncRequestAccounts to the server on +// AccountsSyncFetchProtocol and returns the single-page AccountSyncResponse. +func (c *communication) FetchAccounts( + ctx context.Context, + peerNode types.Nodeinfo, + req *accountspb.AccountSyncRequestAccounts, +) (*accountspb.AccountSyncResponse, error) { + if c.host == nil { + return nil, errors.New("host is nil") + } + + peerInfo := libp2p_peer.AddrInfo{ + ID: peerNode.PeerID, + Addrs: peerNode.Multiaddr, + } + + resp := &accountspb.AccountSyncResponse{} + + if err := messaging.SendProtoDelimited( + ctx, + c.protocolVersion, + c.host, + peerInfo, + constants.AccountsSyncFetchProtocol, + req, + resp, + ); err != nil { + return nil, errors.New("failed to send accounts fetch request: " + err.Error()) + } + + return resp, nil +} + func (c *communication) SendPoTSRequest( ctx context.Context, peerNode types.Nodeinfo, diff --git a/core/protocol/router/helper/accounts/accounts_fetch_helper.go b/core/protocol/router/helper/accounts/accounts_fetch_helper.go index db678e2..0907c66 100644 --- a/core/protocol/router/helper/accounts/accounts_fetch_helper.go +++ b/core/protocol/router/helper/accounts/accounts_fetch_helper.go @@ -48,7 +48,7 @@ func FetchAccountsByAddresses(ctx context.Context, blockInfo types.BlockInfo, ad protoAccounts = append(protoAccounts, &accountspb.Account{ DidAddress: acc.DIDAddress, Address: acc.Address.Bytes(), - Balance: acc.Balance, + Balance: "0", // Server should only send the zero balance during fetch accounts steps Nonce: acc.Nonce, AccountType: acc.AccountType, CreatedAt: acc.CreatedAt, diff --git a/core/sync/sync_protocols.go b/core/sync/sync_protocols.go index 316cedc..64e0418 100644 --- a/core/sync/sync_protocols.go +++ b/core/sync/sync_protocols.go @@ -60,6 +60,7 @@ type sync_interface interface { HandlePubsub(ctx context.Context, node host.Host) error HandleAccountsSync(ctx context.Context, node host.Host) error HandleAccountsSyncData(ctx context.Context, node host.Host) error + HandleAccountsFetch(ctx context.Context, node host.Host) error Debug(ctx context.Context, protocol protocol.ID, node host.Host, remote *types.Nodeinfo) } @@ -717,6 +718,45 @@ func (s *Sync) HandleAccountsSyncData(ctx context.Context, node host.Host) error return nil } +// HandleAccountsFetch registers the server-side handler for AccountsSyncFetchProtocol. +// The client sends one AccountSyncRequestAccounts; the server resolves the addresses, +// strips DIDs, and returns a single AccountSyncResponse (page_index=0). +// Pattern: stateless read → route → write (no heartbeat — targeted DB lookup). +func (s *Sync) HandleAccountsFetch(ctx context.Context, node host.Host) error { + node.SetStreamHandler(constants.AccountsSyncFetchProtocol, func(str network.Stream) { + defer str.Close() + + select { + case <-ctx.Done(): + return + default: + } + + _ = str.SetReadDeadline(time.Now().Add(constants.StreamDeadline)) + defer str.SetReadDeadline(time.Time{}) + + req := &accountspb.AccountSyncRequestAccounts{} + if err := pbstream.ReadDelimited(str, req); err != nil { + return + } + + remoteNodeInfo := &types.Nodeinfo{ + PeerID: str.Conn().RemotePeer(), + Multiaddr: []multiaddr.Multiaddr{str.Conn().RemoteMultiaddr()}, + Version: s.nodeinfo.Version, + } + + resp := s.Datarouter.HandleAccountsFetch(ctx, req, remoteNodeInfo) + s.Debug(ctx, constants.AccountsSyncFetchProtocol, node, remoteNodeInfo) + + _ = str.SetWriteDeadline(time.Now().Add(constants.StreamDeadline)) + defer str.SetWriteDeadline(time.Time{}) + + _ = pbstream.WriteDelimited(str, resp) + }) + return nil +} + func (s *Sync) Debug(ctx context.Context, protocol protocol.ID, node host.Host, remote *types.Nodeinfo) { if s.debug && remote != nil { logging.Logger(logging.Sync).Debug(ctx, "Sync Protocols Debug", From 357c59655fd78033c44f6ad0c6e9bd42afdca706 Mon Sep 17 00:00:00 2001 From: Neeraj Chowdary <57310710+neerajchowdary889@users.noreply.github.com> Date: Mon, 18 May 2026 11:17:32 +0530 Subject: [PATCH 4/4] Enhance Reconciliation process with remote account fetching Update the Reconcile method in the Reconciliation router to accept an additional parameter for remote availability responses. Implement a new phase to pre-create accounts missing from the local database by fetching them from the remote source. This improves the account reconciliation process by ensuring that all relevant accounts are accounted for, even if they were not previously present in the local database. --- core/reconsillation/interface.go | 3 +- core/reconsillation/reconsillation.go | 60 ++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/core/reconsillation/interface.go b/core/reconsillation/interface.go index 836bd03..129c61a 100644 --- a/core/reconsillation/interface.go +++ b/core/reconsillation/interface.go @@ -4,6 +4,7 @@ import ( "context" "github.com/JupiterMetaLabs/JMDN-FastSync/common/WAL" + availabilitypb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/availability" "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/tagging" "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" ) @@ -24,7 +25,7 @@ type Reconciliation_router interface { // Reconcile calculates updated balances for all tagged accounts by querying // their transactions and updates the accounts table in the database. // Returns the number of accounts successfully reconciled and list of failed accounts. - Reconcile(taggedAccounts *tagging.TaggedAccounts) (int, []string, error) + Reconcile(taggedAccounts *tagging.TaggedAccounts, remote *availabilitypb.AvailabilityResponse) (int, []string, error) // Close releases resources and cleans up. Close() diff --git a/core/reconsillation/reconsillation.go b/core/reconsillation/reconsillation.go index b83bdf6..f8fab90 100644 --- a/core/reconsillation/reconsillation.go +++ b/core/reconsillation/reconsillation.go @@ -11,9 +11,11 @@ import ( "github.com/JupiterMetaLabs/JMDN-FastSync/common/WAL" blockpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/block" "github.com/ethereum/go-ethereum/common" + availabilitypb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/availability" taggingpb "github.com/JupiterMetaLabs/JMDN-FastSync/common/proto/tagging" "github.com/JupiterMetaLabs/JMDN-FastSync/common/types" "github.com/JupiterMetaLabs/JMDN-FastSync/common/types/constants" + "github.com/JupiterMetaLabs/JMDN-FastSync/core/accountsync" "github.com/JupiterMetaLabs/JMDN-FastSync/core/reconsillation/LRUCache" "github.com/JupiterMetaLabs/JMDN-FastSync/core/reconsillation/helper" Log "github.com/JupiterMetaLabs/JMDN-FastSync/logging" @@ -72,6 +74,7 @@ func (r *Reconciliation) GetLRUCache() LRUCache.LRUCacheInterface { return r.headerCache } + // GetSyncVars returns the current sync configuration. func (r *Reconciliation) GetSyncVars() *types.Syncvars { return r.SyncVars @@ -123,7 +126,7 @@ func (r *Reconciliation) GetBlockFromLRUCache(blockNumber uint64) (*blockpb.Head // // Returns the number of accounts reconciled and the list of any accounts that failed // during the computation phase. A non-nil error always means the DB was NOT mutated. -func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts) (int, []string, error) { +func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts, remote *availabilitypb.AvailabilityResponse) (int, []string, error) { if taggedAccounts == nil || len(taggedAccounts.Accounts) == 0 { Log.Logger(namedlogger).Info(r.SyncVars.Ctx, "No tagged accounts to reconcile") return 0, nil, nil @@ -209,6 +212,61 @@ func (r *Reconciliation) Reconcile(taggedAccounts *taggingpb.TaggedAccounts) (in Log.Logger(namedlogger).Info(ctx, "Phase 1 complete — all account states ready", ion.Int("accounts_ready", len(updates))) + // ---------------------------------------------------------------- + // Phase 1.5: Pre-create accounts missing from local DB. + // AccountSync covers ~99% of zero-balance accounts before reconciliation; + // this handles stragglers. Fetching from the remote and writing them with + // zero balance lets Phase 3 issue an UPDATE instead of a CREATE, keeping + // the flow uniform. Accounts the remote also doesn't know about remain + // IsNewAccount=true and are handled by BatchUpdateAccounts as before. + // ---------------------------------------------------------------- + if remote != nil { + missingAddrs := make(map[string]bool) + missingIdx := make(map[string]int) + for updateIdx, update := range updates { + if update.IsNewAccount { + missingAddrs[update.Address] = true + missingIdx[update.Address] = updateIdx + } + } + + if len(missingAddrs) > 0 { + Log.Logger(namedlogger).Debug(ctx, "Phase 1.5: fetching missing accounts from remote", + ion.Int("missing_count", len(missingAddrs))) + + syncVars := r.GetSyncVars() + acctSync := accountsync.NewAccountSync().SetSyncVars(syncVars.Ctx, syncVars.Version, syncVars.NodeInfo, syncVars.Node, syncVars.WAL) + resp, err := acctSync.FetchAccounts(remote, missingAddrs) + if err != nil { + Log.Logger(namedlogger).Warn(ctx, "Phase 1.5: remote fetch failed — BatchUpdateAccounts will create them", + ion.Err(err)) + } else if resp != nil { + created := 0 + for _, acc := range resp.GetAccounts() { + addrBytes := acc.GetAddress() + if len(addrBytes) == 0 { + continue + } + addr := strings.ToLower(common.BytesToAddress(addrBytes).Hex()) + idx, ok := missingIdx[addr] + if !ok { + continue + } + if err := accountManager.CreateAccount(addr, big.NewInt(0), acc.GetNonce()); err != nil { + Log.Logger(namedlogger).Warn(ctx, "Phase 1.5: failed to pre-create account", + ion.String("address", addr), ion.Err(err)) + continue + } + updates[idx].IsNewAccount = false + created++ + } + Log.Logger(namedlogger).Info(ctx, "Phase 1.5 complete — pre-created missing accounts from remote", + ion.Int("requested", len(missingAddrs)), + ion.Int("created", created)) + } + } + } + // ---------------------------------------------------------------- // Phase 2: Write all planned changes to WAL as a single batch event. // Writing one ReconciliationBatchEvent instead of N individual events