Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 39 additions & 24 deletions crates/engine/tests/backpressure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,29 @@
//! downstream nodes are slower than upstream nodes without deadlocking.

use std::path::Path;
use std::time::Duration;
use std::time::{Duration, Instant};
use streamkit_core::control::EngineControlMessage;
use streamkit_core::state::NodeState;
use streamkit_engine::{DynamicEngineConfig, Engine};
use streamkit_engine::{DynamicEngineConfig, DynamicEngineHandle, Engine};
use tokio::time::timeout;

/// Poll `handle.get_node_states()` until a predicate holds, with timeout.
async fn wait_for_states<F>(handle: &DynamicEngineHandle, timeout_dur: Duration, pred: F) -> bool
where
F: Fn(&std::collections::HashMap<String, NodeState>) -> bool,
{
let deadline = Instant::now() + timeout_dur;
while Instant::now() < deadline {
if let Ok(states) = handle.get_node_states().await {
if pred(&states) {
return true;
}
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
false
}

/// Tests that a fast file reader feeding a slow pacer node doesn't deadlock.
/// This validates the Per-Pin Distributor Architecture correctly handles backpressure.
#[tokio::test]
Expand Down Expand Up @@ -141,22 +158,16 @@ async fn test_backpressure_no_deadlock() {
.await
.expect("Failed to connect muxer to writer");

// Wait for nodes to become ready and start processing
tokio::time::sleep(Duration::from_millis(500)).await;

// 6. Verify nodes are running
let states = handle.get_node_states().await.expect("Failed to get node states");

assert!(
matches!(states.get("reader"), Some(NodeState::Running | NodeState::Ready)),
"Reader should be running or ready, got: {:?}",
states.get("reader")
);
assert!(
matches!(states.get("pacer"), Some(NodeState::Running)),
"Pacer should be running, got: {:?}",
states.get("pacer")
);
// 6. Wait for nodes to become ready and start processing
let nodes_ready = wait_for_states(&handle, Duration::from_secs(5), |states| {
let reader_ok = states
.get("reader")
.is_some_and(|s| matches!(s, NodeState::Running | NodeState::Ready));
let pacer_ok = states.get("pacer").is_some_and(|s| matches!(s, NodeState::Running));
reader_ok && pacer_ok
})
.await;
assert!(nodes_ready, "Reader should be running/ready and pacer should be running");

// 7. Let the pipeline run for a bit - this is where deadlock would occur in the old architecture
// The demuxer will produce Audio packets faster than pacer can forward them (0.1x speed)
Expand Down Expand Up @@ -214,11 +225,11 @@ async fn test_dynamic_connection_under_backpressure() {
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(100)).await;

// Verify node is running
let states = handle.get_node_states().await.unwrap();
assert!(matches!(states.get("pacer"), Some(NodeState::Running)), "Pacer should be running");
let pacer_running = wait_for_states(&handle, Duration::from_secs(5), |states| {
matches!(states.get("pacer"), Some(NodeState::Running))
})
.await;
assert!(pacer_running, "Pacer should be running");

// The key test: verify engine remains responsive when managing connections
let result = timeout(Duration::from_secs(1), handle.get_node_states()).await;
Expand Down Expand Up @@ -251,7 +262,11 @@ async fn test_node_removal_under_backpressure() {
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(100)).await;
let pacer_created = wait_for_states(&handle, Duration::from_secs(5), |states| {
states.get("pacer").is_some_and(|s| !matches!(s, NodeState::Creating))
})
.await;
assert!(pacer_created, "Pacer should have left Creating state");

// Remove the node
handle
Expand Down
Loading