diff --git a/configs/README.md b/configs/README.md index fdfd1a07..5784404c 100644 --- a/configs/README.md +++ b/configs/README.md @@ -37,6 +37,51 @@ python -m torchspec.train_entry --config configs/sglang_qwen3_8b.yaml training.l | `inference.sglang` | `tp_size`, `mem_fraction_static`, `extra_args` | SGLang engine settings (nested under inference) | | `mooncake` | `protocol`, `device_name` | Mooncake transfer engine settings | +## Custom Ray placement + +Use `training.placement_strategy: custom` when training and inference must run +on explicitly chosen Ray nodes. This is useful when the default `PACK` placement +would put actors on nodes with the wrong network locality, cache state, or GPU +partition. + +IP-based placement uses Ray's built-in `node:` resource and does not require +custom Ray labels: + +```yaml +training: + placement_strategy: custom + training_num_nodes: 1 + training_num_gpus_per_node: 8 + training_node_ips: + - 10.0.0.1 + +inference: + inference_num_gpus: 16 + inference_num_gpus_per_node: 8 + inference_node_ips: + - 10.0.0.2 + - 10.0.0.3 +``` + +Ray label selectors are also supported when your Ray version supports placement +group `bundle_label_selector`: + +```yaml +training: + placement_strategy: custom + training_node_selectors: + - {"torchspec/node": "trainer-0"} + +inference: + inference_node_selectors: + - {"torchspec/node": "infer-0"} + - {"torchspec/node": "infer-1"} +``` + +For each role, set either `*_node_ips` or `*_node_selectors`, not both. The +configured node order is preserved; for multi-node inference it determines the +engine actor order and therefore the `node_rank` passed to SGLang or vLLM. + ## SGLang engine configuration SGLang settings live under `inference.sglang` and are split into two tiers: diff --git a/docs/code_architecture.md b/docs/code_architecture.md index 43d2ac9c..e777be7a 100644 --- a/docs/code_architecture.md +++ b/docs/code_architecture.md @@ -12,7 +12,7 @@ torchspec/ ├── ray/ # Ray infrastructure (shared across all packages) │ ├── ray_actor.py # RayActor base class (GPU setup, network utils) │ ├── train_group.py # RayTrainGroup (manages training actor group) -│ └── placement_group.py # Placement group creation, GPU resource management +│ └── placement_group.py # Placement group creation, GPU resource management, custom node placement ├── controller/ # Async pipeline orchestration │ ├── training_controller.py # AsyncTrainingController (Ray actor) │ ├── inference_manager.py # AsyncInferenceManager (Ray actor) @@ -209,11 +209,14 @@ training: ttt_length: 7 # Speculative depth train_backend: fsdp fsdp_strategy: REPLICATE + placement_strategy: training_first # or inference_first/custom + training_node_ips: null # custom placement only inference: inference_engine_type: hf # or "sgl" inference_batch_size: 1 inference_num_gpus: 4 + inference_node_ips: null # custom placement only sglang: # nested under inference tp_size: 8 extra_args: # power-user passthrough to sgl.Engine @@ -258,7 +261,7 @@ python train.py --config base.yaml --config experiment.yaml training.learning_ra |--------|---------| | `torchspec/ray/ray_actor.py` | `RayActor` base class (GPU setup, IP/port utils, master addr negotiation) | | `torchspec/ray/train_group.py` | `RayTrainGroup` - Manages a group of training actors | -| `torchspec/ray/placement_group.py` | Placement group creation, GPU resource waiting, `create_placement_groups()`, `create_train_group()` | +| `torchspec/ray/placement_group.py` | Placement group creation, GPU resource waiting, custom node placement, `create_placement_groups()`, `create_train_group()` | ### Controller diff --git a/docs/ray.md b/docs/ray.md index 7fa36a77..896b42ab 100644 --- a/docs/ray.md +++ b/docs/ray.md @@ -35,12 +35,13 @@ Placement groups reserve GPUs for training and inference as a unit and place the | Mode | Training GPUs | Inference GPUs | Use case | |------|--------------|----------------|----------| -| Default (separate) | Dedicated PG | Dedicated PG | Production: no GPU contention | +| Default | Sliced from unified PG | Sliced from unified PG | Production: deterministic node-to-role assignment | +| `custom` | Sliced from custom unified PG | Sliced from custom unified PG | Production: explicit node choice with the same unified reservation semantics | | `colocate` | Shared PG | Shared PG | Dev: share GPUs between train & inference | | `debug_train_only` | Dedicated PG | Empty | Debug training without inference | | `debug_inference_only` | Empty | Dedicated PG | Debug inference without training | -Each placement group probes bundles with a temporary `InfoActor` to discover the actual (node IP, GPU ID) mapping, then sorts by (node, GPU ID) for deterministic ordering. +Each placement group probes bundles with a temporary `InfoActor` to discover the actual (node IP, GPU ID) mapping, then sorts by (node, GPU ID) for deterministic ordering. In `custom` mode, TorchSpec sorts by the configured node order first and by physical GPU ID within each selected node. ## Ray Cluster Setup @@ -134,6 +135,65 @@ The PACK placement strategy spreads them across nodes automatically. | `training.training_num_nodes` | 1 | Number of training nodes | | `training.training_num_gpus_per_node` | 1 | GPUs per training node | +### Custom node placement + +By default, TorchSpec creates a unified placement group with Ray's `PACK` +strategy, probes the resulting bundles, and assigns the ordered bundles to +training or inference according to `training.placement_strategy` +(`training_first` or `inference_first`). Set +`training.placement_strategy: custom` to explicitly choose the nodes for each +role while still reserving the non-colocated training and inference bundles in a +single unified placement group. + +IP-based placement uses Ray's per-node resource labels (`node:`) and does +not require custom Ray labels: + +```yaml +training: + placement_strategy: custom + training_num_nodes: 2 + training_num_gpus_per_node: 8 + training_node_ips: + - 10.0.0.1 + - 10.0.0.3 + +inference: + inference_num_gpus: 16 + inference_num_gpus_per_node: 8 + inference_node_ips: + - 10.0.0.2 + - 10.0.0.4 +``` + +Ray label selectors are also supported when the installed Ray version supports +placement group `bundle_label_selector`. Start Ray nodes with labels, then use +matching selectors in the config: + +```yaml +training: + placement_strategy: custom + training_num_nodes: 2 + training_num_gpus_per_node: 8 + training_node_selectors: + - {"torchspec/node": "trainer-0"} + - {"torchspec/node": "trainer-1"} + +inference: + inference_node_selectors: + - {"torchspec/node": "infer-0"} + - {"torchspec/node": "infer-1"} +``` + +The configured node order is preserved. For multi-node inference, this order +determines the order of inference engine actors and therefore the `node_rank` +passed to SGLang or vLLM. Within each selected node, bundles are ordered by the +actual GPU ID discovered by `InfoActor`. + +The number of configured training nodes must equal +`training.training_num_nodes`. The number of configured inference nodes must +match `ceil(inference.inference_num_gpus / inference.inference_num_gpus_per_node)`. +For each role, set only one of `*_node_ips` or `*_node_selectors`. + ### Inference across nodes (SglEngine multi-node TP) When a single model is too large for one node, SglEngine supports multi-node diff --git a/tests/test_placement_group.py b/tests/test_placement_group.py new file mode 100644 index 00000000..06a2ad3c --- /dev/null +++ b/tests/test_placement_group.py @@ -0,0 +1,283 @@ +from argparse import Namespace +import importlib.util +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +repo_root = Path(__file__).resolve().parents[1] +torchspec_pkg = sys.modules.get("torchspec") +if torchspec_pkg is None and importlib.util.find_spec("torch") is None: + torchspec_pkg = types.ModuleType("torchspec") + torchspec_pkg.__path__ = [str(repo_root / "torchspec")] + torchspec_pkg.__package__ = "torchspec" + sys.modules["torchspec"] = torchspec_pkg + +ray_stub = types.ModuleType("ray") +ray_util_stub = types.ModuleType("ray.util") +ray_pg_stub = types.ModuleType("ray.util.placement_group") +ray_sched_stub = types.ModuleType("ray.util.scheduling_strategies") + + +def _remote(*args, **_kwargs): + if args and len(args) == 1 and callable(args[0]): + return args[0] + + def _decorator(obj): + return obj + + return _decorator + + +def _placement_group(*_args, **_kwargs): + return MagicMock(name="placement_group") + + +class _PlacementGroup: + pass + + +class _PlacementGroupSchedulingStrategy: + def __init__(self, **kwargs): + self.kwargs = kwargs + + +class _NodeAffinitySchedulingStrategy: + def __init__(self, **kwargs): + self.kwargs = kwargs + + +ray_stub.remote = _remote +ray_stub.ObjectRef = object +ray_stub.util = ray_util_stub +ray_pg_stub.PlacementGroup = _PlacementGroup +ray_pg_stub.placement_group = _placement_group +ray_sched_stub.PlacementGroupSchedulingStrategy = _PlacementGroupSchedulingStrategy +ray_sched_stub.NodeAffinitySchedulingStrategy = _NodeAffinitySchedulingStrategy +sys.modules["ray"] = ray_stub +sys.modules["ray.util"] = ray_util_stub +sys.modules["ray.util.placement_group"] = ray_pg_stub +sys.modules["ray.util.scheduling_strategies"] = ray_sched_stub + +train_group_stub = types.ModuleType("torchspec.ray.train_group") +train_group_stub.RayTrainGroup = object +sys.modules["torchspec.ray.train_group"] = train_group_stub + +from torchspec.ray.placement_group import ( # noqa: E402 + _NodeConstraint, + _build_custom_bundles, + _sort_probed_bundle_infos, + create_placement_groups, +) + + +def _make_args(**overrides): + defaults = dict( + placement_strategy="training_first", + debug_train_only=False, + debug_inference_only=False, + colocate=False, + training_num_nodes=1, + training_num_gpus_per_node=2, + inference_num_gpus=2, + inference_num_gpus_per_node=2, + training_node_ips=None, + inference_node_ips=None, + training_node_selectors=None, + inference_node_selectors=None, + ) + defaults.update(overrides) + return Namespace(**defaults) + + +def test_custom_ip_bundles_add_node_resources(): + bundles, selectors, node_groups = _build_custom_bundles( + "training", + [_NodeConstraint(ip="10.0.0.1"), _NodeConstraint(ip="10.0.0.2")], + total_gpus=3, + gpus_per_node=2, + ) + + assert bundles == [ + {"GPU": 1, "CPU": 1, "node:10.0.0.1": 0.001}, + {"GPU": 1, "CPU": 1, "node:10.0.0.1": 0.001}, + {"GPU": 1, "CPU": 1, "node:10.0.0.2": 0.001}, + ] + assert selectors == [{}, {}, {}] + assert node_groups == [0, 0, 1] + + +def test_custom_label_bundles_add_bundle_label_selectors(): + bundles, selectors, node_groups = _build_custom_bundles( + "inference", + [ + _NodeConstraint(label_selector=(("torchspec/node", "infer-0"),)), + _NodeConstraint(label_selector=(("torchspec/node", "infer-1"),)), + ], + total_gpus=3, + gpus_per_node=2, + ) + + assert bundles == [ + {"GPU": 1, "CPU": 1}, + {"GPU": 1, "CPU": 1}, + {"GPU": 1, "CPU": 1}, + ] + assert selectors == [ + {"torchspec/node": "infer-0"}, + {"torchspec/node": "infer-0"}, + {"torchspec/node": "infer-1"}, + ] + assert node_groups == [0, 0, 1] + + +def test_custom_bundles_validate_node_count(): + with pytest.raises(ValueError, match="expected 2 node"): + _build_custom_bundles( + "training", + [_NodeConstraint(ip="10.0.0.1")], + total_gpus=3, + gpus_per_node=2, + ) + + +def test_custom_bundle_sort_preserves_user_node_order_then_gpu_id(): + gpu_ids = [ + ("10.0.0.2", 7), + ("10.0.0.1", 1), + ("10.0.0.2", 0), + ("10.0.0.1", 0), + ] + + sorted_infos = _sort_probed_bundle_infos(gpu_ids, node_group_indices=[1, 0, 1, 0]) + + assert [info[0] for info in sorted_infos] == [3, 1, 2, 0] + + +def test_create_placement_groups_requires_custom_strategy_for_custom_fields(): + args = _make_args(training_node_ips=["10.0.0.1"]) + + with ( + patch("torchspec.ray.placement_group._ensure_ray_initialized"), + patch("torchspec.ray.placement_group._wait_for_gpu_resources") as wait_for_gpus, + pytest.raises(ValueError, match="placement_strategy=custom"), + ): + create_placement_groups(args) + + wait_for_gpus.assert_not_called() + + +def test_create_placement_groups_validates_custom_constraints_before_waiting_for_gpus(): + args = _make_args( + placement_strategy="custom", + training_num_nodes=2, + training_num_gpus_per_node=2, + training_node_ips=["10.0.0.1"], + inference_node_ips=["10.0.0.2"], + ) + + with ( + patch("torchspec.ray.placement_group._ensure_ray_initialized"), + patch("torchspec.ray.placement_group._wait_for_gpu_resources") as wait_for_gpus, + pytest.raises(ValueError, match="training custom placement expected 2 node"), + ): + create_placement_groups(args) + + wait_for_gpus.assert_not_called() + + +def test_create_placement_groups_custom_unified_uses_role_node_order(): + args = _make_args( + placement_strategy="custom", + training_num_nodes=1, + training_num_gpus_per_node=2, + inference_num_gpus=2, + inference_num_gpus_per_node=2, + training_node_ips=["10.0.0.1"], + inference_node_ips=["10.0.0.2"], + ) + fake_pg = MagicMock(name="pg") + + with ( + patch("torchspec.ray.placement_group._ensure_ray_initialized"), + patch("torchspec.ray.placement_group._wait_for_gpu_resources"), + patch( + "torchspec.ray.placement_group._create_placement_group", + return_value=(fake_pg, [0, 1, 2, 3], [0, 1, 0, 1]), + ) as create_pg, + ): + result = create_placement_groups(args) + + assert create_pg.call_count == 1 + kwargs = create_pg.call_args.kwargs + assert kwargs["bundles"][0]["node:10.0.0.1"] == 0.001 + assert kwargs["bundles"][2]["node:10.0.0.2"] == 0.001 + assert kwargs["node_group_indices"] == [0, 0, 1, 1] + assert result["training"] == (fake_pg, [0, 1], [0, 1]) + assert result["inference"] == (fake_pg, [2, 3], [0, 1]) + + +def test_create_placement_groups_custom_unified_allows_zero_inference_gpus(): + args = _make_args( + placement_strategy="custom", + training_num_nodes=1, + training_num_gpus_per_node=2, + inference_num_gpus=0, + inference_num_gpus_per_node=2, + training_node_ips=["10.0.0.1"], + inference_node_ips=None, + ) + fake_pg = MagicMock(name="pg") + + with ( + patch("torchspec.ray.placement_group._ensure_ray_initialized"), + patch("torchspec.ray.placement_group._wait_for_gpu_resources"), + patch( + "torchspec.ray.placement_group._create_placement_group", + return_value=(fake_pg, [0, 1], [0, 1]), + ) as create_pg, + ): + result = create_placement_groups(args) + + kwargs = create_pg.call_args.kwargs + assert kwargs["bundles"] == [ + {"GPU": 1, "CPU": 1, "node:10.0.0.1": 0.001}, + {"GPU": 1, "CPU": 1, "node:10.0.0.1": 0.001}, + ] + assert kwargs["node_group_indices"] == [0, 0] + assert result["training"] == (fake_pg, [0, 1], [0, 1]) + assert result["inference"] == (fake_pg, [], []) + + +def test_custom_colocate_uses_training_topology_for_inference_constraints(): + args = _make_args( + placement_strategy="custom", + colocate=True, + training_num_nodes=2, + training_num_gpus_per_node=4, + inference_num_gpus_per_node=8, + training_node_ips=None, + inference_node_ips=["10.0.0.1", "10.0.0.2"], + ) + fake_pg = MagicMock(name="pg") + + with ( + patch("torchspec.ray.placement_group._ensure_ray_initialized"), + patch("torchspec.ray.placement_group._wait_for_gpu_resources"), + patch( + "torchspec.ray.placement_group._create_placement_group", + return_value=(fake_pg, list(range(8)), list(range(8))), + ) as create_pg, + ): + result = create_placement_groups(args) + + kwargs = create_pg.call_args.kwargs + assert len(kwargs["bundles"]) == 8 + assert kwargs["bundles"][0]["node:10.0.0.1"] == 0.001 + assert kwargs["bundles"][4]["node:10.0.0.2"] == 0.001 + assert kwargs["node_group_indices"] == [0, 0, 0, 0, 1, 1, 1, 1] + assert result["training"] == (fake_pg, list(range(8)), list(range(8))) + assert result["inference"] == (fake_pg, list(range(8)), list(range(8))) diff --git a/torchspec/config/inference_config.py b/torchspec/config/inference_config.py index 33a440d8..f11f2f7b 100644 --- a/torchspec/config/inference_config.py +++ b/torchspec/config/inference_config.py @@ -115,6 +115,8 @@ class InferenceConfig: inference_num_gpus: Optional[int] = None inference_num_gpus_per_engine: int = 1 inference_num_gpus_per_node: int = 8 + inference_node_ips: Optional[list[str]] = None + inference_node_selectors: Optional[list[dict[str, str]]] = None last_hidden_states_prenorm: Optional[bool] = None max_sample_pool_size: int = 0 store_last_hidden_states: bool = True diff --git a/torchspec/config/train_config.py b/torchspec/config/train_config.py index 51e4ec83..c19c8106 100644 --- a/torchspec/config/train_config.py +++ b/torchspec/config/train_config.py @@ -103,8 +103,10 @@ class TrainingConfig: fsdp_reduce_dtype: str = "float32" # "float32" or "bfloat16" fsdp_strategy: str = "REPLICATE" # Controls which workload claims head-node GPUs first under PACK strategy. - # "training_first" (default) or "inference_first". Extensible to custom mappings later. + # "training_first" (default), "inference_first", or "custom". placement_strategy: str = "training_first" + training_node_ips: Optional[list[str]] = None + training_node_selectors: Optional[list[dict[str, str]]] = None compile_model: bool = False # torch.compile the full training model sp_ring_size: int = 1 sp_ulysses_size: int = 1 diff --git a/torchspec/ray/placement_group.py b/torchspec/ray/placement_group.py index 23362d23..5a80d00a 100644 --- a/torchspec/ray/placement_group.py +++ b/torchspec/ray/placement_group.py @@ -18,9 +18,14 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +from __future__ import annotations + +import math import os import socket import time +from collections.abc import Mapping, Sequence +from dataclasses import dataclass import ray from ray.util.placement_group import placement_group @@ -30,6 +35,37 @@ from torchspec.utils.logging import logger +# Ray exposes a tiny "node:" resource on each node. Requiring a fractional +# amount pins a bundle to that node without consuming a full logical resource. +_NODE_RESOURCE_EPSILON = 0.001 +_CUSTOM_PLACEMENT_FIELDS = ( + "training_node_ips", + "inference_node_ips", + "training_node_selectors", + "inference_node_selectors", +) + + +@dataclass(frozen=True) +class _NodeConstraint: + ip: str | None = None + label_selector: tuple[tuple[str, str], ...] = () + + @property + def selector_for_log(self) -> str: + if self.ip is not None: + return self.ip + return str(dict(self.label_selector)) + + def to_bundle_resource(self) -> dict[str, float]: + if self.ip is None: + return {} + return {_node_ip_resource(self.ip): _NODE_RESOURCE_EPSILON} + + def to_label_selector(self) -> dict[str, str]: + return dict(self.label_selector) + + @ray.remote(num_gpus=1) class InfoActor: def get_ip_and_gpu_id(self): @@ -37,7 +73,7 @@ def get_ip_and_gpu_id(self): def sort_key(x): - index, node_identifier, gpu_id = x + _index, node_identifier, gpu_id = x # Sort by node IP number and then by GPU ID try: # try to parse it as an IP address. @@ -57,10 +93,180 @@ def sort_key(x): return (node_ip_parts, gpu_id) -def _create_placement_group(num_gpus, strategy="PACK", name=None): +def _has_value(value) -> bool: + return value is not None and value != [] and value != {} + + +def _has_custom_placement_fields(args) -> bool: + return any(_has_value(getattr(args, field, None)) for field in _CUSTOM_PLACEMENT_FIELDS) + + +def _validate_custom_strategy_usage(args) -> None: + placement_strategy = getattr(args, "placement_strategy", "training_first") + if placement_strategy != "custom" and _has_custom_placement_fields(args): + raise ValueError( + "Custom placement fields require training.placement_strategy=custom. " + f"Got placement_strategy={placement_strategy!r}." + ) + + +def _as_list(value, field_name: str) -> list: + if value is None: + return [] + if isinstance(value, str) or not isinstance(value, Sequence): + raise ValueError(f"{field_name} must be a list, got {type(value).__name__}") + return list(value) + + +def _normalize_node_constraints( + args, role: str, *, required: bool = False +) -> list[_NodeConstraint]: + ip_field = f"{role}_node_ips" + selector_field = f"{role}_node_selectors" + node_ips = _as_list(getattr(args, ip_field, None), ip_field) + node_selectors = _as_list(getattr(args, selector_field, None), selector_field) + + if node_ips and node_selectors: + raise ValueError(f"Set only one of {ip_field} or {selector_field}, not both") + if not node_ips and not node_selectors: + if required: + raise ValueError( + f"training.placement_strategy=custom requires {ip_field} or {selector_field}" + ) + return [] + + if node_ips: + constraints = [] + for ip in node_ips: + if not isinstance(ip, str) or not ip: + raise ValueError(f"{ip_field} entries must be non-empty strings") + constraints.append(_NodeConstraint(ip=ip)) + return constraints + + constraints = [] + for selector in node_selectors: + if not isinstance(selector, Mapping) or not selector: + raise ValueError(f"{selector_field} entries must be non-empty mappings") + normalized = tuple(sorted((str(key), str(value)) for key, value in selector.items())) + constraints.append(_NodeConstraint(label_selector=normalized)) + return constraints + + +def _node_ip_resource(ip: str) -> str: + return f"node:{ip}" + + +def _expected_node_count(total_gpus: int, gpus_per_node: int, role: str) -> int: + if total_gpus < 0: + raise ValueError(f"{role} total GPUs must be non-negative, got {total_gpus}") + if total_gpus == 0: + return 0 + if gpus_per_node <= 0: + raise ValueError(f"{role} GPUs per node must be positive, got {gpus_per_node}") + return math.ceil(total_gpus / gpus_per_node) + + +def _build_custom_bundles( + role: str, + constraints: list[_NodeConstraint], + total_gpus: int, + gpus_per_node: int, +) -> tuple[list[dict[str, float]], list[dict[str, str]], list[int]]: + """Build Ray bundles for nodes in user-provided order. + + ``node_group_indices`` records the configured node ordinal for each bundle. + After Ray schedules the placement group, InfoActor probes actual nodes and + GPU ids; the ordinal lets us restore user node order while still sorting + GPUs within a node by physical GPU id. + """ + expected_nodes = _expected_node_count(total_gpus, gpus_per_node, role) + if len(constraints) != expected_nodes: + raise ValueError( + f"{role} custom placement expected {expected_nodes} node(s) for " + f"{total_gpus} GPU(s) with {gpus_per_node} GPU(s) per node, " + f"got {len(constraints)}" + ) + + bundles: list[dict[str, float]] = [] + bundle_label_selectors: list[dict[str, str]] = [] + node_group_indices: list[int] = [] + remaining = total_gpus + + for node_index, constraint in enumerate(constraints): + gpus_on_node = min(gpus_per_node, remaining) + remaining -= gpus_on_node + + for _ in range(gpus_on_node): + bundle: dict[str, float] = {"GPU": 1, "CPU": 1, **constraint.to_bundle_resource()} + bundles.append(bundle) + bundle_label_selectors.append(constraint.to_label_selector()) + node_group_indices.append(node_index) + + return bundles, bundle_label_selectors, node_group_indices + + +def _merge_bundle_label_selectors( + selectors: list[dict[str, str]], +) -> list[dict[str, str]] | None: + return selectors if any(selector for selector in selectors) else None + + +def _placement_group( + bundles: list[dict[str, float]], + *, + strategy: str, + name: str | None, + bundle_label_selector: list[dict[str, str]] | None = None, +): + kwargs = {"bundles": bundles, "strategy": strategy, "name": name} + if bundle_label_selector is not None: + kwargs["bundle_label_selector"] = bundle_label_selector + + try: + return placement_group(**kwargs) + except TypeError as e: + if bundle_label_selector is not None and "bundle_label_selector" in str(e): + raise RuntimeError( + "Ray bundle_label_selector is not supported by the installed Ray version. " + "Use training_node_ips/inference_node_ips or upgrade Ray." + ) from e + raise + + +def _sort_probed_bundle_infos(gpu_ids, node_group_indices: list[int] | None = None): + """Sort probed bundles by default topology or explicit user node order.""" + bundle_infos = [(i, gpu_ids[i][0], gpu_ids[i][1]) for i in range(len(gpu_ids))] + if node_group_indices is None: + return sorted(bundle_infos, key=sort_key) + if len(node_group_indices) != len(gpu_ids): + raise ValueError( + f"node_group_indices length ({len(node_group_indices)}) must match " + f"bundle count ({len(gpu_ids)})" + ) + return sorted(bundle_infos, key=lambda info: (node_group_indices[info[0]], info[2])) + + +def _create_placement_group( + num_gpus, + strategy="PACK", + name=None, + *, + bundles: list[dict[str, float]] | None = None, + bundle_label_selector: list[dict[str, str]] | None = None, + node_group_indices: list[int] | None = None, +): """Create a placement group with the specified number of GPUs.""" - bundles = [{"GPU": 1, "CPU": 1} for _ in range(num_gpus)] - pg = placement_group(bundles, strategy=strategy, name=name) + if bundles is None: + bundles = [{"GPU": 1, "CPU": 1} for _ in range(num_gpus)] + elif len(bundles) != num_gpus: + raise ValueError(f"num_gpus={num_gpus} does not match bundle count={len(bundles)}") + + pg = _placement_group( + bundles, + strategy=strategy, + name=name, + bundle_label_selector=bundle_label_selector, + ) num_bundles = len(bundles) ray.get(pg.ready()) @@ -79,8 +285,7 @@ def _create_placement_group(num_gpus, strategy="PACK", name=None): for actor in info_actors: ray.kill(actor) - bundle_infos = [(i, gpu_ids[i][0], gpu_ids[i][1]) for i in range(num_bundles)] - sorted_bundle_infos = sorted(bundle_infos, key=sort_key) + sorted_bundle_infos = _sort_probed_bundle_infos(gpu_ids, node_group_indices) pg_reordered_bundle_indices = [info[0] for info in sorted_bundle_infos] # Map from logical index -> physical GPU ID pg_reordered_gpu_ids = [gpu_ids[info[0]][1] for info in sorted_bundle_infos] @@ -105,8 +310,14 @@ def _ensure_ray_initialized(): ray.init(address=ray_address, ignore_reinit_error=True) logger.info(f"Connected to Ray cluster at {ray_address}") except ConnectionError: - logger.warning("No existing Ray cluster found, starting a local instance") - ray.init(ignore_reinit_error=True) + if ray_address == "auto": + logger.warning("No existing Ray cluster found, starting a local instance") + ray.init(ignore_reinit_error=True) + return + raise RuntimeError( + f"Failed to connect to Ray cluster at {ray_address}. " + "Refusing to fall back to a local Ray instance when RAY_ADDRESS is explicitly set." + ) from None def _get_expected_gpu_count(args) -> int: @@ -144,19 +355,257 @@ def _wait_for_gpu_resources(expected_gpus: int, timeout: int = 300, poll_interva ) +def _create_custom_role_placement_group( + args, + role: str, + *, + total_gpus: int, + gpus_per_node: int, + name: str, +): + constraints = _normalize_node_constraints(args, role, required=True) + bundles, bundle_label_selectors, node_group_indices = _build_custom_bundles( + role, + constraints, + total_gpus, + gpus_per_node, + ) + logger.info( + f"Creating custom {role} placement group with {total_gpus} GPU(s) on " + f"{[constraint.selector_for_log for constraint in constraints]}" + ) + return _create_placement_group( + total_gpus, + strategy="PACK", + name=name, + bundles=bundles, + bundle_label_selector=_merge_bundle_label_selectors(bundle_label_selectors), + node_group_indices=node_group_indices, + ) + + +def _create_role_placement_group( + args, + role: str, + *, + total_gpus: int, + gpus_per_node: int, + name: str, + custom: bool, +): + if custom: + return _create_custom_role_placement_group( + args, + role, + total_gpus=total_gpus, + gpus_per_node=gpus_per_node, + name=f"custom_{name}", + ) + return _create_placement_group(total_gpus, strategy="PACK", name=name) + + +def _create_custom_unified_placement_group(args, num_training_gpus: int, num_inference_gpus: int): + training_constraints = _normalize_node_constraints( + args, "training", required=num_training_gpus > 0 + ) + inference_constraints = _normalize_node_constraints( + args, "inference", required=num_inference_gpus > 0 + ) + + training_bundles, training_selectors, training_groups = _build_custom_bundles( + "training", + training_constraints, + num_training_gpus, + args.training_num_gpus_per_node, + ) + inference_bundles, inference_selectors, inference_groups = _build_custom_bundles( + "inference", + inference_constraints, + num_inference_gpus, + args.inference_num_gpus_per_node, + ) + + node_group_offset = len(training_constraints) + node_group_indices = training_groups + [ + node_group_offset + node_group_index for node_group_index in inference_groups + ] + bundles = training_bundles + inference_bundles + bundle_label_selector = _merge_bundle_label_selectors(training_selectors + inference_selectors) + + total_gpus = num_training_gpus + num_inference_gpus + logger.info( + "Creating custom unified placement group with " + f"{total_gpus} GPUs ({num_training_gpus} training + {num_inference_gpus} inference); " + f"training nodes={[constraint.selector_for_log for constraint in training_constraints]}, " + f"inference nodes={[constraint.selector_for_log for constraint in inference_constraints]}" + ) + + pg, sorted_bundle_indices, sorted_gpu_ids = _create_placement_group( + total_gpus, + strategy="PACK", + name="custom_unified_pg", + bundles=bundles, + bundle_label_selector=bundle_label_selector, + node_group_indices=node_group_indices, + ) + + training_bundle_indices = sorted_bundle_indices[:num_training_gpus] + training_gpu_ids = sorted_gpu_ids[:num_training_gpus] + inference_bundle_indices = sorted_bundle_indices[num_training_gpus:] + inference_gpu_ids = sorted_gpu_ids[num_training_gpus:] + + logger.info( + f"Placement (strategy=custom): " + f"training bundles={training_bundle_indices}, " + f"inference bundles={inference_bundle_indices}" + ) + + return { + "training": (pg, training_bundle_indices, training_gpu_ids), + "inference": (pg, inference_bundle_indices, inference_gpu_ids), + } + + +def _get_custom_colocated_constraints(args) -> tuple[str, list[_NodeConstraint]]: + training_constraints = _normalize_node_constraints(args, "training") + inference_constraints = _normalize_node_constraints(args, "inference") + if ( + training_constraints + and inference_constraints + and training_constraints != inference_constraints + ): + raise ValueError( + "custom colocate placement requires training and inference node constraints " + "to match, or only one role's constraints to be set" + ) + + if training_constraints: + role = "training" + constraints = training_constraints + elif inference_constraints: + role = "inference" + constraints = inference_constraints + else: + raise ValueError( + "training.placement_strategy=custom with colocate=True requires training_node_* " + "or inference_node_* constraints" + ) + + return role, constraints + + +def _validate_custom_placement_constraints(args) -> None: + if getattr(args, "placement_strategy", "training_first") != "custom": + return + + if args.debug_train_only: + num_training_gpus = args.training_num_nodes * args.training_num_gpus_per_node + training_constraints = _normalize_node_constraints( + args, "training", required=num_training_gpus > 0 + ) + _build_custom_bundles( + "training", + training_constraints, + num_training_gpus, + args.training_num_gpus_per_node, + ) + return + + if args.debug_inference_only: + num_inference_gpus = args.inference_num_gpus + inference_constraints = _normalize_node_constraints( + args, "inference", required=num_inference_gpus > 0 + ) + _build_custom_bundles( + "inference", + inference_constraints, + num_inference_gpus, + args.inference_num_gpus_per_node, + ) + return + + if args.colocate: + num_gpus = args.training_num_nodes * args.training_num_gpus_per_node + _role, constraints = _get_custom_colocated_constraints(args) + _build_custom_bundles( + "colocate", + constraints, + num_gpus, + args.training_num_gpus_per_node, + ) + return + + num_training_gpus = args.training_num_nodes * args.training_num_gpus_per_node + num_inference_gpus = args.inference_num_gpus + + training_constraints = _normalize_node_constraints( + args, "training", required=num_training_gpus > 0 + ) + _build_custom_bundles( + "training", + training_constraints, + num_training_gpus, + args.training_num_gpus_per_node, + ) + + inference_constraints = _normalize_node_constraints( + args, "inference", required=num_inference_gpus > 0 + ) + _build_custom_bundles( + "inference", + inference_constraints, + num_inference_gpus, + args.inference_num_gpus_per_node, + ) + + +def _create_custom_colocated_placement_group(args, num_gpus: int): + role, constraints = _get_custom_colocated_constraints(args) + # Colocate creates one shared placement group using the training topology. + # Either role's node constraints may select the nodes, but node count + # validation must use the topology that determines ``num_gpus``. + bundles, bundle_label_selectors, node_group_indices = _build_custom_bundles( + "colocate", + constraints, + num_gpus, + args.training_num_gpus_per_node, + ) + logger.info( + f"Creating custom colocated placement group with {num_gpus} GPU(s) " + f"using {role} constraints on " + f"{[constraint.selector_for_log for constraint in constraints]}" + ) + return _create_placement_group( + num_gpus, + strategy="PACK", + name="custom_colocate_pg", + bundles=bundles, + bundle_label_selector=_merge_bundle_label_selectors(bundle_label_selectors), + node_group_indices=node_group_indices, + ) + + def create_placement_groups(args): """Initialize Ray, wait for GPU resources, and create placement groups. This is the single entry point for all GPU placement setup. """ _ensure_ray_initialized() + _validate_custom_strategy_usage(args) + _validate_custom_placement_constraints(args) _wait_for_gpu_resources(_get_expected_gpu_count(args)) + placement_strategy = getattr(args, "placement_strategy", "training_first") if args.debug_train_only: num_training_gpus = args.training_num_nodes * args.training_num_gpus_per_node logger.info(f"Creating training placement group with {num_training_gpus} GPUs...") - training_pg, training_bundle_indices, training_gpu_ids = _create_placement_group( - num_training_gpus, strategy="PACK", name="training_pg" + training_pg, training_bundle_indices, training_gpu_ids = _create_role_placement_group( + args, + "training", + total_gpus=num_training_gpus, + gpus_per_node=args.training_num_gpus_per_node, + name="training_pg", + custom=placement_strategy == "custom", ) return { "training": (training_pg, training_bundle_indices, training_gpu_ids), @@ -166,8 +615,13 @@ def create_placement_groups(args): if args.debug_inference_only: num_inference_gpus = args.inference_num_gpus logger.info(f"Creating inference placement group with {num_inference_gpus} GPUs...") - inference_pg, inference_bundle_indices, inference_gpu_ids = _create_placement_group( - num_inference_gpus, strategy="PACK", name="inference_pg" + inference_pg, inference_bundle_indices, inference_gpu_ids = _create_role_placement_group( + args, + "inference", + total_gpus=num_inference_gpus, + gpus_per_node=args.inference_num_gpus_per_node, + name="inference_pg", + custom=placement_strategy == "custom", ) return { "training": (inference_pg, [], []), @@ -177,9 +631,12 @@ def create_placement_groups(args): if args.colocate: num_gpus = args.training_num_nodes * args.training_num_gpus_per_node logger.info(f"Creating colocated placement group with {num_gpus} GPUs...") - pg, bundle_indices, gpu_ids = _create_placement_group( - num_gpus, strategy="PACK", name="colocate_pg" - ) + if placement_strategy == "custom": + pg, bundle_indices, gpu_ids = _create_custom_colocated_placement_group(args, num_gpus) + else: + pg, bundle_indices, gpu_ids = _create_placement_group( + num_gpus, strategy="PACK", name="colocate_pg" + ) return { "training": (pg, bundle_indices, gpu_ids), "inference": (pg, bundle_indices, gpu_ids), @@ -189,6 +646,9 @@ def create_placement_groups(args): num_inference_gpus = args.inference_num_gpus total_gpus = num_training_gpus + num_inference_gpus + if placement_strategy == "custom": + return _create_custom_unified_placement_group(args, num_training_gpus, num_inference_gpus) + # Single PG ensures deterministic node-to-role assignment across restarts, # avoiding kernel/weight cache misses from random GPU shuffling. logger.info( @@ -200,8 +660,6 @@ def create_placement_groups(args): total_gpus, strategy="PACK", name="unified_pg" ) - placement_strategy = getattr(args, "placement_strategy", "training_first") - if placement_strategy == "training_first": training_bundle_indices = sorted_bundle_indices[:num_training_gpus] training_gpu_ids = sorted_gpu_ids[:num_training_gpus]