Skip to content

Commit 982bce3

Browse files
committed
add WhenAny and WhenAll helpers
1 parent 66a9181 commit 982bce3

2 files changed

Lines changed: 251 additions & 52 deletions

File tree

libs/internal/include/launchdarkly/async/promise.hpp

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <atomic>
34
#include <chrono>
45
#include <condition_variable>
56
#include <functional>
@@ -418,4 +419,95 @@ class Future {
418419
std::shared_ptr<PromiseInternal<T>> internal_;
419420
};
420421

422+
// WhenAll takes a variadic list of Futures (each with potentially different
423+
// value types) and returns a Future<std::monostate> that resolves once all
424+
// of the input futures have resolved. The result carries no value; callers
425+
// who need the individual results can read them from their original futures
426+
// after WhenAll resolves.
427+
//
428+
// If called with no arguments, the returned future is already resolved.
429+
//
430+
// Example:
431+
//
432+
// Future<int> f1 = ...;
433+
// Future<std::string> f2 = ...;
434+
// WhenAll(f1, f2).Then(
435+
// [&](std::monostate const&) {
436+
// // f1 and f2 are both finished here.
437+
// use(*f1.GetResult(), *f2.GetResult());
438+
// return std::monostate{};
439+
// },
440+
// executor);
441+
template <typename... Ts>
442+
Future<std::monostate> WhenAll(Future<Ts>... futures) {
443+
Promise<std::monostate> promise;
444+
Future<std::monostate> result = promise.GetFuture();
445+
446+
if constexpr (sizeof...(Ts) == 0) {
447+
promise.Resolve(std::monostate{});
448+
return result;
449+
}
450+
451+
auto shared_promise =
452+
std::make_shared<Promise<std::monostate>>(std::move(promise));
453+
auto count = std::make_shared<std::atomic<std::size_t>>(sizeof...(Ts));
454+
455+
auto attach = [&](auto future) {
456+
future.Then(
457+
[shared_promise, count](auto const&) -> std::monostate {
458+
if (count->fetch_sub(1) == 1) {
459+
shared_promise->Resolve(std::monostate{});
460+
}
461+
return std::monostate{};
462+
},
463+
[](Continuation<void()> f) { f(); });
464+
};
465+
466+
(attach(futures), ...);
467+
468+
return result;
469+
}
470+
471+
// WhenAny takes a variadic list of Futures (each with potentially different
472+
// value types) and returns a Future<std::size_t> that resolves with the
473+
// 0-based index of whichever input future resolves first. The caller can use
474+
// the index to identify the winning future and read its result directly.
475+
//
476+
// If called with no arguments, the returned future never resolves.
477+
//
478+
// Example:
479+
//
480+
// Future<int> f0 = ...;
481+
// Future<std::string> f1 = ...;
482+
// WhenAny(f0, f1).Then(
483+
// [&](std::size_t const& index) {
484+
// if (index == 0) use(*f0.GetResult());
485+
// else use(*f1.GetResult());
486+
// return std::monostate{};
487+
// },
488+
// executor);
489+
template <typename... Ts>
490+
Future<std::size_t> WhenAny(Future<Ts>... futures) {
491+
Promise<std::size_t> promise;
492+
Future<std::size_t> result = promise.GetFuture();
493+
494+
auto shared_promise =
495+
std::make_shared<Promise<std::size_t>>(std::move(promise));
496+
497+
std::size_t index = 0;
498+
auto attach = [&](auto future) {
499+
std::size_t i = index++;
500+
future.Then(
501+
[shared_promise, i](auto const&) -> std::monostate {
502+
shared_promise->Resolve(i);
503+
return std::monostate{};
504+
},
505+
[](Continuation<void()> f) { f(); });
506+
};
507+
508+
(attach(futures), ...);
509+
510+
return result;
511+
}
512+
421513
} // namespace launchdarkly::async

libs/internal/tests/promise_test.cpp

Lines changed: 159 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,45 @@
99

1010
using namespace launchdarkly::async;
1111

12+
TEST(Promise, SimplePromise) {
13+
Promise<int> promise;
14+
Future<int> future = promise.GetFuture();
15+
16+
Future<float> future2 = future.Then(
17+
[](int const& inner) { return static_cast<float>(inner * 2.0); },
18+
[](Continuation<void()> f) { f(); });
19+
20+
promise.Resolve(43);
21+
22+
auto& result = future2.WaitForResult(std::chrono::seconds(5));
23+
ASSERT_TRUE(result.has_value());
24+
EXPECT_FLOAT_EQ(*result, 86.0f);
25+
}
26+
27+
TEST(Promise, ASIOTest) {
28+
boost::asio::io_context ioc;
29+
auto work = boost::asio::make_work_guard(ioc);
30+
std::thread ioc_thread([&]() { ioc.run(); });
31+
32+
Promise<int> promise;
33+
Future<int> future = promise.GetFuture();
34+
35+
Future<float> future2 = future.Then(
36+
[](int const& inner) { return static_cast<float>(inner * 2.0); },
37+
[&ioc](Continuation<void()> f) {
38+
boost::asio::post(ioc, [f = std::move(f)]() mutable { f(); });
39+
});
40+
41+
promise.Resolve(42);
42+
43+
auto& result = future2.WaitForResult(std::chrono::seconds(5));
44+
ASSERT_TRUE(result.has_value());
45+
EXPECT_FLOAT_EQ(*result, 84.0f);
46+
47+
work.reset();
48+
ioc_thread.join();
49+
}
50+
1251
TEST(Promise, GetResultNotFinished) {
1352
Promise<int> promise;
1453
Future<int> future = promise.GetFuture();
@@ -53,9 +92,9 @@ TEST(Promise, ContinueByReturningFuture) {
5392
Promise<int> promise2;
5493
Future<int> future2 = promise2.GetFuture();
5594

56-
Future<int> chained = promise1.GetFuture().Then(
57-
[future2](int const&) { return future2; },
58-
[](Continuation<void()> f) { f(); });
95+
Future<int> chained =
96+
promise1.GetFuture().Then([future2](int const&) { return future2; },
97+
[](Continuation<void()> f) { f(); });
5998

6099
promise1.Resolve(0);
61100
promise2.Resolve(42);
@@ -71,9 +110,8 @@ TEST(Promise, ResolvedBeforeContinuation) {
71110

72111
promise.Resolve(21);
73112

74-
Future<int> future2 = future.Then(
75-
[](int const& val) { return val * 2; },
76-
[](Continuation<void()> f) { f(); });
113+
Future<int> future2 = future.Then([](int const& val) { return val * 2; },
114+
[](Continuation<void()> f) { f(); });
77115

78116
auto& result = future2.WaitForResult(std::chrono::seconds(5));
79117
ASSERT_TRUE(result.has_value());
@@ -84,9 +122,8 @@ TEST(Promise, ResolvedAfterContinuation) {
84122
Promise<int> promise;
85123
Future<int> future = promise.GetFuture();
86124

87-
Future<int> future2 = future.Then(
88-
[](int const& val) { return val * 2; },
89-
[](Continuation<void()> f) { f(); });
125+
Future<int> future2 = future.Then([](int const& val) { return val * 2; },
126+
[](Continuation<void()> f) { f(); });
90127

91128
promise.Resolve(21);
92129

@@ -140,11 +177,10 @@ TEST(Promise, MoveOnlyCallback) {
140177
Future<int> future = promise.GetFuture();
141178

142179
auto captured = std::make_unique<int>(2);
143-
Future<int> future2 = future.Then(
144-
[captured = std::move(captured)](int const& val) {
145-
return val * *captured;
146-
},
147-
[](Continuation<void()> f) { f(); });
180+
Future<int> future2 =
181+
future.Then([captured = std::move(captured)](
182+
int const& val) { return val * *captured; },
183+
[](Continuation<void()> f) { f(); });
148184

149185
promise.Resolve(21);
150186

@@ -161,8 +197,15 @@ TEST(Promise, ResultMovedWhenPossible) {
161197
int value;
162198
int* copy_count;
163199
explicit Counted(int v, int* c) : value(v), copy_count(c) {}
164-
Counted(Counted const& o) : value(o.value), copy_count(o.copy_count) { ++(*copy_count); }
165-
Counted& operator=(Counted const& o) { value = o.value; copy_count = o.copy_count; ++(*copy_count); return *this; }
200+
Counted(Counted const& o) : value(o.value), copy_count(o.copy_count) {
201+
++(*copy_count);
202+
}
203+
Counted& operator=(Counted const& o) {
204+
value = o.value;
205+
copy_count = o.copy_count;
206+
++(*copy_count);
207+
return *this;
208+
}
166209
Counted(Counted&&) noexcept = default;
167210
Counted& operator=(Counted&&) noexcept = default;
168211
};
@@ -186,8 +229,15 @@ TEST(Promise, CallbackMovedWhenPossible) {
186229
int value;
187230
int* copy_count;
188231
explicit Counted(int v, int* c) : value(v), copy_count(c) {}
189-
Counted(Counted const& o) : value(o.value), copy_count(o.copy_count) { ++(*copy_count); }
190-
Counted& operator=(Counted const& o) { value = o.value; copy_count = o.copy_count; ++(*copy_count); return *this; }
232+
Counted(Counted const& o) : value(o.value), copy_count(o.copy_count) {
233+
++(*copy_count);
234+
}
235+
Counted& operator=(Counted const& o) {
236+
value = o.value;
237+
copy_count = o.copy_count;
238+
++(*copy_count);
239+
return *this;
240+
}
191241
Counted(Counted&&) noexcept = default;
192242
Counted& operator=(Counted&&) noexcept = default;
193243
};
@@ -198,19 +248,18 @@ TEST(Promise, CallbackMovedWhenPossible) {
198248
Counted multiplier{2, &copies};
199249
copies = 0; // reset after construction
200250

201-
future.Then(
202-
[multiplier = std::move(multiplier)](int const& val) mutable {
203-
return val * multiplier.value;
204-
},
205-
[](Continuation<void()> f) { f(); });
251+
future.Then([multiplier = std::move(multiplier)](
252+
int const& val) mutable { return val * multiplier.value; },
253+
[](Continuation<void()> f) { f(); });
206254

207255
EXPECT_EQ(copies, 0);
208256

209257
promise.Resolve(21);
210258
}
211259

212-
// Demonstrates using std::monostate as a void-like result type for fire-and-forget
213-
// async operations where the completion matters but no value is produced.
260+
// Demonstrates using std::monostate as a void-like result type for
261+
// fire-and-forget async operations where the completion matters but no value is
262+
// produced.
214263
TEST(Promise, MonostateVoidLike) {
215264
Promise<std::monostate> promise;
216265
Future<std::monostate> future = promise.GetFuture();
@@ -258,42 +307,100 @@ TEST(Promise, ExpectedFailure) {
258307
EXPECT_EQ(result->error(), "timed out");
259308
}
260309

310+
TEST(WhenAll, NoFutures) {
311+
Future<std::monostate> result = WhenAll();
312+
EXPECT_TRUE(result.IsFinished());
313+
}
261314

262-
TEST(Promise, SimplePromise) {
263-
Promise<int> promise;
264-
Future<int> future = promise.GetFuture();
315+
// Verifies WhenAll resolves when all futures are already resolved.
316+
TEST(WhenAll, AllAlreadyResolved) {
317+
Promise<int> p1;
318+
Promise<std::string> p2;
265319

266-
Future<float> future2 = future.Then(
267-
[](int const& inner) { return static_cast<float>(inner * 2.0); },
268-
[](Continuation<void()> f) { f(); });
320+
p1.Resolve(1);
321+
p2.Resolve("hello");
269322

270-
promise.Resolve(43);
323+
Future<std::monostate> result = WhenAll(p1.GetFuture(), p2.GetFuture());
324+
auto& r = result.WaitForResult(std::chrono::seconds(5));
325+
ASSERT_TRUE(r.has_value());
326+
}
271327

272-
auto& result = future2.WaitForResult(std::chrono::seconds(5));
273-
ASSERT_TRUE(result.has_value());
274-
EXPECT_FLOAT_EQ(*result, 86.0f);
328+
// Verifies WhenAll resolves only after all futures resolve, using futures of
329+
// mixed value types. The original futures still hold their results afterward.
330+
TEST(WhenAll, ResolvesAfterAll) {
331+
Promise<int> p1;
332+
Promise<std::string> p2;
333+
334+
Future<int> f1 = p1.GetFuture();
335+
Future<std::string> f2 = p2.GetFuture();
336+
337+
Future<std::monostate> result = WhenAll(f1, f2);
338+
339+
EXPECT_FALSE(result.IsFinished());
340+
p1.Resolve(42);
341+
EXPECT_FALSE(result.IsFinished());
342+
p2.Resolve("done");
343+
344+
auto& r = result.WaitForResult(std::chrono::seconds(5));
345+
ASSERT_TRUE(r.has_value());
346+
EXPECT_EQ(*f1.GetResult(), 42);
347+
EXPECT_EQ(*f2.GetResult(), "done");
275348
}
276349

277-
TEST(Promise, ASIOTest) {
278-
boost::asio::io_context ioc;
279-
auto work = boost::asio::make_work_guard(ioc);
280-
std::thread ioc_thread([&]() { ioc.run(); });
350+
// Verifies that WhenAny resolves with the index of the first future to resolve.
351+
TEST(WhenAny, FirstResolved) {
352+
Promise<int> p0;
353+
Promise<int> p1;
354+
Promise<int> p2;
281355

282-
Promise<int> promise;
283-
Future<int> future = promise.GetFuture();
356+
Future<std::size_t> result =
357+
WhenAny(p0.GetFuture(), p1.GetFuture(), p2.GetFuture());
284358

285-
Future<float> future2 = future.Then(
286-
[](int const& inner) { return static_cast<float>(inner * 2.0); },
287-
[&ioc](Continuation<void()> f) {
288-
boost::asio::post(ioc, [f = std::move(f)]() mutable { f(); });
289-
});
359+
EXPECT_FALSE(result.IsFinished());
360+
p1.Resolve(42);
290361

291-
promise.Resolve(42);
362+
auto& r = result.WaitForResult(std::chrono::seconds(5));
363+
ASSERT_TRUE(r.has_value());
364+
EXPECT_EQ(*r, 1u);
365+
}
292366

293-
auto& result = future2.WaitForResult(std::chrono::seconds(5));
294-
ASSERT_TRUE(result.has_value());
295-
EXPECT_FLOAT_EQ(*result, 84.0f);
367+
// Verifies that WhenAny works with futures of mixed value types, and that
368+
// resolving a later future after the winner does not change the result.
369+
TEST(WhenAny, MixedTypesFirstWins) {
370+
Promise<int> p0;
371+
Promise<std::string> p1;
372+
373+
Future<int> f0 = p0.GetFuture();
374+
Future<std::string> f1 = p1.GetFuture();
375+
376+
Future<std::variant<int, std::string>> result = WhenAny(f0, f1).Then(
377+
[f0, f1](size_t const& index) -> std::variant<int, std::string> {
378+
if (index == 0) {
379+
return f0.GetResult().value();
380+
} else {
381+
return f1.GetResult().value();
382+
}
383+
},
384+
[](Continuation<void()> f) { f(); });
296385

297-
work.reset();
298-
ioc_thread.join();
386+
p1.Resolve("hello");
387+
p0.Resolve(99);
388+
389+
auto& r = result.WaitForResult(std::chrono::seconds(5));
390+
ASSERT_TRUE(r.has_value());
391+
EXPECT_EQ(std::get<std::string>(*result.GetResult()), "hello");
392+
}
393+
394+
// Verifies that WhenAny resolves immediately if a future is already resolved.
395+
TEST(WhenAny, AlreadyResolved) {
396+
Promise<int> p0;
397+
Promise<int> p1;
398+
399+
p0.Resolve(42);
400+
401+
Future<std::size_t> result = WhenAny(p0.GetFuture(), p1.GetFuture());
402+
403+
auto& r = result.WaitForResult(std::chrono::seconds(5));
404+
ASSERT_TRUE(r.has_value());
405+
EXPECT_EQ(*r, 0u);
299406
}

0 commit comments

Comments
 (0)