From 817bce1f3276f1f8cc06e4223855fccba48a2e54 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 25 Apr 2026 12:02:46 +0000 Subject: [PATCH] fix(engine): replace fixed sleeps with polling in backpressure tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test_dynamic_connection_under_backpressure test used a fixed 100ms sleep before asserting the pacer node reached Running state. On slow CI runners the node lifecycle (Creating → Initializing → Running) takes longer than 100ms, causing sporadic assertion failures. Replace all fixed sleep + assert patterns across the three backpressure tests with a wait_for_states polling helper (same pattern used in the async_node_creation test suite). Each call polls get_node_states() every 20ms with a 5-second timeout, eliminating timing-dependent flakiness. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/engine/tests/backpressure.rs | 63 ++++++++++++++++++----------- 1 file changed, 39 insertions(+), 24 deletions(-) diff --git a/crates/engine/tests/backpressure.rs b/crates/engine/tests/backpressure.rs index 710a075d..8d4e91cc 100644 --- a/crates/engine/tests/backpressure.rs +++ b/crates/engine/tests/backpressure.rs @@ -8,12 +8,29 @@ //! downstream nodes are slower than upstream nodes without deadlocking. use std::path::Path; -use std::time::Duration; +use std::time::{Duration, Instant}; use streamkit_core::control::EngineControlMessage; use streamkit_core::state::NodeState; -use streamkit_engine::{DynamicEngineConfig, Engine}; +use streamkit_engine::{DynamicEngineConfig, DynamicEngineHandle, Engine}; use tokio::time::timeout; +/// Poll `handle.get_node_states()` until a predicate holds, with timeout. 
+async fn wait_for_states<F>(handle: &DynamicEngineHandle, timeout_dur: Duration, pred: F) -> bool +where + F: Fn(&std::collections::HashMap<String, NodeState>) -> bool, +{ + let deadline = Instant::now() + timeout_dur; + while Instant::now() < deadline { + if let Ok(states) = handle.get_node_states().await { + if pred(&states) { + return true; + } + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + false +} + /// Tests that a fast file reader feeding a slow pacer node doesn't deadlock. /// This validates the Per-Pin Distributor Architecture correctly handles backpressure. #[tokio::test] @@ -141,22 +158,16 @@ async fn test_backpressure_no_deadlock() { .await .expect("Failed to connect muxer to writer"); - // Wait for nodes to become ready and start processing - tokio::time::sleep(Duration::from_millis(500)).await; - - // 6. Verify nodes are running - let states = handle.get_node_states().await.expect("Failed to get node states"); - - assert!( - matches!(states.get("reader"), Some(NodeState::Running | NodeState::Ready)), - "Reader should be running or ready, got: {:?}", - states.get("reader") - ); - assert!( - matches!(states.get("pacer"), Some(NodeState::Running)), - "Pacer should be running, got: {:?}", - states.get("pacer") - ); + // 6. Wait for nodes to become ready and start processing + let nodes_ready = wait_for_states(&handle, Duration::from_secs(5), |states| { + let reader_ok = states + .get("reader") + .is_some_and(|s| matches!(s, NodeState::Running | NodeState::Ready)); + let pacer_ok = states.get("pacer").is_some_and(|s| matches!(s, NodeState::Running)); + reader_ok && pacer_ok + }) + .await; + assert!(nodes_ready, "Reader should be running/ready and pacer should be running"); // 7. 
Let the pipeline run for a bit - this is where deadlock would occur in the old architecture // The demuxer will produce Audio packets faster than pacer can forward them (0.1x speed) @@ -214,11 +225,11 @@ async fn test_dynamic_connection_under_backpressure() { .await .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - - // Verify node is running - let states = handle.get_node_states().await.unwrap(); - assert!(matches!(states.get("pacer"), Some(NodeState::Running)), "Pacer should be running"); + let pacer_running = wait_for_states(&handle, Duration::from_secs(5), |states| { + matches!(states.get("pacer"), Some(NodeState::Running)) + }) + .await; + assert!(pacer_running, "Pacer should be running"); // The key test: verify engine remains responsive when managing connections let result = timeout(Duration::from_secs(1), handle.get_node_states()).await; @@ -251,7 +262,11 @@ async fn test_node_removal_under_backpressure() { .await .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; + let pacer_created = wait_for_states(&handle, Duration::from_secs(5), |states| { + states.get("pacer").is_some_and(|s| !matches!(s, NodeState::Creating)) + }) + .await; + assert!(pacer_created, "Pacer should have left Creating state"); // Remove the node handle