diff --git a/changelog/unreleased/improve-parallel-suite-scheduling-and-correctness-checks.md b/changelog/unreleased/improve-parallel-suite-scheduling-and-correctness-checks.md new file mode 100644 index 0000000..ea929ff --- /dev/null +++ b/changelog/unreleased/improve-parallel-suite-scheduling-and-correctness-checks.md @@ -0,0 +1,14 @@ +--- +title: Improve parallel suite scheduling and correctness checks +type: bugfix +authors: + - mavam + - codex +created: 2026-02-26T10:19:21.558254Z +--- + +Parallel suites are now scheduled more reliably when running with `--jobs > 1`, so interdependent tests (for example publisher/subscriber pairs) start together sooner instead of being delayed behind large backlogs of unrelated tests. + +This update also adds an explicit correctness guard for parallel suites: you can set `suite.min_jobs` in `test.yaml` to declare the minimum job count required for valid execution. The run now fails fast if `--jobs` is lower than `suite.min_jobs`, and it also fails hard when a parallel suite cannot reserve at least `min_jobs` workers at runtime (for example under slot contention), instead of proceeding under-provisioned. + +In addition, the harness now warns when a parallel suite has more tests than available jobs, making it easier to spot cases where effective suite parallelism is capped by worker count. diff --git a/src/tenzir_test/run.py b/src/tenzir_test/run.py index 4f1fce6..e20129c 100644 --- a/src/tenzir_test/run.py +++ b/src/tenzir_test/run.py @@ -23,6 +23,7 @@ import sys import tempfile import threading +import time import typing from collections.abc import Iterable, Iterator, Mapping, Sequence from pathlib import Path @@ -376,6 +377,7 @@ class SuiteConfig: name: str mode: SuiteExecutionMode = SuiteExecutionMode.SEQUENTIAL + min_jobs: int | None = None @dataclasses.dataclass(frozen=True, slots=True) @@ -389,6 +391,7 @@ class SuiteInfo: name: str directory: Path mode: SuiteExecutionMode = SuiteExecutionMode.SEQUENTIAL + min_jobs: int | None = None @dataclasses.dataclass(slots=True) @@ -427,6 +430,11 @@ def is_valid(self) -> bool: RunnerQueueItem = TestQueueItem | SuiteQueueItem +@dataclasses.dataclass(slots=True) +class SuiteQueueState: + pending_items: int = 0 + + @dataclasses.dataclass(slots=True) class ProjectSelection: """Describe which tests to execute for a given project root.""" @@ -1650,7 +1658,15 @@ def _suite_config_from_value(value: object) -> SuiteConfig | None: return None else: return None - return SuiteConfig(name=raw_name.strip(), mode=mode) + raw_min_jobs = value.get("min_jobs") + min_jobs: int | None = None + if raw_min_jobs is not None: + if not isinstance(raw_min_jobs, int) or isinstance(raw_min_jobs, bool): + return None + if raw_min_jobs <= 0: + return None + min_jobs = raw_min_jobs + return SuiteConfig(name=raw_name.strip(), mode=mode, min_jobs=min_jobs) return None @@ -1673,12 +1689,12 @@ def _normalize_suite_value( return SuiteConfig(name=name) if isinstance(value, Mapping): - unknown_keys = {str(key) for key in value.keys()} - {"name", "mode"} + unknown_keys = {str(key) for key in value.keys()} - {"name", "mode", "min_jobs"} if unknown_keys: _raise_config_error( location, f"'suite' mapping contains unknown keys: {', '.join(sorted(unknown_keys))}; " - "allowed keys are: name, mode", + "allowed keys are: min_jobs, mode, name", line_number, ) raw_name = value.get("name") @@ -1708,7 +1724,29 @@ def _normalize_suite_value( f"Invalid value for 'suite.mode', expected string, got '{type(raw_mode).__name__}'", line_number, ) - return SuiteConfig(name=raw_name.strip(), mode=mode) + raw_min_jobs = value.get("min_jobs") + min_jobs: int | None = None + if raw_min_jobs is not None: + if not isinstance(raw_min_jobs, int) or isinstance(raw_min_jobs, bool): + _raise_config_error( + location, + ( + "Invalid value for 'suite.min_jobs', expected positive integer, " + f"got '{type(raw_min_jobs).__name__}'" + ), + line_number, + ) + if raw_min_jobs <= 0: + _raise_config_error( + location, + ( + "Invalid value for 'suite.min_jobs', expected positive integer, " + f"got '{raw_min_jobs}'" + ), + line_number, + ) + min_jobs = raw_min_jobs + return SuiteConfig(name=raw_name.strip(), mode=mode, min_jobs=min_jobs) _raise_config_error( location, @@ -2399,7 +2437,12 @@ def _resolve_suite_for_test(test: Path) -> SuiteInfo | None: except (OSError, ValueError): # Only treat as a suite when the test lives under the suite directory. return None - return SuiteInfo(name=suite_value.name, directory=resolved_dir, mode=suite_value.mode) + return SuiteInfo( + name=suite_value.name, + directory=resolved_dir, + mode=suite_value.mode, + min_jobs=suite_value.min_jobs, + ) def _clear_directory_config_cache() -> None: @@ -3231,6 +3274,80 @@ def _count_queue_tests(queue: Sequence[RunnerQueueItem]) -> int: return total +def _count_suite_queue_items(queue: Sequence[RunnerQueueItem]) -> int: + return sum( + 1 + for item in queue + if isinstance(item, SuiteQueueItem) and item.suite.mode is SuiteExecutionMode.PARALLEL + ) + + +def _iter_parallel_suite_queue_items(queue: Sequence[RunnerQueueItem]) -> Iterator[SuiteQueueItem]: + for item in queue: + if isinstance(item, SuiteQueueItem) and item.suite.mode is SuiteExecutionMode.PARALLEL: + yield item + + +def _validate_parallel_suite_min_jobs(queue: Sequence[RunnerQueueItem], *, jobs: int) -> None: + violations: list[str] = [] + for suite_item in _iter_parallel_suite_queue_items(queue): + min_jobs = suite_item.suite.min_jobs + if min_jobs is None or jobs >= min_jobs: + continue + suite_dir = _relativize_path(suite_item.suite.directory / _CONFIG_FILE_NAME) + violations.append( + ( + f"'{suite_item.suite.name}' ({suite_dir}) requires at least --jobs={min_jobs} " + f"for correctness, got --jobs={jobs}" + ) + ) + if violations: + detail = "; ".join(violations) + raise HarnessError(f"parallel suite job requirements not met: {detail}") + + +def _warn_parallel_suite_oversubscription(queue: Sequence[RunnerQueueItem], *, jobs: int) -> None: + for suite_item in _iter_parallel_suite_queue_items(queue): + suite_size = len(suite_item.tests) + if suite_size <= jobs: + continue + suite_dir = _relativize_path(suite_item.suite.directory / _CONFIG_FILE_NAME) + print( + ( + f"{INFO} warning: parallel suite '{suite_item.suite.name}' in {suite_dir} has " + f"{suite_size} tests but --jobs={jobs}; not all members can run at once" + ) + ) + + +def _pop_next_queue_item( + queue: list[RunnerQueueItem], + *, + queue_lock: threading.Lock, + suite_state: SuiteQueueState | None = None, +) -> RunnerQueueItem | None: + while True: + with queue_lock: + if not queue: + return None + if suite_state is None or suite_state.pending_items <= 0: + return queue.pop() + if ( + isinstance(queue[-1], SuiteQueueItem) + and queue[-1].suite.mode is SuiteExecutionMode.PARALLEL + ): + return queue.pop() + for index in range(len(queue) - 1, -1, -1): + item = queue[index] + if not isinstance(item, SuiteQueueItem): + continue + if item.suite.mode is SuiteExecutionMode.PARALLEL: + return queue.pop(index) + if interrupt_requested(): + return None + time.sleep(0.01) + + def _print_aggregate_totals(project_count: int, summary: Summary) -> None: total = summary.total failed = summary.failed @@ -4221,6 +4338,13 @@ def _log_suite_event( class Worker: + # Default locks are shared so multiple workers created with shared resources + # remain synchronized even when callers omit explicit lock arguments. + _DEFAULT_SLOT_LOCK = threading.Lock() + _DEFAULT_QUEUE_LOCK = threading.Lock() + _DEFAULT_SUITE_STATE_LOCK = threading.Lock() + _DEFAULT_SUITE_QUEUE_STATES: dict[int, tuple[SuiteQueueState, int]] = {} + def __init__( self, queue: list[RunnerQueueItem], @@ -4232,6 +4356,9 @@ def __init__( run_skipped_selector: RunSkippedSelector | None = None, jobs: int | None = None, test_slots: threading.Semaphore | None = None, + slot_lock: threading.Lock | None = None, + queue_lock: threading.Lock | None = None, + suite_queue_state: SuiteQueueState | None = None, ) -> None: self._queue = queue self._result: Summary | None = None @@ -4243,10 +4370,22 @@ def __init__( self._run_skipped_selector = run_skipped_selector or RunSkippedSelector() self._jobs = max(1, jobs if jobs is not None else get_default_jobs()) self._test_slots = test_slots or threading.BoundedSemaphore(self._jobs) + self._slot_lock = slot_lock or self._DEFAULT_SLOT_LOCK + self._queue_lock = queue_lock or self._DEFAULT_QUEUE_LOCK + self._default_suite_state_queue_id: int | None = None + if suite_queue_state is None: + self._suite_queue_state = self._acquire_default_suite_queue_state(queue) + else: + self._suite_queue_state = suite_queue_state self._run_skipped_match_count = 0 self._run_skipped_match_count_lock = threading.Lock() self._thread = threading.Thread(target=self._work) + def __del__(self) -> None: + # Ensure default suite-queue state references are released even when + # a Worker is constructed but never started. + self._release_default_suite_queue_state() + def start(self) -> None: self._thread.start() @@ -4268,6 +4407,80 @@ def _increment_run_skipped_match_count(self, amount: int = 1) -> None: with self._run_skipped_match_count_lock: self._run_skipped_match_count += amount + def _acquire_default_suite_queue_state(self, queue: list[RunnerQueueItem]) -> SuiteQueueState: + queue_id = id(queue) + with self._DEFAULT_SUITE_STATE_LOCK: + entry = self._DEFAULT_SUITE_QUEUE_STATES.get(queue_id) + if entry is None: + state = SuiteQueueState(pending_items=_count_suite_queue_items(queue)) + references = 1 + else: + state, references = entry + references += 1 + self._DEFAULT_SUITE_QUEUE_STATES[queue_id] = (state, references) + self._default_suite_state_queue_id = queue_id + return state + + def _release_default_suite_queue_state(self) -> None: + queue_id = getattr(self, "_default_suite_state_queue_id", None) + if queue_id is None: + return + with self._DEFAULT_SUITE_STATE_LOCK: + entry = self._DEFAULT_SUITE_QUEUE_STATES.get(queue_id) + if entry is None: + self._default_suite_state_queue_id = None + return + state, references = entry + if references <= 1: + self._DEFAULT_SUITE_QUEUE_STATES.pop(queue_id, None) + else: + self._DEFAULT_SUITE_QUEUE_STATES[queue_id] = (state, references - 1) + self._default_suite_state_queue_id = None + + def _acquire_test_slots(self, slots: int) -> bool: + if slots <= 0: + return True + acquired = 0 + with self._slot_lock: + while acquired < slots: + if self._test_slots.acquire(timeout=0.1): + acquired += 1 + continue + if interrupt_requested(): + self._release_test_slots(acquired) + return False + return True + + def _release_test_slots(self, slots: int) -> None: + if slots <= 0: + return + for _ in range(slots): + self._test_slots.release() + + def _acquire_suite_test_slots(self, max_slots: int, *, min_slots: int = 1) -> int: + """Reserve up to `max_slots` slots for a parallel suite execution.""" + + if max_slots <= 0: + return 0 + required = max(1, min(min_slots, max_slots)) + while True: + acquired = 0 + with self._slot_lock: + while acquired == 0: + if self._test_slots.acquire(timeout=0.1): + acquired = 1 + break + if interrupt_requested(): + return 0 + while acquired < max_slots and self._test_slots.acquire(blocking=False): + acquired += 1 + if acquired >= required: + return acquired + self._release_test_slots(acquired) + if interrupt_requested(): + return 0 + time.sleep(0.01) + def _record_static_skip( self, *, @@ -4387,9 +4600,12 @@ def _work(self) -> Summary: while True: if interrupt_requested(): break - try: - queue_item = self._queue.pop() - except IndexError: + queue_item = _pop_next_queue_item( + self._queue, + queue_lock=self._queue_lock, + suite_state=self._suite_queue_state, + ) + if queue_item is None: break if isinstance(queue_item, SuiteQueueItem): @@ -4404,6 +4620,8 @@ def _work(self) -> Summary: if self._result is None: self._result = Summary() return self._result + finally: + self._release_default_suite_queue_state() def _run_test_item_with_slot( self, @@ -4414,9 +4632,8 @@ def _run_test_item_with_slot( suite_fixtures: tuple[fixtures_impl.FixtureSpec, ...] | None = None, suite_assertion_lock: threading.Lock | None = None, ) -> bool: - while not self._test_slots.acquire(timeout=0.1): - if interrupt_requested(): - return True + if not self._acquire_test_slots(1): + return True try: return self._run_test_item( test_item, @@ -4426,7 +4643,7 @@ def _run_test_item_with_slot( suite_assertion_lock=suite_assertion_lock, ) finally: - self._test_slots.release() + self._release_test_slots(1) def _run_suite_sequential( self, @@ -4459,16 +4676,27 @@ def _run_suite_member( suite_progress: tuple[str, int, int], suite_fixtures: tuple[fixtures_impl.FixtureSpec, ...], suite_assertion_lock: threading.Lock | None, + acquire_slot: bool = True, ) -> tuple[Summary, bool]: local_summary = Summary() - interrupted = run_context.run( - self._run_test_item_with_slot, - test_item, - local_summary, - suite_progress=suite_progress, - suite_fixtures=suite_fixtures, - suite_assertion_lock=suite_assertion_lock, - ) + if acquire_slot: + interrupted = run_context.run( + self._run_test_item_with_slot, + test_item, + local_summary, + suite_progress=suite_progress, + suite_fixtures=suite_fixtures, + suite_assertion_lock=suite_assertion_lock, + ) + else: + interrupted = run_context.run( + self._run_test_item, + test_item, + local_summary, + suite_progress=suite_progress, + suite_fixtures=suite_fixtures, + suite_assertion_lock=suite_assertion_lock, + ) return local_summary, interrupted def _run_suite_parallel( @@ -4477,185 +4705,239 @@ def _run_suite_parallel( suite_item: SuiteQueueItem, tests: Sequence[TestQueueItem], summary: Summary, + release_suite_priority: typing.Callable[[], None], ) -> bool: total = len(tests) - max_workers = min(max(1, total), self._jobs) + requested_workers = min(max(1, total), self._jobs) + min_jobs = suite_item.suite.min_jobs + required_workers = min_jobs if min_jobs is not None else 1 + reserved_workers = self._acquire_suite_test_slots( + requested_workers, + min_slots=required_workers, + ) + release_suite_priority() + if reserved_workers <= 0: + return True + if min_jobs is not None and reserved_workers < min_jobs: + self._release_test_slots(reserved_workers) + suite_dir = _relativize_path(suite_item.suite.directory / _CONFIG_FILE_NAME) + raise HarnessError( + ( + f"parallel suite '{suite_item.suite.name}' in {suite_dir} " + f"requires at least {min_jobs} concurrent workers for correctness, " + f"but only {reserved_workers} are currently available" + ) + ) suite_assertion_lock = threading.Lock() suite_summary = Summary() interrupted = False futures: list[concurrent.futures.Future[tuple[Summary, bool]]] = [] - with concurrent.futures.ThreadPoolExecutor( - max_workers=max_workers, - thread_name_prefix="suite", - ) as executor: - for index, test_item in enumerate(tests, start=1): - if interrupt_requested(): - interrupted = True - break - run_context = contextvars.copy_context() - futures.append( - executor.submit( - self._run_suite_member, - run_context=run_context, - test_item=test_item, - suite_progress=(suite_item.suite.name, index, total), - suite_fixtures=suite_item.fixtures, - suite_assertion_lock=suite_assertion_lock, + try: + with concurrent.futures.ThreadPoolExecutor( + max_workers=reserved_workers, + thread_name_prefix="suite", + ) as executor: + for index, test_item in enumerate(tests, start=1): + if interrupt_requested(): + interrupted = True + break + run_context = contextvars.copy_context() + futures.append( + executor.submit( + self._run_suite_member, + run_context=run_context, + test_item=test_item, + suite_progress=(suite_item.suite.name, index, total), + suite_fixtures=suite_item.fixtures, + suite_assertion_lock=suite_assertion_lock, + acquire_slot=False, + ) ) - ) - if interrupt_requested(): - interrupted = True - for pending in futures: - if pending.done(): - continue - pending.cancel() - for future in concurrent.futures.as_completed(futures): - if future.cancelled(): - continue - member_summary, member_interrupted = future.result() - _merge_summary_inplace(suite_summary, member_summary) - if member_interrupted: + if interrupt_requested(): interrupted = True - _request_interrupt() for pending in futures: if pending.done(): continue pending.cancel() + for future in concurrent.futures.as_completed(futures): + if future.cancelled(): + continue + member_summary, member_interrupted = future.result() + _merge_summary_inplace(suite_summary, member_summary) + if member_interrupted: + interrupted = True + _request_interrupt() + for pending in futures: + if pending.done(): + continue + pending.cancel() + finally: + self._release_test_slots(reserved_workers) _merge_summary_inplace(summary, suite_summary) return interrupted def _run_suite(self, suite_item: SuiteQueueItem, summary: Summary) -> None: - tests = suite_item.tests - total = len(tests) - if total == 0: - return - primary_test = tests[0].path - try: - primary_config = parse_test_config(primary_test, coverage=self._coverage) - except ValueError as exc: - raise RuntimeError(f"failed to parse suite config for {primary_test}: {exc}") from exc - # Avoid activating suite fixtures when every suite member is statically skipped. - static_skip_plan = self._suite_static_skip_plan(suite_item) - if static_skip_plan is not None: - for test_item, reason in static_skip_plan: - self._record_static_skip( - test_item=test_item, - fixtures=suite_item.fixtures, - reason=reason, - summary=summary, - ) - return - inputs_override = typing.cast(str | None, primary_config.get("inputs")) - env, config_args = get_test_env_and_config_args(primary_test, inputs=inputs_override) - config_package_dirs = cast(tuple[str, ...], primary_config.get("package_dirs", tuple())) - additional_package_dirs: list[str] = [] - for entry in config_package_dirs: - additional_package_dirs.extend(_expand_package_dirs(Path(entry))) - package_root = packages.find_package_root(primary_test) - package_dir_candidates: list[str] = [] - if package_root is not None: - env["TENZIR_PACKAGE_ROOT"] = str(package_root) - if inputs_override is None: - # Try nearest inputs/ directory, fall back to package-level - nearest = _find_nearest_inputs_dir(primary_test, package_root) - if nearest is not None: - env["TENZIR_INPUTS"] = str(nearest.resolve()) - else: - env["TENZIR_INPUTS"] = str((package_root / "tests" / "inputs")) - package_dir_candidates.append(str(package_root)) - package_dir_candidates.extend(additional_package_dirs) - for cli_path in _get_cli_packages(): - package_dir_candidates.extend(_expand_package_dirs(cli_path)) - if package_dir_candidates: - merged_dirs = _deduplicate_package_dirs(package_dir_candidates) - env["TENZIR_PACKAGE_DIRS"] = ",".join(merged_dirs) - config_args = list(config_args) + [f"--package-dirs={','.join(merged_dirs)}"] - skip_cfg = cast(SkipConfig | None, primary_config.get("skip")) - requires_cfg = cast(RequiresConfig | None, primary_config.get("requires")) - if isinstance(requires_cfg, RequiresConfig) and requires_cfg.has_requirements: - missing_requirements = self._evaluate_suite_requirements( - suite_item=suite_item, - tests=tests, - requires=requires_cfg, - env=env, - config_args=config_args, - ) - if missing_requirements is not None: - configured_reason = skip_cfg.reason if isinstance(skip_cfg, SkipConfig) else None - reason = _compose_skip_reason( - configured_reason, - missing_requirements, - fallback=missing_requirements, - ) - displayed_reason = f"capability unavailable: {reason}" - allows_capability_skip = isinstance( - skip_cfg, SkipConfig - ) and skip_cfg.allows_condition(SKIP_ON_CAPABILITY_UNAVAILABLE) - if allows_capability_skip: - if not self._skip_suite_with_reason( - suite_item=suite_item, - tests=tests, - displayed_reason=displayed_reason, - summary=summary, - ): - raise HarnessError(displayed_reason) - return - raise HarnessError(displayed_reason) - suite_fixture_options = _build_fixture_options(suite_item.fixtures) - context_token = fixtures_impl.push_context( - fixtures_impl.FixtureContext( - test=primary_test, - config=typing.cast(dict[str, Any], primary_config), - coverage=self._coverage, - env=env, - config_args=tuple(config_args), - tenzir_binary=TENZIR_BINARY, - tenzir_node_binary=TENZIR_NODE_BINARY, - fixture_options=suite_fixture_options, - ) - ) - _log_suite_event(suite_item.suite, event="setup", total=total) - interrupted = False + suite_priority_released = False + + def release_suite_priority() -> None: + nonlocal suite_priority_released + if suite_priority_released: + return + with self._queue_lock: + if self._suite_queue_state.pending_items > 0: + self._suite_queue_state.pending_items -= 1 + suite_priority_released = True + try: - with fixtures_impl.suite_scope(suite_item.fixtures): - if suite_item.suite.mode is SuiteExecutionMode.PARALLEL: - interrupted = self._run_suite_parallel( - suite_item=suite_item, - tests=tests, - summary=summary, + tests = suite_item.tests + total = len(tests) + if total == 0: + return + if suite_item.suite.mode is SuiteExecutionMode.PARALLEL: + min_jobs = suite_item.suite.min_jobs + if min_jobs is not None and self._jobs < min_jobs: + suite_dir = _relativize_path(suite_item.suite.directory / _CONFIG_FILE_NAME) + raise HarnessError( + ( + f"parallel suite '{suite_item.suite.name}' in {suite_dir} " + f"requires at least --jobs={min_jobs} for correctness, got --jobs={self._jobs}" + ) ) - else: - interrupted = self._run_suite_sequential( - suite_item=suite_item, - tests=tests, + primary_test = tests[0].path + try: + primary_config = parse_test_config(primary_test, coverage=self._coverage) + except ValueError as exc: + raise RuntimeError( + f"failed to parse suite config for {primary_test}: {exc}" + ) from exc + # Avoid activating suite fixtures when every suite member is statically skipped. + static_skip_plan = self._suite_static_skip_plan(suite_item) + if static_skip_plan is not None: + for test_item, reason in static_skip_plan: + self._record_static_skip( + test_item=test_item, + fixtures=suite_item.fixtures, + reason=reason, summary=summary, ) - except fixtures_impl.FixtureUnavailable as exc: - if not ( - isinstance(skip_cfg, SkipConfig) - and skip_cfg.allows_condition(SKIP_ON_FIXTURE_UNAVAILABLE) - ): - raise - reason = _compose_skip_reason(skip_cfg.reason, str(exc)) - displayed_reason = f"fixture unavailable: {reason}" - if not self._skip_suite_with_reason( - suite_item=suite_item, - tests=tests, - displayed_reason=displayed_reason, - summary=summary, - ): - raise - _CLI_LOGGER.warning( - "fixture unavailable for suite '%s': %s", - suite_item.suite.name, - reason, + return + inputs_override = typing.cast(str | None, primary_config.get("inputs")) + env, config_args = get_test_env_and_config_args(primary_test, inputs=inputs_override) + config_package_dirs = cast(tuple[str, ...], primary_config.get("package_dirs", tuple())) + additional_package_dirs: list[str] = [] + for entry in config_package_dirs: + additional_package_dirs.extend(_expand_package_dirs(Path(entry))) + package_root = packages.find_package_root(primary_test) + package_dir_candidates: list[str] = [] + if package_root is not None: + env["TENZIR_PACKAGE_ROOT"] = str(package_root) + if inputs_override is None: + # Try nearest inputs/ directory, fall back to package-level + nearest = _find_nearest_inputs_dir(primary_test, package_root) + if nearest is not None: + env["TENZIR_INPUTS"] = str(nearest.resolve()) + else: + env["TENZIR_INPUTS"] = str((package_root / "tests" / "inputs")) + package_dir_candidates.append(str(package_root)) + package_dir_candidates.extend(additional_package_dirs) + for cli_path in _get_cli_packages(): + package_dir_candidates.extend(_expand_package_dirs(cli_path)) + if package_dir_candidates: + merged_dirs = _deduplicate_package_dirs(package_dir_candidates) + env["TENZIR_PACKAGE_DIRS"] = ",".join(merged_dirs) + config_args = list(config_args) + [f"--package-dirs={','.join(merged_dirs)}"] + skip_cfg = cast(SkipConfig | None, primary_config.get("skip")) + requires_cfg = cast(RequiresConfig | None, primary_config.get("requires")) + if isinstance(requires_cfg, RequiresConfig) and requires_cfg.has_requirements: + missing_requirements = self._evaluate_suite_requirements( + suite_item=suite_item, + tests=tests, + requires=requires_cfg, + env=env, + config_args=config_args, + ) + if missing_requirements is not None: + configured_reason = ( + skip_cfg.reason if isinstance(skip_cfg, SkipConfig) else None + ) + reason = _compose_skip_reason( + configured_reason, + missing_requirements, + fallback=missing_requirements, + ) + displayed_reason = f"capability unavailable: {reason}" + allows_capability_skip = isinstance( + skip_cfg, SkipConfig + ) and skip_cfg.allows_condition(SKIP_ON_CAPABILITY_UNAVAILABLE) + if allows_capability_skip: + if not self._skip_suite_with_reason( + suite_item=suite_item, + tests=tests, + displayed_reason=displayed_reason, + summary=summary, + ): + raise HarnessError(displayed_reason) + return + raise HarnessError(displayed_reason) + suite_fixture_options = _build_fixture_options(suite_item.fixtures) + context_token = fixtures_impl.push_context( + fixtures_impl.FixtureContext( + test=primary_test, + config=typing.cast(dict[str, Any], primary_config), + coverage=self._coverage, + env=env, + config_args=tuple(config_args), + tenzir_binary=TENZIR_BINARY, + tenzir_node_binary=TENZIR_NODE_BINARY, + fixture_options=suite_fixture_options, + ) ) + _log_suite_event(suite_item.suite, event="setup", total=total) + interrupted = False + try: + with fixtures_impl.suite_scope(suite_item.fixtures): + if suite_item.suite.mode is SuiteExecutionMode.PARALLEL: + interrupted = self._run_suite_parallel( + suite_item=suite_item, + tests=tests, + summary=summary, + release_suite_priority=release_suite_priority, + ) + else: + release_suite_priority() + interrupted = self._run_suite_sequential( + suite_item=suite_item, + tests=tests, + summary=summary, + ) + except fixtures_impl.FixtureUnavailable as exc: + if not ( + isinstance(skip_cfg, SkipConfig) + and skip_cfg.allows_condition(SKIP_ON_FIXTURE_UNAVAILABLE) + ): + raise + reason = _compose_skip_reason(skip_cfg.reason, str(exc)) + displayed_reason = f"fixture unavailable: {reason}" + if not self._skip_suite_with_reason( + suite_item=suite_item, + tests=tests, + displayed_reason=displayed_reason, + summary=summary, + ): + raise + _CLI_LOGGER.warning( + "fixture unavailable for suite '%s': %s", + suite_item.suite.name, + reason, + ) + finally: + _log_suite_event(suite_item.suite, event="teardown", total=total) + fixtures_impl.pop_context(context_token) + cleanup_test_tmp_dir(env.get(TEST_TMP_ENV_VAR)) + if interrupted: + _request_interrupt() finally: - _log_suite_event(suite_item.suite, event="teardown", total=total) - fixtures_impl.pop_context(context_token) - cleanup_test_tmp_dir(env.get(TEST_TMP_ENV_VAR)) - if interrupted: - _request_interrupt() + release_suite_priority() def _run_test_item( self, @@ -5371,6 +5653,8 @@ def run_cli( queue = _build_queue_from_paths(collected_paths, coverage=coverage) queue.sort(key=_queue_sort_key, reverse=True) + _validate_parallel_suite_min_jobs(queue, jobs=jobs) + _warn_parallel_suite_oversubscription(queue, jobs=jobs) project_queue_size = _count_queue_tests(queue) project_summary = Summary() job_count, enabled_flags, verb = _summarize_harness_configuration( @@ -5437,6 +5721,9 @@ def run_cli( printed_projects += 1 test_slots = threading.BoundedSemaphore(max(1, jobs)) + slot_lock = threading.Lock() + queue_lock = threading.Lock() + suite_queue_state = SuiteQueueState(pending_items=_count_suite_queue_items(queue)) workers = [ Worker( queue, @@ -5447,6 +5734,9 @@ def run_cli( run_skipped_selector=run_skipped_selector, jobs=jobs, test_slots=test_slots, + slot_lock=slot_lock, + queue_lock=queue_lock, + suite_queue_state=suite_queue_state, ) for _ in range(jobs) ] diff --git a/tests/test_python_runner.py b/tests/test_python_runner.py index 54fea7d..6ee7138 100644 --- a/tests/test_python_runner.py +++ b/tests/test_python_runner.py @@ -72,7 +72,7 @@ def test_jsonify_config_converts_skip_config() -> None: converted = _jsonify_config(config_payload) assert converted["fixtures"] == ["sink", "mysql"] - assert converted["suite"] == {"name": "meta", "mode": "sequential"} + assert converted["suite"] == {"name": "meta", "mode": "sequential", "min_jobs": None} assert converted["modes"] == ["parallel"] assert converted["skip"] == { "reason": None, @@ -237,6 +237,7 @@ def fake_run(cmd, timeout, stdout, stderr, check, env, text=None, **kwargs): # assert payload["config"]["suite"] == { "name": "fixture-suite", "mode": suite_mode, + "min_jobs": None, } return _DummyCompleted(stdout=b"payload", stderr=b"") diff --git a/tests/test_run.py b/tests/test_run.py index 9acedfd..a792883 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -729,6 +729,177 @@ def __exit__(self, exc_type, exc, tb) -> bool: run.apply_settings(original_settings) +def test_worker_parallel_suite_min_jobs_guard_raises_when_underprovisioned( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + original_settings = config.Settings( + root=run.ROOT, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + run.apply_settings( + config.Settings( + root=tmp_path, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + ) + suite_dir = tmp_path / "tests" / "parallel" + suite_dir.mkdir(parents=True) + (suite_dir / "test.yaml").write_text( + "suite:\n name: parallel\n mode: parallel\n min_jobs: 2\n", + encoding="utf-8", + ) + test_file = suite_dir / "01-case.tql" + test_file.write_text("version\nwrite_json\n", encoding="utf-8") + run._clear_directory_config_cache() + + class AlwaysPassRunner(Runner): + def __init__(self) -> None: + super().__init__(name="always-pass") + + def collect_tests(self, path: Path) -> set[tuple[Runner, Path]]: + if path.is_file(): + return {(self, path)} + return set() + + def purge(self) -> None: + return + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + return True + + runner = AlwaysPassRunner() + monkeypatch.setattr(run, "get_runner_for_test", lambda path: runner) + + try: + queue = run._build_queue_from_paths([test_file], coverage=False) + worker = run.Worker(queue, update=False, coverage=False, jobs=1) + worker.start() + with pytest.raises(run.HarnessError, match="requires at least --jobs=2"): + worker.join() + finally: + run._clear_directory_config_cache() + run.apply_settings(original_settings) + + +def test_worker_parallel_suite_min_jobs_waits_for_required_reserved_workers( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + original_settings = config.Settings( + root=run.ROOT, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + run.apply_settings( + config.Settings( + root=tmp_path, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + ) + suite_dir = tmp_path / "tests" / "parallel" + suite_dir.mkdir(parents=True) + (suite_dir / "test.yaml").write_text( + "suite:\n name: parallel\n mode: parallel\n min_jobs: 2\n", + encoding="utf-8", + ) + tests: list[Path] = [] + for index in range(1, 3): + test_path = suite_dir / f"{index:02d}.tql" + test_path.write_text("version\nwrite_json\n", encoding="utf-8") + tests.append(test_path) + run._clear_directory_config_cache() + + class AlwaysPassRunner(Runner): + def __init__(self) -> None: + super().__init__(name="always-pass") + + def collect_tests(self, path: Path) -> set[tuple[Runner, Path]]: + if path.is_file(): + return {(self, path)} + return set() + + def purge(self) -> None: + return + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + return True + + runner = AlwaysPassRunner() + monkeypatch.setattr(run, "get_runner_for_test", lambda path: runner) + + reserved_slot = threading.BoundedSemaphore(2) + acquired = reserved_slot.acquire(timeout=0.1) + assert acquired + delayed_release = threading.Event() + + def _release_slot_later() -> None: + if not delayed_release.wait(timeout=1.0): + return + reserved_slot.release() + + release_thread = threading.Thread(target=_release_slot_later) + release_thread.start() + try: + queue = run._build_queue_from_paths(tests, coverage=False) + assert len(queue) == 1 + suite_item = queue[0] + assert isinstance(suite_item, run.SuiteQueueItem) + assert suite_item.suite.mode is run.SuiteExecutionMode.PARALLEL + + worker = run.Worker( + queue, + update=False, + coverage=False, + jobs=2, + test_slots=reserved_slot, + ) + worker.start() + delayed_release.set() + summary = worker.join() + assert summary.total == 2 + assert summary.failed == 0 + finally: + delayed_release.set() + release_thread.join(timeout=1.0) + run._clear_directory_config_cache() + run._INTERRUPT_EVENT.clear() + run._INTERRUPT_ANNOUNCED.clear() + run.apply_settings(original_settings) + + +def test_acquire_suite_test_slots_waits_for_required_minimum() -> None: + queue: list[run.RunnerQueueItem] = [] + shared_slots = threading.BoundedSemaphore(2) + worker = run.Worker( + queue, + update=False, + coverage=False, + jobs=2, + test_slots=shared_slots, + ) + acquired = shared_slots.acquire(timeout=0.1) + assert acquired + + reserved: dict[str, int] = {"slots": 0} + + def reserve_slots() -> None: + reserved["slots"] = worker._acquire_suite_test_slots(2, min_slots=2) + + thread = threading.Thread(target=reserve_slots) + thread.start() + time.sleep(0.05) + assert thread.is_alive() + + shared_slots.release() + thread.join(timeout=1.0) + assert not thread.is_alive() + assert reserved["slots"] == 2 + + worker._release_test_slots(reserved["slots"]) + + def test_worker_parallel_suites_honor_shared_jobs_budget( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: @@ -787,19 +958,551 @@ def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa observed["active"] -= 1 return True - runner = SlowRunner() - monkeypatch.setattr(run, "get_runner_for_test", lambda path: runner) + runner = SlowRunner() + monkeypatch.setattr(run, "get_runner_for_test", lambda path: runner) + + try: + queue = run._build_queue_from_paths(tests, coverage=False) + queue.sort(key=run._queue_sort_key, reverse=True) + assert len(queue) == 2 + for item in queue: + assert isinstance(item, run.SuiteQueueItem) + assert item.suite.mode is run.SuiteExecutionMode.PARALLEL + + jobs = 2 + test_slots = threading.BoundedSemaphore(jobs) + slot_lock = threading.Lock() + queue_lock = threading.Lock() + suite_queue_state = run.SuiteQueueState(pending_items=run._count_suite_queue_items(queue)) + workers = [ + run.Worker( + queue, + update=False, + coverage=False, + jobs=jobs, + test_slots=test_slots, + slot_lock=slot_lock, + queue_lock=queue_lock, + suite_queue_state=suite_queue_state, + ) + for _ in range(jobs) + ] + for worker in workers: + worker.start() + summary = run.Summary() + for worker in workers: + summary += worker.join() + + assert summary.total == 6 + assert summary.failed == 0 + assert observed["max_active"] <= jobs + finally: + run._clear_directory_config_cache() + run._INTERRUPT_EVENT.clear() + run._INTERRUPT_ANNOUNCED.clear() + run.apply_settings(original_settings) + + +def test_worker_parallel_suite_caps_reservation_by_available_slots( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + original_settings = config.Settings( + root=run.ROOT, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + run.apply_settings( + config.Settings( + root=tmp_path, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + ) + suite_dir = tmp_path / "tests" / "parallel" + suite_dir.mkdir(parents=True) + (suite_dir / "test.yaml").write_text( + "suite:\n name: parallel\n mode: parallel\n", + encoding="utf-8", + ) + tests: list[Path] = [] + for index in range(1, 4): + test_path = suite_dir / f"{index:02d}.tql" + test_path.write_text("version\nwrite_json\n", encoding="utf-8") + tests.append(test_path) + run._clear_directory_config_cache() + + observed = {"active": 0, "max_active": 0} + observed_lock = threading.Lock() + + class SlowRunner(Runner): + def __init__(self) -> None: + super().__init__(name="slow") + + def collect_tests(self, path: Path) -> set[tuple[Runner, Path]]: # noqa: ARG002 + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + with observed_lock: + observed["active"] += 1 + observed["max_active"] = max(observed["max_active"], observed["active"]) + time.sleep(0.05) + with observed_lock: + observed["active"] -= 1 + return True + + runner = SlowRunner() + monkeypatch.setattr(run, "get_runner_for_test", lambda path: runner) + + worker: run.Worker | None = None + try: + queue = run._build_queue_from_paths(tests, coverage=False) + assert len(queue) == 1 + assert isinstance(queue[0], run.SuiteQueueItem) + assert queue[0].suite.mode is run.SuiteExecutionMode.PARALLEL + + worker = run.Worker( + queue, + update=False, + coverage=False, + jobs=2, + test_slots=threading.BoundedSemaphore(1), + ) + worker.start() + worker._thread.join(timeout=2.0) + if worker._thread.is_alive(): + run._request_interrupt() + worker._thread.join(timeout=2.0) + assert not worker._thread.is_alive(), "worker deadlocked reserving suite test slots" + + summary = worker.join() + assert summary.total == len(tests) + assert summary.failed == 0 + assert observed["max_active"] == 1 + finally: + if worker is not None and worker._thread.is_alive(): + run._request_interrupt() + worker._thread.join(timeout=2.0) + run._clear_directory_config_cache() + run._INTERRUPT_EVENT.clear() + run._INTERRUPT_ANNOUNCED.clear() + run.apply_settings(original_settings) + + +def test_worker_defaults_share_sync_locks_for_shared_resources() -> None: + queue: list[run.RunnerQueueItem] = [] + test_slots = threading.BoundedSemaphore(2) + first = run.Worker(queue, update=False, coverage=False, jobs=2, test_slots=test_slots) + second = run.Worker(queue, update=False, coverage=False, jobs=2, test_slots=test_slots) + + assert first._queue_lock is second._queue_lock + assert first._slot_lock is second._slot_lock + assert first._suite_queue_state is second._suite_queue_state + + +def test_workers_with_default_suite_state_do_not_stall_on_remaining_solo_tests( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + original_settings = config.Settings( + root=run.ROOT, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + run.apply_settings( + config.Settings( + root=tmp_path, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + ) + tests_dir = tmp_path / "tests" + parallel_dir = tests_dir / "parallel" + solo_dir = tests_dir / "solo" + parallel_dir.mkdir(parents=True) + solo_dir.mkdir(parents=True) + (parallel_dir / "test.yaml").write_text( + "suite:\n name: parallel\n mode: parallel\n", + encoding="utf-8", + ) + parallel_tests = [parallel_dir / "01.tql", parallel_dir / "02.tql"] + solo_tests = [solo_dir / "01.tql", solo_dir / "02.tql"] + for path in [*parallel_tests, *solo_tests]: + path.write_text("version\nwrite_json\n", encoding="utf-8") + run._clear_directory_config_cache() + + class RecordingRunner(Runner): + def __init__(self) -> None: + super().__init__(name="recording") + + def collect_tests(self, path: Path) -> set[tuple[Runner, Path]]: + if path.is_file(): + return {(self, path)} + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + time.sleep(0.01) + return True + + runner = RecordingRunner() + monkeypatch.setattr(run, "get_runner_for_test", lambda path: runner) + + try: + queue = run._build_queue_from_paths([*parallel_tests, *solo_tests], coverage=False) + queue.sort(key=run._queue_sort_key, reverse=True) + jobs = 2 + test_slots = threading.BoundedSemaphore(jobs) + workers = [ + run.Worker( + queue, + update=False, + coverage=False, + jobs=jobs, + test_slots=test_slots, + ) + for _ in range(jobs) + ] + + for worker in workers: + worker.start() + for worker in workers: + worker._thread.join(timeout=2.0) + assert not worker._thread.is_alive(), "worker stalled with default suite queue state" + + summary = run.Summary() + for worker in workers: + summary += worker.join() + assert summary.total == len(parallel_tests) + len(solo_tests) + assert summary.failed == 0 + finally: + run._clear_directory_config_cache() + run._INTERRUPT_EVENT.clear() + run._INTERRUPT_ANNOUNCED.clear() + run.apply_settings(original_settings) + + +def test_workers_prioritize_parallel_suites_before_standalone_tests(tmp_path: Path) -> None: + original_settings = config.Settings( + root=run.ROOT, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + run.apply_settings( + config.Settings( + root=tmp_path, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + ) + tests_dir = tmp_path / "tests" + alpha_dir = tests_dir / "alpha" + beta_dir = tests_dir / "beta" + solo_dir = tests_dir / "solo" + alpha_dir.mkdir(parents=True) + beta_dir.mkdir(parents=True) + solo_dir.mkdir(parents=True) + + alpha_tests = [alpha_dir / "01.tql", alpha_dir / "02.tql"] + beta_tests = [beta_dir / "01.tql", beta_dir / "02.tql"] + solo_tests = [solo_dir / f"{index:02d}.tql" for index in range(1, 9)] + for path in [*alpha_tests, *beta_tests, *solo_tests]: + path.write_text("version\nwrite_json\n", encoding="utf-8") + run._clear_directory_config_cache() + + execution_order: list[str] = [] + execution_lock = threading.Lock() + + class RecordingRunner(Runner): + def __init__(self) -> None: + super().__init__(name="recording") + + def collect_tests(self, path: Path) -> set[tuple[Runner, Path]]: # noqa: ARG002 + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + with execution_lock: + execution_order.append(f"{test.parent.name}:{test.name}") + time.sleep(0.01) + return True + + runner = RecordingRunner() + queue: list[run.RunnerQueueItem] = [ + run.SuiteQueueItem( + suite=run.SuiteInfo( + name="alpha", + directory=alpha_dir, + mode=run.SuiteExecutionMode.PARALLEL, + ), + tests=[run.TestQueueItem(runner=runner, path=path) for path in alpha_tests], + fixtures=tuple(), + ), + run.SuiteQueueItem( + suite=run.SuiteInfo( + name="beta", + directory=beta_dir, + mode=run.SuiteExecutionMode.PARALLEL, + ), + tests=[run.TestQueueItem(runner=runner, path=path) for path in beta_tests], + fixtures=tuple(), + ), + *[run.TestQueueItem(runner=runner, path=path) for path in solo_tests], + ] + + jobs = 2 + test_slots = threading.BoundedSemaphore(jobs) + slot_lock = threading.Lock() + queue_lock = threading.Lock() + suite_queue_state = run.SuiteQueueState(pending_items=run._count_suite_queue_items(queue)) + workers = [ + run.Worker( + queue, + update=False, + coverage=False, + jobs=jobs, + test_slots=test_slots, + slot_lock=slot_lock, + queue_lock=queue_lock, + suite_queue_state=suite_queue_state, + ) + for _ in range(jobs) + ] + + try: + for worker in workers: + worker.start() + summary = run.Summary() + for worker in workers: + summary += worker.join() + + assert summary.total == len(alpha_tests) + len(beta_tests) + len(solo_tests) + assert summary.failed == 0 + + first_solo_index = next( + i for i, item in enumerate(execution_order) if item.startswith("solo:") + ) + first_alpha_index = next( + i for i, item in enumerate(execution_order) if item.startswith("alpha:") + ) + first_beta_index = next( + i for i, item in enumerate(execution_order) if item.startswith("beta:") + ) + assert first_alpha_index < first_solo_index + assert first_beta_index < first_solo_index + finally: + run._clear_directory_config_cache() + run._INTERRUPT_EVENT.clear() + run._INTERRUPT_ANNOUNCED.clear() + run.apply_settings(original_settings) + + +def test_workers_hold_suite_priority_until_parallel_suite_reserves_slots( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + original_settings = config.Settings( + root=run.ROOT, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + run.apply_settings( + config.Settings( + root=tmp_path, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + ) + tests_dir = tmp_path / "tests" + suite_dir = tests_dir / "suite" + solo_dir = tests_dir / "solo" + suite_dir.mkdir(parents=True) + solo_dir.mkdir(parents=True) + (suite_dir / "test.yaml").write_text( + "suite:\n name: suite\n mode: parallel\n min_jobs: 2\n", + encoding="utf-8", + ) + suite_test = suite_dir / "01.tql" + suite_test_two = suite_dir / "02.tql" + solo_test = solo_dir / "01.tql" + suite_test.write_text("version\nwrite_json\n", encoding="utf-8") + suite_test_two.write_text("version\nwrite_json\n", encoding="utf-8") + solo_test.write_text("version\nwrite_json\n", encoding="utf-8") + run._clear_directory_config_cache() + + reservation_waiting = threading.Event() + release_reservation = threading.Event() + solo_started = threading.Event() + original_acquire_suite_slots = run.Worker._acquire_suite_test_slots + + class RecordingRunner(Runner): + def __init__(self) -> None: + super().__init__(name="recording") + + def collect_tests(self, path: Path) -> set[tuple[Runner, Path]]: + if path.is_file(): + return {(self, path)} + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + if test == solo_test: + solo_started.set() + time.sleep(0.02) + return True + + def delayed_acquire(self: run.Worker, max_slots: int, *, min_slots: int = 1) -> int: + reservation_waiting.set() + release_reservation.wait(timeout=1.0) + return original_acquire_suite_slots(self, max_slots, min_slots=min_slots) + + runner = RecordingRunner() + monkeypatch.setattr(run, "get_runner_for_test", lambda path: runner) + monkeypatch.setattr(run.Worker, "_acquire_suite_test_slots", delayed_acquire) + + try: + queue = run._build_queue_from_paths([suite_test, suite_test_two, solo_test], coverage=False) + queue.sort(key=run._queue_sort_key, reverse=True) + assert len(queue) == 2 + assert isinstance(queue[0], run.SuiteQueueItem) or isinstance(queue[1], run.SuiteQueueItem) + + jobs = 2 + test_slots = threading.BoundedSemaphore(jobs) + slot_lock = threading.Lock() + queue_lock = threading.Lock() + suite_queue_state = run.SuiteQueueState(pending_items=run._count_suite_queue_items(queue)) + workers = [ + run.Worker( + queue, + update=False, + coverage=False, + jobs=jobs, + test_slots=test_slots, + slot_lock=slot_lock, + queue_lock=queue_lock, + suite_queue_state=suite_queue_state, + ) + for _ in range(jobs) + ] + for worker in workers: + worker.start() + + assert reservation_waiting.wait(timeout=1.0) + time.sleep(0.05) + assert not solo_started.is_set() + + release_reservation.set() + summary = run.Summary() + for worker in workers: + summary += worker.join() + assert summary.total == 3 + assert summary.failed == 0 + finally: + run._clear_directory_config_cache() + run._INTERRUPT_EVENT.clear() + run._INTERRUPT_ANNOUNCED.clear() + run.apply_settings(original_settings) + + +def test_workers_do_not_start_sequential_suite_while_parallel_priority_is_pending( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + original_settings = config.Settings( + root=run.ROOT, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + run.apply_settings( + config.Settings( + root=tmp_path, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + ) + tests_dir = tmp_path / "tests" + parallel_dir = tests_dir / "parallel" + sequential_dir = tests_dir / "sequential" + parallel_dir.mkdir(parents=True) + sequential_dir.mkdir(parents=True) + (parallel_dir / "test.yaml").write_text( + "suite:\n name: parallel\n mode: parallel\n min_jobs: 2\n", + encoding="utf-8", + ) + (sequential_dir / "test.yaml").write_text( + "suite:\n name: sequential\n mode: sequential\n", + encoding="utf-8", + ) + parallel_tests = [parallel_dir / "01.tql", parallel_dir / "02.tql"] + sequential_test = sequential_dir / "01.tql" + for path in [*parallel_tests, sequential_test]: + path.write_text("version\nwrite_json\n", encoding="utf-8") + run._clear_directory_config_cache() + + reservation_waiting = threading.Event() + release_reservation = threading.Event() + sequential_started = threading.Event() + original_acquire_suite_slots = run.Worker._acquire_suite_test_slots + + class RecordingRunner(Runner): + def __init__(self) -> None: + super().__init__(name="recording") + + def collect_tests(self, path: Path) -> set[tuple[Runner, Path]]: + if path.is_file(): + return {(self, path)} + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + if test.parent == sequential_dir: + sequential_started.set() + time.sleep(0.02) + return True + + def delayed_acquire(self: run.Worker, max_slots: int, *, min_slots: int = 1) -> int: + reservation_waiting.set() + release_reservation.wait(timeout=1.0) + return original_acquire_suite_slots(self, max_slots, min_slots=min_slots) - try: - queue = run._build_queue_from_paths(tests, coverage=False) - queue.sort(key=run._queue_sort_key, reverse=True) - assert len(queue) == 2 - for item in queue: - assert isinstance(item, run.SuiteQueueItem) - assert item.suite.mode is run.SuiteExecutionMode.PARALLEL + runner = RecordingRunner() + monkeypatch.setattr(run.Worker, "_acquire_suite_test_slots", delayed_acquire) + + queue: list[run.RunnerQueueItem] = [ + run.SuiteQueueItem( + suite=run.SuiteInfo( + name="parallel", + directory=parallel_dir, + mode=run.SuiteExecutionMode.PARALLEL, + min_jobs=2, + ), + tests=[run.TestQueueItem(runner=runner, path=path) for path in parallel_tests], + fixtures=tuple(), + ), + run.SuiteQueueItem( + suite=run.SuiteInfo( + name="sequential", + directory=sequential_dir, + mode=run.SuiteExecutionMode.SEQUENTIAL, + ), + tests=[run.TestQueueItem(runner=runner, path=sequential_test)], + fixtures=tuple(), + ), + ] + try: jobs = 2 test_slots = threading.BoundedSemaphore(jobs) + slot_lock = threading.Lock() + queue_lock = threading.Lock() + suite_queue_state = run.SuiteQueueState(pending_items=run._count_suite_queue_items(queue)) workers = [ run.Worker( queue, @@ -807,18 +1510,25 @@ def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa coverage=False, jobs=jobs, test_slots=test_slots, + slot_lock=slot_lock, + queue_lock=queue_lock, + suite_queue_state=suite_queue_state, ) for _ in range(jobs) ] for worker in workers: worker.start() + + assert reservation_waiting.wait(timeout=1.0) + time.sleep(0.05) + assert not sequential_started.is_set() + + release_reservation.set() summary = run.Summary() for worker in workers: summary += worker.join() - - assert summary.total == 6 + assert summary.total == 3 assert summary.failed == 0 - assert observed["max_active"] <= jobs finally: run._clear_directory_config_cache() run._INTERRUPT_EVENT.clear() @@ -1646,6 +2356,318 @@ def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa assert run._count_queue_tests(queue) == 4 +def test_count_suite_queue_items_counts_only_parallel_suites(tmp_path: Path) -> None: + class StubRunner(run.Runner): + def __init__(self) -> None: + super().__init__(name="stub") + + def collect_tests( # noqa: ARG002 + self, path: Path + ) -> set[tuple[run.Runner, Path]]: + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + return True + + runner = StubRunner() + parallel_dir = tmp_path / "parallel" + sequential_dir = tmp_path / "sequential" + parallel_dir.mkdir() + sequential_dir.mkdir() + queue: list[run.RunnerQueueItem] = [ + run.SuiteQueueItem( + suite=run.SuiteInfo( + name="parallel", + directory=parallel_dir, + mode=run.SuiteExecutionMode.PARALLEL, + ), + tests=[run.TestQueueItem(runner=runner, path=parallel_dir / "01.tql")], + fixtures=tuple(), + ), + run.SuiteQueueItem( + suite=run.SuiteInfo( + name="sequential", + directory=sequential_dir, + mode=run.SuiteExecutionMode.SEQUENTIAL, + ), + tests=[run.TestQueueItem(runner=runner, path=sequential_dir / "01.tql")], + fixtures=tuple(), + ), + ] + + assert run._count_suite_queue_items(queue) == 1 + + +def test_validate_parallel_suite_min_jobs_raises_when_underprovisioned(tmp_path: Path) -> None: + class StubRunner(run.Runner): + def __init__(self) -> None: + super().__init__(name="stub") + + def collect_tests( # noqa: ARG002 + self, path: Path + ) -> set[tuple[run.Runner, Path]]: + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + return True + + runner = StubRunner() + suite_dir = tmp_path / "suite" + suite_dir.mkdir() + queue: list[run.RunnerQueueItem] = [ + run.SuiteQueueItem( + suite=run.SuiteInfo( + name="parallel-pubsub", + directory=suite_dir, + mode=run.SuiteExecutionMode.PARALLEL, + min_jobs=3, + ), + tests=[ + run.TestQueueItem(runner=runner, path=suite_dir / "01.tql"), + run.TestQueueItem(runner=runner, path=suite_dir / "02.tql"), + ], + fixtures=tuple(), + ) + ] + + with pytest.raises(run.HarnessError, match="requires at least --jobs=3"): + run._validate_parallel_suite_min_jobs(queue, jobs=2) + + +def test_warn_parallel_suite_oversubscription_emits_warning( + tmp_path: Path, capsys: pytest.CaptureFixture[str] +) -> None: + class StubRunner(run.Runner): + def __init__(self) -> None: + super().__init__(name="stub") + + def collect_tests( # noqa: ARG002 + self, path: Path + ) -> set[tuple[run.Runner, Path]]: + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + return True + + runner = StubRunner() + suite_dir = tmp_path / "suite" + suite_dir.mkdir() + queue: list[run.RunnerQueueItem] = [ + run.SuiteQueueItem( + suite=run.SuiteInfo( + name="parallel-pubsub", + directory=suite_dir, + mode=run.SuiteExecutionMode.PARALLEL, + ), + tests=[ + run.TestQueueItem(runner=runner, path=suite_dir / "01.tql"), + run.TestQueueItem(runner=runner, path=suite_dir / "02.tql"), + run.TestQueueItem(runner=runner, path=suite_dir / "03.tql"), + ], + fixtures=tuple(), + ) + ] + + run._warn_parallel_suite_oversubscription(queue, jobs=2) + output = capsys.readouterr().out + assert "warning: parallel suite 'parallel-pubsub'" in output + assert "--jobs=2" in output + + +def test_pop_next_queue_item_prefers_suites() -> None: + class StubRunner(run.Runner): + def __init__(self) -> None: + super().__init__(name="stub") + + def collect_tests( # noqa: ARG002 + self, path: Path + ) -> set[tuple[run.Runner, Path]]: + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + return True + + runner = StubRunner() + suite_dir = Path("/tmp/suite") + queue: list[run.RunnerQueueItem] = [ + run.TestQueueItem(runner=runner, path=Path("/tmp/a.tql")), + run.TestQueueItem(runner=runner, path=Path("/tmp/b.tql")), + run.SuiteQueueItem( + suite=run.SuiteInfo( + name="parallel-pubsub", + directory=suite_dir, + mode=run.SuiteExecutionMode.PARALLEL, + ), + tests=[run.TestQueueItem(runner=runner, path=Path("/tmp/suite/01.tql"))], + fixtures=tuple(), + ), + ] + queue_lock = threading.Lock() + + popped = run._pop_next_queue_item(queue, queue_lock=queue_lock) + + assert isinstance(popped, run.SuiteQueueItem) + assert len(queue) == 2 + + +def test_pop_next_queue_item_prefers_parallel_suite_when_pending() -> None: + class StubRunner(run.Runner): + def __init__(self) -> None: + super().__init__(name="stub") + + def collect_tests( # noqa: ARG002 + self, path: Path + ) -> set[tuple[run.Runner, Path]]: + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + return True + + runner = StubRunner() + parallel_dir = Path("/tmp/parallel") + sequential_dir = Path("/tmp/sequential") + queue: list[run.RunnerQueueItem] = [ + run.SuiteQueueItem( + suite=run.SuiteInfo( + name="parallel", + directory=parallel_dir, + mode=run.SuiteExecutionMode.PARALLEL, + ), + tests=[run.TestQueueItem(runner=runner, path=parallel_dir / "01.tql")], + fixtures=tuple(), + ), + run.SuiteQueueItem( + suite=run.SuiteInfo( + name="sequential", + directory=sequential_dir, + mode=run.SuiteExecutionMode.SEQUENTIAL, + ), + tests=[run.TestQueueItem(runner=runner, path=sequential_dir / "01.tql")], + fixtures=tuple(), + ), + run.TestQueueItem(runner=runner, path=Path("/tmp/solo.tql")), + ] + queue_lock = threading.Lock() + suite_state = run.SuiteQueueState(pending_items=1) + + popped = run._pop_next_queue_item(queue, queue_lock=queue_lock, suite_state=suite_state) + + assert isinstance(popped, run.SuiteQueueItem) + assert popped.suite.mode is run.SuiteExecutionMode.PARALLEL + + +def test_pop_next_queue_item_skips_scan_when_no_suites_pending() -> None: + class StubRunner(run.Runner): + def __init__(self) -> None: + super().__init__(name="stub") + + def collect_tests( # noqa: ARG002 + self, path: Path + ) -> set[tuple[run.Runner, Path]]: + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + return True + + class TrackingQueue(list): + def __init__(self, items: list[run.RunnerQueueItem]) -> None: + super().__init__(items) + self.index_reads = 0 + + def __getitem__(self, index): + self.index_reads += 1 + return super().__getitem__(index) + + runner = StubRunner() + queue = TrackingQueue( + [ + run.TestQueueItem(runner=runner, path=Path("/tmp/a.tql")), + run.TestQueueItem(runner=runner, path=Path("/tmp/b.tql")), + run.TestQueueItem(runner=runner, path=Path("/tmp/c.tql")), + ] + ) + queue_lock = threading.Lock() + suite_state = run.SuiteQueueState(pending_items=0) + + popped = run._pop_next_queue_item(queue, queue_lock=queue_lock, suite_state=suite_state) + + assert isinstance(popped, run.TestQueueItem) + assert popped.path == Path("/tmp/c.tql") + assert queue.index_reads == 0 + + queue.index_reads = 0 + popped = run._pop_next_queue_item(queue, queue_lock=queue_lock, suite_state=suite_state) + assert isinstance(popped, run.TestQueueItem) + assert popped.path == Path("/tmp/b.tql") + assert queue.index_reads == 0 + + +def test_pop_next_queue_item_waits_for_pending_suite_priority() -> None: + class StubRunner(run.Runner): + def __init__(self) -> None: + super().__init__(name="stub") + + def collect_tests( # noqa: ARG002 + self, path: Path + ) -> set[tuple[run.Runner, Path]]: + return set() + + def purge(self) -> None: + return None + + def run(self, test: Path, update: bool, coverage: bool = False) -> bool: # noqa: ARG002 + return True + + runner = StubRunner() + queue: list[run.RunnerQueueItem] = [ + run.TestQueueItem(runner=runner, path=Path("/tmp/a.tql")), + run.TestQueueItem(runner=runner, path=Path("/tmp/b.tql")), + run.TestQueueItem(runner=runner, path=Path("/tmp/c.tql")), + ] + queue_lock = threading.Lock() + suite_state = run.SuiteQueueState(pending_items=1) + popped: list[run.RunnerQueueItem | None] = [] + + def pop_item() -> None: + popped.append( + run._pop_next_queue_item(queue, queue_lock=queue_lock, suite_state=suite_state) + ) + + thread = threading.Thread(target=pop_item) + thread.start() + time.sleep(0.05) + assert thread.is_alive() + assert not popped + + with queue_lock: + suite_state.pending_items = 0 + thread.join(timeout=1.0) + assert not thread.is_alive() + + assert len(popped) == 1 + popped_item = popped[0] + assert isinstance(popped_item, run.TestQueueItem) + assert popped_item.path == Path("/tmp/c.tql") + + def test_cli_rejects_partial_suite_selection( tmp_path: Path, capsys: pytest.CaptureFixture[str] ) -> None: @@ -3699,6 +4721,97 @@ def test_run_cli_reports_no_matching_run_skipped_filters( run.apply_settings(original_settings) +def test_run_cli_warns_for_parallel_suite_larger_than_jobs( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], +) -> None: + info = _make_project(tmp_path) + suite_dir = info["root"] / "tests" / "parallel" + suite_dir.mkdir(parents=True, exist_ok=True) + (suite_dir / "test.yaml").write_text( + "suite:\n name: parallel\n mode: parallel\n", + encoding="utf-8", + ) + for index in range(1, 4): + (suite_dir / f"{index:02d}.tql").write_text("version\n", encoding="utf-8") + + original_settings = config.Settings( + root=run.ROOT, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + + class FakeWorker: + def __init__( + self, queue: list[run.RunnerQueueItem], *args: object, **kwargs: object + ) -> None: + del args, kwargs + self.run_skipped_match_count = 0 + self._summary = run.Summary(total=run._count_queue_tests(queue)) + + def start(self) -> None: + return None + + def join(self) -> run.Summary: + return self._summary + + monkeypatch.setenv("TENZIR_BINARY", "/usr/bin/env") + monkeypatch.setenv("TENZIR_NODE_BINARY", "/usr/bin/env") + monkeypatch.setattr(run, "get_version", lambda: "0.0.0") + monkeypatch.setattr(run, "Worker", FakeWorker) + + try: + kwargs = _run_cli_kwargs(info["root"], tests=[suite_dir]) + kwargs["jobs"] = 1 + result = run.run_cli(**kwargs) + assert result.exit_code == 0 + output = capsys.readouterr().out + assert "warning: parallel suite 'parallel'" in output + assert "--jobs=1" in output + finally: + run._clear_directory_config_cache() + run.apply_settings(original_settings) + + +def test_run_cli_fails_fast_when_parallel_suite_min_jobs_is_unmet( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + info = _make_project(tmp_path) + suite_dir = info["root"] / "tests" / "parallel" + suite_dir.mkdir(parents=True, exist_ok=True) + (suite_dir / "test.yaml").write_text( + "suite:\n name: parallel\n mode: parallel\n min_jobs: 2\n", + encoding="utf-8", + ) + (suite_dir / "01.tql").write_text("version\n", encoding="utf-8") + + original_settings = config.Settings( + root=run.ROOT, + tenzir_binary=run.TENZIR_BINARY, + tenzir_node_binary=run.TENZIR_NODE_BINARY, + ) + + class UnexpectedWorker: + def __init__(self, *args: object, **kwargs: object) -> None: + raise AssertionError("worker construction should not be reached") + + monkeypatch.setenv("TENZIR_BINARY", "/usr/bin/env") + monkeypatch.setenv("TENZIR_NODE_BINARY", "/usr/bin/env") + monkeypatch.setattr(run, "get_version", lambda: "0.0.0") + monkeypatch.setattr(run, "Worker", UnexpectedWorker) + + try: + kwargs = _run_cli_kwargs(info["root"], tests=[suite_dir]) + kwargs["jobs"] = 1 + with pytest.raises(run.HarnessError, match="parallel suite job requirements not met"): + run.run_cli(**kwargs) + finally: + run._clear_directory_config_cache() + run.apply_settings(original_settings) + + def test_run_cli_handles_interrupt_during_version_probe( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: diff --git a/tests/test_run_config.py b/tests/test_run_config.py index c1ee35d..75e627e 100644 --- a/tests/test_run_config.py +++ b/tests/test_run_config.py @@ -287,6 +287,25 @@ def test_resolve_suite_for_test_reads_suite_record_mode( assert suite_info.mode is run.SuiteExecutionMode.PARALLEL +def test_resolve_suite_for_test_reads_suite_record_min_jobs( + tmp_path: Path, configured_root: Path +) -> None: + suite_dir = tmp_path / "tests" + suite_dir.mkdir() + (suite_dir / "test.yaml").write_text( + "suite:\n name: shared-suite\n mode: parallel\n min_jobs: 2\n", + encoding="utf-8", + ) + test_file = suite_dir / "member.tql" + test_file.write_text("version\nwrite_json\n", encoding="utf-8") + run._clear_directory_config_cache() + + suite_info = run._resolve_suite_for_test(test_file) + + assert suite_info is not None + assert suite_info.min_jobs == 2 + + def test_resolve_suite_for_test_defaults_suite_record_mode_to_sequential( tmp_path: Path, configured_root: Path ) -> None: @@ -304,6 +323,7 @@ def test_resolve_suite_for_test_defaults_suite_record_mode_to_sequential( assert suite_info is not None assert suite_info.mode is run.SuiteExecutionMode.SEQUENTIAL + assert suite_info.min_jobs is None def test_parse_test_config_rejects_suite_record_with_invalid_mode( @@ -323,6 +343,40 @@ def test_parse_test_config_rejects_suite_record_with_invalid_mode( run.parse_test_config(test_file) +def test_parse_test_config_rejects_suite_record_with_invalid_min_jobs_type( + tmp_path: Path, configured_root: Path +) -> None: + suite_dir = tmp_path / "tests" + suite_dir.mkdir() + (suite_dir / "test.yaml").write_text( + "suite:\n name: shared-suite\n mode: parallel\n min_jobs: two\n", + encoding="utf-8", + ) + run._clear_directory_config_cache() + test_file = suite_dir / "member.tql" + test_file.write_text("version\nwrite_json\n", encoding="utf-8") + + with pytest.raises(ValueError, match="Invalid value for 'suite.min_jobs'"): + run.parse_test_config(test_file) + + +def test_parse_test_config_rejects_suite_record_with_nonpositive_min_jobs( + tmp_path: Path, configured_root: Path +) -> None: + suite_dir = tmp_path / "tests" + suite_dir.mkdir() + (suite_dir / "test.yaml").write_text( + "suite:\n name: shared-suite\n mode: parallel\n min_jobs: 0\n", + encoding="utf-8", + ) + run._clear_directory_config_cache() + test_file = suite_dir / "member.tql" + test_file.write_text("version\nwrite_json\n", encoding="utf-8") + + with pytest.raises(ValueError, match="Invalid value for 'suite.min_jobs'"): + run.parse_test_config(test_file) + + def test_get_test_env_and_config_args(configured_root: Path) -> None: test_dir = configured_root / "suite" test_dir.mkdir()