diff --git a/eventsource-client/src/client.rs b/eventsource-client/src/client.rs index 804e88d..4eb485e 100644 --- a/eventsource-client/src/client.rs +++ b/eventsource-client/src/client.rs @@ -506,8 +506,10 @@ impl Stream for ReconnectingRequest { StateProj::Connected(mut body) => match ready!(body.as_mut().poll_next(cx)) { Some(Ok(result)) => { if let Err(e) = this.event_parser.process_bytes(result) { - // A parse error means the current response body is - // unusable; schedule a reconnect to abandon it. + // The current response body is unusable. Either + // schedule a reconnect or close the stream so a + // caller that disabled reconnect doesn't keep + // reading from a poisoned parser. if self.props.reconnect_opts.reconnect { let duration = self .as_mut() @@ -517,6 +519,8 @@ impl Stream for ReconnectingRequest { self.as_mut().project().state.set(State::WaitingToReconnect( delay(duration, "reconnecting"), )); + } else { + self.as_mut().project().state.set(State::StreamClosed); } return Poll::Ready(Some(Err(e))); } @@ -547,15 +551,19 @@ impl Stream for ReconnectingRequest { return Poll::Ready(Some(Err(Error::Transport(e)))); } None => { - let duration = self - .as_mut() - .project() - .retry_strategy - .next_delay(Instant::now()); - self.as_mut() - .project() - .state - .set(State::WaitingToReconnect(delay(duration, "retrying"))); + if self.props.reconnect_opts.reconnect { + let duration = self + .as_mut() + .project() + .retry_strategy + .next_delay(Instant::now()); + self.as_mut() + .project() + .state + .set(State::WaitingToReconnect(delay(duration, "retrying"))); + } else { + self.as_mut().project().state.set(State::StreamClosed); + } if self.event_parser.was_processing() { return Poll::Ready(Some(Err(Error::UnexpectedEof))); @@ -799,4 +807,134 @@ mod tests { items.get(2) ); } + + // With reconnect disabled, a parse error should close the stream so the + // next poll returns `None` rather than continuing to read from a poisoned + // parser or reconnecting via the EOF arm. + #[cfg(feature = "hyper")] + #[tokio::test(flavor = "multi_thread")] + async fn parser_error_closes_stream_when_reconnect_disabled() { + use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE}; + use futures::StreamExt; + use launchdarkly_sdk_transport::HyperTransport; + + let mut server = mockito::Server::new_async().await; + let _mock = server + .mock("GET", "/") + .with_status(200) + .with_body(b"\xff\xfe:bad\n\n".as_ref()) + .create_async() + .await; + + let transport = HyperTransport::new().expect("failed to build transport"); + let client = ClientBuilder::for_url(&server.url()) + .unwrap() + .reconnect( + ReconnectOptionsBuilder::new(false) + .retry_initial(true) + .build(), + ) + .build_with_transport(transport); + + let mut stream = client.stream(); + + let mut items = Vec::new(); + tokio::time::timeout(Duration::from_secs(2), async { + while items.len() < 3 { + match stream.next().await { + Some(item) => items.push(item), + None => { + items.push(Ok(SSE::Comment("__stream_ended__".into()))); + break; + } + } + } + }) + .await + .expect("timed out waiting for stream to close"); + + assert!( + matches!(items.first(), Some(Ok(SSE::Connected(_)))), + "expected initial Connected, got {:?}", + items.first() + ); + assert!( + matches!(items.get(1), Some(Err(Error::InvalidLine(_)))), + "expected InvalidLine error, got {:?}", + items.get(1) + ); + assert!( + matches!( + items.get(2), + Some(Ok(SSE::Comment(s))) if s == "__stream_ended__" + ), + "expected stream to end (None) after parse error with reconnect disabled, got {:?}", + items.get(2) + ); + } + + // With reconnect disabled, a clean end-of-body should close the stream + // rather than scheduling a reconnect. + #[cfg(feature = "hyper")] + #[tokio::test(flavor = "multi_thread")] + async fn eof_closes_stream_when_reconnect_disabled() { + use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE}; + use futures::StreamExt; + use launchdarkly_sdk_transport::HyperTransport; + + let mut server = mockito::Server::new_async().await; + let _mock = server + .mock("GET", "/") + .with_status(200) + .with_body("event: hello\ndata: world\n\n") + .create_async() + .await; + + let transport = HyperTransport::new().expect("failed to build transport"); + let client = ClientBuilder::for_url(&server.url()) + .unwrap() + .reconnect( + ReconnectOptionsBuilder::new(false) + .retry_initial(true) + .build(), + ) + .build_with_transport(transport); + + let mut stream = client.stream(); + + let mut items: Vec>> = Vec::new(); + tokio::time::timeout(Duration::from_secs(2), async { + for _ in 0..4 { + let item = stream.next().await; + let is_terminal = item.is_none(); + items.push(item); + if is_terminal { + break; + } + } + }) + .await + .expect("timed out waiting for stream to close"); + + assert!( + matches!(items.first(), Some(Some(Ok(SSE::Connected(_))))), + "expected initial Connected, got {:?}", + items.first() + ); + assert!( + matches!(items.get(1), Some(Some(Ok(SSE::Event(e)))) if e.event_type == "hello"), + "expected hello event, got {:?}", + items.get(1) + ); + assert!( + matches!(items.get(2), Some(Some(Err(Error::Eof)))), + "expected Eof error after body ends, got {:?}", + items.get(2) + ); + assert!( + matches!(items.get(3), Some(None)), + "expected stream to end (None) after EOF with reconnect disabled, got {:?}", + items.get(3) + ); + } } diff --git a/eventsource-client/src/retry.rs b/eventsource-client/src/retry.rs index 404699e..7f42ad1 100644 --- a/eventsource-client/src/retry.rs +++ b/eventsource-client/src/retry.rs @@ -15,6 +15,11 @@ pub(crate) trait RetryStrategy { const DEFAULT_RESET_RETRY_INTERVAL: Duration = Duration::from_secs(60); +/// Floor applied to a server-supplied SSE `retry:` value. A server that +/// sends `retry: 0` would otherwise collapse the backoff to zero and +/// reconnect would become a tight loop. +const MINIMUM_BASE_DELAY: Duration = Duration::from_millis(1); + pub(crate) struct BackoffRetry { base_delay: Duration, max_delay: Duration, @@ -66,7 +71,7 @@ impl RetryStrategy for BackoffRetry { } fn change_base_delay(&mut self, base_delay: Duration) { - self.base_delay = base_delay; + self.base_delay = std::cmp::max(base_delay, MINIMUM_BASE_DELAY); self.next_delay = self.base_delay; } @@ -111,6 +116,24 @@ mod tests { assert_eq!(retry.next_delay(start.add(Duration::from_secs(2))), base); } + #[test] + fn test_change_base_delay_clamps_to_minimum() { + // A server that sends `retry: 0` would otherwise produce a zero-delay + // reconnect loop. The clamp ensures the next delay stays at least + // MINIMUM_BASE_DELAY. + let mut retry = + BackoffRetry::new(Duration::from_secs(10), Duration::from_secs(30), 1, false); + let start = Instant::now(); + + retry.change_base_delay(Duration::ZERO); + assert!(retry.next_delay(start) >= super::MINIMUM_BASE_DELAY); + + // Sub-minimum values are also clamped, not silently accepted. + let below_minimum = super::MINIMUM_BASE_DELAY / 2; + retry.change_base_delay(below_minimum); + assert!(retry.next_delay(start) >= super::MINIMUM_BASE_DELAY); + } + #[test] fn test_with_backoff() { let base = Duration::from_secs(10);