diff --git a/crates/engine/tests/backpressure.rs b/crates/engine/tests/backpressure.rs
index 710a075d..8d4e91cc 100644
--- a/crates/engine/tests/backpressure.rs
+++ b/crates/engine/tests/backpressure.rs
@@ -8,12 +8,29 @@
 //! downstream nodes are slower than upstream nodes without deadlocking.
 
 use std::path::Path;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use streamkit_core::control::EngineControlMessage;
 use streamkit_core::state::NodeState;
-use streamkit_engine::{DynamicEngineConfig, Engine};
+use streamkit_engine::{DynamicEngineConfig, DynamicEngineHandle, Engine};
 use tokio::time::timeout;
 
+/// Poll `handle.get_node_states()` until a predicate holds, with timeout.
+async fn wait_for_states<F>(handle: &DynamicEngineHandle, timeout_dur: Duration, pred: F) -> bool
+where
+    F: Fn(&std::collections::HashMap<String, NodeState>) -> bool,
+{
+    let deadline = Instant::now() + timeout_dur;
+    while Instant::now() < deadline {
+        if let Ok(states) = handle.get_node_states().await {
+            if pred(&states) {
+                return true;
+            }
+        }
+        tokio::time::sleep(Duration::from_millis(20)).await;
+    }
+    false
+}
+
 /// Tests that a fast file reader feeding a slow pacer node doesn't deadlock.
 /// This validates the Per-Pin Distributor Architecture correctly handles backpressure.
 #[tokio::test]
@@ -141,22 +158,16 @@ async fn test_backpressure_no_deadlock() {
         .await
         .expect("Failed to connect muxer to writer");
 
-    // Wait for nodes to become ready and start processing
-    tokio::time::sleep(Duration::from_millis(500)).await;
-
-    // 6. Verify nodes are running
-    let states = handle.get_node_states().await.expect("Failed to get node states");
-
-    assert!(
-        matches!(states.get("reader"), Some(NodeState::Running | NodeState::Ready)),
-        "Reader should be running or ready, got: {:?}",
-        states.get("reader")
-    );
-    assert!(
-        matches!(states.get("pacer"), Some(NodeState::Running)),
-        "Pacer should be running, got: {:?}",
-        states.get("pacer")
-    );
+    // 6. Wait for nodes to become ready and start processing
+    let nodes_ready = wait_for_states(&handle, Duration::from_secs(5), |states| {
+        let reader_ok = states
+            .get("reader")
+            .is_some_and(|s| matches!(s, NodeState::Running | NodeState::Ready));
+        let pacer_ok = states.get("pacer").is_some_and(|s| matches!(s, NodeState::Running));
+        reader_ok && pacer_ok
+    })
+    .await;
+    assert!(nodes_ready, "Reader should be running/ready and pacer should be running");
 
     // 7. Let the pipeline run for a bit - this is where deadlock would occur in the old architecture
     // The demuxer will produce Audio packets faster than pacer can forward them (0.1x speed)
@@ -214,11 +225,11 @@ async fn test_dynamic_connection_under_backpressure() {
         .await
         .unwrap();
 
-    tokio::time::sleep(Duration::from_millis(100)).await;
-
-    // Verify node is running
-    let states = handle.get_node_states().await.unwrap();
-    assert!(matches!(states.get("pacer"), Some(NodeState::Running)), "Pacer should be running");
+    let pacer_running = wait_for_states(&handle, Duration::from_secs(5), |states| {
+        matches!(states.get("pacer"), Some(NodeState::Running))
+    })
+    .await;
+    assert!(pacer_running, "Pacer should be running");
 
     // The key test: verify engine remains responsive when managing connections
     let result = timeout(Duration::from_secs(1), handle.get_node_states()).await;
@@ -251,7 +262,11 @@ async fn test_node_removal_under_backpressure() {
         .await
         .unwrap();
 
-    tokio::time::sleep(Duration::from_millis(100)).await;
+    let pacer_created = wait_for_states(&handle, Duration::from_secs(5), |states| {
+        states.get("pacer").is_some_and(|s| !matches!(s, NodeState::Creating))
+    })
+    .await;
+    assert!(pacer_created, "Pacer should have left Creating state");
 
     // Remove the node
     handle