Skip to content

Commit 1cd29b0

Browse files
committed
fix: stop FDv2DataSource.Conditions from leaking on healthy primary
FDv2DataSource's run loop calls CompletableFuture.anyOf(conditions.getFuture(), synchronizer.next()).get() on every iteration. Before this change, getFuture() returned the same shared CompletableFuture<Object> instance to every caller. Each anyOf call attaches an OrRelay Completion node to the shared instance's stack; CompletableFuture has no deregister path for the loser of a race, so the OrRelay stays on the stack until the shared future completes. The shared future only completes when fallback or recovery fires. On a healthy primary streaming ChangeSets, fallback is never armed and recovery is suppressed (only-available-synchronizer / single-prime configurations). The future never completes; the stack grows monotonically for the synchronizer's full tenure -- effectively the SDK's uptime on a healthy server. Per-iteration cost ~200 B: an OrRelay Completion plus the anyOf result CompletableFuture plus the chain references back to the inputs. At 10 ChangeSets/sec sustained that is ~150 MB/day per active synchronizer. The fix: a single permanent whenComplete listener on the underlying aggregate fans out completion to every fresh future handed out by getFuture(). Pending fresh futures are tracked via WeakReference, so a fresh future whose only strong references were the caller's local variables (typical lifetime: one loop iteration) becomes garbage-collectable once that iteration ends. Pending entries whose referent has been collected are pruned opportunistically on each getFuture() call and on close(). Conditions is now package-private rather than private so the new direct unit tests can reach it. Adds a test-only pendingSize() helper. Verified bug-proving discipline: two of the new tests (getFutureReturnsDistinctInstancesPerCall, getFutureReturnsDistinctInstancesEvenWithNoConditions) fail on the pre-fix shared-instance behavior and pass after the fix. Full server-sdk test suite (1857 tests) is clean.
1 parent cac1568 commit 1cd29b0

2 files changed

Lines changed: 328 additions & 5 deletions

File tree

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java

Lines changed: 126 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111
import com.launchdarkly.sdk.server.subsystems.DataSource;
1212
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2;
1313

14+
import java.lang.ref.WeakReference;
1415
import java.util.ArrayList;
1516
import java.util.Collections;
1617
import java.util.Date;
18+
import java.util.Iterator;
1719
import java.util.List;
1820
import java.util.concurrent.*;
1921
import java.util.concurrent.atomic.AtomicBoolean;
@@ -591,21 +593,115 @@ private void maybeReportUnexpectedExhaustion(String message) {
591593

592594
/**
593595
* Helper class to manage the lifecycle of conditions with automatic cleanup.
596+
*
597+
* <p>{@link #getFuture()} returns a <em>fresh</em> {@link CompletableFuture}
598+
* per call rather than returning the same shared instance. This matters
599+
* because the run loop calls {@code CompletableFuture.anyOf(getFuture(),
600+
* synchronizerNext)} on every iteration: if {@code getFuture()} returned a
601+
* shared instance, each {@code anyOf} call would permanently attach an
602+
* {@code OrRelay} {@code Completion} to its {@code stack}. On a healthy
603+
* primary synchronizer that streams ChangeSets without ever arming the
604+
* fallback timer, the aggregate never completes, so those Completion nodes
605+
* accumulate monotonically for the synchronizer's full tenure -- a real
606+
* memory leak proportional to event rate.
607+
*
608+
* <p>The fix: a single permanent listener on the underlying aggregate fans
609+
* out completion to every fresh future handed out by {@link #getFuture()}.
610+
* Fresh futures are tracked via {@link WeakReference} on a pending list, so
611+
* a fresh future whose only strong references were in the caller's loop
612+
* iteration becomes garbage-collectable once that iteration ends. Pending
613+
* entries whose referent has been collected are pruned opportunistically on
614+
* subsequent {@code getFuture()} calls and on {@link #close()}.
615+
*
616+
* <p>Package-private (rather than private) so that direct unit tests can
617+
* exercise the API surface and assert per-call distinctness.
594618
*/
595-
private static class Conditions implements AutoCloseable {
619+
static class Conditions implements AutoCloseable {
596620
private final List<Condition> conditions;
597-
private final CompletableFuture<Object> conditionsFuture;
621+
private final CompletableFuture<Object> aggregate;
622+
private final Object lock = new Object();
623+
624+
/**
625+
* Holds the value the aggregate completed with, once it has completed.
626+
* {@code volatile} so the fast path in {@link #getFuture()} avoids
627+
* taking the lock. Set under {@code lock} together with clearing
628+
* {@code pending} so the two stay consistent.
629+
*/
630+
private volatile Object completedValue;
631+
632+
/**
633+
* Tracks futures previously returned by {@link #getFuture()} that have
634+
* not yet been completed. {@code null} once the aggregate has fired
635+
* (and all pending entries have been drained). Mutated only under
636+
* {@code lock}.
637+
*/
638+
private List<WeakReference<CompletableFuture<Object>>> pending = new ArrayList<>();
598639

599640
public Conditions(List<Condition> conditions) {
600641
this.conditions = conditions;
601-
this.conditionsFuture = conditions.isEmpty()
642+
this.aggregate = conditions.isEmpty()
602643
? new CompletableFuture<>() // Never completes if no conditions
603644
: CompletableFuture.anyOf(
604-
conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new));
645+
conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new));
646+
647+
// Single permanent listener. This is the only Completion node ever
648+
// attached to aggregate.stack -- subsequent getFuture() calls do
649+
// not touch the aggregate at all.
650+
this.aggregate.whenComplete((result, throwable) -> {
651+
List<WeakReference<CompletableFuture<Object>>> snapshot;
652+
synchronized (lock) {
653+
completedValue = (throwable == null) ? result : null;
654+
snapshot = pending;
655+
pending = null;
656+
}
657+
if (snapshot == null) {
658+
return;
659+
}
660+
for (WeakReference<CompletableFuture<Object>> ref : snapshot) {
661+
CompletableFuture<Object> cf = ref.get();
662+
if (cf == null) {
663+
continue; // Already GC'd -- nothing to complete.
664+
}
665+
if (throwable != null) {
666+
cf.completeExceptionally(throwable);
667+
} else {
668+
cf.complete(result);
669+
}
670+
}
671+
});
605672
}
606673

674+
/**
675+
* Returns a fresh future that will complete when the underlying
676+
* aggregate condition fires, or an already-completed future if the
677+
* aggregate has already fired by the time this method is called.
678+
*/
607679
public CompletableFuture<Object> getFuture() {
608-
return conditionsFuture;
680+
Object v = completedValue;
681+
if (v != null) {
682+
return CompletableFuture.completedFuture(v);
683+
}
684+
685+
CompletableFuture<Object> fresh = new CompletableFuture<>();
686+
synchronized (lock) {
687+
if (pending == null) {
688+
// Raced with aggregate completion. completedValue is now
689+
// guaranteed populated (set under lock before pending was
690+
// nulled).
691+
return CompletableFuture.completedFuture(completedValue);
692+
}
693+
// Opportunistic prune of weak refs whose target has been
694+
// collected. Keeps pending bounded even if the aggregate never
695+
// fires.
696+
Iterator<WeakReference<CompletableFuture<Object>>> it = pending.iterator();
697+
while (it.hasNext()) {
698+
if (it.next().get() == null) {
699+
it.remove();
700+
}
701+
}
702+
pending.add(new WeakReference<>(fresh));
703+
}
704+
return fresh;
609705
}
610706

611707
public void inform(FDv2SourceResult result) {
@@ -615,6 +711,31 @@ public void inform(FDv2SourceResult result) {
615711
@Override
616712
public void close() {
617713
conditions.forEach(Condition::close);
714+
synchronized (lock) {
715+
if (pending != null) {
716+
pending.clear();
717+
}
718+
}
719+
}
720+
721+
/**
722+
* Test-only: snapshot of the current pending list size after
723+
* opportunistic pruning. Used by tests to assert that the pending list
724+
* does not grow unboundedly across iterations.
725+
*/
726+
int pendingSize() {
727+
synchronized (lock) {
728+
if (pending == null) {
729+
return 0;
730+
}
731+
Iterator<WeakReference<CompletableFuture<Object>>> it = pending.iterator();
732+
while (it.hasNext()) {
733+
if (it.next().get() == null) {
734+
it.remove();
735+
}
736+
}
737+
return pending.size();
738+
}
618739
}
619740
}
620741
}
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition;
4+
import com.launchdarkly.sdk.server.FDv2DataSourceConditions.FallbackCondition;
5+
import com.launchdarkly.sdk.server.FDv2DataSourceConditions.RecoveryCondition;
6+
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
7+
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
8+
9+
import org.junit.After;
10+
import org.junit.Before;
11+
import org.junit.Test;
12+
13+
import java.time.Instant;
14+
import java.util.Collections;
15+
import java.util.concurrent.CompletableFuture;
16+
import java.util.concurrent.Executors;
17+
import java.util.concurrent.ScheduledExecutorService;
18+
import java.util.concurrent.TimeUnit;
19+
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
22+
import static org.hamcrest.Matchers.not;
23+
import static org.hamcrest.Matchers.sameInstance;
24+
import static org.junit.Assert.assertNotNull;
25+
import static org.junit.Assert.assertTrue;
26+
27+
/**
28+
* Direct tests for {@link FDv2DataSource.Conditions}.
29+
*
30+
* <p>The Conditions class is the aggregator that races fallback/recovery
31+
* condition futures against synchronizer.next() in the FDv2DataSource run
32+
* loop. Each iteration of that loop calls getFuture() and passes the result to
33+
* CompletableFuture.anyOf(...) -- so getFuture() must not return a shared
34+
* instance, or every anyOf call permanently attaches a Completion node to the
35+
* shared instance's stack, leaking memory proportional to event rate during
36+
* the synchronizer's tenure on a healthy primary.
37+
*/
38+
public class FDv2DataSourceConditionsAggregateTest {
39+
private ScheduledExecutorService executor;
40+
41+
@Before
42+
public void setUp() {
43+
executor = Executors.newScheduledThreadPool(1);
44+
}
45+
46+
@After
47+
public void tearDown() {
48+
executor.shutdownNow();
49+
}
50+
51+
/**
52+
* Bug-proving test: getFuture() must return a fresh instance per call.
53+
*
54+
* <p>If it returns the same instance (as it did before the fix), the run
55+
* loop's per-iteration {@code anyOf(getFuture(), syncNext)} attaches a new
56+
* OrRelay Completion to the shared future's stack every iteration, with no
57+
* deregister path -- a monotonic leak for a non-firing aggregate.
58+
*/
59+
@Test
60+
public void getFutureReturnsDistinctInstancesPerCall() {
61+
Condition fallback = new FallbackCondition(executor, 60);
62+
try (FDv2DataSource.Conditions conditions =
63+
new FDv2DataSource.Conditions(Collections.singletonList(fallback))) {
64+
CompletableFuture<Object> f1 = conditions.getFuture();
65+
CompletableFuture<Object> f2 = conditions.getFuture();
66+
CompletableFuture<Object> f3 = conditions.getFuture();
67+
assertThat(f1, not(sameInstance(f2)));
68+
assertThat(f2, not(sameInstance(f3)));
69+
assertThat(f1, not(sameInstance(f3)));
70+
}
71+
}
72+
73+
/**
74+
* Even with no underlying conditions (a single-synchronizer configuration),
75+
* getFuture() must return fresh instances. The aggregate never completes
76+
* in this case, which is exactly the scenario where any per-iteration
77+
* accumulation would be most damaging.
78+
*/
79+
@Test
80+
public void getFutureReturnsDistinctInstancesEvenWithNoConditions() {
81+
try (FDv2DataSource.Conditions conditions =
82+
new FDv2DataSource.Conditions(Collections.emptyList())) {
83+
CompletableFuture<Object> f1 = conditions.getFuture();
84+
CompletableFuture<Object> f2 = conditions.getFuture();
85+
assertThat(f1, not(sameInstance(f2)));
86+
}
87+
}
88+
89+
/**
90+
* Every fresh future returned by getFuture() must complete when the
91+
* underlying aggregate fires. The fan-out via the single permanent listener
92+
* is what makes the fresh-per-call pattern work; verify it actually
93+
* delivers.
94+
*/
95+
@Test
96+
public void allFreshFuturesCompleteWhenAggregateFires() throws Exception {
97+
// 0-second timeout -> fires on first INTERRUPTED inform.
98+
Condition fallback = new FallbackCondition(executor, 0);
99+
try (FDv2DataSource.Conditions conditions =
100+
new FDv2DataSource.Conditions(Collections.singletonList(fallback))) {
101+
CompletableFuture<Object> f1 = conditions.getFuture();
102+
CompletableFuture<Object> f2 = conditions.getFuture();
103+
CompletableFuture<Object> f3 = conditions.getFuture();
104+
105+
conditions.inform(makeInterruptedResult());
106+
107+
Object r1 = f1.get(2, TimeUnit.SECONDS);
108+
Object r2 = f2.get(2, TimeUnit.SECONDS);
109+
Object r3 = f3.get(2, TimeUnit.SECONDS);
110+
111+
assertNotNull(r1);
112+
assertNotNull(r2);
113+
assertNotNull(r3);
114+
assertTrue(r1 instanceof Condition);
115+
assertTrue(r2 instanceof Condition);
116+
assertTrue(r3 instanceof Condition);
117+
}
118+
}
119+
120+
/**
121+
* getFuture() called after the aggregate has already fired returns an
122+
* already-completed future synchronously (the fast path).
123+
*/
124+
@Test
125+
public void getFutureAfterAggregateFiresReturnsCompletedFuture() throws Exception {
126+
// RecoveryCondition arms its timer in the constructor and fires after
127+
// the configured timeout. With timeout=0 it fires near-immediately.
128+
Condition recovery = new RecoveryCondition(executor, 0);
129+
try (FDv2DataSource.Conditions conditions =
130+
new FDv2DataSource.Conditions(Collections.singletonList(recovery))) {
131+
// Drain a future to confirm the aggregate has fired.
132+
conditions.getFuture().get(2, TimeUnit.SECONDS);
133+
134+
CompletableFuture<Object> postFire = conditions.getFuture();
135+
assertTrue("post-fire getFuture() should be already complete", postFire.isDone());
136+
assertNotNull(postFire.get(0, TimeUnit.SECONDS));
137+
}
138+
}
139+
140+
/**
141+
* Bug-proving test for the underlying leak: repeated getFuture() calls
142+
* whose returned futures are then dropped (the run-loop pattern: each
143+
* iteration's anyOf result becomes garbage at end of iteration) must NOT
144+
* cause the pending list to grow without bound. The opportunistic prune
145+
* inside getFuture() collects entries whose WeakReference target has been
146+
* collected.
147+
*
148+
* <p>Java does not guarantee that {@link System#gc()} actually runs, but
149+
* in practice with HotSpot's default GC plus a brief sleep this is
150+
* reliable. If it ever flakes on CI, increase the iteration count or the
151+
* sleep, or migrate to a {@code -XX:+UseSerialGC} test profile.
152+
*/
153+
@Test
154+
public void pendingListDoesNotGrowUnboundedlyWhenFreshFuturesAreDropped()
155+
throws Exception {
156+
Condition fallback = new FallbackCondition(executor, 60); // never fires
157+
try (FDv2DataSource.Conditions conditions =
158+
new FDv2DataSource.Conditions(Collections.singletonList(fallback))) {
159+
int iterations = 10_000;
160+
for (int i = 0; i < iterations; i++) {
161+
CompletableFuture<Object> f = conditions.getFuture();
162+
// Simulate the run loop: race f against a fast-resolving sibling.
163+
// The anyOf result is awaited and discarded; f becomes unreachable
164+
// at end of iteration.
165+
CompletableFuture<Object> sibling = CompletableFuture.completedFuture("ok");
166+
CompletableFuture.anyOf(f, sibling).get(1, TimeUnit.SECONDS);
167+
// f goes out of scope here.
168+
169+
// Periodically encourage GC + give the cleanup path a chance.
170+
if (i % 1000 == 999) {
171+
System.gc();
172+
Thread.sleep(10);
173+
}
174+
}
175+
System.gc();
176+
Thread.sleep(50);
177+
178+
// After 10k iterations, pendingSize() should not be anywhere near
179+
// 10k. The opportunistic prune inside getFuture() runs on every
180+
// call, so any entry whose WeakReference has been collected drops
181+
// out. A small handful (< 100) of recently-added live refs is
182+
// expected because the most recent iterations may not yet have
183+
// been GC'd. Choose a generous ceiling to avoid CI flakiness while
184+
// still being orders of magnitude below the pre-fix accumulation.
185+
int finalSize = conditions.pendingSize();
186+
assertThat(
187+
"pending list size should be bounded; was " + finalSize
188+
+ " after " + iterations + " iterations",
189+
finalSize, lessThanOrEqualTo(500));
190+
}
191+
}
192+
193+
private static FDv2SourceResult makeInterruptedResult() {
194+
return FDv2SourceResult.interrupted(
195+
new DataSourceStatusProvider.ErrorInfo(
196+
DataSourceStatusProvider.ErrorKind.NETWORK_ERROR,
197+
0,
198+
"simulated",
199+
Instant.now()),
200+
false);
201+
}
202+
}

0 commit comments

Comments
 (0)