Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 135 additions & 24 deletions src/sources/websocket/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ impl WebSocketSource {

pin_mut!(ws_sink, ws_source);

info!(
message = "WebSocket source started.",
uri = %self.config.common.uri,
ping_interval = ?self.config.common.ping_interval.map(|v| format!("{}s", v)),
ping_message = ?self.config.ping_message,
);

loop {
let result = tokio::select! {
_ = cx.shutdown.clone() => {
Expand Down Expand Up @@ -132,21 +139,28 @@ impl WebSocketSource {
emit!(WebSocketConnectionShutdown);
}
WebSocketSourceError::PongTimeout => {
error!("Disconnecting due to pong timeout.");
warn!(
message = "Pong timeout — connection appears dead, will reconnect.",
uri = %self.config.common.uri,
);
emit!(WebSocketReceiveError {
error: &TungsteniteError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"Pong timeout"
))
});
emit!(WebSocketConnectionShutdown);
return Err(error);
}
WebSocketSourceError::Tungstenite { source: ws_err } => {
if is_closed(&ws_err) {
emit!(WebSocketConnectionShutdown);
}
error!(message = "WebSocket error.", error = %ws_err);
error!(
message = "WebSocket error.",
error = %ws_err,
uri = %self.config.common.uri,
is_closed = is_closed(&ws_err),
);
}
// These errors should only happen during `connect` or `reconnect`,
// not in the main loop's result.
Expand All @@ -171,6 +185,8 @@ impl WebSocketSource {
{
Ok(()) => {
backoff.reset();
ping_manager.reset();
info!("Ping manager reset after reconnect.");
break;
}
Err(err) => {
Expand All @@ -197,13 +213,14 @@ impl WebSocketSource {
) -> Result<(), WebSocketSourceError> {
match msg {
Message::Pong(_) => {
info!("Received pong response.");
ping_manager.record_pong();
Ok(())
}
Message::Text(msg_txt) => {
if self.is_custom_pong(&msg_txt) {
ping_manager.record_pong();
debug!("Received custom pong response.");
info!("Received custom pong response.");
} else {
self.process_message(&msg_txt, WebSocketKind::Text, out)
.await;
Expand All @@ -215,7 +232,10 @@ impl WebSocketSource {
.await;
Ok(())
}
Message::Ping(_) => Ok(()),
Message::Ping(_) => {
info!("Received ping from server (tungstenite auto-pongs).");
Ok(())
}
Message::Close(frame) => self.handle_close_frame(frame),
Message::Frame(_) => {
warn!("Unsupported message type received: frame.");
Expand Down Expand Up @@ -290,14 +310,20 @@ impl WebSocketSource {
ws_sink: &mut WebSocketSink,
ws_source: &mut WebSocketStream,
) -> Result<(), WebSocketSourceError> {
info!("Reconnecting to WebSocket...");
info!(
message = "Reconnecting to WebSocket...",
uri = %self.config.common.uri,
);

let (new_sink, new_source) = self.connect(out).await?;

*ws_sink = new_sink;
*ws_source = new_source;

info!("Reconnected.");
info!(
message = "Reconnected successfully.",
uri = %self.config.common.uri,
);

Ok(())
}
Expand Down Expand Up @@ -438,13 +464,19 @@ impl PingManager {
self.waiting_for_pong = false;
}

fn reset(&mut self) {
self.waiting_for_pong = false;
}

async fn tick(&mut self, ws_sink: &mut WebSocketSink) -> Result<(), WebSocketSourceError> {
self.interval.tick().await;

if self.waiting_for_pong {
warn!("Pong not received before next ping interval — declaring connection dead.");
return Err(WebSocketSourceError::PongTimeout);
}

info!("Sending ping.");
ws_sink.send(self.message.clone()).await.map_err(|error| {
emit!(WebSocketSendError { error: &error });
WebSocketSourceError::Tungstenite { source: error }
Expand All @@ -458,7 +490,6 @@ impl PingManager {
#[cfg(feature = "websocket-integration-tests")]
#[cfg(test)]
mod integration_test {
use crate::test_util::components::run_and_assert_source_error;
use crate::{
common::websocket::WebSocketCommonConfig,
sources::websocket::config::PongMessage,
Expand Down Expand Up @@ -602,6 +633,62 @@ mod integration_test {
assert_eq!(event["message"], "second message".into());
}

/// Starts a server that sends a message on the first connection, goes silent
/// (simulating a dead peer / network partition), then accepts a second connection
/// and sends another message.
async fn start_silent_then_recover_server() -> String {
let addr = next_addr();
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
let server_addr = format!("ws://{}", listener.local_addr().unwrap());

tokio::spawn(async move {
// First connection: send a message then go completely silent
let (stream, _) = listener.accept().await.unwrap();
let mut websocket = accept_async(stream).await.expect("Failed to accept");
websocket
.send(Message::Text("before silence".to_string()))
.await
.unwrap();
// Hold the connection open but never respond to anything — simulating a dead peer.
// The client's ping will go unanswered, triggering a pong timeout.

// Second connection: after client detects dead connection and reconnects
let (stream2, _) = listener.accept().await.unwrap();
let mut websocket2 = accept_async(stream2).await.expect("Failed to accept");
websocket2
.send(Message::Text("after reconnect".to_string()))
.await
.unwrap();

// Keep first connection alive so it doesn't produce a TCP RST
tokio::time::sleep(Duration::from_secs(30)).await;
drop(websocket);
});

server_addr
}

/// When the server goes silent, the source should detect the dead connection via
/// pong timeout, reconnect, and continue receiving data — NOT exit permanently.
#[tokio::test(flavor = "multi_thread")]
async fn websocket_source_reconnects_after_pong_timeout() {
let server_addr = start_silent_then_recover_server().await;

let mut config = make_config(&server_addr);
config.common.ping_interval = NonZeroU64::new(1);

let events =
run_and_assert_source_compliance(config, Duration::from_secs(5), &SOURCE_TAGS).await;

assert_eq!(
events.len(),
2,
"Should reconnect after pong timeout and receive messages from both connections"
);
assert_eq!(events[0].as_log()["message"], "before silence".into());
assert_eq!(events[1].as_log()["message"], "after reconnect".into());
}

#[tokio::test(flavor = "multi_thread")]
async fn websocket_source_consume_binary_event() {
let server_addr = start_binary_push_server().await;
Expand Down Expand Up @@ -738,37 +825,61 @@ mod integration_test {
assert_eq!(events[1].as_log()["message"], "after close".into());
}

async fn start_unresponsive_server() -> String {
/// Starts a server that sends a message on the first connection then stops responding
/// to pings. When the client reconnects (after pong timeout), the server accepts the
/// new connection and sends another message.
async fn start_unresponsive_then_recover_server() -> String {
let addr = next_addr();
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
let server_addr = format!("ws://{}", listener.local_addr().unwrap());

tokio::spawn(async move {
if let Ok((stream, _)) = listener.accept().await {
// Accept the connection to establish the WebSocket.
let mut websocket = accept_async(stream).await.expect("Failed to accept");
// Simply wait forever without responding to pings.
while websocket.next().await.is_some() {
// Do nothing
}
}
// First connection: send a message then stop responding to pings.
// Spawn into a separate task so the listener can accept the second connection
// while the first is still alive (the client's reconnect() creates a new
// connection before dropping the old one).
let (stream, _) = listener.accept().await.unwrap();
let mut websocket = accept_async(stream).await.expect("Failed to accept");
websocket
.send(Message::Text("before pong timeout".to_string()))
.await
.unwrap();
tokio::spawn(async move {
// Consume messages silently (don't respond to pings)
while websocket.next().await.is_some() {}
});

// Second connection: after client reconnects due to pong timeout
let (stream, _) = listener.accept().await.unwrap();
let mut websocket = accept_async(stream).await.expect("Failed to accept");
websocket
.send(Message::Text("after pong timeout".to_string()))
.await
.unwrap();
});

server_addr
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn websocket_source_exits_on_pong_timeout() {
let server_addr = start_unresponsive_server().await;
#[tokio::test(flavor = "multi_thread")]
async fn websocket_source_reconnects_on_pong_timeout() {
let server_addr = start_unresponsive_then_recover_server().await;

let mut config = make_config(&server_addr);
config.common.ping_interval = NonZeroU64::new(3);
config.common.ping_timeout = NonZeroU64::new(1);
config.common.ping_interval = NonZeroU64::new(1);
config.ping_message = Some("ping".to_string());
config.pong_message = Some(PongMessage::Simple("pong".to_string()));

// The source should fail because the server never sends a pong.
run_and_assert_source_error(config, Duration::from_secs(5), &SOURCE_TAGS).await;
let events =
run_and_assert_source_compliance(config, Duration::from_secs(8), &SOURCE_TAGS).await;

assert_eq!(
events.len(),
2,
"Should reconnect after pong timeout and receive messages from both connections"
);
assert_eq!(events[0].as_log()["message"], "before pong timeout".into());
assert_eq!(events[1].as_log()["message"], "after pong timeout".into());
}

}