Skip to content

Commit 5909036

Browse files
committed
feat: add server FDv2 data system orchestrator
1 parent a9c3349 commit 5909036

10 files changed

Lines changed: 1359 additions & 8 deletions

File tree

libs/server-sdk/src/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ target_sources(${LIBNAME}
3939
data_components/dependency_tracker/dependency_tracker.cpp
4040
data_components/expiration_tracker/expiration_tracker.hpp
4141
data_components/expiration_tracker/expiration_tracker.cpp
42+
data_interfaces/destination/itransactional_destination.hpp
4243
data_components/memory_store/memory_store.hpp
4344
data_components/memory_store/memory_store.cpp
4445
data_components/serialization_adapters/json_deserializer.hpp
@@ -59,6 +60,8 @@ target_sources(${LIBNAME}
5960
data_systems/fdv2/polling_synchronizer.cpp
6061
data_systems/fdv2/streaming_synchronizer.hpp
6162
data_systems/fdv2/streaming_synchronizer.cpp
63+
data_systems/fdv2/fdv2_data_system.hpp
64+
data_systems/fdv2/fdv2_data_system.cpp
6265
data_systems/background_sync/sources/streaming/streaming_data_source.hpp
6366
data_systems/background_sync/sources/streaming/streaming_data_source.cpp
6467
data_systems/background_sync/sources/streaming/event_handler.hpp

libs/server-sdk/src/data_components/change_notifier/change_notifier.cpp

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,24 @@
11
#include "change_notifier.hpp"
22

33
#include <launchdarkly/signals/boost_signal_connection.hpp>
4+
45
#include <mutex>
6+
#include <utility>
7+
#include <variant>
58

69
namespace launchdarkly::server_side::data_components {
710

11+
namespace {
12+
13+
template <class... Ts>
14+
struct overloaded : Ts... {
15+
using Ts::operator()...;
16+
};
17+
template <class... Ts>
18+
overloaded(Ts...) -> overloaded<Ts...>;
19+
20+
} // namespace
21+
822
std::unique_ptr<IConnection> ChangeNotifier::OnFlagChange(
923
ChangeHandler handler) {
1024
std::lock_guard lock{signal_mutex_};
@@ -55,6 +69,79 @@ void ChangeNotifier::Upsert(std::string const& key,
5569
std::move(segment));
5670
}
5771

72+
void ChangeNotifier::Apply(
73+
data_model::ChangeSet<data_interfaces::ChangeSetData> change_set) {
74+
if (change_set.type == data_model::ChangeSetType::kNone) {
75+
return;
76+
}
77+
78+
// Compute changed dependencies before passing the changeset to the sink.
79+
std::optional<DependencySet> change_notifications;
80+
if (HasListeners()) {
81+
DependencySet affected;
82+
if (change_set.type == data_model::ChangeSetType::kFull) {
83+
// Group items by kind so the existing per-kind diff helper can
84+
// compare the new state to the existing store contents.
85+
Collection<data_model::Flag> new_flags;
86+
Collection<data_model::Segment> new_segments;
87+
for (auto const& change : change_set.data) {
88+
std::visit(
89+
overloaded{
90+
[&](data_model::ItemDescriptor<data_model::Flag> const&
91+
f) { new_flags.emplace(change.key, f); },
92+
[&](data_model::ItemDescriptor<
93+
data_model::Segment> const& s) {
94+
new_segments.emplace(change.key, s);
95+
},
96+
},
97+
change.object);
98+
}
99+
CalculateChanges(DataKind::kFlag, source_.AllFlags(), new_flags,
100+
affected);
101+
CalculateChanges(DataKind::kSegment, source_.AllSegments(),
102+
new_segments, affected);
103+
} else {
104+
// Partial: every item in the changeset is treated as a change;
105+
// no version comparison.
106+
for (auto const& change : change_set.data) {
107+
std::visit(overloaded{
108+
[&](data_model::ItemDescriptor<
109+
data_model::Flag> const&) {
110+
dependency_tracker_.CalculateChanges(
111+
DataKind::kFlag, change.key, affected);
112+
},
113+
[&](data_model::ItemDescriptor<
114+
data_model::Segment> const&) {
115+
dependency_tracker_.CalculateChanges(
116+
DataKind::kSegment, change.key,
117+
affected);
118+
},
119+
},
120+
change.object);
121+
}
122+
}
123+
change_notifications = std::move(affected);
124+
}
125+
126+
// Update the dependency tracker.
127+
if (change_set.type == data_model::ChangeSetType::kFull) {
128+
dependency_tracker_.Clear();
129+
}
130+
for (auto const& change : change_set.data) {
131+
std::visit(
132+
[&](auto const& descriptor) {
133+
dependency_tracker_.UpdateDependencies(change.key, descriptor);
134+
},
135+
change.object);
136+
}
137+
138+
sink_.Apply(std::move(change_set));
139+
140+
if (change_notifications) {
141+
NotifyChanges(std::move(*change_notifications));
142+
}
143+
}
144+
58145
bool ChangeNotifier::HasListeners() const {
59146
std::lock_guard lock{signal_mutex_};
60147
return !signals_.empty();
@@ -69,7 +156,7 @@ void ChangeNotifier::NotifyChanges(DependencySet changes) {
69156
}
70157
}
71158

72-
ChangeNotifier::ChangeNotifier(IDestination& sink,
159+
ChangeNotifier::ChangeNotifier(data_interfaces::ITransactionalDestination& sink,
73160
data_interfaces::IStore const& source)
74161
: sink_(sink), source_(source) {}
75162

libs/server-sdk/src/data_components/change_notifier/change_notifier.hpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include "../../data_interfaces/destination/idestination.hpp"
3+
#include "../../data_interfaces/destination/itransactional_destination.hpp"
44
#include "../../data_interfaces/store/istore.hpp"
55
#include "../dependency_tracker/dependency_tracker.hpp"
66

@@ -13,7 +13,7 @@
1313

1414
namespace launchdarkly::server_side::data_components {
1515

16-
class ChangeNotifier final : public data_interfaces::IDestination,
16+
class ChangeNotifier final : public data_interfaces::ITransactionalDestination,
1717
public IChangeNotifier {
1818
public:
1919
template <typename Storage>
@@ -26,7 +26,8 @@ class ChangeNotifier final : public data_interfaces::IDestination,
2626
using SharedCollection =
2727
std::unordered_map<std::string, SharedItem<Storage>>;
2828

29-
ChangeNotifier(IDestination& sink, data_interfaces::IStore const& source);
29+
ChangeNotifier(data_interfaces::ITransactionalDestination& sink,
30+
data_interfaces::IStore const& source);
3031

3132
std::unique_ptr<IConnection> OnFlagChange(ChangeHandler handler) override;
3233

@@ -35,6 +36,8 @@ class ChangeNotifier final : public data_interfaces::IDestination,
3536
data_model::FlagDescriptor flag) override;
3637
void Upsert(std::string const& key,
3738
data_model::SegmentDescriptor segment) override;
39+
void Apply(data_model::ChangeSet<data_interfaces::ChangeSetData> change_set)
40+
override;
3841

3942
[[nodiscard]] std::string const& Identity() const override;
4043

@@ -105,7 +108,7 @@ class ChangeNotifier final : public data_interfaces::IDestination,
105108

106109
void NotifyChanges(DependencySet changes);
107110

108-
IDestination& sink_;
111+
data_interfaces::ITransactionalDestination& sink_;
109112
data_interfaces::IStore const& source_;
110113

111114
boost::signals2::signal<void(std::shared_ptr<ChangeSet>)> signals_;

libs/server-sdk/src/data_components/memory_store/memory_store.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include "../../data_interfaces/destination/idestination.hpp"
3+
#include "../../data_interfaces/destination/itransactional_destination.hpp"
44
#include "../../data_interfaces/item_change.hpp"
55
#include "../../data_interfaces/store/istore.hpp"
66

@@ -14,7 +14,7 @@
1414
namespace launchdarkly::server_side::data_components {
1515

1616
class MemoryStore final : public data_interfaces::IStore,
17-
public data_interfaces::IDestination {
17+
public data_interfaces::ITransactionalDestination {
1818
public:
1919
[[nodiscard]] std::shared_ptr<data_model::FlagDescriptor> GetFlag(
2020
std::string const& key) const override;
@@ -47,7 +47,8 @@ class MemoryStore final : public data_interfaces::IStore,
4747

4848
bool RemoveSegment(std::string const& key);
4949

50-
void Apply(data_model::ChangeSet<data_interfaces::ChangeSetData> changeSet);
50+
void Apply(data_model::ChangeSet<data_interfaces::ChangeSetData> changeSet)
51+
override;
5152

5253
MemoryStore() = default;
5354
~MemoryStore() override = default;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#pragma once
2+
3+
#include "../item_change.hpp"
4+
#include "idestination.hpp"
5+
6+
#include <launchdarkly/data_model/change_set.hpp>
7+
8+
namespace launchdarkly::server_side::data_interfaces {
9+
10+
/**
11+
* ITransactionalDestination extends IDestination with the ability to apply
12+
* an FDv2 changeset atomically.
13+
*
14+
* A changeset is a batch of flag and segment upserts and deletions that must
15+
* be applied as a unit; readers must never observe a partially applied
16+
* changeset.
17+
*/
18+
class ITransactionalDestination : public IDestination {
19+
public:
20+
/**
21+
* Applies an FDv2 changeset to the destination atomically.
22+
*/
23+
virtual void Apply(data_model::ChangeSet<ChangeSetData> change_set) = 0;
24+
25+
ITransactionalDestination(ITransactionalDestination const&) = delete;
26+
ITransactionalDestination(ITransactionalDestination&&) = delete;
27+
ITransactionalDestination& operator=(ITransactionalDestination const&) =
28+
delete;
29+
ITransactionalDestination& operator=(ITransactionalDestination&&) = delete;
30+
~ITransactionalDestination() override = default;
31+
32+
protected:
33+
ITransactionalDestination() = default;
34+
};
35+
36+
} // namespace launchdarkly::server_side::data_interfaces
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#pragma once
2+
3+
#include "ifdv2_initializer.hpp"
4+
5+
#include <memory>
6+
7+
namespace launchdarkly::server_side::data_interfaces {
8+
9+
/**
10+
* Builds new IFDv2Initializer instances on demand. Each call to Build()
11+
* produces a fresh initializer that has not yet been started.
12+
*/
13+
class IFDv2InitializerFactory {
14+
public:
15+
virtual std::unique_ptr<IFDv2Initializer> Build() = 0;
16+
17+
virtual ~IFDv2InitializerFactory() = default;
18+
IFDv2InitializerFactory(IFDv2InitializerFactory const&) = delete;
19+
IFDv2InitializerFactory(IFDv2InitializerFactory&&) = delete;
20+
IFDv2InitializerFactory& operator=(IFDv2InitializerFactory const&) = delete;
21+
IFDv2InitializerFactory& operator=(IFDv2InitializerFactory&&) = delete;
22+
23+
protected:
24+
IFDv2InitializerFactory() = default;
25+
};
26+
27+
} // namespace launchdarkly::server_side::data_interfaces
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#pragma once
2+
3+
#include "ifdv2_synchronizer.hpp"
4+
5+
#include <memory>
6+
7+
namespace launchdarkly::server_side::data_interfaces {
8+
9+
/**
10+
* Builds new IFDv2Synchronizer instances on demand. Each call to Build()
11+
* produces a fresh synchronizer that has not yet been started.
12+
*/
13+
class IFDv2SynchronizerFactory {
14+
public:
15+
virtual std::unique_ptr<IFDv2Synchronizer> Build() = 0;
16+
17+
virtual ~IFDv2SynchronizerFactory() = default;
18+
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory const&) = delete;
19+
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory&&) = delete;
20+
IFDv2SynchronizerFactory& operator=(IFDv2SynchronizerFactory const&) =
21+
delete;
22+
IFDv2SynchronizerFactory& operator=(IFDv2SynchronizerFactory&&) = delete;
23+
24+
protected:
25+
IFDv2SynchronizerFactory() = default;
26+
};
27+
28+
} // namespace launchdarkly::server_side::data_interfaces

0 commit comments

Comments
 (0)