Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 81 additions & 18 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"io"
"math"
"strings"
"sync"
"time"

Expand All @@ -21,7 +22,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
)

// ID is the identifier for a client.
Expand All @@ -31,17 +31,37 @@ type qspec struct {
faulty int
}

func (q *qspec) ExecCommandQF(_ *clientpb.Command, signatures map[uint32]*emptypb.Empty) (*emptypb.Empty, bool) {
if len(signatures) < q.faulty+1 {
func (q *qspec) CommandStatusQF(command *clientpb.Command, replies map[uint32]*clientpb.CommandStatusResponse) (*clientpb.CommandStatusResponse, bool) {
if len(replies) < q.faulty+1 {
return nil, false
}
return &emptypb.Empty{}, true
responseCount := make([]int, 4) // assuming 4 possible statuses

for _, resp := range replies {
if resp != nil {
status := resp.Status
if int(status) >= 0 && int(status) < len(responseCount) {
responseCount[int(status)]++
}
}
}
for status, count := range responseCount {
if count >= q.faulty+1 {
return &clientpb.CommandStatusResponse{
Command: command,
Status: clientpb.CommandStatusResponse_Status(status),
}, true
}
}
return &clientpb.CommandStatusResponse{
Command: command,
Status: clientpb.CommandStatusResponse_PENDING,
}, false
}

type pendingCmd struct {
sequenceNumber uint64
sendTime time.Time
promise *clientpb.AsyncEmpty
cancelCtx context.CancelFunc
}

Expand Down Expand Up @@ -125,6 +145,7 @@ func (c *Client) Connect(replicas []hotstuff.ReplicaInfo) (err error) {
}
c.gorumsConfig, err = c.mgr.NewConfiguration(&qspec{faulty: hotstuff.NumFaulty(len(replicas))}, gorums.WithNodeMap(nodes))
if err != nil {
c.logger.Error("unable to create the configuration in client")
c.mgr.Close()
return err
}
Expand Down Expand Up @@ -182,6 +203,8 @@ func (c *Client) Stop() {
}

func (c *Client) close() {
// Signal the command handler to stop fetching statuses before closing the manager.
close(c.pendingCmds)
c.mgr.Close()
err := c.reader.Close()
if err != nil {
Expand Down Expand Up @@ -238,8 +261,8 @@ loop:
}

ctx, cancel := context.WithTimeout(ctx, c.timeout)
promise := c.gorumsConfig.ExecCommand(ctx, cmd)
pending := pendingCmd{sequenceNumber: num, sendTime: time.Now(), promise: promise, cancelCtx: cancel}
c.gorumsConfig.ExecCommand(ctx, cmd)
pending := pendingCmd{sequenceNumber: num, sendTime: time.Now(), cancelCtx: cancel}

num++
select {
Expand All @@ -257,6 +280,46 @@ loop:
return nil
}

func (c *Client) fetchCommandStatus(sequenceNumber uint64) hotstuff.CommandStatus {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
timeout := time.After(c.timeout)

for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
cmd := &clientpb.Command{
ClientID: uint32(c.id),
SequenceNumber: sequenceNumber,
}

response, err := c.gorumsConfig.CommandStatus(ctx, cmd)
cancel()

if err != nil {
c.logger.Errorf("Failed to fetch command status (client: %d, sequence: %d): %v", c.id, sequenceNumber, err)
// If the node/manager was closed, stop trying and return UNKNOWN.
if strings.Contains(err.Error(), "node closed") {
return hotstuff.UNKNOWN
}
continue
}
if response == nil || response.Command == nil {
c.logger.Errorf("Invalid response received when fetching command status (client: %d, sequence: %d)", c.id, sequenceNumber)
continue
}
c.logger.Infof("Fetched command status (client: %d, sequence: %d, status: %d)", c.id, sequenceNumber, response.Status)
status := hotstuff.CommandStatus(response.Status)
if status == hotstuff.COMMITTED || status == hotstuff.EXECUTED || status == hotstuff.FAILED {
return status
}
case <-timeout:
return hotstuff.UNKNOWN
}
}
}

// handleCommands will get pending commands from the pendingCmds channel and then
// handle them as they become acknowledged by the replicas. We expect the commands to be
// acknowledged in the order that they were sent.
Expand All @@ -274,24 +337,24 @@ func (c *Client) handleCommands(ctx context.Context) (executed, failed, timeout
case <-ctx.Done():
return
}
_, err := cmd.promise.Get()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
c.logger.Debug("Command timed out.")
timeout++
} else if !errors.Is(err, context.Canceled) {
c.logger.Debugf("Did not get enough replies for command: %v\n", err)
failed++
}
} else {
response := c.fetchCommandStatus(cmd.sequenceNumber)

switch response {
case hotstuff.UNKNOWN:
c.logger.Infof("Command timed out (client: %d, sequence: %d)", c.id, cmd.sequenceNumber)
timeout++
case hotstuff.FAILED:
c.logger.Infof("Command failed (client: %d, sequence: %d)", c.id, cmd.sequenceNumber)
failed++
default:
c.logger.Infof("Command executed (client: %d, sequence: %d)", c.id, cmd.sequenceNumber)
executed++
}
c.mut.Lock()
if cmd.sequenceNumber > c.highestCommitted {
c.highestCommitted = cmd.sequenceNumber
}
c.mut.Unlock()

duration := time.Since(cmd.sendTime)
c.eventLoop.AddEvent(LatencyMeasurementEvent{Latency: duration})
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.25.4
require (
cuelang.org/go v0.15.0
github.com/felixge/fgprof v0.9.5
github.com/golang/protobuf v1.5.4
github.com/google/go-cmp v0.7.0
github.com/kilic/bls12-381 v0.1.1-0.20210208205449-6045b0235e36
github.com/mroth/weightedrand v1.0.0
Expand Down
Loading