From 313d0bd939b5789a1d1d8b69289d885c5011bf80 Mon Sep 17 00:00:00 2001 From: MatthewMckee4 Date: Wed, 15 Apr 2026 16:21:58 +0100 Subject: [PATCH 1/2] Remove cross-run duration weighting from test partitioning --- crates/karva_cache/src/cache.rs | 73 --------- crates/karva_cache/src/lib.rs | 2 +- crates/karva_runner/src/orchestration.rs | 24 +-- crates/karva_runner/src/partition.rs | 189 +++++------------------ 4 files changed, 44 insertions(+), 244 deletions(-) diff --git a/crates/karva_cache/src/cache.rs b/crates/karva_cache/src/cache.rs index 1ae1dbe6..733358d4 100644 --- a/crates/karva_cache/src/cache.rs +++ b/crates/karva_cache/src/cache.rs @@ -240,40 +240,6 @@ fn collect_run_dirs(cache_dir: &Utf8Path) -> Result> { Ok(run_dirs) } -/// Reads durations from the most recent test run. -/// -/// Finds the most recent `run-{timestamp}` directory, then aggregates -/// all durations from all worker directories within it. -pub fn read_recent_durations(cache_dir: &Utf8PathBuf) -> Result> { - let run_dirs = collect_run_dirs(cache_dir)?; - - let most_recent = run_dirs - .last() - .ok_or_else(|| anyhow::anyhow!("No run directories found"))?; - - let run_dir = cache_dir.join(most_recent); - - let mut aggregated_durations = HashMap::new(); - - for entry in fs::read_dir(&run_dir)? { - let entry = entry?; - let worker_path = Utf8PathBuf::try_from(entry.path()) - .map_err(|e| anyhow::anyhow!("Invalid UTF-8 path: {e}"))?; - - if !worker_path.is_dir() { - continue; - } - - if let Some(durations) = - read_and_parse::>(&worker_path, DURATIONS_FILE)? - { - aggregated_durations.extend(durations); - } - } - - Ok(aggregated_durations) -} - /// Result of a cache prune operation. pub struct PruneResult { /// Names of the removed run directories. 
@@ -323,18 +289,6 @@ mod tests { use super::*; - fn create_cache_with_durations( - dir: &std::path::Path, - run_name: &str, - worker_id: usize, - durations: &HashMap, - ) { - let worker_dir = dir.join(run_name).join(format!("worker-{worker_id}")); - fs::create_dir_all(&worker_dir).unwrap(); - let json = serde_json::to_string(durations).unwrap(); - fs::write(worker_dir.join(DURATIONS_FILE), json).unwrap(); - } - fn create_cache_with_stats( dir: &std::path::Path, run_name: &str, @@ -346,33 +300,6 @@ mod tests { fs::write(worker_dir.join(STATS_FILE), stats_json).unwrap(); } - #[test] - fn read_recent_durations_returns_from_most_recent_run() { - let tmp = tempfile::tempdir().unwrap(); - let cache_dir = Utf8PathBuf::try_from(tmp.path().to_path_buf()).unwrap(); - - let mut old_durations = HashMap::new(); - old_durations.insert("test_old".to_string(), Duration::from_millis(100)); - create_cache_with_durations(tmp.path(), "run-100", 0, &old_durations); - - let mut new_durations = HashMap::new(); - new_durations.insert("test_new".to_string(), Duration::from_millis(200)); - create_cache_with_durations(tmp.path(), "run-200", 0, &new_durations); - - let result = read_recent_durations(&cache_dir).unwrap(); - assert!(result.contains_key("test_new")); - assert!(!result.contains_key("test_old")); - } - - #[test] - fn read_recent_durations_errors_when_no_runs() { - let tmp = tempfile::tempdir().unwrap(); - let cache_dir = Utf8PathBuf::try_from(tmp.path().to_path_buf()).unwrap(); - - let result = read_recent_durations(&cache_dir); - assert!(result.is_err()); - } - #[test] fn aggregate_results_merges_stats_from_multiple_workers() { let tmp = tempfile::tempdir().unwrap(); diff --git a/crates/karva_cache/src/lib.rs b/crates/karva_cache/src/lib.rs index ca24225a..7bdb2188 100644 --- a/crates/karva_cache/src/lib.rs +++ b/crates/karva_cache/src/lib.rs @@ -3,7 +3,7 @@ pub(crate) mod hash; pub use cache::{ AggregatedResults, Cache, PruneResult, clean_cache, prune_cache, read_last_failed, - 
read_recent_durations, write_last_failed, + write_last_failed, }; pub use hash::RunHash; diff --git a/crates/karva_runner/src/orchestration.rs b/crates/karva_runner/src/orchestration.rs index 9a3df46c..d679c1cf 100644 --- a/crates/karva_runner/src/orchestration.rs +++ b/crates/karva_runner/src/orchestration.rs @@ -10,8 +10,7 @@ use crossbeam_channel::{Receiver, TryRecvError}; use crate::shutdown::shutdown_receiver; use karva_cache::{ - AggregatedResults, CACHE_DIR, Cache, RunHash, read_last_failed, read_recent_durations, - write_last_failed, + AggregatedResults, CACHE_DIR, Cache, RunHash, read_last_failed, write_last_failed, }; use karva_cli::SubTestCommand; use karva_collector::{CollectedPackage, CollectionSettings}; @@ -280,20 +279,6 @@ pub fn run_parallel_tests( let cache_dir = project.cwd().join(CACHE_DIR); - // Read durations from the most recent run to optimize partitioning - let previous_durations = if config.no_cache { - std::collections::HashMap::new() - } else { - read_recent_durations(&cache_dir).unwrap_or_default() - }; - - if !previous_durations.is_empty() { - tracing::debug!( - "Found {} previous test durations to guide partitioning", - previous_durations.len() - ); - } - let last_failed_set: HashSet = if config.last_failed { read_last_failed(&cache_dir) .unwrap_or_default() @@ -303,12 +288,7 @@ pub fn run_parallel_tests( HashSet::new() }; - let partitions = partition_collected_tests( - &collected, - num_workers, - &previous_durations, - &last_failed_set, - ); + let partitions = partition_collected_tests(&collected, num_workers, &last_failed_set); let run_hash = RunHash::current_time(); diff --git a/crates/karva_runner/src/partition.rs b/crates/karva_runner/src/partition.rs index f6ac6844..a268a95e 100644 --- a/crates/karva_runner/src/partition.rs +++ b/crates/karva_runner/src/partition.rs @@ -1,5 +1,4 @@ use std::collections::{HashMap, HashSet}; -use std::time::Duration; /// Test metadata used for partitioning decisions #[derive(Debug, Clone)] @@ 
-8,61 +7,25 @@ struct TestInfo { /// The qualified name of the test (e.g., `test_a::test_1`), used for last-failed filtering. qualified_name: String, path: String, - /// Actual runtime from previous test run (if available) - duration: Option, -} - -/// Calculate the weight of a test for partitioning. -/// -/// Uses the actual duration in microseconds if available, otherwise defaults to 1. -fn test_weight(duration: Option) -> u128 { - duration.map_or(1, |d| d.as_micros()) -} - -/// A group of tests from the same module with calculated weight -#[derive(Debug)] -struct ModuleGroup { - tests: Vec, - /// Total weight of all tests in this module - total_weight: u128, -} - -impl ModuleGroup { - fn new(tests: Vec, total_weight: u128) -> Self { - Self { - tests, - total_weight, - } - } - - fn weight(&self) -> u128 { - self.total_weight - } } /// A partition of tests assigned to a single worker #[derive(Debug)] pub struct Partition { tests: Vec, - /// Cumulative weight (duration in microseconds or 1 for unknown tests) - weight: u128, } impl Partition { fn new() -> Self { - Self { - tests: Vec::new(), - weight: 0, - } + Self { tests: Vec::new() } } - fn add_test(&mut self, test: TestInfo, test_weight: u128) { + fn add_test(&mut self, test: TestInfo) { self.tests.push(test.path); - self.weight += test_weight; } - fn weight(&self) -> u128 { - self.weight + fn len(&self) -> usize { + self.tests.len() } pub(crate) fn tests(&self) -> &[String] { @@ -70,177 +33,107 @@ impl Partition { } } -/// Partition collected tests into N groups using module-aware greedy bin-packing -/// -/// # Algorithm: Hybrid Module-Aware LPT (Longest Processing Time First) -/// -/// This implements a hybrid approach that balances load while minimizing module imports: -/// -/// 1. **Group**: Tests are grouped by module and module weights are calculated -/// 2. **Classify**: Modules are classified as "small" or "large" based on a threshold -/// 3. 
**Assign Small Modules**: Small modules are assigned atomically to partitions (no splitting) -/// 4. **Split Large Modules**: Large modules are split using LPT to prevent imbalance +/// Partition collected tests into N groups using module-aware greedy bin-packing. /// -/// ## Module Grouping Benefits -/// - **Reduced imports**: Tests from the same module stay together in one partition -/// - **Faster startup**: Each partition loads fewer unique modules -/// - **Shared fixtures**: Fixture setup/teardown happens once per module per partition +/// Tests from the same module stay together in one partition when the module is +/// small, so each worker imports fewer unique modules and shares fixture setup. +/// Large modules (with at least `total / workers / 2` tests) are split across +/// workers to keep partition sizes balanced. /// -/// ## Threshold Strategy -/// The split threshold is set to `(total_weight / num_workers) / 2`: -/// - Modules below this are kept together (typical case) -/// - Modules above this are split to prevent worker imbalance -/// -/// ## Complexity -/// - Time: O(n log n + m log m + n*w) where n = tests, m = modules, w = workers -/// - Space: O(n + m + w) -/// - Since m ≤ n and w is small (4-16), this is effectively O(n log n) -/// -/// ## Weighting Strategy -/// - **With historical data**: Uses actual test duration in microseconds -/// - **Without historical data**: Tests are shuffled randomly and assigned with equal weight +/// Every test is weighted equally — the runner previously persisted per-test +/// durations to weight this smarter, but the added complexity wasn't paying off. 
pub fn partition_collected_tests( package: &karva_collector::CollectedPackage, num_workers: usize, - previous_durations: &HashMap, last_failed: &HashSet, ) -> Vec { let mut test_infos = Vec::new(); - collect_test_paths_recursive(package, &mut test_infos, previous_durations); + collect_test_paths_recursive(package, &mut test_infos); if !last_failed.is_empty() { test_infos.retain(|info| last_failed.contains(&info.qualified_name)); } - // Shuffle tests without durations so they distribute randomly across partitions - shuffle_tests_without_durations(&mut test_infos); + shuffle_tests(&mut test_infos); - // Step 1: Group tests by module and calculate module weights let mut module_groups: HashMap> = HashMap::new(); - let mut module_weights: HashMap = HashMap::new(); - for test_info in test_infos { - let weight = test_weight(test_info.duration); - - *module_weights - .entry(test_info.module_name.clone()) - .or_default() += weight; module_groups .entry(test_info.module_name.clone()) .or_default() .push(test_info); } - // Step 2: Calculate threshold for splitting decision - let total_weight: u128 = module_weights.values().sum(); - let target_partition_weight = total_weight / num_workers.max(1) as u128; - let split_threshold = target_partition_weight / 2; - - // Step 3: Classify modules as small (keep together) or large (allow splitting) - let mut small_modules = Vec::new(); - let mut large_modules = Vec::new(); + let total_tests: usize = module_groups.values().map(Vec::len).sum(); + let split_threshold = total_tests / num_workers.max(1) / 2; - for (module_name, tests) in module_groups { - let weight = module_weights[&module_name]; - let module_group = ModuleGroup::new(tests, weight); + let mut small_modules: Vec> = Vec::new(); + let mut large_modules: Vec> = Vec::new(); - if module_group.weight() < split_threshold { - small_modules.push(module_group); + for (_, tests) in module_groups { + if tests.len() < split_threshold { + small_modules.push(tests); } else { - 
large_modules.push(module_group); + large_modules.push(tests); } } - // Sort small modules by weight (descending) for better bin-packing - small_modules.sort_by_key(|module| std::cmp::Reverse(module.weight())); + // Pack heaviest small modules first for a better bin-packing fit. + small_modules.sort_by_key(|tests| std::cmp::Reverse(tests.len())); let mut partitions: Vec = (0..num_workers).map(|_| Partition::new()).collect(); - // Step 4: Assign small modules atomically (entire module to one partition) - for module_group in small_modules { - let min_partition_idx = find_lightest_partition(&partitions); - for test_info in module_group.tests { - let weight = test_weight(test_info.duration); - partitions[min_partition_idx].add_test(test_info, weight); + for tests in small_modules { + let idx = find_lightest_partition(&partitions); + for test_info in tests { + partitions[idx].add_test(test_info); } } - // Step 5: Split large modules using LPT to prevent imbalance - for mut module_group in large_modules { - // Sort tests within large modules by weight (descending) - module_group.tests.sort_by(compare_test_weights); - - for test_info in module_group.tests { - let weight = test_weight(test_info.duration); - let min_partition_idx = find_lightest_partition(&partitions); - partitions[min_partition_idx].add_test(test_info, weight); + for tests in large_modules { + for test_info in tests { + let idx = find_lightest_partition(&partitions); + partitions[idx].add_test(test_info); } } partitions } -/// Finds the index of the partition with the smallest weight +/// Finds the index of the partition with the fewest tests. 
fn find_lightest_partition(partitions: &[Partition]) -> usize { partitions .iter() .enumerate() - .min_by_key(|(_, partition)| partition.weight()) + .min_by_key(|(_, partition)| partition.len()) .map_or(0, |(idx, _)| idx) } -/// Compares two tests by duration descending; tests without durations are considered equal -fn compare_test_weights(a: &TestInfo, b: &TestInfo) -> std::cmp::Ordering { - match (&a.duration, &b.duration) { - (Some(dur_a), Some(dur_b)) => dur_b.cmp(dur_a), - (None, None) => std::cmp::Ordering::Equal, - (None, _) => std::cmp::Ordering::Greater, - (_, None) => std::cmp::Ordering::Less, - } -} - -/// Shuffles only the tests that have no historical duration data. -/// -/// This ensures tests without timing info are randomly distributed across partitions -/// rather than always landing in the same order. -fn shuffle_tests_without_durations(test_infos: &mut [TestInfo]) { - let no_duration_indices: Vec = test_infos - .iter() - .enumerate() - .filter(|(_, t)| t.duration.is_none()) - .map(|(i, _)| i) - .collect(); - - // Fisher-Yates shuffle on the indices - for i in (1..no_duration_indices.len()).rev() { +/// Shuffles tests so they distribute randomly across partitions rather than +/// always landing in discovery order. +fn shuffle_tests(test_infos: &mut [TestInfo]) { + for i in (1..test_infos.len()).rev() { let j = fastrand::usize(..=i); - let idx_a = no_duration_indices[i]; - let idx_b = no_duration_indices[j]; - test_infos.swap(idx_a, idx_b); + test_infos.swap(i, j); } } -/// Recursively collects test information from a package and all its subpackages +/// Recursively collects test information from a package and all its subpackages. 
fn collect_test_paths_recursive( package: &karva_collector::CollectedPackage, test_infos: &mut Vec, - previous_durations: &HashMap, ) { for module in package.modules.values() { for test_fn_def in &module.test_function_defs { - let qualified_name = format!("{}::{}", module.path.module_name(), test_fn_def.name); - let duration = previous_durations.get(&qualified_name).copied(); - test_infos.push(TestInfo { module_name: module.path.module_name().to_string(), - qualified_name, + qualified_name: format!("{}::{}", module.path.module_name(), test_fn_def.name), path: format!("{}::{}", module.path.path(), test_fn_def.name), - duration, }); } } for subpackage in package.packages.values() { - collect_test_paths_recursive(subpackage, test_infos, previous_durations); + collect_test_paths_recursive(subpackage, test_infos); } } From 73468bddc056dcc338526cad200f5a777b9e31bf Mon Sep 17 00:00:00 2001 From: MatthewMckee4 Date: Wed, 15 Apr 2026 16:38:06 +0100 Subject: [PATCH 2/2] Simplify partition algorithm --- crates/karva_runner/src/partition.rs | 123 +++++++++------------------ 1 file changed, 40 insertions(+), 83 deletions(-) diff --git a/crates/karva_runner/src/partition.rs b/crates/karva_runner/src/partition.rs index a268a95e..0f3d934a 100644 --- a/crates/karva_runner/src/partition.rs +++ b/crates/karva_runner/src/partition.rs @@ -1,15 +1,6 @@ use std::collections::{HashMap, HashSet}; -/// Test metadata used for partitioning decisions -#[derive(Debug, Clone)] -struct TestInfo { - module_name: String, - /// The qualified name of the test (e.g., `test_a::test_1`), used for last-failed filtering. - qualified_name: String, - path: String, -} - -/// A partition of tests assigned to a single worker +/// A partition of tests assigned to a single worker. 
#[derive(Debug)] pub struct Partition { tests: Vec, @@ -20,10 +11,6 @@ impl Partition { Self { tests: Vec::new() } } - fn add_test(&mut self, test: TestInfo) { - self.tests.push(test.path); - } - fn len(&self) -> usize { self.tests.len() } @@ -33,67 +20,40 @@ impl Partition { } } -/// Partition collected tests into N groups using module-aware greedy bin-packing. -/// -/// Tests from the same module stay together in one partition when the module is -/// small, so each worker imports fewer unique modules and shares fixture setup. -/// Large modules (with more tests than `total / workers / 2`) are split across -/// workers to keep partition sizes balanced. +/// Partition collected tests into `num_workers` groups. /// -/// Every test is weighted equally — the runner previously persisted per-test -/// durations to weight this smarter, but the added complexity wasn't paying off. +/// Tests are grouped by module and modules are sorted by test count +/// (descending). Each module is then assigned whole to the lightest +/// partition, so a worker shares module-level imports and fixture setup. +/// A module larger than the per-worker fair share (`total / num_workers`) +/// would strand other workers if kept atomic, so those are split test-by-test +/// across the lightest partitions instead. 
pub fn partition_collected_tests( package: &karva_collector::CollectedPackage, num_workers: usize, last_failed: &HashSet, ) -> Vec { - let mut test_infos = Vec::new(); - collect_test_paths_recursive(package, &mut test_infos); - - if !last_failed.is_empty() { - test_infos.retain(|info| last_failed.contains(&info.qualified_name)); - } - - shuffle_tests(&mut test_infos); - - let mut module_groups: HashMap> = HashMap::new(); - for test_info in test_infos { - module_groups - .entry(test_info.module_name.clone()) - .or_default() - .push(test_info); - } + let mut module_groups: HashMap> = HashMap::new(); + collect_module_tests(package, &mut module_groups, last_failed); - let total_tests: usize = module_groups.values().map(Vec::len).sum(); - let split_threshold = total_tests / num_workers.max(1) / 2; + let mut modules: Vec> = module_groups.into_values().collect(); + modules.sort_by_key(|tests| std::cmp::Reverse(tests.len())); - let mut small_modules: Vec> = Vec::new(); - let mut large_modules: Vec> = Vec::new(); - - for (_, tests) in module_groups { - if tests.len() < split_threshold { - small_modules.push(tests); - } else { - large_modules.push(tests); - } - } - - // Pack heaviest small modules first for a better bin-packing fit. 
- small_modules.sort_by_key(|tests| std::cmp::Reverse(tests.len())); + let num_workers = num_workers.max(1); + let total_tests: usize = modules.iter().map(Vec::len).sum(); + let split_threshold = total_tests / num_workers; let mut partitions: Vec = (0..num_workers).map(|_| Partition::new()).collect(); - for tests in small_modules { - let idx = find_lightest_partition(&partitions); - for test_info in tests { - partitions[idx].add_test(test_info); - } - } - - for tests in large_modules { - for test_info in tests { - let idx = find_lightest_partition(&partitions); - partitions[idx].add_test(test_info); + for tests in modules { + if tests.len() > split_threshold { + for test in tests { + let idx = lightest_partition(&partitions); + partitions[idx].tests.push(test); + } + } else { + let idx = lightest_partition(&partitions); + partitions[idx].tests.extend(tests); } } @@ -101,7 +61,7 @@ pub fn partition_collected_tests( } /// Finds the index of the partition with the fewest tests. -fn find_lightest_partition(partitions: &[Partition]) -> usize { +fn lightest_partition(partitions: &[Partition]) -> usize { partitions .iter() .enumerate() @@ -109,31 +69,28 @@ fn find_lightest_partition(partitions: &[Partition]) -> usize { .map_or(0, |(idx, _)| idx) } -/// Shuffles tests so they distribute randomly across partitions rather than -/// always landing in discovery order. -fn shuffle_tests(test_infos: &mut [TestInfo]) { - for i in (1..test_infos.len()).rev() { - let j = fastrand::usize(..=i); - test_infos.swap(i, j); - } -} - -/// Recursively collects test information from a package and all its subpackages. -fn collect_test_paths_recursive( +/// Walk the package tree and group test paths by their containing module. 
+fn collect_module_tests( package: &karva_collector::CollectedPackage, - test_infos: &mut Vec, + module_groups: &mut HashMap>, + last_failed: &HashSet, ) { for module in package.modules.values() { + let module_name = module.path.module_name(); for test_fn_def in &module.test_function_defs { - test_infos.push(TestInfo { - module_name: module.path.module_name().to_string(), - qualified_name: format!("{}::{}", module.path.module_name(), test_fn_def.name), - path: format!("{}::{}", module.path.path(), test_fn_def.name), - }); + let qualified_name = format!("{module_name}::{}", test_fn_def.name); + if !last_failed.is_empty() && !last_failed.contains(&qualified_name) { + continue; + } + let path = format!("{}::{}", module.path.path(), test_fn_def.name); + module_groups + .entry(module_name.to_string()) + .or_default() + .push(path); } } for subpackage in package.packages.values() { - collect_test_paths_recursive(subpackage, test_infos); + collect_module_tests(subpackage, module_groups, last_failed); } }