Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions engineio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ harness = false
[lib]
bench = false

[lints.rust]
unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tarpaulin)'] }

[features]
default = ["async"]
async-callbacks = []
Expand Down
20 changes: 15 additions & 5 deletions engineio/src/asynchronous/async_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::{runtime::Handle, sync::Mutex, time::Instant};
use crate::{
asynchronous::{callback::OptionalCallback, transport::AsyncTransportType},
error::Result,
packet::{HandshakePacket, Payload},
packet::{HandshakePacket, Payload, RAW_BINARY_MARKER},
Error, Packet, PacketId,
};

Expand Down Expand Up @@ -119,10 +119,20 @@ impl Socket {
/// Helper method that parses bytes and returns an iterator over the elements.
fn parse_payload(bytes: Bytes) -> impl Stream<Item = Result<Packet>> {
try_stream! {
let payload = Payload::try_from(bytes);

for elem in payload?.into_iter() {
yield elem;
let first_byte = bytes.first().copied();

// Check for raw binary marker - always single packet, never split
if first_byte == Some(RAW_BINARY_MARKER) {
yield Packet::try_from(bytes)?;
} else if !bytes.iter().any(|&b| b == 0x1e) {
// No separator - definitely a single packet
yield Packet::try_from(bytes)?;
} else {
// Has separator - use standard Payload parsing for batched packets
let payload = Payload::try_from(bytes)?;
for elem in payload.into_iter() {
yield elem;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ impl AsyncWebsocketGeneralTransport {
match next {
Some(Ok(Message::Text(str))) => return Ok(Some(Bytes::from(str))),
Some(Ok(Message::Binary(data))) => {
// For WebSocket binary frames (attachments), use 'B' (0x42) as marker
// This indicates raw binary that should bypass normal packet parsing
let mut msg = BytesMut::with_capacity(data.len() + 1);
msg.put_u8(PacketId::Message as u8);
msg.put_u8(b'B'); // Raw binary marker
msg.put(data.as_ref());

return Ok(Some(msg.freeze()));
Expand Down Expand Up @@ -115,8 +117,10 @@ impl Stream for AsyncWebsocketGeneralTransport {
match next {
Some(Ok(Message::Text(str))) => return Poll::Ready(Some(Ok(Bytes::from(str)))),
Some(Ok(Message::Binary(data))) => {
// For WebSocket binary frames (attachments), use 'B' (0x42) as marker
// This indicates raw binary that should bypass normal packet parsing
let mut msg = BytesMut::with_capacity(data.len() + 1);
msg.put_u8(PacketId::Message as u8);
msg.put_u8(b'B'); // Raw binary marker (not a standard Engine.IO packet type)
msg.put(data.as_ref());

return Poll::Ready(Some(Ok(msg.freeze())));
Expand Down
4 changes: 3 additions & 1 deletion engineio/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,14 @@ impl Client {
self.socket.is_connected()
}

pub fn iter(&self) -> Iter {
#[allow(dead_code)]
pub fn iter(&self) -> Iter<'_> {
Iter { socket: self }
}
}

#[derive(Clone)]
#[allow(dead_code)]
pub struct Iter<'a> {
socket: &'a Client,
}
Expand Down
17 changes: 14 additions & 3 deletions engineio/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use std::fmt::{Display, Formatter, Result as FmtResult, Write};
use std::ops::Index;

use crate::error::{Error, Result};

/// Byte marker for raw binary payloads (WebSocket binary frames), as opposed to base64 'b'.
pub const RAW_BINARY_MARKER: u8 = b'B';

/// Enumeration of the `engine.io` `Packet` types.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum PacketId {
Expand Down Expand Up @@ -65,6 +69,7 @@ impl TryFrom<u8> for PacketId {
4 | b'4' => Ok(PacketId::Message),
5 | b'5' => Ok(PacketId::Upgrade),
6 | b'6' => Ok(PacketId::Noop),
RAW_BINARY_MARKER => Ok(PacketId::MessageBinary), // Raw binary marker from WebSocket binary frames
_ => Err(Error::InvalidPacketId(b)),
}
}
Expand Down Expand Up @@ -115,13 +120,17 @@ impl TryFrom<Bytes> for Packet {
return Err(Error::IncompletePacket());
}

let is_base64 = *bytes.first().ok_or(Error::IncompletePacket())? == b'b';
let first_byte = *bytes.first().ok_or(Error::IncompletePacket())?;
let is_base64 = first_byte == b'b';
let is_raw_binary = first_byte == RAW_BINARY_MARKER;

// only 'messages' packets could be encoded
// Determine packet type
let packet_id = if is_base64 {
PacketId::MessageBinary
} else if is_raw_binary {
PacketId::MessageBinary // Raw binary also becomes MessageBinary
} else {
(*bytes.first().ok_or(Error::IncompletePacket())?).try_into()?
first_byte.try_into()?
};

if bytes.len() == 1 && packet_id == PacketId::Message {
Expand All @@ -133,8 +142,10 @@ impl TryFrom<Bytes> for Packet {
Ok(Packet {
packet_id,
data: if is_base64 {
// Base64 encoded binary needs decoding
Bytes::from(general_purpose::STANDARD.decode(data.as_ref())?)
} else {
// Raw binary (from 'B' marker) or regular text - pass through as-is
data
},
})
Expand Down
3 changes: 3 additions & 0 deletions socketio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ version = "1.40.0"
# we need the `#[tokio::test]` macro
features = ["macros", "rt-multi-thread"]

[lints.rust]
unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tarpaulin)'] }

[features]
default = []
async-callbacks = ["rust_engineio/async-callbacks"]
Expand Down
30 changes: 28 additions & 2 deletions socketio/src/asynchronous/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,34 @@ impl Client {
/// Handles a binary event.
#[inline]
async fn handle_binary_event(&self, packet: &Packet) -> Result<()> {
let event = if let Some(string_data) = &packet.data {
string_data.replace('\"', "").into()
// Extract event name from packet data
// The packet parser strips brackets and placeholder, so data format is:
// - `"event_name"` (just the quoted event name)
// - `"event_name",{metadata}` (event name followed by metadata)
let event = if let Some(ref data) = packet.data {
// Try to extract the event name - it's the first quoted string
let trimmed = data.trim();
if trimmed.starts_with('"') {
// Find the end of the quoted string
if let Some(end_quote) = trimmed[1..].find('"') {
let event_name = &trimmed[1..end_quote + 1];
Event::from(event_name)
} else {
Event::Message
}
} else {
// Try parsing as JSON array (fallback)
match serde_json::from_str::<serde_json::Value>(data) {
Ok(serde_json::Value::Array(contents)) if !contents.is_empty() => {
if let Some(serde_json::Value::String(ev)) = contents.first() {
Event::from(ev.as_str())
} else {
Event::Message
}
}
_ => Event::Message
}
}
} else {
Event::Message
};
Expand Down
94 changes: 49 additions & 45 deletions socketio/src/asynchronous/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,57 @@ impl Socket {
}

fn stream(
client: EngineClient,
mut client: EngineClient,
is_connected: Arc<AtomicBool>,
) -> Pin<Box<impl Stream<Item = Result<Packet>> + Send>> {
Box::pin(try_stream! {
for await received_data in client.clone() {
let packet = received_data?;

if packet.packet_id == EnginePacketId::Message
|| packet.packet_id == EnginePacketId::MessageBinary
{
let packet = Self::handle_engineio_packet(packet, client.clone()).await?;
Self::handle_socketio_packet(&packet, is_connected.clone());

yield packet;
loop {
let received_data = match client.next().await {
Some(result) => result,
None => break,
};
let packet = received_data?;

if packet.packet_id == EnginePacketId::Message
|| packet.packet_id == EnginePacketId::MessageBinary
{
let mut socket_packet = Packet::try_from(&packet.data)?;

// Collect binary attachments inline from the same stream.
// This fixes a race condition where `client.clone()` caused
// the outer loop and the attachment loop to compete for packets,
// resulting in binary WebSocket frames being misinterpreted as
// Socket.IO text packets (causing EngineIO Error / InvalidUtf8).
if socket_packet.attachment_count > 0 {
let mut attachments_left = socket_packet.attachment_count;
let mut attachments = Vec::new();
while attachments_left > 0 {
let next = match client.next().await {
Some(result) => result,
None => Err(Error::IncompletePacket())?,
};
match next {
Err(err) => Err(err)?,
Ok(att_packet) => match att_packet.packet_id {
EnginePacketId::MessageBinary | EnginePacketId::Message => {
attachments.push(att_packet.data);
attachments_left -= 1;
}
_ => {
Err(Error::InvalidAttachmentPacketType(
att_packet.packet_id.into(),
))?;
}
},
}
}
socket_packet.attachments = Some(attachments);
}

Self::handle_socketio_packet(&socket_packet, is_connected.clone());
yield socket_packet;
}
}
})
}

Expand All @@ -126,40 +161,9 @@ impl Socket {
}
}

/// Handles new incoming engineio packets
async fn handle_engineio_packet(
packet: EnginePacket,
mut client: EngineClient,
) -> Result<Packet> {
let mut socket_packet = Packet::try_from(&packet.data)?;

// Only handle attachments if there are any
if socket_packet.attachment_count > 0 {
let mut attachments_left = socket_packet.attachment_count;
let mut attachments = Vec::new();
while attachments_left > 0 {
// TODO: This is not nice! Find a different way to peek the next element while mapping the stream
let next = client.next().await.unwrap();
match next {
Err(err) => return Err(err.into()),
Ok(packet) => match packet.packet_id {
EnginePacketId::MessageBinary | EnginePacketId::Message => {
attachments.push(packet.data);
attachments_left -= 1;
}
_ => {
return Err(Error::InvalidAttachmentPacketType(
packet.packet_id.into(),
));
}
},
}
}
socket_packet.attachments = Some(attachments);
}

Ok(socket_packet)
}
// Note: handle_engineio_packet was removed and its logic inlined into stream()
// to fix a race condition where client.clone() caused the outer iteration loop
// and the binary attachment collection loop to compete for the same packets.

fn is_engineio_connected(&self) -> bool {
self.engine_client.is_connected()
Expand Down
1 change: 1 addition & 0 deletions socketio/src/client/raw_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ impl RawClient {
}
}

#[allow(dead_code)]
pub struct Iter<'a> {
socket: &'a RawClient,
}
Expand Down
2 changes: 1 addition & 1 deletion socketio/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub enum Error {
IncompleteIo(#[from] IoError),
#[error("Error while parsing an integer")]
InvalidInteger(#[from] ParseIntError),
#[error("EngineIO Error")]
#[error("EngineIO Error: {0}")]
IncompleteResponseFromEngineIo(#[from] rust_engineio::Error),
#[error("Invalid packet type while reading attachments")]
InvalidAttachmentPacketType(u8),
Expand Down