diff --git a/src/sources/websocket/source.rs b/src/sources/websocket/source.rs index 91a7bece4..ec1470770 100644 --- a/src/sources/websocket/source.rs +++ b/src/sources/websocket/source.rs @@ -93,6 +93,13 @@ impl WebSocketSource { pin_mut!(ws_sink, ws_source); + info!( + message = "WebSocket source started.", + uri = %self.config.common.uri, + ping_interval = ?self.config.common.ping_interval.map(|v| format!("{}s", v)), + ping_message = ?self.config.ping_message, + ); + loop { let result = tokio::select! { _ = cx.shutdown.clone() => { @@ -132,7 +139,10 @@ impl WebSocketSource { emit!(WebSocketConnectionShutdown); } WebSocketSourceError::PongTimeout => { - error!("Disconnecting due to pong timeout."); + warn!( + message = "Pong timeout — connection appears dead, will reconnect.", + uri = %self.config.common.uri, + ); emit!(WebSocketReceiveError { error: &TungsteniteError::Io(std::io::Error::new( std::io::ErrorKind::TimedOut, @@ -140,13 +150,17 @@ impl WebSocketSource { )) }); emit!(WebSocketConnectionShutdown); - return Err(error); } WebSocketSourceError::Tungstenite { source: ws_err } => { if is_closed(&ws_err) { emit!(WebSocketConnectionShutdown); } - error!(message = "WebSocket error.", error = %ws_err); + error!( + message = "WebSocket error.", + error = %ws_err, + uri = %self.config.common.uri, + is_closed = is_closed(&ws_err), + ); } // These errors should only happen during `connect` or `reconnect`, // not in the main loop's result. @@ -171,6 +185,8 @@ impl WebSocketSource { { Ok(()) => { backoff.reset(); + ping_manager.reset(); + info!("Ping manager reset after reconnect."); break; } Err(err) => { @@ -197,13 +213,14 @@ impl WebSocketSource { ) -> Result<(), WebSocketSourceError> { match msg { Message::Pong(_) => { + info!("Received pong response."); ping_manager.record_pong(); Ok(()) } Message::Text(msg_txt) => { if self.is_custom_pong(&msg_txt) { ping_manager.record_pong(); - debug!("Received custom pong response."); + info!("Received custom pong response."); } else { self.process_message(&msg_txt, WebSocketKind::Text, out) .await; @@ -215,7 +232,10 @@ impl WebSocketSource { .await; Ok(()) } - Message::Ping(_) => Ok(()), + Message::Ping(_) => { + info!("Received ping from server (tungstenite auto-pongs)."); + Ok(()) + } Message::Close(frame) => self.handle_close_frame(frame), Message::Frame(_) => { warn!("Unsupported message type received: frame."); @@ -290,14 +310,20 @@ impl WebSocketSource { ws_sink: &mut WebSocketSink, ws_source: &mut WebSocketStream, ) -> Result<(), WebSocketSourceError> { - info!("Reconnecting to WebSocket..."); + info!( + message = "Reconnecting to WebSocket...", + uri = %self.config.common.uri, + ); let (new_sink, new_source) = self.connect(out).await?; *ws_sink = new_sink; *ws_source = new_source; - info!("Reconnected."); + info!( + message = "Reconnected successfully.", + uri = %self.config.common.uri, + ); Ok(()) } @@ -438,13 +464,19 @@ impl PingManager { self.waiting_for_pong = false; } + fn reset(&mut self) { + self.waiting_for_pong = false; + } + async fn tick(&mut self, ws_sink: &mut WebSocketSink) -> Result<(), WebSocketSourceError> { self.interval.tick().await; if self.waiting_for_pong { + warn!("Pong not received before next ping interval — declaring connection dead."); return Err(WebSocketSourceError::PongTimeout); } + info!("Sending ping."); ws_sink.send(self.message.clone()).await.map_err(|error| { emit!(WebSocketSendError { error: &error }); WebSocketSourceError::Tungstenite { source: error } @@ -458,7 +490,6 @@ impl PingManager { #[cfg(feature = "websocket-integration-tests")] #[cfg(test)] mod integration_test { - use crate::test_util::components::run_and_assert_source_error; use crate::{ common::websocket::WebSocketCommonConfig, sources::websocket::config::PongMessage, @@ -602,6 +633,62 @@ mod integration_test { assert_eq!(event["message"], "second message".into()); } + /// Starts a server that sends a message on the first connection, goes silent + /// (simulating a dead peer / network partition), then accepts a second connection + /// and sends another message. + async fn start_silent_then_recover_server() -> String { + let addr = next_addr(); + let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); + let server_addr = format!("ws://{}", listener.local_addr().unwrap()); + + tokio::spawn(async move { + // First connection: send a message then go completely silent + let (stream, _) = listener.accept().await.unwrap(); + let mut websocket = accept_async(stream).await.expect("Failed to accept"); + websocket + .send(Message::Text("before silence".to_string())) + .await + .unwrap(); + // Hold the connection open but never respond to anything — simulating a dead peer. + // The client's ping will go unanswered, triggering a pong timeout. + + // Second connection: after client detects dead connection and reconnects + let (stream2, _) = listener.accept().await.unwrap(); + let mut websocket2 = accept_async(stream2).await.expect("Failed to accept"); + websocket2 + .send(Message::Text("after reconnect".to_string())) + .await + .unwrap(); + + // Keep first connection alive so it doesn't produce a TCP RST + tokio::time::sleep(Duration::from_secs(30)).await; + drop(websocket); + }); + + server_addr + } + + /// When the server goes silent, the source should detect the dead connection via + /// pong timeout, reconnect, and continue receiving data — NOT exit permanently. + #[tokio::test(flavor = "multi_thread")] + async fn websocket_source_reconnects_after_pong_timeout() { + let server_addr = start_silent_then_recover_server().await; + + let mut config = make_config(&server_addr); + config.common.ping_interval = NonZeroU64::new(1); + + let events = + run_and_assert_source_compliance(config, Duration::from_secs(5), &SOURCE_TAGS).await; + + assert_eq!( + events.len(), + 2, + "Should reconnect after pong timeout and receive messages from both connections" + ); + assert_eq!(events[0].as_log()["message"], "before silence".into()); + assert_eq!(events[1].as_log()["message"], "after reconnect".into()); + } + #[tokio::test(flavor = "multi_thread")] async fn websocket_source_consume_binary_event() { let server_addr = start_binary_push_server().await; @@ -738,37 +825,61 @@ mod integration_test { assert_eq!(events[1].as_log()["message"], "after close".into()); } - async fn start_unresponsive_server() -> String { + /// Starts a server that sends a message on the first connection then stops responding + /// to pings. When the client reconnects (after pong timeout), the server accepts the + /// new connection and sends another message. + async fn start_unresponsive_then_recover_server() -> String { let addr = next_addr(); let listener = TcpListener::bind(&addr).await.expect("Failed to bind"); let server_addr = format!("ws://{}", listener.local_addr().unwrap()); tokio::spawn(async move { - if let Ok((stream, _)) = listener.accept().await { - // Accept the connection to establish the WebSocket. - let mut websocket = accept_async(stream).await.expect("Failed to accept"); - // Simply wait forever without responding to pings. - while websocket.next().await.is_some() { - // Do nothing - } - } + // First connection: send a message then stop responding to pings. + // Spawn into a separate task so the listener can accept the second connection + // while the first is still alive (the client's reconnect() creates a new + // connection before dropping the old one). + let (stream, _) = listener.accept().await.unwrap(); + let mut websocket = accept_async(stream).await.expect("Failed to accept"); + websocket + .send(Message::Text("before pong timeout".to_string())) + .await + .unwrap(); + tokio::spawn(async move { + // Consume messages silently (don't respond to pings) + while websocket.next().await.is_some() {} + }); + + // Second connection: after client reconnects due to pong timeout + let (stream, _) = listener.accept().await.unwrap(); + let mut websocket = accept_async(stream).await.expect("Failed to accept"); + websocket + .send(Message::Text("after pong timeout".to_string())) + .await + .unwrap(); }); server_addr } - #[tokio::test(flavor = "current_thread", start_paused = true)] - async fn websocket_source_exits_on_pong_timeout() { - let server_addr = start_unresponsive_server().await; + #[tokio::test(flavor = "multi_thread")] + async fn websocket_source_reconnects_on_pong_timeout() { + let server_addr = start_unresponsive_then_recover_server().await; let mut config = make_config(&server_addr); - config.common.ping_interval = NonZeroU64::new(3); - config.common.ping_timeout = NonZeroU64::new(1); + config.common.ping_interval = NonZeroU64::new(1); config.ping_message = Some("ping".to_string()); config.pong_message = Some(PongMessage::Simple("pong".to_string())); - // The source should fail because the server never sends a pong. - run_and_assert_source_error(config, Duration::from_secs(5), &SOURCE_TAGS).await; + let events = + run_and_assert_source_compliance(config, Duration::from_secs(8), &SOURCE_TAGS).await; + + assert_eq!( + events.len(), + 2, + "Should reconnect after pong timeout and receive messages from both connections" + ); + assert_eq!(events[0].as_log()["message"], "before pong timeout".into()); + assert_eq!(events[1].as_log()["message"], "after pong timeout".into()); } }