From dcdf83dfdd3170afb83ad3580902f8334197e3fa Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Thu, 28 May 2026 17:19:23 +0800 Subject: [PATCH 1/2] test(stream): simplify stream test flow --- test/gvl_stream_test.rb | 330 ---------------------------------------- test/stream_test.rb | 296 ++++++++++++++++++++++++++++++++++- 2 files changed, 294 insertions(+), 332 deletions(-) delete mode 100644 test/gvl_stream_test.rb diff --git a/test/gvl_stream_test.rb b/test/gvl_stream_test.rb deleted file mode 100644 index fbb429a..0000000 --- a/test/gvl_stream_test.rb +++ /dev/null @@ -1,330 +0,0 @@ -# frozen_string_literal: true - -require "test_helper" - -class GvlStreamingTest < Minitest::Test - # ========================================================================= - # Basic streaming functionality - # ========================================================================= - - def test_chunks_yields_string_chunks - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/5") - chunks = [] - resp.chunks do |chunk| - chunks << chunk - assert_kind_of String, chunk, "Each yielded chunk must be a String" - end - assert_equal 5, chunks.size, "Should yield exactly 5 chunks from /stream/5" - end - - def test_chunks_yields_binary_encoding - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - resp.chunks do |chunk| - assert chunk.encoding == Encoding::BINARY || chunk.encoding == Encoding::ASCII_8BIT, - "Chunk should have binary encoding, got #{chunk.encoding}" - end - end - - def test_chunks_with_single_chunk_body - client = Wreq::Client.new - resp = client.get("http://localhost:8080/bytes/1024") - chunk_count = 0 - total_bytes = 0 - resp.chunks do |chunk| - chunk_count += 1 - total_bytes += chunk.bytesize - end - assert chunk_count >= 1, "Should yield at least one chunk" - assert_equal 1024, total_bytes, "Total bytes should match content length" - end - - def test_chunks_returns_nil - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - result = resp.chunks { |_chunk| :processing } - assert_nil result, "chunks should return nil after completion" - end - - def test_chunks_with_empty_body - client = Wreq::Client.new - resp = client.get("http://localhost:8080/status/204") - chunk_count = 0 - resp.chunks do |_chunk| - chunk_count += 1 - end - assert_equal 0, chunk_count, "No chunks should be yielded for empty 204 response" - end - - # ========================================================================= - # Block requirement - # ========================================================================= - - def test_chunks_without_block_raises_error - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - assert_raises(LocalJumpError) do - resp.chunks - end - end - - # ========================================================================= - # GVL concurrency — other Ruby threads can run during streaming - # ========================================================================= - - def test_other_threads_run_during_streaming - client = Wreq::Client.new - resp = client.get("http://localhost:8080/drip?duration=3&numbytes=3&delay=1") - - counter = 0 - tick_thread = Thread.new do - 30.times do - counter += 1 - sleep 0.1 - end - end - - chunks_received = 0 - resp.chunks do |_chunk| - chunks_received += 1 - end - - tick_thread.join(10) - - assert counter > 5, - "Counter only reached #{counter} — other threads may not be running during streaming. " \ - "GVL should be released during I/O waits." - assert chunks_received >= 1, "Should have received at least one chunk" - end - - def test_multiple_concurrent_streams - client = Wreq::Client.new - results = {} - done = {} - - t1 = Thread.new do - resp = client.get("http://localhost:8080/stream/3") - chunks = [] - resp.chunks { |c| chunks << c } - results[:t1] = chunks.size - done[:t1] = true - end - - t2 = Thread.new do - resp = client.get("http://localhost:8080/stream/3") - chunks = [] - resp.chunks { |c| chunks << c } - results[:t2] = chunks.size - done[:t2] = true - end - - t1.join(15) - t2.join(15) - - assert done[:t1], "Thread 1 should complete" - assert done[:t2], "Thread 2 should complete" - assert_equal 3, results[:t1], "Thread 1 should receive 3 chunks" - assert_equal 3, results[:t2], "Thread 2 should receive 3 chunks" - end - - # ========================================================================= - # Thread interruption during streaming - # ========================================================================= - - def test_thread_interrupt_during_streaming - url = "http://localhost:8080/drip?duration=5&numbytes=5" - thread = Thread.new do - resp = Wreq.get(url) - resp.chunks { |chunk| chunk } - rescue => _ - end - sleep 2 - thread.kill - killed = thread.join(5) - assert killed, "Streaming should be interruptible via Thread.kill" - end - - def test_thread_interrupt_during_slow_stream_with_block_processing - url = "http://localhost:8080/drip?duration=5&numbytes=5&delay=1" - thread = Thread.new do - resp = Wreq.get(url) - resp.chunks do |_chunk| - sleep 0.5 - end - rescue => _ - end - sleep 2 - thread.kill - killed = thread.join(5) - assert killed, "Streaming with slow block processing should be interruptible" - end - - # ========================================================================= - # Streaming error propagation - # ========================================================================= - - def test_chunks_propagates_streaming_errors - client = Wreq::Client.new - resp = client.get("http://localhost:8080/drip?duration=10&numbytes=10", timeout: 1) - error_raised = false - begin - resp.chunks do |_chunk| - # Just consume chunks — the timeout should fire mid-stream - end - rescue => e - error_raised = true - assert( - e.is_a?(Wreq::TimeoutError) || e.is_a?(Wreq::BodyError) || e.is_a?(Wreq::ConnectionResetError), - "Expected a streaming error (TimeoutError/BodyError/ConnectionResetError), got #{e.class}: #{e.message}" - ) - end - assert error_raised, "A streaming error should have been raised for a timed-out drip response" - end - - # ========================================================================= - # Block exception propagation - # ========================================================================= - - def test_exception_in_block_propagates - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/5") - error_raised = false - chunks_before_error = 0 - begin - resp.chunks do |chunk| - chunks_before_error += 1 - if chunks_before_error == 2 - raise RuntimeError, "intentional error in block" - end - end - rescue RuntimeError => e - error_raised = true - assert_equal "intentional error in block", e.message - end - assert error_raised, "Exception raised inside the block should propagate out" - assert_equal 2, chunks_before_error, "Should have processed 2 chunks before the error" - end - - # ========================================================================= - # Double-consumption protection - # ========================================================================= - - def test_chunks_called_twice_raises_error - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - resp.chunks { |_c| } - - error_raised = false - begin - resp.chunks { |_c| } - rescue => e - error_raised = true - assert_instance_of Wreq::MemoryError, e, - "Second chunks call should raise MemoryError, got #{e.class}: #{e.message}" - end - assert error_raised, "Second chunks call should raise an error" - end - - def test_text_after_chunks_raises_error - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - resp.chunks { |_c| } - - error_raised = false - begin - resp.text - rescue => e - error_raised = true - assert_instance_of Wreq::MemoryError, e, - "Calling text after chunks should raise MemoryError, got #{e.class}: #{e.message}" - end - assert error_raised, "Calling text after chunks should raise an error" - end - - # ========================================================================= - # Chunk content integrity - # ========================================================================= - - def test_chunks_content_matches_full_body - client = Wreq::Client.new - resp_full = client.get("http://localhost:8080/bytes/4096") - full_bytes = resp_full.bytes - - resp_stream = client.get("http://localhost:8080/bytes/4096") - streamed_bytes = "".b - resp_stream.chunks do |chunk| - streamed_bytes << chunk - end - - assert_equal full_bytes.bytesize, streamed_bytes.bytesize, - "Streamed body size should match full body size" - end - - def test_chunks_json_stream_content - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/5") - chunks = [] - resp.chunks do |chunk| - chunks << chunk - end - - chunks.each_with_index do |chunk, i| - assert_match(/\{.*\}/, chunk, - "Chunk #{i} should contain a JSON object, got: #{chunk[0..80]}") - end - end - - # ========================================================================= - # Block capture / GC safety - # ========================================================================= - - def test_block_not_garbage_collected_during_streaming - client = Wreq::Client.new - resp = client.get("http://localhost:8080/drip?duration=3&numbytes=3&delay=1") - - chunks_received = 0 - resp.chunks do |chunk| - chunks_received += 1 - GC.start - GC.start - end - - assert_equal 3, chunks_received, - "All 3 chunks should be received even with forced GC between yields" - end - - # ========================================================================= - # close() during/after streaming - # ========================================================================= - - def test_close_after_streaming - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - resp.chunks { |_c| } - resp.close - end - - # ========================================================================= - # Client method variants - # ========================================================================= - - def test_chunks_via_module_method - resp = Wreq.get("http://localhost:8080/stream/3") - chunks = [] - resp.chunks do |chunk| - chunks << chunk - end - assert_equal 3, chunks.size, "Module-level Wreq.get + chunks should work" - end - - def test_chunks_via_client_instance - client = Wreq::Client.new - resp = client.get("http://localhost:8080/stream/3") - chunks = [] - resp.chunks do |chunk| - chunks << chunk - end - assert_equal 3, chunks.size, "Client instance get + chunks should work" - end -end \ No newline at end of file diff --git a/test/stream_test.rb b/test/stream_test.rb index ec9465c..38c74f1 100644 --- a/test/stream_test.rb +++ b/test/stream_test.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + require "test_helper" class StreamTest < Minitest::Test @@ -8,12 +10,16 @@ def test_simple_push_stream 3.times { |i| sender.push("chunk-#{i}\n") } sender.close end + resp = client.post("http://localhost:8080/post", body: sender, headers: {"Content-Type" => "text/plain"}) + assert_equal 200, resp.code + echoed = resp.json["data"] assert_includes echoed, "chunk-0" assert_includes echoed, "chunk-1" assert_includes echoed, "chunk-2" + producer.join end @@ -21,12 +27,124 @@ def test_response_body_chunks_stream client = Wreq::Client.new resp = client.get("http://localhost:8080/stream/5") chunks = [] + resp.chunks do |chunk| chunks << chunk - assert_kind_of String, chunk + assert_kind_of String, chunk, "Each yielded chunk must be a String" assert_match(/\{.*\}/, chunk) end - assert_equal 5, chunks.size + + assert_equal 5, chunks.size, "Should yield exactly 5 chunks from /stream/5" + end + + def test_chunks_yields_binary_encoding + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + + resp.chunks do |chunk| + assert chunk.encoding == Encoding::BINARY || chunk.encoding == Encoding::ASCII_8BIT, + "Chunk should have binary encoding, got #{chunk.encoding}" + end + end + + def test_chunks_with_single_chunk_body + client = Wreq::Client.new + resp = client.get("http://localhost:8080/bytes/1024") + chunk_count = 0 + total_bytes = 0 + + resp.chunks do |chunk| + chunk_count += 1 + total_bytes += chunk.bytesize + end + + assert chunk_count >= 1, "Should yield at least one chunk" + assert_equal 1024, total_bytes, "Total bytes should match content length" + end + + def test_chunks_returns_nil + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + + result = resp.chunks { |_chunk| :processing } + + assert_nil result, "chunks should return nil after completion" + end + + def test_chunks_with_empty_body + client = Wreq::Client.new + resp = client.get("http://localhost:8080/status/204") + chunk_count = 0 + + resp.chunks do |_chunk| + chunk_count += 1 + end + + assert_equal 0, chunk_count, "No chunks should be yielded for empty 204 response" + end + + def test_chunks_without_block_raises_error + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + + assert_raises(LocalJumpError) do + resp.chunks + end + end + + def test_other_threads_run_during_streaming + client = Wreq::Client.new + resp = client.get("http://localhost:8080/drip?duration=3&numbytes=3&delay=1") + + counter = 0 + tick_thread = Thread.new do + 30.times do + counter += 1 + sleep 0.1 + end + end + + chunks_received = 0 + resp.chunks do |_chunk| + chunks_received += 1 + end + + tick_thread.join(10) + + assert counter > 5, + "Counter only reached #{counter} - other threads may not be running during streaming. " \ + "GVL should be released during I/O waits." + assert chunks_received >= 1, "Should have received at least one chunk" + end + + def test_multiple_concurrent_streams + client = Wreq::Client.new + results = {} + done = {} + + t1 = Thread.new do + resp = client.get("http://localhost:8080/stream/3") + chunks = [] + resp.chunks { |chunk| chunks << chunk } + results[:t1] = chunks.size + done[:t1] = true + end + + t2 = Thread.new do + resp = client.get("http://localhost:8080/stream/3") + chunks = [] + resp.chunks { |chunk| chunks << chunk } + results[:t2] = chunks.size + done[:t2] = true + end + + t1.join(15) + t2.join(15) + + assert done[:t1], "Thread 1 should complete" + assert done[:t2], "Thread 2 should complete" + assert_equal 3, results[:t1], "Thread 1 should receive 3 chunks" + assert_equal 3, results[:t2], "Thread 2 should receive 3 chunks" end def test_thread_interrupt_connect @@ -35,9 +153,11 @@ def test_thread_interrupt_connect Wreq.get(url) rescue => _ end + sleep 2 thread.kill killed = thread.join(5) + assert killed, "Connect phase should be interruptible" end @@ -47,9 +167,11 @@ def test_thread_interrupt_connect_with_timeout Wreq.get(url, timeout: 60) rescue => _ end + sleep 2 thread.kill killed = thread.join(5) + assert killed, "Connect+timeout phase should be interruptible" end @@ -60,9 +182,11 @@ def test_thread_interrupt_body_reading resp.text rescue => _ end + sleep 2 thread.kill killed = thread.join(5) + assert killed, "Body reading should be interruptible" end @@ -73,9 +197,177 @@ def test_thread_interrupt_body_streaming resp.chunks { |chunk| chunk } rescue => _ end + sleep 2 thread.kill killed = thread.join(5) + assert killed, "Body streaming should be interruptible" end + + def test_thread_interrupt_during_slow_stream_with_block_processing + url = "http://localhost:8080/drip?duration=5&numbytes=5&delay=1" + thread = Thread.new do + resp = Wreq.get(url) + resp.chunks do |_chunk| + sleep 0.5 + end + rescue => _ + end + + sleep 2 + thread.kill + killed = thread.join(5) + + assert killed, "Streaming with slow block processing should be interruptible" + end + + def test_chunks_propagates_streaming_errors + client = Wreq::Client.new + resp = client.get("http://localhost:8080/drip?duration=10&numbytes=10", timeout: 1) + error_raised = false + + begin + resp.chunks do |_chunk| + end + rescue => e + error_raised = true + assert( + e.is_a?(Wreq::TimeoutError) || e.is_a?(Wreq::BodyError) || e.is_a?(Wreq::ConnectionResetError), + "Expected a streaming error (TimeoutError/BodyError/ConnectionResetError), got #{e.class}: #{e.message}" + ) + end + + assert error_raised, "A streaming error should have been raised for a timed-out drip response" + end + + def test_exception_in_block_propagates + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/5") + error_raised = false + chunks_before_error = 0 + + begin + resp.chunks do |_chunk| + chunks_before_error += 1 + raise RuntimeError, "intentional error in block" if chunks_before_error == 2 + end + rescue RuntimeError => e + error_raised = true + assert_equal "intentional error in block", e.message + end + + assert error_raised, "Exception raised inside the block should propagate out" + assert_equal 2, chunks_before_error, "Should have processed 2 chunks before the error" + end + + def test_chunks_called_twice_raises_error + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + resp.chunks { |_chunk| } + error_raised = false + + begin + resp.chunks { |_chunk| } + rescue => e + error_raised = true + assert_instance_of Wreq::MemoryError, e, + "Second chunks call should raise MemoryError, got #{e.class}: #{e.message}" + end + + assert error_raised, "Second chunks call should raise an error" + end + + def test_text_after_chunks_raises_error + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + resp.chunks { |_chunk| } + error_raised = false + + begin + resp.text + rescue => e + error_raised = true + assert_instance_of Wreq::MemoryError, e, + "Calling text after chunks should raise MemoryError, got #{e.class}: #{e.message}" + end + + assert error_raised, "Calling text after chunks should raise an error" + end + + def test_chunks_content_matches_full_body + client = Wreq::Client.new + resp_full = client.get("http://localhost:8080/bytes/4096") + full_bytes = resp_full.bytes + + resp_stream = client.get("http://localhost:8080/bytes/4096") + streamed_bytes = "".b + resp_stream.chunks do |chunk| + streamed_bytes << chunk + end + + assert_equal full_bytes.bytesize, streamed_bytes.bytesize, + "Streamed body size should match full body size" + end + + def test_chunks_json_stream_content + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/5") + chunks = [] + + resp.chunks do |chunk| + chunks << chunk + end + + chunks.each_with_index do |chunk, i| + assert_match(/\{.*\}/, chunk, + "Chunk #{i} should contain a JSON object, got: #{chunk[0..80]}") + end + end + + def test_block_not_garbage_collected_during_streaming + client = Wreq::Client.new + resp = client.get("http://localhost:8080/drip?duration=3&numbytes=3&delay=1") + chunks_received = 0 + + resp.chunks do |_chunk| + chunks_received += 1 + GC.start + GC.start + end + + assert_equal 3, chunks_received, + "All 3 chunks should be received even with forced GC between yields" + end + + def test_close_after_streaming + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + + resp.chunks { |_chunk| } + resp.close + end + + def test_chunks_via_module_method + resp = Wreq.get("http://localhost:8080/stream/3") + chunks = [] + + resp.chunks do |chunk| + chunks << chunk + end + + assert_equal 3, chunks.size, "Module-level Wreq.get + chunks should work" + end + + def test_chunks_via_client_instance + client = Wreq::Client.new + resp = client.get("http://localhost:8080/stream/3") + chunks = [] + + resp.chunks do |chunk| + chunks << chunk + end + + assert_equal 3, chunks.size, "Client instance get + chunks should work" + end end From 6db7ca130dfff951b7f9f19cc75ff5e6a5ffd308 Mon Sep 17 00:00:00 2001 From: 0x676e67 Date: Thu, 28 May 2026 17:22:19 +0800 Subject: [PATCH 2/2] test(stream): simplify stream test flow --- test/stream_test.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/stream_test.rb b/test/stream_test.rb index 38c74f1..cf5a786 100644 --- a/test/stream_test.rb +++ b/test/stream_test.rb @@ -1,5 +1,3 @@ -# frozen_string_literal: true - require "test_helper" class StreamTest < Minitest::Test @@ -250,7 +248,7 @@ def test_exception_in_block_propagates begin resp.chunks do |_chunk| chunks_before_error += 1 - raise RuntimeError, "intentional error in block" if chunks_before_error == 2 + raise "intentional error in block" if chunks_before_error == 2 end rescue RuntimeError => e error_raised = true