diff --git a/packages/nemo-evaluator-launcher/examples/slurm_eval_only_ray_cluster.yaml b/packages/nemo-evaluator-launcher/examples/slurm_eval_only_ray_cluster.yaml new file mode 100644 index 000000000..a339af928 --- /dev/null +++ b/packages/nemo-evaluator-launcher/examples/slurm_eval_only_ray_cluster.yaml @@ -0,0 +1,97 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# ============================================================================== +# Eval-only multi-node Ray cluster (deployment: none, agentic Gym evals) +# ============================================================================== +# Target use case: agentic Gym evals (GDPVal, SWE-bench, terminalbench) against +# an external policy endpoint, where Stirrup actor concurrency needs to spread +# over multiple CPU nodes to avoid per-node CPU/FUSE saturation during agent boot. +# +# Architecture (handled by the SLURM executor when eval_ray_cluster=true): +# - Allocates N CPU nodes. +# - On every node, runs the eval container and a per-node pre_cmd +# (apptainer install, FUSE deps). +# - Starts `ray start --head` on PRIMARY_NODE and `ray start --address=...` +# on the remaining nodes; waits for all N daemons to join. +# - `nemo-evaluator run_eval` runs ONLY on PRIMARY_NODE. Gym's +# `ng_e2e_collect_rollouts` connects to the Ray cluster via +# `++ray_head_node_address=$RAY_HEAD_NODE_ADDRESS`. Stirrup actors are +# scheduled across all N nodes; each actor FUSE-mounts its SIF locally. +# +# Policy routing mirrors canonical Ultra V3 GDPVal: +# ++use_absolute_ip=true +# ++policy_base_url={{target.api_endpoint.url}} +# `use_absolute_ip=true` rewrites policy_base_url so worker-node actors hit +# the head node's external IP. The adapter binds to 0.0.0.0:3825 because the +# executor exports `ADAPTER_HOST=0.0.0.0` when eval_ray_cluster=true. +# +# Prerequisite (verify before submitting): Gym's actor placement must be SPREAD, +# not Ray's default PACK. Otherwise actors fill the head node first and the +# multi-node setup buys nothing. Check Gym's `ng_e2e_collect_rollouts` source +# for the relevant placement_strategy flag and pass it via `common_params` if +# needed. +# ============================================================================== + +defaults: + - execution: slurm/default + - deployment: none + - _self_ + +execution: + hostname: ??? # SLURM login hostname + account: ??? + output_dir: ??? # Absolute path on compute nodes + partition: cpu + num_nodes: 8 + gpus_per_node: 0 + walltime: "06:00:00" + sbatch_extra_flags: + qos: cpu-normal + + # Multi-node eval Ray cluster + eval_ray_cluster: true + eval_ray_port: 6379 + eval_ray_dashboard_port: 8265 + eval_ray_ready_timeout: 600 + eval_per_node_pre_cmd: | + set -e + apt-get update -qq && apt-get install -y -qq squashfuse fuse3 git-lfs rpm2cpio cpio + if ! command -v apptainer >/dev/null 2>&1; then + curl -sSL https://raw.githubusercontent.com/apptainer/apptainer/main/tools/install-unprivileged.sh \ + | bash -s - /opt/apptainer + ln -sf /opt/apptainer/bin/apptainer /usr/local/bin/apptainer + fi + ln -sf /usr/local/sbin/apptainer /usr/local/bin/apptainer 2>/dev/null || true + mkdir -p /usr/local/var/apptainer/mnt/session + + mounts: + mount_home: false + evaluation: {} + # Canonical GDPVal apptainer/SIF mounts go here, e.g.: + # /lustre/.../mengxiwu/apptainer/bin: /usr/local/sbin + # /lustre/.../mengxiwu/apptainer/etc/apptainer: /usr/local/etc/apptainer + # /lustre/.../mengxiwu/apptainer/libexec/apptainer: /usr/local/libexec/apptainer + # /lustre/.../agronskiy/images/apptainer: /gdpval/sif + +target: + api_endpoint: + url: https://integrate.api.nvidia.com/v1/chat/completions + model_id: meta/llama-3.1-405b-instruct # Replace with your target model. + api_key_name: NVIDIA_API_KEY + +env_vars: + NVIDIA_API_KEY: host:NVIDIA_API_KEY + +evaluation: + # Placeholder task — replace with your agentic Gym benchmark (e.g. nemo_gym_agentic + # with a container and nemo_evaluator_config including common_params: + # ++use_absolute_ip=true + # ++policy_base_url={{target.api_endpoint.url}} + # ++policy_api_key={{target.api_endpoint.api_key_name}} + # ++policy_model_name={{target.api_endpoint.model_id}} + # ++ray_head_node_address=$RAY_HEAD_NODE_ADDRESS + # ++gdpval_stirrup_agent.responses_api_agents.stirrup_agent.concurrency=192 + # ) + tasks: + - name: AIME_2024 diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml index 638e533d1..e7c02afe8 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/configs/execution/slurm/default.yaml @@ -50,3 +50,27 @@ proxy: haproxy_port: 5009 health_check_path: /health health_check_status: 200 + +# Eval-only multi-node Ray cluster. Active only when deployment.type == "none" +# AND num_nodes > 1 AND eval_ray_cluster == true. Aux deployments are rejected +# in combination with eval_ray_cluster=true. +eval_ray_cluster: false # Opt in to spread eval Ray actors across all allocated nodes. +eval_ray_port: 6379 # Ray GCS port on the head node. +eval_ray_dashboard_port: 8265 # Ray dashboard port. +eval_ray_ready_timeout: 600 # Seconds to wait for all N Ray daemons to join. +eval_per_node_pre_cmd: null # Optional bash snippet run on every node inside the + # eval container before `ray start` (e.g. install + # apptainer / squashfuse). When set, requires + # NEMO_EVALUATOR_TRUST_PRE_CMD=1. +eval_ray_pre_start_cmd: null # Optional bash snippet prepended to each `ray start` + # (head + workers) and `ray status` wait handler. Use + # to put `ray` on PATH when the eval container ships + # it inside a venv (e.g. Gym: `source /opt/Gym/.venv/bin/activate`). +eval_ray_head_workload_cmd: null # Optional bash snippet that REPLACES `sleep infinity` + # in the ray-head's inner_cmd, so the head's bootstrap + # container runs the actual workload (e.g. bash a + # lustre-rendezvous deployment script that invokes + # ng_e2e_collect_rollouts). Driver + Ray actors then + # share one venv → eliminates pickle-ABI skew. Mirror + # canonical vllm_ray pre_cmd's lustre-rendezvous pattern. + # When unset, head stays `sleep infinity`. diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py index 31d9c3b63..285ad8f8f 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py @@ -673,6 +673,21 @@ def _create_slurm_sbatch_script( total_aux_nodes = sum(a.num_nodes for a in aux_deployments) total_num_nodes = cfg.execution.num_nodes + total_aux_nodes + eval_ray_cluster_enabled = ( + cfg.deployment.type == "none" + and cfg.execution.num_nodes > 1 + and cfg.execution.get("eval_ray_cluster", False) + ) + if eval_ray_cluster_enabled and has_aux_deployments: + raise ValueError( + "execution.eval_ray_cluster=true is not supported together with " + "auxiliary deployments. Aux deployments consume nodes from the same " + "allocation; supporting them requires per-pool nodelist arithmetic " + "that is not yet implemented. Either disable eval_ray_cluster or " + "run aux deployments as a separate job and pass their endpoints via " + "env_vars." + ) + s = "#!/bin/bash\n" # SBATCH headers @@ -702,7 +717,7 @@ def _create_slurm_sbatch_script( s += "#SBATCH --output {}\n".format(remote_task_subdir / "logs" / "slurm-%A.log") s += "\n" s += f'TASK_DIR="{str(remote_task_subdir)}"\n' - s += f'NEL_INVOCATION_ID="{invocation_id}"\n' + s += f'export NEL_INVOCATION_ID="{invocation_id}"\n' s += "\n" # Collect env vars using unified pipeline @@ -963,6 +978,73 @@ def _create_slurm_sbatch_script( # Export NEMO_EVALUATOR_DATASET_DIR environment variable s += f"export NEMO_EVALUATOR_DATASET_DIR={dataset_mount_container}\n\n" + # Eval-only Ray cluster bootstrap (multi-node spread for deployment.type=="none") + ray_cluster_is_unsafe = False + if eval_ray_cluster_enabled: + # Bind the adapter on all interfaces so worker-node Stirrup actors can + # reach it via $PRIMARY_IP. Passed through --container-env on every srun. + s += 'export ADAPTER_HOST="${ADAPTER_HOST:-0.0.0.0}"\n' + # Bind a per-node, cross-container-instance shared /tmp/ray so the head's + # ray-session files (node_ip_address.json, etc) are visible from the eval + # client's pyxis container instance. Without this, pyxis isolates /tmp + # per container instance and ray.init(address=...) on the head node + # raises "Can't find node_ip_address.json". + ray_shared_tmp_host = f"/tmp/ray-${{SLURM_JOB_ID}}" + s += "# Create per-node shared /tmp/ray dir on every allocated node\n" + s += ( + f"srun --overlap --jobid=$SLURM_JOB_ID --nodes={cfg.execution.num_nodes} " + f"--ntasks-per-node=1 mkdir -p {ray_shared_tmp_host}\n\n" + ) + # Kill leftover Ray daemons from prior runs on the same nodes (each + # compute node is reused across consecutive eval jobs). Without this, + # the new bootstrap's worker `ray start --address` collides with stale + # raylet/object-manager processes on random worker_ports (observed at + # cpu-0023 with port 17062 conflict, no overlap on canonical's fixed + # ports). pkill runs HOST-side (no --container-image), with the user's + # PID namespace, so it can SIGKILL the orphan processes that survived + # prior pyxis-container SIGTERMs. + s += "# Kill leftover Ray daemons from prior runs on the allocated nodes\n" + s += ( + # Bracket regex `[r]aylet` / `[g]cs_server` / `[r]ay::` matches the + # target processes but does NOT match the pkill command's own + # cmdline (which contains the literal `[r]aylet`, not `raylet`). + # Without this, `pkill -f raylet` self-kills via signal 9. + f"srun --overlap --jobid=$SLURM_JOB_ID --nodes={cfg.execution.num_nodes} " + f"--ntasks-per-node=1 bash -c " + f"'pkill -9 -u $USER -f \"[r]aylet\" 2>/dev/null; " + f"pkill -9 -u $USER -f \"[g]cs_server\" 2>/dev/null; " + f"pkill -9 -u $USER -f \"[r]ay::\" 2>/dev/null; " + f"sleep 1; true'\n\n" + ) + evaluation_mounts_list.append(f"{ray_shared_tmp_host}:/tmp/ray") + bootstrap_env_names = sorted( + set( + list(eval_env_vars.keys()) + + [ + "PRIMARY_IP", + "ADAPTER_HOST", + "EVAL_RAY_PORT", + "EVAL_RAY_DASH_PORT", + ] + ) + ) + # `.secrets.env` defines task-scoped names like `PERSIST_DELIVERABLES_DIR__NEMO_GYM_0`; + # `eval_reexport_cmd` re-exports them under their unsuffixed names. + # Pyxis's `--container-env ` reads the unsuffixed value from the + # sbatch shell, so the re-export must happen BEFORE the bootstrap srun. + # The eval-client srun re-exports again later — idempotent. + if eval_reexport_cmd: + s += f"{eval_reexport_cmd}\n\n" + ray_bootstrap = _generate_eval_ray_cluster_bootstrap( + cfg, + eval_image=eval_image, + eval_mounts_list=evaluation_mounts_list, + eval_env_var_names=bootstrap_env_names, + remote_task_subdir=remote_task_subdir, + ) + s += ray_bootstrap.cmd + ray_cluster_is_unsafe = ray_bootstrap.is_potentially_unsafe + eval_factory_command_struct = get_eval_factory_command( cfg, task, @@ -1031,23 +1113,28 @@ def _create_slurm_sbatch_script( if gpu_devices is not None: s += f"export NVIDIA_VISIBLE_DEVICES={gpu_devices}\n" extra_eval_env_names.append("NVIDIA_VISIBLE_DEVICES") - s += "srun --mpi pmix --overlap " - s += '--nodelist "${PRIMARY_NODE}" --nodes 1 --ntasks 1 ' - s += "--container-image {} ".format(eval_image) # Combine eval env vars with auxiliary endpoint env vars all_eval_env_names = sorted( set(list(eval_env_vars.keys()) + aux_extra_env_names + extra_eval_env_names) ) - if all_eval_env_names: - s += "--container-env {} ".format(",".join(all_eval_env_names)) - if not cfg.execution.get("mounts", {}).get("mount_home", True): - s += "--no-container-mount-home " - - s += "--container-mounts {} ".format(",".join(evaluation_mounts_list)) - s += "--output {} ".format(remote_task_subdir / "logs" / "client-%A.log") - s += "bash -c '\n" - s += eval_factory_command - s += "'\n\n" + if eval_ray_cluster_enabled: + all_eval_env_names = sorted( + set(all_eval_env_names + ["RAY_HEAD_NODE_ADDRESS", "ADAPTER_HOST"]) + ) + s += _srun_in_eval_container( + eval_image=eval_image, + eval_mounts_list=evaluation_mounts_list, + eval_env_var_names=all_eval_env_names, + inner_cmd=eval_factory_command, + nodelist='"${PRIMARY_NODE}"', + nodes=1, + ntasks=1, + output_log=str(remote_task_subdir / "logs" / "client-%A.log"), + no_container_mount_home=not cfg.execution.get("mounts", {}).get( + "mount_home", True + ), + ) + s += "\n" # terminate the server after all evaluation clients finish if cfg.deployment.type != "none": @@ -1065,6 +1152,13 @@ def _create_slurm_sbatch_script( s += f"kill ${aux.pid_var} # terminate the {aux.name} server to finish gracefully\n" s += "\n" + if eval_ray_cluster_enabled: + s += "# Stop eval Ray cluster\n" + s += 'kill "$RAY_HEAD_PID" 2>/dev/null || true\n' + if cfg.execution.num_nodes > 1: + s += 'kill "$RAY_WORKER_PID" 2>/dev/null || true\n' + s += "\n" + # auto-export ae_cfg = cfg.execution.get("auto_export") destinations: list = [] @@ -1100,6 +1194,7 @@ def _create_slurm_sbatch_script( eval_factory_command_struct.is_potentially_unsafe or deployment_is_unsafe or any(aux_is_unsafe.values()) + or ray_cluster_is_unsafe ) return CmdAndReadableComment( @@ -2306,6 +2401,342 @@ def _generate_haproxy_srun_command(cfg, remote_task_subdir): return s +def _srun_in_eval_container( + *, + eval_image: str, + eval_mounts_list: list[str], + eval_env_var_names: list[str], + inner_cmd: str, + nodelist: str, + nodes: int = 1, + ntasks: int | None = None, + ntasks_per_node: int | None = None, + output_log: str, + overlap: bool = True, + mpi_pmix: bool = True, + no_container_mount_home: bool = False, + background: bool = False, +) -> str: + """Emit a single ``srun --container-image=... bash -c ''`` invocation. + + Shared by the evaluation srun and the new Ray-cluster bootstrap sruns so the + eval container, env, and mounts stay identical across all of them. + + Caller passes ``inner_cmd`` verbatim — no trailing newline is auto-added. + Returns a string ending in ``\\n`` (or `` &\\n`` if ``background``). + """ + assert (ntasks is None) ^ (ntasks_per_node is None), ( + "exactly one of ntasks / ntasks_per_node must be set" + ) + parts: list[str] = ["srun"] + if mpi_pmix: + parts.append("--mpi pmix") + if overlap: + parts.append("--overlap") + parts.append(f"--nodelist {nodelist}") + parts.append(f"--nodes {nodes}") + if ntasks is not None: + parts.append(f"--ntasks {ntasks}") + else: + parts.append(f"--ntasks-per-node {ntasks_per_node}") + parts.append(f"--container-image {eval_image}") + if eval_env_var_names: + parts.append(f"--container-env {','.join(sorted(set(eval_env_var_names)))}") + if no_container_mount_home: + parts.append("--no-container-mount-home") + parts.append(f"--container-mounts {','.join(eval_mounts_list)}") + parts.append(f"--output {output_log}") + cmd = " ".join(parts) + " " + cmd += "bash -c '\n" + # Escape literal single quotes in inner_cmd so they don't close the outer + # ``bash -c '...'`` quoting. Standard POSIX trick: close, escaped literal + # single-quote, reopen. No-op when inner_cmd has no single quotes. + cmd += inner_cmd.replace("'", "'\\''") + cmd += "'" + if background: + cmd += " &" + cmd += "\n" + return cmd + + +def _get_wait_for_ray_cluster_handler( + *, + expected_nodes: int, + timeout: int, + head_pid_var: str = "RAY_HEAD_PID", + worker_pid_var: str | None = None, + head_ip_var: str = "PRIMARY_IP", + port_var: str = "EVAL_RAY_PORT", +) -> str: + """Generate the wait-for-Ray-cluster shell block. + + Inline (no srun, no container). Runs in the slurm batch shell on PRIMARY_NODE. + Checks PID liveness of the background head/worker sruns, then probes the + head's GCS via bash ``/dev/tcp`` (built-in). When the TCP port is reachable, + waits a short grace period for workers to attach and proceeds. + + Runs inline so it doesn't fight ``ray-head`` / ``ray-worker`` for pyxis + container extraction slots on PRIMARY_NODE (a separate ``--overlap`` srun + targeting the same node + same image observed deadlocking the head start). + Avoids the ``ray status`` dependency and the dashboard REST endpoint + altogether — the eval client itself confirms cluster shape via + ``ray.init(address=...)``. + + ``expected_nodes`` is recorded in the log message only (the inline check + confirms head reachability, not exact node count). + """ + pid_checks = [ + f'kill -0 "${head_pid_var}" 2>/dev/null ' + f'|| {{ echo "Ray head srun died"; exit 1; }}', + ] + if worker_pid_var: + pid_checks.append( + f'kill -0 "${worker_pid_var}" 2>/dev/null ' + f'|| {{ echo "Ray worker srun died"; exit 1; }}' + ) + pid_check = "; ".join(pid_checks) + + s = "# Wait for Ray head GCS to be reachable (inline TCP check)\n" + s += f"RAY_WAIT_TIMEOUT={timeout}\n" + s += "RAY_WAIT_DEADLINE=$(( $(date +%s) + $RAY_WAIT_TIMEOUT ))\n" + s += "while true; do\n" + s += f" {pid_check}\n" + s += ( + ' if (echo > "/dev/tcp/${' + head_ip_var + '}/${' + port_var + '}") ' + "2>/dev/null; then\n" + ) + s += ( + f' echo "Ray head GCS reachable at ${{{head_ip_var}}}:${{{port_var}}}"\n' + ) + s += " break\n" + s += " fi\n" + s += ' if [ "$(date +%s)" -gt "$RAY_WAIT_DEADLINE" ]; then\n' + s += ' echo "FATAL: timeout waiting for Ray head GCS"\n' + s += " exit 1\n" + s += " fi\n" + s += " sleep 5\n" + s += "done\n" + s += ( + "# Grace period for workers to attach (the eval client will\n" + f"# verify final cluster shape; targeting {expected_nodes} nodes).\n" + ) + s += "sleep 15\n" + return s + + +def _generate_eval_ray_cluster_bootstrap( + cfg: DictConfig, + eval_image: str, + eval_mounts_list: list[str], + eval_env_var_names: list[str], + remote_task_subdir: Path, +) -> CmdAndReadableComment: + """Bootstrap a Ray cluster across all allocated nodes for an eval-only flow. + + Activation guard (enforced by caller in ``_create_slurm_sbatch_script``): + - ``cfg.deployment.type == "none"`` + - ``cfg.execution.num_nodes > 1`` + - ``cfg.execution.eval_ray_cluster is True`` + - no auxiliary deployments (rejected at config-load time) + """ + n = cfg.execution.num_nodes + ray_port = cfg.execution.get("eval_ray_port", 6379) + dash_port = cfg.execution.get("eval_ray_dashboard_port", 8265) + ready_timeout = cfg.execution.get("eval_ray_ready_timeout", 600) + pre_cmd = (cfg.execution.get("eval_per_node_pre_cmd") or "").strip() + # Prepended to each ``ray start ...`` inner_cmd (head + workers). Use to put + # ``ray`` on PATH when the eval container ships it inside a venv (e.g. + # Gym's ``/opt/Gym/.venv/bin/activate``). Must end in a statement separator + # so we can chain ``ray stop ; ray start ...`` after it. + pre_start_cmd = (cfg.execution.get("eval_ray_pre_start_cmd") or "").strip() + # Also propagate cfg.evaluation.pre_cmd to bootstrap inner_cmds: the eval + # client's pre_cmd (e.g. `pip install nemo-evaluator-internal`) runs inside + # its container as `source pre_cmd.sh` from the eval factory command; Ray + # actors that fork from the ray-head/worker bootstrap containers need the + # SAME packages installed, otherwise pickled exceptions can't round-trip + # across the worker→client boundary (observed t12: `TypeError: + # APIStatusError.__init__() missing 2 required keyword-only arguments` + # when `evaluation.pre_cmd` upgrades httpx/openai on the client but not on + # workers). Replicating it guarantees both sides have identical deps. + eval_pre_cmd = ( + (cfg.evaluation.get("pre_cmd") if cfg.evaluation else None) or "" + ).strip() + # Optional: replace the head's idle `sleep infinity` with a workhorse script. + # Used when the driver (e.g. Gym's ng_e2e_collect_rollouts) needs to run + # INSIDE the bootstrap container (same Python/venv as Stirrup actors that + # fork from this container) — eliminates pickle-ABI skew across multi-venv + # container images. Canonical vllm_ray uses this pattern via deployment.pre_cmd + # to bash a lustre-rendezvous script (`/cache/huggingface/$NEL_INVOCATION_ID/ + # GYM_DEPLOYMENT_COMMAND.sh`). For our eval-only Ray cluster, the eval + # client's `command:` writes that script; the head's workload waits + bashes. + # When unset, the head stays `sleep infinity` (the eval client owns the work). + head_workload_cmd = ( + cfg.execution.get("eval_ray_head_workload_cmd") or "" + ).strip() + # Pyxis mounts host's $HOME into container as /root by default. uv-managed + # venvs ship a symlink from `/bin/python` to + # `/root/.local/share/uv/python/cpython-/bin/python` that's BAKED + # into the container image's /root. When pyxis overrides /root with host's + # $HOME, the baked python disappears and the symlink dangles — breaking + # `ray start` ("required file not found"). The eval srun avoids this by + # setting `--no-container-mount-home` (driven by cfg.execution.mounts.mount_home). + # Apply the same flag to ALL bootstrap sruns so head/workers see the same + # baked python as the eval client, eliminating the version-mismatch class + # (head daemon 3.12.3 from /opt/venv vs eval client 3.12.12 from the baked venv). + no_container_mount_home = not cfg.execution.get("mounts", {}).get( + "mount_home", True + ) + + s = "# ===== Eval-only Ray cluster bootstrap =====\n" + s += f"export EVAL_RAY_PORT={ray_port}\n" + s += f"export EVAL_RAY_DASH_PORT={dash_port}\n" + s += "PRIMARY_IP=$(getent hosts \"$PRIMARY_NODE\" | awk '{print $1}' | head -1)\n" + s += "export PRIMARY_IP\n" + s += 'echo "PRIMARY_IP=$PRIMARY_IP"\n\n' + + if n > 1: + s += "# Build worker nodelist (everything except PRIMARY_NODE)\n" + s += ( + 'WORKER_NODES="$(scontrol show hostnames "$SLURM_JOB_NODELIST"' + ' | tail -n +2 | paste -sd,)"\n' + ) + s += 'echo "WORKER_NODES=$WORKER_NODES"\n\n' + + is_potentially_unsafe = False + if pre_cmd or eval_pre_cmd or pre_start_cmd or head_workload_cmd: + # Any user-controlled shell that ends up running inside a container + # raises the same NEMO_EVALUATOR_TRUST_PRE_CMD gate. + is_potentially_unsafe = True + if pre_cmd: + s += "# Per-node container setup (apptainer install, FUSE deps, etc.)\n" + s += _srun_in_eval_container( + eval_image=eval_image, + eval_mounts_list=eval_mounts_list, + eval_env_var_names=eval_env_var_names, + inner_cmd=pre_cmd if pre_cmd.endswith("\n") else pre_cmd + "\n", + nodelist='"$SLURM_JOB_NODELIST"', + nodes=n, + ntasks_per_node=1, + output_log=str( + remote_task_subdir / "logs" / "ray-bootstrap-pre-cmd-%A-%n.log" + ), + no_container_mount_home=no_container_mount_home, + ) + s += "\n" + + s += "# Start Ray head on PRIMARY_NODE\n" + # The pre-start block runs once per bootstrap container (head + workers). + # Order: cfg.evaluation.pre_cmd first (mirrors what the eval client does + # before its run_eval) → eval_ray_pre_start_cmd second (user-controlled + # Ray-specific setup, e.g. venv activate). This makes the bootstrap + # container as close to the eval-client container as possible: same env + # vars (via --container-env), same mounts, same pre-install steps. Ray + # actors that fork from the bootstrap container inherit identical deps. + pre_start_block = "" + if eval_pre_cmd: + pre_start_block += eval_pre_cmd + if not pre_start_block.endswith("\n"): + pre_start_block += "\n" + if pre_start_cmd: + pre_start_block += pre_start_cmd + if not pre_start_block.endswith("\n"): + pre_start_block += "\n" + # Fixed component ports (canonical vllm_ray pattern, + # ultra_v3_deployment_gym.yaml:66). Avoids Ray's auto-allocation conflicting + # with stale daemons or other components on the same node. + ray_fixed_ports = ( + "--node-manager-port=8266 " + "--object-manager-port=8267 " + "--metrics-export-port=8269 " + "--dashboard-agent-grpc-port=8270 " + "--dashboard-agent-listen-port=8271 " + "--runtime-env-agent-port=8272 " + ) + # When the user supplies `eval_ray_head_workload_cmd`, the head's bootstrap + # container runs that *instead of* `sleep infinity` after `ray start`. This + # lets the driver (e.g. ng_e2e_collect_rollouts) execute in the same + # container as the Ray daemon — eliminating pickle-ABI skew between + # Stirrup-venv actors and a separately-spawned eval-client driver. + head_tail = head_workload_cmd if head_workload_cmd else "sleep infinity" + if not head_tail.endswith("\n"): + head_tail += "\n" + ray_head_inner = ( + "set -e\n" + + pre_start_block + + "ray stop 2>/dev/null || true\n" + "ray start --head " + "--port=$EVAL_RAY_PORT " + + ray_fixed_ports + + "--dashboard-host=0.0.0.0 " + "--dashboard-port=$EVAL_RAY_DASH_PORT " + "--node-ip-address=\"$(hostname -I | awk '{print $1}')\" " + "--num-cpus=$(nproc) " + "--temp-dir=/tmp/ray\n" + + head_tail + ) + s += _srun_in_eval_container( + eval_image=eval_image, + eval_mounts_list=eval_mounts_list, + eval_env_var_names=eval_env_var_names, + inner_cmd=ray_head_inner, + nodelist='"$PRIMARY_NODE"', + nodes=1, + ntasks=1, + output_log=str(remote_task_subdir / "logs" / "ray-head-%A.log"), + background=True, + no_container_mount_home=no_container_mount_home, + ) + s += "RAY_HEAD_PID=$!\n\n" + + worker_pid_var = None + if n > 1: + s += "# Start Ray workers on remaining nodes\n" + ray_worker_inner = ( + "set -e\n" + + pre_start_block + + "ray stop 2>/dev/null || true\n" + "ray start " + "--address=$PRIMARY_IP:$EVAL_RAY_PORT " + + ray_fixed_ports + + "--node-ip-address=\"$(hostname -I | awk '{print $1}')\" " + "--num-cpus=$(nproc) " + "--temp-dir=/tmp/ray\n" + "sleep infinity\n" + ) + s += _srun_in_eval_container( + eval_image=eval_image, + eval_mounts_list=eval_mounts_list, + eval_env_var_names=eval_env_var_names, + inner_cmd=ray_worker_inner, + nodelist='"$WORKER_NODES"', + nodes=n - 1, + ntasks_per_node=1, + output_log=str(remote_task_subdir / "logs" / "ray-worker-%A-%n.log"), + background=True, + no_container_mount_home=no_container_mount_home, + ) + s += "RAY_WORKER_PID=$!\n\n" + worker_pid_var = "RAY_WORKER_PID" + + s += "export RAY_HEAD_NODE_ADDRESS=$PRIMARY_IP:$EVAL_RAY_PORT\n" + s += 'echo "RAY_HEAD_NODE_ADDRESS=$RAY_HEAD_NODE_ADDRESS"\n\n' + + s += "# Wait for the Ray cluster to assemble\n" + s += _get_wait_for_ray_cluster_handler( + expected_nodes=n, + timeout=ready_timeout, + head_pid_var="RAY_HEAD_PID", + worker_pid_var=worker_pid_var, + ) + s += "\n" + + return CmdAndReadableComment( + cmd=s, + debug="# (Eval Ray cluster bootstrap)", + is_potentially_unsafe=is_potentially_unsafe, + ) + + def _collect_mount_paths(cfg: DictConfig) -> List[str]: """Collect all mount source paths from the configuration. diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/resources/config_templates/execution/slurm.yaml b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/resources/config_templates/execution/slurm.yaml index bcca75b31..8b7df9bfc 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/resources/config_templates/execution/slurm.yaml +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/resources/config_templates/execution/slurm.yaml @@ -12,3 +12,11 @@ execution: # walltime: "04:00:00" # Job time limit (HH:MM:SS) # partition: "batch" # SLURM partition name # gres: "gpu:1" # Generic resources (e.g., GPUs) + + # Eval-only multi-node Ray cluster (requires deployment: none and num_nodes > 1). + # Stirrup-style agents are spread across all allocated nodes via Ray; only the + # head runs `nemo-evaluator run_eval`. Combine with examples/slurm_eval_only_ray_cluster.yaml. + # eval_ray_cluster: true + # eval_ray_port: 6379 + # eval_per_node_pre_cmd: | + # apt update && apt install -y squashfuse fuse3 diff --git a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py index 03c1a59d2..0ecd4c2f9 100644 --- a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py +++ b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py @@ -4168,3 +4168,205 @@ def test_export_mounts_not_in_collect(self): ) paths = _collect_mount_paths(cfg) assert "/lustre/cache/uv" not in paths + + +class TestEvalRayCluster: + """Tests for the cfg.execution.eval_ray_cluster mode (multi-node CPU Ray + cluster for deployment.type == 'none'). + """ + + @pytest.fixture + def base_config(self): + return { + "deployment": { + "type": "none", + "endpoints": {"health": "/health"}, + }, + "execution": { + "type": "slurm", + "output_dir": "/test/output", + "walltime": "01:00:00", + "account": "test-account", + "partition": "cpu", + "num_nodes": 8, + "num_instances": 1, + "ntasks_per_node": 1, + "subproject": "test-subproject", + "eval_ray_cluster": True, + "eval_ray_port": 6379, + "eval_ray_dashboard_port": 8265, + "eval_ray_ready_timeout": 600, + }, + "evaluation": {"env_vars": {}}, + "target": {"api_endpoint": {"url": "http://example/v1"}}, + } + + @pytest.fixture + def mock_task(self): + return OmegaConf.create({"name": "test_task"}) + + @pytest.fixture + def mock_dependencies(self): + with ( + patch( + "nemo_evaluator_launcher.executors.slurm.executor.load_tasks_mapping" + ) as mock_load_tasks, + patch( + "nemo_evaluator_launcher.executors.slurm.executor.get_task_definition_for_job" + ) as mock_get_task_def, + patch( + "nemo_evaluator_launcher.common.helpers.get_eval_factory_command" + ) as mock_get_eval_command, + patch( + "nemo_evaluator_launcher.common.helpers.get_served_model_name" + ) as mock_get_model_name, + ): + mock_load_tasks.return_value = {} + mock_get_task_def.return_value = { + "container": "test-eval-container:latest", + "endpoint_type": "openai", + "task": "test_task", + } + from nemo_evaluator_launcher.common.helpers import CmdAndReadableComment + + mock_get_eval_command.return_value = CmdAndReadableComment( + cmd="nemo-evaluator run_eval --test", debug="# Test command" + ) + mock_get_model_name.return_value = "test-model" + yield + + def _run(self, cfg_dict, mock_task): + cfg = OmegaConf.create(cfg_dict) + return _create_slurm_sbatch_script( + cfg=cfg, + task=mock_task, + eval_image="test-eval-container:latest", + remote_task_subdir=Path("/test/remote"), + invocation_id="test123", + job_id="test123.0", + task_idx=0, + ) + + def test_flag_off_no_bootstrap_emitted( + self, base_config, mock_task, mock_dependencies + ): + base_config["execution"]["eval_ray_cluster"] = False + result = self._run(base_config, mock_task) + assert "Eval-only Ray cluster bootstrap" not in result.cmd + assert "RAY_HEAD_NODE_ADDRESS" not in result.cmd + + def test_flag_on_emits_head_and_worker_sruns( + self, base_config, mock_task, mock_dependencies + ): + result = self._run(base_config, mock_task) + script = result.cmd + assert "Eval-only Ray cluster bootstrap" in script + assert "Start Ray head on PRIMARY_NODE" in script + assert "Start Ray workers on remaining nodes" in script + assert '--nodelist "$PRIMARY_NODE"' in script + assert '--nodelist "$WORKER_NODES"' in script + assert "--nodes 7" in script # N-1 workers when N=8 + assert "export RAY_HEAD_NODE_ADDRESS=$PRIMARY_IP:$EVAL_RAY_PORT" in script + assert "export EVAL_RAY_PORT=6379" in script + + def test_flag_on_single_node_skips_bootstrap( + self, base_config, mock_task, mock_dependencies + ): + # Gate requires num_nodes > 1; num_nodes=1 must skip the whole bootstrap. + base_config["execution"]["num_nodes"] = 1 + result = self._run(base_config, mock_task) + assert "Eval-only Ray cluster bootstrap" not in result.cmd + + def test_flag_with_deployment_not_none_skips_bootstrap( + self, base_config, mock_task, mock_dependencies + ): + # deployment.type != "none" disables the gate. + base_config["deployment"] = { + "type": "vllm", + "image": "test-image", + "command": "run", + "served_model_name": "m", + "port": 8000, + "endpoints": {"health": "/health"}, + } + result = self._run(base_config, mock_task) + assert "Eval-only Ray cluster bootstrap" not in result.cmd + + def test_per_node_pre_cmd_emitted_and_marks_unsafe( + self, base_config, mock_task, mock_dependencies + ): + base_config["execution"]["eval_per_node_pre_cmd"] = "apt install -y squashfuse" + result = self._run(base_config, mock_task) + assert "apt install -y squashfuse" in result.cmd + assert "Per-node container setup" in result.cmd + assert result.is_potentially_unsafe is True + + def test_adapter_host_exported_default_0_0_0_0( + self, base_config, mock_task, mock_dependencies + ): + result = self._run(base_config, mock_task) + assert 'export ADAPTER_HOST="${ADAPTER_HOST:-0.0.0.0}"' in result.cmd + + def test_eval_srun_env_includes_ray_head_and_adapter_host( + self, base_config, mock_task, mock_dependencies + ): + result = self._run(base_config, mock_task) + # All eval-srun --container-env lists include both vars when bootstrap is on. + # The eval-srun is the last one with `client-%A.log` output path. + assert "ADAPTER_HOST" in result.cmd + assert "RAY_HEAD_NODE_ADDRESS" in result.cmd + # Concretely, on the eval srun line: + eval_srun_line = [ + line for line in result.cmd.splitlines() if "client-%A.log" in line + ] + assert eval_srun_line, "eval srun output line not found" + assert "ADAPTER_HOST" in eval_srun_line[0] + assert "RAY_HEAD_NODE_ADDRESS" in eval_srun_line[0] + + def test_wait_handler_inline_tcp_check( + self, base_config, mock_task, mock_dependencies + ): + result = self._run(base_config, mock_task) + # Wait handler is now inline (no srun, no container) — bash /dev/tcp + # probe against $PRIMARY_IP:$EVAL_RAY_PORT. Liveness via $RAY_HEAD_PID + # and $RAY_WORKER_PID kill -0 checks. + assert ( + "Wait for Ray head GCS to be reachable (inline TCP check)" in result.cmd + ) + assert "/dev/tcp/${PRIMARY_IP}/${EVAL_RAY_PORT}" in result.cmd + assert "targeting 8 nodes" in result.cmd + assert 'kill -0 "$RAY_HEAD_PID"' in result.cmd + assert 'kill -0 "$RAY_WORKER_PID"' in result.cmd + + def test_cleanup_kills_ray_pids(self, base_config, mock_task, mock_dependencies): + result = self._run(base_config, mock_task) + assert 'kill "$RAY_HEAD_PID" 2>/dev/null || true' in result.cmd + assert 'kill "$RAY_WORKER_PID" 2>/dev/null || true' in result.cmd + + def test_ray_stop_idempotency_on_resume( + self, base_config, mock_task, mock_dependencies + ): + # Auto-resume re-runs the bootstrap; `ray stop` must precede `ray start`. + result = self._run(base_config, mock_task) + head_idx = result.cmd.index("ray start --head ") + # Find the most recent `ray stop` before that head start. + before = result.cmd[:head_idx] + assert "ray stop 2>/dev/null || true" in before + + def test_aux_deployment_combo_raises( + self, base_config, mock_task, mock_dependencies + ): + # Aux deployments + eval_ray_cluster=true must raise at config-load time. + base_config["auxiliary_deployments"] = { + "judge": { + "type": "vllm", + "image": "judge-img", + "command": "run", + "served_model_name": "judge-m", + "port": 8001, + "num_nodes": 1, + "endpoints": {"health": "/health"}, + } + } + with pytest.raises(ValueError, match="eval_ray_cluster=true"): + self._run(base_config, mock_task)