Skip to content

Commit 14e4f25

Browse files
committed
Merge remote-tracking branch 'origin/main' into ta/SDK-1611/daemon-mode-offline-mode
2 parents 8dfeadb + 4cc57ee commit 14e4f25

16 files changed

Lines changed: 2266 additions & 741 deletions

lib/sdk/server/contract-tests/service/src/main/java/sdktest/SdkClientEntity.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.launchdarkly.sdk.server.FlagsStateOption;
1212
import com.launchdarkly.sdk.server.LDClient;
1313
import com.launchdarkly.sdk.server.LDConfig;
14+
import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints;
1415
import com.launchdarkly.sdk.server.migrations.Migration;
1516
import com.launchdarkly.sdk.server.migrations.MigrationBuilder;
1617
import com.launchdarkly.sdk.server.migrations.MigrationExecution;
@@ -25,12 +26,15 @@
2526
import com.launchdarkly.sdk.server.integrations.HooksConfigurationBuilder;
2627
import com.launchdarkly.sdk.server.integrations.ServiceEndpointsBuilder;
2728
import com.launchdarkly.sdk.server.integrations.StreamingDataSourceBuilder;
29+
import com.launchdarkly.sdk.server.integrations.PollingDataSourceBuilder;
2830
import com.launchdarkly.sdk.server.integrations.DataSystemBuilder;
2931
import com.launchdarkly.sdk.server.DataSystemComponents;
3032
import com.launchdarkly.sdk.server.integrations.FDv2PollingInitializerBuilder;
3133
import com.launchdarkly.sdk.server.integrations.FDv2PollingSynchronizerBuilder;
3234
import com.launchdarkly.sdk.server.integrations.FDv2StreamingSynchronizerBuilder;
3335
import com.launchdarkly.sdk.server.interfaces.BigSegmentStoreStatusProvider;
36+
import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer;
37+
import com.launchdarkly.sdk.server.subsystems.DataSource;
3438
import com.launchdarkly.sdk.server.subsystems.DataSourceBuilder;
3539
import com.launchdarkly.sdk.server.datasources.Initializer;
3640
import com.launchdarkly.sdk.server.datasources.Synchronizer;
@@ -563,6 +567,22 @@ private LDConfig buildSdkConfig(SdkConfigParams params, String tag) {
563567
}
564568
}
565569

570+
// Configure FDv1 fallback synchronizer
571+
SdkConfigSynchronizerParams fallbackSynchronizer =
572+
selectFallbackSynchronizer(params.dataSystem);
573+
if (fallbackSynchronizer != null) {
574+
// Set global polling endpoints if the fallback synchronizer has polling with custom base URI
575+
if (fallbackSynchronizer.polling != null &&
576+
fallbackSynchronizer.polling.baseUri != null) {
577+
endpoints.polling(fallbackSynchronizer.polling.baseUri);
578+
}
579+
580+
// Create and configure FDv1 fallback
581+
ComponentConfigurer<DataSource> fdv1Fallback =
582+
createFDv1FallbackSynchronizer(fallbackSynchronizer);
583+
dataSystemBuilder.fDv1FallbackSynchronizer(fdv1Fallback);
584+
}
585+
566586
builder.dataSystem(dataSystemBuilder);
567587
}
568588

@@ -601,4 +621,59 @@ private DataSourceBuilder<Synchronizer> createSynchronizer(
601621
}
602622
return null;
603623
}
624+
625+
/**
626+
* Selects the best synchronizer configuration to use for FDv1 fallback.
627+
* Prefers polling synchronizers, falls back to primary synchronizer.
628+
*/
629+
private static SdkConfigSynchronizerParams selectFallbackSynchronizer(
630+
SdkConfigDataSystemParams dataSystemParams) {
631+
632+
// Prefer secondary polling synchronizer
633+
if (dataSystemParams.synchronizers != null &&
634+
dataSystemParams.synchronizers.secondary != null &&
635+
dataSystemParams.synchronizers.secondary.polling != null) {
636+
return dataSystemParams.synchronizers.secondary;
637+
}
638+
639+
// Fall back to primary polling synchronizer
640+
if (dataSystemParams.synchronizers != null &&
641+
dataSystemParams.synchronizers.primary != null &&
642+
dataSystemParams.synchronizers.primary.polling != null) {
643+
return dataSystemParams.synchronizers.primary;
644+
}
645+
646+
// Fall back to primary synchronizer (even if streaming)
647+
if (dataSystemParams.synchronizers != null &&
648+
dataSystemParams.synchronizers.primary != null) {
649+
return dataSystemParams.synchronizers.primary;
650+
}
651+
652+
return null;
653+
}
654+
655+
/**
656+
* Creates the FDv1 fallback synchronizer based on the selected synchronizer config.
657+
* FDv1 fallback is always polling-based and uses the global service endpoints configuration.
658+
*/
659+
private static ComponentConfigurer<DataSource> createFDv1FallbackSynchronizer(
660+
SdkConfigSynchronizerParams synchronizer) {
661+
662+
// FDv1 fallback is always polling-based
663+
PollingDataSourceBuilder fdv1Polling = Components.pollingDataSource();
664+
665+
// Configure polling interval if the synchronizer has polling configuration
666+
if (synchronizer.polling != null) {
667+
if (synchronizer.polling.pollIntervalMs != null) {
668+
fdv1Polling.pollInterval(Duration.ofMillis(synchronizer.polling.pollIntervalMs));
669+
}
670+
// Note: FDv1 polling doesn't support per-source service endpoints override,
671+
// so it will use the global service endpoints configuration (which is set
672+
// by the caller before this method is invoked)
673+
}
674+
// If streaming synchronizer, use default polling interval
675+
// (no additional configuration needed)
676+
677+
return fdv1Polling;
678+
}
604679
}
Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +0,0 @@
1-
streaming/validation/unrecognized data that can be safely ignored/unknown event name with JSON body
2-
streaming/validation/unrecognized data that can be safely ignored/unknown event name with non-JSON body
3-
streaming/validation/unrecognized data that can be safely ignored/patch event with unrecognized path kind
4-
streaming/fdv2/fallback to FDv1 handling
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
4+
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
5+
import com.launchdarkly.sdk.server.datasources.Synchronizer;
6+
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
7+
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
8+
import com.launchdarkly.sdk.server.subsystems.DataSource;
9+
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink;
10+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
11+
12+
import java.io.IOException;
13+
import java.time.Instant;
14+
import java.util.AbstractMap;
15+
import java.util.Collections;
16+
import java.util.Map;
17+
import java.util.concurrent.CancellationException;
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.ExecutionException;
20+
import java.util.concurrent.Future;
21+
22+
/**
23+
* Adapter that wraps a DataSource (FDv1 protocol) and exposes it as a Synchronizer (FDv2 protocol).
24+
* <p>
25+
* This adapter bridges the push-based DataSource interface with the pull-based Synchronizer interface
26+
* by listening to updates through a custom DataSourceUpdateSink and queueing them as FDv2SourceResult objects.
27+
* <p>
28+
* The adapter is constructed with a factory function that receives the listening update sink and
29+
* creates the DataSource. This ensures the DataSource uses the adapter's internal sink without exposing it.
30+
*/
31+
class DataSourceSynchronizerAdapter implements Synchronizer {
32+
private final DataSource dataSource;
33+
private final IterableAsyncQueue<FDv2SourceResult> resultQueue = new IterableAsyncQueue<>();
34+
private final CompletableFuture<FDv2SourceResult> shutdownFuture = new CompletableFuture<>();
35+
private final Object startLock = new Object();
36+
private boolean started = false;
37+
private boolean closed = false;
38+
private Future<Void> startFuture;
39+
40+
/**
41+
* Functional interface for creating a DataSource with a given update sink.
42+
*/
43+
@FunctionalInterface
44+
public interface DataSourceFactory {
45+
DataSource create(DataSourceUpdateSink updateSink);
46+
}
47+
48+
/**
49+
* Creates a new adapter that wraps a DataSource.
50+
*
51+
* @param dataSourceFactory factory that creates the DataSource with the provided update sink
52+
*/
53+
public DataSourceSynchronizerAdapter(DataSourceFactory dataSourceFactory) {
54+
ConvertingUpdateSink convertingSink = new ConvertingUpdateSink(resultQueue);
55+
this.dataSource = dataSourceFactory.create(convertingSink);
56+
}
57+
58+
@Override
59+
public CompletableFuture<FDv2SourceResult> next() {
60+
synchronized (startLock) {
61+
if (!started && !closed) {
62+
started = true;
63+
startFuture = dataSource.start();
64+
65+
// Monitor the start future for errors
66+
// The data source will emit updates through the listening sink
67+
Thread monitorThread = new Thread(() -> {
68+
try {
69+
startFuture.get();
70+
} catch (ExecutionException e) {
71+
// Initialization failed - emit an interrupted status
72+
DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo(
73+
DataSourceStatusProvider.ErrorKind.UNKNOWN,
74+
0,
75+
e.getCause() != null ? e.getCause().toString() : e.toString(),
76+
Instant.now()
77+
);
78+
resultQueue.put(FDv2SourceResult.interrupted(errorInfo, false));
79+
} catch (CancellationException e) {
80+
// Start future was canceled (during close) - exit cleanly
81+
} catch (InterruptedException e) {
82+
Thread.currentThread().interrupt();
83+
}
84+
});
85+
monitorThread.setName("LaunchDarkly-SDK-Server-DataSourceAdapter-Monitor");
86+
monitorThread.setDaemon(true);
87+
monitorThread.start();
88+
}
89+
}
90+
91+
return CompletableFuture.anyOf(shutdownFuture, resultQueue.take())
92+
.thenApply(result -> (FDv2SourceResult) result);
93+
}
94+
95+
@Override
96+
public void close() {
97+
synchronized (startLock) {
98+
if (closed) {
99+
return;
100+
}
101+
closed = true;
102+
}
103+
104+
try {
105+
dataSource.close();
106+
} catch (IOException e) {
107+
// Ignore as we are shutting down.
108+
}
109+
shutdownFuture.complete(FDv2SourceResult.shutdown());
110+
if(startFuture != null) {
111+
// If the start future is done, this has no effect.
112+
// If it is not, then this will unblock the code waiting on start.
113+
startFuture.cancel(true);
114+
}
115+
}
116+
117+
/**
118+
* A DataSourceUpdateSink that converts DataSource updates into FDv2SourceResult objects.
119+
* This sink doesn't delegate to any other sink - it exists solely to convert FDv1 updates to FDv2 results.
120+
*/
121+
private static class ConvertingUpdateSink implements DataSourceUpdateSink {
122+
private final IterableAsyncQueue<FDv2SourceResult> resultQueue;
123+
124+
public ConvertingUpdateSink(IterableAsyncQueue<FDv2SourceResult> resultQueue) {
125+
this.resultQueue = resultQueue;
126+
}
127+
128+
@Override
129+
public boolean init(DataStoreTypes.FullDataSet<DataStoreTypes.ItemDescriptor> allData) {
130+
// Convert the full data set into a ChangeSet and emit it
131+
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet =
132+
new DataStoreTypes.ChangeSet<>(
133+
DataStoreTypes.ChangeSetType.Full,
134+
Selector.EMPTY,
135+
allData.getData(),
136+
null,
137+
allData.shouldPersist()
138+
);
139+
resultQueue.put(FDv2SourceResult.changeSet(changeSet, false));
140+
return true;
141+
}
142+
143+
@Override
144+
public boolean upsert(DataStoreTypes.DataKind kind, String key, DataStoreTypes.ItemDescriptor item) {
145+
// Convert the upsert into a ChangeSet with a single item and emit it
146+
DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor> items =
147+
new DataStoreTypes.KeyedItems<>(Collections.<Map.Entry<String, DataStoreTypes.ItemDescriptor>>singletonList(
148+
new AbstractMap.SimpleEntry<>(key, item)));
149+
Iterable<Map.Entry<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>> data =
150+
Collections.<Map.Entry<DataStoreTypes.DataKind, DataStoreTypes.KeyedItems<DataStoreTypes.ItemDescriptor>>>singletonList(
151+
new AbstractMap.SimpleEntry<>(kind, items));
152+
153+
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet =
154+
new DataStoreTypes.ChangeSet<>(
155+
DataStoreTypes.ChangeSetType.Partial,
156+
Selector.EMPTY,
157+
data,
158+
null,
159+
true // default to true as this adapter is used for adapting FDv1 data sources which are always persistent
160+
);
161+
resultQueue.put(FDv2SourceResult.changeSet(changeSet, false));
162+
return true;
163+
}
164+
165+
@Override
166+
public DataStoreStatusProvider getDataStoreStatusProvider() {
167+
// This adapter doesn't use a data store
168+
return null;
169+
}
170+
171+
@Override
172+
public void updateStatus(DataSourceStatusProvider.State newState, DataSourceStatusProvider.ErrorInfo newError) {
173+
// Convert state changes to FDv2SourceResult status events
174+
switch (newState) {
175+
case INTERRUPTED:
176+
resultQueue.put(FDv2SourceResult.interrupted(newError, false));
177+
break;
178+
case OFF:
179+
if (newError != null) {
180+
resultQueue.put(FDv2SourceResult.terminalError(newError, false));
181+
}
182+
break;
183+
case VALID:
184+
case INITIALIZING:
185+
// These states don't map to FDv2SourceResult status events
186+
break;
187+
}
188+
}
189+
}
190+
}

0 commit comments

Comments
 (0)