99import com .launchdarkly .sdk .server .subsystems .DataSource ;
1010import com .launchdarkly .sdk .server .subsystems .DataSourceUpdateSinkV2 ;
1111
12- import java .io .IOException ;
1312import java .util .ArrayList ;
1413import java .util .Collections ;
1514import java .util .List ;
@@ -154,17 +153,18 @@ private void runInitializers() {
154153 /**
155154 * Determine conditions for the current synchronizer. Synchronizers require different conditions depending on if
156155 * they are the 'prime' synchronizer or if there are other available synchronizers to use.
156+ *
157157 * @return a list of conditions to apply to the synchronizer
158158 */
159159 private List <Condition > getConditions () {
160160 int availableSynchronizers = synchronizerStateManager .getAvailableSynchronizerCount ();
161161 boolean isPrimeSynchronizer = synchronizerStateManager .isPrimeSynchronizer ();
162162
163- if (availableSynchronizers == 1 ) {
163+ if (availableSynchronizers == 1 ) {
164164 // If there is only 1 synchronizer, then we cannot fall back or recover, so we don't need any conditions.
165165 return Collections .emptyList ();
166166 }
167- if (isPrimeSynchronizer ) {
167+ if (isPrimeSynchronizer ) {
168168 // If there isn't a synchronizer to recover to, then don't add and recovery conditions.
169169 return conditionFactories .stream ()
170170 .filter ((ConditionFactory factory ) -> factory .getType () != Condition .ConditionType .RECOVERY )
@@ -175,111 +175,96 @@ private List<Condition> getConditions() {
175175 }
176176
177177 private boolean runSynchronizers () {
178- SynchronizerFactoryWithState availableSynchronizer = synchronizerStateManager . getNextAvailableSynchronizer ();
179- while ( availableSynchronizer != null ) {
180- Synchronizer synchronizer = availableSynchronizer . build ();
181-
182- // Returns true if shutdown .
183- if ( synchronizerStateManager . setActiveSource ( synchronizer )) return false ;
184-
185- try {
186- boolean running = true ;
187- boolean fdv1Fallback = false ;
188-
189- // Conditions run once for the life of the synchronizer.
190- List < Condition > conditions = getConditions () ;
191-
192- // The conditionsFuture will complete if any condition is met. Meeting any condition means we will
193- // switch to a different synchronizer.
194- CompletableFuture <Object > conditionsFuture = CompletableFuture . anyOf (
195- conditions . stream (). map ( Condition :: execute ). toArray ( CompletableFuture []:: new ));
196-
197- while ( running ) {
198- // If the loop needs to be exited, then running should be set to false, and the loop broken.
199- // We don't want to return within the loop because that would bypass cleanup.
200-
201- CompletableFuture < FDv2SourceResult > nextResultFuture = synchronizer . next () ;
202-
203- Object res = CompletableFuture . anyOf ( conditionsFuture , nextResultFuture ). get ();
204-
205- if ( res instanceof Condition ) {
206- Condition c = ( Condition ) res ;
207- switch ( c . getType ()) {
208- case FALLBACK :
209- // For fallback , we will move to the next available synchronizer, which may loop .
210- // This is the default behavior of exiting the run loop, so we don't need to take
211- // any action.
212- break ;
213- case RECOVERY :
214- // For recovery, we will start at the first available synchronizer .
215- // So we reset the source index, and finding the source will start at the beginning.
216- synchronizerStateManager . resetSourceIndex ();
178+ // When runSynchronizers exists, no matter how it exits, the synchronizerStateManager will be closed.
179+ try {
180+ SynchronizerFactoryWithState availableSynchronizer = synchronizerStateManager . getNextAvailableSynchronizer ();
181+
182+ // We want to continue running synchronizers for as long as any are available .
183+ while ( availableSynchronizer != null ) {
184+ Synchronizer synchronizer = availableSynchronizer . build ();
185+
186+ // Returns true if shutdown.
187+ if ( synchronizerStateManager . setActiveSource ( synchronizer )) return false ;
188+
189+ try {
190+ boolean running = true ;
191+
192+ try ( Conditions conditions = new Conditions ( getConditions ())) {
193+ while ( running ) {
194+ CompletableFuture <FDv2SourceResult > nextResultFuture = synchronizer . next ();
195+
196+ // The conditionsFuture will complete if any condition is met. Meeting any condition means we will
197+ // switch to a different synchronizer.
198+ Object res = CompletableFuture . anyOf ( conditions . getFuture (), nextResultFuture ). get ();
199+
200+ if ( res instanceof Condition ) {
201+ Condition c = ( Condition ) res ;
202+ switch ( c . getType ()) {
203+ case FALLBACK :
204+ // For fallback, we will move to the next available synchronizer, which may loop.
205+ // This is the default behavior of exiting the run loop, so we don't need to take
206+ // any action.
207+ break ;
208+ case RECOVERY :
209+ // For recovery , we will start at the first available synchronizer.
210+ // So we reset the source index, and finding the source will start at the beginning.
211+ synchronizerStateManager . resetSourceIndex ();
212+ break ;
213+ }
214+ // A running synchronizer will only have fallback and recovery conditions that it can act on .
215+ // So, if there are no synchronizers to recover to or fallback to, then we will not have
216+ // those conditions.
217217 break ;
218- }
219- // A running synchronizer will only have fallback and recovery conditions that it can act on.
220- // So, if there are no synchronizers to recover to or fallback to, then we will not have
221- // those conditions.
222- break ;
223- }
218+ }
224219
225220
226- FDv2SourceResult result = (FDv2SourceResult ) res ;
227- conditions . forEach ( c -> c .inform (result ) );
221+ FDv2SourceResult result = (FDv2SourceResult ) res ;
222+ conditions .inform (result );
228223
229- switch (result .getResultType ()) {
230- case CHANGE_SET :
231- dataSourceUpdates .apply (result .getChangeSet ());
232- // This could have been completed by any data source. But if it has not been completed before
233- // now, then we complete it.
234- startFuture .complete (true );
235- break ;
236- case STATUS :
237- FDv2SourceResult .Status status = result .getStatus ();
238- switch (status .getState ()) {
239- case INTERRUPTED :
240- // TODO: Track how long we are interrupted.
241- break ;
242- case SHUTDOWN :
243- // We should be overall shutting down.
244- // TODO: We may need logging or to do a little more.
245- running = false ;
224+ switch (result .getResultType ()) {
225+ case CHANGE_SET :
226+ dataSourceUpdates .apply (result .getChangeSet ());
227+ // This could have been completed by any data source. But if it has not been completed before
228+ // now, then we complete it.
229+ startFuture .complete (true );
246230 break ;
247- case TERMINAL_ERROR :
248- availableSynchronizer .block ();
249- running = false ;
250- break ;
251- case GOODBYE :
252- // We let the synchronizer handle this internally.
231+ case STATUS :
232+ FDv2SourceResult .Status status = result .getStatus ();
233+ switch (status .getState ()) {
234+ case INTERRUPTED :
235+ // TODO: Track how long we are interrupted.
236+ break ;
237+ case SHUTDOWN :
238+ // We should be overall shutting down.
239+ // TODO: We may need logging or to do a little more.
240+ return false ;
241+ case TERMINAL_ERROR :
242+ availableSynchronizer .block ();
243+ running = false ;
244+ break ;
245+ case GOODBYE :
246+ // We let the synchronizer handle this internally.
247+ break ;
248+ }
253249 break ;
254250 }
255- break ;
256- }
257- // We have been requested to fall back to FDv1. We handle whatever message was associated,
258- // close the synchronizer, and then fallback.
259- if (result .isFdv1Fallback ()) {
260- fdv1Fallback = true ;
261- running = false ;
251+ // We have been requested to fall back to FDv1. We handle whatever message was associated,
252+ // close the synchronizer, and then fallback.
253+ if (result .isFdv1Fallback ()) {
254+ return true ;
255+ }
256+ }
262257 }
258+ } catch (ExecutionException | InterruptedException | CancellationException e ) {
259+ // TODO: Log.
260+ // Move to next synchronizer.
263261 }
264- // We are going to move to the next synchronizer or exit the synchronization loop, so we can close any
265- // conditions for this synchronizer.
266- conditions .forEach (Condition ::close );
267- // If we are falling back, then we exit the synchronization process.
268- if (fdv1Fallback ) {
269- // When falling back to FDv1, we are done with any FDv2 synchronizers.
270- synchronizerStateManager .close ();
271- return true ;
272- }
273-
274- } catch (ExecutionException | InterruptedException | CancellationException e ) {
275- // TODO: Log.
276- // Move to next synchronizer.
262+ availableSynchronizer = synchronizerStateManager .getNextAvailableSynchronizer ();
277263 }
278- availableSynchronizer = synchronizerStateManager .getNextAvailableSynchronizer ();
264+ return false ;
265+ } finally {
266+ synchronizerStateManager .close ();
279267 }
280-
281- synchronizerStateManager .close ();
282- return false ;
283268 }
284269
285270 @ Override
@@ -310,4 +295,33 @@ public void close() {
310295 // If this is already set, then this has no impact.
311296 startFuture .complete (false );
312297 }
298+
299+ /**
300+ * Helper class to manage the lifecycle of conditions with automatic cleanup.
301+ */
302+ private static class Conditions implements AutoCloseable {
303+ private final List <Condition > conditions ;
304+ private final CompletableFuture <Object > conditionsFuture ;
305+
306+ public Conditions (List <Condition > conditions ) {
307+ this .conditions = conditions ;
308+ this .conditionsFuture = conditions .isEmpty ()
309+ ? new CompletableFuture <>() // Never completes if no conditions
310+ : CompletableFuture .anyOf (
311+ conditions .stream ().map (Condition ::execute ).toArray (CompletableFuture []::new ));
312+ }
313+
314+ public CompletableFuture <Object > getFuture () {
315+ return conditionsFuture ;
316+ }
317+
318+ public void inform (FDv2SourceResult result ) {
319+ conditions .forEach (c -> c .inform (result ));
320+ }
321+
322+ @ Override
323+ public void close () {
324+ conditions .forEach (Condition ::close );
325+ }
326+ }
313327}
0 commit comments