From 89e9d3e064d2a0244e58547455035b9f4e6afb1c Mon Sep 17 00:00:00 2001 From: hanish520 Date: Mon, 12 Jan 2026 01:03:56 -0800 Subject: [PATCH 1/4] initial commit for new client --- client/client.go | 77 ++++++-- internal/proto/clientpb/client.pb.go | 180 +++++++++++++++--- internal/proto/clientpb/client.proto | 19 +- internal/proto/clientpb/client_gorums.pb.go | 97 +++++----- internal/proto/hotstuffpb/hotstuff.pb.go | 16 +- .../proto/hotstuffpb/hotstuff_gorums.pb.go | 12 +- server/clientio.go | 163 +++++++++++++--- types.go | 10 + 8 files changed, 439 insertions(+), 135 deletions(-) diff --git a/client/client.go b/client/client.go index 7af896a90..7ff24768a 100644 --- a/client/client.go +++ b/client/client.go @@ -9,6 +9,8 @@ import ( "errors" "io" "math" + "math/rand" + "strings" "sync" "time" @@ -41,7 +43,6 @@ func (q *qspec) ExecCommandQF(_ *clientpb.Command, signatures map[uint32]*emptyp type pendingCmd struct { sequenceNumber uint64 sendTime time.Time - promise *clientpb.AsyncEmpty cancelCtx context.CancelFunc } @@ -182,6 +183,8 @@ func (c *Client) Stop() { } func (c *Client) close() { + // Signal the command handler to stop fetching statuses before closing the manager. + close(c.pendingCmds) c.mgr.Close() err := c.reader.Close() if err != nil { @@ -238,8 +241,8 @@ loop: } ctx, cancel := context.WithTimeout(ctx, c.timeout) - promise := c.gorumsConfig.ExecCommand(ctx, cmd) - pending := pendingCmd{sequenceNumber: num, sendTime: time.Now(), promise: promise, cancelCtx: cancel} + c.gorumsConfig.ExecCommand(ctx, cmd) + pending := pendingCmd{sequenceNumber: num, sendTime: time.Now(), cancelCtx: cancel} num++ select { @@ -257,6 +260,52 @@ loop: return nil } +func (c *Client) fetchCommandStatus(sequenceNumber uint64) hotstuff.CommandStatus { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + timeout := time.After(c.timeout) + + for { + select { + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + cmd := &clientpb.Command{ + ClientID: uint32(c.id), + SequenceNumber: sequenceNumber, + } + nodes := c.gorumsConfig.Nodes() + if len(nodes) == 0 { + c.logger.Error("No nodes available in gorums config") + cancel() + continue + } + node := nodes[rand.Intn(len(nodes))] + response, err := node.CommandStatus(ctx, cmd) + cancel() + + if err != nil { + c.logger.Errorf("Failed to fetch command status (client: %d, sequence: %d): %v", c.id, sequenceNumber, err) + // If the node/manager was closed, stop trying and return UNKNOWN. + if strings.Contains(err.Error(), "node closed") { + return hotstuff.UNKNOWN + } + continue + } + if response == nil || response.Command == nil { + c.logger.Errorf("Invalid response received when fetching command status (client: %d, sequence: %d)", c.id, sequenceNumber) + continue + } + c.logger.Infof("Fetched command status (client: %d, sequence: %d, status: %d)", c.id, sequenceNumber, response.Status) + status := hotstuff.CommandStatus(response.Status) + if status == hotstuff.COMMITTED || status == hotstuff.EXECUTED || status == hotstuff.FAILED { + return status + } + case <-timeout: + return hotstuff.UNKNOWN + } + } +} + // handleCommands will get pending commands from the pendingCmds channel and then // handle them as they become acknowledged by the replicas. We expect the commands to be // acknowledged in the order that they were sent. @@ -274,16 +323,17 @@ func (c *Client) handleCommands(ctx context.Context) (executed, failed, timeout case <-ctx.Done(): return } - _, err := cmd.promise.Get() - if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - c.logger.Debug("Command timed out.") - timeout++ - } else if !errors.Is(err, context.Canceled) { - c.logger.Debugf("Did not get enough replies for command: %v\n", err) - failed++ - } - } else { + response := c.fetchCommandStatus(cmd.sequenceNumber) + + switch response { + case hotstuff.UNKNOWN: + c.logger.Infof("Command timed out (client: %d, sequence: %d)", c.id, cmd.sequenceNumber) + timeout++ + case hotstuff.FAILED: + c.logger.Infof("Command failed (client: %d, sequence: %d)", c.id, cmd.sequenceNumber) + failed++ + default: + c.logger.Infof("Command executed (client: %d, sequence: %d)", c.id, cmd.sequenceNumber) executed++ } c.mut.Lock() @@ -291,7 +341,6 @@ func (c *Client) handleCommands(ctx context.Context) (executed, failed, timeout c.highestCommitted = cmd.sequenceNumber } c.mut.Unlock() - duration := time.Since(cmd.sendTime) c.eventLoop.AddEvent(LatencyMeasurementEvent{Latency: duration}) } diff --git a/internal/proto/clientpb/client.pb.go b/internal/proto/clientpb/client.pb.go index b38b8d7d3..0331d0fd1 100644 --- a/internal/proto/clientpb/client.pb.go +++ b/internal/proto/clientpb/client.pb.go @@ -1,16 +1,16 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.10 -// protoc v6.33.0 +// protoc-gen-go v1.36.11 +// protoc v3.6.0 // source: internal/proto/clientpb/client.proto package clientpb import ( + empty "github.com/golang/protobuf/ptypes/empty" _ "github.com/relab/gorums" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -23,6 +23,113 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type CommandStatusResponse_Status int32 + +const ( + CommandStatusResponse_UNKNOWN CommandStatusResponse_Status = 0 + CommandStatusResponse_PENDING CommandStatusResponse_Status = 1 + CommandStatusResponse_COMMITTED CommandStatusResponse_Status = 2 + CommandStatusResponse_EXECUTED CommandStatusResponse_Status = 3 + CommandStatusResponse_FAILED CommandStatusResponse_Status = 4 +) + +// Enum value maps for CommandStatusResponse_Status. +var ( + CommandStatusResponse_Status_name = map[int32]string{ + 0: "UNKNOWN", + 1: "PENDING", + 2: "COMMITTED", + 3: "EXECUTED", + 4: "FAILED", + } + CommandStatusResponse_Status_value = map[string]int32{ + "UNKNOWN": 0, + "PENDING": 1, + "COMMITTED": 2, + "EXECUTED": 3, + "FAILED": 4, + } +) + +func (x CommandStatusResponse_Status) Enum() *CommandStatusResponse_Status { + p := new(CommandStatusResponse_Status) + *p = x + return p +} + +func (x CommandStatusResponse_Status) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (CommandStatusResponse_Status) Descriptor() protoreflect.EnumDescriptor { + return file_internal_proto_clientpb_client_proto_enumTypes[0].Descriptor() +} + +func (CommandStatusResponse_Status) Type() protoreflect.EnumType { + return &file_internal_proto_clientpb_client_proto_enumTypes[0] +} + +func (x CommandStatusResponse_Status) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use CommandStatusResponse_Status.Descriptor instead. +func (CommandStatusResponse_Status) EnumDescriptor() ([]byte, []int) { + return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{0, 0} +} + +type CommandStatusResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Status CommandStatusResponse_Status `protobuf:"varint,1,opt,name=status,proto3,enum=clientpb.CommandStatusResponse_Status" json:"status,omitempty"` + Command *Command `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CommandStatusResponse) Reset() { + *x = CommandStatusResponse{} + mi := &file_internal_proto_clientpb_client_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CommandStatusResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommandStatusResponse) ProtoMessage() {} + +func (x *CommandStatusResponse) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_clientpb_client_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommandStatusResponse.ProtoReflect.Descriptor instead. +func (*CommandStatusResponse) Descriptor() ([]byte, []int) { + return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{0} +} + +func (x *CommandStatusResponse) GetStatus() CommandStatusResponse_Status { + if x != nil { + return x.Status + } + return CommandStatusResponse_UNKNOWN +} + +func (x *CommandStatusResponse) GetCommand() *Command { + if x != nil { + return x.Command + } + return nil +} + // Command is the request that is sent to the HotStuff replicas with the data to // be executed. type Command struct { @@ -36,7 +143,7 @@ type Command struct { func (x *Command) Reset() { *x = Command{} - mi := &file_internal_proto_clientpb_client_proto_msgTypes[0] + mi := &file_internal_proto_clientpb_client_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -48,7 +155,7 @@ func (x *Command) String() string { func (*Command) ProtoMessage() {} func (x *Command) ProtoReflect() protoreflect.Message { - mi := &file_internal_proto_clientpb_client_proto_msgTypes[0] + mi := &file_internal_proto_clientpb_client_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -61,7 +168,7 @@ func (x *Command) ProtoReflect() protoreflect.Message { // Deprecated: Use Command.ProtoReflect.Descriptor instead. func (*Command) Descriptor() ([]byte, []int) { - return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{0} + return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{1} } func (x *Command) GetClientID() uint32 { @@ -95,7 +202,7 @@ type Batch struct { func (x *Batch) Reset() { *x = Batch{} - mi := &file_internal_proto_clientpb_client_proto_msgTypes[1] + mi := &file_internal_proto_clientpb_client_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -107,7 +214,7 @@ func (x *Batch) String() string { func (*Batch) ProtoMessage() {} func (x *Batch) ProtoReflect() protoreflect.Message { - mi := &file_internal_proto_clientpb_client_proto_msgTypes[1] + mi := &file_internal_proto_clientpb_client_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -120,7 +227,7 @@ func (x *Batch) ProtoReflect() protoreflect.Message { // Deprecated: Use Batch.ProtoReflect.Descriptor instead. func (*Batch) Descriptor() ([]byte, []int) { - return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{1} + return file_internal_proto_clientpb_client_proto_rawDescGZIP(), []int{2} } func (x *Batch) GetCommands() []*Command { @@ -134,15 +241,26 @@ var File_internal_proto_clientpb_client_proto protoreflect.FileDescriptor const file_internal_proto_clientpb_client_proto_rawDesc = "" + "\n" + - "$internal/proto/clientpb/client.proto\x12\bclientpb\x1a\fgorums.proto\x1a\x1bgoogle/protobuf/empty.proto\"a\n" + + "$internal/proto/clientpb/client.proto\x12\bclientpb\x1a\fgorums.proto\x1a\x1bgoogle/protobuf/empty.proto\"\xd1\x01\n" + + "\x15CommandStatusResponse\x12>\n" + + "\x06status\x18\x01 \x01(\x0e2&.clientpb.CommandStatusResponse.StatusR\x06status\x12+\n" + + "\acommand\x18\x02 \x01(\v2\x11.clientpb.CommandR\acommand\"K\n" + + "\x06Status\x12\v\n" + + "\aUNKNOWN\x10\x00\x12\v\n" + + "\aPENDING\x10\x01\x12\r\n" + + "\tCOMMITTED\x10\x02\x12\f\n" + + "\bEXECUTED\x10\x03\x12\n" + + "\n" + + "\x06FAILED\x10\x04\"a\n" + "\aCommand\x12\x1a\n" + "\bClientID\x18\x01 \x01(\rR\bClientID\x12&\n" + "\x0eSequenceNumber\x18\x02 \x01(\x04R\x0eSequenceNumber\x12\x12\n" + "\x04Data\x18\x03 \x01(\fR\x04Data\"6\n" + "\x05Batch\x12-\n" + - "\bCommands\x18\x01 \x03(\v2\x11.clientpb.CommandR\bCommands2L\n" + - "\x06Client\x12B\n" + - "\vExecCommand\x12\x11.clientpb.Command\x1a\x16.google.protobuf.Empty\"\b\xa0\xb5\x18\x01ะต\x18\x01B3Z1github.com/relab/hotstuff/internal/proto/clientpbb\x06proto3" + "\bCommands\x18\x01 \x03(\v2\x11.clientpb.CommandR\bCommands2\x8f\x01\n" + + "\x06Client\x12>\n" + + "\vExecCommand\x12\x11.clientpb.Command\x1a\x16.google.protobuf.Empty\"\x04\x98\xb5\x18\x01\x12E\n" + + "\rCommandStatus\x12\x11.clientpb.Command\x1a\x1f.clientpb.CommandStatusResponse\"\x00B3Z1github.com/relab/hotstuff/internal/proto/clientpbb\x06proto3" var ( file_internal_proto_clientpb_client_proto_rawDescOnce sync.Once @@ -156,21 +274,28 @@ func file_internal_proto_clientpb_client_proto_rawDescGZIP() []byte { return file_internal_proto_clientpb_client_proto_rawDescData } -var file_internal_proto_clientpb_client_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_internal_proto_clientpb_client_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_internal_proto_clientpb_client_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_internal_proto_clientpb_client_proto_goTypes = []any{ - (*Command)(nil), // 0: clientpb.Command - (*Batch)(nil), // 1: clientpb.Batch - (*emptypb.Empty)(nil), // 2: google.protobuf.Empty + (CommandStatusResponse_Status)(0), // 0: clientpb.CommandStatusResponse.Status + (*CommandStatusResponse)(nil), // 1: clientpb.CommandStatusResponse + (*Command)(nil), // 2: clientpb.Command + (*Batch)(nil), // 3: clientpb.Batch + (*empty.Empty)(nil), // 4: google.protobuf.Empty } var file_internal_proto_clientpb_client_proto_depIdxs = []int32{ - 0, // 0: clientpb.Batch.Commands:type_name -> clientpb.Command - 0, // 1: clientpb.Client.ExecCommand:input_type -> clientpb.Command - 2, // 2: clientpb.Client.ExecCommand:output_type -> google.protobuf.Empty - 2, // [2:3] is the sub-list for method output_type - 1, // [1:2] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 0, // 0: clientpb.CommandStatusResponse.status:type_name -> clientpb.CommandStatusResponse.Status + 2, // 1: clientpb.CommandStatusResponse.command:type_name -> clientpb.Command + 2, // 2: clientpb.Batch.Commands:type_name -> clientpb.Command + 2, // 3: clientpb.Client.ExecCommand:input_type -> clientpb.Command + 2, // 4: clientpb.Client.CommandStatus:input_type -> clientpb.Command + 4, // 5: clientpb.Client.ExecCommand:output_type -> google.protobuf.Empty + 1, // 6: clientpb.Client.CommandStatus:output_type -> clientpb.CommandStatusResponse + 5, // [5:7] is the sub-list for method output_type + 3, // [3:5] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_internal_proto_clientpb_client_proto_init() } @@ -183,13 +308,14 @@ func file_internal_proto_clientpb_client_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_internal_proto_clientpb_client_proto_rawDesc), len(file_internal_proto_clientpb_client_proto_rawDesc)), - NumEnums: 0, - NumMessages: 2, + NumEnums: 1, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, GoTypes: file_internal_proto_clientpb_client_proto_goTypes, DependencyIndexes: file_internal_proto_clientpb_client_proto_depIdxs, + EnumInfos: file_internal_proto_clientpb_client_proto_enumTypes, MessageInfos: file_internal_proto_clientpb_client_proto_msgTypes, }.Build() File_internal_proto_clientpb_client_proto = out.File diff --git a/internal/proto/clientpb/client.proto b/internal/proto/clientpb/client.proto index 05badc4b2..59ed1e90a 100644 --- a/internal/proto/clientpb/client.proto +++ b/internal/proto/clientpb/client.proto @@ -12,11 +12,26 @@ service Client { // ExecCommand sends a command to all replicas and waits for valid signatures // from f+1 replicas rpc ExecCommand(Command) returns (google.protobuf.Empty) { - option (gorums.quorumcall) = true; - option (gorums.async) = true; + option (gorums.multicast) = true; } + + rpc CommandStatus(Command) returns (CommandStatusResponse) { + } +} + +message CommandStatusResponse { + enum Status { + UNKNOWN = 0; + PENDING = 1; + COMMITTED = 2; + EXECUTED = 3; + FAILED = 4; + } + Status status = 1; + Command command = 2; } + // Command is the request that is sent to the HotStuff replicas with the data to // be executed. message Command { diff --git a/internal/proto/clientpb/client_gorums.pb.go b/internal/proto/clientpb/client_gorums.pb.go index 534bfd2e0..97b4610f5 100644 --- a/internal/proto/clientpb/client_gorums.pb.go +++ b/internal/proto/clientpb/client_gorums.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.10.0-devel -// protoc v6.33.0 +// protoc v3.6.0 // source: internal/proto/clientpb/client.proto package clientpb @@ -9,10 +9,9 @@ package clientpb import ( context "context" fmt "fmt" + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" encoding "google.golang.org/grpc/encoding" - proto "google.golang.org/protobuf/proto" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -150,75 +149,69 @@ type Node struct { *gorums.RawNode } +// ClientClient is the client interface for the Client service. +type ClientClient interface { + ExecCommand(ctx context.Context, in *Command, opts ...gorums.CallOption) +} + +// enforce interface compliance +var _ ClientClient = (*Configuration)(nil) + +// ClientNodeClient is the single node client interface for the Client service. +type ClientNodeClient interface { + CommandStatus(ctx context.Context, in *Command) (resp *CommandStatusResponse, err error) +} + +// enforce interface compliance +var _ ClientNodeClient = (*Node)(nil) + +// Reference imports to suppress errors if they are not otherwise used. +var _ empty.Empty + // ExecCommand sends a command to all replicas and waits for valid signatures // from f+1 replicas -func (c *Configuration) ExecCommand(ctx context.Context, in *Command) *AsyncEmpty { +func (c *Configuration) ExecCommand(ctx context.Context, in *Command, opts ...gorums.CallOption) { cd := gorums.QuorumCallData{ Message: in, Method: "clientpb.Client.ExecCommand", } - cd.QuorumFunction = func(req proto.Message, replies map[uint32]proto.Message) (proto.Message, bool) { - r := make(map[uint32]*emptypb.Empty, len(replies)) - for k, v := range replies { - r[k] = v.(*emptypb.Empty) - } - return c.qspec.ExecCommandQF(req.(*Command), r) - } - fut := c.RawConfiguration.AsyncCall(ctx, cd) - return &AsyncEmpty{fut} -} - -// ClientClient is the client interface for the Client service. -type ClientClient interface { - ExecCommand(ctx context.Context, in *Command) *AsyncEmpty + c.RawConfiguration.Multicast(ctx, cd, opts...) } -// enforce interface compliance -var _ ClientClient = (*Configuration)(nil) +// There are no quorum calls. +type QuorumSpec interface{} -// QuorumSpec is the interface of quorum functions for Client. -type QuorumSpec interface { - gorums.ConfigOption +// CommandStatus is a quorum call invoked on all nodes in configuration c, +// with the same argument in, and returns a combined result. +func (n *Node) CommandStatus(ctx context.Context, in *Command) (resp *CommandStatusResponse, err error) { + cd := gorums.CallData{ + Message: in, + Method: "clientpb.Client.CommandStatus", + } - // ExecCommandQF is the quorum function for the ExecCommand - // asynchronous quorum call method. The in parameter is the request object - // supplied to the ExecCommand method at call time, and may or may not - // be used by the quorum function. If the in parameter is not needed - // you should implement your quorum function with '_ *Command'. - ExecCommandQF(in *Command, replies map[uint32]*emptypb.Empty) (*emptypb.Empty, bool) + res, err := n.RawNode.RPCCall(ctx, cd) + if err != nil { + return nil, err + } + return res.(*CommandStatusResponse), err } // Client is the server-side API for the Client Service type ClientServer interface { - ExecCommand(ctx gorums.ServerCtx, request *Command) (response *emptypb.Empty, err error) + ExecCommand(ctx gorums.ServerCtx, request *Command) + CommandStatus(ctx gorums.ServerCtx, request *Command) (response *CommandStatusResponse, err error) } func RegisterClientServer(srv *gorums.Server, impl ClientServer) { srv.RegisterHandler("clientpb.Client.ExecCommand", func(ctx gorums.ServerCtx, in *gorums.Message) (*gorums.Message, error) { req := gorums.AsProto[*Command](in) - resp, err := impl.ExecCommand(ctx, req) + impl.ExecCommand(ctx, req) + return nil, nil + }) + srv.RegisterHandler("clientpb.Client.CommandStatus", func(ctx gorums.ServerCtx, in *gorums.Message) (*gorums.Message, error) { + req := gorums.AsProto[*Command](in) + resp, err := impl.CommandStatus(ctx, req) return gorums.NewResponseMessage(in.GetMetadata(), resp), err }) } - -type internalEmpty struct { - nid uint32 - reply *emptypb.Empty - err error -} - -// AsyncEmpty is a async object for processing replies. -type AsyncEmpty struct { - *gorums.Async -} - -// Get returns the reply and any error associated with the called method. -// The method blocks until a reply or error is available. -func (f *AsyncEmpty) Get() (*emptypb.Empty, error) { - resp, err := f.Async.Get() - if err != nil { - return nil, err - } - return resp.(*emptypb.Empty), err -} diff --git a/internal/proto/hotstuffpb/hotstuff.pb.go b/internal/proto/hotstuffpb/hotstuff.pb.go index a01e409a9..0d1290dc0 100644 --- a/internal/proto/hotstuffpb/hotstuff.pb.go +++ b/internal/proto/hotstuffpb/hotstuff.pb.go @@ -1,18 +1,18 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.10 -// protoc v6.33.0 +// protoc-gen-go v1.36.11 +// protoc v3.6.0 // source: internal/proto/hotstuffpb/hotstuff.proto package hotstuffpb import ( + empty "github.com/golang/protobuf/ptypes/empty" + timestamp "github.com/golang/protobuf/ptypes/timestamp" _ "github.com/relab/gorums" clientpb "github.com/relab/hotstuff/internal/proto/clientpb" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -128,7 +128,7 @@ type Block struct { View uint64 `protobuf:"varint,3,opt,name=View,proto3" json:"View,omitempty"` Commands *clientpb.Batch `protobuf:"bytes,4,opt,name=Commands,proto3" json:"Commands,omitempty"` Proposer uint32 `protobuf:"varint,5,opt,name=Proposer,proto3" json:"Proposer,omitempty"` - Timestamp *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"` + Timestamp *timestamp.Timestamp `protobuf:"bytes,6,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -198,7 +198,7 @@ func (x *Block) GetProposer() uint32 { return 0 } -func (x *Block) GetTimestamp() *timestamppb.Timestamp { +func (x *Block) GetTimestamp() *timestamp.Timestamp { if x != nil { return x.Timestamp } @@ -1152,8 +1152,8 @@ var file_internal_proto_hotstuffpb_hotstuff_proto_goTypes = []any{ (*AggQC)(nil), // 16: hotstuffpb.AggQC nil, // 17: hotstuffpb.AggQC.QCsEntry (*clientpb.Batch)(nil), // 18: clientpb.Batch - (*timestamppb.Timestamp)(nil), // 19: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 20: google.protobuf.Empty + (*timestamp.Timestamp)(nil), // 19: google.protobuf.Timestamp + (*empty.Empty)(nil), // 20: google.protobuf.Empty } var file_internal_proto_hotstuffpb_hotstuff_proto_depIdxs = []int32{ 2, // 0: hotstuffpb.Proposal.Block:type_name -> hotstuffpb.Block diff --git a/internal/proto/hotstuffpb/hotstuff_gorums.pb.go b/internal/proto/hotstuffpb/hotstuff_gorums.pb.go index e3779e4b5..d375c988c 100644 --- a/internal/proto/hotstuffpb/hotstuff_gorums.pb.go +++ b/internal/proto/hotstuffpb/hotstuff_gorums.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-gorums. DO NOT EDIT. // versions: // protoc-gen-gorums v0.10.0-devel -// protoc v6.33.0 +// protoc v3.6.0 // source: internal/proto/hotstuffpb/hotstuff.proto package hotstuffpb @@ -9,10 +9,10 @@ package hotstuffpb import ( context "context" fmt "fmt" + empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" encoding "google.golang.org/grpc/encoding" proto "google.golang.org/protobuf/proto" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -170,7 +170,7 @@ type ConsensusNodeClient interface { var _ ConsensusNodeClient = (*Node)(nil) // Reference imports to suppress errors if they are not otherwise used. -var _ emptypb.Empty +var _ empty.Empty // Propose is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. @@ -184,7 +184,7 @@ func (c *Configuration) Propose(ctx context.Context, in *Proposal, opts ...gorum } // Reference imports to suppress errors if they are not otherwise used. -var _ emptypb.Empty +var _ empty.Empty // Timeout is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. @@ -275,7 +275,7 @@ type internalBlock struct { } // Reference imports to suppress errors if they are not otherwise used. -var _ emptypb.Empty +var _ empty.Empty // Vote is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. @@ -289,7 +289,7 @@ func (n *Node) Vote(ctx context.Context, in *PartialCert, opts ...gorums.CallOpt } // Reference imports to suppress errors if they are not otherwise used. -var _ emptypb.Empty +var _ empty.Empty // NewView is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. diff --git a/server/clientio.go b/server/clientio.go index 8f88871e6..710fe6d8e 100644 --- a/server/clientio.go +++ b/server/clientio.go @@ -7,14 +7,121 @@ import ( "sync" "github.com/relab/gorums" + "github.com/relab/hotstuff" "github.com/relab/hotstuff/core/eventloop" "github.com/relab/hotstuff/core/logging" "github.com/relab/hotstuff/internal/proto/clientpb" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/emptypb" ) +// clientStatusWindow tracks command statuses for a single client using an array-based approach. +// This is more memory-efficient than nested maps, especially for increasing sequence numbers. +type clientStatusWindow struct { + baseSeqNum uint64 // starting sequence number for the current window + statuses []hotstuff.CommandStatus // array indexed by (seqNum - baseSeqNum) +} + +// CommandStatusTracker efficiently tracks command status per client per sequence number. +// Uses array-based storage per client to avoid the overhead of nested maps. +// Supports sliding window cleanup to prevent unbounded memory growth. +type CommandStatusTracker struct { + // clientWindows maps ClientID -> client status window + clientWindows map[uint32]*clientStatusWindow +} + +// NewCommandStatusTracker creates a new status tracker +func NewCommandStatusTracker() *CommandStatusTracker { + return &CommandStatusTracker{ + clientWindows: make(map[uint32]*clientStatusWindow), + } +} + +// SetStatus sets the status of a command +func (cst *CommandStatusTracker) SetStatus(clientID uint32, seqNum uint64, status hotstuff.CommandStatus) { + window, exists := cst.clientWindows[clientID] + if !exists { + // Initialize new window for this client, starting at this sequence number + window = &clientStatusWindow{ + baseSeqNum: seqNum, + statuses: make([]hotstuff.CommandStatus, 5000), + } + cst.clientWindows[clientID] = window + window.statuses[0] = status + return + } + + // Check if seqNum is within current window + if seqNum >= window.baseSeqNum { + index := seqNum - window.baseSeqNum + // Extend array if necessary + if index >= uint64(len(window.statuses)) { + // Grow array by 50% or enough to fit the new index, whichever is larger + newLen := len(window.statuses) + len(window.statuses)/2 + 1 + if int(index) >= newLen { + newLen = int(index) + 1 + } + newStatuses := make([]hotstuff.CommandStatus, newLen) + copy(newStatuses, window.statuses) + window.statuses = newStatuses + } + window.statuses[index] = status + } + // Ignore updates for seqNum < baseSeqNum (already cleaned up) +} + +// GetStatus retrieves the status of a command. Returns StatusExecuted if not found. +func (cst *CommandStatusTracker) GetStatus(clientID uint32, seqNum uint64) hotstuff.CommandStatus { + window, exists := cst.clientWindows[clientID] + if !exists { + return hotstuff.EXECUTED + } + + // Check if seqNum is within current window + if seqNum >= window.baseSeqNum && seqNum < window.baseSeqNum+uint64(len(window.statuses)) { + index := seqNum - window.baseSeqNum + return window.statuses[index] + } + + // If outside window (cleaned up or not yet added), assume executed + return hotstuff.EXECUTED +} + +// Cleanup removes entries for sequence numbers less than or equal to the given threshold per client. +// This prevents unbounded memory growth by sliding the window forward. +func (cst *CommandStatusTracker) Cleanup(clientID uint32, upToSeqNum uint64) { + window, exists := cst.clientWindows[clientID] + if !exists { + return + } + + // Calculate how many entries to remove from the front + if upToSeqNum >= window.baseSeqNum { + entriesToRemove := int(upToSeqNum - window.baseSeqNum + 1) + if entriesToRemove >= len(window.statuses) { + // Remove entire window, clean up the client entry + delete(cst.clientWindows, clientID) + return + } + + // Slide the window forward + window.statuses = window.statuses[entriesToRemove:] + window.baseSeqNum = upToSeqNum + 1 + } +} + +// GetClientStatuses returns a snapshot of all statuses for a given client (for testing/debugging) +func (cst *CommandStatusTracker) GetClientStatuses(clientID uint32) map[uint64]hotstuff.CommandStatus { + window, exists := cst.clientWindows[clientID] + if !exists { + return make(map[uint64]hotstuff.CommandStatus) + } + + snapshot := make(map[uint64]hotstuff.CommandStatus, len(window.statuses)) + for i, status := range window.statuses { + snapshot[window.baseSeqNum+uint64(i)] = status + } + return snapshot +} + // ClientIO serves a client. type ClientIO struct { logger logging.Logger @@ -26,7 +133,9 @@ type ClientIO struct { hash hash.Hash cmdCount uint32 - lastExecutedSeqNum map[uint32]uint64 // highest executed sequence number per client ID + lastExecutedSeqNum map[uint32]uint64 // highest executed sequence number per client ID + statusTracker *CommandStatusTracker // tracks status of all commands (executed/aborted/failed) + } // NewClientIO returns a new client IO server. @@ -40,10 +149,10 @@ func NewClientIO( logger: logger, cmdCache: cmdCache, - awaitingCmds: make(map[clientpb.MessageID]chan<- error), srv: gorums.NewServer(srvOpts...), hash: sha256.New(), lastExecutedSeqNum: make(map[uint32]uint64), + statusTracker: NewCommandStatusTracker(), } clientpb.RegisterClientServer(srv.srv, srv) eventloop.Register(el, func(event clientpb.ExecuteEvent) { @@ -76,35 +185,26 @@ func (srv *ClientIO) CmdCount() uint32 { return srv.cmdCount } -func (srv *ClientIO) ExecCommand(ctx gorums.ServerCtx, cmd *clientpb.Command) (*emptypb.Empty, error) { - id := cmd.ID() - errChan := make(chan error) - - srv.mut.Lock() - srv.awaitingCmds[id] = errChan - srv.mut.Unlock() - +func (srv *ClientIO) ExecCommand(ctx gorums.ServerCtx, cmd *clientpb.Command) { srv.cmdCache.Add(cmd) + srv.statusTracker.SetStatus(cmd.ClientID, cmd.SequenceNumber, hotstuff.UNKNOWN) ctx.Release() - err := <-errChan - return &emptypb.Empty{}, err } func (srv *ClientIO) Exec(batch *clientpb.Batch) { for _, cmd := range batch.GetCommands() { - id := cmd.ID() srv.mut.Lock() if srv.isDuplicate(cmd) { srv.logger.Info("duplicate command found") - srv.completeCommand(id, status.Error(codes.Aborted, "command already executed")) srv.mut.Unlock() continue } srv.lastExecutedSeqNum[cmd.ClientID] = cmd.SequenceNumber + // Mark command as executed in status tracker + srv.statusTracker.SetStatus(cmd.ClientID, cmd.SequenceNumber, hotstuff.EXECUTED) _, _ = srv.hash.Write(cmd.Data) srv.cmdCount++ - srv.completeCommand(id, nil) srv.mut.Unlock() } srv.logger.Debugf("Hash: %.8x", srv.hash.Sum(nil)) @@ -113,7 +213,8 @@ func (srv *ClientIO) Exec(batch *clientpb.Batch) { func (srv *ClientIO) Abort(batch *clientpb.Batch) { for _, cmd := range batch.GetCommands() { srv.mut.Lock() - srv.completeCommand(cmd.ID(), status.Error(codes.Aborted, "blockchain was forked")) + // Mark command as aborted in status tracker + srv.statusTracker.SetStatus(cmd.ClientID, cmd.SequenceNumber, hotstuff.FAILED) srv.mut.Unlock() } } @@ -125,11 +226,21 @@ func (srv *ClientIO) isDuplicate(cmd *clientpb.Command) bool { return ok && seqNum >= cmd.SequenceNumber } -// completeCommand sends an error or nil to the awaiting client's error channel. -// The caller must hold srv.mut.Lock(). -func (srv *ClientIO) completeCommand(id clientpb.MessageID, err error) { - if errChan, ok := srv.awaitingCmds[id]; ok { - errChan <- err - delete(srv.awaitingCmds, id) - } +// CleanupOldStatuses removes command status entries that are older than the given sequence number +// for a specific client. This should be called periodically to prevent unbounded memory growth. +func (srv *ClientIO) CleanupOldStatuses(clientID uint32, upToSeqNum uint64) { + srv.mut.Lock() + defer srv.mut.Unlock() + srv.statusTracker.Cleanup(clientID, upToSeqNum) +} + +func (srv *ClientIO) CommandStatus(ctx gorums.ServerCtx, in *clientpb.Command) (resp *clientpb.CommandStatusResponse, err error) { + srv.mut.Lock() + defer srv.mut.Unlock() + status := srv.statusTracker.GetStatus(in.ClientID, in.SequenceNumber) + srv.logger.Infof("Received CommandStatus request (client: %d, sequence: %d, status: %d)", in.ClientID, in.SequenceNumber, status) + return &clientpb.CommandStatusResponse{ + Status: clientpb.CommandStatusResponse_Status(status), + Command: in, + }, nil } diff --git a/types.go b/types.go index e3d78fab8..bae2baa35 100644 --- a/types.go +++ b/types.go @@ -373,3 +373,13 @@ type ReplicaInfo struct { Location string Metadata map[string]string } + +type CommandStatus uint8 + +const ( + UNKNOWN CommandStatus = iota + PENDING + COMMITTED + EXECUTED + FAILED +) From c366090211aa14db42749bd4cc1e4e2cfb53cc91 Mon Sep 17 00:00:00 2001 From: hanish520 Date: Mon, 12 Jan 2026 23:42:08 -0800 Subject: [PATCH 2/4] changed the commandstatus to quorum call --- client/client.go | 40 +++++++++++++------- internal/proto/clientpb/client.pb.go | 6 +-- internal/proto/clientpb/client.proto | 1 + internal/proto/clientpb/client_gorums.pb.go | 42 ++++++++++++++------- 4 files changed, 60 insertions(+), 29 deletions(-) diff --git a/client/client.go b/client/client.go index 7ff24768a..750968c00 100644 --- a/client/client.go +++ b/client/client.go @@ -9,7 +9,6 @@ import ( "errors" "io" "math" - "math/rand" "strings" "sync" "time" @@ -23,7 +22,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/protobuf/types/known/emptypb" ) // ID is the identifier for a client. @@ -33,11 +31,32 @@ type qspec struct { faulty int } -func (q *qspec) ExecCommandQF(_ *clientpb.Command, signatures map[uint32]*emptypb.Empty) (*emptypb.Empty, bool) { - if len(signatures) < q.faulty+1 { +func (q *qspec) CommandStatusQF(command *clientpb.Command, replies map[uint32]*clientpb.CommandStatusResponse) (*clientpb.CommandStatusResponse, bool) { + if len(replies) < q.faulty+1 { return nil, false } - return &emptypb.Empty{}, true + responseCount := make([]int, 4) // assuming 4 possible statuses + + for _, resp := range replies { + if resp != nil { + status := resp.Status + if int(status) >= 0 && int(status) < len(responseCount) { + responseCount[int(status)]++ + } + } + } + for status, count := range responseCount { + if count >= q.faulty+1 { + return &clientpb.CommandStatusResponse{ + Command: command, + Status: clientpb.CommandStatusResponse_Status(status), + }, true + } + } + return &clientpb.CommandStatusResponse{ + Command: command, + Status: clientpb.CommandStatusResponse_PENDING, + }, false } type pendingCmd struct { @@ -126,6 +145,7 @@ func (c *Client) Connect(replicas []hotstuff.ReplicaInfo) (err error) { } c.gorumsConfig, err = c.mgr.NewConfiguration(&qspec{faulty: hotstuff.NumFaulty(len(replicas))}, gorums.WithNodeMap(nodes)) if err != nil { + c.logger.Error("unable to create the configuration in client") c.mgr.Close() return err } @@ -273,14 +293,8 @@ func (c *Client) fetchCommandStatus(sequenceNumber uint64) hotstuff.CommandStatu ClientID: uint32(c.id), SequenceNumber: sequenceNumber, } - nodes := c.gorumsConfig.Nodes() - if len(nodes) == 0 { - c.logger.Error("No nodes available in gorums config") - cancel() - continue - } - node := nodes[rand.Intn(len(nodes))] - response, err := node.CommandStatus(ctx, cmd) + + response, err := c.gorumsConfig.CommandStatus(ctx, cmd) cancel() if err != nil { diff --git a/internal/proto/clientpb/client.pb.go b/internal/proto/clientpb/client.pb.go index 0331d0fd1..1659e6f21 100644 --- a/internal/proto/clientpb/client.pb.go +++ b/internal/proto/clientpb/client.pb.go @@ -257,10 +257,10 @@ const file_internal_proto_clientpb_client_proto_rawDesc = "" + "\x0eSequenceNumber\x18\x02 \x01(\x04R\x0eSequenceNumber\x12\x12\n" + "\x04Data\x18\x03 \x01(\fR\x04Data\"6\n" + "\x05Batch\x12-\n" + - "\bCommands\x18\x01 \x03(\v2\x11.clientpb.CommandR\bCommands2\x8f\x01\n" + + "\bCommands\x18\x01 \x03(\v2\x11.clientpb.CommandR\bCommands2\x93\x01\n" + "\x06Client\x12>\n" + - "\vExecCommand\x12\x11.clientpb.Command\x1a\x16.google.protobuf.Empty\"\x04\x98\xb5\x18\x01\x12E\n" + - "\rCommandStatus\x12\x11.clientpb.Command\x1a\x1f.clientpb.CommandStatusResponse\"\x00B3Z1github.com/relab/hotstuff/internal/proto/clientpbb\x06proto3" + "\vExecCommand\x12\x11.clientpb.Command\x1a\x16.google.protobuf.Empty\"\x04\x98\xb5\x18\x01\x12I\n" + + "\rCommandStatus\x12\x11.clientpb.Command\x1a\x1f.clientpb.CommandStatusResponse\"\x04\xa0\xb5\x18\x01B3Z1github.com/relab/hotstuff/internal/proto/clientpbb\x06proto3" var ( file_internal_proto_clientpb_client_proto_rawDescOnce sync.Once diff --git a/internal/proto/clientpb/client.proto b/internal/proto/clientpb/client.proto index 59ed1e90a..6b3258e68 100644 --- a/internal/proto/clientpb/client.proto +++ b/internal/proto/clientpb/client.proto @@ -16,6 +16,7 @@ service Client { } rpc CommandStatus(Command) returns (CommandStatusResponse) { + option (gorums.quorumcall) = true; } } diff --git a/internal/proto/clientpb/client_gorums.pb.go b/internal/proto/clientpb/client_gorums.pb.go index 97b4610f5..0355ce7c2 100644 --- a/internal/proto/clientpb/client_gorums.pb.go +++ b/internal/proto/clientpb/client_gorums.pb.go @@ -12,6 +12,7 @@ import ( empty "github.com/golang/protobuf/ptypes/empty" gorums "github.com/relab/gorums" encoding "google.golang.org/grpc/encoding" + proto "google.golang.org/protobuf/proto" ) const ( @@ -152,18 +153,11 @@ type Node struct { // ClientClient is the client interface for the Client service. type ClientClient interface { ExecCommand(ctx context.Context, in *Command, opts ...gorums.CallOption) -} - -// enforce interface compliance -var _ ClientClient = (*Configuration)(nil) - -// ClientNodeClient is the single node client interface for the Client service. -type ClientNodeClient interface { CommandStatus(ctx context.Context, in *Command) (resp *CommandStatusResponse, err error) } // enforce interface compliance -var _ ClientNodeClient = (*Node)(nil) +var _ ClientClient = (*Configuration)(nil) // Reference imports to suppress errors if they are not otherwise used. var _ empty.Empty @@ -179,18 +173,34 @@ func (c *Configuration) ExecCommand(ctx context.Context, in *Command, opts ...go c.RawConfiguration.Multicast(ctx, cd, opts...) } -// There are no quorum calls. -type QuorumSpec interface{} +// QuorumSpec is the interface of quorum functions for Client. +type QuorumSpec interface { + gorums.ConfigOption + + // CommandStatusQF is the quorum function for the CommandStatus + // quorum call method. The in parameter is the request object + // supplied to the CommandStatus method at call time, and may or may not + // be used by the quorum function. If the in parameter is not needed + // you should implement your quorum function with '_ *Command'. + CommandStatusQF(in *Command, replies map[uint32]*CommandStatusResponse) (*CommandStatusResponse, bool) +} // CommandStatus is a quorum call invoked on all nodes in configuration c, // with the same argument in, and returns a combined result. -func (n *Node) CommandStatus(ctx context.Context, in *Command) (resp *CommandStatusResponse, err error) { - cd := gorums.CallData{ +func (c *Configuration) CommandStatus(ctx context.Context, in *Command) (resp *CommandStatusResponse, err error) { + cd := gorums.QuorumCallData{ Message: in, Method: "clientpb.Client.CommandStatus", } + cd.QuorumFunction = func(req proto.Message, replies map[uint32]proto.Message) (proto.Message, bool) { + r := make(map[uint32]*CommandStatusResponse, len(replies)) + for k, v := range replies { + r[k] = v.(*CommandStatusResponse) + } + return c.qspec.CommandStatusQF(req.(*Command), r) + } - res, err := n.RawNode.RPCCall(ctx, cd) + res, err := c.RawConfiguration.QuorumCall(ctx, cd) if err != nil { return nil, err } @@ -215,3 +225,9 @@ func RegisterClientServer(srv *gorums.Server, impl ClientServer) { return gorums.NewResponseMessage(in.GetMetadata(), resp), err }) } + +type internalCommandStatusResponse struct { + nid uint32 + reply *CommandStatusResponse + err error +} From 9accf72b6013dc74eb6a7b761b23030a2ed95713 Mon Sep 17 00:00:00 2001 From: hanish520 Date: Mon, 12 Jan 2026 23:57:28 -0800 Subject: [PATCH 3/4] missing go.mod changes --- go.mod | 1 + 1 file changed, 1 insertion(+) diff --git a/go.mod b/go.mod index 10673fd89..8fe39ef63 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.4 require ( cuelang.org/go v0.15.0 github.com/felixge/fgprof v0.9.5 + github.com/golang/protobuf v1.5.4 github.com/google/go-cmp v0.7.0 github.com/kilic/bls12-381 v0.1.1-0.20210208205449-6045b0235e36 github.com/mroth/weightedrand v1.0.0 From 7d875b5c66648666efa9d9f5c5d1380d214bbedf Mon Sep 17 00:00:00 2001 From: hanish520 Date: Thu, 15 Jan 2026 23:36:44 -0800 Subject: [PATCH 4/4] added tests for command status tracker --- server/clientio.go | 11 ++-- server/clientio_test.go | 125 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+), 6 deletions(-) create mode 100644 server/clientio_test.go diff --git a/server/clientio.go b/server/clientio.go index 710fe6d8e..113872297 100644 --- a/server/clientio.go +++ b/server/clientio.go @@ -127,11 +127,10 @@ type ClientIO struct { logger logging.Logger cmdCache *clientpb.CommandCache - mut sync.Mutex - srv *gorums.Server - awaitingCmds map[clientpb.MessageID]chan<- error - hash hash.Hash - cmdCount uint32 + mut sync.Mutex + srv *gorums.Server + hash hash.Hash + cmdCount uint32 lastExecutedSeqNum map[uint32]uint64 // highest executed sequence number per client ID statusTracker *CommandStatusTracker // tracks status of all commands (executed/aborted/failed) @@ -234,7 +233,7 @@ func (srv *ClientIO) CleanupOldStatuses(clientID uint32, upToSeqNum uint64) { srv.statusTracker.Cleanup(clientID, upToSeqNum) } -func (srv *ClientIO) CommandStatus(ctx gorums.ServerCtx, in *clientpb.Command) (resp *clientpb.CommandStatusResponse, err error) { +func (srv *ClientIO) CommandStatus(_ gorums.ServerCtx, in *clientpb.Command) (resp *clientpb.CommandStatusResponse, err error) { srv.mut.Lock() defer srv.mut.Unlock() status := srv.statusTracker.GetStatus(in.ClientID, in.SequenceNumber) diff --git a/server/clientio_test.go b/server/clientio_test.go new file mode 100644 index 000000000..1b9254409 --- /dev/null +++ b/server/clientio_test.go @@ -0,0 +1,125 @@ +package server + +import ( + "testing" + + "github.com/relab/hotstuff" +) + +func TestCommandStatusTracker_GetStatus_DefaultExecuted(t *testing.T) { + tr := NewCommandStatusTracker() + status := tr.GetStatus(1, 1) + if status != hotstuff.EXECUTED { + t.Fatalf("expected default status EXECUTED for unknown client, got %v", status) + } +} + +func TestCommandStatusTracker_SetAndGet(t *testing.T) { + tr := NewCommandStatusTracker() + clientID := uint32(1) + seq := uint64(10) + + tr.SetStatus(clientID, seq, hotstuff.UNKNOWN) + if got := tr.GetStatus(clientID, seq); got != hotstuff.UNKNOWN { + t.Fatalf("GetStatus = %v; want %v", got, hotstuff.UNKNOWN) + } +} + +func TestCommandStatusTracker_ExtendWindow(t *testing.T) { + tr := NewCommandStatusTracker() + clientID := uint32(2) + + // initial set at seq 1 + tr.SetStatus(clientID, 1, hotstuff.UNKNOWN) + if got := tr.GetStatus(clientID, 1); got != hotstuff.UNKNOWN { + t.Fatalf("initial GetStatus = %v; want %v", got, hotstuff.UNKNOWN) + } + + // set far ahead to force window growth + largeSeq := uint64(6000) + tr.SetStatus(clientID, largeSeq, hotstuff.FAILED) + if got := tr.GetStatus(clientID, largeSeq); got != hotstuff.FAILED { + t.Fatalf("after extend GetStatus(%d) = %v; want %v", largeSeq, got, hotstuff.FAILED) + } + + // earlier entry should still be present + if got := tr.GetStatus(clientID, 1); got != hotstuff.UNKNOWN { + t.Fatalf("after extend GetStatus(1) = %v; want %v", got, hotstuff.UNKNOWN) + } +} + +func TestCommandStatusTracker_CleanupSlidesAndDeletes(t *testing.T) { + tr := NewCommandStatusTracker() + clientID := uint32(3) + + // set statuses for seqs 1..10 + for i := uint64(1); i <= 10; i++ { + tr.SetStatus(clientID, i, hotstuff.UNKNOWN) + } + + // cleanup up to 4 -> base should become 5, entries 1..4 removed + tr.Cleanup(clientID, 4) + // seq 3 should now be treated as executed (cleaned up) + if got := tr.GetStatus(clientID, 3); got != hotstuff.EXECUTED { + t.Fatalf("after cleanup GetStatus(3) = %v; want EXECUTED", got) + } + // seq 5 should still be UNKNOWN + if got := tr.GetStatus(clientID, 5); got != hotstuff.UNKNOWN { + t.Fatalf("after cleanup GetStatus(5) = %v; want %v", got, hotstuff.UNKNOWN) + } + + // cleanup up to 10 -> should remove entire window + tr.Cleanup(clientID, 10) + if got := tr.GetStatus(clientID, 5); got != hotstuff.EXECUTED { + t.Fatalf("after full cleanup GetStatus(5) = %v; want EXECUTED", got) + } + // snapshot should be empty for this client + // snapshot should not contain cleaned-up seqs 1..10 + if m := tr.GetClientStatuses(clientID); len(m) != 0 { + for i := uint64(1); i <= 10; i++ { + if _, ok := m[i]; ok { + t.Fatalf("snapshot contains cleaned-up seq %d", i) + } + } + // It's an implementation detail whether the snapshot is empty or contains + // preallocated entries for sequences > cleanup point (which is why len(m) + // may be 4990). We only assert here that cleaned-up sequences are gone. + } +} + +func TestCommandStatusTracker_IgnoreOldSetStatus(t *testing.T) { + tr := NewCommandStatusTracker() + clientID := uint32(4) + + // create window at seq 100 + tr.SetStatus(clientID, 100, hotstuff.UNKNOWN) + // attempt to set older seq 90 -> should be ignored + tr.SetStatus(clientID, 90, hotstuff.FAILED) + if got := tr.GetStatus(clientID, 90); got != hotstuff.EXECUTED { + t.Fatalf("old SetStatus should be ignored; GetStatus(90) = %v; want EXECUTED", got) + } + // newer seq should still be present + if got := tr.GetStatus(clientID, 100); got != hotstuff.UNKNOWN { + t.Fatalf("GetStatus(100) = %v; want %v", got, hotstuff.UNKNOWN) + } +} + +func TestCommandStatusTracker_GetClientStatusesSnapshot(t *testing.T) { + tr := NewCommandStatusTracker() + clientID := uint32(5) + + tr.SetStatus(clientID, 50, hotstuff.UNKNOWN) + tr.SetStatus(clientID, 52, hotstuff.FAILED) + + snap := tr.GetClientStatuses(clientID) + if snap[50] != hotstuff.UNKNOWN { + t.Fatalf("snapshot[50] = %v; want %v", snap[50], hotstuff.UNKNOWN) + } + if snap[52] != hotstuff.FAILED { + t.Fatalf("snapshot[52] = %v; want %v", snap[52], hotstuff.FAILED) + } + // a sequence not set but within window will have zero value; ensure an unrelated seq returns EXECUTED via GetStatus + if got := tr.GetStatus(clientID, 49); got != hotstuff.EXECUTED { + t.Fatalf("GetStatus(49) = %v; want EXECUTED", got) + } +}