Skip to content

Commit d154cbe

Browse files
committed
fix: server FDv2 goodbye behavior and orchestration polish
1 parent 5909036 commit d154cbe

6 files changed

Lines changed: 346 additions & 50 deletions

File tree

libs/server-sdk/src/data_components/change_notifier/change_notifier.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ namespace launchdarkly::server_side::data_components {
1010

1111
namespace {
1212

13+
// Lets std::visit dispatch to a different lambda per variant alternative.
1314
template <class... Ts>
1415
struct overloaded : Ts... {
1516
using Ts::operator()...;

libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp

Lines changed: 51 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <boost/asio/post.hpp>
66

7+
#include <cassert>
78
#include <chrono>
89
#include <utility>
910
#include <variant>
@@ -12,6 +13,7 @@ namespace launchdarkly::server_side::data_systems {
1213

1314
namespace {
1415

16+
// Lets std::visit dispatch to a different lambda per variant alternative.
1517
template <class... Ts>
1618
struct overloaded : Ts... {
1719
using Ts::operator()...;
@@ -39,6 +41,7 @@ FDv2DataSystem::FDv2DataSystem(
3941
status_manager_(status_manager),
4042
store_(),
4143
change_notifier_(store_, store_),
44+
initialize_called_(false),
4245
closed_(false),
4346
selector_(),
4447
initializer_index_(0),
@@ -88,6 +91,9 @@ std::string const& FDv2DataSystem::Identity() const {
8891
}
8992

9093
void FDv2DataSystem::Initialize() {
94+
bool const already_called = initialize_called_.exchange(true);
95+
assert(!already_called && "Initialize() must be called at most once");
96+
9197
LD_LOG(logger_, LogLevel::kInfo) << Identity() << ": starting";
9298
if (initializer_factories_.empty() && synchronizer_factories_.empty()) {
9399
// Offline mode: empty store is the canonical state.
@@ -98,8 +104,6 @@ void FDv2DataSystem::Initialize() {
98104
}
99105

100106
void FDv2DataSystem::RunNextInitializer() {
101-
auto future = async::MakeFuture(data_interfaces::FDv2SourceResult{
102-
data_interfaces::FDv2SourceResult::Shutdown{}});
103107
bool exhausted = false;
104108
{
105109
std::lock_guard<std::mutex> lock(mutex_);
@@ -111,22 +115,21 @@ void FDv2DataSystem::RunNextInitializer() {
111115
} else {
112116
auto& factory = initializer_factories_[initializer_index_++];
113117
active_initializer_ = factory->Build();
114-
future = active_initializer_->Run();
118+
active_initializer_->Run().Then(
119+
[this](data_interfaces::FDv2SourceResult const& result)
120+
-> std::monostate {
121+
OnInitializerResult(result);
122+
return {};
123+
},
124+
[ioc = ioc_](async::Continuation<void()> work) {
125+
boost::asio::post(ioc, std::move(work));
126+
});
115127
}
116128
}
117129

118130
if (exhausted) {
119131
StartSynchronizers();
120-
return;
121132
}
122-
123-
std::move(future).Then(
124-
[this](
125-
data_interfaces::FDv2SourceResult const& result) -> std::monostate {
126-
OnInitializerResult(result);
127-
return {};
128-
},
129-
async::kInlineExecutor);
130133
}
131134

132135
void FDv2DataSystem::OnInitializerResult(
@@ -164,13 +167,12 @@ void FDv2DataSystem::OnInitializerResult(
164167
te.error.Kind(), te.error.Message());
165168
},
166169
[&](Result::Goodbye const&) {
167-
LD_LOG(logger_, LogLevel::kWarn)
168-
<< Identity()
169-
<< ": initializer received unexpected goodbye";
170+
LD_LOG(logger_, LogLevel::kDebug)
171+
<< Identity() << ": ignoring goodbye from initializer";
170172
},
171173
[&](Result::Timeout const&) {
172-
LD_LOG(logger_, LogLevel::kWarn)
173-
<< Identity() << ": initializer timed out (unexpected)";
174+
LD_LOG(logger_, LogLevel::kDebug)
175+
<< Identity() << ": ignoring timeout from initializer";
174176
},
175177
},
176178
result.value);
@@ -192,47 +194,56 @@ void FDv2DataSystem::OnInitializerResult(
192194

193195
void FDv2DataSystem::StartSynchronizers() {
194196
bool exhausted = false;
197+
bool cycled_synchronizers = false;
195198
{
196199
std::lock_guard<std::mutex> lock(mutex_);
197200
if (closed_) {
198201
return;
199202
}
200203
if (synchronizer_index_ >= synchronizer_factories_.size()) {
201204
exhausted = true;
205+
cycled_synchronizers = synchronizer_index_ > 0;
202206
} else {
203207
auto& factory = synchronizer_factories_[synchronizer_index_++];
204208
active_synchronizer_ = factory->Build();
205209
}
206210
}
207211

208212
if (exhausted) {
209-
LD_LOG(logger_, LogLevel::kWarn)
210-
<< Identity() << ": no synchronizers available";
213+
// kOff when we can't continue updating; init-only with data stays
214+
// kValid.
215+
if (cycled_synchronizers || !store_.Initialized()) {
216+
std::string const message =
217+
cycled_synchronizers
218+
? "all data source acquisition methods have been exhausted"
219+
: "all initializers exhausted and no synchronizers "
220+
"configured";
221+
LD_LOG(logger_, LogLevel::kWarn) << Identity() << ": " << message;
222+
status_manager_->SetState(
223+
DataSourceStatus::DataSourceState::kOff,
224+
DataSourceStatus::ErrorInfo::ErrorKind::kUnknown, message);
225+
}
211226
return;
212227
}
213228

214229
RunSynchronizerNext();
215230
}
216231

217232
void FDv2DataSystem::RunSynchronizerNext() {
218-
auto future = async::MakeFuture(data_interfaces::FDv2SourceResult{
219-
data_interfaces::FDv2SourceResult::Shutdown{}});
220-
{
221-
std::lock_guard<std::mutex> lock(mutex_);
222-
if (closed_ || !active_synchronizer_) {
223-
return;
224-
}
225-
future =
226-
active_synchronizer_->Next(kSynchronizerNextTimeout, selector_);
233+
std::lock_guard<std::mutex> lock(mutex_);
234+
if (closed_ || !active_synchronizer_) {
235+
return;
227236
}
228-
229-
std::move(future).Then(
230-
[this](
231-
data_interfaces::FDv2SourceResult const& result) -> std::monostate {
232-
OnSynchronizerResult(result);
233-
return {};
234-
},
235-
async::kInlineExecutor);
237+
active_synchronizer_->Next(kSynchronizerNextTimeout, selector_)
238+
.Then(
239+
[this](data_interfaces::FDv2SourceResult const& result)
240+
-> std::monostate {
241+
OnSynchronizerResult(result);
242+
return {};
243+
},
244+
[ioc = ioc_](async::Continuation<void()> work) {
245+
boost::asio::post(ioc, std::move(work));
246+
});
236247
}
237248

238249
void FDv2DataSystem::OnSynchronizerResult(
@@ -266,10 +277,11 @@ void FDv2DataSystem::OnSynchronizerResult(
266277
advance = true;
267278
},
268279
[&](Result::Goodbye const& gb) {
269-
LD_LOG(logger_, LogLevel::kInfo)
270-
<< Identity() << ": synchronizer goodbye"
280+
// The synchronizer handles goodbye internally (reconnects);
281+
// the orchestrator just loops on the same source.
282+
LD_LOG(logger_, LogLevel::kDebug)
283+
<< Identity() << ": ignoring goodbye from synchronizer"
271284
<< (gb.reason ? ": " + *gb.reason : "");
272-
advance = true;
273285
},
274286
[&](Result::Timeout const&) {
275287
LD_LOG(logger_, LogLevel::kDebug)

libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include <boost/asio/any_io_executor.hpp>
1414

15+
#include <atomic>
1516
#include <cstddef>
1617
#include <memory>
1718
#include <mutex>
@@ -105,7 +106,8 @@ namespace launchdarkly::server_side::data_systems {
105106
* | Synchronizer | ChangeSet -> apply, loop
106107
* | phase | Interrupted -> loop (source self-retries)
107108
* | M = 0, 1, 2, ... | Timeout -> loop
108-
* | | Goodbye/Term -> M += 1
109+
* | | Goodbye -> loop (source self-restarts)
110+
* | | TerminalError -> M += 1
109111
* | | Shutdown -> [Closed]
110112
* +-------------------+
111113
* |
@@ -121,9 +123,13 @@ namespace launchdarkly::server_side::data_systems {
121123
* kInterrupted on Interrupted / TerminalError
122124
* (filtered to kInitializing while still in
123125
* the initializer phase if not yet Valid).
124-
* kValid -> kInterrupted on errors; kOff in destructor.
126+
* kOff if all initializers exhaust without data
127+
* and no synchronizers are configured.
128+
* kValid -> kInterrupted on errors; kOff in destructor or
129+
* when synchronizers cycle through and exhaust.
125130
* kInterrupted -> kValid on next successful ChangeSet apply;
126-
* kOff in destructor.
131+
* kOff in destructor or on synchronizer
132+
* exhaustion.
127133
* kOff -> terminal.
128134
*/
129135
class FDv2DataSystem final : public data_interfaces::IDataSystem {
@@ -217,8 +223,10 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem {
217223
void Close();
218224

219225
// Orchestration-loop methods. Each step chains the next via Future::Then,
220-
// so at most one is in flight at a time. mutex_ guards shared state
221-
// against the destructor's Close() running concurrently with a callback.
226+
// so at most one step has a pending continuation at any time. mutex_
227+
// provides mutual exclusion for orchestration state when callbacks run on
228+
// different executor threads, and lets Close() safely tear down active
229+
// sources from any thread.
222230

223231
void RunNextInitializer();
224232
void OnInitializerResult(data_interfaces::FDv2SourceResult result);
@@ -250,6 +258,9 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem {
250258
// safe.
251259
data_components::ChangeNotifier change_notifier_;
252260

261+
// Set by Initialize() to detect repeat or concurrent calls.
262+
std::atomic_bool initialize_called_;
263+
253264
// Orchestration state, guarded by mutex_.
254265
std::mutex mutex_;
255266
bool closed_;

libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,14 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
210210
<< r.reason.value_or("") << "'.";
211211
Notify(FDv2SourceResult{
212212
FDv2SourceResult::Goodbye{r.reason, false}});
213+
// Drop the current connection and reconnect; the protocol
214+
// handler is reset so the new connection starts in a clean
215+
// state.
216+
protocol_handler_.Reset();
217+
std::lock_guard lock(mutex_);
218+
if (sse_client_) {
219+
sse_client_->async_restart("FDv2 goodbye received");
220+
}
213221
} else if constexpr (std::is_same_v<T,
214222
FDv2ProtocolHandler::Error>) {
215223
if (r.kind == FDv2ProtocolHandler::Error::Kind::kServerError) {

0 commit comments

Comments
 (0)