Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 149 additions & 11 deletions eventsource-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,10 @@ impl<T: HttpTransport> Stream for ReconnectingRequest<T> {
StateProj::Connected(mut body) => match ready!(body.as_mut().poll_next(cx)) {
Some(Ok(result)) => {
if let Err(e) = this.event_parser.process_bytes(result) {
// A parse error means the current response body is
// unusable; schedule a reconnect to abandon it.
// The current response body is unusable. Either
// schedule a reconnect or close the stream so a
// caller that disabled reconnect doesn't keep
// reading from a poisoned parser.
if self.props.reconnect_opts.reconnect {
let duration = self
.as_mut()
Expand All @@ -517,6 +519,8 @@ impl<T: HttpTransport> Stream for ReconnectingRequest<T> {
self.as_mut().project().state.set(State::WaitingToReconnect(
delay(duration, "reconnecting"),
));
} else {
self.as_mut().project().state.set(State::StreamClosed);
}
return Poll::Ready(Some(Err(e)));
}
Expand Down Expand Up @@ -547,15 +551,19 @@ impl<T: HttpTransport> Stream for ReconnectingRequest<T> {
return Poll::Ready(Some(Err(Error::Transport(e))));
}
None => {
let duration = self
.as_mut()
.project()
.retry_strategy
.next_delay(Instant::now());
self.as_mut()
.project()
.state
.set(State::WaitingToReconnect(delay(duration, "retrying")));
if self.props.reconnect_opts.reconnect {
let duration = self
.as_mut()
.project()
.retry_strategy
.next_delay(Instant::now());
self.as_mut()
.project()
.state
.set(State::WaitingToReconnect(delay(duration, "retrying")));
} else {
self.as_mut().project().state.set(State::StreamClosed);
}

if self.event_parser.was_processing() {
return Poll::Ready(Some(Err(Error::UnexpectedEof)));
Expand Down Expand Up @@ -799,4 +807,134 @@ mod tests {
items.get(2)
);
}

// With reconnect disabled, a parse error should close the stream so the
// next poll returns `None` rather than continuing to read from a poisoned
// parser or reconnecting via the EOF arm.
#[cfg(feature = "hyper")]
#[tokio::test(flavor = "multi_thread")]
async fn parser_error_closes_stream_when_reconnect_disabled() {
use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
use futures::StreamExt;
use launchdarkly_sdk_transport::HyperTransport;

let mut server = mockito::Server::new_async().await;
let _mock = server
.mock("GET", "/")
.with_status(200)
.with_body(b"\xff\xfe:bad\n\n".as_ref())
.create_async()
.await;

let transport = HyperTransport::new().expect("failed to build transport");
let client = ClientBuilder::for_url(&server.url())
.unwrap()
.reconnect(
ReconnectOptionsBuilder::new(false)
.retry_initial(true)
.build(),
)
.build_with_transport(transport);

let mut stream = client.stream();

let mut items = Vec::new();
tokio::time::timeout(Duration::from_secs(2), async {
while items.len() < 3 {
match stream.next().await {
Some(item) => items.push(item),
None => {
items.push(Ok(SSE::Comment("__stream_ended__".into())));
break;
}
}
}
})
.await
.expect("timed out waiting for stream to close");

assert!(
matches!(items.first(), Some(Ok(SSE::Connected(_)))),
"expected initial Connected, got {:?}",
items.first()
);
assert!(
matches!(items.get(1), Some(Err(Error::InvalidLine(_)))),
"expected InvalidLine error, got {:?}",
items.get(1)
);
assert!(
matches!(
items.get(2),
Some(Ok(SSE::Comment(s))) if s == "__stream_ended__"
),
"expected stream to end (None) after parse error with reconnect disabled, got {:?}",
items.get(2)
);
}

// With reconnect disabled, a clean end-of-body should close the stream
// rather than scheduling a reconnect.
#[cfg(feature = "hyper")]
#[tokio::test(flavor = "multi_thread")]
async fn eof_closes_stream_when_reconnect_disabled() {
use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
use futures::StreamExt;
use launchdarkly_sdk_transport::HyperTransport;

let mut server = mockito::Server::new_async().await;
let _mock = server
.mock("GET", "/")
.with_status(200)
.with_body("event: hello\ndata: world\n\n")
.create_async()
.await;

let transport = HyperTransport::new().expect("failed to build transport");
let client = ClientBuilder::for_url(&server.url())
.unwrap()
.reconnect(
ReconnectOptionsBuilder::new(false)
.retry_initial(true)
.build(),
)
.build_with_transport(transport);

let mut stream = client.stream();

let mut items: Vec<Option<crate::Result<SSE>>> = Vec::new();
tokio::time::timeout(Duration::from_secs(2), async {
for _ in 0..4 {
let item = stream.next().await;
let is_terminal = item.is_none();
items.push(item);
if is_terminal {
break;
}
}
})
.await
.expect("timed out waiting for stream to close");

assert!(
matches!(items.first(), Some(Some(Ok(SSE::Connected(_))))),
"expected initial Connected, got {:?}",
items.first()
);
assert!(
matches!(items.get(1), Some(Some(Ok(SSE::Event(e)))) if e.event_type == "hello"),
"expected hello event, got {:?}",
items.get(1)
);
assert!(
matches!(items.get(2), Some(Some(Err(Error::Eof)))),
"expected Eof error after body ends, got {:?}",
items.get(2)
);
assert!(
matches!(items.get(3), Some(None)),
"expected stream to end (None) after EOF with reconnect disabled, got {:?}",
items.get(3)
);
}
}
25 changes: 24 additions & 1 deletion eventsource-client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ pub(crate) trait RetryStrategy {

const DEFAULT_RESET_RETRY_INTERVAL: Duration = Duration::from_secs(60);

/// Floor applied to a server-supplied SSE `retry:` value. A server that
/// sends `retry: 0` would otherwise collapse the backoff to zero and
/// reconnect would become a tight loop.
const MINIMUM_BASE_DELAY: Duration = Duration::from_millis(1);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want this larger.


pub(crate) struct BackoffRetry {
base_delay: Duration,
max_delay: Duration,
Expand Down Expand Up @@ -66,7 +71,7 @@ impl RetryStrategy for BackoffRetry {
}

fn change_base_delay(&mut self, base_delay: Duration) {
self.base_delay = base_delay;
self.base_delay = std::cmp::max(base_delay, MINIMUM_BASE_DELAY);
self.next_delay = self.base_delay;
}

Expand Down Expand Up @@ -111,6 +116,24 @@ mod tests {
assert_eq!(retry.next_delay(start.add(Duration::from_secs(2))), base);
}

#[test]
fn test_change_base_delay_clamps_to_minimum() {
// A server that sends `retry: 0` would otherwise produce a zero-delay
// reconnect loop. The clamp ensures the next delay stays at least
// MINIMUM_BASE_DELAY.
let mut retry =
BackoffRetry::new(Duration::from_secs(10), Duration::from_secs(30), 1, false);
let start = Instant::now();

retry.change_base_delay(Duration::ZERO);
assert!(retry.next_delay(start) >= super::MINIMUM_BASE_DELAY);

// Sub-minimum values are also clamped, not silently accepted.
let below_minimum = super::MINIMUM_BASE_DELAY / 2;
retry.change_base_delay(below_minimum);
assert!(retry.next_delay(start) >= super::MINIMUM_BASE_DELAY);
}

#[test]
fn test_with_backoff() {
let base = Duration::from_secs(10);
Expand Down
Loading