From d33131a4e850003d8a44f30c0d0b2ad24519e6a0 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 8 May 2026 14:53:28 -0700 Subject: [PATCH] fix: respect reconnect=false and clamp server-supplied retry: 0 Three pre-existing reconnect-control gaps surfaced during the multi- agent review of #134 (SDK-2347): 1. `BackoffRetry::change_base_delay` accepted any duration, including `Duration::ZERO`. A server emitting `retry: 0` collapsed the backoff to zero across every reconnect path, producing a tight reconnect loop. Clamp the input to a 1 ms floor. 2. The EOF arm of `ReconnectingRequest::poll_next` unconditionally scheduled a reconnect, even when `reconnect_opts.reconnect` was false. Honor the flag and transition to `StreamClosed` when reconnect is disabled, matching every other error path. 3. The parse-error arm only transitioned state when reconnect was enabled. With reconnect disabled, the parser stayed poisoned and the next poll drained to EOF, where (1) above papered over the bug. Transition to `StreamClosed` so the documented "do not use the stream after error" contract holds. Tests: - `test_change_base_delay_clamps_to_minimum` pins the retry floor. - `parser_error_closes_stream_when_reconnect_disabled` asserts the stream returns `None` after a parse error when reconnect is off. - `eof_closes_stream_when_reconnect_disabled` asserts the stream returns `None` after end-of-body when reconnect is off. --- eventsource-client/src/client.rs | 160 ++++++++++++++++++++++++++++--- eventsource-client/src/retry.rs | 25 ++++- 2 files changed, 173 insertions(+), 12 deletions(-) diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 804e88d..4eb485e 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -506,8 +506,10 @@ impl Stream for ReconnectingRequest { StateProj::Connected(mut body) => match ready!(body.as_mut().poll_next(cx)) { Some(Ok(result)) => { if let Err(e) = this.event_parser.process_bytes(result) { - // A parse error means the current response body is - // unusable; schedule a reconnect to abandon it. + // The current response body is unusable. Either + // schedule a reconnect or close the stream so a + // caller that disabled reconnect doesn't keep + // reading from a poisoned parser. if self.props.reconnect_opts.reconnect { let duration = self .as_mut() @@ -517,6 +519,8 @@ impl Stream for ReconnectingRequest { self.as_mut().project().state.set(State::WaitingToReconnect( delay(duration, "reconnecting"), )); + } else { + self.as_mut().project().state.set(State::StreamClosed); } return Poll::Ready(Some(Err(e))); } @@ -547,15 +551,19 @@ impl Stream for ReconnectingRequest { return Poll::Ready(Some(Err(Error::Transport(e)))); } None => { - let duration = self - .as_mut() - .project() - .retry_strategy - .next_delay(Instant::now()); - self.as_mut() - .project() - .state - .set(State::WaitingToReconnect(delay(duration, "retrying"))); + if self.props.reconnect_opts.reconnect { + let duration = self + .as_mut() + .project() + .retry_strategy + .next_delay(Instant::now()); + self.as_mut() + .project() + .state + .set(State::WaitingToReconnect(delay(duration, "retrying"))); + } else { + self.as_mut().project().state.set(State::StreamClosed); + } if self.event_parser.was_processing() { return Poll::Ready(Some(Err(Error::UnexpectedEof))); @@ -799,4 +807,134 @@ mod tests { items.get(2) ); } + + // With reconnect disabled, a parse error should close the stream so the + // next poll returns `None` rather than continuing to read from a poisoned + // parser or reconnecting via the EOF arm. + #[cfg(feature = "hyper")] + #[tokio::test(flavor = "multi_thread")] + async fn parser_error_closes_stream_when_reconnect_disabled() { + use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE}; + use futures::StreamExt; + use launchdarkly_sdk_transport::HyperTransport; + + let mut server = mockito::Server::new_async().await; + let _mock = server + .mock("GET", "/") + .with_status(200) + .with_body(b"\xff\xfe:bad\n\n".as_ref()) + .create_async() + .await; + + let transport = HyperTransport::new().expect("failed to build transport"); + let client = ClientBuilder::for_url(&server.url()) + .unwrap() + .reconnect( + ReconnectOptionsBuilder::new(false) + .retry_initial(true) + .build(), + ) + .build_with_transport(transport); + + let mut stream = client.stream(); + + let mut items = Vec::new(); + tokio::time::timeout(Duration::from_secs(2), async { + while items.len() < 3 { + match stream.next().await { + Some(item) => items.push(item), + None => { + items.push(Ok(SSE::Comment("__stream_ended__".into()))); + break; + } + } + } + }) + .await + .expect("timed out waiting for stream to close"); + + assert!( + matches!(items.first(), Some(Ok(SSE::Connected(_)))), + "expected initial Connected, got {:?}", + items.first() + ); + assert!( + matches!(items.get(1), Some(Err(Error::InvalidLine(_)))), + "expected InvalidLine error, got {:?}", + items.get(1) + ); + assert!( + matches!( + items.get(2), + Some(Ok(SSE::Comment(s))) if s == "__stream_ended__" + ), + "expected stream to end (None) after parse error with reconnect disabled, got {:?}", + items.get(2) + ); + } + + // With reconnect disabled, a clean end-of-body should close the stream + // rather than scheduling a reconnect. + #[cfg(feature = "hyper")] + #[tokio::test(flavor = "multi_thread")] + async fn eof_closes_stream_when_reconnect_disabled() { + use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE}; + use futures::StreamExt; + use launchdarkly_sdk_transport::HyperTransport; + + let mut server = mockito::Server::new_async().await; + let _mock = server + .mock("GET", "/") + .with_status(200) + .with_body("event: hello\ndata: world\n\n") + .create_async() + .await; + + let transport = HyperTransport::new().expect("failed to build transport"); + let client = ClientBuilder::for_url(&server.url()) + .unwrap() + .reconnect( + ReconnectOptionsBuilder::new(false) + .retry_initial(true) + .build(), + ) + .build_with_transport(transport); + + let mut stream = client.stream(); + + let mut items: Vec>> = Vec::new(); + tokio::time::timeout(Duration::from_secs(2), async { + for _ in 0..4 { + let item = stream.next().await; + let is_terminal = item.is_none(); + items.push(item); + if is_terminal { + break; + } + } + }) + .await + .expect("timed out waiting for stream to close"); + + assert!( + matches!(items.first(), Some(Some(Ok(SSE::Connected(_))))), + "expected initial Connected, got {:?}", + items.first() + ); + assert!( + matches!(items.get(1), Some(Some(Ok(SSE::Event(e)))) if e.event_type == "hello"), + "expected hello event, got {:?}", + items.get(1) + ); + assert!( + matches!(items.get(2), Some(Some(Err(Error::Eof)))), + "expected Eof error after body ends, got {:?}", + items.get(2) + ); + assert!( + matches!(items.get(3), Some(None)), + "expected stream to end (None) after EOF with reconnect disabled, got {:?}", + items.get(3) + ); + } } diff --git a/eventsource-client/src/retry.rs b/eventsource-client/src/retry.rs index 404699e..7f42ad1 100644 --- a/eventsource-client/src/retry.rs +++ b/eventsource-client/src/retry.rs @@ -15,6 +15,11 @@ pub(crate) trait RetryStrategy { const DEFAULT_RESET_RETRY_INTERVAL: Duration = Duration::from_secs(60); +/// Floor applied to a server-supplied SSE `retry:` value. A server that +/// sends `retry: 0` would otherwise collapse the backoff to zero and +/// reconnect would become a tight loop. +const MINIMUM_BASE_DELAY: Duration = Duration::from_millis(1); + pub(crate) struct BackoffRetry { base_delay: Duration, max_delay: Duration, @@ -66,7 +71,7 @@ impl RetryStrategy for BackoffRetry { } fn change_base_delay(&mut self, base_delay: Duration) { - self.base_delay = base_delay; + self.base_delay = std::cmp::max(base_delay, MINIMUM_BASE_DELAY); self.next_delay = self.base_delay; } @@ -111,6 +116,24 @@ mod tests { assert_eq!(retry.next_delay(start.add(Duration::from_secs(2))), base); } + #[test] + fn test_change_base_delay_clamps_to_minimum() { + // A server that sends `retry: 0` would otherwise produce a zero-delay + // reconnect loop. The clamp ensures the next delay stays at least + // MINIMUM_BASE_DELAY. + let mut retry = + BackoffRetry::new(Duration::from_secs(10), Duration::from_secs(30), 1, false); + let start = Instant::now(); + + retry.change_base_delay(Duration::ZERO); + assert!(retry.next_delay(start) >= super::MINIMUM_BASE_DELAY); + + // Sub-minimum values are also clamped, not silently accepted. + let below_minimum = super::MINIMUM_BASE_DELAY / 2; + retry.change_base_delay(below_minimum); + assert!(retry.next_delay(start) >= super::MINIMUM_BASE_DELAY); + } + #[test] fn test_with_backoff() { let base = Duration::from_secs(10);