diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java index 5957ce86796ac..0b15837eae777 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java @@ -64,6 +64,7 @@ public class Backoff { private Duration next; @Getter private Instant firstBackoffTime; + private boolean firstBackoffTimeSet; @Getter private boolean mandatoryStopMade; @@ -74,6 +75,7 @@ private Backoff(Duration initial, Duration max, Duration mandatoryStop, Clock cl this.next = initial; this.clock = clock; this.firstBackoffTime = Instant.EPOCH; + this.firstBackoffTimeSet = false; if (initial.isZero() && max.isZero() && mandatoryStop.isZero()) { this.mandatoryStopMade = true; } @@ -117,8 +119,9 @@ public Duration next() { if (!mandatoryStopMade) { Instant now = clock.instant(); Duration timeElapsedSinceFirstBackoff = Duration.ZERO; - if (initial.equals(current)) { + if (!firstBackoffTimeSet) { firstBackoffTime = now; + firstBackoffTimeSet = true; } else { timeElapsedSinceFirstBackoff = Duration.between(firstBackoffTime, now); } @@ -156,6 +159,8 @@ public void reduceToHalf() { */ public void reset() { this.next = this.initial; + this.firstBackoffTime = Instant.EPOCH; + this.firstBackoffTimeSet = false; this.mandatoryStopMade = initial.isZero() && max.isZero() && mandatoryStop.isZero(); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java index 152ddee5156d0..2a498441a8c9b 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java @@ -76,6 +76,99 @@ public void firstBackoffTimerTest() { assertEquals(diffBackOffTime, 300); } + @Test + public void mandatoryStopShouldWorkWithConstantBackoff() { + Clock mockClock = Mockito.mock(Clock.class); + Mockito.when(mockClock.instant()) + .thenReturn(Instant.ofEpochMilli(0)) + .thenReturn(Instant.ofEpochMilli(400)); + + Backoff backoff = Backoff.builder() + .initialDelay(Duration.ofMillis(100)) + .maxBackoff(Duration.ofMillis(100)) + .mandatoryStop(Duration.ofMillis(300)) + .clock(mockClock) + .build(); + + // first call starts the mandatory-stop timer + backoff.next(); + Instant firstBackoffTime = backoff.getFirstBackoffTime(); + + // elapsed wall-clock time should exceed mandatory stop + backoff.next(); + assertEquals(backoff.getFirstBackoffTime(), firstBackoffTime, + "firstBackoffTime should not be updated after the first call to next()"); + assertTrue(backoff.isMandatoryStopMade(), + "mandatory stop should be reached even when initial == max (constant backoff)"); + } + + @Test + public void reduceToHalfShouldNotResetMandatoryStopTimer() { + Clock mockClock = Mockito.mock(Clock.class); + Mockito.when(mockClock.instant()) + .thenReturn(Instant.ofEpochMilli(0)) + .thenReturn(Instant.ofEpochMilli(200)); + + Backoff backoff = Backoff.builder() + .initialDelay(Duration.ofMillis(100)) + .maxBackoff(Duration.ofMillis(400)) + .mandatoryStop(Duration.ofMillis(250)) + .clock(mockClock) + .build(); + + // first call starts the mandatory-stop timer (next becomes 200) + backoff.next(); + Instant firstBackoffTime = backoff.getFirstBackoffTime(); + + // This can bring the next delay back to initial, but it should not reset firstBackoffTime. + backoff.reduceToHalf(); + + backoff.next(); + assertEquals(backoff.getFirstBackoffTime(), firstBackoffTime, + "reduceToHalf should not reset firstBackoffTime"); + assertTrue(backoff.isMandatoryStopMade(), + "mandatory stop should be reached based on wall-clock time since the first call to next()"); + } + + @Test + public void resetShouldStartNewMandatoryStopCycle() { + Clock mockClock = Mockito.mock(Clock.class); + Mockito.when(mockClock.instant()) + .thenReturn(Instant.ofEpochMilli(0)) + .thenReturn(Instant.ofEpochMilli(400)) + .thenReturn(Instant.ofEpochMilli(1000)) + .thenReturn(Instant.ofEpochMilli(1400)); + + Backoff backoff = Backoff.builder() + .initialDelay(Duration.ofMillis(100)) + .maxBackoff(Duration.ofMillis(100)) + .mandatoryStop(Duration.ofMillis(300)) + .clock(mockClock) + .build(); + + // Cycle 1: reach mandatory stop + backoff.next(); + Instant firstCycleStart = backoff.getFirstBackoffTime(); + backoff.next(); + assertTrue(backoff.isMandatoryStopMade(), "mandatory stop should be reached in cycle-1"); + + // Reset should clear mandatory-stop state so a new cycle can start. + backoff.reset(); + assertFalse(backoff.isMandatoryStopMade(), "reset should clear mandatoryStopMade"); + assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH, "reset should clear firstBackoffTime"); + + // Cycle 2: should start timing again and be able to reach mandatory stop again. + backoff.next(); + Instant secondCycleStart = backoff.getFirstBackoffTime(); + assertEquals(secondCycleStart, Instant.ofEpochMilli(1000), + "reset should start a new mandatory-stop timing window"); + assertEquals(Duration.between(firstCycleStart, secondCycleStart).toMillis(), 1000, + "reset should not reuse the old timing window"); + + backoff.next(); + assertTrue(backoff.isMandatoryStopMade(), "mandatory stop should be reached in cycle-2"); + } + @Test public void basicTest() { Clock mockClock = Clock.fixed(Instant.EPOCH, ZoneId.systemDefault());