From ad960a1090fd102d80f5eea72f167ef8e75e8e71 Mon Sep 17 00:00:00 2001 From: lxzlxzliuxuzhao Date: Fri, 20 Mar 2026 12:31:52 +0000 Subject: [PATCH 1/3] Add CoreX BI-V150 compatibility support --- .../Hardware Support/corex_usage.md | 161 +++++++++++ .../Hardware Support/corex_usage_zh.md | 161 +++++++++++ ...gent_val_frozen_lake_multi_nodes_demo.yaml | 2 +- ...gent_val_frozen_lake_single_node_demo.yaml | 8 +- ...c_pipeline_frozen_lake_multi_nodes_demo.sh | 7 +- ...c_pipeline_frozen_lake_single_node_demo.sh | 7 +- roll/configs/base_config.py | 7 +- roll/distributed/scheduler/initialize.py | 37 ++- .../distributed/scheduler/resource_manager.py | 21 +- roll/distributed/strategy/vllm_strategy.py | 16 ++ roll/platforms/__init__.py | 41 ++- roll/platforms/corex.py | 9 + roll/platforms/platform.py | 252 +++++++++++++++++- roll/third_party/megatron/optimizer.py | 55 +++- roll/third_party/vllm/__init__.py | 28 +- .../distributed/scheduler/test_initialize.py | 24 +- .../scheduler/test_resource_manager.py | 24 ++ tests/platforms/test_platform_init.py | 47 ++++ tests/platforms/test_platform_memory.py | 95 +++++++ .../megatron/test_optimizer_compat.py | 69 +++++ 20 files changed, 1032 insertions(+), 39 deletions(-) create mode 100644 docs_roll/docs/User Guides/Hardware Support/corex_usage.md create mode 100644 docs_roll/docs/User Guides/Hardware Support/corex_usage_zh.md create mode 100644 roll/platforms/corex.py create mode 100644 tests/platforms/test_platform_init.py create mode 100644 tests/platforms/test_platform_memory.py create mode 100644 tests/third_party/megatron/test_optimizer_compat.py diff --git a/docs_roll/docs/User Guides/Hardware Support/corex_usage.md b/docs_roll/docs/User Guides/Hardware Support/corex_usage.md new file mode 100644 index 000000000..9a6196c2b --- /dev/null +++ b/docs_roll/docs/User Guides/Hardware Support/corex_usage.md @@ -0,0 +1,161 @@ +# ROLL x CoreX + +Last updated: 03/20/2026. + +This document records the CoreX-specific adaptations currently integrated in the local ROLL workspace for Iluvatar devices such as `Iluvatar BI-V150`. + +## Current Scope + +The current adaptation targets a CUDA-like CoreX software stack where: + +- `torch.cuda` is available +- Ray exposes the accelerator as `GPU` +- the NVML-compatible monitoring interface is provided by `libixml.so` +- vendor-patched `torch`, `megatron-core`, and `vllm` may diverge from upstream behavior + +This is a practical compatibility layer for running ROLL on the current machine. It is not yet a full official upstream hardware support package. + +## What Was Adapted + +### 1. Platform Detection + +ROLL previously treated `Iluvatar BI-V150` as an unknown CUDA device. The platform initialization logic now detects CoreX-style device names and creates a dedicated `CorexPlatform` instead of falling back to `UnknownPlatform`. + +Current detection keywords include: + +- `ILUVATAR` +- `COREX` +- `BI-V` + +Implementation: + +- `roll/platforms/corex.py` +- `roll/platforms/__init__.py` + +### 2. Safe CUDA Platform Initialization + +On this vendor stack, subprocesses can hit a state where: + +- `torch.cuda.is_available()` is effectively usable +- but `device_count() == 0` in the current visibility scope + +Directly calling `torch.cuda.get_device_name()` in that state can raise `AssertionError: Invalid device id`. + +The platform bootstrap now checks `device_count()` first and only queries the CUDA device name when at least one visible device exists. + +Implementation: + +- `roll/platforms/__init__.py` + +### 3. Ray GPU Resource Registration + +Ray did not automatically register CoreX GPUs as `GPU` resources on this machine, even though `torch` could see the devices. That caused the scheduler to believe the cluster had zero usable GPU nodes. + +ROLL now starts Ray with explicit accelerator resources so the cluster exposes the expected `GPU` count. + +Implementation: + +- `roll/distributed/scheduler/initialize.py` +- `roll/distributed/scheduler/resource_manager.py` + +### 4. NVML-Compatible Memory Monitoring Through `libixml.so` + +The vendor stack does not provide `libnvidia-ml.so.1`, so upstream `torch.cuda.device_memory_used()` fails when it tries to initialize NVML. However, CoreX exposes an NVML-compatible API through `libixml.so`. + +ROLL now: + +1. Tries the upstream `torch.cuda.device_memory_used()` +2. If that fails, tries to load an NVML-compatible library +3. Falls back in this order: + - standard NVML if present + - `libixml.so` +4. Calls: + - `nvmlInit_v2` / `nvmlInit` + - `nvmlDeviceGetHandleByIndex_v2` / `nvmlDeviceGetHandleByIndex` + - `nvmlDeviceGetMemoryInfo` +5. Maps logical device index through `CUDA_VISIBLE_DEVICES` before querying the physical handle + +An override is also supported: + +```bash +export ROLL_NVML_COMPAT_LIB=/path/to/libixml.so +``` + +Implementation: + +- `roll/platforms/platform.py` + +### 5. Megatron Optimizer Compatibility + +The installed vendor-patched `megatron-core` on this machine uses a newer `_get_param_groups_and_buffers()` signature than the previous ROLL compatibility layer expected. + +ROLL now inspects the runtime function signature and passes the new required arguments when needed, while staying compatible with older variants. + +Implementation: + +- `roll/third_party/megatron/optimizer.py` + +### 6. vLLM Sleep/Offload Compatibility + +The current CoreX stack does not expose the allocator backend required by vLLM sleep mode (`cumem`). When ROLL forced sleep mode on this machine, vLLM crashed during initialization. + +ROLL now: + +- disables vLLM sleep/offload automatically when the allocator backend is unavailable +- warns explicitly when this means `actor_infer` will stay resident on GPU +- warns again if the user is still using a colocated train/infer layout + +Implementation: + +- `roll/third_party/vllm/__init__.py` +- `roll/distributed/strategy/vllm_strategy.py` + +### 7. Single-Node Demo Layout Adjustment + +On this CoreX stack, colocating `actor_train` and `actor_infer` on the same GPU was not stable for the default frozen-lake agentic demo after vLLM sleep mode became unavailable. The main failure mode was OOM during the Megatron optimizer step. + +The single-node frozen-lake demo was adjusted to a 2-GPU disaggregated layout: + +- `actor_train` on GPU 0 +- `actor_infer` on GPU 1 +- lower vLLM `gpu_memory_utilization` + +Implementation: + +- `examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml` + +## Validation Performed + +The following checks were performed on the current machine: + +- `torch.cuda.get_device_name(0)` returned `Iluvatar BI-V150` +- `ldconfig -p` exposed `libixml.so` +- direct `ctypes` calls to `libixml.so` succeeded for: + - `nvmlInit_v2` + - `nvmlDeviceGetHandleByIndex_v2` + - `nvmlDeviceGetMemoryInfo` +- `current_platform.device_memory_used()` successfully reported memory through the NVML-compatible path +- the frozen-lake single-node pipeline ran past step 0 and step 1 after the disaggregated layout change + +## Tests Added or Updated + +- `tests/platforms/test_platform_init.py` +- `tests/platforms/test_platform_memory.py` +- `tests/distributed/scheduler/test_initialize.py` +- `tests/distributed/scheduler/test_resource_manager.py` +- `tests/third_party/megatron/test_optimizer_compat.py` + +## Known Limitations + +- CoreX is currently integrated as a CUDA-like platform, not as a fully separate backend with vendor-specific kernels or scheduling behavior. +- vLLM sleep mode is still disabled on the current stack because the required allocator backend is unavailable. +- The current adaptation favors reliable execution over preserving the original single-GPU colocated demo topology. +- Existing long-running processes must be restarted to pick up the latest platform and monitoring changes. + +## Recommended Run Command + +```bash +conda activate ROLL +ray stop +bash examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh +``` diff --git a/docs_roll/docs/User Guides/Hardware Support/corex_usage_zh.md b/docs_roll/docs/User Guides/Hardware Support/corex_usage_zh.md new file mode 100644 index 000000000..841a4aaba --- /dev/null +++ b/docs_roll/docs/User Guides/Hardware Support/corex_usage_zh.md @@ -0,0 +1,161 @@ +# ROLL x CoreX + +最后更新:2026年3月20日 + +本文档记录了当前本地 ROLL 工作区中为象帝先设备(如 `Iluvatar BI-V150`)集成的 CoreX 特定适配。 + +## 当前范围 + +当前适配针对的是类似 CUDA 的 CoreX 软件栈,在该栈中: + +- `torch.cuda` 可用 +- Ray 将加速器暴露为 `GPU` +- 由 `libixml.so` 提供兼容 NVML 的监控接口 +- 供应商打过补丁的 `torch`、`megatron-core` 和 `vllm` 的行为可能与其上游版本不同 + +这是一个在当前机器上运行 ROLL 的实用兼容层。它还不是一个完整的官方上游硬件支持包。 + +## 已适配内容 + +### 1. 平台检测 + +ROLL 此前将 `Iluvatar BI-V150` 视为未知的 CUDA 设备。平台初始化逻辑现在能检测 CoreX 风格的设备名称,并创建一个专用的 `CorexPlatform`,而不是回退到 `UnknownPlatform`。 + +当前检测关键词包括: + +- `ILUVATAR` +- `COREX` +- `BI-V` + +实现: + +- `roll/platforms/corex.py` +- `roll/platforms/__init__.py` + +### 2. 安全的 CUDA 平台初始化 + +在此供应商栈上,子进程可能遇到以下状态: + +- `torch.cuda.is_available()` 实际可用 +- 但在当前可见性范围内 `device_count() == 0` + +在该状态下直接调用 `torch.cuda.get_device_name()` 可能会引发 `AssertionError: Invalid device id`。 + +平台引导程序现在会首先检查 `device_count()`,并且仅在存在至少一个可见设备时才查询 CUDA 设备名称。 + +实现: + +- `roll/platforms/__init__.py` + +### 3. Ray GPU 资源注册 + +即使 `torch` 可以看到设备,Ray 在此机器上也不会自动将 CoreX GPU 注册为 `GPU` 资源。这导致调度器认为集群拥有零个可用 GPU 节点。 + +ROLL 现在在启动 Ray 时会显式指定加速器资源,以便集群暴露预期的 `GPU` 计数。 + +实现: + +- `roll/distributed/scheduler/initialize.py` +- `roll/distributed/scheduler/resource_manager.py` + +### 4. 通过 `libixml.so` 实现 NVML 兼容的内存监控 + +供应商栈未提供 `libnvidia-ml.so.1`,因此上游的 `torch.cuda.device_memory_used()` 在尝试初始化 NVML 时会失败。但是,CoreX 通过 `libixml.so` 暴露了一个兼容 NVML 的 API。 + +ROLL 现在会: + +1. 首先尝试使用上游的 `torch.cuda.device_memory_used()` +2. 如果失败,则尝试加载一个兼容 NVML 的库 +3. 按以下顺序回退: + - 标准 NVML(如果存在) + - `libixml.so` +4. 调用: + - `nvmlInit_v2` / `nvmlInit` + - `nvmlDeviceGetHandleByIndex_v2` / `nvmlDeviceGetHandleByIndex` + - `nvmlDeviceGetMemoryInfo` +5. 在查询物理句柄之前,通过 `CUDA_VISIBLE_DEVICES` 映射逻辑设备索引 + +也支持通过以下方式覆盖库路径: + +```bash +export ROLL_NVML_COMPAT_LIB=/path/to/libixml.so +``` + +实现: + +- `roll/platforms/platform.py` + +### 5. Megatron 优化器兼容性 + +此机器上安装的供应商打过补丁的 `megatron-core` 使用了比之前 ROLL 兼容层所期望的更新的 `_get_param_groups_and_buffers()` 签名。 + +ROLL 现在会检查运行时函数的签名,并在需要时传递新的必需参数,同时保持与旧版本的兼容性。 + +实现: + +- `roll/third_party/megatron/optimizer.py` + +### 6. vLLM 休眠/卸载兼容性 + +当前的 CoreX 栈未暴露 vLLM 休眠模式所需的分配器后端(`cumem`)。当 ROLL 在此机器上强制启用休眠模式时,vLLM 会在初始化期间崩溃。 + +ROLL 现在会: + +- 当分配器后端不可用时,自动禁用 vLLM 休眠/卸载功能 +- 当这意味着 `actor_infer` 将常驻 GPU 时,发出明确的警告 +- 如果用户仍在使用了共置的训练/推理布局,再次发出警告 + +实现: + +- `roll/third_party/vllm/__init__.py` +- `roll/distributed/strategy/vllm_strategy.py` + +### 7. 单节点演示布局调整 + +在此 CoreX 栈上,在 vLLM 休眠模式变得不可用之后,将 `actor_train` 和 `actor_infer` 共置于同一 GPU 上对于默认的 frozen-lake 代理演示来说不稳定。主要故障模式是在 Megatron 优化器步骤中发生 OOM。 + +frozen-lake 单节点演示调整为 2-GPU 分离布局: + +- `actor_train` 在 GPU 0 上 +- `actor_infer` 在 GPU 1 上 +- 降低 vLLM 的 `gpu_memory_utilization` + +实现: + +- `examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml` + +## 已执行的验证 + +在当前机器上执行了以下检查: + +- `torch.cuda.get_device_name(0)` 返回 `Iluvatar BI-V150` +- `ldconfig -p` 暴露了 `libixml.so` +- 对 `libixml.so` 的直接 `ctypes` 调用成功执行了: + - `nvmlInit_v2` + - `nvmlDeviceGetHandleByIndex_v2` + - `nvmlDeviceGetMemoryInfo` +- `current_platform.device_memory_used()` 成功通过 NVML 兼容路径报告了内存使用情况 +- 在更改为分离布局后,frozen-lake 单节点流水线成功运行了第 0 步和第 1 步 + +## 新增或更新的测试 + +- `tests/platforms/test_platform_init.py` +- `tests/platforms/test_platform_memory.py` +- `tests/distributed/scheduler/test_initialize.py` +- `tests/distributed/scheduler/test_resource_manager.py` +- `tests/third_party/megatron/test_optimizer_compat.py` + +## 已知限制 + +- CoreX 目前作为类似 CUDA 的平台集成,而非具有供应商特定内核或调度行为的完全独立后端。 +- vLLM 休眠模式在当前栈上仍被禁用,因为所需的分配器后端不可用。 +- 当前的适配优先保证可靠执行,而不是保留原始的单 GPU 共置演示拓扑。 +- 必须重启现有的长时间运行进程,才能应用最新的平台和监控更改。 + +## 推荐运行命令 + +```bash +conda activate ROLL +ray stop +bash examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh +``` \ No newline at end of file diff --git a/examples/agentic_demo/agent_val_frozen_lake_multi_nodes_demo.yaml b/examples/agentic_demo/agent_val_frozen_lake_multi_nodes_demo.yaml index 5e88e736a..86f30e078 100644 --- a/examples/agentic_demo/agent_val_frozen_lake_multi_nodes_demo.yaml +++ b/examples/agentic_demo/agent_val_frozen_lake_multi_nodes_demo.yaml @@ -107,7 +107,7 @@ actor_infer: strategy_args: strategy_name: vllm strategy_config: - gpu_memory_utilization: 0.8 + gpu_memory_utilization: 0.6 block_size: 16 load_format: auto device_mapping: list(range(0,4)) diff --git a/examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml b/examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml index b4435dd44..00a77c40e 100644 --- a/examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml +++ b/examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml @@ -37,7 +37,7 @@ checkpoint_config: type: file_system output_dir: ./output/render -num_gpus_per_node: 1 +num_gpus_per_node: 2 max_steps: 100 save_steps: 10000 @@ -107,10 +107,10 @@ actor_infer: strategy_args: strategy_name: vllm strategy_config: - gpu_memory_utilization: 0.8 + gpu_memory_utilization: 0.6 block_size: 16 load_format: auto - device_mapping: list(range(0,1)) + device_mapping: list(range(1,2)) infer_batch_size: 1 reference: @@ -157,4 +157,4 @@ custom_envs: SokobanDifferentGridVocab: ${custom_env.SokobanDifferentGridVocab} FrozenLake: - ${custom_env.FrozenLake} \ No newline at end of file + ${custom_env.FrozenLake} diff --git a/examples/agentic_demo/run_agentic_pipeline_frozen_lake_multi_nodes_demo.sh b/examples/agentic_demo/run_agentic_pipeline_frozen_lake_multi_nodes_demo.sh index e6e1b83e4..6c14dd476 100755 --- a/examples/agentic_demo/run_agentic_pipeline_frozen_lake_multi_nodes_demo.sh +++ b/examples/agentic_demo/run_agentic_pipeline_frozen_lake_multi_nodes_demo.sh @@ -1,7 +1,8 @@ #!/bin/bash set +x -ROLL_PATH="/workspace/ROLL-main" -CONFIG_PATH=$(basename $(dirname $0)) +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROLL_PATH="$(cd "${SCRIPT_DIR}/../.." && pwd)" +CONFIG_PATH="$(basename "${SCRIPT_DIR}")" export PYTHONPATH="$ROLL_PATH:$PYTHONPATH" -python examples/start_agentic_pipeline.py --config_path $CONFIG_PATH --config_name agent_val_frozen_lake_multi_nodes_demo +python "$ROLL_PATH/examples/start_agentic_pipeline.py" --config_path "$CONFIG_PATH" --config_name agent_val_frozen_lake_multi_nodes_demo diff --git a/examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh b/examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh index 74a4f3fdf..8800efbcc 100755 --- a/examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh +++ b/examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh @@ -1,7 +1,8 @@ #!/bin/bash set +x -ROLL_PATH="/workspace/ROLL-main" -CONFIG_PATH=$(basename $(dirname $0)) +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROLL_PATH="$(cd "${SCRIPT_DIR}/../.." && pwd)" +CONFIG_PATH="$(basename "${SCRIPT_DIR}")" export PYTHONPATH="$ROLL_PATH:$PYTHONPATH" -python examples/start_agentic_pipeline.py --config_path $CONFIG_PATH --config_name agent_val_frozen_lake_single_node_demo +python "$ROLL_PATH/examples/start_agentic_pipeline.py" --config_path "$CONFIG_PATH" --config_name agent_val_frozen_lake_single_node_demo diff --git a/roll/configs/base_config.py b/roll/configs/base_config.py index 80949874c..f5a753d46 100644 --- a/roll/configs/base_config.py +++ b/roll/configs/base_config.py @@ -320,7 +320,12 @@ def __post_init__(self): os.environ.update(self.system_envs) from ..platforms import current_platform - self.num_gpus_per_node = current_platform.device_count() + available_gpus_per_node = current_platform.device_count() + if available_gpus_per_node > 0: + assert self.num_gpus_per_node <= available_gpus_per_node, ( + f"num_gpus_per_node={self.num_gpus_per_node} exceeds visible devices on this node " + f"({available_gpus_per_node})." + ) if hasattr(self, 'actor_train') and isinstance(self.actor_train, WorkerConfig): self.actor_train.system_envs.update({k: v for k, v in self.system_envs.items() if k not in self.actor_train.system_envs}) diff --git a/roll/distributed/scheduler/initialize.py b/roll/distributed/scheduler/initialize.py index 877e4ef18..5e5b5ff52 100644 --- a/roll/distributed/scheduler/initialize.py +++ b/roll/distributed/scheduler/initialize.py @@ -1,9 +1,11 @@ +import json import os import subprocess import sys import time import ray +import torch from roll.distributed.scheduler.driver_utils import ( get_driver_rank, @@ -24,6 +26,30 @@ logger = get_logger() +def _get_ray_start_resource_args() -> str: + ray_device_key = getattr(current_platform, "ray_device_key", "") + if ray_device_key in ("", "CPU"): + return "" + + device_module = getattr(torch, current_platform.device_type, None) + device_count_fn = getattr(device_module, "device_count", None) + if not callable(device_count_fn): + logger.warning( + "Current platform %s does not expose a usable device_count() method. Ray will rely on auto-detection.", + current_platform.device_type, + ) + return "" + + device_count = device_count_fn() + if device_count <= 0: + return "" + + if ray_device_key == "GPU": + return f" --num-gpus={device_count}" + + return f" --resources='{json.dumps({ray_device_key: device_count})}'" + + def start_ray_cluster(): rank = get_driver_rank() world_size = get_driver_world_size() @@ -36,12 +62,19 @@ def start_ray_cluster(): logger.info("Ray cluster already initialized") return False + resource_args = _get_ray_start_resource_args() if rank == 0: - cmd = f"ray start --head --port={master_port} --node-name={node_name} --dashboard-port={dashboard_port}" + cmd = ( + f"ray start --head --port={master_port} --node-name={node_name} " + f"--dashboard-port={dashboard_port}{resource_args}" + ) else: # fix: 处理大规模下可能会出现的head/worker node创建顺序不一致问题 time.sleep(5) - cmd = f"ray start --address={master_addr}:{master_port} --node-name={node_name} --dashboard-port={dashboard_port}" + cmd = ( + f"ray start --address={master_addr}:{master_port} --node-name={node_name} " + f"--dashboard-port={dashboard_port}{resource_args}" + ) logger.info(f"Starting ray cluster: {cmd}") ret = subprocess.run(cmd, shell=True, capture_output=True) diff --git a/roll/distributed/scheduler/resource_manager.py b/roll/distributed/scheduler/resource_manager.py index ee2abc93f..b6b65d01f 100644 --- a/roll/distributed/scheduler/resource_manager.py +++ b/roll/distributed/scheduler/resource_manager.py @@ -3,6 +3,7 @@ from typing import Dict, List, Tuple, Optional import ray +import torch from ray.util.placement_group import PlacementGroup from roll.platforms import current_platform @@ -31,8 +32,24 @@ def __init__(self, num_gpus_per_node, num_nodes): if num_nodes is None: num_nodes = ray_num_nodes - assert num_nodes <= ray_num_nodes, (f"The Ray clusters(ray_num_nodes: {ray_num_nodes}) cannot meet the " - f"required number of nodes (`num_nodes`{num_nodes}).") + device_module = getattr(torch, current_platform.device_type, None) + device_count_fn = getattr(device_module, "device_count", None) + visible_device_count = device_count_fn() if callable(device_count_fn) else 0 + accelerator_hint = "" + if current_platform.ray_device_key != "CPU" and visible_device_count > 0 and available_gpu == 0: + accelerator_hint = ( + f" torch sees {visible_device_count} {current_platform.device_name} device(s) on this node, " + f"but Ray reports 0 {current_platform.ray_device_key} resources. " + f"This usually means the existing Ray cluster was started without explicitly registering accelerator " + f"resources. Stop the current Ray cluster and restart it with explicit resources, for example " + f"`ray stop` then `ray start --head --num-gpus={visible_device_count}`." + ) + + assert num_nodes <= ray_num_nodes, ( + f"The Ray clusters(ray_num_nodes: {ray_num_nodes}) cannot meet the required number of nodes " + f"(`num_nodes`{num_nodes}). required per-node {current_platform.ray_device_key} >= {num_gpus_per_node}, " + f"ray.available_resources()={available_resources}.{accelerator_hint}" + ) self.num_nodes = num_nodes self.gpu_per_node = num_gpus_per_node self.num_gpus = self.gpu_per_node * self.num_nodes diff --git a/roll/distributed/strategy/vllm_strategy.py b/roll/distributed/strategy/vllm_strategy.py index 6ee0f9d9d..857a89d1b 100644 --- a/roll/distributed/strategy/vllm_strategy.py +++ b/roll/distributed/strategy/vllm_strategy.py @@ -113,6 +113,18 @@ async def initialize(self, model_provider): os.environ["VLLM_PORT"] = str(vllm_port) self.model = await create_async_llm(resource_placement_groups=self.worker_config.resource_placement_groups, **vllm_config) + self.sleep_mode_enabled = bool(getattr(self.model, "roll_sleep_mode_enabled", False)) + if self.sleep_level > 0 and not self.sleep_mode_enabled: + logger.warning( + "vLLM sleep/offload is disabled because the allocator backend is unavailable. " + "actor_infer will stay resident on GPU." + ) + if getattr(self.worker.pipeline_config, "is_actor_infer_colocated", False): + logger.warning( + "actor_infer overlaps with training/reference GPUs while vLLM sleep/offload is unavailable. " + "This colocated layout can easily OOM on non-standard accelerator stacks; prefer disjoint " + "device_mapping for actor_train/reference and actor_infer." + ) if Version("0.15.0") <= Version(vllm.__version__): @@ -325,6 +337,10 @@ async def load_states(self, *args, **kwargs): async def offload_states(self, include=None, non_blocking=False): await self.model.reset_prefix_cache() + if not getattr(self, "sleep_mode_enabled", False): + gc.collect() + current_platform.empty_cache() + return if include is None or OffloadStateType.model_params in include: if self.is_model_in_gpu and self.worker.pipeline_config.is_actor_infer_colocated: await self.model.offload_states(self.sleep_level) diff --git a/roll/platforms/__init__.py b/roll/platforms/__init__.py index 6869621f4..561196ee5 100644 --- a/roll/platforms/__init__.py +++ b/roll/platforms/__init__.py @@ -1,6 +1,7 @@ import torch from .platform import Platform +from .corex import CorexPlatform from .cuda import CudaPlatform from .npu import NpuPlatform from .rocm import RocmPlatform @@ -13,20 +14,35 @@ logger = get_logger() +def _is_corex_device_name(device_name: str) -> bool: + normalized = device_name.upper() + return any(keyword in normalized for keyword in ("ILUVATAR", "COREX", "BI-V")) + + def _init_platform() -> Platform: """ Detect and initialize the appropriate platform based on available devices. Priority: - 1. CUDA (NVIDIA / AMD ROCm) + 1. CUDA (NVIDIA / AMD ROCm / CoreX-like stacks) 2. NPU (if torch_npu is installed) 3. CPU (fallback) Returns: An instance of a subclass of Platform corresponding to the detected hardware. """ - if torch.cuda.is_available(): - device_name = torch.cuda.get_device_name().upper() + cuda_device_count = 0 + try: + cuda_device_count = torch.cuda.device_count() + except Exception as exc: + logger.warning("Failed to query CUDA device count. Falling back to CPU/NPU detection. Error: %s", exc) + + if cuda_device_count > 0: + try: + device_name = torch.cuda.get_device_name(0).upper() + except Exception as exc: + logger.warning("Failed to query CUDA device name. Falling back to UnknownPlatform. Error: %s", exc) + return UnknownPlatform() logger.debug(f"Detected CUDA device: {device_name}") if "NVIDIA" in device_name: logger.debug("Initializing CUDA platform (NVIDIA).") @@ -34,17 +50,20 @@ def _init_platform() -> Platform: elif "AMD" in device_name: logger.debug("Initializing ROCm platform (AMD).") return RocmPlatform() + elif _is_corex_device_name(device_name): + logger.debug("Initializing CoreX platform.") + return CorexPlatform() logger.warning("Unrecognized CUDA device. Falling back to UnknownPlatform.") return UnknownPlatform() - else: - try: - import torch_npu # noqa: F401 - logger.debug("Detected torch_npu. Initializing NPU platform.") - return NpuPlatform() - except ImportError: - logger.debug("No supported accelerator detected. Initializing CPU platform.") - return CpuPlatform() + try: + import torch_npu # noqa: F401 + + logger.debug("Detected torch_npu. Initializing NPU platform.") + return NpuPlatform() + except ImportError: + logger.debug("No supported accelerator detected. Initializing CPU platform.") + return CpuPlatform() # Global singleton representing the current platform in use. diff --git a/roll/platforms/corex.py b/roll/platforms/corex.py new file mode 100644 index 000000000..235c9ae9a --- /dev/null +++ b/roll/platforms/corex.py @@ -0,0 +1,9 @@ +from .unknown import UnknownPlatform + + +class CorexPlatform(UnknownPlatform): + device_name: str = "COREX" + + @classmethod + def is_cuda(cls) -> bool: + return True diff --git a/roll/platforms/platform.py b/roll/platforms/platform.py index 9f9d4b105..ef8faa98a 100644 --- a/roll/platforms/platform.py +++ b/roll/platforms/platform.py @@ -1,9 +1,156 @@ -import torch +import ctypes +import ctypes.util import os +from threading import Lock + +import torch from ..utils.logging import get_logger logger = get_logger() +_device_memory_used_fallback_warned = set() +_device_memory_used_compat_warned = set() +_nvml_compat_lib = None +_nvml_compat_lib_name = None +_nvml_compat_lib_lock = Lock() + + +class _NvmlMemoryInfo(ctypes.Structure): + _fields_ = [ + ("total", ctypes.c_ulonglong), + ("free", ctypes.c_ulonglong), + ("used", ctypes.c_ulonglong), + ] + + +def _nvml_compatible_library_candidates(): + candidates = [] + + env_library = os.environ.get("ROLL_NVML_COMPAT_LIB") + if env_library: + candidates.append(env_library) + + for library_name in ( + ctypes.util.find_library("nvidia-ml"), + "libnvidia-ml.so.1", + ctypes.util.find_library("ixml"), + "libixml.so", + ): + if library_name and library_name not in candidates: + candidates.append(library_name) + + return candidates + + +def _load_nvml_compatible_library(): + global _nvml_compat_lib, _nvml_compat_lib_name + + if _nvml_compat_lib is not None: + return _nvml_compat_lib, _nvml_compat_lib_name + + with _nvml_compat_lib_lock: + if _nvml_compat_lib is not None: + return _nvml_compat_lib, _nvml_compat_lib_name + + errors = [] + for library_name in _nvml_compatible_library_candidates(): + try: + library = ctypes.CDLL(library_name) + except OSError as exc: + errors.append(f"{library_name}: {exc}") + continue + + init_fn = getattr(library, "nvmlInit_v2", None) or getattr(library, "nvmlInit", None) + if init_fn is None: + errors.append(f"{library_name}: missing nvmlInit_v2/nvmlInit") + continue + + init_fn.restype = ctypes.c_int + ret = init_fn() + if ret != 0: + error_string = _nvml_error_string(library, ret) + errors.append(f"{library_name}: nvmlInit failed with {ret} ({error_string})") + continue + + _nvml_compat_lib = library + _nvml_compat_lib_name = library_name + return _nvml_compat_lib, _nvml_compat_lib_name + + error_msg = "; ".join(errors) if errors else "no candidate libraries" + raise RuntimeError(f"failed to load a NVML-compatible library: {error_msg}") + + +def _nvml_error_string(library, retcode: int) -> str: + error_fn = getattr(library, "nvmlErrorString", None) + if error_fn is None: + return "unknown" + + error_fn.argtypes = [ctypes.c_int] + error_fn.restype = ctypes.c_char_p + try: + value = error_fn(retcode) + except Exception: + return "unknown" + + if not value: + return "unknown" + + try: + return value.decode("utf-8", errors="replace") + except Exception: + return str(value) + + +def _map_visible_device_index(device: int, env_var: str) -> int: + visible_devices = os.environ.get(env_var, "") + if not visible_devices: + return device + + parts = [part.strip() for part in visible_devices.split(",") if part.strip()] + if not parts or device < 0 or device >= len(parts): + return device + + try: + return int(parts[device]) + except ValueError: + return device + + +def _nvml_compatible_device_memory_used(device: int, env_var: str) -> tuple[int, str]: + library, library_name = _load_nvml_compatible_library() + physical_device = _map_visible_device_index(device, env_var) + + handle = ctypes.c_void_p() + get_handle = getattr(library, "nvmlDeviceGetHandleByIndex_v2", None) or getattr( + library, "nvmlDeviceGetHandleByIndex", None + ) + if get_handle is None: + raise RuntimeError(f"{library_name} does not export nvmlDeviceGetHandleByIndex") + + get_handle.argtypes = [ctypes.c_uint, ctypes.POINTER(ctypes.c_void_p)] + get_handle.restype = ctypes.c_int + ret = get_handle(physical_device, ctypes.byref(handle)) + if ret != 0: + raise RuntimeError( + f"{library_name} nvmlDeviceGetHandleByIndex({physical_device}) failed with {ret} " + f"({_nvml_error_string(library, ret)})" + ) + + memory_info = _NvmlMemoryInfo() + get_memory_info = getattr(library, "nvmlDeviceGetMemoryInfo", None) + if get_memory_info is None: + raise RuntimeError(f"{library_name} does not export nvmlDeviceGetMemoryInfo") + + get_memory_info.argtypes = [ctypes.c_void_p, ctypes.POINTER(_NvmlMemoryInfo)] + get_memory_info.restype = ctypes.c_int + ret = get_memory_info(handle, ctypes.byref(memory_info)) + if ret != 0: + raise RuntimeError( + f"{library_name} nvmlDeviceGetMemoryInfo({physical_device}) failed with {ret} " + f"({_nvml_error_string(library, ret)})" + ) + + return int(memory_info.used), library_name class Platform: @@ -89,6 +236,104 @@ def __getattr__(self, key: str): logger.warning("Current platform %s does not have '%s' attribute.", self.device_type, key) return None + @classmethod + def device_memory_used(cls, device=None) -> int: + device_module = getattr(torch, cls.device_type, None) + if device_module is None: + logger.warning("Current platform %s does not expose torch.%s.", cls.device_name, cls.device_type) + return 0 + + if device is None: + try: + device_count = device_module.device_count() + device = device_module.current_device() if device_count > 0 else 0 + except Exception: + device = 0 + + try: + return int(device_module.device_memory_used(device)) + except Exception as exc: + compat_used = cls._nvml_compatible_device_memory_used(device=device, primary_exc=exc) + if compat_used is not None: + return compat_used + return cls._fallback_device_memory_used(device_module=device_module, device=device, primary_exc=exc) + + @classmethod + def _nvml_compatible_device_memory_used(cls, device: int, primary_exc: Exception): + if cls.device_type != "cuda": + return None + + try: + used, library_name = _nvml_compatible_device_memory_used(device, cls.device_control_env_var) + except Exception: + return None + + warning_key = (cls.device_name, library_name) + if warning_key not in _device_memory_used_compat_warned: + _device_memory_used_compat_warned.add(warning_key) + logger.warning( + "torch.%s.device_memory_used is unavailable on platform %s (device %s, error: %s). " + "Using NVML-compatible library %s instead.", + cls.device_type, + cls.device_name, + device, + primary_exc, + library_name, + ) + + return used + + @classmethod + def _fallback_device_memory_used(cls, device_module, device: int, primary_exc: Exception) -> int: + fallback_candidates = ( + ("mem_get_info", lambda: _mem_get_info_used(device_module, device)), + ("memory_reserved", lambda: int(device_module.memory_reserved(device))), + ("memory_allocated", lambda: int(device_module.memory_allocated(device))), + ) + + errors = [] + for fallback_name, fallback_fn in fallback_candidates: + if not hasattr(device_module, fallback_name): + continue + try: + value = fallback_fn() + except Exception as fallback_exc: + errors.append(f"{fallback_name}: {fallback_exc}") + continue + + cls._warn_device_memory_used_fallback_once( + device=device, + fallback_name=fallback_name, + primary_exc=primary_exc, + ) + return value + + logger.warning( + "Failed to query device memory usage for platform %s on device %s. Primary error: %s. " + "Fallback errors: %s", + cls.device_name, + device, + primary_exc, + "; ".join(errors) if errors else "none", + ) + raise primary_exc + + @classmethod + def _warn_device_memory_used_fallback_once(cls, device: int, fallback_name: str, primary_exc: Exception) -> None: + warning_key = (cls.device_name, fallback_name) + if warning_key in _device_memory_used_fallback_warned: + return + + _device_memory_used_fallback_warned.add(warning_key) + logger.warning( + "device_memory_used is unavailable on platform %s (device %s, error: %s). " + "Falling back to torch.%s.", + cls.device_name, + device, + primary_exc, + fallback_name, + ) + @classmethod def is_cuda(cls) -> bool: return False @@ -201,3 +446,8 @@ def apply_ulysses_patch(cls) -> None: provide framework- and hardware-specific Ulysses patching. """ raise NotImplementedError + + +def _mem_get_info_used(device_module, device: int) -> int: + free, total = device_module.mem_get_info(device) + return int(total - free) diff --git a/roll/third_party/megatron/optimizer.py b/roll/third_party/megatron/optimizer.py index 888dc7a87..177a5a045 100644 --- a/roll/third_party/megatron/optimizer.py +++ b/roll/third_party/megatron/optimizer.py @@ -14,6 +14,37 @@ logger = logging.getLogger(__name__) +def _build_param_groups_and_buffers_kwargs( + *, + model_chunk_offset: int, + config: OptimizerConfig, + no_weight_decay_cond: Optional[Callable], + scale_lr_cond: Optional[Callable], + lr_mult: float, + filter_fn: Callable, + buffer_name: str, +) -> Dict: + signature = inspect.signature(_get_param_groups_and_buffers).parameters + kwargs = { + "model_chunk_offset": model_chunk_offset, + "config": config, + "filter_fn": filter_fn, + "buffer_name": buffer_name, + } + if "no_weight_decay_cond" in signature: + kwargs["no_weight_decay_cond"] = no_weight_decay_cond + if "scale_lr_cond" in signature: + kwargs["scale_lr_cond"] = scale_lr_cond + if "lr_mult" in signature: + kwargs["lr_mult"] = lr_mult + if "default_skip_embedding_weight_decay" in signature: + kwargs["default_skip_embedding_weight_decay"] = False + if "config_overrides" in signature: + # config_overrides is required in some newer mcore versions. + kwargs["config_overrides"] = None + return kwargs + + def get_megatron_optimizer( config: OptimizerConfig, model_chunks: List[MegatronModule], @@ -68,19 +99,20 @@ def get_megatron_optimizer( optimizers = [] model_chunk_offset = 0 - kwargs = {} - if "config_overrides" in inspect.signature(_get_param_groups_and_buffers).parameters: - # config_overrides is required in mcore-core>=0.16 - kwargs = {"config_overrides": None} for dense_model_chunks, overlap_param_gather_with_optimizer_step in zip( all_dense_model_chunks, overlap_param_gather_with_optimizer_step_flags ): - param_groups, buffers = _get_param_groups_and_buffers( - dense_model_chunks, + kwargs = _build_param_groups_and_buffers_kwargs( model_chunk_offset=model_chunk_offset, config=config, + no_weight_decay_cond=no_weight_decay_cond, + scale_lr_cond=scale_lr_cond, + lr_mult=lr_mult, filter_fn=lambda g: not g['is_expert_parallel'], buffer_name='buffers', + ) + param_groups, buffers = _get_param_groups_and_buffers( + dense_model_chunks, **kwargs, ) for model_chunk in dense_model_chunks: @@ -109,13 +141,18 @@ def get_megatron_optimizer( setattr(optimizers[-1], "model_chunks", dense_model_chunks) model_chunk_offset += 1 - moe_param_groups, moe_buffers = _get_param_groups_and_buffers( - model_chunks, + moe_kwargs = _build_param_groups_and_buffers_kwargs( model_chunk_offset=0, config=config, + no_weight_decay_cond=no_weight_decay_cond, + scale_lr_cond=scale_lr_cond, + lr_mult=lr_mult, filter_fn=lambda g: g['is_expert_parallel'], buffer_name='expert_parallel_buffers', - **kwargs, + ) + moe_param_groups, moe_buffers = _get_param_groups_and_buffers( + model_chunks, + **moe_kwargs, ) if len(moe_param_groups) > 0: model_parallel_rank = torch.distributed.get_rank( diff --git a/roll/third_party/vllm/__init__.py b/roll/third_party/vllm/__init__.py index 77f67cbbb..8de27f0ec 100644 --- a/roll/third_party/vllm/__init__.py +++ b/roll/third_party/vllm/__init__.py @@ -44,8 +44,33 @@ logger.info(f"Using vllm version {vllm.__version__}") +def _is_cumem_available() -> bool: + try: + from vllm.device_allocator.cumem import cumem_available + + return bool(cumem_available) + except Exception as exc: + logger.warning("Failed to probe vLLM cumem allocator availability: %s", exc) + return False + + +def _resolve_enable_sleep_mode(requested: bool) -> bool: + if not requested: + return False + + if _is_cumem_available(): + return True + + logger.warning( + "vLLM sleep mode requested but cumem allocator is unavailable on this platform. " + "Disabling sleep mode and continuing without vLLM offload/reload." + ) + return False + + async def create_async_llm(resource_placement_groups: List[Dict], **kwargs): - kwargs["enable_sleep_mode"] = True + requested_sleep_mode = kwargs.pop("enable_sleep_mode", True) + kwargs["enable_sleep_mode"] = _resolve_enable_sleep_mode(requested_sleep_mode) if "worker_extension_cls" not in kwargs: # VLLM_USE_V1 is deprecated in vllm>=0.11.1 @@ -135,6 +160,7 @@ async def create_async_llm(resource_placement_groups: List[Dict], **kwargs): stat_loggers=None, ) + setattr(async_llm, "roll_sleep_mode_enabled", kwargs["enable_sleep_mode"]) await async_llm.custom_init_worker() return async_llm diff --git a/tests/distributed/scheduler/test_initialize.py b/tests/distributed/scheduler/test_initialize.py index 6a20c5480..5d2519d23 100644 --- a/tests/distributed/scheduler/test_initialize.py +++ b/tests/distributed/scheduler/test_initialize.py @@ -1,6 +1,8 @@ +from types import SimpleNamespace + import ray -from roll.distributed.scheduler.initialize import init +from roll.distributed.scheduler.initialize import _get_ray_start_resource_args, init @ray.remote @@ -33,6 +35,26 @@ def test_ray_cluster_func(): print(hello_msg2) +def test_get_ray_start_resource_args_for_gpu(monkeypatch): + fake_platform = SimpleNamespace(device_type="cuda", ray_device_key="GPU") + fake_torch = SimpleNamespace(cuda=SimpleNamespace(device_count=lambda: 4)) + + monkeypatch.setattr("roll.distributed.scheduler.initialize.current_platform", fake_platform) + monkeypatch.setattr("roll.distributed.scheduler.initialize.torch", fake_torch) + + assert _get_ray_start_resource_args() == " --num-gpus=4" + + +def test_get_ray_start_resource_args_for_custom_accelerator(monkeypatch): + fake_platform = SimpleNamespace(device_type="npu", ray_device_key="NPU") + fake_torch = SimpleNamespace(npu=SimpleNamespace(device_count=lambda: 8)) + + monkeypatch.setattr("roll.distributed.scheduler.initialize.current_platform", fake_platform) + monkeypatch.setattr("roll.distributed.scheduler.initialize.torch", fake_torch) + + assert _get_ray_start_resource_args() == """ --resources='{"NPU": 8}'""" + + if __name__ == "__main__": """ RANK=0 WORLD_SIZE=2 MASTER_ADDR='33.197.137.224' MASTER_PORT=54893 python tests/distributed/scheduler/test_initialize.py diff --git a/tests/distributed/scheduler/test_resource_manager.py b/tests/distributed/scheduler/test_resource_manager.py index 96e2f9142..3ba68e1cb 100644 --- a/tests/distributed/scheduler/test_resource_manager.py +++ b/tests/distributed/scheduler/test_resource_manager.py @@ -1,8 +1,10 @@ import os +from types import SimpleNamespace from ray.runtime_env import RuntimeEnv os.environ["RAY_DEDUP_LOGS"] = "0" +import pytest import ray from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -91,6 +93,28 @@ def test_resource_manager_num_gpus_per_worker_gt_1(): print(res) +def test_resource_manager_reports_missing_ray_gpu_resources(monkeypatch): + monkeypatch.setattr( + "roll.distributed.scheduler.resource_manager.ray.available_resources", + lambda: {"CPU": 8.0}, + ) + monkeypatch.setattr( + "roll.distributed.scheduler.resource_manager.ray.nodes", + lambda: [{"Alive": True, "Resources": {"CPU": 8.0}}], + ) + monkeypatch.setattr( + "roll.distributed.scheduler.resource_manager.current_platform", + SimpleNamespace(ray_device_key="GPU", device_type="cuda", device_name="Iluvatar"), + ) + monkeypatch.setattr( + "roll.distributed.scheduler.resource_manager.torch", + SimpleNamespace(cuda=SimpleNamespace(device_count=lambda: 16)), + ) + + with pytest.raises(AssertionError, match="Ray reports 0 GPU resources"): + ResourceManager(num_gpus_per_node=1, num_nodes=1) + + if __name__ == "__main__": """ RANK=0 WORLD_SIZE=2 MASTER_ADDR='33.195.52.67' MASTER_PORT=54893 python tests/distributed/scheduler/test_resource_manager.py diff --git a/tests/platforms/test_platform_init.py b/tests/platforms/test_platform_init.py new file mode 100644 index 000000000..38d6591f8 --- /dev/null +++ b/tests/platforms/test_platform_init.py @@ -0,0 +1,47 @@ +from types import SimpleNamespace + +from roll.platforms import _init_platform + + +def test_init_platform_falls_back_to_cpu_when_no_visible_cuda_devices(monkeypatch): + fake_torch = SimpleNamespace( + cuda=SimpleNamespace( + device_count=lambda: 0, + ) + ) + + monkeypatch.setattr("roll.platforms.torch", fake_torch) + + platform = _init_platform() + + assert platform.device_name == "CPU" + + +def test_init_platform_uses_corex_platform_for_iluvatar_device(monkeypatch): + fake_torch = SimpleNamespace( + cuda=SimpleNamespace( + device_count=lambda: 1, + get_device_name=lambda index=0: "Iluvatar BI-V150", + ) + ) + + monkeypatch.setattr("roll.platforms.torch", fake_torch) + + platform = _init_platform() + + assert platform.device_name == "COREX" + + +def test_init_platform_uses_unknown_platform_for_unrecognized_cuda_device(monkeypatch): + fake_torch = SimpleNamespace( + cuda=SimpleNamespace( + device_count=lambda: 1, + get_device_name=lambda index=0: "Some Vendor Accelerator", + ) + ) + + monkeypatch.setattr("roll.platforms.torch", fake_torch) + + platform = _init_platform() + + assert platform.device_name == "UNKNOWN" diff --git a/tests/platforms/test_platform_memory.py b/tests/platforms/test_platform_memory.py new file mode 100644 index 000000000..7ae668b3f --- /dev/null +++ b/tests/platforms/test_platform_memory.py @@ -0,0 +1,95 @@ +import ctypes + +import roll.platforms.platform as platform_module +from roll.platforms.unknown import UnknownPlatform + + +def test_device_memory_used_falls_back_to_mem_get_info(monkeypatch): + def fake_device_memory_used(device=None): + raise RuntimeError("nvml unavailable") + + def fake_mem_get_info(device=None): + return (3, 10) + + monkeypatch.setattr("torch.cuda.device_memory_used", fake_device_memory_used) + monkeypatch.setattr("torch.cuda.mem_get_info", fake_mem_get_info) + monkeypatch.setattr( + UnknownPlatform, + "_nvml_compatible_device_memory_used", + classmethod(lambda cls, device, primary_exc: None), + ) + + assert UnknownPlatform.device_memory_used() == 7 + + +def test_device_memory_used_falls_back_to_memory_reserved(monkeypatch): + def fake_device_memory_used(device=None): + raise RuntimeError("nvml unavailable") + + def fake_mem_get_info(device=None): + raise RuntimeError("mem_get_info unavailable") + + monkeypatch.setattr("torch.cuda.device_memory_used", fake_device_memory_used) + monkeypatch.setattr("torch.cuda.mem_get_info", fake_mem_get_info) + monkeypatch.setattr("torch.cuda.memory_reserved", lambda device=None: 123) + monkeypatch.setattr( + UnknownPlatform, + "_nvml_compatible_device_memory_used", + classmethod(lambda cls, device, primary_exc: None), + ) + + assert UnknownPlatform.device_memory_used() == 123 + + +def test_map_visible_device_index(monkeypatch): + monkeypatch.setenv("CUDA_VISIBLE_DEVICES", "4,7") + + assert platform_module._map_visible_device_index(0, "CUDA_VISIBLE_DEVICES") == 4 + assert platform_module._map_visible_device_index(1, "CUDA_VISIBLE_DEVICES") == 7 + + +def test_device_memory_used_uses_nvml_compatible_library(monkeypatch): + class FakeFunction: + def __init__(self, fn): + self.fn = fn + self.argtypes = None + self.restype = None + + def __call__(self, *args, **kwargs): + return self.fn(*args, **kwargs) + + class FakeLibrary: + def __init__(self): + self.handle_index = None + self.nvmlInit_v2 = FakeFunction(lambda: 0) + self.nvmlErrorString = FakeFunction(lambda retcode: b"ok") + self.nvmlDeviceGetHandleByIndex_v2 = FakeFunction(self._get_handle) + self.nvmlDeviceGetMemoryInfo = FakeFunction(self._get_memory_info) + + def _get_handle(self, index, handle_ptr): + self.handle_index = index + ctypes.cast(handle_ptr, ctypes.POINTER(ctypes.c_void_p)).contents.value = 1234 + return 0 + + def _get_memory_info(self, handle, memory_info_ptr): + memory_info = ctypes.cast( + memory_info_ptr, ctypes.POINTER(platform_module._NvmlMemoryInfo) + ).contents + memory_info.total = 100 + memory_info.free = 25 + memory_info.used = 75 + return 0 + + fake_library = FakeLibrary() + + monkeypatch.setenv("CUDA_VISIBLE_DEVICES", "5") + monkeypatch.setattr( + platform_module, + "_load_nvml_compatible_library", + lambda: (fake_library, "libixml.so"), + ) + + used, library_name = platform_module._nvml_compatible_device_memory_used(0, "CUDA_VISIBLE_DEVICES") + assert used == 75 + assert library_name == "libixml.so" + assert fake_library.handle_index == 5 diff --git a/tests/third_party/megatron/test_optimizer_compat.py b/tests/third_party/megatron/test_optimizer_compat.py new file mode 100644 index 000000000..b77d0d743 --- /dev/null +++ b/tests/third_party/megatron/test_optimizer_compat.py @@ -0,0 +1,69 @@ +from roll.third_party.megatron.optimizer import _build_param_groups_and_buffers_kwargs + + +def test_build_param_groups_and_buffers_kwargs_supports_new_signature(monkeypatch): + def fake_get_param_groups_and_buffers( + model_chunks, + model_chunk_offset, + config, + no_weight_decay_cond, + scale_lr_cond, + lr_mult, + filter_fn, + buffer_name, + default_skip_embedding_weight_decay=False, + ): + return model_chunks, {} + + monkeypatch.setattr( + "roll.third_party.megatron.optimizer._get_param_groups_and_buffers", + fake_get_param_groups_and_buffers, + ) + + kwargs = _build_param_groups_and_buffers_kwargs( + model_chunk_offset=3, + config="cfg", + no_weight_decay_cond="no_wd", + scale_lr_cond="scale_lr", + lr_mult=0.5, + filter_fn="filter", + buffer_name="buffers", + ) + + assert kwargs == { + "model_chunk_offset": 3, + "config": "cfg", + "no_weight_decay_cond": "no_wd", + "scale_lr_cond": "scale_lr", + "lr_mult": 0.5, + "filter_fn": "filter", + "buffer_name": "buffers", + "default_skip_embedding_weight_decay": False, + } + + +def test_build_param_groups_and_buffers_kwargs_supports_old_signature(monkeypatch): + def fake_get_param_groups_and_buffers(model_chunks, model_chunk_offset, config, filter_fn, buffer_name): + return model_chunks, {} + + monkeypatch.setattr( + "roll.third_party.megatron.optimizer._get_param_groups_and_buffers", + fake_get_param_groups_and_buffers, + ) + + kwargs = _build_param_groups_and_buffers_kwargs( + model_chunk_offset=1, + config="cfg", + no_weight_decay_cond="no_wd", + scale_lr_cond="scale_lr", + lr_mult=1.0, + filter_fn="filter", + buffer_name="buffers", + ) + + assert kwargs == { + "model_chunk_offset": 1, + "config": "cfg", + "filter_fn": "filter", + "buffer_name": "buffers", + } From 5a2e836759600344de8ba913b22f58168a8f9cc6 Mon Sep 17 00:00:00 2001 From: lxzlxzliuxuzhao Date: Sat, 21 Mar 2026 11:28:38 +0000 Subject: [PATCH 2/3] fix vllm version bug --- examples/agentic_demo/agent_val_rock_swe.yaml | 20 +++++----- .../agent_val_sokoban_sandbox.yaml | 2 +- roll/third_party/vllm/__init__.py | 11 ++++-- roll/third_party/vllm/versioning.py | 27 +++++++++++++ roll/third_party/vllm/worker.py | 8 ++-- tests/third_party/vllm/test_versioning.py | 39 +++++++++++++++++++ 6 files changed, 88 insertions(+), 19 deletions(-) create mode 100644 roll/third_party/vllm/versioning.py create mode 100644 tests/third_party/vllm/test_versioning.py diff --git a/examples/agentic_demo/agent_val_rock_swe.yaml b/examples/agentic_demo/agent_val_rock_swe.yaml index 55110b3e6..5e3898b18 100644 --- a/examples/agentic_demo/agent_val_rock_swe.yaml +++ b/examples/agentic_demo/agent_val_rock_swe.yaml @@ -75,13 +75,13 @@ actor_train: strategy_args: strategy_name: megatron_train strategy_config: - tensor_model_parallel_size: 1 + tensor_model_parallel_size: 2 pipeline_model_parallel_size: 1 expert_model_parallel_size: 1 context_parallel_size: 1 use_distributed_optimizer: false recompute_granularity: full - device_mapping: list(range(2,3)) + device_mapping: list(range(2,4)) infer_batch_size: 1 actor_infer: model_args: @@ -105,8 +105,8 @@ actor_infer: gpu_memory_utilization: 0.8 block_size: 16 load_format: auto - tensor_parallel_size: 1 - device_mapping: list(range(1,2)) + tensor_parallel_size: 2 + device_mapping: list(range(0,2)) reference: model_args: @@ -123,7 +123,7 @@ reference: pipeline_model_parallel_size: 1 expert_model_parallel_size: 1 context_parallel_size: 1 - device_mapping: list(range(2,3)) + device_mapping: list(range(4,5)) infer_batch_size: 1 reward_normalization: @@ -181,7 +181,7 @@ agent_config_common: custom_install_cmd: "wget --retry-connrefused --tries=10 --waitretry=2 -O ~/iflow-cli.tgz 'http://cloud.iflow.cn/iflow-cli/iflow-ai-iflow-cli-for-roll-0-4-4-v5.tgz' && npm i -g ~/iflow-cli.tgz" env: IFLOW_apiKey: "test" - IFLOW_baseUrl: "http://localhost:8080/v1" + IFLOW_baseUrl: "http://10.31.10.33:8080/v1" IFLOW_modelName: "ROME" IFLOW_searchApiKey: "88888888" IFLOW_selectedAuthType: "openai-compatible" @@ -200,11 +200,11 @@ custom_envs: agent_system_template: "agent_system_template placeholder" agent_template: "agent_template placeholder" env_config: - dataset_name: /ROLL/data/swe_bench_verified_example.jsonl + dataset_name: /data1/lxzs_workspace/ROLL/data/swe_bench_verified_example.jsonl tools: ~ max_steps: ${max_actions_per_traj} mode: "train" - sandbox_base_url: http://localhost:8080 # change to your own service address if needed + sandbox_base_url: http://10.31.10.33:8080 # change to your own service address if needed user_id: "xxx" experiment_id: "test_tb_native" test_files: ["/terminal-bench-datasets/datasets/swebench-verified"] @@ -217,11 +217,11 @@ custom_envs: agent_system_template: "agent_system_template placeholder" agent_template: "agent_template placeholder" env_config: - dataset_name: /ROLL/data/swe_bench_verified_example.jsonl + dataset_name: /data1/lxzs_workspace/ROLL/data/swe_bench_verified_example.jsonl tools: ~ max_steps: ${max_actions_per_traj} mode: "val" - sandbox_base_url: http://localhost:8080 # change to your own service address if needed + sandbox_base_url: http://10.31.10.33:8080 # change to your own service address if needed user_id: "xxx" experiment_id: "test_tb_native" test_files: ["/terminal-bench-datasets/datasets/swebench-verified"] diff --git a/examples/agentic_demo/agent_val_sokoban_sandbox.yaml b/examples/agentic_demo/agent_val_sokoban_sandbox.yaml index 8bb2d8042..3256558a5 100644 --- a/examples/agentic_demo/agent_val_sokoban_sandbox.yaml +++ b/examples/agentic_demo/agent_val_sokoban_sandbox.yaml @@ -162,7 +162,7 @@ custom_envs: SokobanSandbox: env_type: sokoban_sandbox env_config: - base_url: 'http://localhost:8080' # change to your own service address if needed + base_url: 'http://10.31.10.33:8080' # change to your own service address if needed max_steps: ${max_actions_per_traj} max_tokens_per_step: ${max_tokens_per_step} env_manager_cls: ${env_manager_cls} diff --git a/roll/third_party/vllm/__init__.py b/roll/third_party/vllm/__init__.py index 8de27f0ec..0678016a0 100644 --- a/roll/third_party/vllm/__init__.py +++ b/roll/third_party/vllm/__init__.py @@ -12,6 +12,7 @@ from roll.platforms import current_platform import roll.third_party.vllm.fp8 as fp8 +from roll.third_party.vllm.versioning import supports_vllm_0_11_v0_ray_executor, uses_vllm_0_11_adapter from roll.utils.import_utils import safe_import_class from roll.utils.logging import get_logger @@ -25,9 +26,13 @@ elif Version("0.10.2") == Version(vllm.__version__): ray_executor_class_v0 = safe_import_class("roll.third_party.vllm.vllm_0_10_2.ray_distributed_executor.CustomRayDistributedExecutor") ray_executor_class_v1 = safe_import_class("roll.third_party.vllm.vllm_0_10_2.v1.ray_distributed_executor.CustomRayDistributedExecutor") -elif Version("0.11.0") == Version(vllm.__version__) or Version("0.11.1rc1") == Version(vllm.__version__) or Version("0.11.1rc2.dev0+gc3a722fcb.d20251021") == Version(vllm.__version__): - ray_executor_class_v0 = safe_import_class("roll.third_party.vllm.vllm_0_11_0.ray_distributed_executor.CustomRayDistributedExecutor") - ray_executor_class_v1 = safe_import_class("roll.third_party.vllm.vllm_0_11_0.v1.ray_distributed_executor.CustomRayDistributedExecutor") +elif uses_vllm_0_11_adapter(vllm.__version__): + ray_executor_class_v0 = None + if supports_vllm_0_11_v0_ray_executor(vllm.__version__): + ray_executor_class_v0 = safe_import_class("roll.third_party.vllm.vllm_0_11_0.ray_distributed_executor.CustomRayDistributedExecutor") + ray_executor_class_v1 = safe_import_class("roll.third_party.vllm.vllm_0_11_0.v1.ray_distributed_executor.CustomRayDistributedExecutor") + else: + ray_executor_class_v1 = safe_import_class("roll.third_party.vllm.vllm_0_12_0.ray_distributed_executor.CustomRayDistributedExecutor") elif Version("0.12.0") == Version(vllm.__version__): ray_executor_class_v0 = None # V0 deprecated ray_executor_class_v1 = safe_import_class("roll.third_party.vllm.vllm_0_12_0.ray_distributed_executor.CustomRayDistributedExecutor") diff --git a/roll/third_party/vllm/versioning.py b/roll/third_party/vllm/versioning.py new file mode 100644 index 000000000..e7337419a --- /dev/null +++ b/roll/third_party/vllm/versioning.py @@ -0,0 +1,27 @@ +from packaging.version import Version + + +_VLLM_0_11_MIN = Version("0.11.0") +_VLLM_0_12_MIN = Version("0.12.0") +_VLLM_0_11_V0_RAY_EXECUTOR_MAX = Version("0.11.2") + + +def uses_vllm_0_11_adapter(version: str) -> bool: + parsed = Version(version) + return _VLLM_0_11_MIN <= parsed < _VLLM_0_12_MIN + + +def supports_vllm_0_11_v0_ray_executor(version: str) -> bool: + parsed = Version(version) + return _VLLM_0_11_MIN <= parsed < _VLLM_0_11_V0_RAY_EXECUTOR_MAX + + +def load_process_weights_after_loading_utils(): + from vllm.model_executor.model_loader.utils import process_weights_after_loading + + try: + from vllm.model_executor.model_loader.utils import set_default_torch_dtype + except ImportError: + from vllm.utils.torch_utils import set_default_torch_dtype + + return process_weights_after_loading, set_default_torch_dtype diff --git a/roll/third_party/vllm/worker.py b/roll/third_party/vllm/worker.py index ea82ceb40..f9bb5e0e1 100644 --- a/roll/third_party/vllm/worker.py +++ b/roll/third_party/vllm/worker.py @@ -7,9 +7,9 @@ import torch import vllm -from packaging.version import Version from roll.platforms import current_platform +from roll.third_party.vllm.versioning import load_process_weights_after_loading_utils, uses_vllm_0_11_adapter from roll.third_party.vllm.vllm_utils import TensorLoRARequest, patch_vllm_lora_manager from roll.utils.collective import collective from roll.utils.cuda_ipc_utils import MultiprocessingSerializer @@ -144,10 +144,8 @@ def update_parameter_in_bucket(self, serialized_named_tensors, is_lora=False): self.load_weights([(name, weight) for name, weight in named_params]) def process_weights_after_loading(self): - if (Version("0.11.0") == Version(vllm.__version__) or - Version("0.11.1rc1") == Version(vllm.__version__) or - Version("0.11.1rc2.dev0+gc3a722fcb.d20251021") == Version(vllm.__version__)): - from vllm.model_executor.model_loader.utils import process_weights_after_loading,set_default_torch_dtype + if uses_vllm_0_11_adapter(vllm.__version__): + process_weights_after_loading, set_default_torch_dtype = load_process_weights_after_loading_utils() device_config = self.device_config load_config = self.vllm_config.load_config load_device = (device_config.device if load_config.device is None else load_config.device) diff --git a/tests/third_party/vllm/test_versioning.py b/tests/third_party/vllm/test_versioning.py new file mode 100644 index 000000000..06e6ac8ee --- /dev/null +++ b/tests/third_party/vllm/test_versioning.py @@ -0,0 +1,39 @@ +import vllm + +import roll.third_party.vllm as roll_vllm +from roll.third_party.vllm.versioning import ( + load_process_weights_after_loading_utils, + supports_vllm_0_11_v0_ray_executor, + uses_vllm_0_11_adapter, +) + + +def test_uses_vllm_0_11_adapter(): + assert uses_vllm_0_11_adapter("0.11.0") + assert uses_vllm_0_11_adapter("0.11.1rc1") + assert uses_vllm_0_11_adapter("0.11.2") + assert not uses_vllm_0_11_adapter("0.10.2") + assert not uses_vllm_0_11_adapter("0.12.0") + + +def test_supports_vllm_0_11_v0_ray_executor(): + assert supports_vllm_0_11_v0_ray_executor("0.11.0") + assert supports_vllm_0_11_v0_ray_executor("0.11.1rc1") + assert not supports_vllm_0_11_v0_ray_executor("0.11.2") + assert not supports_vllm_0_11_v0_ray_executor("0.12.0") + + +def test_load_process_weights_after_loading_utils(): + if not uses_vllm_0_11_adapter(vllm.__version__): + return + + process_weights_after_loading, set_default_torch_dtype = load_process_weights_after_loading_utils() + assert callable(process_weights_after_loading) + assert callable(set_default_torch_dtype) + + +def test_roll_vllm_module_exposes_v1_ray_executor(): + if not uses_vllm_0_11_adapter(vllm.__version__): + return + + assert roll_vllm.ray_executor_class_v1 is not None From 626a39e6627b7519fa563010215c0ea7cad0d995 Mon Sep 17 00:00:00 2001 From: liuxuzhao Date: Sun, 29 Mar 2026 15:55:58 +0000 Subject: [PATCH 3/3] feat: add CliEnv for ROCK sandbox-backed RL training MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add gem.Env subclass that connects ROLL's agentic RL pipeline to ROCK's MiniSandbox backend via Sandbox CRUD HTTP API. Enables "chain 2" Agent RL training where agents practice in real sandboxed CLI environments. Key features: - Lifecycle: reset() starts sandbox, step() runs commands, close() cleanup - check='ignore' in run_in_session to handle non-zero exit codes - Standard metrics (action_is_valid, format_penalty) aligned with Sokoban - __del__ safety net prevents sandbox leaks when TrajEnvManager skips close - Zero changes to TrajEnvManager — purely gem registration-based integration Co-Authored-By: Claude Opus 4.6 --- roll/pipeline/agentic/env/__init__.py | 1 + roll/pipeline/agentic/env/cli.py | 204 ++++++++++++++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 roll/pipeline/agentic/env/cli.py diff --git a/roll/pipeline/agentic/env/__init__.py b/roll/pipeline/agentic/env/__init__.py index 1cb9620d1..9d9be8173 100644 --- a/roll/pipeline/agentic/env/__init__.py +++ b/roll/pipeline/agentic/env/__init__.py @@ -16,6 +16,7 @@ gem.register("sokoban_native_env", entry_point="roll.pipeline.agentic.env.sokoban.native_env:SokobanNativeEnv") gem.register("deepeyes", entry_point="roll.pipeline.agentic.env.deepeyes:DeepEyesEnv") gem.register("rock_tb_native_env", entry_point="roll.pipeline.agentic.env.sandbox.rock_tb_native_env:RockTBNativeEnv") +gem.register("cli", entry_point="roll.pipeline.agentic.env.cli:CliEnv") diff --git a/roll/pipeline/agentic/env/cli.py b/roll/pipeline/agentic/env/cli.py new file mode 100644 index 000000000..2a922f60b --- /dev/null +++ b/roll/pipeline/agentic/env/cli.py @@ -0,0 +1,204 @@ +import logging +import time + +import httpx + +from gem import Env +from roll.pipeline.agentic.env.parse_action_utils import default_parser_action_func + +logger = logging.getLogger(__name__) + + +class CliEnv(Env): + """CLI environment backed by ROCK sandbox. + + The LLM agent sends shell commands as actions, receives stdout as observations. + Lifecycle: reset() starts a fresh sandbox, step() runs commands. + Cleanup is handled via reset() (stops previous sandbox) and __del__ (final cleanup). + """ + + def __init__( + self, + sandbox_base_url: str = "http://localhost:8080", + sandbox_type: str = "minisandbox", + sandbox_image: str = "", + memory: str = "8g", + cpus: float = 2.0, + auto_clear_seconds: int = 1200, + workspace_dir: str = "/tmp/cli_workspace", + max_steps: int = 30, + format_penalty: float = -0.1, + action_pattern: str = "(.*?)", + special_token_list: tuple = ("<|im_start|>", "<|im_end|>"), + env_instruction: str | None = None, + **kwargs, + ): + self.sandbox_base_url = sandbox_base_url.rstrip("/") + self._api_url = f"{self.sandbox_base_url}/apis/envs/sandbox/v1" + self.sandbox_type = sandbox_type + self.sandbox_image = sandbox_image + self.memory = memory + self.cpus = cpus + self.auto_clear_seconds = auto_clear_seconds + self.workspace_dir = workspace_dir + self.max_steps = max_steps + self.format_penalty = format_penalty + self.action_pattern = action_pattern + self.special_token_list = special_token_list + self.env_instruction = env_instruction or ( + "You are a CLI assistant. Execute shell commands to complete tasks. " + "Output format: COMMAND where COMMAND is a valid shell command." + ) + + self._client = httpx.Client(timeout=300.0) + self._sandbox_id: str | None = None + self._session: str | None = None + self._step_count: int = 0 + + def __del__(self): + self._cleanup() + + def _cleanup(self): + if self._sandbox_id: + try: + self._post("stop", {"sandbox_id": self._sandbox_id}) + except Exception: + logger.warning("Failed to stop sandbox during cleanup, ignoring") + self._sandbox_id = None + try: + if hasattr(self, "_client") and self._client is not None: + self._client.close() + except Exception: + pass + + def _post(self, path: str, data: dict) -> dict: + response = self._client.post(f"{self._api_url}/{path}", json=data) + response.raise_for_status() + body = response.json() + if body.get("status") != "Success": + raise RuntimeError(f"API error at {path}: {body}") + return body.get("result", {}) + + def _get(self, path: str, params: dict | None = None) -> dict: + response = self._client.get(f"{self._api_url}/{path}", params=params) + response.raise_for_status() + body = response.json() + if body.get("status") != "Success": + raise RuntimeError(f"API error at {path}: {body}") + return body.get("result", {}) + + def reset(self, seed=None) -> tuple[str, dict]: + Env.reset(self, seed) + + # Stop existing sandbox if any + if self._sandbox_id: + try: + self._post("stop", {"sandbox_id": self._sandbox_id}) + except Exception: + logger.warning("Failed to stop previous sandbox, ignoring") + + # Start new sandbox + start_result = self._post("start_async", { + "type": self.sandbox_type, + "image": self.sandbox_image, + "memory": self.memory, + "cpus": self.cpus, + "auto_clear_time": self.auto_clear_seconds / 60, + "auto_clear_time_minutes": self.auto_clear_seconds / 60, + "deployment_config": {}, + }) + self._sandbox_id = start_result["sandbox_id"] + + # Poll until alive + deadline = time.monotonic() + 120 + while time.monotonic() < deadline: + alive_result = self._get("is_alive", {"sandbox_id": self._sandbox_id}) + if alive_result.get("is_alive"): + break + time.sleep(2) + else: + raise TimeoutError(f"Sandbox {self._sandbox_id} did not become alive within 120s") + + # Create bash session + session_name = f"cli-{int(time.time_ns())}" + self._post("create_session", { + "sandbox_id": self._sandbox_id, + "session": session_name, + }) + self._session = session_name + + self._step_count = 0 + + # Prepare workspace + obs = "" + try: + obs = self._run_command(f"mkdir -p {self.workspace_dir} && cd {self.workspace_dir} && pwd") + except Exception: + obs = "Ready." + + return obs, {"env_instruction": self.env_instruction} + + def step(self, action: str) -> tuple[str, float, bool, bool, dict]: + self._step_count += 1 + + action_info = default_parser_action_func( + action, self.action_pattern, None, self.special_token_list + ) + command = action_info.get("action_content", "").strip() + + metrics_agg_mode = { + "action_is_valid": "mean", + "format_penalty": "mean", + } + + if not command: + metrics = {"action_is_valid": False, "format_penalty": self.format_penalty} + return ( + "Invalid action: could not parse a command from your response.", + self.format_penalty, + False, + self._step_count >= self.max_steps, + {"metrics": metrics, "metrics_agg_mode": metrics_agg_mode, **action_info}, + ) + + try: + output = self._run_command(command) + except Exception as e: + metrics = {"action_is_valid": True, "format_penalty": self.format_penalty} + return ( + f"Command execution failed: {e}", + self.format_penalty, + False, + self._step_count >= self.max_steps, + {"metrics": metrics, "metrics_agg_mode": metrics_agg_mode, **action_info}, + ) + + metrics = {"action_is_valid": True, "format_penalty": 0.0} + truncated = self._step_count >= self.max_steps + return output, 0.0, False, truncated, { + "metrics": metrics, + "metrics_agg_mode": metrics_agg_mode, + **action_info, + } + + def _run_command(self, command: str, timeout: int = 30) -> str: + result = self._post("run_in_session", { + "action_type": "bash", + "sandbox_id": self._sandbox_id, + "session": self._session, + "command": command, + "timeout": timeout, + "check": "ignore", + }) + output = result.get("output", "") + exit_code = result.get("exit_code", -1) + if exit_code not in (None, 0): + failure = result.get("failure_reason", "") + output = f"{output}\n[exit code: {exit_code}] {failure}".strip() + return output + + def sample_random_action(self) -> str: + return "ls" + + def close(self): + self._cleanup()