Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 91 additions & 4 deletions eventsource-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,20 @@ impl<T: HttpTransport> Client for ClientImpl<T> {
/// Connect to the server and begin consuming the stream. Produces a
/// [`Stream`] of [`Event`](crate::Event)s wrapped in [`Result`].
///
/// Do not use the stream after it returned an error!
/// Errors yielded by the stream are not terminal: keep polling.
/// When [`ReconnectOptions::reconnect`] is enabled (the default),
/// the stream schedules a reconnect on retryable errors and the
/// next poll resumes from a fresh connection.
///
/// After the first successful connection, the stream will
/// reconnect for retryable errors.
/// The stream is exhausted only when [`Stream::poll_next`] returns
/// [`Poll::Ready(None)`]. That happens when the underlying state
/// machine reaches `StreamClosed` (e.g. a redirect-limit overrun,
/// a malformed `Location` header, or an error during initial
/// connection while [`ReconnectOptions::retry_initial`] is
/// disabled), or after any error when reconnect is disabled.
///
/// [`Poll::Ready(None)`]: std::task::Poll::Ready
/// [`Stream::poll_next`]: futures::Stream::poll_next
fn stream(&self) -> BoxStream<Result<SSE>> {
Box::pin(ReconnectingRequest::new(
Arc::clone(&self.transport),
Expand Down Expand Up @@ -495,7 +505,21 @@ impl<T: HttpTransport> Stream for ReconnectingRequest<T> {
},
StateProj::Connected(mut body) => match ready!(body.as_mut().poll_next(cx)) {
Some(Ok(result)) => {
this.event_parser.process_bytes(result)?;
if let Err(e) = this.event_parser.process_bytes(result) {
// A parse error means the current response body is
// unusable; schedule a reconnect to abandon it.
if self.props.reconnect_opts.reconnect {
let duration = self
.as_mut()
.project()
.retry_strategy
.next_delay(Instant::now());
self.as_mut().project().state.set(State::WaitingToReconnect(
delay(duration, "reconnecting"),
));
}
return Poll::Ready(Some(Err(e)));
}
continue;
}
Some(Err(e)) => {
Expand Down Expand Up @@ -712,4 +736,67 @@ mod tests {

initial_connection(&mock_server.url(), retry_initial, expected).await;
}

// When a parse error happens during streaming and reconnect is
// enabled, the next stream item should be a fresh `Connected` from
// the reconnect, not another error from continuing to drain the
// broken response body.
#[cfg(feature = "hyper")]
#[tokio::test(flavor = "multi_thread")]
async fn parser_error_schedules_reconnect_immediately() {
use crate::{Client, ClientBuilder, Error, ReconnectOptionsBuilder, SSE};
use futures::StreamExt;
use launchdarkly_sdk_transport::HyperTransport;

let mut server = mockito::Server::new_async().await;
let _mock = server
.mock("GET", "/")
.with_status(200)
.with_body(b"\xff\xfe:bad\n\n".as_ref())
.create_async()
.await;

let transport = HyperTransport::new().expect("failed to build transport");
let client = ClientBuilder::for_url(&server.url())
.unwrap()
.reconnect(
ReconnectOptionsBuilder::new(true)
.delay(Duration::from_millis(10))
.delay_max(Duration::from_millis(10))
.retry_initial(true)
.build(),
)
.build_with_transport(transport);

let mut stream = client.stream();

// Expected order: Connected, parse error, Connected (reconnect).
let mut items = Vec::new();
tokio::time::timeout(Duration::from_secs(2), async {
while items.len() < 3 {
match stream.next().await {
Some(item) => items.push(item),
None => break,
}
}
})
.await
.expect("timed out waiting for parse error and reconnect");

assert!(
matches!(items.first(), Some(Ok(SSE::Connected(_)))),
"expected initial Connected, got {:?}",
items.first()
);
assert!(
matches!(items.get(1), Some(Err(Error::InvalidLine(_)))),
"expected InvalidLine error after first connection, got {:?}",
items.get(1)
);
assert!(
matches!(items.get(2), Some(Ok(SSE::Connected(_)))),
"expected reconnect (Connected) immediately after parse error, got {:?}",
items.get(2)
);
}
}
Loading