diff --git a/Cargo.lock b/Cargo.lock index e121b146..a68aed07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -1932,6 +1932,7 @@ dependencies = [ "serial_test", "thiserror", "tokio", + "tracing", "url", ] @@ -2456,9 +2457,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.89", +] + [[package]] name = "tracing-core" version = "0.1.32" diff --git a/engineio/Cargo.toml b/engineio/Cargo.toml index b38c5a6c..ac743b72 100644 --- a/engineio/Cargo.toml +++ b/engineio/Cargo.toml @@ -50,6 +50,9 @@ harness = false [lib] bench = false +[lints.rust] +unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tarpaulin)'] } + [features] default = ["async"] async-callbacks = [] diff --git a/engineio/src/asynchronous/async_socket.rs b/engineio/src/asynchronous/async_socket.rs index 8718e0ea..a3e07242 100644 --- a/engineio/src/asynchronous/async_socket.rs +++ b/engineio/src/asynchronous/async_socket.rs @@ -15,7 +15,7 @@ use tokio::{runtime::Handle, sync::Mutex, time::Instant}; use crate::{ asynchronous::{callback::OptionalCallback, transport::AsyncTransportType}, error::Result, - packet::{HandshakePacket, Payload}, + packet::{HandshakePacket, Payload, RAW_BINARY_MARKER}, Error, Packet, PacketId, }; @@ -119,10 +119,20 @@ impl Socket { /// Helper method that parses bytes and returns an iterator over the elements. fn parse_payload(bytes: Bytes) -> impl Stream> { try_stream! { - let payload = Payload::try_from(bytes); - - for elem in payload?.into_iter() { - yield elem; + let first_byte = bytes.first().copied(); + + // Check for raw binary marker - always single packet, never split + if first_byte == Some(RAW_BINARY_MARKER) { + yield Packet::try_from(bytes)?; + } else if !bytes.iter().any(|&b| b == 0x1e) { + // No separator - definitely a single packet + yield Packet::try_from(bytes)?; + } else { + // Has separator - use standard Payload parsing for batched packets + let payload = Payload::try_from(bytes)?; + for elem in payload.into_iter() { + yield elem; + } } } } diff --git a/engineio/src/asynchronous/async_transports/websocket_general.rs b/engineio/src/asynchronous/async_transports/websocket_general.rs index 2b23a8eb..f57107ae 100644 --- a/engineio/src/asynchronous/async_transports/websocket_general.rs +++ b/engineio/src/asynchronous/async_transports/websocket_general.rs @@ -86,8 +86,10 @@ impl AsyncWebsocketGeneralTransport { match next { Some(Ok(Message::Text(str))) => return Ok(Some(Bytes::from(str))), Some(Ok(Message::Binary(data))) => { + // For WebSocket binary frames (attachments), use 'B' (0x42) as marker + // This indicates raw binary that should bypass normal packet parsing let mut msg = BytesMut::with_capacity(data.len() + 1); - msg.put_u8(PacketId::Message as u8); + msg.put_u8(b'B'); // Raw binary marker msg.put(data.as_ref()); return Ok(Some(msg.freeze())); @@ -115,8 +117,10 @@ impl Stream for AsyncWebsocketGeneralTransport { match next { Some(Ok(Message::Text(str))) => return Poll::Ready(Some(Ok(Bytes::from(str)))), Some(Ok(Message::Binary(data))) => { + // For WebSocket binary frames (attachments), use 'B' (0x42) as marker + // This indicates raw binary that should bypass normal packet parsing let mut msg = BytesMut::with_capacity(data.len() + 1); - msg.put_u8(PacketId::Message as u8); + msg.put_u8(b'B'); // Raw binary marker (not a standard Engine.IO packet type) msg.put(data.as_ref()); return Poll::Ready(Some(Ok(msg.freeze()))); diff --git a/engineio/src/client/client.rs b/engineio/src/client/client.rs index dc22ff77..b823d789 100644 --- a/engineio/src/client/client.rs +++ b/engineio/src/client/client.rs @@ -353,12 +353,14 @@ impl Client { self.socket.is_connected() } - pub fn iter(&self) -> Iter { + #[allow(dead_code)] + pub fn iter(&self) -> Iter<'_> { Iter { socket: self } } } #[derive(Clone)] +#[allow(dead_code)] pub struct Iter<'a> { socket: &'a Client, } diff --git a/engineio/src/packet.rs b/engineio/src/packet.rs index 9238428f..c991e03c 100644 --- a/engineio/src/packet.rs +++ b/engineio/src/packet.rs @@ -8,6 +8,10 @@ use std::fmt::{Display, Formatter, Result as FmtResult, Write}; use std::ops::Index; use crate::error::{Error, Result}; + +/// Byte marker for raw binary payloads (WebSocket binary frames), as opposed to base64 'b'. +pub const RAW_BINARY_MARKER: u8 = b'B'; + /// Enumeration of the `engine.io` `Packet` types. #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum PacketId { @@ -65,6 +69,7 @@ impl TryFrom for PacketId { 4 | b'4' => Ok(PacketId::Message), 5 | b'5' => Ok(PacketId::Upgrade), 6 | b'6' => Ok(PacketId::Noop), + RAW_BINARY_MARKER => Ok(PacketId::MessageBinary), // Raw binary marker from WebSocket binary frames _ => Err(Error::InvalidPacketId(b)), } } @@ -115,13 +120,17 @@ impl TryFrom for Packet { return Err(Error::IncompletePacket()); } - let is_base64 = *bytes.first().ok_or(Error::IncompletePacket())? == b'b'; + let first_byte = *bytes.first().ok_or(Error::IncompletePacket())?; + let is_base64 = first_byte == b'b'; + let is_raw_binary = first_byte == RAW_BINARY_MARKER; - // only 'messages' packets could be encoded + // Determine packet type let packet_id = if is_base64 { PacketId::MessageBinary + } else if is_raw_binary { + PacketId::MessageBinary // Raw binary also becomes MessageBinary } else { - (*bytes.first().ok_or(Error::IncompletePacket())?).try_into()? + first_byte.try_into()? }; if bytes.len() == 1 && packet_id == PacketId::Message { @@ -133,8 +142,10 @@ impl TryFrom for Packet { Ok(Packet { packet_id, data: if is_base64 { + // Base64 encoded binary needs decoding Bytes::from(general_purpose::STANDARD.decode(data.as_ref())?) } else { + // Raw binary (from 'B' marker) or regular text - pass through as-is data }, }) diff --git a/socketio/Cargo.toml b/socketio/Cargo.toml index 8e9b3deb..e045963a 100644 --- a/socketio/Cargo.toml +++ b/socketio/Cargo.toml @@ -39,6 +39,9 @@ version = "1.40.0" # we need the `#[tokio::test]` macro features = ["macros", "rt-multi-thread"] +[lints.rust] +unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tarpaulin)'] } + [features] default = [] async-callbacks = ["rust_engineio/async-callbacks"] diff --git a/socketio/src/asynchronous/client/client.rs b/socketio/src/asynchronous/client/client.rs index 01991056..7889ca4e 100644 --- a/socketio/src/asynchronous/client/client.rs +++ b/socketio/src/asynchronous/client/client.rs @@ -472,8 +472,34 @@ impl Client { /// Handles a binary event. #[inline] async fn handle_binary_event(&self, packet: &Packet) -> Result<()> { - let event = if let Some(string_data) = &packet.data { - string_data.replace('\"', "").into() + // Extract event name from packet data + // The packet parser strips brackets and placeholder, so data format is: + // - `"event_name"` (just the quoted event name) + // - `"event_name",{metadata}` (event name followed by metadata) + let event = if let Some(ref data) = packet.data { + // Try to extract the event name - it's the first quoted string + let trimmed = data.trim(); + if trimmed.starts_with('"') { + // Find the end of the quoted string + if let Some(end_quote) = trimmed[1..].find('"') { + let event_name = &trimmed[1..end_quote + 1]; + Event::from(event_name) + } else { + Event::Message + } + } else { + // Try parsing as JSON array (fallback) + match serde_json::from_str::(data) { + Ok(serde_json::Value::Array(contents)) if !contents.is_empty() => { + if let Some(serde_json::Value::String(ev)) = contents.first() { + Event::from(ev.as_str()) + } else { + Event::Message + } + } + _ => Event::Message + } + } } else { Event::Message }; diff --git a/socketio/src/asynchronous/socket.rs b/socketio/src/asynchronous/socket.rs index 81a9ebcd..9e51c44c 100644 --- a/socketio/src/asynchronous/socket.rs +++ b/socketio/src/asynchronous/socket.rs @@ -90,22 +90,57 @@ impl Socket { } fn stream( - client: EngineClient, + mut client: EngineClient, is_connected: Arc, ) -> Pin> + Send>> { Box::pin(try_stream! { - for await received_data in client.clone() { - let packet = received_data?; - - if packet.packet_id == EnginePacketId::Message - || packet.packet_id == EnginePacketId::MessageBinary - { - let packet = Self::handle_engineio_packet(packet, client.clone()).await?; - Self::handle_socketio_packet(&packet, is_connected.clone()); - - yield packet; + loop { + let received_data = match client.next().await { + Some(result) => result, + None => break, + }; + let packet = received_data?; + + if packet.packet_id == EnginePacketId::Message + || packet.packet_id == EnginePacketId::MessageBinary + { + let mut socket_packet = Packet::try_from(&packet.data)?; + + // Collect binary attachments inline from the same stream. + // This fixes a race condition where `client.clone()` caused + // the outer loop and the attachment loop to compete for packets, + // resulting in binary WebSocket frames being misinterpreted as + // Socket.IO text packets (causing EngineIO Error / InvalidUtf8). + if socket_packet.attachment_count > 0 { + let mut attachments_left = socket_packet.attachment_count; + let mut attachments = Vec::new(); + while attachments_left > 0 { + let next = match client.next().await { + Some(result) => result, + None => Err(Error::IncompletePacket())?, + }; + match next { + Err(err) => Err(err)?, + Ok(att_packet) => match att_packet.packet_id { + EnginePacketId::MessageBinary | EnginePacketId::Message => { + attachments.push(att_packet.data); + attachments_left -= 1; + } + _ => { + Err(Error::InvalidAttachmentPacketType( + att_packet.packet_id.into(), + ))?; + } + }, + } + } + socket_packet.attachments = Some(attachments); } + + Self::handle_socketio_packet(&socket_packet, is_connected.clone()); + yield socket_packet; } + } }) } @@ -126,40 +161,9 @@ impl Socket { } } - /// Handles new incoming engineio packets - async fn handle_engineio_packet( - packet: EnginePacket, - mut client: EngineClient, - ) -> Result { - let mut socket_packet = Packet::try_from(&packet.data)?; - - // Only handle attachments if there are any - if socket_packet.attachment_count > 0 { - let mut attachments_left = socket_packet.attachment_count; - let mut attachments = Vec::new(); - while attachments_left > 0 { - // TODO: This is not nice! Find a different way to peek the next element while mapping the stream - let next = client.next().await.unwrap(); - match next { - Err(err) => return Err(err.into()), - Ok(packet) => match packet.packet_id { - EnginePacketId::MessageBinary | EnginePacketId::Message => { - attachments.push(packet.data); - attachments_left -= 1; - } - _ => { - return Err(Error::InvalidAttachmentPacketType( - packet.packet_id.into(), - )); - } - }, - } - } - socket_packet.attachments = Some(attachments); - } - - Ok(socket_packet) - } + // Note: handle_engineio_packet was removed and its logic inlined into stream() + // to fix a race condition where client.clone() caused the outer iteration loop + // and the binary attachment collection loop to compete for the same packets. fn is_engineio_connected(&self) -> bool { self.engine_client.is_connected() diff --git a/socketio/src/client/raw_client.rs b/socketio/src/client/raw_client.rs index 9c8ecef2..539712fd 100644 --- a/socketio/src/client/raw_client.rs +++ b/socketio/src/client/raw_client.rs @@ -399,6 +399,7 @@ impl RawClient { } } +#[allow(dead_code)] pub struct Iter<'a> { socket: &'a RawClient, } diff --git a/socketio/src/error.rs b/socketio/src/error.rs index cc25d897..0c4d306c 100644 --- a/socketio/src/error.rs +++ b/socketio/src/error.rs @@ -40,7 +40,7 @@ pub enum Error { IncompleteIo(#[from] IoError), #[error("Error while parsing an integer")] InvalidInteger(#[from] ParseIntError), - #[error("EngineIO Error")] + #[error("EngineIO Error: {0}")] IncompleteResponseFromEngineIo(#[from] rust_engineio::Error), #[error("Invalid packet type while reading attachments")] InvalidAttachmentPacketType(u8),