Skip to content

Commit 403bdff

Browse files
committed
feat: add FDv1AdapterSynchronizer wrapping IDataSynchronizer as IFDv2Synchronizer
1 parent 142b2ff commit 403bdff

4 files changed

Lines changed: 485 additions & 0 deletions

File tree

libs/server-sdk/src/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ target_sources(${LIBNAME}
6666
data_systems/fdv2/source_manager.cpp
6767
data_systems/fdv2/fdv2_data_system.hpp
6868
data_systems/fdv2/fdv2_data_system.cpp
69+
data_systems/fdv2/fdv1_adapter_synchronizer.hpp
70+
data_systems/fdv2/fdv1_adapter_synchronizer.cpp
6971
data_systems/background_sync/sources/streaming/streaming_data_source.hpp
7072
data_systems/background_sync/sources/streaming/streaming_data_source.cpp
7173
data_systems/background_sync/sources/streaming/event_handler.hpp
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
#include "fdv1_adapter_synchronizer.hpp"
2+
3+
#include <utility>
4+
5+
namespace launchdarkly::server_side::data_systems {
6+
7+
using data_interfaces::FDv2SourceResult;
8+
9+
// ----- State -----
10+
11+
bool FDv1AdapterSynchronizer::State::TryStart() {
12+
std::lock_guard lock(mutex_);
13+
if (started_ || closed_) {
14+
return false;
15+
}
16+
started_ = true;
17+
return true;
18+
}
19+
20+
bool FDv1AdapterSynchronizer::State::MarkClosed() {
21+
std::lock_guard lock(mutex_);
22+
closed_ = true;
23+
return started_;
24+
}
25+
26+
async::Future<FDv2SourceResult> FDv1AdapterSynchronizer::State::GetNext() {
27+
std::lock_guard lock(mutex_);
28+
if (!result_queue_.empty()) {
29+
auto result = std::move(result_queue_.front());
30+
result_queue_.pop_front();
31+
return async::MakeFuture(std::move(result));
32+
}
33+
return pending_promise_.emplace().GetFuture();
34+
}
35+
36+
void FDv1AdapterSynchronizer::State::ResolvePendingAsShutdown() {
37+
std::optional<async::Promise<FDv2SourceResult>> promise;
38+
{
39+
std::lock_guard lock(mutex_);
40+
if (pending_promise_) {
41+
promise = std::move(pending_promise_);
42+
pending_promise_.reset();
43+
}
44+
}
45+
if (promise) {
46+
promise->Resolve(FDv2SourceResult{FDv2SourceResult::Shutdown{}});
47+
}
48+
}
49+
50+
void FDv1AdapterSynchronizer::State::Notify(FDv2SourceResult result) {
51+
std::optional<async::Promise<FDv2SourceResult>> promise;
52+
{
53+
std::lock_guard lock(mutex_);
54+
if (closed_) {
55+
return;
56+
}
57+
if (pending_promise_) {
58+
promise = std::move(pending_promise_);
59+
pending_promise_.reset();
60+
} else {
61+
result_queue_.push_back(std::move(result));
62+
return;
63+
}
64+
}
65+
// Resolve outside the lock — Promise::Resolve may invoke inline
66+
// continuations that could call back into Notify or GetNext.
67+
promise->Resolve(std::move(result));
68+
}
69+
70+
// ----- ConvertingDestination -----
71+
72+
FDv1AdapterSynchronizer::ConvertingDestination::ConvertingDestination(
73+
std::weak_ptr<State> state)
74+
: state_(std::move(state)) {}
75+
76+
void FDv1AdapterSynchronizer::ConvertingDestination::Init(
77+
data_model::SDKDataSet data_set) {
78+
auto state = state_.lock();
79+
if (!state) {
80+
return;
81+
}
82+
data_interfaces::ChangeSetData changes;
83+
changes.reserve(data_set.flags.size() + data_set.segments.size());
84+
for (auto& [key, flag] : data_set.flags) {
85+
changes.push_back({key, std::move(flag)});
86+
}
87+
for (auto& [key, segment] : data_set.segments) {
88+
changes.push_back({key, std::move(segment)});
89+
}
90+
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
91+
data_model::ChangeSet<data_interfaces::ChangeSetData>{
92+
data_model::ChangeSetType::kFull, std::move(changes),
93+
data_model::Selector{}}}});
94+
}
95+
96+
void FDv1AdapterSynchronizer::ConvertingDestination::Upsert(
97+
std::string const& key,
98+
data_model::FlagDescriptor flag) {
99+
auto state = state_.lock();
100+
if (!state) {
101+
return;
102+
}
103+
data_interfaces::ChangeSetData changes;
104+
changes.push_back({key, std::move(flag)});
105+
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
106+
data_model::ChangeSet<data_interfaces::ChangeSetData>{
107+
data_model::ChangeSetType::kPartial, std::move(changes),
108+
data_model::Selector{}}}});
109+
}
110+
111+
void FDv1AdapterSynchronizer::ConvertingDestination::Upsert(
112+
std::string const& key,
113+
data_model::SegmentDescriptor segment) {
114+
auto state = state_.lock();
115+
if (!state) {
116+
return;
117+
}
118+
data_interfaces::ChangeSetData changes;
119+
changes.push_back({key, std::move(segment)});
120+
state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{
121+
data_model::ChangeSet<data_interfaces::ChangeSetData>{
122+
data_model::ChangeSetType::kPartial, std::move(changes),
123+
data_model::Selector{}}}});
124+
}
125+
126+
std::string const& FDv1AdapterSynchronizer::ConvertingDestination::Identity()
127+
const {
128+
static std::string const identity = "FDv1 adapter destination";
129+
return identity;
130+
}
131+
132+
// ----- FDv1AdapterSynchronizer -----
133+
134+
FDv1AdapterSynchronizer::FDv1AdapterSynchronizer(
135+
std::unique_ptr<data_interfaces::IDataSynchronizer> fdv1_source)
136+
: state_(std::make_shared<State>()),
137+
destination_(std::make_unique<ConvertingDestination>(state_)),
138+
fdv1_source_(std::move(fdv1_source)) {}
139+
140+
FDv1AdapterSynchronizer::~FDv1AdapterSynchronizer() {
141+
Close();
142+
}
143+
144+
async::Future<FDv2SourceResult> FDv1AdapterSynchronizer::Next(
145+
data_model::Selector /*selector*/) {
146+
auto closed = close_promise_.GetFuture();
147+
if (closed.IsFinished()) {
148+
return async::MakeFuture(
149+
FDv2SourceResult{FDv2SourceResult::Shutdown{}});
150+
}
151+
if (state_->TryStart()) {
152+
fdv1_source_->StartAsync(destination_.get(),
153+
/*bootstrap_data=*/nullptr);
154+
}
155+
auto result_future = state_->GetNext();
156+
if (result_future.IsFinished()) {
157+
return result_future;
158+
}
159+
return async::WhenAny(closed, result_future)
160+
.Then(
161+
[state = state_, result_future](std::size_t const& idx) mutable
162+
-> async::Future<FDv2SourceResult> {
163+
if (idx == 0) {
164+
state->ResolvePendingAsShutdown();
165+
return async::MakeFuture(
166+
FDv2SourceResult{FDv2SourceResult::Shutdown{}});
167+
}
168+
return result_future;
169+
},
170+
async::kInlineExecutor);
171+
}
172+
173+
void FDv1AdapterSynchronizer::Close() {
174+
if (!close_promise_.Resolve(std::monostate{})) {
175+
return;
176+
}
177+
if (state_->MarkClosed()) {
178+
fdv1_source_->ShutdownAsync([] {});
179+
}
180+
}
181+
182+
std::string const& FDv1AdapterSynchronizer::Identity() const {
183+
static std::string const identity = "FDv1 fallback adapter";
184+
return identity;
185+
}
186+
187+
} // namespace launchdarkly::server_side::data_systems
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#pragma once
2+
3+
#include "../../data_interfaces/destination/idestination.hpp"
4+
#include "../../data_interfaces/source/idata_synchronizer.hpp"
5+
#include "../../data_interfaces/source/ifdv2_synchronizer.hpp"
6+
7+
#include <launchdarkly/async/promise.hpp>
8+
9+
#include <deque>
10+
#include <memory>
11+
#include <mutex>
12+
#include <optional>
13+
#include <string>
14+
#include <variant>
15+
16+
namespace launchdarkly::server_side::data_systems {
17+
18+
/**
19+
* Adapts an FDv1 IDataSynchronizer to the IFDv2Synchronizer interface.
20+
*
21+
* FDv1 Init/Upsert callbacks delivered through an internal IDestination are
22+
* translated into FDv2SourceResult::ChangeSet results, with empty selectors
23+
* and fdv1_fallback = false (the directive does not re-fire from FDv1 data).
24+
*
25+
* Threading: Next() and Close() may be called from any thread; only one
26+
* Next() may be outstanding at a time. The adapter blocks in its destructor
27+
* waiting for the FDv1 source's ShutdownAsync completion, so no callbacks
28+
* are in flight when the wrapped source is destroyed.
29+
*/
30+
class FDv1AdapterSynchronizer final
31+
: public data_interfaces::IFDv2Synchronizer {
32+
public:
33+
explicit FDv1AdapterSynchronizer(
34+
std::unique_ptr<data_interfaces::IDataSynchronizer> fdv1_source);
35+
36+
~FDv1AdapterSynchronizer() override;
37+
38+
async::Future<data_interfaces::FDv2SourceResult> Next(
39+
data_model::Selector selector) override;
40+
void Close() override;
41+
[[nodiscard]] std::string const& Identity() const override;
42+
43+
private:
44+
/**
45+
* Holds the lifecycle, result queue, and pending Next() promise; shared
46+
* with the FDv1 source's IDestination via the inner ConvertingDestination.
47+
* All methods are thread-safe.
48+
*/
49+
class State {
50+
public:
51+
// Returns true if this call transitioned Initial → Started; false if
52+
// already started or already closed. Used to gate the one-time
53+
// StartAsync call on the wrapped FDv1 source.
54+
bool TryStart();
55+
56+
// Marks the state closed and returns whether the source was started
57+
// before the transition (so the caller knows whether ShutdownAsync
58+
// needs to be called).
59+
bool MarkClosed();
60+
61+
async::Future<data_interfaces::FDv2SourceResult> GetNext();
62+
63+
// Resolves any pending Next() promise with Shutdown and clears it.
64+
// Called on the close path so the abandoned promise doesn't leave
65+
// potential continuations dangling.
66+
void ResolvePendingAsShutdown();
67+
68+
void Notify(data_interfaces::FDv2SourceResult result);
69+
70+
private:
71+
// Protected by mutex_.
72+
mutable std::mutex mutex_;
73+
bool started_ = false;
74+
bool closed_ = false;
75+
std::optional<async::Promise<data_interfaces::FDv2SourceResult>>
76+
pending_promise_;
77+
std::deque<data_interfaces::FDv2SourceResult> result_queue_;
78+
};
79+
80+
/**
81+
* Translates FDv1 IDestination callbacks into FDv2 results queued on
82+
* State. Thread-safe (delegates to State).
83+
*/
84+
class ConvertingDestination final : public data_interfaces::IDestination {
85+
public:
86+
explicit ConvertingDestination(std::weak_ptr<State> state);
87+
void Init(data_model::SDKDataSet data_set) override;
88+
void Upsert(std::string const& key,
89+
data_model::FlagDescriptor flag) override;
90+
void Upsert(std::string const& key,
91+
data_model::SegmentDescriptor segment) override;
92+
[[nodiscard]] std::string const& Identity() const override;
93+
94+
private:
95+
std::weak_ptr<State> state_;
96+
};
97+
98+
// const after construction.
99+
std::shared_ptr<State> const state_;
100+
std::unique_ptr<ConvertingDestination> const destination_;
101+
std::unique_ptr<data_interfaces::IDataSynchronizer> const fdv1_source_;
102+
103+
// Thread-safe primitive.
104+
async::Promise<std::monostate> close_promise_;
105+
};
106+
107+
} // namespace launchdarkly::server_side::data_systems

0 commit comments

Comments
 (0)