Skip to content

Commit 64b98e3

Browse files
authored
refactor(fdv2): add server FDv2 heuristic fallback and recovery (#531)
## Summary Heuristic FDv2 fallback (interrupted ≥ 2 min) and recovery (valid ≥ 5 min) for the server SDK, modeled on Java's event-driven `Condition` design. The orchestrator races a condition future against `synchronizer.Next()`; whichever resolves first wins. Synchronizer rotation is cyclic with `Available`/`Blocked` state, mirroring Java's `SourceManager`. ## What's in - `IFDv2Condition` + concrete `FallbackCondition` / `RecoveryCondition` with internal timers and an `Inform` / `Execute` interface. - `Conditions` aggregator over multiple conditions, built on a new `async::WhenAny` vector overload. - `SourceManager` owns the synchronizer factory list with per-factory `Available`/`Blocked` state. Iteration is cyclic (wraps + skips blocked). `BlockCurrentSynchronizer` on `TerminalError`; `ResetSourceIndex` on recovery. - `FDv2DataSystem` builds conditions keyed off `AvailableSynchronizerCount`/`IsPrimeSynchronizer` (no conditions if only one available; prime gets fallback only; others get fallback + recovery). - Drops the `timeout` argument from `IFDv2Synchronizer::Next()` and removes the unreachable `FDv2SourceResult::Timeout` variant. ## Deferred to future PRs - FDv1 fallback directive (`X-LD-FD-Fallback` response header) — server-driven, one-way switch to an FDv1 synchronizer. - FDv1 synchronizer adapter wrapping the existing FDv1 sources as `IFDv2Synchronizer`. - Most of the spec's orchestration-logging requirements. ## Test plan Unit tests for each new component and integration tests for fallback advance, wrap-around on last synchronizer, recovery reset, and exhaustion via block. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Changes FDv2 orchestration and synchronizer interfaces (removing `Next()` timeouts/`Timeout` results) while introducing timer-driven fallback/recovery and cyclic source rotation, which could affect update reliability and shutdown behavior. > > **Overview** > Adds a new FDv2 *condition* abstraction (`IFDv2Condition`) with timed `FallbackCondition`/`RecoveryCondition` implementations plus a `Conditions` aggregator that races condition futures against `synchronizer.Next()` to trigger source transitions. > > Refactors FDv2 synchronizer rotation into a new `SourceManager` that cycles through synchronizer factories, skips blocked sources, blocks on `TerminalError`, and resets to the most-preferred source on recovery. > > Removes FDv2 per-`Next()` timeouts by dropping the timeout parameter from `IFDv2Synchronizer::Next()` and deleting the `FDv2SourceResult::Timeout` variant; updates polling/streaming synchronizers and tests accordingly. Also adds a vector overload of `async::WhenAny` and comprehensive unit/integration tests for the new rotation and condition behavior. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 1f7f26e. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 261ba85 commit 64b98e3

20 files changed

Lines changed: 1669 additions & 287 deletions

libs/internal/include/launchdarkly/async/promise.hpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,30 @@ Future<std::size_t> WhenAny(Future<Ts>... futures) {
534534
return result;
535535
}
536536

537+
// Vector overload of WhenAny. Returns a Future<std::size_t> that resolves with
538+
// the 0-based index of whichever input future resolves first. If the vector is
539+
// empty, the returned future never resolves.
540+
template <typename T>
541+
Future<std::size_t> WhenAny(std::vector<Future<T>> const& futures) {
542+
Promise<std::size_t> promise;
543+
Future<std::size_t> result = promise.GetFuture();
544+
545+
auto shared_promise =
546+
std::make_shared<Promise<std::size_t>>(std::move(promise));
547+
548+
for (std::size_t i = 0; i < futures.size(); ++i) {
549+
Future<T> future = futures[i];
550+
future.Then(
551+
[shared_promise, i](T const&) -> std::monostate {
552+
shared_promise->Resolve(i);
553+
return std::monostate{};
554+
},
555+
[](Continuation<void()> f) { f(); });
556+
}
557+
558+
return result;
559+
}
560+
537561
// MakeFuture returns an already-resolved Future<T>. Useful in flattening Then
538562
// continuations where some branches produce a result immediately and others
539563
// return a Future, requiring a uniform Future<T> return type across all

libs/internal/tests/promise_test.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,34 @@ TEST(WhenAny, MixedTypesFirstWins) {
416416
EXPECT_EQ(std::get<std::string>(*result.GetResult()), "hello");
417417
}
418418

419+
// Verifies that the vector overload of WhenAny resolves with the index of
420+
// the first future to resolve.
421+
TEST(WhenAny, VectorFirstResolved) {
422+
Promise<int> p0;
423+
Promise<int> p1;
424+
Promise<int> p2;
425+
426+
std::vector<Future<int>> futures{p0.GetFuture(), p1.GetFuture(),
427+
p2.GetFuture()};
428+
Future<std::size_t> result = WhenAny(futures);
429+
430+
EXPECT_FALSE(result.IsFinished());
431+
p2.Resolve(99);
432+
433+
auto r = result.WaitForResult(std::chrono::seconds(5));
434+
ASSERT_TRUE(r.has_value());
435+
EXPECT_EQ(*r, 2u);
436+
}
437+
438+
// Verifies that an empty vector produces a future that never resolves.
439+
TEST(WhenAny, VectorEmptyNeverResolves) {
440+
std::vector<Future<int>> futures;
441+
Future<std::size_t> result = WhenAny(futures);
442+
443+
auto r = result.WaitForResult(std::chrono::milliseconds(50));
444+
EXPECT_FALSE(r.has_value());
445+
}
446+
419447
// Verifies that WhenAny resolves immediately if a future is already resolved.
420448
TEST(WhenAny, AlreadyResolved) {
421449
Promise<int> p0;

libs/server-sdk/src/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ target_sources(${LIBNAME}
6060
data_systems/fdv2/polling_synchronizer.cpp
6161
data_systems/fdv2/streaming_synchronizer.hpp
6262
data_systems/fdv2/streaming_synchronizer.cpp
63+
data_systems/fdv2/conditions.hpp
64+
data_systems/fdv2/conditions.cpp
65+
data_systems/fdv2/source_manager.hpp
66+
data_systems/fdv2/source_manager.cpp
6367
data_systems/fdv2/fdv2_data_system.hpp
6468
data_systems/fdv2/fdv2_data_system.cpp
6569
data_systems/background_sync/sources/streaming/streaming_data_source.hpp

libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,8 @@ struct FDv2SourceResult {
5656
bool fdv1_fallback;
5757
};
5858

59-
/**
60-
* Next() returned because the timeout expired before a result arrived.
61-
*/
62-
struct Timeout {};
63-
64-
using Value = std::variant<ChangeSet,
65-
Interrupted,
66-
TerminalError,
67-
Shutdown,
68-
Goodbye,
69-
Timeout>;
59+
using Value =
60+
std::variant<ChangeSet, Interrupted, TerminalError, Shutdown, Goodbye>;
7061

7162
Value value;
7263
};
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#pragma once
2+
3+
#include "fdv2_source_result.hpp"
4+
5+
#include <launchdarkly/async/promise.hpp>
6+
7+
#include <memory>
8+
9+
namespace launchdarkly::server_side::data_interfaces {
10+
11+
/**
12+
* A condition observes the orchestrator's stream of synchronizer results and
13+
* fires when criteria for a synchronizer transition are met.
14+
*
15+
* Each condition plays one of two roles, identified by Type():
16+
* - kFallback: when fired, the orchestrator stops the active synchronizer
17+
* and starts the next-preferred one.
18+
* - kRecovery: when fired, the orchestrator stops the active fallback
19+
* synchronizer and returns to the most-preferred synchronizer.
20+
*
21+
* Conditions are stateful: the orchestrator pushes results into a condition
22+
* via Inform() so the condition can update its internal state (typically a
23+
* timer). When the condition's criteria are satisfied, the future returned
24+
* by Execute() resolves with the condition's Type.
25+
*
26+
* Conditions are single-use: once fired, they are not re-armed. The
27+
* orchestrator builds a fresh condition for each synchronizer activation via
28+
* an IFDv2ConditionFactory.
29+
*
30+
* Close() cancels any pending internal work (e.g., a timer) and resolves the
31+
* future with kCancelled.
32+
*
33+
* Implementations must be thread-safe: Execute, Inform, Close, and GetType
34+
* may be called from any thread.
35+
*/
36+
class IFDv2Condition {
37+
public:
38+
enum class Type {
39+
/** Stop the active synchronizer and start the next-preferred one. */
40+
kFallback,
41+
/** Return to the most-preferred synchronizer. */
42+
kRecovery,
43+
/** The condition was Close()d before firing; orchestrator ignores. */
44+
kCancelled,
45+
};
46+
47+
/**
48+
* Returns a Future that resolves with the condition's Type once the
49+
* condition's criteria are satisfied. May be called multiple times; each
50+
* call returns a Future referring to the same underlying state.
51+
*/
52+
[[nodiscard]] virtual async::Future<Type> Execute() = 0;
53+
54+
/**
55+
* Pushes a synchronizer result into the condition so it can update any
56+
* internal state (e.g., arm or cancel a timer).
57+
*/
58+
virtual void Inform(FDv2SourceResult const& result) = 0;
59+
60+
/**
61+
* Cancels any pending internal work and resolves the future returned by
62+
* Execute() with kCancelled if it has not already resolved. Idempotent.
63+
*/
64+
virtual void Close() = 0;
65+
66+
/**
67+
* Returns the condition's role in the orchestrator.
68+
*/
69+
[[nodiscard]] virtual Type GetType() const = 0;
70+
71+
virtual ~IFDv2Condition() = default;
72+
IFDv2Condition(IFDv2Condition const&) = delete;
73+
IFDv2Condition(IFDv2Condition&&) = delete;
74+
IFDv2Condition& operator=(IFDv2Condition const&) = delete;
75+
IFDv2Condition& operator=(IFDv2Condition&&) = delete;
76+
77+
protected:
78+
IFDv2Condition() = default;
79+
};
80+
81+
/**
82+
* Builds new IFDv2Condition instances on demand. Each call to Build() produces
83+
* a fresh condition with no prior state.
84+
*
85+
* Implementations must be thread-safe: Build and GetType may be called from
86+
* any thread.
87+
*/
88+
class IFDv2ConditionFactory {
89+
public:
90+
[[nodiscard]] virtual std::unique_ptr<IFDv2Condition> Build() = 0;
91+
92+
/**
93+
* Returns the type of conditions this factory builds.
94+
*/
95+
[[nodiscard]] virtual IFDv2Condition::Type GetType() const = 0;
96+
97+
virtual ~IFDv2ConditionFactory() = default;
98+
IFDv2ConditionFactory(IFDv2ConditionFactory const&) = delete;
99+
IFDv2ConditionFactory(IFDv2ConditionFactory&&) = delete;
100+
IFDv2ConditionFactory& operator=(IFDv2ConditionFactory const&) = delete;
101+
IFDv2ConditionFactory& operator=(IFDv2ConditionFactory&&) = delete;
102+
103+
protected:
104+
IFDv2ConditionFactory() = default;
105+
};
106+
107+
} // namespace launchdarkly::server_side::data_interfaces

libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
#include <launchdarkly/async/promise.hpp>
66
#include <launchdarkly/data_model/selector.hpp>
77

8-
#include <chrono>
98
#include <string>
109

1110
namespace launchdarkly::server_side::data_interfaces {
@@ -20,25 +19,19 @@ namespace launchdarkly::server_side::data_interfaces {
2019
class IFDv2Synchronizer {
2120
public:
2221
/**
23-
* Returns a Future that resolves with the next result once it is available
24-
* or the timeout expires.
22+
* Returns a Future that resolves with the next result once it is
23+
* available.
2524
*
2625
* On the first call, the synchronizer starts its underlying connection.
2726
* Subsequent calls continue reading from the same connection.
2827
*
29-
* If the timeout expires before a result arrives, the future resolves with
30-
* FDv2SourceResult::Timeout. The orchestrator uses this to evaluate
31-
* fallback conditions.
32-
*
3328
* Close() may be called from another thread to unblock Next(), in which
3429
* case the future resolves with FDv2SourceResult::Shutdown.
3530
*
36-
* @param timeout Maximum time to wait for the next result.
3731
* @param selector The selector to send with the request, reflecting any
3832
* changesets applied since the previous call.
3933
*/
4034
virtual async::Future<FDv2SourceResult> Next(
41-
std::chrono::milliseconds timeout,
4235
data_model::Selector selector) = 0;
4336

4437
/**

0 commit comments

Comments
 (0)