Skip to content

Commit ce27c1a

Browse files
beekldkinyoklion
andauthored
refactor: add SSE Builder::on_response hook (#537)
## Summary Adds an `on_response` hook to `sse::Builder`, invoked once per (re)connect after the HTTP response headers have been received, before any branching on status. Fires for every response (any status, including redirects and errors); does not fire if the connection failed before any response (TCP error, read timeout). Prep for upcoming FDv2 work that needs to inspect the `X-LD-FD-Fallback` response header. ## Design notes cURL backend accumulates header lines into a per-transfer `http::response_header<>` on `RequestContext`, because `HeaderCallback` is invoked once per CRLF-terminated line. The hook fires on the empty terminator and is posted to the client strand. ## Test plan - [x] `gtest_launchdarkly-sse-client` — green under both Foxy and cURL backends - [x] New `ClientTest.OnResponseHookFiresWithCustomHeader` <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Touches reconnect/header parsing on both Foxy and cURL SSE paths; cURL header assembly is new logic though covered by existing and new client tests. > > **Overview** > Adds **`sse::Builder::on_response`**, a callback that runs **once per connect/reconnect** right after response headers are parsed and **before** status-based handling (success, redirect, errors). It receives a `http::response_header<>` so callers can read headers such as **`X-LD-FD-Fallback`** without waiting for the SSE body. > > **Foxy** backend invokes the hook synchronously on the client executor when header parsing completes. **cURL** backend rebuilds the prior Content-Type-only header sniffing into full **line-by-line header assembly** (status line, OWS trimming, duplicate headers via `insert()`, redirect chains), emits the hook on the blank line after headers, and **posts** it to the same executor as other callbacks. Header accumulator state is reset on each new transfer. > > Includes **`ClientTest.OnResponseHookFiresWithCustomHeader`** for status and custom header visibility. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 733ec51. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com>
1 parent f1e7555 commit ce27c1a

5 files changed

Lines changed: 226 additions & 11 deletions

File tree

libs/server-sent-events/include/launchdarkly/sse/client.hpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class Builder {
3636
using ErrorCallback = std::function<void(Error)>;
3737
using ConnectionHook =
3838
std::function<void(http::request<http::string_body>*)>;
39+
using ResponseHook =
40+
std::function<void(http::response_header<> const& headers)>;
3941

4042
/**
4143
* Create a builder for the given URL. If the port is omitted, 443 is
@@ -196,6 +198,20 @@ class Builder {
196198
*/
197199
Builder& on_connect(ConnectionHook hook);
198200

201+
/**
202+
* Register a hook invoked once per (re)connect attempt after the response
203+
* headers have been received, before any branching on status. Fires for
204+
* every HTTP response (any status code, including redirects and errors).
205+
* Does NOT fire if the connection failed before any response arrived
206+
* (e.g. TCP error, read timeout).
207+
*
208+
* The hook is invoked on the client's executor; callers must not block.
209+
*
210+
* @param hook Callback invoked with the response header.
211+
* @return Reference to this builder.
212+
*/
213+
Builder& on_response(ResponseHook hook);
214+
199215
/**
200216
* Builds a Client. The shared pointer is necessary to extend the lifetime
201217
* of the Client to encompass each asynchronous operation that it performs.
@@ -219,6 +235,7 @@ class Builder {
219235
std::optional<std::string> custom_ca_file_;
220236
std::optional<std::string> proxy_url_;
221237
ConnectionHook connection_hook_;
238+
ResponseHook response_hook_;
222239
};
223240

224241
/**

libs/server-sent-events/src/client.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class FoxyClient : public Client,
7575
Builder::LogCallback logger,
7676
Builder::ErrorCallback errors,
7777
Builder::ConnectionHook connection_hook,
78+
Builder::ResponseHook response_hook,
7879
std::optional<net::ssl::context> maybe_ssl)
7980
: ssl_context_(std::move(maybe_ssl)),
8081
host_(std::move(host)),
@@ -87,6 +88,7 @@ class FoxyClient : public Client,
8788
logger_(std::move(logger)),
8889
errors_(std::move(errors)),
8990
connection_hook_(std::move(connection_hook)),
91+
response_hook_(std::move(response_hook)),
9092
body_parser_(std::nullopt),
9193
session_(std::nullopt),
9294
last_event_id_(std::nullopt),
@@ -256,6 +258,9 @@ class FoxyClient : public Client,
256258

257259
/* headers are finished, body is ready */
258260
auto response = body_parser_->get();
261+
if (response_hook_) {
262+
response_hook_(response.base());
263+
}
259264
auto status_class = beast::http::to_status_class(response.result());
260265

261266
if (status_class == beast::http::status_class::successful) {
@@ -517,6 +522,10 @@ class FoxyClient : public Client,
517522
// updating query parameters from external state).
518523
Builder::ConnectionHook connection_hook_;
519524

525+
// Optional hook invoked once per (re)connect after the response headers
526+
// have been received, before any branching on status.
527+
Builder::ResponseHook response_hook_;
528+
520529
// Customized parser (see parser.hpp) which repeatedly receives chunks of
521530
// data and parses them into SSE events. It cannot be reused across
522531
// connections, hence the optional so it can be destroyed easily.
@@ -650,6 +659,11 @@ Builder& Builder::on_connect(ConnectionHook hook) {
650659
return *this;
651660
}
652661

662+
Builder& Builder::on_response(ResponseHook hook) {
663+
response_hook_ = std::move(hook);
664+
return *this;
665+
}
666+
653667
std::shared_ptr<Client> Builder::build() {
654668
auto uri_components = boost::urls::parse_uri(url_);
655669
if (!uri_components) {
@@ -697,8 +711,8 @@ std::shared_ptr<Client> Builder::build() {
697711
return std::make_shared<CurlClient>(
698712
net::make_strand(executor_), request, host, service, connect_timeout_,
699713
read_timeout_, write_timeout_, initial_reconnect_delay_, receiver_,
700-
logging_cb_, error_cb_, connection_hook_, skip_verify_peer_,
701-
custom_ca_file_, use_https, proxy_url_);
714+
logging_cb_, error_cb_, connection_hook_, response_hook_,
715+
skip_verify_peer_, custom_ca_file_, use_https, proxy_url_);
702716
#else
703717
std::optional<ssl::context> ssl;
704718
if (uri_components->scheme_id() == boost::urls::scheme::https) {
@@ -720,7 +734,8 @@ std::shared_ptr<Client> Builder::build() {
720734
return std::make_shared<FoxyClient>(
721735
net::make_strand(executor_), request, host, service, connect_timeout_,
722736
read_timeout_, write_timeout_, initial_reconnect_delay_, receiver_,
723-
logging_cb_, error_cb_, connection_hook_, std::move(ssl));
737+
logging_cb_, error_cb_, connection_hook_, response_hook_,
738+
std::move(ssl));
724739
#endif
725740
}
726741

libs/server-sent-events/src/curl_client.cpp

Lines changed: 108 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
#include "curl_client.hpp"
44

55
#include <boost/asio/post.hpp>
6+
#include <boost/beast/core/string.hpp>
67
#include <boost/beast/http/status.hpp>
78
#include <boost/url/url.hpp>
89

10+
#include <charconv>
911
#include <sstream>
12+
#include <system_error>
1013

1114
namespace launchdarkly::sse {
1215
namespace beast = boost::beast;
@@ -41,6 +44,7 @@ CurlClient::CurlClient(
4144
Builder::LogCallback logger,
4245
Builder::ErrorCallback errors,
4346
Builder::ConnectionHook connection_hook,
47+
Builder::ResponseHook response_hook,
4448
bool skip_verify_peer,
4549
std::optional<std::string> custom_ca_file,
4650
bool use_https,
@@ -51,6 +55,7 @@ CurlClient::CurlClient(
5155
logger_(std::move(logger)),
5256
errors_(std::move(errors)),
5357
connection_hook_(std::move(connection_hook)),
58+
response_hook_(std::move(response_hook)),
5459
use_https_(use_https),
5560
backoff_timer_(executor),
5661
multi_manager_(CurlMultiManager::create(executor)),
@@ -149,6 +154,19 @@ void CurlClient::do_run() {
149154
self->log_message(message);
150155
}
151156
}
157+
},
158+
[weak_self, weak_ctx](http::response_header<> headers) {
159+
if (auto ctx = weak_ctx.lock()) {
160+
if (auto const self = weak_self.lock()) {
161+
if (self->response_hook_) {
162+
boost::asio::post(
163+
self->backoff_timer_.get_executor(),
164+
[self, headers = std::move(headers)]() {
165+
self->response_hook_(headers);
166+
});
167+
}
168+
}
169+
}
152170
}));
153171
// Start request using CURL multi (non-blocking)
154172
PerformRequestWithMulti(multi_manager_, ctx);
@@ -397,19 +415,98 @@ size_t CurlClient::WriteCallback(char const* data,
397415
// Callback for reading request headers
398416
//
399417
// https://curl.se/libcurl/c/CURLOPT_HEADERFUNCTION.html
418+
//
419+
// libcurl invokes this once per CRLF-terminated response line: the HTTP status
420+
// line, then each header, then an empty terminator. With
421+
// CURLOPT_FOLLOWLOCATION enabled the cycle repeats for each response in the
422+
// redirect chain.
400423
size_t CurlClient::HeaderCallback(char const* buffer,
401424
size_t size,
402425
size_t nitems,
403426
void* userdata) {
404427
size_t const total_size = size * nitems;
405428
auto* context = static_cast<RequestContext*>(userdata);
406429

407-
// Check for Content-Type header
408-
if (std::string const header(buffer, total_size);
409-
header.find("Content-Type:") == 0 ||
410-
header.find("content-type:") == 0) {
411-
if (header.find("text/event-stream") == std::string::npos) {
412-
context->log_message("warning: unexpected Content-Type: " + header);
430+
std::string_view line(buffer, total_size);
431+
432+
// Strip the line terminator. Allow bare LF or bare CR per RFC 9112 §2.2;
433+
// libcurl preserves the original wire bytes for HTTP/1.x (only HTTP/2
434+
// synthesizes CRLF), so a non-compliant origin can deliver bare LF here.
435+
while (!line.empty() && (line.back() == '\r' || line.back() == '\n')) {
436+
line.remove_suffix(1);
437+
}
438+
439+
if (line.empty()) {
440+
// Terminator. If we're between responses (e.g., the line ends a
441+
// chunked-transfer trailer block), there's nothing to emit.
442+
if (context->reading_headers) {
443+
context->response(std::move(context->current_response));
444+
context->current_response = http::response_header<>{};
445+
context->reading_headers = false;
446+
}
447+
return total_size;
448+
}
449+
450+
if (line.substr(0, 5) == "HTTP/") {
451+
// Status line: "HTTP/X.Y CODE REASON". Only legitimate before any
452+
// header has been seen for this response — an interior HTTP/ line
453+
// would otherwise wipe accumulated state.
454+
if (context->reading_headers) {
455+
return total_size;
456+
}
457+
// Beast default-constructs result_ to status::ok (200); reset to 0
458+
// so an unparseable status line surfaces as result_int() == 0.
459+
context->current_response = http::response_header<>{};
460+
context->current_response.result(0);
461+
auto const code_start = line.find(' ');
462+
if (code_start != std::string_view::npos) {
463+
unsigned code = 0;
464+
auto const result = std::from_chars(
465+
line.data() + code_start + 1, line.data() + line.size(), code);
466+
// Three-digit status per RFC 7231 §6; the tight bound avoids
467+
// result(unsigned) throwing across the libcurl C frame.
468+
if (result.ec == std::errc{} && code >= 100 && code <= 999) {
469+
context->current_response.result(code);
470+
}
471+
}
472+
context->reading_headers = true;
473+
return total_size;
474+
}
475+
476+
if (!context->reading_headers) {
477+
// Header line received outside an active response — chunked trailer
478+
// or protocol-level junk. Ignore.
479+
return total_size;
480+
}
481+
482+
auto const colon = line.find(':');
483+
if (colon != std::string_view::npos) {
484+
std::string_view name = line.substr(0, colon);
485+
// HTTP optional whitespace (OWS) per RFC 7230 §3.2.3 is SP or HTAB.
486+
std::string_view value = line.substr(colon + 1);
487+
while (!value.empty() &&
488+
(value.front() == ' ' || value.front() == '\t')) {
489+
value.remove_prefix(1);
490+
}
491+
while (!value.empty() &&
492+
(value.back() == ' ' || value.back() == '\t')) {
493+
value.remove_suffix(1);
494+
}
495+
// insert() preserves duplicate-name headers (Set-Cookie, Via, …);
496+
// set() would collapse them and diverge from the Foxy backend.
497+
try {
498+
context->current_response.insert(std::string(name),
499+
std::string(value));
500+
} catch (std::exception const&) {
501+
// insert() throws if the name isn't a valid RFC 7230 token.
502+
context->log_message("ignoring response header with invalid name");
503+
return total_size;
504+
}
505+
506+
if (beast::iequals(name, "Content-Type") &&
507+
value.find("text/event-stream") == std::string_view::npos) {
508+
context->log_message("warning: unexpected Content-Type: " +
509+
std::string(line));
413510
}
414511
}
415512

@@ -426,6 +523,11 @@ void CurlClient::PerformRequestWithMulti(
426523
// Initialize parser for new connection (last_event_id is tracked
427524
// separately)
428525
context->init_parser();
526+
// Reset header-accumulator state in case the previous transfer dropped
527+
// mid-headers, which would otherwise leave reading_headers=true and
528+
// cause the new response's HTTP/ status line to be skipped.
529+
context->current_response = http::response_header<>{};
530+
context->reading_headers = false;
429531

430532
std::shared_ptr<CURL> curl(curl_easy_init(), curl_easy_cleanup);
431533
if (!curl) {

libs/server-sent-events/src/curl_client.hpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,20 @@ class CurlClient final : public Client,
4949
std::function<void(Error)> on_error;
5050
std::function<void()> reset_backoff;
5151
std::function<void(std::string)> log_message;
52+
std::function<void(http::response_header<>)> on_response;
5253

5354
Callbacks(std::function<void(std::string)> do_backoff,
5455
std::function<void(Event)> on_receive,
5556
std::function<void(Error)> on_error,
5657
std::function<void()> reset_backoff,
57-
std::function<void(std::string)> log_message)
58+
std::function<void(std::string)> log_message,
59+
std::function<void(http::response_header<>)> on_response)
5860
: do_backoff(std::move(do_backoff)),
5961
on_receive(std::move(on_receive)),
6062
on_error(std::move(on_error)),
6163
reset_backoff(std::move(reset_backoff)),
62-
log_message(std::move(log_message)) {}
64+
log_message(std::move(log_message)),
65+
on_response(std::move(on_response)) {}
6366
};
6467

6568
/**
@@ -94,6 +97,16 @@ class CurlClient final : public Client,
9497
std::chrono::steady_clock::time_point last_progress_time;
9598
curl_off_t last_download_amount;
9699

100+
// Accumulator for the current response's headers; reset on each new
101+
// status line, emitted on the empty terminator line.
102+
http::response_header<> current_response;
103+
104+
// True while accumulating headers between an `HTTP/` status line and
105+
// the empty terminator. Gates `HeaderCallback` against chunked
106+
// trailers (which arrive without a fresh status line) and against
107+
// interior `HTTP/` lines that would otherwise wipe accumulated state.
108+
bool reading_headers = false;
109+
97110
// Mutated on the strand in do_run() before each transfer, and read by
98111
// libcurl via raw pointers (CURLOPT_URL, CURLOPT_POSTFIELDS) for the
99112
// duration of the transfer. Safe because the next do_run() only fires
@@ -158,6 +171,16 @@ class CurlClient final : public Client,
158171
}
159172
}
160173

174+
void response(http::response_header<> headers) {
175+
std::lock_guard lock(mutex_);
176+
if (shutting_down_) {
177+
return;
178+
}
179+
if (callbacks_) {
180+
callbacks_->on_response(std::move(headers));
181+
}
182+
}
183+
161184
void set_callbacks(Callbacks callbacks) {
162185
std::lock_guard lock(mutex_);
163186
callbacks_ = std::move(callbacks);
@@ -238,6 +261,7 @@ class CurlClient final : public Client,
238261
Builder::LogCallback logger,
239262
Builder::ErrorCallback errors,
240263
Builder::ConnectionHook connection_hook,
264+
Builder::ResponseHook response_hook,
241265
bool skip_verify_peer,
242266
std::optional<std::string> custom_ca_file,
243267
bool use_https,
@@ -294,6 +318,7 @@ class CurlClient final : public Client,
294318
Builder::LogCallback logger_;
295319
Builder::ErrorCallback errors_;
296320
Builder::ConnectionHook connection_hook_;
321+
Builder::ResponseHook response_hook_;
297322

298323
bool use_https_;
299324
boost::asio::steady_timer backoff_timer_;

0 commit comments

Comments
 (0)