Skip to content

Commit eb28df0

Browse files
committed
feat: orchestrator switches to FDv1 fallback on directive
1 parent c95b74a commit eb28df0

6 files changed

Lines changed: 243 additions & 10 deletions

File tree

libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ class IFDv2SynchronizerFactory {
1414
public:
1515
virtual std::unique_ptr<IFDv2Synchronizer> Build() = 0;
1616

17+
[[nodiscard]] virtual bool IsFDv1Fallback() const { return false; }
18+
1719
virtual ~IFDv2SynchronizerFactory() = default;
1820
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory const&) = delete;
1921
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory&&) = delete;

libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,12 @@ void FDv2DataSystem::OnInitializerResult(
185185
if (closed_ || got_shutdown) {
186186
return;
187187
}
188+
if (result.fdv1_fallback) {
189+
LD_LOG(logger_, LogLevel::kInfo)
190+
<< Identity() << ": FDv1 fallback engaged";
191+
source_manager_.SwitchToFDv1Fallback();
192+
got_basis = true;
193+
}
188194
}
189195

190196
if (got_basis) {
@@ -349,7 +355,14 @@ void FDv2DataSystem::OnSynchronizerResult(
349355
active_conditions_.reset();
350356
return;
351357
}
352-
if (advance) {
358+
if (result.fdv1_fallback) {
359+
LD_LOG(logger_, LogLevel::kInfo)
360+
<< Identity() << ": FDv1 fallback engaged";
361+
source_manager_.SwitchToFDv1Fallback();
362+
active_synchronizer_.reset();
363+
active_conditions_.reset();
364+
advance = true;
365+
} else if (advance) {
353366
source_manager_.BlockCurrentSynchronizer();
354367
active_synchronizer_.reset();
355368
active_conditions_.reset();

libs/server-sdk/src/data_systems/fdv2/source_manager.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ SourceManager::SourceManager(
1111
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories) {
1212
synchronizers_.reserve(factories.size());
1313
for (auto& factory : factories) {
14-
synchronizers_.push_back(
15-
SynchronizerFactoryWithState{std::move(factory), State::kAvailable,
16-
/*is_fdv1_fallback=*/false});
14+
bool const is_fdv1_fallback = factory->IsFDv1Fallback();
15+
synchronizers_.push_back(SynchronizerFactoryWithState{
16+
std::move(factory),
17+
is_fdv1_fallback ? State::kBlocked : State::kAvailable,
18+
is_fdv1_fallback});
1719
}
1820
}
1921

@@ -44,6 +46,14 @@ void SourceManager::ResetSourceIndex() {
4446
synchronizer_index_ = -1;
4547
}
4648

49+
void SourceManager::SwitchToFDv1Fallback() {
50+
for (auto& entry : synchronizers_) {
51+
entry.state =
52+
entry.is_fdv1_fallback ? State::kAvailable : State::kBlocked;
53+
}
54+
synchronizer_index_ = -1;
55+
}
56+
4757
bool SourceManager::IsPrimeSynchronizer() const {
4858
for (std::size_t i = 0; i < synchronizers_.size(); ++i) {
4959
if (synchronizers_[i].state == State::kAvailable) {

libs/server-sdk/src/data_systems/fdv2/source_manager.hpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ namespace launchdarkly::server_side::data_systems {
2323
* by recovery, which wants to fall back to the most-preferred Available
2424
* synchronizer.
2525
*
26-
* Each factory also carries an is_fdv1_fallback flag, currently always
27-
* false. TODO: populate when the FDv1 fallback directive is implemented.
26+
* Factories whose IsFDv1Fallback() returns true start in the Blocked state.
2827
*
2928
* Not thread-safe. The caller is responsible for serializing all calls.
3029
*/
@@ -54,6 +53,14 @@ class SourceManager {
5453
*/
5554
void ResetSourceIndex();
5655

56+
/**
57+
* Blocks every non-FDv1 factory and unblocks the FDv1 fallback factory,
58+
* if one was configured. Resets the iteration cursor so the next call to
59+
* NextSynchronizer returns the FDv1 fallback. If no FDv1 fallback factory
60+
* was configured, every factory is left blocked.
61+
*/
62+
void SwitchToFDv1Fallback();
63+
5764
/**
5865
* Returns true if the currently tracked factory is the first Available
5966
* factory in the list. Returns false if no factory is currently tracked.
@@ -73,9 +80,8 @@ class SourceManager {
7380
[[nodiscard]] std::size_t SynchronizerCount() const;
7481

7582
/**
76-
* Returns true if the currently tracked factory was configured as the
77-
* FDv1 fallback synchronizer. Always false until the FDv1 fallback
78-
* directive is implemented.
83+
* Returns true if the currently tracked factory is the FDv1 fallback
84+
* synchronizer.
7985
*/
8086
[[nodiscard]] bool IsCurrentSynchronizerFDv1Fallback() const;
8187

libs/server-sdk/tests/fdv2_data_system_test.cpp

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,15 @@ class OneShotSynchronizerFactory : public IFDv2SynchronizerFactory {
145145
std::unique_ptr<IFDv2Synchronizer> source_;
146146
};
147147

148+
class FDv1FallbackOneShotFactory : public OneShotSynchronizerFactory {
149+
public:
150+
explicit FDv1FallbackOneShotFactory(
151+
std::unique_ptr<IFDv2Synchronizer> source)
152+
: OneShotSynchronizerFactory(std::move(source)) {}
153+
154+
bool IsFDv1Fallback() const override { return true; }
155+
};
156+
148157
// Returns each pre-supplied source in order on successive Build() calls.
149158
// Returns nullptr once the supply is exhausted. Used in tests that exercise
150159
// wrap-around or recovery, where the same factory is built more than once.
@@ -1090,6 +1099,129 @@ TEST(FDv2DataSystemTest, SingleSynchronizerHasNoFallbackArmed) {
10901099
status_manager.Status().State());
10911100
}
10921101

1102+
// ============================================================================
1103+
// FDv1 fallback directive
1104+
// ============================================================================
1105+
1106+
TEST(FDv2DataSystemTest, SynchronizerFdv1FlagSwitchesToFdv1Adapter) {
1107+
auto logger = MakeNullLogger();
1108+
boost::asio::io_context ioc;
1109+
data_components::DataSourceStatusManager status_manager;
1110+
1111+
// FDv2 synchronizer emits a ChangeSet with the directive, then closes.
1112+
auto fdv2_sync =
1113+
std::make_unique<MockSynchronizer>(std::vector<FDv2SourceResult>{[]() {
1114+
FDv2SourceResult r{FDv2SourceResult::ChangeSet{
1115+
data_model::ChangeSet<ChangeSetData>{
1116+
data_model::ChangeSetType::kNone,
1117+
{},
1118+
data_model::Selector{}}}};
1119+
r.fdv1_fallback = true;
1120+
return r;
1121+
}()});
1122+
auto fdv2_factory =
1123+
std::make_unique<OneShotSynchronizerFactory>(std::move(fdv2_sync));
1124+
1125+
// FDv1 adapter returns Shutdown when reached, ending orchestration.
1126+
auto fdv1_sync =
1127+
std::make_unique<MockSynchronizer>(std::vector<FDv2SourceResult>{});
1128+
auto fdv1_factory =
1129+
std::make_unique<FDv1FallbackOneShotFactory>(std::move(fdv1_sync));
1130+
auto* fdv1_factory_ptr = fdv1_factory.get();
1131+
1132+
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> synchronizers;
1133+
synchronizers.push_back(std::move(fdv2_factory));
1134+
synchronizers.push_back(std::move(fdv1_factory));
1135+
1136+
FDv2DataSystem ds({}, std::move(synchronizers),
1137+
/*fallback_condition_factory=*/nullptr,
1138+
/*recovery_condition_factory=*/nullptr,
1139+
ioc.get_executor(), &status_manager, logger);
1140+
ds.Initialize();
1141+
ioc.run();
1142+
1143+
EXPECT_EQ(1, fdv1_factory_ptr->build_count_);
1144+
}
1145+
1146+
TEST(FDv2DataSystemTest, SynchronizerFdv1FlagWithoutAdapterTransitionsOff) {
1147+
auto logger = MakeNullLogger();
1148+
boost::asio::io_context ioc;
1149+
data_components::DataSourceStatusManager status_manager;
1150+
1151+
auto fdv2_sync =
1152+
std::make_unique<MockSynchronizer>(std::vector<FDv2SourceResult>{[]() {
1153+
FDv2SourceResult r{
1154+
FDv2SourceResult::Interrupted{FDv2SourceResult::ErrorInfo{
1155+
FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse,
1156+
/*status_code=*/418, "directive",
1157+
std::chrono::system_clock::now()}}};
1158+
r.fdv1_fallback = true;
1159+
return r;
1160+
}()});
1161+
auto fdv2_factory =
1162+
std::make_unique<OneShotSynchronizerFactory>(std::move(fdv2_sync));
1163+
1164+
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> synchronizers;
1165+
synchronizers.push_back(std::move(fdv2_factory));
1166+
1167+
FDv2DataSystem ds({}, std::move(synchronizers),
1168+
/*fallback_condition_factory=*/nullptr,
1169+
/*recovery_condition_factory=*/nullptr,
1170+
ioc.get_executor(), &status_manager, logger);
1171+
ds.Initialize();
1172+
ioc.run();
1173+
1174+
EXPECT_EQ(DataSourceStatus::DataSourceState::kOff,
1175+
status_manager.Status().State());
1176+
}
1177+
1178+
TEST(FDv2DataSystemTest, InitializerFdv1FlagSwitchesToFdv1Adapter) {
1179+
auto logger = MakeNullLogger();
1180+
boost::asio::io_context ioc;
1181+
data_components::DataSourceStatusManager status_manager;
1182+
1183+
// Initializer returns Interrupted with the directive set.
1184+
FDv2SourceResult init_result{
1185+
FDv2SourceResult::Interrupted{FDv2SourceResult::ErrorInfo{
1186+
FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse,
1187+
/*status_code=*/418, "directive",
1188+
std::chrono::system_clock::now()}}};
1189+
init_result.fdv1_fallback = true;
1190+
auto initializer =
1191+
std::make_unique<MockInitializer>(std::move(init_result));
1192+
1193+
std::vector<std::unique_ptr<IFDv2InitializerFactory>> initializers;
1194+
initializers.push_back(
1195+
std::make_unique<OneShotInitializerFactory>(std::move(initializer)));
1196+
1197+
auto fdv2_sync =
1198+
std::make_unique<MockSynchronizer>(std::vector<FDv2SourceResult>{});
1199+
auto fdv2_factory =
1200+
std::make_unique<OneShotSynchronizerFactory>(std::move(fdv2_sync));
1201+
auto* fdv2_factory_ptr = fdv2_factory.get();
1202+
1203+
auto fdv1_sync =
1204+
std::make_unique<MockSynchronizer>(std::vector<FDv2SourceResult>{});
1205+
auto fdv1_factory =
1206+
std::make_unique<FDv1FallbackOneShotFactory>(std::move(fdv1_sync));
1207+
auto* fdv1_factory_ptr = fdv1_factory.get();
1208+
1209+
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> synchronizers;
1210+
synchronizers.push_back(std::move(fdv2_factory));
1211+
synchronizers.push_back(std::move(fdv1_factory));
1212+
1213+
FDv2DataSystem ds(std::move(initializers), std::move(synchronizers),
1214+
/*fallback_condition_factory=*/nullptr,
1215+
/*recovery_condition_factory=*/nullptr,
1216+
ioc.get_executor(), &status_manager, logger);
1217+
ds.Initialize();
1218+
ioc.run();
1219+
1220+
// FDv2 synchronizer was skipped; FDv1 adapter was built and ran.
1221+
EXPECT_EQ(0, fdv2_factory_ptr->build_count_);
1222+
EXPECT_EQ(1, fdv1_factory_ptr->build_count_);
1223+
}
1224+
10931225
// ============================================================================
10941226
// Destruction protocol: in-flight orchestration
10951227
// ============================================================================

libs/server-sdk/tests/source_manager_test.cpp

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ class CountingFactory : public IFDv2SynchronizerFactory {
4343
int build_count = 0;
4444
};
4545

46+
class FDv1FallbackFactory : public CountingFactory {
47+
public:
48+
bool IsFDv1Fallback() const override { return true; }
49+
};
50+
4651
} // namespace
4752

4853
TEST(SourceManagerTest, EmptyManagerReportsZeroAvailable) {
@@ -176,7 +181,7 @@ TEST(SourceManagerTest, ResetSourceIndexSkipsBlockedFirstFactory) {
176181
EXPECT_EQ(1, f1_ptr->build_count);
177182
}
178183

179-
TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackAlwaysFalse) {
184+
TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackFalseForFDv2Factory) {
180185
auto f0 = std::make_unique<CountingFactory>();
181186
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories;
182187
factories.push_back(std::move(f0));
@@ -185,3 +190,68 @@ TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackAlwaysFalse) {
185190
mgr.NextSynchronizer();
186191
EXPECT_FALSE(mgr.IsCurrentSynchronizerFDv1Fallback());
187192
}
193+
194+
TEST(SourceManagerTest, FDv1FallbackFactoryStartsBlockedAndIsSkipped) {
195+
auto fdv2 = std::make_unique<CountingFactory>();
196+
auto fdv1 = std::make_unique<FDv1FallbackFactory>();
197+
auto* fdv1_ptr = fdv1.get();
198+
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories;
199+
factories.push_back(std::move(fdv2));
200+
factories.push_back(std::move(fdv1));
201+
SourceManager mgr(std::move(factories));
202+
203+
EXPECT_EQ(1u, mgr.AvailableSynchronizerCount());
204+
mgr.NextSynchronizer();
205+
EXPECT_FALSE(mgr.IsCurrentSynchronizerFDv1Fallback());
206+
EXPECT_EQ(0, fdv1_ptr->build_count);
207+
}
208+
209+
TEST(SourceManagerTest, SwitchToFDv1FallbackBlocksFDv2AndUnblocksFDv1) {
210+
auto fdv2 = std::make_unique<CountingFactory>();
211+
auto fdv1 = std::make_unique<FDv1FallbackFactory>();
212+
auto* fdv1_ptr = fdv1.get();
213+
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories;
214+
factories.push_back(std::move(fdv2));
215+
factories.push_back(std::move(fdv1));
216+
SourceManager mgr(std::move(factories));
217+
218+
mgr.SwitchToFDv1Fallback();
219+
220+
EXPECT_EQ(1u, mgr.AvailableSynchronizerCount());
221+
auto sync = mgr.NextSynchronizer();
222+
ASSERT_NE(sync, nullptr);
223+
EXPECT_EQ(1, fdv1_ptr->build_count);
224+
EXPECT_TRUE(mgr.IsCurrentSynchronizerFDv1Fallback());
225+
}
226+
227+
TEST(SourceManagerTest, SwitchToFDv1FallbackWithoutAdapterBlocksEverything) {
228+
auto fdv2 = std::make_unique<CountingFactory>();
229+
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories;
230+
factories.push_back(std::move(fdv2));
231+
SourceManager mgr(std::move(factories));
232+
233+
mgr.SwitchToFDv1Fallback();
234+
235+
EXPECT_EQ(0u, mgr.AvailableSynchronizerCount());
236+
EXPECT_EQ(nullptr, mgr.NextSynchronizer());
237+
}
238+
239+
TEST(SourceManagerTest, SwitchToFDv1FallbackUnblocksPreviouslyBlockedFDv2) {
240+
auto fdv2 = std::make_unique<CountingFactory>();
241+
auto fdv1 = std::make_unique<FDv1FallbackFactory>();
242+
auto* fdv1_ptr = fdv1.get();
243+
std::vector<std::unique_ptr<IFDv2SynchronizerFactory>> factories;
244+
factories.push_back(std::move(fdv2));
245+
factories.push_back(std::move(fdv1));
246+
SourceManager mgr(std::move(factories));
247+
248+
mgr.NextSynchronizer();
249+
mgr.BlockCurrentSynchronizer();
250+
mgr.SwitchToFDv1Fallback();
251+
252+
EXPECT_EQ(1u, mgr.AvailableSynchronizerCount());
253+
auto sync = mgr.NextSynchronizer();
254+
ASSERT_NE(sync, nullptr);
255+
EXPECT_EQ(1, fdv1_ptr->build_count);
256+
EXPECT_TRUE(mgr.IsCurrentSynchronizerFDv1Fallback());
257+
}

0 commit comments

Comments
 (0)