From 22d7c2afad4d99f4aa3e734278c7ca4ae9c1eb93 Mon Sep 17 00:00:00 2001 From: Ben Neigher Date: Sat, 7 Feb 2026 06:15:09 +0100 Subject: [PATCH 1/2] fix(async): handle WebSocket binary frames and attachment collection race condition Two bugs were causing the async Socket.IO client to crash when receiving binary events (e.g. Socket.IO v4 binary attachments over WebSocket): 1. **Binary frame misidentification**: WebSocket binary frames were prefixed with `PacketId::Message` (0x34 = '4'), causing the Engine.IO packet parser to treat raw binary data as a text message. This led to `InvalidUtf8` and `InvalidPacketId` errors. Fix: binary frames are now prefixed with a `'B'` (0x42) marker that is recognized by the packet parser and mapped to `PacketId::MessageBinary`, bypassing text decoding. 2. **Attachment collection race condition**: `handle_engineio_packet` collected binary attachments using `client.clone()`, which created a second consumer on the same underlying stream. This caused the outer iteration loop and the attachment loop to compete for packets, resulting in binary frames being consumed by the wrong loop and misinterpreted as Socket.IO text packets. Fix: attachment collection is now inlined into `stream()` using the same `&mut client`, so packets are consumed sequentially without contention. Additional fixes: - `parse_payload` in async_socket.rs short-circuits for raw binary packets and single packets (no record separator), avoiding unnecessary splitting. - `handle_binary_event` properly extracts the event name from quoted strings in binary event packets instead of stripping all quotes from the data. - `IncompleteResponseFromEngineIo` error now includes the inner error message. Co-authored-by: Cursor --- engineio/src/asynchronous/async_socket.rs | 20 +++- .../async_transports/websocket_general.rs | 7 +- engineio/src/client/client.rs | 4 +- engineio/src/packet.rs | 21 ++++- socketio/src/asynchronous/client/client.rs | 30 +++++- socketio/src/asynchronous/socket.rs | 94 ++++++++++--------- socketio/src/client/raw_client.rs | 1 + socketio/src/error.rs | 2 +- 8 files changed, 118 insertions(+), 61 deletions(-) diff --git a/engineio/src/asynchronous/async_socket.rs b/engineio/src/asynchronous/async_socket.rs index 8718e0ea..a3e07242 100644 --- a/engineio/src/asynchronous/async_socket.rs +++ b/engineio/src/asynchronous/async_socket.rs @@ -15,7 +15,7 @@ use tokio::{runtime::Handle, sync::Mutex, time::Instant}; use crate::{ asynchronous::{callback::OptionalCallback, transport::AsyncTransportType}, error::Result, - packet::{HandshakePacket, Payload}, + packet::{HandshakePacket, Payload, RAW_BINARY_MARKER}, Error, Packet, PacketId, }; @@ -119,10 +119,20 @@ impl Socket { /// Helper method that parses bytes and returns an iterator over the elements. fn parse_payload(bytes: Bytes) -> impl Stream> { try_stream! { - let payload = Payload::try_from(bytes); - - for elem in payload?.into_iter() { - yield elem; + let first_byte = bytes.first().copied(); + + // Check for raw binary marker - always single packet, never split + if first_byte == Some(RAW_BINARY_MARKER) { + yield Packet::try_from(bytes)?; + } else if !bytes.iter().any(|&b| b == 0x1e) { + // No separator - definitely a single packet + yield Packet::try_from(bytes)?; + } else { + // Has separator - use standard Payload parsing for batched packets + let payload = Payload::try_from(bytes)?; + for elem in payload.into_iter() { + yield elem; + } } } } diff --git a/engineio/src/asynchronous/async_transports/websocket_general.rs b/engineio/src/asynchronous/async_transports/websocket_general.rs index 2b23a8eb..7e9c2010 100644 --- a/engineio/src/asynchronous/async_transports/websocket_general.rs +++ b/engineio/src/asynchronous/async_transports/websocket_general.rs @@ -1,6 +1,6 @@ use std::{borrow::Cow, str::from_utf8, sync::Arc, task::Poll}; -use crate::{error::Result, Error, Packet, PacketId}; +use crate::{error::Result, packet::RAW_BINARY_MARKER, Error, Packet, PacketId}; use bytes::{BufMut, Bytes, BytesMut}; use futures_util::{ ready, @@ -87,9 +87,8 @@ impl AsyncWebsocketGeneralTransport { Some(Ok(Message::Text(str))) => return Ok(Some(Bytes::from(str))), Some(Ok(Message::Binary(data))) => { let mut msg = BytesMut::with_capacity(data.len() + 1); - msg.put_u8(PacketId::Message as u8); + msg.put_u8(RAW_BINARY_MARKER); msg.put(data.as_ref()); - return Ok(Some(msg.freeze())); } // ignore packets other than text and binary @@ -116,7 +115,7 @@ impl Stream for AsyncWebsocketGeneralTransport { Some(Ok(Message::Text(str))) => return Poll::Ready(Some(Ok(Bytes::from(str)))), Some(Ok(Message::Binary(data))) => { let mut msg = BytesMut::with_capacity(data.len() + 1); - msg.put_u8(PacketId::Message as u8); + msg.put_u8(RAW_BINARY_MARKER); msg.put(data.as_ref()); return Poll::Ready(Some(Ok(msg.freeze()))); diff --git a/engineio/src/client/client.rs b/engineio/src/client/client.rs index dc22ff77..b823d789 100644 --- a/engineio/src/client/client.rs +++ b/engineio/src/client/client.rs @@ -353,12 +353,14 @@ impl Client { self.socket.is_connected() } - pub fn iter(&self) -> Iter { + #[allow(dead_code)] + pub fn iter(&self) -> Iter<'_> { Iter { socket: self } } } #[derive(Clone)] +#[allow(dead_code)] pub struct Iter<'a> { socket: &'a Client, } diff --git a/engineio/src/packet.rs b/engineio/src/packet.rs index 9238428f..3cc1a627 100644 --- a/engineio/src/packet.rs +++ b/engineio/src/packet.rs @@ -8,6 +8,14 @@ use std::fmt::{Display, Formatter, Result as FmtResult, Write}; use std::ops::Index; use crate::error::{Error, Result}; + +/// Internal marker byte for raw WebSocket binary frames. +/// +/// This is **not** a standard Engine.IO packet type (the spec defines types 0–6). +/// It is used only within the library to distinguish binary WebSocket frames +/// from text-encoded packets so that binary data bypasses UTF-8 / base64 parsing. +pub(crate) const RAW_BINARY_MARKER: u8 = b'B'; + /// Enumeration of the `engine.io` `Packet` types. #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum PacketId { @@ -65,6 +73,7 @@ impl TryFrom for PacketId { 4 | b'4' => Ok(PacketId::Message), 5 | b'5' => Ok(PacketId::Upgrade), 6 | b'6' => Ok(PacketId::Noop), + RAW_BINARY_MARKER => Ok(PacketId::MessageBinary), _ => Err(Error::InvalidPacketId(b)), } } @@ -115,13 +124,17 @@ impl TryFrom for Packet { return Err(Error::IncompletePacket()); } - let is_base64 = *bytes.first().ok_or(Error::IncompletePacket())? == b'b'; + let first_byte = *bytes.first().ok_or(Error::IncompletePacket())?; + let is_base64 = first_byte == b'b'; + let is_raw_binary = first_byte == RAW_BINARY_MARKER; - // only 'messages' packets could be encoded + // Determine packet type let packet_id = if is_base64 { PacketId::MessageBinary + } else if is_raw_binary { + PacketId::MessageBinary // Raw binary also becomes MessageBinary } else { - (*bytes.first().ok_or(Error::IncompletePacket())?).try_into()? + first_byte.try_into()? }; if bytes.len() == 1 && packet_id == PacketId::Message { @@ -133,8 +146,10 @@ impl TryFrom for Packet { Ok(Packet { packet_id, data: if is_base64 { + // Base64 encoded binary needs decoding Bytes::from(general_purpose::STANDARD.decode(data.as_ref())?) } else { + // Raw binary (from 'B' marker) or regular text - pass through as-is data }, }) diff --git a/socketio/src/asynchronous/client/client.rs b/socketio/src/asynchronous/client/client.rs index 01991056..7889ca4e 100644 --- a/socketio/src/asynchronous/client/client.rs +++ b/socketio/src/asynchronous/client/client.rs @@ -472,8 +472,34 @@ impl Client { /// Handles a binary event. #[inline] async fn handle_binary_event(&self, packet: &Packet) -> Result<()> { - let event = if let Some(string_data) = &packet.data { - string_data.replace('\"', "").into() + // Extract event name from packet data + // The packet parser strips brackets and placeholder, so data format is: + // - `"event_name"` (just the quoted event name) + // - `"event_name",{metadata}` (event name followed by metadata) + let event = if let Some(ref data) = packet.data { + // Try to extract the event name - it's the first quoted string + let trimmed = data.trim(); + if trimmed.starts_with('"') { + // Find the end of the quoted string + if let Some(end_quote) = trimmed[1..].find('"') { + let event_name = &trimmed[1..end_quote + 1]; + Event::from(event_name) + } else { + Event::Message + } + } else { + // Try parsing as JSON array (fallback) + match serde_json::from_str::(data) { + Ok(serde_json::Value::Array(contents)) if !contents.is_empty() => { + if let Some(serde_json::Value::String(ev)) = contents.first() { + Event::from(ev.as_str()) + } else { + Event::Message + } + } + _ => Event::Message + } + } } else { Event::Message }; diff --git a/socketio/src/asynchronous/socket.rs b/socketio/src/asynchronous/socket.rs index 81a9ebcd..9e51c44c 100644 --- a/socketio/src/asynchronous/socket.rs +++ b/socketio/src/asynchronous/socket.rs @@ -90,22 +90,57 @@ impl Socket { } fn stream( - client: EngineClient, + mut client: EngineClient, is_connected: Arc, ) -> Pin> + Send>> { Box::pin(try_stream! { - for await received_data in client.clone() { - let packet = received_data?; - - if packet.packet_id == EnginePacketId::Message - || packet.packet_id == EnginePacketId::MessageBinary - { - let packet = Self::handle_engineio_packet(packet, client.clone()).await?; - Self::handle_socketio_packet(&packet, is_connected.clone()); - - yield packet; + loop { + let received_data = match client.next().await { + Some(result) => result, + None => break, + }; + let packet = received_data?; + + if packet.packet_id == EnginePacketId::Message + || packet.packet_id == EnginePacketId::MessageBinary + { + let mut socket_packet = Packet::try_from(&packet.data)?; + + // Collect binary attachments inline from the same stream. + // This fixes a race condition where `client.clone()` caused + // the outer loop and the attachment loop to compete for packets, + // resulting in binary WebSocket frames being misinterpreted as + // Socket.IO text packets (causing EngineIO Error / InvalidUtf8). + if socket_packet.attachment_count > 0 { + let mut attachments_left = socket_packet.attachment_count; + let mut attachments = Vec::new(); + while attachments_left > 0 { + let next = match client.next().await { + Some(result) => result, + None => Err(Error::IncompletePacket())?, + }; + match next { + Err(err) => Err(err)?, + Ok(att_packet) => match att_packet.packet_id { + EnginePacketId::MessageBinary | EnginePacketId::Message => { + attachments.push(att_packet.data); + attachments_left -= 1; + } + _ => { + Err(Error::InvalidAttachmentPacketType( + att_packet.packet_id.into(), + ))?; + } + }, + } + } + socket_packet.attachments = Some(attachments); } + + Self::handle_socketio_packet(&socket_packet, is_connected.clone()); + yield socket_packet; } + } }) } @@ -126,40 +161,9 @@ impl Socket { } } - /// Handles new incoming engineio packets - async fn handle_engineio_packet( - packet: EnginePacket, - mut client: EngineClient, - ) -> Result { - let mut socket_packet = Packet::try_from(&packet.data)?; - - // Only handle attachments if there are any - if socket_packet.attachment_count > 0 { - let mut attachments_left = socket_packet.attachment_count; - let mut attachments = Vec::new(); - while attachments_left > 0 { - // TODO: This is not nice! Find a different way to peek the next element while mapping the stream - let next = client.next().await.unwrap(); - match next { - Err(err) => return Err(err.into()), - Ok(packet) => match packet.packet_id { - EnginePacketId::MessageBinary | EnginePacketId::Message => { - attachments.push(packet.data); - attachments_left -= 1; - } - _ => { - return Err(Error::InvalidAttachmentPacketType( - packet.packet_id.into(), - )); - } - }, - } - } - socket_packet.attachments = Some(attachments); - } - - Ok(socket_packet) - } + // Note: handle_engineio_packet was removed and its logic inlined into stream() + // to fix a race condition where client.clone() caused the outer iteration loop + // and the binary attachment collection loop to compete for the same packets. fn is_engineio_connected(&self) -> bool { self.engine_client.is_connected() diff --git a/socketio/src/client/raw_client.rs b/socketio/src/client/raw_client.rs index 9c8ecef2..539712fd 100644 --- a/socketio/src/client/raw_client.rs +++ b/socketio/src/client/raw_client.rs @@ -399,6 +399,7 @@ impl RawClient { } } +#[allow(dead_code)] pub struct Iter<'a> { socket: &'a RawClient, } diff --git a/socketio/src/error.rs b/socketio/src/error.rs index cc25d897..0c4d306c 100644 --- a/socketio/src/error.rs +++ b/socketio/src/error.rs @@ -40,7 +40,7 @@ pub enum Error { IncompleteIo(#[from] IoError), #[error("Error while parsing an integer")] InvalidInteger(#[from] ParseIntError), - #[error("EngineIO Error")] + #[error("EngineIO Error: {0}")] IncompleteResponseFromEngineIo(#[from] rust_engineio::Error), #[error("Invalid packet type while reading attachments")] InvalidAttachmentPacketType(u8), From bae4ddbc313d9eeb525140571f351c7f987e0bf7 Mon Sep 17 00:00:00 2001 From: Ben Neigher Date: Wed, 11 Mar 2026 12:01:04 -0700 Subject: [PATCH 2/2] fix: binary attachment handling and lint config - Export RAW_BINARY_MARKER (pub) so async_socket can import it - Use raw binary marker in packet parsing and websocket_general with comments - Add [lints.rust] unexpected_cfgs allow for tarpaulin in engineio + socketio Made-with: Cursor --- Cargo.lock | 15 ++++++++++++++- engineio/Cargo.toml | 3 +++ .../async_transports/websocket_general.rs | 11 ++++++++--- engineio/src/packet.rs | 10 +++------- socketio/Cargo.toml | 3 +++ 5 files changed, 31 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e121b146..a68aed07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -1932,6 +1932,7 @@ dependencies = [ "serial_test", "thiserror", "tokio", + "tracing", "url", ] @@ -2456,9 +2457,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.89", +] + [[package]] name = "tracing-core" version = "0.1.32" diff --git a/engineio/Cargo.toml b/engineio/Cargo.toml index b38c5a6c..ac743b72 100644 --- a/engineio/Cargo.toml +++ b/engineio/Cargo.toml @@ -50,6 +50,9 @@ harness = false [lib] bench = false +[lints.rust] +unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tarpaulin)'] } + [features] default = ["async"] async-callbacks = [] diff --git a/engineio/src/asynchronous/async_transports/websocket_general.rs b/engineio/src/asynchronous/async_transports/websocket_general.rs index 7e9c2010..f57107ae 100644 --- a/engineio/src/asynchronous/async_transports/websocket_general.rs +++ b/engineio/src/asynchronous/async_transports/websocket_general.rs @@ -1,6 +1,6 @@ use std::{borrow::Cow, str::from_utf8, sync::Arc, task::Poll}; -use crate::{error::Result, packet::RAW_BINARY_MARKER, Error, Packet, PacketId}; +use crate::{error::Result, Error, Packet, PacketId}; use bytes::{BufMut, Bytes, BytesMut}; use futures_util::{ ready, @@ -86,9 +86,12 @@ impl AsyncWebsocketGeneralTransport { match next { Some(Ok(Message::Text(str))) => return Ok(Some(Bytes::from(str))), Some(Ok(Message::Binary(data))) => { + // For WebSocket binary frames (attachments), use 'B' (0x42) as marker + // This indicates raw binary that should bypass normal packet parsing let mut msg = BytesMut::with_capacity(data.len() + 1); - msg.put_u8(RAW_BINARY_MARKER); + msg.put_u8(b'B'); // Raw binary marker msg.put(data.as_ref()); + return Ok(Some(msg.freeze())); } // ignore packets other than text and binary @@ -114,8 +117,10 @@ impl Stream for AsyncWebsocketGeneralTransport { match next { Some(Ok(Message::Text(str))) => return Poll::Ready(Some(Ok(Bytes::from(str)))), Some(Ok(Message::Binary(data))) => { + // For WebSocket binary frames (attachments), use 'B' (0x42) as marker + // This indicates raw binary that should bypass normal packet parsing let mut msg = BytesMut::with_capacity(data.len() + 1); - msg.put_u8(RAW_BINARY_MARKER); + msg.put_u8(b'B'); // Raw binary marker (not a standard Engine.IO packet type) msg.put(data.as_ref()); return Poll::Ready(Some(Ok(msg.freeze()))); diff --git a/engineio/src/packet.rs b/engineio/src/packet.rs index 3cc1a627..c991e03c 100644 --- a/engineio/src/packet.rs +++ b/engineio/src/packet.rs @@ -9,12 +9,8 @@ use std::ops::Index; use crate::error::{Error, Result}; -/// Internal marker byte for raw WebSocket binary frames. -/// -/// This is **not** a standard Engine.IO packet type (the spec defines types 0–6). -/// It is used only within the library to distinguish binary WebSocket frames -/// from text-encoded packets so that binary data bypasses UTF-8 / base64 parsing. -pub(crate) const RAW_BINARY_MARKER: u8 = b'B'; +/// Byte marker for raw binary payloads (WebSocket binary frames), as opposed to base64 'b'. +pub const RAW_BINARY_MARKER: u8 = b'B'; /// Enumeration of the `engine.io` `Packet` types. #[derive(Copy, Clone, Eq, PartialEq, Debug)] @@ -73,7 +69,7 @@ impl TryFrom for PacketId { 4 | b'4' => Ok(PacketId::Message), 5 | b'5' => Ok(PacketId::Upgrade), 6 | b'6' => Ok(PacketId::Noop), - RAW_BINARY_MARKER => Ok(PacketId::MessageBinary), + RAW_BINARY_MARKER => Ok(PacketId::MessageBinary), // Raw binary marker from WebSocket binary frames _ => Err(Error::InvalidPacketId(b)), } } diff --git a/socketio/Cargo.toml b/socketio/Cargo.toml index 8e9b3deb..e045963a 100644 --- a/socketio/Cargo.toml +++ b/socketio/Cargo.toml @@ -39,6 +39,9 @@ version = "1.40.0" # we need the `#[tokio::test]` macro features = ["macros", "rt-multi-thread"] +[lints.rust] +unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tarpaulin)'] } + [features] default = [] async-callbacks = ["rust_engineio/async-callbacks"]