Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/agent/routine_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,12 +1110,13 @@ async fn send_notification(
}
}

#[cfg(feature = "libsql")]
impl RoutineEngine {
/// Returns the current running count as a read-only snapshot.
/// Returns a clone of the running count atomic counter.
///
/// Intended for test synchronisation only — do not use in production paths.
pub fn running_count(&self) -> usize {
self.running_count.load(Ordering::SeqCst)
pub fn running_count(&self) -> Arc<AtomicUsize> {
self.running_count.clone()
}
}

Expand Down Expand Up @@ -1303,4 +1304,4 @@ mod tests {
assert_eq!(finish_reason_length, crate::llm::FinishReason::Length);
assert_eq!(finish_reason_stop, crate::llm::FinishReason::Stop);
}
}
}
6 changes: 5 additions & 1 deletion tests/e2e_traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

mod support;

#[cfg(feature = "libsql")]
#[path = "support/routine_sync.rs"]
mod routine_sync;

#[path = "e2e_traces/advanced_traces.rs"]
mod advanced_traces;
#[path = "e2e_traces/attachments.rs"]
Expand Down Expand Up @@ -42,4 +46,4 @@ mod trace_memory;
#[path = "e2e_traces/worker_coverage.rs"]
mod worker_coverage;
#[path = "e2e_traces/workspace_coverage.rs"]
mod workspace_coverage;
mod workspace_coverage;
6 changes: 3 additions & 3 deletions tests/e2e_traces/routine_cooldown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use chrono::Utc;
use ironclaw::agent::routine::Trigger;
use ironclaw::db::RoutineRuntimeUpdate;

use crate::routine_sync::{wait_for_idle, wait_for_persisted_run};
use crate::support::routines::{
create_test_db, create_workspace, make_minimal_engine, make_routine,
make_test_incoming_message, wait_for_idle, wait_for_persisted_run,
create_test_db, create_workspace, make_minimal_engine, make_routine, make_test_incoming_message,
};
use crate::support::trace_llm::{LlmTrace, TraceResponse, TraceStep};

Expand Down Expand Up @@ -81,4 +81,4 @@ async fn routine_cooldown() {
// Second fire should be blocked by cooldown.
let fired2 = engine.check_event_triggers(&msg).await;
assert_eq!(fired2, 0, "Second fire should be blocked by cooldown");
}
}
8 changes: 3 additions & 5 deletions tests/e2e_traces/routine_cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ use chrono::Utc;

use ironclaw::agent::routine::Trigger;

use crate::support::routines::{
create_test_db, create_workspace, make_minimal_engine, make_routine, wait_for_idle,
wait_for_persisted_run,
};
use crate::routine_sync::{wait_for_idle, wait_for_persisted_run};
use crate::support::routines::{create_test_db, create_workspace, make_minimal_engine, make_routine};
use crate::support::trace_llm::{LlmTrace, TraceResponse, TraceStep};

#[tokio::test]
Expand Down Expand Up @@ -62,4 +60,4 @@ async fn cron_routine_fires() {
// Notification may or may not be sent depending on config;
// just verify no panic occurred. Drain the channel.
let _ = notify_rx.try_recv();
}
}
6 changes: 3 additions & 3 deletions tests/e2e_traces/routine_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use ironclaw::agent::routine::Trigger;

use std::time::Duration;

use crate::routine_sync::{wait_for_idle, wait_for_persisted_run};
use crate::support::routines::{
create_test_db, create_workspace, make_minimal_engine, make_routine,
make_test_incoming_message, wait_for_idle, wait_for_persisted_run,
create_test_db, create_workspace, make_minimal_engine, make_routine, make_test_incoming_message,
};
use crate::support::trace_llm::{LlmTrace, TraceResponse, TraceStep};

Expand Down Expand Up @@ -67,4 +67,4 @@ async fn event_trigger_matches() {
let non_matching_msg = make_test_incoming_message("check the staging environment");
let fired_neg = engine.check_event_triggers(&non_matching_msg).await;
assert_eq!(fired_neg, 0, "Expected 0 routines fired on non-match");
}
}
5 changes: 3 additions & 2 deletions tests/e2e_traces/routine_system_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

use std::time::Duration;

use crate::routine_sync::{wait_for_idle, wait_for_persisted_run};
use crate::support::routines::{
SystemEventSpec, assert_system_event_count, create_test_db, create_workspace,
make_minimal_engine, register_github_issue_routine, wait_for_idle, wait_for_persisted_run,
make_minimal_engine, register_github_issue_routine,
};
use crate::support::trace_llm::{LlmTrace, TraceResponse, TraceStep};

Expand Down Expand Up @@ -67,4 +68,4 @@ async fn system_event_trigger_matches_and_filters() {
for (spec, expected, msg) in scenarios {
assert_system_event_count(&engine, spec, expected, msg).await;
}
}
}
90 changes: 90 additions & 0 deletions tests/support/routine_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//! Deterministic synchronisation helpers for routine engine E2E tests.
//!
//! Provides `wait_for_idle` and `wait_for_persisted_run` for coordinating
//! asynchronous routine execution and database persistence in tests that
//! trigger `RoutineEngine`'s task-spawning paths.
//!
//! This module is intentionally **not** declared in `support/mod.rs`. It is
//! included directly from `tests/e2e_traces.rs` so that it is only compiled
//! into the test binary that actually calls these helpers, avoiding spurious
//! `dead_code` warnings in unrelated test binaries without requiring any lint
//! suppression.

#![cfg(feature = "libsql")]

use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;

use uuid::Uuid;

use ironclaw::agent::routine_engine::RoutineEngine;
use ironclaw::db::Database;

/// Polls until the engine's running count reaches zero or the timeout expires.
///
/// This provides deterministic synchronisation for tests that need to wait
/// for asynchronous routine execution to complete, eliminating timing-dependent
/// flakiness without slowing down the test suite on fast machines.
///
/// **Note:** Combine with [`wait_for_persisted_run`] to ensure both execution
/// completion and database persistence, as the running count may reach zero
/// before the database record is fully committed.
pub async fn wait_for_idle(engine: &RoutineEngine, timeout: Duration) {
let start = std::time::Instant::now();
let poll_interval = Duration::from_millis(10);

loop {
let count = engine.running_count().load(Ordering::SeqCst);
if count == 0 {
return;
}

if start.elapsed() >= timeout {
panic!(
"Timeout waiting for engine to become idle (running count: {})",
count
);
}

tokio::time::sleep(poll_interval).await;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Avoid having both timeout and a hard-coded max_attempts to prevent surprising behaviour in long-running tests

Because max_attempts is tied to a fixed 5s window while timeout is configurable, callers using a longer timeout (e.g. 15s in slow CI) would still hit the max_attempts condition and panic after ~5s. To keep behavior aligned with the requested timeout, either derive max_attempts from timeout and poll_interval (e.g. timeout / poll_interval) or remove max_attempts and rely solely on timeout.

Suggested implementation:

    // Derive max_attempts from the configured timeout so the loop's upper bound
    // is consistent with the requested maximum wait duration.
    let max_attempts = (timeout.as_millis() / poll_interval.as_millis())
        .max(1) as u32;

Depending on the exact current implementation, you may also need to:

  1. Ensure timeout and poll_interval are both Duration values already in scope at the point where max_attempts is defined.
  2. If max_attempts is typed as something other than u32 (e.g. usize), adjust the cast to match:
    • For usize: as usize instead of as u32.
  3. If the doc comment or tests mention a fixed 5s window / 500 attempts, update them to reflect that the limit is now derived from timeout (e.g. "waits up to timeout for persistence").
  4. If there are multiple helpers in tests/support/routine_sync.rs that define a hard-coded max_attempts (for idle or persistence helpers), apply the same pattern to each one so all of them respect the configured timeout.

}
}

/// Polls until a routine run is persisted in the database or the timeout expires.
///
/// This helper provides deterministic synchronisation for database persistence,
/// complementing [`wait_for_idle`] which only waits for in-memory execution
/// completion. Call this after `wait_for_idle` to ensure the routine run is
/// durably recorded before asserting on persisted state.
///
/// The database is queried every 10ms. `timeout` is the only upper bound on
/// the wait: there is no separate attempt cap, so callers that pass a longer
/// timeout (for example on slow CI machines) get the full duration they asked
/// for.
///
/// # Arguments
/// * `db` - The database to query for persisted runs.
/// * `routine_id` - The ID of the routine to check for runs.
/// * `timeout` - Maximum duration to wait for persistence.
///
/// # Panics
/// Panics if `list_routine_runs` returns an error, or if no run has been
/// observed for `routine_id` once `timeout` has elapsed.
pub async fn wait_for_persisted_run(db: &Arc<dyn Database>, routine_id: Uuid, timeout: Duration) {
    let start = std::time::Instant::now();
    let poll_interval = Duration::from_millis(10);

    loop {
        // Only emptiness matters here; the second argument is presumably a
        // result limit (confirm against `Database::list_routine_runs`).
        let runs = db
            .list_routine_runs(routine_id, 10)
            .await
            .expect("list_routine_runs should not fail");

        if !runs.is_empty() {
            return;
        }

        // Bound the wait by wall-clock time alone. A fixed attempt cap would
        // silently override any `timeout` longer than cap * poll_interval.
        let elapsed = start.elapsed();
        if elapsed >= timeout {
            panic!(
                "Timeout waiting for routine run to be persisted (routine_id: {routine_id}, elapsed: {elapsed:?})"
            );
        }

        tokio::time::sleep(poll_interval).await;
    }
}
68 changes: 0 additions & 68 deletions tests/support/routines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,71 +174,3 @@ pub async fn assert_system_event_count(
.await;
assert_eq!(fired, expected, "{msg}");
}

/// Blocks the calling task until the engine reports no running routines.
///
/// The engine's running count is sampled every 10ms. The function returns as
/// soon as a sample reads zero, so fast machines are not slowed down by a
/// fixed sleep, and panics if `timeout` elapses while routines are still
/// running.
///
/// **Note:** Pair this with `wait_for_persisted_run` when asserting on stored
/// state, because the running count may drop to zero before the database
/// record is fully committed.
///
/// # Panics
/// Panics when the running count is still non-zero after `timeout`.
// Not every test binary that includes this support module calls this helper.
#[allow(dead_code)]
pub async fn wait_for_idle(engine: &RoutineEngine, timeout: Duration) {
    const POLL_INTERVAL: Duration = Duration::from_millis(10);
    let started = std::time::Instant::now();

    loop {
        match engine.running_count() {
            // Nothing in flight: the engine is idle.
            0 => return,
            // Still busy and out of time: fail loudly with the last sample.
            count if started.elapsed() >= timeout => panic!(
                "Timeout waiting for engine to become idle (running count: {})",
                count
            ),
            // Still busy but within budget: back off briefly and re-sample.
            _ => tokio::time::sleep(POLL_INTERVAL).await,
        }
    }
}

/// Waits for at least one run of `routine_id` to appear in the database.
///
/// Complements `wait_for_idle`, which only covers in-memory execution: call
/// this afterwards to make sure the routine run has been durably recorded.
/// The database is queried every 10ms until a run is found or `timeout`
/// elapses.
///
/// # Arguments
/// * `db` - The database to query for persisted runs
/// * `routine_id` - The ID of the routine to check for runs
/// * `timeout` - Maximum duration to wait for persistence
///
/// # Panics
/// Panics if `list_routine_runs` returns an error, or if no run has been
/// found once `timeout` has elapsed.
// Not every test binary that includes this support module calls this helper.
#[allow(dead_code)]
pub async fn wait_for_persisted_run(db: &Arc<dyn Database>, routine_id: Uuid, timeout: Duration) {
    let poll_interval = Duration::from_millis(10);
    let started = std::time::Instant::now();

    // Keep polling for as long as the query comes back empty.
    while db
        .list_routine_runs(routine_id, 10)
        .await
        .expect("list_routine_runs should not fail")
        .is_empty()
    {
        if started.elapsed() >= timeout {
            panic!(
                "Timeout waiting for routine run to be persisted (routine_id: {}, elapsed: {:?})",
                routine_id,
                started.elapsed()
            );
        }

        tokio::time::sleep(poll_interval).await;
    }
}
Loading