Skip to content

Commit 1dd9793

Browse files
authored
refactor: implement fdv2 polling initializer / synchronizer (#519)
This implements the C++ polling initializer and synchronizer. I have rewritten this code substantially since the first round of reviews, mostly by hand. Changes: * Implements `FDv2PollingInitializer` and `FDv2PollingSynchronizer`. * Updates the `IFDv2Initializer` and `IFDv2Synchronizer` interfaces to be async. * Makes it safe to `delete` both classes without joining any threads. * Updates `AsioRequester::Request` to use a more modern style that is more flexible in how it can be used. * This will be needed for cancellation eventually anyway. * Marks a method in `AsioRequester` as `const` to make it more obvious that it's thread-safe. * Adds a helper to create promisified ASIO timers. * Adds some minor Promise helpers to simplify the new code. Caveats: * I have not yet implemented any kind of cancellation on async operations. We should at least do this for timers at some point, but I'd rather defer it to a later PR. It's no longer a deadlock or correctness issue, but we _will_ get timeout timers slowly piling up in the background if requests are fast. Previous description: > The bulk of this code was generated by Claude based on the Java version, but I've gone through it line-by-line, and I think it makes sense. But I'm new to both FDv2 and ASIO, so I could be missing something. > > Probably the most controversial part is the decision from the previous PR to use `std::future` and a blocking call. If we decide we need Java-like conditions, or if we need the callers to be non-blocking, we could change these to use `asio::deferred` instead. But I don't think these changes require that yet. > <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Introduces new FDv2 polling data path and converts FDv2 source interfaces to `Future`-based async, which can affect orchestration flow and lifecycle/shutdown behavior. Also changes the Boost.Asio HTTP requester initiation style, so regressions could impact networking behavior and timeouts. > > **Overview** > Adds an **FDv2 polling implementation**: `FDv2PollingInitializer` performs a one-shot poll, and `FDv2PollingSynchronizer` repeatedly polls with minimum-interval enforcement and timeout/close handling using the internal `Promise/Future` utilities. > > Introduces a shared `FDv2ProtocolHandler` state machine that accumulates FDv2 wire events into a complete `FDv2ChangeSet` (or emits typed protocol/JSON/server errors and `Goodbye`), with accompanying unit tests. > > Refactors FDv2 source interfaces (`IFDv2Initializer::Run`, `IFDv2Synchronizer::Next`) to return `async::Future<FDv2SourceResult>` instead of blocking results, adds async helpers (`kInlineExecutor`, `MakeFuture`, `async::Delay`), and updates `AsioRequester::Request`/`Requester::Request` to be `const` and use `boost::asio::async_initiate`. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 4b02b67. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 4b40bb4 commit 1dd9793

19 files changed

Lines changed: 1597 additions & 65 deletions

libs/internal/include/launchdarkly/async/promise.hpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,10 @@ class Future {
437437
std::shared_ptr<PromiseInternal<T>> internal_;
438438
};
439439

440+
// An executor that runs work immediately on the calling thread. Pass this
441+
// to Then() when no specific thread is required for the continuation.
442+
inline auto const kInlineExecutor = [](Continuation<void()> f) { f(); };
443+
440444
// WhenAll takes a variadic list of Futures (each with potentially different
441445
// value types) and returns a Future<std::monostate> that resolves once all
442446
// of the input futures have resolved. The result carries no value; callers
@@ -528,4 +532,16 @@ Future<std::size_t> WhenAny(Future<Ts>... futures) {
528532
return result;
529533
}
530534

535+
// MakeFuture returns an already-resolved Future<T>. Useful in flattening Then
536+
// continuations where some branches produce a result immediately and others
537+
// return a Future, requiring a uniform Future<T> return type across all
538+
// branches.
539+
template <typename T>
540+
Future<T> MakeFuture(T value) {
541+
Promise<T> p;
542+
auto f = p.GetFuture();
543+
p.Resolve(std::move(value));
544+
return f;
545+
}
546+
531547
} // namespace launchdarkly::async
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#pragma once
2+
3+
#include <launchdarkly/async/promise.hpp>
4+
5+
#include <boost/asio/any_io_executor.hpp>
6+
#include <boost/asio/error.hpp>
7+
#include <boost/asio/steady_timer.hpp>
8+
#include <boost/system/error_code.hpp>
9+
10+
#include <chrono>
11+
#include <memory>
12+
13+
namespace launchdarkly::async {
14+
15+
// Returns a Future<bool> that resolves once the given duration elapses.
16+
// The future resolves with true if the timer fired normally, or false if
17+
// the timer was cancelled before it expired.
18+
template <typename Rep, typename Period>
19+
Future<bool> Delay(boost::asio::any_io_executor executor,
20+
std::chrono::duration<Rep, Period> duration) {
21+
auto timer = std::make_shared<boost::asio::steady_timer>(executor);
22+
timer->expires_after(duration);
23+
Promise<bool> promise;
24+
auto future = promise.GetFuture();
25+
timer->async_wait([p = std::move(promise),
26+
timer](boost::system::error_code code) mutable {
27+
p.Resolve(code != boost::asio::error::operation_aborted);
28+
});
29+
return future;
30+
}
31+
32+
} // namespace launchdarkly::async
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
#pragma once
2+
3+
#include <launchdarkly/data_model/fdv2_change.hpp>
4+
#include <launchdarkly/serialization/json_fdv2_events.hpp>
5+
6+
#include <boost/json/value.hpp>
7+
8+
#include <string_view>
9+
#include <variant>
10+
#include <vector>
11+
12+
namespace launchdarkly {
13+
14+
/**
15+
* Protocol state machine for the FDv2 wire format.
16+
*
17+
* Accumulates put-object and delete-object events between a server-intent
18+
* and payload-transferred event, then emits a complete FDv2ChangeSet.
19+
*
20+
* Shared between the polling and streaming synchronizers.
21+
*/
22+
class FDv2ProtocolHandler {
23+
public:
24+
/**
25+
* Typed error returned by HandleEvent. Carries the original underlying
26+
* error context rather than converting to a plain string.
27+
*/
28+
struct Error {
29+
enum class Kind {
30+
kJsonError, // Failed to deserialise an event's data field.
31+
kProtocolError, // Out-of-order or unexpected event.
32+
kServerError, // Server sent a valid 'error' event.
33+
};
34+
35+
Kind kind;
36+
std::string message;
37+
38+
/**
39+
* Set for kJsonError when the tl::expected parse returned an error.
40+
* Nullopt when parse succeeded but the data value was null.
41+
*/
42+
std::optional<JsonError> json_error;
43+
44+
/**
45+
* Set for kServerError: the full wire error including id and reason.
46+
*/
47+
std::optional<FDv2Error> server_error;
48+
49+
/** JSON deserialisation failed — carries the original JsonError. */
50+
static Error JsonParseError(JsonError err, std::string msg) {
51+
return {Kind::kJsonError, std::move(msg), err, std::nullopt};
52+
}
53+
/** Parse succeeded but data was null — no underlying JsonError. */
54+
static Error JsonParseError(std::string msg) {
55+
return {Kind::kJsonError, std::move(msg), std::nullopt,
56+
std::nullopt};
57+
}
58+
/** Out-of-order or unexpected protocol event. */
59+
static Error ProtocolError(std::string msg) {
60+
return {Kind::kProtocolError, std::move(msg), std::nullopt,
61+
std::nullopt};
62+
}
63+
/** Server sent a well-formed 'error' event. */
64+
static Error ServerError(FDv2Error err) {
65+
return {Kind::kServerError, err.reason, std::nullopt,
66+
std::move(err)};
67+
}
68+
};
69+
70+
/**
71+
* Result of handling a single FDv2 event:
72+
* - monostate: no output yet (accumulating, heartbeat, or unknown event)
73+
* - FDv2ChangeSet: complete changeset ready to apply
74+
* - Error: protocol error (JSON parse failure, protocol violation, or
75+
* server-sent error event)
76+
* - Goodbye: server is closing; caller should rotate sources
77+
*/
78+
using Result =
79+
std::variant<std::monostate, data_model::FDv2ChangeSet, Error, Goodbye>;
80+
81+
/**
82+
* Process one FDv2 event.
83+
*
84+
* @param event_type The event type string (e.g. "server-intent",
85+
* "put-object", "payload-transferred").
86+
* @param data The parsed JSON value for the event's data field.
87+
* @return A Result indicating what (if anything) the caller
88+
* should act on.
89+
*/
90+
Result HandleEvent(std::string_view event_type,
91+
boost::json::value const& data);
92+
93+
/**
94+
* Reset accumulated state. Call on reconnect before processing new events.
95+
*/
96+
void Reset();
97+
98+
FDv2ProtocolHandler() = default;
99+
100+
private:
101+
enum class State { kInactive, kFull, kPartial };
102+
103+
Result HandleServerIntent(boost::json::value const& data);
104+
Result HandlePutObject(boost::json::value const& data);
105+
Result HandleDeleteObject(boost::json::value const& data);
106+
Result HandlePayloadTransferred(boost::json::value const& data);
107+
Result HandleError(boost::json::value const& data);
108+
Result HandleGoodbye(boost::json::value const& data);
109+
110+
State state_ = State::kInactive;
111+
std::vector<data_model::FDv2Change> changes_;
112+
};
113+
114+
} // namespace launchdarkly

libs/internal/include/launchdarkly/network/asio_requester.hpp

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -282,23 +282,17 @@ class AsioRequester {
282282
}
283283

284284
template <typename CompletionToken>
285-
auto Request(HttpRequest request, CompletionToken&& token) {
286-
// TODO: Clang-tidy wants to pass the request by reference, but I am not
287-
// confident that lifetime would make sense.
288-
289-
namespace asio = boost::asio;
290-
namespace system = boost::system;
291-
292-
using Sig = void(HttpResult result);
293-
using Result = asio::async_result<std::decay_t<CompletionToken>, Sig>;
294-
using Handler = typename Result::completion_handler_type;
295-
296-
Handler handler(std::forward<decltype(token)>(token));
297-
Result result(handler);
298-
299-
InnerRequest(net::make_strand(ctx_), request, std::move(handler), 0);
300-
301-
return result.get();
285+
auto Request(HttpRequest request, CompletionToken&& token) const {
286+
return boost::asio::async_initiate<CompletionToken, void(HttpResult)>(
287+
[this](auto handler, HttpRequest req) {
288+
InnerRequest(
289+
net::make_strand(ctx_), std::move(req),
290+
[h = std::move(handler)](HttpResult result) mutable {
291+
std::move(h)(std::move(result));
292+
},
293+
0);
294+
},
295+
token, std::move(request));
302296
}
303297

304298
private:
@@ -313,7 +307,7 @@ class AsioRequester {
313307
void InnerRequest(boost::asio::any_io_executor exec,
314308
std::optional<HttpRequest> request,
315309
std::function<void(HttpResult)> callback,
316-
unsigned char redirect_count) {
310+
unsigned char redirect_count) const {
317311
if (redirect_count > kRedirectLimit) {
318312
boost::asio::post(exec, [callback, request]() mutable {
319313
callback(
@@ -336,7 +330,7 @@ class AsioRequester {
336330
redirect_count]() mutable {
337331
auto beast_request = MakeBeastRequest(*request);
338332

339-
const auto& properties = request->Properties();
333+
auto const& properties = request->Properties();
340334

341335
std::string service =
342336
request->Port().value_or(request->Https() ? "https" : "http");
Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#pragma once
22

3-
#include "http_requester.hpp"
4-
#include <launchdarkly/config/shared/built/http_properties.hpp>
3+
#include <boost/asio/any_io_executor.hpp>
54
#include <functional>
5+
#include <launchdarkly/config/shared/built/http_properties.hpp>
66
#include <memory>
7-
#include <boost/asio/any_io_executor.hpp>
7+
#include "http_requester.hpp"
88

99
namespace launchdarkly::network {
1010

@@ -15,30 +15,32 @@ using TlsOptions = config::shared::built::TlsOptions;
1515
class IRequesterImpl;
1616

1717
/**
18-
* Requester provides HTTP request functionality using either CURL or Boost.Beast
19-
* depending on the LD_CURL_NETWORKING compile-time flag.
18+
* Requester provides HTTP request functionality using either CURL or
19+
* Boost.Beast depending on the LD_CURL_NETWORKING compile-time flag.
2020
*
2121
* When LD_CURL_NETWORKING is ON: Uses CurlRequester (CURL-based implementation)
22-
* When LD_CURL_NETWORKING is OFF: Uses AsioRequester (Boost.Beast-based implementation)
22+
* When LD_CURL_NETWORKING is OFF: Uses AsioRequester (Boost.Beast-based
23+
* implementation)
2324
*
24-
* The implementation choice is made at library compile-time and hidden from users
25-
* via the pimpl idiom to avoid ABI issues.
25+
* The implementation choice is made at library compile-time and hidden from
26+
* users via the pimpl idiom to avoid ABI issues.
2627
*/
2728
class Requester {
28-
public:
29+
public:
2930
Requester(net::any_io_executor ctx, TlsOptions const& tls_options);
3031
~Requester();
3132

3233
// Move-only type
3334
Requester(Requester&&) noexcept;
3435
Requester& operator=(Requester&&) noexcept;
35-
Requester(const Requester&) = delete;
36-
Requester& operator=(const Requester&) = delete;
36+
Requester(Requester const&) = delete;
37+
Requester& operator=(Requester const&) = delete;
3738

38-
void Request(HttpRequest request, std::function<void(const HttpResult&)> cb);
39+
void Request(HttpRequest request,
40+
std::function<void(HttpResult const&)> cb) const;
3941

40-
private:
42+
private:
4143
std::unique_ptr<IRequesterImpl> impl_;
4244
};
4345

44-
} // namespace launchdarkly::network
46+
} // namespace launchdarkly::network

libs/internal/src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ set(INTERNAL_SOURCES
3636
serialization/value_mapping.cpp
3737
serialization/json_evaluation_result.cpp
3838
serialization/json_fdv2_events.cpp
39+
fdv2_protocol_handler.cpp
3940
serialization/json_sdk_data_set.cpp
4041
serialization/json_segment.cpp
4142
serialization/json_primitives.cpp

0 commit comments

Comments
 (0)