Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/parser-common/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# socketioxide-parser-common 0.17.1
* deps: bump `socketioxide-core` to 0.18
# socketioxide-parser-common 0.17
* deps: bump `socketioxide-core` to 0.17
* MSRV: rust-version is now 1.86 with edition 2024
Expand Down
4 changes: 2 additions & 2 deletions crates/parser-common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "socketioxide-parser-common"
description = "Common parser for the socketioxide protocol"
version = "0.17.0"
version = "0.17.1"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand All @@ -17,7 +17,7 @@ bytes.workspace = true
itoa.workspace = true
serde.workspace = true
serde_json.workspace = true
socketioxide-core = { version = "0.17", path = "../socketioxide-core" }
socketioxide-core = { version = "0.18", path = "../socketioxide-core" }

[dev-dependencies]
criterion.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/parser-msgpack/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# socketioxide-parser-msgpack 0.17.1
* deps: bump `socketioxide-core` to 0.18

# socketioxide-parser-msgpack 0.17
* deps: bump `socketioxide-core` to 0.17
* MSRV: rust-version is now 1.86 with edition 2024
Expand Down
4 changes: 2 additions & 2 deletions crates/parser-msgpack/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "socketioxide-parser-msgpack"
description = "Msgpack parser for the socketioxide protocol"
version = "0.17.0"
version = "0.17.1"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand All @@ -17,7 +17,7 @@ bytes.workspace = true
serde.workspace = true
rmp-serde.workspace = true
rmp.workspace = true
socketioxide-core = { version = "0.17", path = "../socketioxide-core" }
socketioxide-core = { version = "0.18", path = "../socketioxide-core" }

[dev-dependencies]
serde_json.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/socketioxide-core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# socketioxide-core 0.18.0
* feat(*breaking*): expose global configured ack timeout to adapter implementations

# socketioxide-core 0.17.0
* feat(*breaking*): remote-adapter packets are now refactored in the core crate. Any adapter implementation can use
it through the `remote-adapter` flag.
Expand Down
2 changes: 1 addition & 1 deletion crates/socketioxide-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "socketioxide-core"
description = "Core of the socketioxide library. Contains basic types and interfaces for the socketioxide crate and all other related sub-crates."
version = "0.17.0"
version = "0.18.0"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand Down
9 changes: 9 additions & 0 deletions crates/socketioxide-core/src/adapter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ pub trait SocketEmitter: Send + Sync + 'static {
fn parser(&self) -> impl Parse;
/// Get the unique server id.
fn server_id(&self) -> Uid;
/// Get the default configured ack timeout.
fn ack_timeout(&self) -> Duration;
}

/// For static namespaces, the init response will be managed by the user.
Expand Down Expand Up @@ -536,6 +538,10 @@ impl<E: SocketEmitter> CoreLocalAdapter<E> {
pub fn server_id(&self) -> Uid {
self.emitter.server_id()
}
/// Get the default configured ack timeout.
pub fn ack_timeout(&self) -> Duration {
self.emitter.ack_timeout()
}
}

/// The default broadcast iterator.
Expand Down Expand Up @@ -793,6 +799,9 @@ mod test {
fn server_id(&self) -> Uid {
Uid::ZERO
}
fn ack_timeout(&self) -> Duration {
Duration::ZERO
}
}

fn create_adapter<const S: usize>(sockets: [Sid; S]) -> CoreLocalAdapter<StubSockets> {
Expand Down
3 changes: 3 additions & 0 deletions crates/socketioxide-mongodb/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# socketioxide-mongodb 0.1.2
* fix: race condition between ack timeout and adapter request timeout when broadcasting with acks.

# socketioxide-mongodb 0.1.1
* fix: compilation error without default features.
* chore(deps): bump bson to 3.0.0
Expand Down
4 changes: 2 additions & 2 deletions crates/socketioxide-mongodb/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "socketioxide-mongodb"
description = "MongoDB adapter for socketioxide"
version = "0.1.1"
version = "0.1.2"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand All @@ -18,7 +18,7 @@ ttl-index = ["dep:bson"]
default = ["mongodb", "ttl-index"]

[dependencies]
socketioxide-core = { version = "0.17", path = "../socketioxide-core", features = [
socketioxide-core = { version = "0.18", path = "../socketioxide-core", features = [
"remote-adapter",
] }
futures-core.workspace = true
Expand Down
8 changes: 7 additions & 1 deletion crates/socketioxide-mongodb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,16 @@ impl<E: SocketEmitter, D: Driver> CoreAdapter<E> for CustomMongoDbAdapter<E, D>
self.send_req(req, None).await?;
let (local, _) = self.local.broadcast_with_ack(packet, opts, timeout);

// we wait for the configured ack timeout + the adapter request timeout
let timeout = self
.config
.request_timeout
.saturating_add(timeout.unwrap_or(self.local.ack_timeout()));

Ok(AckStream::new(
local,
rx,
self.config.request_timeout,
timeout,
remote_serv_cnt,
req_id,
self.responses.clone(),
Expand Down
7 changes: 5 additions & 2 deletions crates/socketioxide-mongodb/tests/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,14 @@ pub async fn broadcast_with_ack() {
#[tokio::test]
pub async fn broadcast_with_ack_timeout() {
use futures_util::StreamExt;
const TIMEOUT: Duration = Duration::from_millis(50);
const REQ_TIMEOUT: Duration = Duration::from_millis(50);
const ACK_TIMEOUT: Duration = Duration::from_millis(50);
const TIMEOUT: Duration = Duration::from_millis(100);

async fn handler<A: Adapter>(socket: SocketRef<A>) {
socket
.broadcast()
.timeout(ACK_TIMEOUT)
.emit_with_ack::<_, String>("test", "bar")
.await
.unwrap()
Expand All @@ -124,7 +127,7 @@ pub async fn broadcast_with_ack_timeout() {
socket.emit("ack_res", "timeout").unwrap();
}

let [io1, io2] = fixture::spawn_buggy_servers(TIMEOUT);
let [io1, io2] = fixture::spawn_buggy_servers(REQ_TIMEOUT);

io1.ns("/", handler).await.unwrap();
io2.ns("/", async || ()).await.unwrap();
Expand Down
8 changes: 8 additions & 0 deletions crates/socketioxide-mongodb/tests/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ pub fn spawn_servers<const N: usize>() -> [SocketIo<CustomMongoDbAdapter<Emitter
spawn_inner(sync_buff, MongoDbAdapterConfig::default())
}

pub fn spawn_servers_with_request_timeout<const N: usize>(
request_timeout: Duration,
) -> [SocketIo<CustomMongoDbAdapter<Emitter, StubDriver>>; N] {
let sync_buff = Arc::new(RwLock::new(Vec::with_capacity(N)));
let config = MongoDbAdapterConfig::default().with_request_timeout(request_timeout);
spawn_inner(sync_buff, config)
}

pub fn spawn_buggy_servers<const N: usize>(
timeout: Duration,
) -> [SocketIo<CustomMongoDbAdapter<Emitter, StubDriver>>; N] {
Expand Down
3 changes: 3 additions & 0 deletions crates/socketioxide-redis/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# socketioxide-redis 0.4.1
* fix: race condition between ack timeout and adapter request timeout when broadcasting with acks.

# socketioxide-redis 0.4.0
* deps: bump `redis` to 1.0!

Expand Down
4 changes: 2 additions & 2 deletions crates/socketioxide-redis/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "socketioxide-redis"
description = "Redis adapter for the socket.io protocol"
version = "0.4.0"
version = "0.4.1"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand All @@ -19,7 +19,7 @@ fred = ["dep:fred"]
default = ["redis"]

[dependencies]
socketioxide-core = { version = "0.17", path = "../socketioxide-core", features = [
socketioxide-core = { version = "0.18", path = "../socketioxide-core", features = [
"remote-adapter",
] }
futures-core.workspace = true
Expand Down
8 changes: 7 additions & 1 deletion crates/socketioxide-redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,16 @@ impl<E: SocketEmitter, R: Driver> CoreAdapter<E> for CustomRedisAdapter<E, R> {
self.send_req(req, opts.server_id).await?;
let (local, _) = self.local.broadcast_with_ack(packet, opts, timeout);

// we wait for the configured ack timeout + the adapter request timeout
let timeout = self
.config
.request_timeout
.saturating_add(timeout.unwrap_or(self.local.ack_timeout()));

Ok(AckStream::new(
local,
remote,
self.config.request_timeout,
timeout,
remote_serv_cnt,
req_id,
self.responses.clone(),
Expand Down
7 changes: 5 additions & 2 deletions crates/socketioxide-redis/tests/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,14 @@ pub async fn broadcast_with_ack() {
#[tokio::test]
pub async fn broadcast_with_ack_timeout() {
use futures_util::StreamExt;
const TIMEOUT: Duration = Duration::from_millis(50);
const REQ_TIMEOUT: Duration = Duration::from_millis(50);
const ACK_TIMEOUT: Duration = Duration::from_millis(50);
const TIMEOUT: Duration = Duration::from_millis(100);

async fn handler<A: Adapter>(socket: SocketRef<A>) {
socket
.broadcast()
.timeout(ACK_TIMEOUT)
.emit_with_ack::<_, String>("test", "bar")
.await
.unwrap()
Expand All @@ -123,7 +126,7 @@ pub async fn broadcast_with_ack_timeout() {
socket.emit("ack_res", "timeout").unwrap();
}

let [io1, io2] = fixture::spawn_buggy_servers(TIMEOUT);
let [io1, io2] = fixture::spawn_buggy_servers(REQ_TIMEOUT);

io1.ns("/", handler).await.unwrap();
io2.ns("/", async || ()).await.unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/socketioxide/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# socketioxide 0.18.3
* fix: race condition when emitting with acknowledgement.
* feat: expose global configured ack timeout to adapter implementations

# socketioxide 0.18.2
* deps: bump rand from 0.9.1 to 0.10.0
Expand Down
2 changes: 1 addition & 1 deletion crates/socketioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme.workspace = true

[dependencies]
engineioxide = { path = "../engineioxide", version = "0.17" }
socketioxide-core = { path = "../socketioxide-core", version = "0.17" }
socketioxide-core = { path = "../socketioxide-core", version = "0.18" }

bytes.workspace = true
futures-core.workspace = true
Expand Down
7 changes: 2 additions & 5 deletions crates/socketioxide/src/ack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,17 @@ impl AckInnerStream {
/// Creates a new [`AckInnerStream`] from a [`Packet`] and a list of sockets.
/// The [`Packet`] is sent to all the sockets and the [`AckInnerStream`] will wait
/// for an acknowledgement from each socket.
///
/// The [`AckInnerStream`] will wait for the default timeout specified in the config
/// (5s by default) if no custom timeout is specified.
pub fn broadcast<'a, A: Adapter>(
packet: Packet,
sockets: impl Iterator<Item = &'a Arc<Socket<A>>>,
duration: Duration,
timeout: Duration,
) -> (Self, u32) {
let rxs = FuturesUnordered::new();
let mut count = 0;
for socket in sockets {
let rx = socket.send_with_ack(packet.clone());
rxs.push(AckResultWithId {
result: tokio::time::timeout(duration, rx),
result: tokio::time::timeout(timeout, rx),
id: socket.id,
});
count += 1;
Expand Down
3 changes: 3 additions & 0 deletions crates/socketioxide/src/ns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ impl SocketEmitter for Emitter {
fn path(&self) -> &Str {
&self.path
}
fn ack_timeout(&self) -> Duration {
self.ack_timeout
}
}

#[doc(hidden)]
Expand Down
Loading