Skip to content

Commit 1833a9c

Browse files
committed
feat: propagate FDv1 fallback directive from FDv2 polling and streaming sources
1 parent 023a6a9 commit 1833a9c

9 files changed

Lines changed: 290 additions & 94 deletions

libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,20 @@ struct FDv2SourceResult {
2222
*/
2323
struct ChangeSet {
2424
data_model::ChangeSet<ChangeSetData> change_set;
25-
/** If true, the server signaled that the client should fall back to
26-
* FDv1. */
27-
bool fdv1_fallback;
2825
};
2926

3027
/**
3128
* A transient error occurred; the source may recover.
3229
*/
3330
struct Interrupted {
3431
ErrorInfo error;
35-
bool fdv1_fallback;
3632
};
3733

3834
/**
3935
* A non-recoverable error occurred; the source should not be retried.
4036
*/
4137
struct TerminalError {
4238
ErrorInfo error;
43-
bool fdv1_fallback;
4439
};
4540

4641
/**
@@ -53,13 +48,18 @@ struct FDv2SourceResult {
5348
*/
5449
struct Goodbye {
5550
std::optional<std::string> reason;
56-
bool fdv1_fallback;
5751
};
5852

5953
using Value =
6054
std::variant<ChangeSet, Interrupted, TerminalError, Shutdown, Goodbye>;
6155

6256
Value value;
57+
58+
/**
59+
* If true, the server signaled (via the X-LD-FD-Fallback response header)
60+
* that the client should fall back to FDv1.
61+
*/
62+
bool fdv1_fallback = false;
6363
};
6464

6565
} // namespace launchdarkly::server_side::data_interfaces

libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
#include <launchdarkly/network/http_error_messages.hpp>
55
#include <launchdarkly/server_side/config/builders/all_builders.hpp>
66

7+
#include <boost/algorithm/string/predicate.hpp>
78
#include <boost/json.hpp>
89
#include <boost/url/parse.hpp>
910
#include <boost/url/url.hpp>
1011

1112
namespace launchdarkly::server_side::data_systems {
1213

1314
static char const* const kFDv2PollPath = "/sdk/poll";
15+
static char const* const kFDv1FallbackHeader = "X-LD-FD-Fallback";
1416

1517
static char const* const kErrorParsingBody =
1618
"Could not parse FDv2 polling response";
@@ -32,6 +34,12 @@ static ErrorInfo MakeError(ErrorKind kind,
3234
std::chrono::system_clock::now()};
3335
}
3436

37+
static bool ReadFDv1FallbackDirective(
38+
network::HttpResult::HeadersType const& headers) {
39+
auto const it = headers.find(kFDv1FallbackHeader);
40+
return it != headers.end() && boost::iequals(it->second, "true");
41+
}
42+
3543
network::HttpRequest MakeFDv2PollRequest(
3644
config::built::ServiceEndpoints const& endpoints,
3745
config::built::HttpProperties const& http_properties,
@@ -88,15 +96,13 @@ static FDv2SourceResult ParseFDv2PollEvents(
8896
auto typed = TranslateChangeSet(*change_set, logger);
8997
if (!typed) {
9098
return FDv2SourceResult{FDv2SourceResult::Interrupted{
91-
MakeError(ErrorKind::kInvalidData, 0, kErrorTranslation),
92-
false}};
99+
MakeError(ErrorKind::kInvalidData, 0, kErrorTranslation)}};
93100
}
94101
return FDv2SourceResult{
95-
FDv2SourceResult::ChangeSet{std::move(*typed), false}};
102+
FDv2SourceResult::ChangeSet{std::move(*typed)}};
96103
}
97104
if (auto* goodbye = std::get_if<Goodbye>(&result)) {
98-
return FDv2SourceResult{
99-
FDv2SourceResult::Goodbye{goodbye->reason, false}};
105+
return FDv2SourceResult{FDv2SourceResult::Goodbye{goodbye->reason}};
100106
}
101107
if (auto* error = std::get_if<FDv2ProtocolHandler::Error>(&result)) {
102108
if (error->kind == FDv2ProtocolHandler::Error::Kind::kServerError) {
@@ -107,16 +113,15 @@ static FDv2SourceResult ParseFDv2PollEvents(
107113
id.value_or("") + "' with reason: '" + error->message +
108114
"'. Automatic retry will occur.";
109115
return FDv2SourceResult{FDv2SourceResult::Interrupted{
110-
MakeError(ErrorKind::kErrorResponse, 0, std::move(msg)),
111-
false}};
116+
MakeError(ErrorKind::kErrorResponse, 0, std::move(msg))}};
112117
}
113118
return FDv2SourceResult{FDv2SourceResult::Interrupted{
114-
MakeError(ErrorKind::kInvalidData, 0, error->message), false}};
119+
MakeError(ErrorKind::kInvalidData, 0, error->message)}};
115120
}
116121
}
117122

118123
return FDv2SourceResult{FDv2SourceResult::Interrupted{
119-
MakeError(ErrorKind::kInvalidData, 0, kErrorIncompletePayload), false}};
124+
MakeError(ErrorKind::kInvalidData, 0, kErrorIncompletePayload)}};
120125
}
121126

122127
static FDv2SourceResult ParseFDv2PollResponse(
@@ -127,25 +132,25 @@ static FDv2SourceResult ParseFDv2PollResponse(
127132
auto parsed = boost::json::parse(body, ec);
128133
if (ec) {
129134
return FDv2SourceResult{FDv2SourceResult::Interrupted{
130-
MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), false}};
135+
MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody)}};
131136
}
132137

133138
auto const* obj = parsed.if_object();
134139
if (!obj) {
135140
return FDv2SourceResult{FDv2SourceResult::Interrupted{
136-
MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody), false}};
141+
MakeError(ErrorKind::kInvalidData, 0, kErrorParsingBody)}};
137142
}
138143

139144
auto const* events_val = obj->if_contains("events");
140145
if (!events_val) {
141146
return FDv2SourceResult{FDv2SourceResult::Interrupted{
142-
MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), false}};
147+
MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents)}};
143148
}
144149

145150
auto const* events_arr = events_val->if_array();
146151
if (!events_arr) {
147152
return FDv2SourceResult{FDv2SourceResult::Interrupted{
148-
MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents), false}};
153+
MakeError(ErrorKind::kInvalidData, 0, kErrorMissingEvents)}};
149154
}
150155

151156
return ParseFDv2PollEvents(*events_arr, protocol_handler, logger);
@@ -161,24 +166,28 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse(
161166
std::string error_msg = msg.has_value() ? *msg : "unknown error";
162167
LD_LOG(logger, LogLevel::kWarn) << identity << ": " << error_msg;
163168
return FDv2SourceResult{FDv2SourceResult::Interrupted{
164-
MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg)),
165-
false}};
169+
MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg))}};
166170
}
167171

172+
bool const fdv1_fallback = ReadFDv1FallbackDirective(res.Headers());
173+
168174
if (res.Status() == 304) {
169-
return FDv2SourceResult{FDv2SourceResult::ChangeSet{
170-
data_model::ChangeSet<data_interfaces::ChangeSetData>{
171-
data_model::ChangeSetType::kNone, {}, data_model::Selector{}},
172-
false}};
175+
return FDv2SourceResult{
176+
FDv2SourceResult::ChangeSet{
177+
data_model::ChangeSet<data_interfaces::ChangeSetData>{
178+
data_model::ChangeSetType::kNone,
179+
{},
180+
data_model::Selector{}}},
181+
fdv1_fallback};
173182
}
174183

175184
if (res.Status() == 200) {
176185
auto const& body = res.Body();
177186
if (!body) {
178-
return FDv2SourceResult{FDv2SourceResult::Interrupted{
179-
MakeError(ErrorKind::kInvalidData, 0,
180-
"polling response contained no body"),
181-
false}};
187+
return FDv2SourceResult{FDv2SourceResult::Interrupted{MakeError(
188+
ErrorKind::kInvalidData, 0,
189+
"polling response contained no body")},
190+
fdv1_fallback};
182191
}
183192

184193
auto result = ParseFDv2PollResponse(*body, protocol_handler, logger);
@@ -192,24 +201,27 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse(
192201
<< identity << ": " << interrupted->error.Message();
193202
}
194203
}
204+
result.fdv1_fallback = fdv1_fallback;
195205
return result;
196206
}
197207

198208
if (network::IsRecoverableStatus(res.Status())) {
199209
std::string msg = network::ErrorForStatusCode(
200210
res.Status(), "FDv2 polling request", "will retry");
201211
LD_LOG(logger, LogLevel::kWarn) << identity << ": " << msg;
202-
return FDv2SourceResult{FDv2SourceResult::Interrupted{
203-
MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)),
204-
false}};
212+
return FDv2SourceResult{
213+
FDv2SourceResult::Interrupted{MakeError(
214+
ErrorKind::kErrorResponse, res.Status(), std::move(msg))},
215+
fdv1_fallback};
205216
}
206217

207218
std::string msg = network::ErrorForStatusCode(
208219
res.Status(), "FDv2 polling request", std::nullopt);
209220
LD_LOG(logger, LogLevel::kError) << identity << ": " << msg;
210-
return FDv2SourceResult{FDv2SourceResult::TerminalError{
211-
MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg)),
212-
false}};
221+
return FDv2SourceResult{
222+
FDv2SourceResult::TerminalError{
223+
MakeError(ErrorKind::kErrorResponse, res.Status(), std::move(msg))},
224+
fdv1_fallback};
213225
}
214226

215227
} // namespace launchdarkly::server_side::data_systems

libs/server-sdk/src/data_systems/fdv2/polling_initializer.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ async::Future<FDv2SourceResult> FDv2PollingInitializer::Run() {
3737
FDv2SourceResult{FDv2SourceResult::TerminalError{
3838
ErrorInfo{ErrorInfo::ErrorKind::kUnknown, 0,
3939
"invalid polling endpoint URL",
40-
std::chrono::system_clock::now()},
41-
false}});
40+
std::chrono::system_clock::now()}}});
4241
}
4342

4443
// Promisify the callback-based HTTP request.

libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <launchdarkly/async/timer.hpp>
55

6+
#include <boost/algorithm/string/predicate.hpp>
67
#include <boost/json.hpp>
78
#include <boost/url/parse.hpp>
89
#include <boost/url/url.hpp>
@@ -67,8 +68,7 @@ void FDv2StreamingSynchronizer::State::EnsureStarted(
6768
<< kIdentity << ": could not parse streaming endpoint URL";
6869
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
6970
MakeError(ErrorKind::kNetworkError, 0,
70-
"could not parse streaming endpoint URL"),
71-
false}});
71+
"could not parse streaming endpoint URL")}});
7272
return;
7373
}
7474

@@ -116,6 +116,12 @@ void FDv2StreamingSynchronizer::State::EnsureStarted(
116116
s->OnConnect(req);
117117
}
118118
});
119+
client_builder.on_response(
120+
[weak](boost::beast::http::response_header<> const& headers) {
121+
if (auto s = weak.lock()) {
122+
s->OnResponse(headers);
123+
}
124+
});
119125
client_builder.receiver([weak](sse::Event const& event) {
120126
if (auto s = weak.lock()) {
121127
s->OnEvent(event);
@@ -137,10 +143,8 @@ void FDv2StreamingSynchronizer::State::EnsureStarted(
137143
// started_ intentionally left true: same reasoning as above.
138144
LD_LOG(logger_, LogLevel::kError)
139145
<< kIdentity << ": could not build SSE client";
140-
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
141-
MakeError(ErrorKind::kNetworkError, 0,
142-
"could not build SSE client"),
143-
false}});
146+
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{MakeError(
147+
ErrorKind::kNetworkError, 0, "could not build SSE client")}});
144148
return;
145149
}
146150

@@ -169,6 +173,15 @@ void FDv2StreamingSynchronizer::State::OnConnect(HttpRequest* req) {
169173
req->target(u.encoded_target());
170174
}
171175

176+
void FDv2StreamingSynchronizer::State::OnResponse(
177+
HttpResponseHeader const& headers) {
178+
auto const it = headers.find("X-LD-FD-Fallback");
179+
bool const directive =
180+
it != headers.end() && boost::iequals(it->value(), "true");
181+
std::lock_guard lock(mutex_);
182+
latest_fdv1_fallback_ = directive;
183+
}
184+
172185
void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
173186
boost::system::error_code ec;
174187
auto data = boost::json::parse(event.data(), ec);
@@ -177,7 +190,7 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
177190
std::string msg = "could not parse FDv2 streaming event payload";
178191
LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg;
179192
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
180-
MakeError(ErrorKind::kInvalidData, 0, std::move(msg)), false}});
193+
MakeError(ErrorKind::kInvalidData, 0, std::move(msg))}});
181194
return;
182195
}
183196

@@ -195,21 +208,20 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
195208
"FDv2 streaming changeset could not be translated";
196209
LD_LOG(logger_, LogLevel::kError)
197210
<< kIdentity << ": " << msg;
198-
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
199-
MakeError(ErrorKind::kInvalidData, 0, std::move(msg)),
200-
false}});
211+
Notify(FDv2SourceResult{
212+
FDv2SourceResult::Interrupted{MakeError(
213+
ErrorKind::kInvalidData, 0, std::move(msg))}});
201214
return;
202215
}
203216
Notify(FDv2SourceResult{
204-
FDv2SourceResult::ChangeSet{std::move(*typed), false}});
217+
FDv2SourceResult::ChangeSet{std::move(*typed)}});
205218
} else if constexpr (std::is_same_v<T, Goodbye>) {
206219
LD_LOG(logger_, LogLevel::kInfo)
207220
<< kIdentity
208221
<< ": Goodbye was received from the LaunchDarkly "
209222
"connection with reason: '"
210223
<< r.reason.value_or("") << "'.";
211-
Notify(FDv2SourceResult{
212-
FDv2SourceResult::Goodbye{r.reason, false}});
224+
Notify(FDv2SourceResult{FDv2SourceResult::Goodbye{r.reason}});
213225
// Drop the current connection and reconnect; the protocol
214226
// handler is reset so the new connection starts in a clean
215227
// state.
@@ -229,15 +241,15 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) {
229241
"'. Automatic retry will occur.";
230242
LD_LOG(logger_, LogLevel::kInfo)
231243
<< kIdentity << ": " << msg;
232-
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
233-
MakeError(ErrorKind::kErrorResponse, 0, std::move(msg)),
234-
false}});
244+
Notify(FDv2SourceResult{
245+
FDv2SourceResult::Interrupted{MakeError(
246+
ErrorKind::kErrorResponse, 0, std::move(msg))}});
235247
return;
236248
}
237249
LD_LOG(logger_, LogLevel::kError)
238250
<< kIdentity << ": " << r.message;
239251
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
240-
MakeError(ErrorKind::kInvalidData, 0, r.message), false}});
252+
MakeError(ErrorKind::kInvalidData, 0, r.message)}});
241253
} else {
242254
static_assert(always_false_v<T>, "non-exhaustive visitor");
243255
}
@@ -253,31 +265,30 @@ void FDv2StreamingSynchronizer::State::OnError(sse::Error const& error) {
253265
if (sse::IsRecoverable(error)) {
254266
LD_LOG(logger_, LogLevel::kWarn) << kIdentity << ": " << msg;
255267
Notify(FDv2SourceResult{FDv2SourceResult::Interrupted{
256-
MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), false}});
268+
MakeError(ErrorKind::kNetworkError, 0, std::move(msg))}});
257269
return;
258270
}
259271

260272
LD_LOG(logger_, LogLevel::kError) << kIdentity << ": " << msg;
261273

262274
if (auto const* client_error =
263275
std::get_if<sse::errors::UnrecoverableClientError>(&error)) {
264-
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
265-
MakeError(
266-
ErrorKind::kErrorResponse,
267-
static_cast<ErrorInfo::StatusCodeType>(client_error->status),
268-
std::move(msg)),
269-
false}});
276+
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{MakeError(
277+
ErrorKind::kErrorResponse,
278+
static_cast<ErrorInfo::StatusCodeType>(client_error->status),
279+
std::move(msg))}});
270280
return;
271281
}
272282

273283
Notify(FDv2SourceResult{FDv2SourceResult::TerminalError{
274-
MakeError(ErrorKind::kNetworkError, 0, std::move(msg)), false}});
284+
MakeError(ErrorKind::kNetworkError, 0, std::move(msg))}});
275285
}
276286

277287
void FDv2StreamingSynchronizer::State::Notify(FDv2SourceResult result) {
278288
std::optional<async::Promise<FDv2SourceResult>> promise;
279289
{
280290
std::lock_guard lock(mutex_);
291+
result.fdv1_fallback = latest_fdv1_fallback_;
281292
if (pending_promise_) {
282293
promise = std::move(pending_promise_);
283294
pending_promise_.reset();

0 commit comments

Comments
 (0)