From 79170387c97126cfcaa0772931738419f8ddb250 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 8 May 2026 14:32:30 -0700 Subject: [PATCH 1/3] fix: schedule reconnect after parse error during streaming In `ReconnectingRequest::poll_next`, parser errors propagated through `process_bytes(...)?` while in the `Connected` state previously returned the error without scheduling a reconnect. The next poll continued draining the broken response body before finally hitting end-of-body and emitting a spurious `UnexpectedEof` before the reconnect kicked in. Schedule a reconnect on parse errors when reconnect is enabled. The error is still returned to the caller; the only behavioral change is that the next poll cleanly transitions to reconnect rather than draining the broken body. Adds `parser_error_schedules_reconnect_immediately` test verifying that the next stream item after a parse error is the reconnect's `Connected`, not an extra error. Contributes to launchdarkly/rust-server-sdk#116. --- eventsource-client/src/client.rs | 79 +++++++++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 2db6b31..c5de518 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -495,7 +495,21 @@ impl Stream for ReconnectingRequest { }, StateProj::Connected(mut body) => match ready!(body.as_mut().poll_next(cx)) { Some(Ok(result)) => { - this.event_parser.process_bytes(result)?; + if let Err(e) = this.event_parser.process_bytes(result) { + // A parse error means the current response body is + // unusable; schedule a reconnect to abandon it. + if self.props.reconnect_opts.reconnect { + let duration = self + .as_mut() + .project() + .retry_strategy + .next_delay(Instant::now()); + self.as_mut().project().state.set(State::WaitingToReconnect( + delay(duration, "reconnecting"), + )); + } + return Poll::Ready(Some(Err(e))); + } continue; } Some(Err(e)) => { @@ -712,4 +726,67 @@ mod tests { initial_connection(&mock_server.url(), retry_initial, expected).await; } + + // When a parse error happens during streaming and reconnect is + // enabled, the next stream item should be a fresh `Connected` from + // the reconnect, not another error from continuing to drain the + // broken response body. + #[cfg(feature = "hyper")] + #[tokio::test(flavor = "multi_thread")] + async fn parser_error_schedules_reconnect_immediately() { + use crate::{Client, ClientBuilder, ReconnectOptionsBuilder, SSE}; + use futures::StreamExt; + use launchdarkly_sdk_transport::HyperTransport; + + let mut server = mockito::Server::new_async().await; + let _mock = server + .mock("GET", "/") + .with_status(200) + .with_body(b"\xff\xfe:bad\n\n".as_ref()) + .expect_at_least(2) + .create_async() + .await; + + let transport = HyperTransport::new().expect("failed to build transport"); + let client = ClientBuilder::for_url(&server.url()) + .unwrap() + .reconnect( + ReconnectOptionsBuilder::new(true) + .delay(Duration::from_millis(10)) + .delay_max(Duration::from_millis(10)) + .retry_initial(true) + .build(), + ) + .build_with_transport(transport); + + let mut stream = client.stream(); + + // Expected order: Connected, parse error, Connected (reconnect). + let mut items = Vec::new(); + let _ = tokio::time::timeout(Duration::from_millis(500), async { + while items.len() < 3 { + match stream.next().await { + Some(item) => items.push(item), + None => break, + } + } + }) + .await; + + assert!( + matches!(items.first(), Some(Ok(SSE::Connected(_)))), + "expected initial Connected, got {:?}", + items.first() + ); + assert!( + matches!(items.get(1), Some(Err(_))), + "expected parse error after first connection, got {:?}", + items.get(1) + ); + assert!( + matches!(items.get(2), Some(Ok(SSE::Connected(_)))), + "expected reconnect (Connected) immediately after parse error, got {:?}", + items.get(2) + ); + } } From fc9c18b492e2520062a3f01a174933961458a0dd Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 8 May 2026 14:50:10 -0700 Subject: [PATCH 2/3] test: tighten parser-error reconnect test - Match `Error::InvalidLine(_)` specifically so a regression that produces a different error class is caught. - Replace `tokio::time::timeout(...).ok()` with `.expect(...)` so a flaky timeout produces a clear failure mode rather than a silent "got None" further down. - Bump the timeout to 2 s to absorb slower CI runners. - Drop the decorative `expect_at_least(2)` -- mockito's default `assert_on_drop` is false, so it enforced nothing; the `items[2] == Connected` assertion already requires the reconnect. --- eventsource-client/src/client.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index c5de518..5b06fd9 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -734,7 +734,7 @@ mod tests { #[cfg(feature = "hyper")] #[tokio::test(flavor = "multi_thread")] async fn parser_error_schedules_reconnect_immediately() { - use crate::{Client, ClientBuilder, ReconnectOptionsBuilder, SSE}; + use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE}; use futures::StreamExt; use launchdarkly_sdk_transport::HyperTransport; @@ -743,7 +743,6 @@ mod tests { .mock("GET", "/") .with_status(200) .with_body(b"\xff\xfe:bad\n\n".as_ref()) - .expect_at_least(2) .create_async() .await; @@ -763,7 +762,7 @@ mod tests { // Expected order: Connected, parse error, Connected (reconnect). let mut items = Vec::new(); - let _ = tokio::time::timeout(Duration::from_millis(500), async { + tokio::time::timeout(Duration::from_secs(2), async { while items.len() < 3 { match stream.next().await { Some(item) => items.push(item), @@ -771,7 +770,8 @@ mod tests { } } }) - .await; + .await + .expect("timed out waiting for parse error and reconnect"); assert!( matches!(items.first(), Some(Ok(SSE::Connected(_)))), @@ -779,8 +779,8 @@ mod tests { items.first() ); assert!( - matches!(items.get(1), Some(Err(_))), - "expected parse error after first connection, got {:?}", + matches!(items.get(1), Some(Err(Error::InvalidLine(_)))), + "expected InvalidLine error after first connection, got {:?}", items.get(1) ); assert!( From ddea87dfcee08b7583d5533cb6f4ca78fb10c200 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 8 May 2026 15:35:19 -0700 Subject: [PATCH 3/3] docs: clarify Client::stream() error / termination contract The previous docstring told consumers "Do not use the stream after it returned an error!" That instruction contradicts how the stream actually works: most errors are non-terminal, and the reconnect machinery only runs if the consumer keeps polling. Stream termination is signalled by `Poll::Ready(None)`, not by an error. Reword the docstring to describe the real contract: errors are informational, keep polling, the stream ends when poll_next returns None. --- eventsource-client/src/client.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 5b06fd9..804e88d 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -199,10 +199,20 @@ impl Client for ClientImpl { /// Connect to the server and begin consuming the stream. Produces a /// [`Stream`] of [`Event`](crate::Event)s wrapped in [`Result`]. /// - /// Do not use the stream after it returned an error! + /// Errors yielded by the stream are not terminal: keep polling. + /// When [`ReconnectOptions::reconnect`] is enabled (the default), + /// the stream schedules a reconnect on retryable errors and the + /// next poll resumes from a fresh connection. /// - /// After the first successful connection, the stream will - /// reconnect for retryable errors. + /// The stream is exhausted only when [`Stream::poll_next`] returns + /// [`Poll::Ready(None)`]. That happens when the underlying state + /// machine reaches `StreamClosed` (e.g. a redirect-limit overrun, + /// a malformed `Location` header, or an error during initial + /// connection while [`ReconnectOptions::retry_initial`] is + /// disabled), or after any error when reconnect is disabled. + /// + /// [`Poll::Ready(None)`]: std::task::Poll::Ready + /// [`Stream::poll_next`]: futures::Stream::poll_next fn stream(&self) -> BoxStream> { Box::pin(ReconnectingRequest::new( Arc::clone(&self.transport),