diff --git a/Cargo.lock b/Cargo.lock index 51b2946c..1f8d68da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2322,7 +2322,7 @@ dependencies = [ [[package]] name = "socketioxide" -version = "0.18.2" +version = "0.18.3" dependencies = [ "axum", "bytes", @@ -2356,7 +2356,7 @@ dependencies = [ [[package]] name = "socketioxide-core" -version = "0.17.0" +version = "0.18.0" dependencies = [ "arbitrary", "bytes", @@ -2385,7 +2385,7 @@ dependencies = [ [[package]] name = "socketioxide-mongodb" -version = "0.1.1" +version = "0.1.2" dependencies = [ "bson 3.1.0", "bytes", @@ -2406,7 +2406,7 @@ dependencies = [ [[package]] name = "socketioxide-parser-common" -version = "0.17.0" +version = "0.17.1" dependencies = [ "bytes", "codspeed-criterion-compat", @@ -2419,7 +2419,7 @@ dependencies = [ [[package]] name = "socketioxide-parser-msgpack" -version = "0.17.0" +version = "0.17.1" dependencies = [ "bytes", "codspeed-criterion-compat", @@ -2432,7 +2432,7 @@ dependencies = [ [[package]] name = "socketioxide-redis" -version = "0.4.0" +version = "0.4.1" dependencies = [ "bytes", "fred", diff --git a/crates/parser-common/CHANGELOG.md b/crates/parser-common/CHANGELOG.md index 4279ba3e..8e160bed 100644 --- a/crates/parser-common/CHANGELOG.md +++ b/crates/parser-common/CHANGELOG.md @@ -1,3 +1,5 @@ +# socketioxide-parser-common 0.17.1 +* deps: bump `socketioxide-core` to 0.18 # socketioxide-parser-common 0.17 * deps: bump `socketioxide-core` to 0.17 * MSRV: rust-version is now 1.86 with edition 2024 diff --git a/crates/parser-common/Cargo.toml b/crates/parser-common/Cargo.toml index 5d16ec54..d1ad4de0 100644 --- a/crates/parser-common/Cargo.toml +++ b/crates/parser-common/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "socketioxide-parser-common" description = "Common parser for the socketioxide protocol" -version = "0.17.0" +version = "0.17.1" edition.workspace = true rust-version.workspace = true authors.workspace = true @@ -17,7 +17,7 @@ bytes.workspace = true itoa.workspace = true serde.workspace = true serde_json.workspace = true -socketioxide-core = { version = "0.17", path = "../socketioxide-core" } +socketioxide-core = { version = "0.18", path = "../socketioxide-core" } [dev-dependencies] criterion.workspace = true diff --git a/crates/parser-msgpack/CHANGELOG.md b/crates/parser-msgpack/CHANGELOG.md index e74ebae6..45c4e72f 100644 --- a/crates/parser-msgpack/CHANGELOG.md +++ b/crates/parser-msgpack/CHANGELOG.md @@ -1,3 +1,6 @@ +# socketioxide-parser-msgpack 0.17.1 +* deps: bump `socketioxide-core` to 0.18 + # socketioxide-parser-msgpack 0.17 * deps: bump `socketioxide-core` to 0.17 * MSRV: rust-version is now 1.86 with edition 2024 diff --git a/crates/parser-msgpack/Cargo.toml b/crates/parser-msgpack/Cargo.toml index b4f3f57f..8977e64d 100644 --- a/crates/parser-msgpack/Cargo.toml +++ b/crates/parser-msgpack/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "socketioxide-parser-msgpack" description = "Msgpack parser for the socketioxide protocol" -version = "0.17.0" +version = "0.17.1" edition.workspace = true rust-version.workspace = true authors.workspace = true @@ -17,7 +17,7 @@ bytes.workspace = true serde.workspace = true rmp-serde.workspace = true rmp.workspace = true -socketioxide-core = { version = "0.17", path = "../socketioxide-core" } +socketioxide-core = { version = "0.18", path = "../socketioxide-core" } [dev-dependencies] serde_json.workspace = true diff --git a/crates/socketioxide-core/CHANGELOG.md b/crates/socketioxide-core/CHANGELOG.md index 3a044fdb..e04e06a5 100644 --- a/crates/socketioxide-core/CHANGELOG.md +++ b/crates/socketioxide-core/CHANGELOG.md @@ -1,3 +1,6 @@ +# socketioxide-core 0.18.0 +* feat(*breaking*): expose global configured ack timeout to adapter implementations + # socketioxide-core 0.17.0 * feat(*breaking*): remote-adapter packets are now refactored in the core crate. Any adapter implementation can use it through the `remote-adapter` flag. diff --git a/crates/socketioxide-core/Cargo.toml b/crates/socketioxide-core/Cargo.toml index b64b597d..a56d453f 100644 --- a/crates/socketioxide-core/Cargo.toml +++ b/crates/socketioxide-core/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "socketioxide-core" description = "Core of the socketioxide library. Contains basic types and interfaces for the socketioxide crate and all other related sub-crates." -version = "0.17.0" +version = "0.18.0" edition.workspace = true rust-version.workspace = true authors.workspace = true diff --git a/crates/socketioxide-core/src/adapter/mod.rs b/crates/socketioxide-core/src/adapter/mod.rs index 6e6b760f..7e055ed9 100644 --- a/crates/socketioxide-core/src/adapter/mod.rs +++ b/crates/socketioxide-core/src/adapter/mod.rs @@ -219,6 +219,8 @@ pub trait SocketEmitter: Send + Sync + 'static { fn parser(&self) -> impl Parse; /// Get the unique server id. fn server_id(&self) -> Uid; + /// Get the default configured ack timeout. + fn ack_timeout(&self) -> Duration; } /// For static namespaces, the init response will be managed by the user. @@ -536,6 +538,10 @@ impl CoreLocalAdapter { pub fn server_id(&self) -> Uid { self.emitter.server_id() } + /// Get the default configured ack timeout. + pub fn ack_timeout(&self) -> Duration { + self.emitter.ack_timeout() + } } /// The default broadcast iterator. @@ -793,6 +799,9 @@ mod test { fn server_id(&self) -> Uid { Uid::ZERO } + fn ack_timeout(&self) -> Duration { + Duration::ZERO + } } fn create_adapter(sockets: [Sid; S]) -> CoreLocalAdapter { diff --git a/crates/socketioxide-mongodb/CHANGELOG.md b/crates/socketioxide-mongodb/CHANGELOG.md index 4fe46ed4..54a0ffe5 100644 --- a/crates/socketioxide-mongodb/CHANGELOG.md +++ b/crates/socketioxide-mongodb/CHANGELOG.md @@ -1,3 +1,6 @@ +# socketioxide-mongodb 0.1.2 +* fix: race condition between ack timeout and adapter request timeout when broadcasting with acks. + # socketioxide-mongodb 0.1.1 * fix: compilation error without default features. * chore(deps): bump bson to 3.0.0 diff --git a/crates/socketioxide-mongodb/Cargo.toml b/crates/socketioxide-mongodb/Cargo.toml index 04ccd13a..ef90e29c 100644 --- a/crates/socketioxide-mongodb/Cargo.toml +++ b/crates/socketioxide-mongodb/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "socketioxide-mongodb" description = "MongoDB adapter for socketioxide" -version = "0.1.1" +version = "0.1.2" edition.workspace = true rust-version.workspace = true authors.workspace = true @@ -18,7 +18,7 @@ ttl-index = ["dep:bson"] default = ["mongodb", "ttl-index"] [dependencies] -socketioxide-core = { version = "0.17", path = "../socketioxide-core", features = [ +socketioxide-core = { version = "0.18", path = "../socketioxide-core", features = [ "remote-adapter", ] } futures-core.workspace = true diff --git a/crates/socketioxide-mongodb/src/lib.rs b/crates/socketioxide-mongodb/src/lib.rs index 6659e215..da9ae0ca 100644 --- a/crates/socketioxide-mongodb/src/lib.rs +++ b/crates/socketioxide-mongodb/src/lib.rs @@ -443,10 +443,16 @@ impl CoreAdapter for CustomMongoDbAdapter self.send_req(req, None).await?; let (local, _) = self.local.broadcast_with_ack(packet, opts, timeout); + // we wait for the configured ack timeout + the adapter request timeout + let timeout = self + .config + .request_timeout + .saturating_add(timeout.unwrap_or(self.local.ack_timeout())); + Ok(AckStream::new( local, rx, - self.config.request_timeout, + timeout, remote_serv_cnt, req_id, self.responses.clone(), diff --git a/crates/socketioxide-mongodb/tests/broadcast.rs b/crates/socketioxide-mongodb/tests/broadcast.rs index c7ba71a9..f1486cc1 100644 --- a/crates/socketioxide-mongodb/tests/broadcast.rs +++ b/crates/socketioxide-mongodb/tests/broadcast.rs @@ -108,11 +108,14 @@ pub async fn broadcast_with_ack() { #[tokio::test] pub async fn broadcast_with_ack_timeout() { use futures_util::StreamExt; - const TIMEOUT: Duration = Duration::from_millis(50); + const REQ_TIMEOUT: Duration = Duration::from_millis(50); + const ACK_TIMEOUT: Duration = Duration::from_millis(50); + const TIMEOUT: Duration = Duration::from_millis(100); async fn handler(socket: SocketRef) { socket .broadcast() + .timeout(ACK_TIMEOUT) .emit_with_ack::<_, String>("test", "bar") .await .unwrap() @@ -124,7 +127,7 @@ pub async fn broadcast_with_ack_timeout() { socket.emit("ack_res", "timeout").unwrap(); } - let [io1, io2] = fixture::spawn_buggy_servers(TIMEOUT); + let [io1, io2] = fixture::spawn_buggy_servers(REQ_TIMEOUT); io1.ns("/", handler).await.unwrap(); io2.ns("/", async || ()).await.unwrap(); diff --git a/crates/socketioxide-mongodb/tests/fixture.rs b/crates/socketioxide-mongodb/tests/fixture.rs index 3da15cbb..84416ad3 100644 --- a/crates/socketioxide-mongodb/tests/fixture.rs +++ b/crates/socketioxide-mongodb/tests/fixture.rs @@ -25,6 +25,14 @@ pub fn spawn_servers() -> [SocketIo( + request_timeout: Duration, +) -> [SocketIo>; N] { + let sync_buff = Arc::new(RwLock::new(Vec::with_capacity(N))); + let config = MongoDbAdapterConfig::default().with_request_timeout(request_timeout); + spawn_inner(sync_buff, config) +} + pub fn spawn_buggy_servers( timeout: Duration, ) -> [SocketIo>; N] { diff --git a/crates/socketioxide-redis/CHANGELOG.md b/crates/socketioxide-redis/CHANGELOG.md index 2a997e69..5f23e6b1 100644 --- a/crates/socketioxide-redis/CHANGELOG.md +++ b/crates/socketioxide-redis/CHANGELOG.md @@ -1,3 +1,6 @@ +# socketioxide-redis 0.4.1 +* fix: race condition between ack timeout and adapter request timeout when broadcasting with acks. + # socketioxide-redis 0.4.0 * deps: bump `redis` to 1.0! diff --git a/crates/socketioxide-redis/Cargo.toml b/crates/socketioxide-redis/Cargo.toml index 0f209a55..c82e14d6 100644 --- a/crates/socketioxide-redis/Cargo.toml +++ b/crates/socketioxide-redis/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "socketioxide-redis" description = "Redis adapter for the socket.io protocol" -version = "0.4.0" +version = "0.4.1" edition.workspace = true rust-version.workspace = true authors.workspace = true @@ -19,7 +19,7 @@ fred = ["dep:fred"] default = ["redis"] [dependencies] -socketioxide-core = { version = "0.17", path = "../socketioxide-core", features = [ +socketioxide-core = { version = "0.18", path = "../socketioxide-core", features = [ "remote-adapter", ] } futures-core.workspace = true diff --git a/crates/socketioxide-redis/src/lib.rs b/crates/socketioxide-redis/src/lib.rs index 17242423..6f9beccc 100644 --- a/crates/socketioxide-redis/src/lib.rs +++ b/crates/socketioxide-redis/src/lib.rs @@ -528,10 +528,16 @@ impl CoreAdapter for CustomRedisAdapter { self.send_req(req, opts.server_id).await?; let (local, _) = self.local.broadcast_with_ack(packet, opts, timeout); + // we wait for the configured ack timeout + the adapter request timeout + let timeout = self + .config + .request_timeout + .saturating_add(timeout.unwrap_or(self.local.ack_timeout())); + Ok(AckStream::new( local, remote, - self.config.request_timeout, + timeout, remote_serv_cnt, req_id, self.responses.clone(), diff --git a/crates/socketioxide-redis/tests/broadcast.rs b/crates/socketioxide-redis/tests/broadcast.rs index 0c2bb238..16140eb3 100644 --- a/crates/socketioxide-redis/tests/broadcast.rs +++ b/crates/socketioxide-redis/tests/broadcast.rs @@ -108,11 +108,14 @@ pub async fn broadcast_with_ack() { #[tokio::test] pub async fn broadcast_with_ack_timeout() { use futures_util::StreamExt; - const TIMEOUT: Duration = Duration::from_millis(50); + const REQ_TIMEOUT: Duration = Duration::from_millis(50); + const ACK_TIMEOUT: Duration = Duration::from_millis(50); + const TIMEOUT: Duration = Duration::from_millis(100); async fn handler(socket: SocketRef) { socket .broadcast() + .timeout(ACK_TIMEOUT) .emit_with_ack::<_, String>("test", "bar") .await .unwrap() @@ -123,7 +126,7 @@ pub async fn broadcast_with_ack_timeout() { socket.emit("ack_res", "timeout").unwrap(); } - let [io1, io2] = fixture::spawn_buggy_servers(TIMEOUT); + let [io1, io2] = fixture::spawn_buggy_servers(REQ_TIMEOUT); io1.ns("/", handler).await.unwrap(); io2.ns("/", async || ()).await.unwrap(); diff --git a/crates/socketioxide/CHANGELOG.md b/crates/socketioxide/CHANGELOG.md index ae922950..988a6fb9 100644 --- a/crates/socketioxide/CHANGELOG.md +++ b/crates/socketioxide/CHANGELOG.md @@ -1,5 +1,6 @@ # socketioxide 0.18.3 * fix: race condition when emitting with acknowledgement. +* feat: expose global configured ack timeout to adapter implementations # socketioxide 0.18.2 * deps: bump rand from 0.9.1 to 0.10.0 diff --git a/crates/socketioxide/Cargo.toml b/crates/socketioxide/Cargo.toml index c3de82f4..abbe3b11 100644 --- a/crates/socketioxide/Cargo.toml +++ b/crates/socketioxide/Cargo.toml @@ -14,7 +14,7 @@ readme.workspace = true [dependencies] engineioxide = { path = "../engineioxide", version = "0.17" } -socketioxide-core = { path = "../socketioxide-core", version = "0.17" } +socketioxide-core = { path = "../socketioxide-core", version = "0.18" } bytes.workspace = true futures-core.workspace = true diff --git a/crates/socketioxide/src/ack.rs b/crates/socketioxide/src/ack.rs index a8a25ae0..2a193c2a 100644 --- a/crates/socketioxide/src/ack.rs +++ b/crates/socketioxide/src/ack.rs @@ -136,20 +136,17 @@ impl AckInnerStream { /// Creates a new [`AckInnerStream`] from a [`Packet`] and a list of sockets. /// The [`Packet`] is sent to all the sockets and the [`AckInnerStream`] will wait /// for an acknowledgement from each socket. - /// - /// The [`AckInnerStream`] will wait for the default timeout specified in the config - /// (5s by default) if no custom timeout is specified. pub fn broadcast<'a, A: Adapter>( packet: Packet, sockets: impl Iterator>>, - duration: Duration, + timeout: Duration, ) -> (Self, u32) { let rxs = FuturesUnordered::new(); let mut count = 0; for socket in sockets { let rx = socket.send_with_ack(packet.clone()); rxs.push(AckResultWithId { - result: tokio::time::timeout(duration, rx), + result: tokio::time::timeout(timeout, rx), id: socket.id, }); count += 1; diff --git a/crates/socketioxide/src/ns.rs b/crates/socketioxide/src/ns.rs index e39f5504..78236875 100644 --- a/crates/socketioxide/src/ns.rs +++ b/crates/socketioxide/src/ns.rs @@ -381,6 +381,9 @@ impl SocketEmitter for Emitter { fn path(&self) -> &Str { &self.path } + fn ack_timeout(&self) -> Duration { + self.ack_timeout + } } #[doc(hidden)]