Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 29 additions & 38 deletions ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,24 @@ func TestDoBatchWithOptionsContextCancellation(t *testing.T) {
)
cancelCause := errors.New("cancel cause")

measureDuration := func(r *Ring, keys []uint32) time.Duration {
callback := func(InstanceDesc, []int) error { return nil }
t0 := time.Now()
err := DoBatchWithOptions(context.Background(), Write, r, keys, callback, DoBatchOptions{})
duration := time.Since(t0)
require.NoError(t, err)
t.Logf("Call took %s", duration)
return duration
}
// Build ring and keys once; all subtests share them (read-only).
keys := make([]uint32, numKeys)
generateKeys(rand.New(rand.NewSource(0)), numKeys, keys)
desc := &Desc{Ingesters: generateRingInstances(NewRandomTokenGeneratorWithSeed(0), numInstances, numZones, numTokens)}
r := newRingForTesting(Config{
HeartbeatTimeout: time.Hour,
ZoneAwarenessEnabled: true,
SubringCacheDisabled: true,
ReplicationFactor: numZones,
}, true)
r.setRingStateFromDesc(desc, false, false, false)

// Measure once how long an uncancelled DoBatch takes; reuse for both deadline subtests.
warmupCallback := func(InstanceDesc, []int) error { return nil }
t0 := time.Now()
require.NoError(t, DoBatchWithOptions(context.Background(), Write, r, keys, warmupCallback, DoBatchOptions{}))
batchDuration := time.Since(t0)
t.Logf("Warmup DoBatch took %s", batchDuration)

type callbackFunc = func(InstanceDesc, []int) error
never := func(_ InstanceDesc, _ []int) error {
Expand All @@ -233,17 +242,15 @@ func TestDoBatchWithOptionsContextCancellation(t *testing.T) {
}
tests := []struct {
name string
setup func(*Ring, []uint32) (context.Context, callbackFunc)
setup func() (context.Context, callbackFunc)
expectedErr error
}{
{
name: "context deadline exceeded",
setup: func(r *Ring, keys []uint32) (context.Context, callbackFunc) {
duration := measureDuration(r, keys)

// Make a second call cancelling after a hundredth of duration of the first one.
// For a 4s first call, this is 40ms: should be enough for this test to not be flaky.
ctx, cancel := context.WithTimeout(context.Background(), duration/100)
setup: func() (context.Context, callbackFunc) {
// Cancel after a hundredth of the warmup duration.
// For a 4s warmup this is 40ms: enough to avoid flakiness.
ctx, cancel := context.WithTimeout(context.Background(), batchDuration/100)
go func() {
<-ctx.Done()
cancel()
Expand All @@ -254,12 +261,8 @@ func TestDoBatchWithOptionsContextCancellation(t *testing.T) {
},
{
name: "context deadline exceeded with cause",
setup: func(r *Ring, keys []uint32) (context.Context, callbackFunc) {
duration := measureDuration(r, keys)

// Make a second call cancelling after a hundredth of duration of the first one.
// For a 4s first call, this is 40ms: should be enough for this test to not be flaky.
ctx, cancel := context.WithTimeoutCause(context.Background(), duration/100, cancelCause)
setup: func() (context.Context, callbackFunc) {
ctx, cancel := context.WithTimeoutCause(context.Background(), batchDuration/100, cancelCause)
go func() {
<-ctx.Done()
cancel()
Expand All @@ -270,7 +273,7 @@ func TestDoBatchWithOptionsContextCancellation(t *testing.T) {
},
{
name: "context initially cancelled without cause",
setup: func(_ *Ring, _ []uint32) (context.Context, callbackFunc) {
setup: func() (context.Context, callbackFunc) {
ctx, cancelFunc := context.WithCancel(context.Background())
// start batch with cancelled context
cancelFunc()
Expand All @@ -280,7 +283,7 @@ func TestDoBatchWithOptionsContextCancellation(t *testing.T) {
},
{
name: "context initially cancelled with cause",
setup: func(_ *Ring, _ []uint32) (context.Context, callbackFunc) {
setup: func() (context.Context, callbackFunc) {
ctx, cancelFunc := context.WithCancelCause(context.Background())
// start batch with cancelled context
cancelFunc(cancelCause)
Expand All @@ -290,7 +293,7 @@ func TestDoBatchWithOptionsContextCancellation(t *testing.T) {
},
{
name: "context cancelled during batch processing",
setup: func(_ *Ring, _ []uint32) (context.Context, callbackFunc) {
setup: func() (context.Context, callbackFunc) {
ctx, cancel := context.WithCancelCause(context.Background())

wg := sync.WaitGroup{}
Expand All @@ -315,19 +318,7 @@ func TestDoBatchWithOptionsContextCancellation(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
keys := make([]uint32, numKeys)
generateKeys(rand.New(rand.NewSource(0)), numKeys, keys)

desc := &Desc{Ingesters: generateRingInstances(NewRandomTokenGeneratorWithSeed(0), numInstances, numZones, numTokens)}
r := newRingForTesting(Config{
HeartbeatTimeout: time.Hour,
ZoneAwarenessEnabled: true,
SubringCacheDisabled: true,
ReplicationFactor: numZones,
}, true)
r.setRingStateFromDesc(desc, false, false, false)

ctx, callback := tt.setup(r, keys)
ctx, callback := tt.setup()

wg := sync.WaitGroup{}
wg.Add(1)
Expand Down
4 changes: 2 additions & 2 deletions ring/spread_minimizing_token_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestSpreadMinimizingTokenGenerator_CalculateNewToken(t *testing.T) {
func TestSpreadMinimizingTokenGenerator_GenerateAllTokensIdempotent(t *testing.T) {
t.Parallel()

maxInstanceID := 128
maxInstanceID := 16
for instanceID := 0; instanceID < maxInstanceID; instanceID++ {
for _, zone := range zones {
instance := fmt.Sprintf("instance-%s-%d", zone, instanceID)
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestSpreadMinimizingTokenGenerator_CheckTokenUniqueness(t *testing.T) {
t.Parallel()

tokensPerInstance := 512
instanceID := 10000
instanceID := 1000
allTokens := make(map[uint32]bool, tokensPerInstance*(instanceID+1)*len(zones))
for _, zone := range zones {
instance := fmt.Sprintf("instance-%s-%d", zone, instanceID)
Expand Down
2 changes: 1 addition & 1 deletion ring/token_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func testCheckingOfKeyOwnership(t *testing.T, randomizeInstanceStates bool) {
// Generate users with different number of tokens
userTokens := map[string][]uint32{}
shardSizes := map[string]int{}
for _, cnt := range []int{1000, 5000, 10000, 25000, 50000, 100000} {
for _, cnt := range []int{1000, 5000, 10000, 25000} {
uid := fmt.Sprintf("%dk", cnt/1000)
userTokens[uid] = gen.GenerateTokens(cnt, nil)

Expand Down
Loading