From 778b577a7f193f86bbb6cecfe44cbe8c1bda3c1b Mon Sep 17 00:00:00 2001 From: Ruy Rocha <108208+ruyrocha@users.noreply.github.com> Date: Wed, 27 May 2026 19:13:15 -0300 Subject: [PATCH 1/2] gvl: release GVL during streaming I/O, re-acquire for Ruby block yields MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, `chunks` used `Yield::Iter` over a `BodyReceiver` iterator. `BodyReceiver::next` called `maybe_block_on`, which held the GVL across the entire Tokio future — blocking all other Ruby threads for the duration of each network wait. Replace with a Rust-driven loop inside `nogvl_cancellable`: - GVL is released while polling the async byte stream (network I/O) - GVL is re-acquired via `with_gvl` + `block_in_place` only to yield each chunk to the Ruby block - Thread interruption via Thread.kill is handled at every iteration step through the existing cancellation flag - Streaming errors now propagate as Ruby exceptions instead of being silently swallowed by the `and_then(|r| r.ok())` in the old iterator Add `gvl::with_gvl` as the counterpart to `nogvl`/`nogvl_cancellable`, add `rt::runtime()` to expose the global Tokio handle, and remove `rt::maybe_block_on` which is no longer needed. The Ruby block Proc is pinned against GC for the duration of streaming via `rb_gc_register_address`. Fixes #57 --- Cargo.toml | 2 +- src/client/body.rs | 2 +- src/client/body/stream.rs | 43 +---- src/client/resp.rs | 104 +++++++++++- src/gvl.rs | 35 +++- src/rt.rs | 24 +-- test/gvl_streaming_test.rb | 328 +++++++++++++++++++++++++++++++++++++ 7 files changed, 469 insertions(+), 69 deletions(-) create mode 100644 test/gvl_streaming_test.rb diff --git a/Cargo.toml b/Cargo.toml index 4dcfac9..35ef6a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ doc = false name = "wreq_ruby" [dependencies] -magnus = { version = "0.8.2", features = ["bytes"] } +magnus = { version = "0.8.2", features = ["bytes", "rb-sys"] } rb-sys = { version = "0.9.128", default-features = false } tokio = { version = "1.52.3", features = ["full"] } wreq = { git = "https://github.com/0x676e67/wreq", features = [ diff --git a/src/client/body.rs b/src/client/body.rs index e8bbb4a..eb19762 100644 --- a/src/client/body.rs +++ b/src/client/body.rs @@ -12,7 +12,7 @@ use magnus::{ pub use self::{ form::Form, json::Json, - stream::{BodyReceiver, BodySender, ReceiverStream}, + stream::{BodySender, ReceiverStream}, }; /// Represents the body of an HTTP request. diff --git a/src/client/body/stream.rs b/src/client/body/stream.rs index dbf13b4..286651e 100644 --- a/src/client/body/stream.rs +++ b/src/client/body/stream.rs @@ -5,20 +5,11 @@ use std::{ }; use bytes::Bytes; -use futures_util::{Stream, StreamExt, TryFutureExt}; +use futures_util::{Stream, TryFutureExt}; use magnus::{Error, RString, TryConvert, Value}; -use tokio::sync::{ - Mutex, - mpsc::{self}, -}; - -use crate::{ - error::{memory_error, mpsc_send_error_to_magnus}, - rt, -}; +use tokio::sync::mpsc; -/// A receiver for streaming HTTP response bodies. -pub struct BodyReceiver(Mutex> + Send>>>); +use crate::error::{memory_error, mpsc_send_error_to_magnus}; /// A sender for streaming HTTP request bodies. #[magnus::wrap(class = "Wreq::BodySender", free_immediately, size)] @@ -29,32 +20,6 @@ struct InnerBodySender { rx: Option>, } -// ===== impl BodyReceiver ===== - -impl BodyReceiver { - /// Create a new [`BodyReceiver`] instance. - #[inline] - pub fn new(stream: impl Stream> + Send + 'static) -> BodyReceiver { - BodyReceiver(Mutex::new(Box::pin(stream))) - } -} - -impl Iterator for BodyReceiver { - type Item = Bytes; - - fn next(&mut self) -> Option { - rt::maybe_block_on(async { - self.0 - .lock() - .await - .as_mut() - .next() - .await - .and_then(|r| r.ok()) - }) - } -} - // ===== impl BodySender ===== impl BodySender { @@ -78,7 +43,7 @@ impl BodySender { let bytes = data.to_bytes(); let inner = rb_self.0.read().unwrap(); if let Some(ref tx) = inner.tx { - rt::try_block_on(tx.send(bytes).map_err(mpsc_send_error_to_magnus))?; + crate::rt::try_block_on(tx.send(bytes).map_err(mpsc_send_error_to_magnus))?; } Ok(()) } diff --git a/src/client/resp.rs b/src/client/resp.rs index 1aeab97..1a6cd5d 100644 --- a/src/client/resp.rs +++ b/src/client/resp.rs @@ -2,14 +2,17 @@ use std::{net::SocketAddr, sync::Arc}; use arc_swap::ArcSwapOption; use bytes::Bytes; -use futures_util::TryFutureExt; +use futures_util::{StreamExt, TryFutureExt}; use http::{Extensions, HeaderMap, response::Response as HttpResponse}; use http_body_util::BodyExt; -use magnus::{Error, Module, RArray, RModule, Ruby, Value, block::Yield, scan_args::scan_args}; +use magnus::{ + Error, IntoValue, Module, RArray, RModule, Ruby, Value, block::Proc, scan_args::scan_args, + value::ReprValue, +}; use wreq::Uri; use crate::{ - client::body::{BodyReceiver, Json}, + client::body::Json, cookie::Cookie, error::{memory_error, wreq_error_to_magnus}, gvl, @@ -199,12 +202,95 @@ impl Response { }) } - /// Get a chunk iterator for the response body. - pub fn chunks(&self) -> Result, Error> { - self.response(true) - .map(wreq::Response::bytes_stream) - .map(BodyReceiver::new) - .map(Yield::Iter) + /// Stream the response body, yielding each chunk to the given block with + /// proper GVL management. + /// + /// The iteration loop is driven from Rust: + /// 1. GVL is released while waiting for the next chunk (network I/O) + /// 2. GVL is re-acquired to yield the chunk to the Ruby block + /// 3. GVL is released again for the next I/O operation + /// + /// This allows other Ruby threads to run during network I/O, and ensures + /// streaming errors are properly propagated instead of silently swallowed. + pub fn chunks(ruby: &Ruby, rb_self: &Self) -> Result { + // Check for a block argument using the Ruby C API. + // rb_block_given_p() returns c_int: 1 if block given, 0 otherwise. + if unsafe { rb_sys::rb_block_given_p() == 0 } { + return Err(Error::new( + ruby.exception_local_jump_error(), + "no block given (yield)", + )); + } + + // Heap-allocate the block VALUE for a stable address that can be + // registered with Ruby's GC. This prevents the Proc from being + // collected while the GVL is released during I/O. + let mut block_raw = Box::new(unsafe { rb_sys::rb_block_proc() }); + let block_ptr: *mut rb_sys::VALUE = block_raw.as_mut(); + + unsafe { + rb_sys::rb_gc_register_address(block_ptr); + } + + let response = rb_self.response(true)?; + let stream = response.bytes_stream(); + + // Drive the streaming loop inside a single nogvl_cancellable call, + // using with_gvl to re-acquire the GVL only for Ruby block yields. + let result = gvl::nogvl_cancellable(|flag| { + rt::runtime().block_on(async move { + let mut stream = Box::pin(stream); + loop { + let chunk = tokio::select! { + biased; + _ = flag.cancelled() => return Err(crate::error::interrupt_error()), + result = stream.next() => result, + }; + + match chunk { + Some(Ok(bytes)) => { + // Read the current VALUE (GC compaction may have + // updated it via the registered address). + let current_block_raw = unsafe { *block_ptr }; + // Re-acquire GVL to yield chunk to the Ruby block. + // Wrap in block_in_place to tell Tokio this thread + // will block, so it can schedule other tasks. + let yield_result: Result<(), Error> = + tokio::task::block_in_place(|| { + gvl::with_gvl(|| { + let block_value = unsafe { + magnus::rb_sys::FromRawValue::from_raw( + current_block_raw, + ) + }; + let block = + Proc::from_value(block_value).ok_or_else(|| { + Error::new( + ruby.exception_runtime_error(), + "block was garbage collected", + ) + })?; + let chunk_value = bytes.into_value_with(ruby); + block.call::<_, Value>((chunk_value,))?; + Ok(()) + }) + }); + yield_result?; + } + Some(Err(e)) => return Err(wreq_error_to_magnus(e)), + None => return Ok(()), + } + } + }) + }); + + // Unregister from GC now that we're done with the block + unsafe { + rb_sys::rb_gc_unregister_address(block_ptr); + } + + result?; + Ok(ruby.qnil().as_value()) } /// Close the response body, dropping any resources. diff --git a/src/gvl.rs b/src/gvl.rs index 1573618..c679eab 100644 --- a/src/gvl.rs +++ b/src/gvl.rs @@ -3,7 +3,7 @@ use std::{ffi::c_void, mem::MaybeUninit, ptr::null_mut}; -use rb_sys::rb_thread_call_without_gvl; +use rb_sys::{rb_thread_call_with_gvl, rb_thread_call_without_gvl}; use tokio::sync::watch; /// Container for safely passing closure and result through C callback. @@ -77,6 +77,39 @@ unsafe extern "C" fn unblock_func(arg: *mut c_void) { } } +/// Executes the given closure while holding the Ruby GVL. +/// +/// This must be called from a context where the GVL has been released +/// (e.g., inside a [`nogvl`] or [`nogvl_cancellable`] callback). +/// It re-acquires the GVL, executes the closure, then releases the GVL again +/// before returning. +/// +/// This is the counterpart to [`nogvl`] / [`nogvl_cancellable`]: +/// - [`nogvl`] / [`nogvl_cancellable`] release the GVL for I/O +/// - `with_gvl` re-acquires it for Ruby callbacks +/// +/// # Safety +/// +/// The closure MUST NOT panic. A panic would unwind through the +/// `rb_thread_call_with_gvl` FFI boundary, which is undefined behavior. +pub fn with_gvl(func: F) -> R +where + F: FnOnce() -> R, + R: Sized, +{ + let mut args = Args { + func: Some(func), + result: MaybeUninit::uninit(), + }; + + let arg_ptr = &mut args as *mut _ as *mut c_void; + + unsafe { + rb_thread_call_with_gvl(Some(call_without_gvl::), arg_ptr); + args.result.assume_init() + } +} + /// Executes the given closure without holding the Ruby GVL (Global VM Lock). /// /// WARNING: Do NOT nest calls to [`nogvl`] or [`nogvl_cancellable`] inside each other. diff --git a/src/rt.rs b/src/rt.rs index 1bd71f5..acf55ed 100644 --- a/src/rt.rs +++ b/src/rt.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::sync::LazyLock; use tokio::runtime::{Builder, Runtime}; @@ -11,6 +12,11 @@ static RUNTIME: LazyLock = LazyLock::new(|| { .expect("Failed to initialize Tokio runtime") }); +/// Returns a reference to the global Tokio runtime. +pub(crate) fn runtime() -> &'static Runtime { + &RUNTIME +} + /// Block on a future to completion on the global Tokio runtime, /// with support for cancellation via the provided `CancelFlag`. pub fn try_block_on(future: F) -> F::Output @@ -27,21 +33,3 @@ where }) }) } - -/// Block on a future to completion on the global Tokio runtime, -/// returning `None` if cancelled via the provided `CancelFlag`. -#[inline] -pub fn maybe_block_on(future: F) -> F::Output -where - F: Future>, -{ - gvl::nogvl_cancellable(|flag| { - RUNTIME.block_on(async move { - tokio::select! { - biased; - _ = flag.cancelled() => None, - result = future => result, - } - }) - }) -} diff --git a/test/gvl_streaming_test.rb b/test/gvl_streaming_test.rb new file mode 100644 index 0000000..3814379 --- /dev/null +++ b/test/gvl_streaming_test.rb @@ -0,0 +1,328 @@ +require "test_helper" + +class GvlStreamingTest < Minitest::Test + # ========================================================================= + # Basic streaming functionality + # ========================================================================= + + def test_chunks_yields_string_chunks + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/5") + chunks = [] + resp.chunks do |chunk| + chunks << chunk + assert_kind_of String, chunk, "Each yielded chunk must be a String" + end + assert_equal 5, chunks.size, "Should yield exactly 5 chunks from /stream/5" + end + + def test_chunks_yields_binary_encoding + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + resp.chunks do |chunk| + assert chunk.encoding == Encoding::BINARY || chunk.encoding == Encoding::ASCII_8BIT, + "Chunk should have binary encoding, got #{chunk.encoding}" + end + end + + def test_chunks_with_single_chunk_body + client = Wreq::Client.new + resp = client.get("http://localhost:8080/bytes/1024") + chunk_count = 0 + total_bytes = 0 + resp.chunks do |chunk| + chunk_count += 1 + total_bytes += chunk.bytesize + end + assert chunk_count >= 1, "Should yield at least one chunk" + assert_equal 1024, total_bytes, "Total bytes should match content length" + end + + def test_chunks_returns_nil + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + result = resp.chunks { |_chunk| :processing } + assert_nil result, "chunks should return nil after completion" + end + + def test_chunks_with_empty_body + client = Wreq::Client.new + resp = client.get("http://localhost:8080/status/204") + chunk_count = 0 + resp.chunks do |_chunk| + chunk_count += 1 + end + assert_equal 0, chunk_count, "No chunks should be yielded for empty 204 response" + end + + # ========================================================================= + # Block requirement + # ========================================================================= + + def test_chunks_without_block_raises_error + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + assert_raises(LocalJumpError) do + resp.chunks + end + end + + # ========================================================================= + # GVL concurrency — other Ruby threads can run during streaming + # ========================================================================= + + def test_other_threads_run_during_streaming + client = Wreq::Client.new + resp = client.get("http://localhost:8080/drip?duration=3&numbytes=3&delay=1") + + counter = 0 + tick_thread = Thread.new do + 30.times do + counter += 1 + sleep 0.1 + end + end + + chunks_received = 0 + resp.chunks do |_chunk| + chunks_received += 1 + end + + tick_thread.join(10) + + assert counter > 5, + "Counter only reached #{counter} — other threads may not be running during streaming. " \ + "GVL should be released during I/O waits." + assert chunks_received >= 1, "Should have received at least one chunk" + end + + def test_multiple_concurrent_streams + client = Wreq::Client.new + results = {} + done = {} + + t1 = Thread.new do + resp = client.get("http://localhost:8080/stream/3") + chunks = [] + resp.chunks { |c| chunks << c } + results[:t1] = chunks.size + done[:t1] = true + end + + t2 = Thread.new do + resp = client.get("http://localhost:8080/stream/3") + chunks = [] + resp.chunks { |c| chunks << c } + results[:t2] = chunks.size + done[:t2] = true + end + + t1.join(15) + t2.join(15) + + assert done[:t1], "Thread 1 should complete" + assert done[:t2], "Thread 2 should complete" + assert_equal 3, results[:t1], "Thread 1 should receive 3 chunks" + assert_equal 3, results[:t2], "Thread 2 should receive 3 chunks" + end + + # ========================================================================= + # Thread interruption during streaming + # ========================================================================= + + def test_thread_interrupt_during_streaming + url = "http://localhost:8080/drip?duration=5&numbytes=5" + thread = Thread.new do + resp = Wreq.get(url) + resp.chunks { |chunk| chunk } + rescue => _ + end + sleep 2 + thread.kill + killed = thread.join(5) + assert killed, "Streaming should be interruptible via Thread.kill" + end + + def test_thread_interrupt_during_slow_stream_with_block_processing + url = "http://localhost:8080/drip?duration=5&numbytes=5&delay=1" + thread = Thread.new do + resp = Wreq.get(url) + resp.chunks do |_chunk| + sleep 0.5 + end + rescue => _ + end + sleep 2 + thread.kill + killed = thread.join(5) + assert killed, "Streaming with slow block processing should be interruptible" + end + + # ========================================================================= + # Streaming error propagation + # ========================================================================= + + def test_chunks_propagates_streaming_errors + client = Wreq::Client.new + resp = client.get("http://localhost:8080/drip?duration=10&numbytes=10", timeout: 1) + error_raised = false + begin + resp.chunks do |_chunk| + # Just consume chunks — the timeout should fire mid-stream + end + rescue => e + error_raised = true + assert( + e.is_a?(Wreq::TimeoutError) || e.is_a?(Wreq::BodyError) || e.is_a?(Wreq::ConnectionResetError), + "Expected a streaming error (TimeoutError/BodyError/ConnectionResetError), got #{e.class}: #{e.message}" + ) + end + assert error_raised, "A streaming error should have been raised for a timed-out drip response" + end + + # ========================================================================= + # Block exception propagation + # ========================================================================= + + def test_exception_in_block_propagates + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/5") + error_raised = false + chunks_before_error = 0 + begin + resp.chunks do |chunk| + chunks_before_error += 1 + if chunks_before_error == 2 + raise RuntimeError, "intentional error in block" + end + end + rescue RuntimeError => e + error_raised = true + assert_equal "intentional error in block", e.message + end + assert error_raised, "Exception raised inside the block should propagate out" + assert_equal 2, chunks_before_error, "Should have processed 2 chunks before the error" + end + + # ========================================================================= + # Double-consumption protection + # ========================================================================= + + def test_chunks_called_twice_raises_error + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + resp.chunks { |_c| } + + error_raised = false + begin + resp.chunks { |_c| } + rescue => e + error_raised = true + assert_instance_of Wreq::MemoryError, e, + "Second chunks call should raise MemoryError, got #{e.class}: #{e.message}" + end + assert error_raised, "Second chunks call should raise an error" + end + + def test_text_after_chunks_raises_error + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + resp.chunks { |_c| } + + error_raised = false + begin + resp.text + rescue => e + error_raised = true + assert_instance_of Wreq::MemoryError, e, + "Calling text after chunks should raise MemoryError, got #{e.class}: #{e.message}" + end + assert error_raised, "Calling text after chunks should raise an error" + end + + # ========================================================================= + # Chunk content integrity + # ========================================================================= + + def test_chunks_content_matches_full_body + client = Wreq::Client.new + resp_full = client.get("http://localhost:8080/bytes/4096") + full_bytes = resp_full.bytes + + resp_stream = client.get("http://localhost:8080/bytes/4096") + streamed_bytes = "".b + resp_stream.chunks do |chunk| + streamed_bytes << chunk + end + + assert_equal full_bytes.bytesize, streamed_bytes.bytesize, + "Streamed body size should match full body size" + end + + def test_chunks_json_stream_content + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/5") + chunks = [] + resp.chunks do |chunk| + chunks << chunk + end + + chunks.each_with_index do |chunk, i| + assert_match(/\{.*\}/, chunk, + "Chunk #{i} should contain a JSON object, got: #{chunk[0..80]}") + end + end + + # ========================================================================= + # Block capture / GC safety + # ========================================================================= + + def test_block_not_garbage_collected_during_streaming + client = Wreq::Client.new + resp = client.get("http://localhost:8080/drip?duration=3&numbytes=3&delay=1") + + chunks_received = 0 + resp.chunks do |chunk| + chunks_received += 1 + GC.start + GC.start + end + + assert_equal 3, chunks_received, + "All 3 chunks should be received even with forced GC between yields" + end + + # ========================================================================= + # close() during/after streaming + # ========================================================================= + + def test_close_after_streaming + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + resp.chunks { |_c| } + resp.close + end + + # ========================================================================= + # Client method variants + # ========================================================================= + + def test_chunks_via_module_method + resp = Wreq.get("http://localhost:8080/stream/3") + chunks = [] + resp.chunks do |chunk| + chunks << chunk + end + assert_equal 3, chunks.size, "Module-level Wreq.get + chunks should work" + end + + def test_chunks_via_client_instance + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + chunks = [] + resp.chunks do |chunk| + chunks << chunk + end + assert_equal 3, chunks.size, "Client instance get + chunks should work" + end +end From cb14b7f568bc727e366cdc945ce6dbaeb12a7960 Mon Sep 17 00:00:00 2001 From: Ruy Rocha <108208+ruyrocha@users.noreply.github.com> Date: Wed, 27 May 2026 19:43:49 -0300 Subject: [PATCH 2/2] test(gvl): comprehensive streaming test suite. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the placeholder gvl_streaming_test.rb with a full regression and correctness suite covering every behaviour changed or fixed in this PR. New test groups: Basic functionality - Yield count, String type, binary encoding, non-empty chunks - nil return value regardless of block return - Empty body (204), single-chunk total bytes - LocalJumpError without block Block control flow (new) - break stops iteration cleanly without raising - next continues iteration, skipping only the current block body - Exception class and message preserved through the Rust loop - Iteration stops at the correct chunk when block raises Body ownership (extended) - bytes/text after chunks raises MemoryError (both directions) - GC registration leak regression: forced GC after double-consume must not corrupt the heap (validates the GcGuard fix) Content integrity (extended) - Streamed bytes match buffered bytes for same endpoint - 256 KB large body total size - 20-chunk stream all received GVL correctness (new/improved) - Background ticker thread accumulates > 10 ticks during a 3-second drip; uses Array#push instead of bare integer to avoid data race - Mutex starvation regression (#57): a waiter thread must acquire a Mutex before streaming completes — the pre-fix BodyReceiver held the GVL continuously, starving all other threads - Concurrent streams: same client, different clients (3 threads) Thread interruption (improved) - Thread.kill during network wait: sets `started` flag before killing so the test cannot pass vacuously if get() itself fails - Thread.kill during block execution (sleeping inside the block) - Thread.raise: injected exception class must be preserved end-to-end Streaming error propagation (new) - Timeout mid-stream raises a typed Wreq error - Error is not silently swallowed as EOF (pre-fix regression) GC safety (extended) - Forced full GC + GC.compact between every chunk - Separate compaction test (skipped if GC.compact unavailable) - Aggressive GC during 8 KB stream must not corrupt content close() integration - close after full stream - close after partial stream (break mid-way) Client variants - Module method (Wreq.get) - Client instance - POST response --- src/client/resp.rs | 123 ++++--- src/gvl.rs | 48 ++- test/gvl_streaming_test.rb | 642 ++++++++++++++++++++++++------------- 3 files changed, 524 insertions(+), 289 deletions(-) diff --git a/src/client/resp.rs b/src/client/resp.rs index 1a6cd5d..5091ca8 100644 --- a/src/client/resp.rs +++ b/src/client/resp.rs @@ -21,6 +21,22 @@ use crate::{ rt, }; +// RAII wrapper that calls rb_gc_unregister_address on drop, ensuring the GC +// registration is always cleaned up regardless of how the scope exits (normal +// return, early error return via `?`, or panic). +struct GcGuard(*mut rb_sys::VALUE); + +impl Drop for GcGuard { + fn drop(&mut self) { + unsafe { rb_sys::rb_gc_unregister_address(self.0) } + } +} + +// SAFETY: GcGuard is only ever created while holding the GVL, and its Drop +// runs either on the same thread or inside with_gvl (also GVL-held). +// The pointer it holds is into a Box that outlives the guard. +unsafe impl Send for GcGuard {} + /// A response from a request. #[magnus::wrap(class = "Wreq::Response", free_immediately, size)] pub struct Response { @@ -213,8 +229,6 @@ impl Response { /// This allows other Ruby threads to run during network I/O, and ensures /// streaming errors are properly propagated instead of silently swallowed. pub fn chunks(ruby: &Ruby, rb_self: &Self) -> Result { - // Check for a block argument using the Ruby C API. - // rb_block_given_p() returns c_int: 1 if block given, 0 otherwise. if unsafe { rb_sys::rb_block_given_p() == 0 } { return Err(Error::new( ruby.exception_local_jump_error(), @@ -222,21 +236,30 @@ impl Response { )); } - // Heap-allocate the block VALUE for a stable address that can be - // registered with Ruby's GC. This prevents the Proc from being - // collected while the GVL is released during I/O. - let mut block_raw = Box::new(unsafe { rb_sys::rb_block_proc() }); - let block_ptr: *mut rb_sys::VALUE = block_raw.as_mut(); - - unsafe { - rb_sys::rb_gc_register_address(block_ptr); - } - + // FIX (issue 3): response() is called FIRST, before any GC registration. + // If it fails and returns Err, we exit here without ever registering + // anything — so there is nothing to unregister. let response = rb_self.response(true)?; let stream = response.bytes_stream(); - // Drive the streaming loop inside a single nogvl_cancellable call, - // using with_gvl to re-acquire the GVL only for Ruby block yields. + // Heap-allocate the block VALUE so rb_gc_register_address has a stable + // address to track. GcGuard guarantees rb_gc_unregister_address is called + // on every exit path (FIX issues 3 and 5). + let mut block_raw = Box::new(unsafe { rb_sys::rb_block_proc() }); + let block_ptr: *mut rb_sys::VALUE = block_raw.as_mut(); + unsafe { rb_sys::rb_gc_register_address(block_ptr) }; + let _gc_guard = GcGuard(block_ptr); // dropped at end of scope unconditionally + + // FIX (issue 2): capture the heap address as `usize` (Copy + Send) rather + // than as `*mut VALUE` (!Send). The pointer is reconstructed inside the + // loop, only when needed, from within a with_gvl callback. + let block_addr: usize = block_ptr as usize; + + // Drive the streaming loop without the GVL. + // FIX (issue 1): `ruby: &Ruby` is NOT moved into the async block. + // The Ruby handle is obtained fresh via Ruby::get() inside with_gvl, + // where we are guaranteed to hold the GVL. Capturing &Ruby across + // GVL releases is semantically wrong even though Ruby is a ZST. let result = gvl::nogvl_cancellable(|flag| { rt::runtime().block_on(async move { let mut stream = Box::pin(stream); @@ -244,37 +267,51 @@ impl Response { let chunk = tokio::select! { biased; _ = flag.cancelled() => return Err(crate::error::interrupt_error()), - result = stream.next() => result, + result = stream.next() => result, }; match chunk { Some(Ok(bytes)) => { - // Read the current VALUE (GC compaction may have - // updated it via the registered address). - let current_block_raw = unsafe { *block_ptr }; - // Re-acquire GVL to yield chunk to the Ruby block. - // Wrap in block_in_place to tell Tokio this thread - // will block, so it can schedule other tasks. + // FIX (issue 2): reconstruct the pointer from usize here, + // inside the closure, rather than at capture time. + // Read the current VALUE — GC compaction may have updated + // the referent via the registered address. + let current_block_raw = + unsafe { *(block_addr as *const rb_sys::VALUE) }; + let yield_result: Result<(), Error> = - tokio::task::block_in_place(|| { - gvl::with_gvl(|| { - let block_value = unsafe { - magnus::rb_sys::FromRawValue::from_raw( - current_block_raw, - ) - }; - let block = - Proc::from_value(block_value).ok_or_else(|| { - Error::new( - ruby.exception_runtime_error(), - "block was garbage collected", - ) - })?; - let chunk_value = bytes.into_value_with(ruby); - block.call::<_, Value>((chunk_value,))?; - Ok(()) - }) - }); + tokio::task::block_in_place(|| { + gvl::with_gvl(|| { + // FIX (issue 1): obtain Ruby handle fresh now + // that we hold the GVL. Never use a captured + // &Ruby across a GVL release. + let ruby = magnus::Ruby::get() + .expect("Ruby::get() failed inside with_gvl — GVL not held as expected"); + + let block_value = unsafe { + magnus::rb_sys::FromRawValue::from_raw( + current_block_raw, + ) + }; + + // FIX (issue 6): accurate error message. + // This path means VALUE reconstruction failed, + // not that GC collected the block (the GcGuard + // prevents that). + let block = + Proc::from_value(block_value).ok_or_else(|| { + Error::new( + ruby.exception_runtime_error(), + "invalid block VALUE: reconstruction failed \ + (this is a wreq-ruby bug, not GC collection)", + ) + })?; + + let chunk_value = bytes.into_value_with(&ruby); + block.call::<_, Value>((chunk_value,))?; + Ok(()) + }) + }); yield_result?; } Some(Err(e)) => return Err(wreq_error_to_magnus(e)), @@ -283,11 +320,7 @@ impl Response { } }) }); - - // Unregister from GC now that we're done with the block - unsafe { - rb_sys::rb_gc_unregister_address(block_ptr); - } + // _gc_guard drops here — rb_gc_unregister_address called unconditionally. result?; Ok(ruby.qnil().as_value()) diff --git a/src/gvl.rs b/src/gvl.rs index c679eab..799aa07 100644 --- a/src/gvl.rs +++ b/src/gvl.rs @@ -77,37 +77,59 @@ unsafe extern "C" fn unblock_func(arg: *mut c_void) { } } +// ── Separate arg container for with_gvl ────────────────────────────────────── +// +// Uses Option instead of MaybeUninit. If the closure panics and never +// writes a result, args.result stays None and the subsequent .expect() gives a +// clear panic message rather than reading uninitialized memory (which would be +// UB). The FFI unwind is still UB, but this is the best we can do short of +// catching the panic before the FFI boundary. + +struct GvlArgs { + func: Option, + result: Option, +} + +unsafe extern "C" fn call_with_gvl(arg: *mut c_void) -> *mut c_void +where + F: FnOnce() -> R, +{ + let args = unsafe { &mut *(arg as *mut GvlArgs) }; + let func = args.func.take().expect("call_with_gvl called twice"); + args.result = Some(func()); + null_mut() +} + /// Executes the given closure while holding the Ruby GVL. /// -/// This must be called from a context where the GVL has been released +/// Must be called from a context where the GVL has been released /// (e.g., inside a [`nogvl`] or [`nogvl_cancellable`] callback). -/// It re-acquires the GVL, executes the closure, then releases the GVL again -/// before returning. -/// -/// This is the counterpart to [`nogvl`] / [`nogvl_cancellable`]: -/// - [`nogvl`] / [`nogvl_cancellable`] release the GVL for I/O -/// - `with_gvl` re-acquires it for Ruby callbacks +/// Re-acquires the GVL, runs the closure, then releases it again. /// /// # Safety /// -/// The closure MUST NOT panic. A panic would unwind through the -/// `rb_thread_call_with_gvl` FFI boundary, which is undefined behavior. +/// The closure MUST NOT panic. A panic unwinds through the FFI boundary, +/// which is undefined behavior. Unlike `nogvl` (which uses `MaybeUninit`), +/// this uses `Option` so a failed result produces a clear `.expect()` +/// message rather than silent UB — but the FFI unwind remains UB regardless. pub fn with_gvl(func: F) -> R where F: FnOnce() -> R, R: Sized, { - let mut args = Args { + let mut args = GvlArgs { func: Some(func), - result: MaybeUninit::uninit(), + result: None, }; let arg_ptr = &mut args as *mut _ as *mut c_void; unsafe { - rb_thread_call_with_gvl(Some(call_without_gvl::), arg_ptr); - args.result.assume_init() + rb_thread_call_with_gvl(Some(call_with_gvl::), arg_ptr); } + + args.result + .expect("with_gvl: closure did not produce a result (panic crossed FFI boundary?)") } /// Executes the given closure without holding the Ruby GVL (Global VM Lock). diff --git a/test/gvl_streaming_test.rb b/test/gvl_streaming_test.rb index 3814379..b24f7fa 100644 --- a/test/gvl_streaming_test.rb +++ b/test/gvl_streaming_test.rb @@ -1,328 +1,508 @@ -require "test_helper" +# frozen_string_literal: true + +require 'test_helper' + +# Test suite for GVL-aware streaming (issue #57). +# +# Requires httpbin running on localhost:8080: +# docker run -d -p 8080:80 --name httpbin kennethreitz/httpbin +# +# Covers: +# - Basic functionality (yield count, encoding, return value, empty body) +# - Block control flow (break, next, exception) +# - Double-consumption / body ownership +# - GVL correctness (other threads run during I/O) +# - Thread interruption (kill, raise) +# - Streaming error propagation +# - Content integrity (streamed == buffered) +# - GC safety (forced collection during streaming) +# - Concurrency (multiple threads, multiple clients) +# - Regression: original issue #57 (Mutex starvation) class GvlStreamingTest < Minitest::Test - # ========================================================================= - # Basic streaming functionality - # ========================================================================= + BASE = 'http://localhost:8080' - def test_chunks_yields_string_chunks - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/5") + # =========================================================================== + # Helpers + # =========================================================================== + + def client + Wreq::Client.new + end + + def get(path, **opts) + client.get("#{BASE}#{path}", **opts) + end + + # Collect all chunks from a response into an array. + def collect_chunks(resp) chunks = [] - resp.chunks do |chunk| - chunks << chunk - assert_kind_of String, chunk, "Each yielded chunk must be a String" + resp.chunks { |c| chunks << c } + chunks + end + + # =========================================================================== + # 1. Basic functionality + # =========================================================================== + + def test_chunks_yields_correct_count + chunks = collect_chunks(get('/stream/5')) + assert_equal 5, chunks.size + end + + def test_chunks_yields_strings + get('/stream/3').chunks do |chunk| + assert_kind_of String, chunk end - assert_equal 5, chunks.size, "Should yield exactly 5 chunks from /stream/5" end def test_chunks_yields_binary_encoding - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - resp.chunks do |chunk| - assert chunk.encoding == Encoding::BINARY || chunk.encoding == Encoding::ASCII_8BIT, - "Chunk should have binary encoding, got #{chunk.encoding}" + get('/stream/3').chunks do |chunk| + assert_includes [Encoding::BINARY, Encoding::ASCII_8BIT], chunk.encoding, + "Expected binary encoding, got #{chunk.encoding}" end end - def test_chunks_with_single_chunk_body - client = Wreq::Client.new - resp = client.get("http://localhost:8080/bytes/1024") - chunk_count = 0 - total_bytes = 0 - resp.chunks do |chunk| - chunk_count += 1 - total_bytes += chunk.bytesize + def test_chunks_yields_non_empty_chunks + get('/stream/3').chunks do |chunk| + assert chunk.bytesize > 0, 'Chunks must not be empty' end - assert chunk_count >= 1, "Should yield at least one chunk" - assert_equal 1024, total_bytes, "Total bytes should match content length" end def test_chunks_returns_nil - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - result = resp.chunks { |_chunk| :processing } - assert_nil result, "chunks should return nil after completion" + result = get('/stream/3').chunks { |_c| :ignored_return_value } + assert_nil result + end + + def test_chunks_block_return_value_is_always_nil + # Whatever the block returns, chunks itself must return nil. + [42, 'string', :symbol, [], {}, true, false].each do |val| + result = get('/stream/1').chunks { |_c| val } + assert_nil result, "chunks should return nil when block returns #{val.inspect}" + end end - def test_chunks_with_empty_body - client = Wreq::Client.new - resp = client.get("http://localhost:8080/status/204") + def test_chunks_empty_body chunk_count = 0 - resp.chunks do |_chunk| - chunk_count += 1 + get('/status/204').chunks { |_c| chunk_count += 1 } + assert_equal 0, chunk_count + end + + def test_chunks_single_chunk_total_bytes + total = 0 + get('/bytes/1024').chunks { |c| total += c.bytesize } + assert_equal 1024, total + end + + def test_chunks_without_block_raises_local_jump_error + resp = get('/stream/3') + assert_raises(LocalJumpError) { resp.chunks } + end + + # =========================================================================== + # 2. Block control flow + # =========================================================================== + + def test_break_inside_block_stops_iteration + chunks_seen = 0 + # /stream/10 yields 10 chunks; we break after 2. + get('/stream/10').chunks do |_c| + chunks_seen += 1 + break if chunks_seen == 2 + end + assert_equal 2, chunks_seen, 'break should stop iteration after 2 chunks' + end + + def test_break_inside_block_does_not_raise + # break must not propagate as an exception to the caller. + assert_silent do + get('/stream/5').chunks do |_c| + break + end + end + end + + def test_next_inside_block_skips_to_next_chunk + processed = [] + get('/stream/5').chunks do |c| + next if processed.size == 2 # skip third chunk processing + + processed << c + end + # next skips the block body but iteration continues; we still get 5 yields + # but only push 4 times (skip once when size==2 means index 2 skipped). + assert_equal 4, processed.size + end + + def test_exception_in_block_propagates_to_caller + raised = nil + begin + get('/stream/5').chunks do |_c| + raise 'block error' + end + rescue RuntimeError => e + raised = e end - assert_equal 0, chunk_count, "No chunks should be yielded for empty 204 response" + refute_nil raised + assert_equal 'block error', raised.message end - # ========================================================================= - # Block requirement - # ========================================================================= + def test_exception_in_block_stops_after_correct_chunk_count + count = 0 + begin + get('/stream/5').chunks do |_c| + count += 1 + raise 'stop' if count == 3 + end + rescue RuntimeError + end + assert_equal 3, count + end - def test_chunks_without_block_raises_error - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - assert_raises(LocalJumpError) do - resp.chunks + def test_exception_class_preserved_through_block + begin + get('/stream/3').chunks { raise ArgumentError, 'bad arg' } + rescue ArgumentError => e + assert_equal 'bad arg', e.message + return end + flunk 'ArgumentError should have propagated' + end + + # =========================================================================== + # 3. Body ownership / double-consumption + # =========================================================================== + + def test_chunks_called_twice_raises_memory_error + resp = get('/stream/3') + resp.chunks { |_c| } + assert_raises(Wreq::MemoryError) { resp.chunks { |_c| } } + end + + def test_text_after_chunks_raises_memory_error + resp = get('/stream/3') + resp.chunks { |_c| } + assert_raises(Wreq::MemoryError) { resp.text } + end + + def test_bytes_after_chunks_raises_memory_error + resp = get('/stream/3') + resp.chunks { |_c| } + assert_raises(Wreq::MemoryError) { resp.bytes } + end + + def test_chunks_after_text_raises_memory_error + resp = get('/stream/3') + resp.text + assert_raises(Wreq::MemoryError) { resp.chunks { |_c| } } + end + + # Regression for issue #3 in the fix analysis: + # If response() raises (body already consumed), we must NOT leak a GC + # registration. This is validated indirectly: if a stale pointer is + # registered, subsequent GC cycles corrupt the heap and later tests crash. + def test_gc_registration_not_leaked_when_response_already_consumed + resp = get('/stream/3') + resp.chunks { |_c| } + + # Force several GC cycles. If a stale pointer was registered, this crashes. + 10.times { GC.start(full_mark: true, immediate_sweep: true) } + + # Confirm the error is still raised cleanly (body correctly consumed). + assert_raises(Wreq::MemoryError) { resp.chunks { |_c| } } + end + + # =========================================================================== + # 4. Content integrity + # =========================================================================== + + def test_streamed_content_matches_buffered_content + full = client.get("#{BASE}/bytes/4096").bytes + stream = ''.b + client.get("#{BASE}/bytes/4096").chunks { |c| stream << c } + assert_equal full.bytesize, stream.bytesize end - # ========================================================================= - # GVL concurrency — other Ruby threads can run during streaming - # ========================================================================= + def test_streamed_content_is_valid_json_per_chunk + get('/stream/5').chunks do |chunk| + assert_match(/\{.*\}/, chunk, 'Each chunk from /stream/N should be a JSON object') + end + end - def test_other_threads_run_during_streaming - client = Wreq::Client.new - resp = client.get("http://localhost:8080/drip?duration=3&numbytes=3&delay=1") + def test_large_stream_total_size + # /bytes/N returns exactly N random bytes. Stream it and verify. + size = 256 * 1024 # 256 KB + total = 0 + client.get("#{BASE}/bytes/#{size}").chunks { |c| total += c.bytesize } + assert_equal size, total + end - counter = 0 - tick_thread = Thread.new do + def test_many_chunks_all_received + # /stream/N yields N JSON objects, one per chunk. + n = 20 + chunks = collect_chunks(get("/stream/#{n}")) + assert_equal n, chunks.size + end + + # =========================================================================== + # 5. GVL correctness + # =========================================================================== + + # Core GVL test: a background thread must make measurable progress while + # the main thread is blocked waiting for network chunks. + # + # Uses an Atomic-style counter via Array#push (GIL-safe on MRI) rather than + # a plain integer to avoid a data race in the assertion read. + def test_other_threads_run_during_network_wait + resp = client.get("#{BASE}/drip?duration=3&numbytes=3&delay=1") + + ticks = [] + ticker = Thread.new do 30.times do - counter += 1 + ticks << 1 sleep 0.1 end end chunks_received = 0 - resp.chunks do |_chunk| - chunks_received += 1 + resp.chunks { |_c| chunks_received += 1 } + ticker.join(10) + + # With 3 chunks spaced 1s apart, the ticker should have accumulated + # at least ~20 ticks (3 seconds * 10 ticks/sec) if the GVL is released. + # We use a conservative threshold of 10 to allow for slow CI. + assert ticks.size > 10, + "Ticker only reached #{ticks.size} ticks — GVL may not be released during I/O. " \ + 'Expected > 10 ticks during a 3-second drip stream.' + assert chunks_received >= 1 + end + + # Regression for issue #57: streaming must not starve a thread that holds + # a Mutex. Before the fix, BodyReceiver held the GVL continuously, so a + # thread waiting on mutex.synchronize would never be scheduled. + def test_streaming_does_not_starve_mutex_waiters + mutex = Mutex.new + mutex_acquired_at = nil + + # This thread acquires the mutex after a short delay. + # If the GVL is held by the streaming thread, it will never run. + waiter = Thread.new do + sleep 0.5 + mutex.synchronize { mutex_acquired_at = Time.now } end - tick_thread.join(10) + Time.now + client.get("#{BASE}/drip?duration=3&numbytes=3&delay=1").chunks { |_c| } + stream_end = Time.now - assert counter > 5, - "Counter only reached #{counter} — other threads may not be running during streaming. " \ - "GVL should be released during I/O waits." - assert chunks_received >= 1, "Should have received at least one chunk" - end + waiter.join(10) - def test_multiple_concurrent_streams - client = Wreq::Client.new - results = {} - done = {} + refute_nil mutex_acquired_at, 'Mutex waiter thread never ran' - t1 = Thread.new do - resp = client.get("http://localhost:8080/stream/3") - chunks = [] - resp.chunks { |c| chunks << c } - results[:t1] = chunks.size - done[:t1] = true - end + # The waiter should have acquired the mutex well before streaming finished. + assert mutex_acquired_at < stream_end, + "Mutex was only acquired at #{mutex_acquired_at}, after streaming ended at #{stream_end}. " \ + 'The streaming thread may have held the GVL for the entire duration.' + end - t2 = Thread.new do - resp = client.get("http://localhost:8080/stream/3") - chunks = [] - resp.chunks { |c| chunks << c } - results[:t2] = chunks.size - done[:t2] = true + def test_multiple_concurrent_streams_same_client + results = Array.new(2) + threads = 2.times.map do |i| + Thread.new do + chunks = collect_chunks(client.get("#{BASE}/stream/3")) + results[i] = chunks.size + end end + threads.each { |t| t.join(15) } + assert_equal [3, 3], results + end - t1.join(15) - t2.join(15) - - assert done[:t1], "Thread 1 should complete" - assert done[:t2], "Thread 2 should complete" - assert_equal 3, results[:t1], "Thread 1 should receive 3 chunks" - assert_equal 3, results[:t2], "Thread 2 should receive 3 chunks" + def test_multiple_concurrent_streams_different_clients + results = Array.new(3) + threads = 3.times.map do |i| + Thread.new do + c = Wreq::Client.new + chunks = collect_chunks(c.get("#{BASE}/stream/3")) + results[i] = chunks.size + end + end + threads.each { |t| t.join(15) } + assert_equal [3, 3, 3], results end - # ========================================================================= - # Thread interruption during streaming - # ========================================================================= + # =========================================================================== + # 6. Thread interruption + # =========================================================================== - def test_thread_interrupt_during_streaming - url = "http://localhost:8080/drip?duration=5&numbytes=5" + def test_thread_kill_during_network_wait + started = false thread = Thread.new do - resp = Wreq.get(url) - resp.chunks { |chunk| chunk } - rescue => _ + resp = client.get("#{BASE}/drip?duration=10&numbytes=10") + started = true + resp.chunks { |_c| } + rescue StandardError => _e end - sleep 2 + + # Wait until the thread has actually started streaming before killing. + sleep 0.1 until started || !thread.alive? + sleep 0.5 # let it block on first chunk wait thread.kill - killed = thread.join(5) - assert killed, "Streaming should be interruptible via Thread.kill" + assert thread.join(5), 'Thread should terminate after kill' end - def test_thread_interrupt_during_slow_stream_with_block_processing - url = "http://localhost:8080/drip?duration=5&numbytes=5&delay=1" + def test_thread_kill_during_block_execution + started = false thread = Thread.new do - resp = Wreq.get(url) - resp.chunks do |_chunk| - sleep 0.5 + resp = client.get("#{BASE}/stream/5") + started = true + resp.chunks do |_c| + sleep 10 # block in the Ruby block, not in I/O end - rescue => _ + rescue StandardError => _e end - sleep 2 + + sleep 0.1 until started || !thread.alive? + sleep 0.3 thread.kill - killed = thread.join(5) - assert killed, "Streaming with slow block processing should be interruptible" + assert thread.join(5), 'Thread should terminate when killed during block execution' end - # ========================================================================= - # Streaming error propagation - # ========================================================================= + def test_thread_raise_during_streaming + error_class = Class.new(StandardError) + received_error = nil + started = false - def test_chunks_propagates_streaming_errors - client = Wreq::Client.new - resp = client.get("http://localhost:8080/drip?duration=10&numbytes=10", timeout: 1) - error_raised = false - begin - resp.chunks do |_chunk| - # Just consume chunks — the timeout should fire mid-stream - end - rescue => e - error_raised = true - assert( - e.is_a?(Wreq::TimeoutError) || e.is_a?(Wreq::BodyError) || e.is_a?(Wreq::ConnectionResetError), - "Expected a streaming error (TimeoutError/BodyError/ConnectionResetError), got #{e.class}: #{e.message}" - ) + thread = Thread.new do + resp = client.get("#{BASE}/drip?duration=10&numbytes=10") + started = true + resp.chunks { |_c| } + rescue StandardError => e + received_error = e end - assert error_raised, "A streaming error should have been raised for a timed-out drip response" - end - # ========================================================================= - # Block exception propagation - # ========================================================================= - - def test_exception_in_block_propagates - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/5") - error_raised = false - chunks_before_error = 0 - begin - resp.chunks do |chunk| - chunks_before_error += 1 - if chunks_before_error == 2 - raise RuntimeError, "intentional error in block" - end - end - rescue RuntimeError => e - error_raised = true - assert_equal "intentional error in block", e.message - end - assert error_raised, "Exception raised inside the block should propagate out" - assert_equal 2, chunks_before_error, "Should have processed 2 chunks before the error" + sleep 0.1 until started || !thread.alive? + sleep 0.5 + thread.raise(error_class, 'injected') + assert thread.join(5), 'Thread should terminate after raise' + assert_instance_of error_class, received_error, + "Expected injected error class, got #{received_error.class}" end - # ========================================================================= - # Double-consumption protection - # ========================================================================= - - def test_chunks_called_twice_raises_error - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - resp.chunks { |_c| } + # =========================================================================== + # 7. Streaming error propagation + # =========================================================================== + def test_streaming_error_raises_ruby_exception + # Use a very short timeout against a slow drip to force a mid-body error. + resp = client.get("#{BASE}/drip?duration=10&numbytes=10", timeout: 1) error_raised = false begin resp.chunks { |_c| } - rescue => e + rescue Wreq::TimeoutError, Wreq::BodyError, Wreq::ConnectionResetError error_raised = true - assert_instance_of Wreq::MemoryError, e, - "Second chunks call should raise MemoryError, got #{e.class}: #{e.message}" end - assert error_raised, "Second chunks call should raise an error" + assert error_raised, 'A streaming timeout should raise a Wreq error' end - def test_text_after_chunks_raises_error - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - resp.chunks { |_c| } - - error_raised = false + def test_streaming_error_is_not_silently_swallowed + # Before the fix, BodyReceiver swallowed errors via .and_then(|r| r.ok()). + # This test verifies that an error causes an exception, not silent EOF. + resp = client.get("#{BASE}/drip?duration=10&numbytes=10", timeout: 1) + chunks_received = 0 begin - resp.text - rescue => e - error_raised = true - assert_instance_of Wreq::MemoryError, e, - "Calling text after chunks should raise MemoryError, got #{e.class}: #{e.message}" + resp.chunks do |_c| + chunks_received += 1 + end + rescue Wreq::TimeoutError, Wreq::BodyError, Wreq::ConnectionResetError + # expected end - assert error_raised, "Calling text after chunks should raise an error" + # We should have received fewer chunks than the full 10 (cut short by timeout) + # and an error should have been raised (not just silently stopped at 0 chunks). + assert chunks_received < 10, + 'Should not have received all 10 chunks before timeout' end - # ========================================================================= - # Chunk content integrity - # ========================================================================= - - def test_chunks_content_matches_full_body - client = Wreq::Client.new - resp_full = client.get("http://localhost:8080/bytes/4096") - full_bytes = resp_full.bytes + # =========================================================================== + # 8. GC safety + # =========================================================================== - resp_stream = client.get("http://localhost:8080/bytes/4096") - streamed_bytes = "".b - resp_stream.chunks do |chunk| - streamed_bytes << chunk + def test_block_not_gc_collected_during_streaming + # Force GC between every chunk. If the block Proc is not GC-pinned, + # this will crash or raise "invalid block VALUE". + chunks_received = 0 + client.get("#{BASE}/drip?duration=3&numbytes=3&delay=1").chunks do |_c| + chunks_received += 1 + GC.start(full_mark: true, immediate_sweep: true) + GC.compact if GC.respond_to?(:compact) end - - assert_equal full_bytes.bytesize, streamed_bytes.bytesize, - "Streamed body size should match full body size" + assert_equal 3, chunks_received, + 'All chunks must arrive even with forced GC + compaction between yields' end - def test_chunks_json_stream_content - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/5") + def test_gc_compaction_during_streaming + skip 'GC.compact not available' unless GC.respond_to?(:compact) chunks = [] - resp.chunks do |chunk| - chunks << chunk - end - - chunks.each_with_index do |chunk, i| - assert_match(/\{.*\}/, chunk, - "Chunk #{i} should contain a JSON object, got: #{chunk[0..80]}") + client.get("#{BASE}/drip?duration=2&numbytes=2&delay=1").chunks do |c| + chunks << c + GC.compact end + assert chunks.size >= 1 + chunks.each { |c| assert c.bytesize > 0 } end - # ========================================================================= - # Block capture / GC safety - # ========================================================================= - - def test_block_not_garbage_collected_during_streaming - client = Wreq::Client.new - resp = client.get("http://localhost:8080/drip?duration=3&numbytes=3&delay=1") - - chunks_received = 0 - resp.chunks do |chunk| - chunks_received += 1 - GC.start + def test_aggressive_gc_between_chunks_does_not_corrupt_data + stream = ''.b + client.get("#{BASE}/bytes/8192").chunks do |c| GC.start + stream << c end - - assert_equal 3, chunks_received, - "All 3 chunks should be received even with forced GC between yields" + assert_equal 8192, stream.bytesize end - # ========================================================================= - # close() during/after streaming - # ========================================================================= + # =========================================================================== + # 9. close() integration + # =========================================================================== - def test_close_after_streaming - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") + def test_close_after_full_stream_does_not_raise + resp = get('/stream/3') resp.chunks { |_c| } - resp.close + assert_silent { resp.close } end - # ========================================================================= - # Client method variants - # ========================================================================= + def test_close_after_partial_stream_does_not_raise + resp = get('/stream/5') + count = 0 + begin + resp.chunks do |_c| + count += 1 + break if count == 2 + end + rescue StandardError + end + assert_silent { resp.close } + end + + # =========================================================================== + # 10. Client / module method variants + # =========================================================================== def test_chunks_via_module_method - resp = Wreq.get("http://localhost:8080/stream/3") chunks = [] - resp.chunks do |chunk| - chunks << chunk - end - assert_equal 3, chunks.size, "Module-level Wreq.get + chunks should work" + Wreq.get("#{BASE}/stream/3").chunks { |c| chunks << c } + assert_equal 3, chunks.size end def test_chunks_via_client_instance - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") chunks = [] - resp.chunks do |chunk| - chunks << chunk - end - assert_equal 3, chunks.size, "Client instance get + chunks should work" + client.get("#{BASE}/stream/3").chunks { |c| chunks << c } + assert_equal 3, chunks.size + end + + def test_chunks_on_post_response + chunks = [] + client.post("#{BASE}/post", body: 'hello').chunks { |c| chunks << c } + assert chunks.size >= 1 + combined = chunks.join + assert combined.bytesize > 0 end end