diff --git a/.github/workflows/e2e-gpu-job.yml b/.github/workflows/e2e-gpu-job.yml index bc05403ee..04c14ccb2 100644 --- a/.github/workflows/e2e-gpu-job.yml +++ b/.github/workflows/e2e-gpu-job.yml @@ -19,6 +19,11 @@ on: required: true type: number description: "Job timeout in minutes" + test_timeout: + required: false + type: number + default: 25 + description: "pytest step timeout in minutes (must be < job timeout)" extra_deps: required: false type: string @@ -113,7 +118,7 @@ jobs: # Run tests - name: Run E2E tests - timeout-minutes: 25 + timeout-minutes: ${{ inputs.test_timeout }} env: SHOW_WORKER_LOGS: "0" SHOW_ROUTER_LOGS: "1" diff --git a/.github/workflows/pr-test-rust.yml b/.github/workflows/pr-test-rust.yml index 7f0a8bbdc..751f60cb8 100644 --- a/.github/workflows/pr-test-rust.yml +++ b/.github/workflows/pr-test-rust.yml @@ -466,22 +466,28 @@ jobs: || (needs.detect-changes.result == 'success' && (needs.detect-changes.outputs.common == 'true' || needs.detect-changes.outputs.chat-completions == 'true'))) + # Absorbs the previously-2-GPU chat tests (gpt-oss-20b and + # Qwen2.5-14B at tp=1). Timeouts and test_timeout bumped accordingly. 
strategy: fail-fast: false matrix: include: - engine: sglang - timeout: 30 + timeout: 60 + test_timeout: 45 - engine: vllm - timeout: 20 + timeout: 45 + test_timeout: 35 - engine: trtllm - timeout: 90 + timeout: 120 + test_timeout: 90 uses: ./.github/workflows/e2e-gpu-job.yml with: engine: ${{ matrix.engine }} gpu_tier: "1" runner: 1-gpu-h100 timeout: ${{ matrix.timeout }} + test_timeout: ${{ matrix.test_timeout }} test_dirs: e2e_test/chat_completions secrets: inherit @@ -573,29 +579,12 @@ jobs: # === 2 GPU === - e2e-2gpu-chat: - name: e2e-2gpu-chat (${{ matrix.engine }}) - needs: [e2e-1gpu-chat] - strategy: - fail-fast: false - matrix: - include: - - engine: sglang - timeout: 20 - - engine: vllm - timeout: 20 - - engine: trtllm - timeout: 30 - uses: ./.github/workflows/e2e-gpu-job.yml - with: - engine: ${{ matrix.engine }} - gpu_tier: "2" - runner: 2-gpu-h100 - timeout: ${{ matrix.timeout }} - test_dirs: e2e_test/chat_completions - secrets: inherit + # Note: the previous e2e-2gpu-chat job was retired when + # Qwen2.5-14B-Instruct / gpt-oss-20b moved from tp=2 to tp=1 — every chat_completions + # class that used those models fits on a single H100 now, so all + # chat_completions tests run under e2e-1gpu-chat above. 
- e2e-2gpu-responses: + e2e-1gpu-responses: needs: [build-wheel, detect-changes] if: >- always() @@ -608,8 +597,8 @@ jobs: uses: ./.github/workflows/e2e-gpu-job.yml with: engine: sglang - gpu_tier: "2" - runner: 2-gpu-h100 + gpu_tier: "1" + runner: 1-gpu-h100 timeout: 30 test_dirs: e2e_test/responses setup_agentic_deps: true @@ -971,7 +960,7 @@ jobs: path: benchmark_go_bindings/ finish: - needs: [pre-commit, python-lint, grpc-proto-build-check, build-wheel, python-unit-tests, unit-tests, benchmarks, e2e-1gpu-chat, e2e-1gpu-completions, e2e-1gpu-embeddings, e2e-1gpu-gateway, e2e-2gpu-chat, e2e-2gpu-responses, e2e-2gpu-pd, e2e-4gpu-chat, e2e-4gpu-gateway, e2e-vendor, go-unit-tests, go-bindings-e2e] + needs: [pre-commit, python-lint, grpc-proto-build-check, build-wheel, python-unit-tests, unit-tests, benchmarks, e2e-1gpu-chat, e2e-1gpu-completions, e2e-1gpu-embeddings, e2e-1gpu-gateway, e2e-1gpu-responses, e2e-2gpu-pd, e2e-4gpu-chat, e2e-4gpu-gateway, e2e-vendor, go-unit-tests, go-bindings-e2e] if: always() runs-on: k8s-runner-cpu permissions: {} @@ -989,8 +978,7 @@ jobs: "${{ needs.e2e-1gpu-completions.result }}" == "failure" || \ "${{ needs.e2e-1gpu-embeddings.result }}" == "failure" || \ "${{ needs.e2e-1gpu-gateway.result }}" == "failure" || \ - "${{ needs.e2e-2gpu-chat.result }}" == "failure" || \ - "${{ needs.e2e-2gpu-responses.result }}" == "failure" || \ + "${{ needs.e2e-1gpu-responses.result }}" == "failure" || \ "${{ needs.e2e-2gpu-pd.result }}" == "failure" || \ "${{ needs.e2e-4gpu-chat.result }}" == "failure" || \ "${{ needs.e2e-4gpu-gateway.result }}" == "failure" || \ diff --git a/e2e_test/chat_completions/test_function_calling.py b/e2e_test/chat_completions/test_function_calling.py index c24b8e9b5..5099dc4b1 100644 --- a/e2e_test/chat_completions/test_function_calling.py +++ b/e2e_test/chat_completions/test_function_calling.py @@ -1591,7 +1591,7 @@ def test_conflicting_defs_required_tool_choice(self, model, api_client): @pytest.mark.engine("sglang", 
"vllm", "trtllm") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.model("Qwen/Qwen2.5-14B-Instruct") @pytest.mark.gateway(extra_args=["--tool-call-parser", "qwen", "--history-backend", "memory"]) @pytest.mark.parametrize("setup_backend", ["grpc"], indirect=True) diff --git a/e2e_test/chat_completions/test_openai_server.py b/e2e_test/chat_completions/test_openai_server.py index 517cdbb4a..bb49f1b0e 100644 --- a/e2e_test/chat_completions/test_openai_server.py +++ b/e2e_test/chat_completions/test_openai_server.py @@ -360,7 +360,7 @@ def _delta_text(delta): @pytest.mark.engine("sglang", "vllm", "trtllm") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.model("openai/gpt-oss-20b") @pytest.mark.gateway(extra_args=["--history-backend", "memory"]) class TestChatCompletionGptOss(TestChatCompletion): diff --git a/e2e_test/chat_completions/test_structured_output.py b/e2e_test/chat_completions/test_structured_output.py index 654443bbd..8f9b59890 100644 --- a/e2e_test/chat_completions/test_structured_output.py +++ b/e2e_test/chat_completions/test_structured_output.py @@ -140,7 +140,7 @@ class TestStructuredOutputRegular(_TestStructuredOutputBase): @pytest.mark.engine("sglang", "vllm", "trtllm") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.model("openai/gpt-oss-20b") @pytest.mark.gateway(extra_args=["--history-backend", "memory"]) @pytest.mark.parametrize("setup_backend", ["grpc"], indirect=True) diff --git a/e2e_test/chat_completions/test_validation.py b/e2e_test/chat_completions/test_validation.py index 7192f8e08..a9ab7b182 100644 --- a/e2e_test/chat_completions/test_validation.py +++ b/e2e_test/chat_completions/test_validation.py @@ -168,7 +168,7 @@ def run_chat_completion(): @pytest.mark.engine("sglang", "vllm", "trtllm") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.model("openai/gpt-oss-20b") @pytest.mark.gateway(extra_args=["--history-backend", "memory"]) @pytest.mark.parametrize("setup_backend", ["grpc"], indirect=True) diff --git 
a/e2e_test/conftest.py b/e2e_test/conftest.py index 4a9d33fa8..d3848c684 100644 --- a/e2e_test/conftest.py +++ b/e2e_test/conftest.py @@ -108,6 +108,7 @@ def pytest_runtest_logstart(nodeid: str, location: tuple) -> None: pytest_collection_modifyitems, pytest_configure, pytest_runtest_setup, + pytest_sessionfinish, setup_backend, ) from smg_client import SmgClient @@ -151,6 +152,7 @@ def api_client(request, setup_backend): "pytest_runtest_setup", "pytest_collection_modifyitems", "pytest_configure", + "pytest_sessionfinish", # Fixtures "setup_backend", "backend_router", diff --git a/e2e_test/fixtures/__init__.py b/e2e_test/fixtures/__init__.py index 291915a2a..3cfc8ae10 100644 --- a/e2e_test/fixtures/__init__.py +++ b/e2e_test/fixtures/__init__.py @@ -12,6 +12,7 @@ pytest_collection_modifyitems, pytest_configure, pytest_runtest_setup, + pytest_sessionfinish, ) # Marker helpers @@ -25,6 +26,7 @@ "pytest_collection_modifyitems", "pytest_configure", "pytest_runtest_setup", + "pytest_sessionfinish", # Backend fixtures "setup_backend", "backend_router", diff --git a/e2e_test/fixtures/hooks.py b/e2e_test/fixtures/hooks.py index ca15e0027..8caf49ded 100644 --- a/e2e_test/fixtures/hooks.py +++ b/e2e_test/fixtures/hooks.py @@ -3,6 +3,10 @@ This module handles: - Marker registration: Defining custom pytest markers - Test filtering: Env-var-based filtering by engine, vendor, and GPU tier +- Test ordering: Cluster items by ``(backend, model)`` so the + session-scoped worker pool (``infra.worker_pool``) doesn't have to + evict-and-restart between consecutive classes that share a backend. +- Session teardown: Stop all pooled workers on session finish. 
""" from __future__ import annotations @@ -10,7 +14,7 @@ import os import pytest -from infra import get_runtime +from infra import cleanup_pool, get_runtime from .markers import resolve_class_marker @@ -110,32 +114,82 @@ def pytest_collection_modifyitems( config: pytest.Config, items: list[pytest.Item], ) -> None: - """Filter collected tests based on E2E_ENGINE, E2E_VENDOR, and E2E_GPU_TIER env vars.""" + """Filter + order collected tests. + + Filtering: env vars ``E2E_ENGINE``, ``E2E_VENDOR``, ``E2E_GPU_TIER`` + select the matching slice when set. + + Ordering: items are sorted by ``(backend, model)`` so consecutive + classes that share a backend cluster together. This is what lets + ``infra.worker_pool`` reuse a single worker across many test classes + instead of evicting on every boundary. + """ engine = os.environ.get("E2E_ENGINE") or None vendor = os.environ.get("E2E_VENDOR") or None gpu_tier = os.environ.get("E2E_GPU_TIER") or None - if not any([engine, vendor, gpu_tier]): - return - - selected: list[pytest.Item] = [] - for item in items: - # Filter by engine - if engine: - engine_marker = _get_marker(item, "engine") - if not engine_marker or engine not in engine_marker.args: - continue - # Filter by vendor - if vendor: - vendor_marker = _get_marker(item, "vendor") - if not vendor_marker or vendor not in vendor_marker.args: - continue - # Filter by GPU tier - if gpu_tier is not None: - gpu_marker = _get_marker(item, "gpu") - gpu_count = gpu_marker.args[0] if gpu_marker else 1 - if str(gpu_count) != gpu_tier: - continue - selected.append(item) - - items[:] = selected + if any([engine, vendor, gpu_tier]): + selected: list[pytest.Item] = [] + for item in items: + # Filter by engine + if engine: + engine_marker = _get_marker(item, "engine") + if not engine_marker or engine not in engine_marker.args: + continue + # Filter by vendor + if vendor: + vendor_marker = _get_marker(item, "vendor") + if not vendor_marker or vendor not in vendor_marker.args: + continue + # 
Filter by GPU tier + if gpu_tier is not None: + gpu_marker = _get_marker(item, "gpu") + gpu_count = gpu_marker.args[0] if gpu_marker else 1 + if str(gpu_count) != gpu_tier: + continue + selected.append(item) + items[:] = selected + + items.sort(key=_pool_sort_key) + + +def _pool_sort_key(item: pytest.Item) -> tuple: + """Sort key clustering items that would share a pool entry. + + Primary: backend parametrize value — ``setup_backend`` for class-scope + fixtures, falling back to ``backend_router`` for function-scope ones. + Different backends mean different pool keys. + Secondary: ``@pytest.mark.model`` value if set, else empty — same + backend with the same model is the cache-hit case. + Tertiary: ``item.nodeid`` for stability across collections. + """ + backend = "" + callspec = getattr(item, "callspec", None) + if callspec is not None: + params = getattr(callspec, "params", {}) or {} + backend = str(params.get("setup_backend", params.get("backend_router", ""))) + + model_marker = resolve_class_marker(item, "model") + model = "" + if model_marker is not None and model_marker.args: + model = str(model_marker.args[0]) + + return (backend, model, item.nodeid) + + +# --------------------------------------------------------------------------- +# Session teardown — stop pooled workers +# --------------------------------------------------------------------------- + + +def pytest_sessionfinish( + session: pytest.Session, # noqa: ARG001 + exitstatus: int, # noqa: ARG001 +) -> None: + """Tear down any workers held by the session-scoped pool. + + The pool also has an ``atexit`` handler for cases where pytest exits + abnormally before this hook runs (SIGINT, ``pytest.exit``), but the + explicit hook is cheaper and gives clean log output during normal runs. 
+ """ + cleanup_pool() diff --git a/e2e_test/fixtures/setup_backend.py b/e2e_test/fixtures/setup_backend.py index 54a7d71d7..693e7f1fb 100644 --- a/e2e_test/fixtures/setup_backend.py +++ b/e2e_test/fixtures/setup_backend.py @@ -1,8 +1,15 @@ """Backend setup fixtures for E2E tests. -Simplified backend lifecycle -- one set of workers and gateway per test class. -No model pool, no thread-local caching. Direct worker management via -start_workers/stop_workers. +One gateway per test class; workers come from the session-scoped pool in +``infra.worker_pool`` so they survive class teardown when the next class +needs the same backend. The pool keys reuse on +``(engine, model_id, mode, worker_type, count)`` and gates reuse on +worker liveness. + +PD-disaggregation paths run through the pool too (so it can evict a +stale cached worker holding their GPUs) but the caller owns teardown of +the prefill/decode workers via ``stop_workers``. The function-scoped +``backend_router`` fixture also acquires its worker from the pool and only owns its per-test gateway. """ from __future__ import annotations @@ -27,7 +34,8 @@ launch_cloud_gateway, ) from infra.model_specs import get_model_spec -from infra.worker import start_workers, stop_workers +from infra.worker import stop_workers +from infra.worker_pool import get_pool from .markers import get_marker_kwargs, get_marker_value @@ -49,10 +57,17 @@ def _start_workers_tracked(**kwargs) -> list: - """Start workers and track failures by engine for fail-fast.""" + """Start workers via the session pool and track failures for fail-fast. + + The pool caches REGULAR workers across class boundaries — caller MUST + NOT call ``stop_workers`` on those. Non-REGULAR workers (PD + prefill/decode) skip the cache but still run through the pool so it + can evict any stale cached worker holding their GPUs; the caller still + owns teardown of those. 
+ """ engine = kwargs.get("engine") or get_runtime() try: - return start_workers(**kwargs) + return get_pool().acquire(**kwargs) except (TimeoutError, RuntimeError): _worker_start_failures[engine] = _worker_start_failures.get(engine, 0) + 1 raise @@ -175,7 +190,12 @@ def _setup_local( backend_name, log_dir, ): - """Launch regular workers + gateway, yield result tuple, tear down.""" + """Launch regular workers + gateway, yield result tuple, tear down. + + Workers are acquired from the session-scoped pool so they survive class + teardown when the next class needs the same backend. The gateway is + per-class (its config can differ across classes) and is torn down here. + """ num_workers = workers_config.get("count") or 1 logger.info("Starting %s backend: model=%s, workers=%d", backend_name, model_id, num_workers) @@ -196,9 +216,8 @@ def _setup_local( logger.info("%s backend ready at %s", backend_name, gateway.base_url) yield backend_name, model_path, _make_openai_client(gateway), gateway finally: - logger.info("Tearing down %s backend", backend_name) + logger.info("Tearing down %s backend (workers stay in pool)", backend_name) gateway.shutdown() - stop_workers(workers) # --------------------------------------------------------------------------- @@ -317,8 +336,9 @@ def _setup_cloud(backend_name, request, gateway_config): def backend_router(request: pytest.FixtureRequest): """Function-scoped fixture that launches a fresh gateway per test. - Starts a single worker and a new gateway for each test function. - Use when tests need isolated router state. + A new gateway is started for each test function; the worker comes + from the session-scoped pool (so the GPU isn't fought over with + class-scope tests). Use when tests need isolated router state. 
Usage:: @@ -331,11 +351,18 @@ def test_router_state(backend_router): connection_mode = ConnectionMode(backend_name) model_path = get_model_spec(model_id)["model"] - workers = start_workers(model_id, engine=get_runtime(), mode=connection_mode, count=1) + # Route through the pool so we evict any cached class-scope worker + # holding the GPUs we need. The pool retains ownership; we don't stop + # the workers ourselves. + workers = get_pool().acquire( + model_id=model_id, + engine=get_runtime(), + mode=connection_mode, + count=1, + ) gateway = Gateway() try: gateway.start(worker_urls=[w.base_url for w in workers], model_path=model_path) yield gateway finally: gateway.shutdown() - stop_workers(workers) diff --git a/e2e_test/infra/__init__.py b/e2e_test/infra/__init__.py index 3af5ed426..e1734cab0 100644 --- a/e2e_test/infra/__init__.py +++ b/e2e_test/infra/__init__.py @@ -69,6 +69,7 @@ ) from .run_eval import run_eval from .worker import Worker, start_workers, stop_workers +from .worker_pool import WorkerPool, cleanup_pool, get_pool __all__ = [ # Enums @@ -129,6 +130,10 @@ "Worker", "start_workers", "stop_workers", + # Session-scoped worker cache + "WorkerPool", + "get_pool", + "cleanup_pool", "MODEL_SPECS", # Gateway "Gateway", diff --git a/e2e_test/infra/model_specs.py b/e2e_test/infra/model_specs.py index 7ee0a3f70..c900566e1 100644 --- a/e2e_test/infra/model_specs.py +++ b/e2e_test/infra/model_specs.py @@ -51,7 +51,9 @@ def _resolve_model_path(hf_path: str) -> str: # Function calling specialist (larger, for Response API tests) "Qwen/Qwen2.5-14B-Instruct": { "model": _resolve_model_path("Qwen/Qwen2.5-14B-Instruct"), - "tp": 2, + # 14B BF16 weights (~28GB) fit on one H100/80GB; tp=1 avoids paying + # NCCL setup on every restart. Override via E2E_MODEL_TP_OVERRIDES. 
+ "tp": 1, "features": ["chat", "streaming", "function_calling", "pythonic_tools"], "sglang_args": ["--context-length=16384"], # Faster startup, prevents memory issues }, @@ -89,7 +91,9 @@ def _resolve_model_path(hf_path: str) -> str: # GPT-OSS models (Harmony) "openai/gpt-oss-20b": { "model": _resolve_model_path("openai/gpt-oss-20b"), - "tp": 2, + # MXFP4-quantized MoE (~13GB weights) fits easily on one H100; tp=1 + # roughly halves worker startup vs tp=2. Override via E2E_MODEL_TP_OVERRIDES. + "tp": 1, "features": ["chat", "streaming", "reasoning", "harmony"], "vllm_args": [ "--structured-outputs-config", diff --git a/e2e_test/infra/worker_pool.py b/e2e_test/infra/worker_pool.py new file mode 100644 index 000000000..ea89d5946 --- /dev/null +++ b/e2e_test/infra/worker_pool.py @@ -0,0 +1,182 @@ +"""Session-scoped worker pool for E2E tests. + +Caches workers by ``(engine, model_id, mode, worker_type, count)`` so +consecutive test classes that need the same backend don't pay the multi- +minute worker startup cost on every class boundary. + +The pool holds at most one *active* key at a time. Switching keys evicts +(stops) the cached workers before starting the new set — required because +GPU resources are exclusive. Combined with the collection-ordering hook in +``fixtures/hooks.py`` (which clusters items by backend/model), this keeps +the worker alive across every test class that uses the same backend. + +PD-disaggregation paths (prefill+decode) don't cache: they hold multiple +workers concurrently and the caller manages teardown. But they still go +through ``acquire()`` so the pool can evict any cached regular worker +first — otherwise the PD launch would race a still-running cached worker +for the same GPUs. + +Lifecycle is managed via ``pytest_sessionfinish`` in ``fixtures/hooks.py``; +a module-level ``atexit`` handler covers the case where pytest exits +before that hook runs (SIGINT / ``pytest.exit``). 
+""" + +from __future__ import annotations + +import atexit +import logging +import threading + +from .constants import DEFAULT_STARTUP_TIMEOUT, ConnectionMode, WorkerType +from .worker import Worker, start_workers, stop_workers + +logger = logging.getLogger(__name__) + + +# Key is (engine, model_id, mode, worker_type, count). ``count`` is part of +# the key because a class asking for count=2 after a count=1 class on the +# same backend would otherwise reuse a 1-worker entry and run with the +# wrong topology. +_PoolKey = tuple[str, str, ConnectionMode, WorkerType, int] + + +class WorkerPool: + """One-slot worker cache shared across pytest classes. + + Not safe for concurrent use across pytest-xdist workers — each xdist + worker would need its own pool with non-overlapping GPU offsets. + Current CI runs sequentially on GPU runners, so a single-slot pool is + sufficient. + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._key: _PoolKey | None = None + self._workers: list[Worker] = [] + self._closed = False + + def acquire( + self, + *, + model_id: str, + engine: str, + mode: ConnectionMode = ConnectionMode.HTTP, + count: int = 1, + worker_type: WorkerType = WorkerType.REGULAR, + timeout: int = DEFAULT_STARTUP_TIMEOUT, + log_dir: str | None = None, + gpu_offset: int = 0, + ) -> list[Worker]: + """Return ``count`` healthy workers for the given key. + + Reuses the cached set when the key matches AND every cached worker + is still alive. Anything else (key mismatch, dead worker, + non-REGULAR worker_type) evicts and starts fresh. + + Raises whatever ``start_workers`` raises on launch failure; the + cache is left empty in that case. + + The lock is intentionally held across the blocking ``start_workers`` + call. CI runs sequentially today, so contention is a non-issue; if + pytest-xdist is ever introduced each worker should get its own pool + rather than competing for this one. 
+ """ + with self._lock: + if self._closed: + raise RuntimeError("WorkerPool has been closed") + + # Non-REGULAR workers (PD prefill/decode) aren't cached, but we + # still have to release any cached regular worker first — it + # holds the GPUs the caller is about to claim. + if worker_type != WorkerType.REGULAR: + if self._key is not None: + logger.info( + "WorkerPool: evicting %s to free GPUs for non-REGULAR %s/%s", + self._key, + worker_type, + model_id, + ) + self._evict_locked() + return start_workers( + model_id=model_id, + engine=engine, + mode=mode, + count=count, + worker_type=worker_type, + timeout=timeout, + log_dir=log_dir, + gpu_offset=gpu_offset, + ) + + # REGULAR workers always start at gpu 0; ``gpu_offset`` is only + # meaningful for non-REGULAR (PD decode) callers. + key: _PoolKey = (engine, model_id, mode, worker_type, count) + + if self._key == key and all(w.is_alive() for w in self._workers): + logger.info( + "WorkerPool: reusing %d cached worker(s) for %s", + count, + key, + ) + return list(self._workers) + + if self._key is not None: + reason = "key mismatch" if self._key != key else "dead worker" + logger.info( + "WorkerPool: evicting %s to start %s (%s)", + self._key, + key, + reason, + ) + self._evict_locked() + + new_workers = start_workers( + model_id=model_id, + engine=engine, + mode=mode, + count=count, + worker_type=worker_type, + timeout=timeout, + log_dir=log_dir, + ) + self._key = key + self._workers = new_workers + return list(new_workers) + + def cleanup(self) -> None: + """Stop all cached workers. 
Idempotent; safe to call multiple times.""" + with self._lock: + self._evict_locked() + self._closed = True + + def _evict_locked(self) -> None: + if self._workers: + stop_workers(self._workers) + self._workers = [] + self._key = None + + +_POOL: WorkerPool | None = None +_POOL_LOCK = threading.Lock() + + +def get_pool() -> WorkerPool: + """Return the session-wide worker pool, creating it on first use.""" + global _POOL + with _POOL_LOCK: + if _POOL is None or _POOL._closed: + _POOL = WorkerPool() + return _POOL + + +def cleanup_pool() -> None: + """Tear down the session-wide pool if it exists. Called from session-end hook.""" + with _POOL_LOCK: + if _POOL is not None: + _POOL.cleanup() + + +# Register the module-level cleanup once at import time. Calling it after +# the pool has already been torn down (by ``pytest_sessionfinish``) is +# harmless — ``WorkerPool.cleanup`` is idempotent, so the second call stops nothing. +atexit.register(cleanup_pool) diff --git a/e2e_test/responses/conftest.py b/e2e_test/responses/conftest.py index c8951ce37..7c5c86703 100644 --- a/e2e_test/responses/conftest.py +++ b/e2e_test/responses/conftest.py @@ -282,30 +282,33 @@ def _start_local_grpc_gateway_with_mcp( ): """Launch a local gRPC worker + gateway wired to the mock MCP server. - Returns ``(gateway, client, workers, model_path)``. Caller is - responsible for teardown (``gateway.shutdown()`` + ``stop_workers``). + Returns ``(gateway, client, workers, model_path)``. Workers come from + the session-scoped pool (``infra.worker_pool``), so the caller MUST + NOT stop them on teardown — only ``gateway.shutdown()`` is required. Skips (not fails) when worker startup fails — CI runs without GPUs would otherwise poison every engine-parametrized suite. 
""" from infra import ConnectionMode, Gateway from infra.model_specs import get_model_spec - from infra.worker import start_workers, stop_workers + from infra.worker_pool import get_pool try: - workers = start_workers(model_id, engine, mode=ConnectionMode.GRPC, count=1) + workers = get_pool().acquire( + model_id=model_id, + engine=engine, + mode=ConnectionMode.GRPC, + count=1, + ) except Exception as e: pytest.skip(f"gRPC {engine} worker for {model_id} not available: {e}") - # Everything from this point onward must clean up ``workers`` on any - # exception — ``get_model_spec`` / ``Gateway()`` / ``gateway.start`` / - # ``openai.OpenAI`` each have independent failure modes, and leaking - # the GPU worker on init failure would strand quota until CI - # reclaimed it. - try: - worker = workers[0] - model_path = get_model_spec(model_id)["model"] + # Worker stays in the pool on any downstream failure; the gateway is + # ours so we shut it down on any error from ``start()`` or client init. + worker = workers[0] + model_path = get_model_spec(model_id)["model"] - gateway = Gateway() + gateway = Gateway() + try: gateway.start( worker_urls=[worker.base_url], model_path=model_path, @@ -316,15 +319,9 @@ def _start_local_grpc_gateway_with_mcp( "memory", ], ) - except Exception: - stop_workers(workers) - raise - - try: client = openai.OpenAI(base_url=f"{gateway.base_url}/v1", api_key="not-used") except Exception: gateway.shutdown() - stop_workers(workers) raise return gateway, client, workers, model_path @@ -341,7 +338,7 @@ def gateway_with_mock_mcp_grpc_sglang( (R6.3). Skips when the worker can't start (no GPU available, missing model weights, etc.) rather than hard-failing. 
""" - gateway, client, workers, model_path = _start_local_grpc_gateway_with_mcp( + gateway, client, _workers, model_path = _start_local_grpc_gateway_with_mcp( engine="sglang", model_id="openai/gpt-oss-20b", mcp_config_file=mock_mcp_config_file, @@ -349,10 +346,7 @@ def gateway_with_mock_mcp_grpc_sglang( try: yield gateway, client, mock_mcp_server, model_path finally: - from infra.worker import stop_workers - gateway.shutdown() - stop_workers(workers) @pytest.fixture(scope="class") @@ -365,7 +359,7 @@ def gateway_with_mock_mcp_grpc_vllm( Uses Llama-3.1-8B-Instruct, which flows through the regular (non- harmony) path (R6.4). Skips when the worker can't start. """ - gateway, client, workers, model_path = _start_local_grpc_gateway_with_mcp( + gateway, client, _workers, model_path = _start_local_grpc_gateway_with_mcp( engine="vllm", model_id="meta-llama/Llama-3.1-8B-Instruct", mcp_config_file=mock_mcp_config_file, @@ -373,7 +367,4 @@ def gateway_with_mock_mcp_grpc_vllm( try: yield gateway, client, mock_mcp_server, model_path finally: - from infra.worker import stop_workers - gateway.shutdown() - stop_workers(workers) diff --git a/e2e_test/responses/test_builtin_tools.py b/e2e_test/responses/test_builtin_tools.py index 98c3c21c9..f8dd03c75 100644 --- a/e2e_test/responses/test_builtin_tools.py +++ b/e2e_test/responses/test_builtin_tools.py @@ -258,7 +258,7 @@ def test_mixed_builtin_and_function_tools(self, model, api_client): @pytest.mark.engine("sglang") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("openai/gpt-oss-20b") @pytest.mark.gateway(extra_args=["--history-backend", "memory"]) @@ -644,7 +644,7 @@ def find_first(event_type: str) -> int: @pytest.mark.engine("sglang") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("openai/gpt-oss-20b") class TestBuiltinToolRoutingGrpc: @@ -717,7 +717,7 @@ def test_response_tools_shows_original_type(self, gateway_with_mcp_config_grpc): @pytest.mark.engine("sglang") 
-@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("openai/gpt-oss-20b") class TestWebSearchStreamingEventsGrpc: diff --git a/e2e_test/responses/test_image_generation.py b/e2e_test/responses/test_image_generation.py index e2c7848b8..14e0fb0dd 100644 --- a/e2e_test/responses/test_image_generation.py +++ b/e2e_test/responses/test_image_generation.py @@ -503,21 +503,17 @@ class TestImageGenerationCloud(_ImageGenerationAssertions): _fixture_name = "gateway_with_mock_mcp_cloud" -# ``gpu(2)`` on the gRPC classes matches the ``e2e-2gpu-responses`` job in -# ``.github/workflows/pr-test-rust.yml`` (``gpu_tier: "2"``). Using -# ``gpu(1)`` would filter these tests out at collection time because -# ``pytest_collection_modifyitems`` in ``e2e_test/fixtures/hooks.py`` does -# a strict ``gpu_count == E2E_GPU_TIER`` comparison, and no 1-GPU CI lane -# currently runs the Responses directory. +# ``gpu(1)`` on the gRPC classes matches the ``e2e-1gpu-responses`` job in +# ``.github/workflows/pr-test-rust.yml`` (``gpu_tier: "1"``). Both +# ``gpt-oss-20b`` and ``Llama-3.1-8B-Instruct`` are tp=1 in MODEL_SPECS. # -# The ``e2e-2gpu-responses`` job runs ``engine=sglang`` only — the vllm -# class is a workflow-level gap (no current CI job pairs ``engine=vllm`` -# with ``e2e_test/responses`` at tier 2). That's documented in the PR -# body as a follow-up. +# The CI job runs ``engine=sglang`` only — the vllm class is a workflow- +# level gap (no current CI job pairs ``engine=vllm`` with +# ``e2e_test/responses``). Documented as a follow-up. 
@pytest.mark.engine("sglang") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("openai/gpt-oss-20b") class TestImageGenerationGrpcSglang(_ImageGenerationAssertions): @@ -531,7 +527,7 @@ class TestImageGenerationGrpcSglang(_ImageGenerationAssertions): @pytest.mark.engine("vllm") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("meta-llama/Llama-3.1-8B-Instruct") class TestImageGenerationGrpcVllm(_ImageGenerationAssertions): diff --git a/e2e_test/responses/test_sampling_params.py b/e2e_test/responses/test_sampling_params.py index 2faf34994..e3f0a8bb2 100644 --- a/e2e_test/responses/test_sampling_params.py +++ b/e2e_test/responses/test_sampling_params.py @@ -94,7 +94,7 @@ def test_sampling_params_streaming(self, model, api_client): @pytest.mark.engine("sglang") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("Qwen/Qwen2.5-14B-Instruct") @pytest.mark.gateway(extra_args=["--tool-call-parser", "qwen", "--history-backend", "memory"]) @@ -104,7 +104,7 @@ class TestSamplingParamsLocal(_SamplingParamsBase): @pytest.mark.engine("sglang", "vllm", "trtllm") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("openai/gpt-oss-20b") @pytest.mark.gateway(extra_args=["--history-backend", "memory"]) diff --git a/e2e_test/responses/test_state_management.py b/e2e_test/responses/test_state_management.py index ce8679f5d..d474eea7d 100644 --- a/e2e_test/responses/test_state_management.py +++ b/e2e_test/responses/test_state_management.py @@ -220,7 +220,7 @@ class TestStateManagementOracleCustom(_StateManagementCloudBase): @pytest.mark.engine("sglang") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("Qwen/Qwen2.5-14B-Instruct") @pytest.mark.gateway(extra_args=["--tool-call-parser", "qwen", "--history-backend", "memory"]) @@ -329,7 +329,7 @@ def test_mutually_exclusive_parameters(self, model, api_client): @pytest.mark.engine("sglang", "vllm", 
"trtllm") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("openai/gpt-oss-20b") @pytest.mark.gateway(extra_args=["--history-backend", "memory"]) diff --git a/e2e_test/responses/test_streaming_events.py b/e2e_test/responses/test_streaming_events.py index be06235de..d49689c2f 100644 --- a/e2e_test/responses/test_streaming_events.py +++ b/e2e_test/responses/test_streaming_events.py @@ -23,7 +23,7 @@ @pytest.mark.engine("sglang") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("Qwen/Qwen2.5-14B-Instruct") @pytest.mark.gateway(extra_args=["--tool-call-parser", "qwen", "--history-backend", "memory"]) @@ -107,7 +107,7 @@ def test_output_item_event_emitted(self, model, api_client): @pytest.mark.engine("sglang", "vllm", "trtllm") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("openai/gpt-oss-20b") @pytest.mark.gateway(extra_args=["--history-backend", "memory"]) diff --git a/e2e_test/responses/test_structured_output.py b/e2e_test/responses/test_structured_output.py index 359910942..7dede24c3 100644 --- a/e2e_test/responses/test_structured_output.py +++ b/e2e_test/responses/test_structured_output.py @@ -116,7 +116,7 @@ def test_structured_output_json_schema(self, model, api_client): @pytest.mark.engine("sglang", "vllm", "trtllm") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("openai/gpt-oss-20b") @pytest.mark.gateway(extra_args=["--history-backend", "memory"]) @@ -215,7 +215,7 @@ def test_structured_output_json_schema(self, model, api_client): @pytest.mark.engine("sglang") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("Qwen/Qwen2.5-14B-Instruct") @pytest.mark.gateway(extra_args=["--tool-call-parser", "qwen", "--history-backend", "memory"]) diff --git a/e2e_test/responses/test_tools_call.py b/e2e_test/responses/test_tools_call.py index f4e5bda92..ee124c82d 100644 --- a/e2e_test/responses/test_tools_call.py +++ 
b/e2e_test/responses/test_tools_call.py @@ -764,7 +764,7 @@ def _check_stream(events, expected_label): @pytest.mark.engine("sglang", "vllm", "trtllm") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("openai/gpt-oss-20b") @pytest.mark.gateway(extra_args=["--history-backend", "memory"]) @@ -1129,7 +1129,7 @@ def test_mcp_multi_server_tool_call_streaming(self, model, api_client): @pytest.mark.engine("sglang") -@pytest.mark.gpu(2) +@pytest.mark.gpu(1) @pytest.mark.e2e @pytest.mark.model("Qwen/Qwen2.5-14B-Instruct") @pytest.mark.gateway(extra_args=["--tool-call-parser", "qwen", "--history-backend", "memory"]) diff --git a/scripts/ci_download_model.sh b/scripts/ci_download_model.sh index c1d9ef61d..67c95c7a4 100755 --- a/scripts/ci_download_model.sh +++ b/scripts/ci_download_model.sh @@ -19,12 +19,15 @@ RETRY_DELAY=30 resolve_models_for_tier() { local tier="$1" - # ROUTER_LOCAL_MODEL_PATH must be unset so model_specs doesn't resolve to local paths + # A tier-N runner has N GPUs; any model with tp <= N can run on it. + # PD-disaggregation jobs run on 2-GPU runners with tp=1 models, so an + # exact ``tp == tier`` match would leave them empty. + # ROUTER_LOCAL_MODEL_PATH must be unset so model_specs doesn't resolve to local paths. ROUTER_LOCAL_MODEL_PATH="" python3 -c " import sys from e2e_test.infra.model_specs import MODEL_SPECS for model_id, spec in MODEL_SPECS.items(): - if spec['tp'] == int(sys.argv[1]): + if spec['tp'] <= int(sys.argv[1]): print(model_id) " "$tier" }