From db81853d9eecee6c9e6935f998f9c17693088e27 Mon Sep 17 00:00:00 2001 From: JensenFire Date: Sun, 11 Jan 2026 07:41:51 +0000 Subject: [PATCH 1/5] support multi-node rdma --- .../update_weight/remote_transfer_plan.py | 34 +++- .../update_weight/update_weight_from_rdma.py | 148 +++++++++++++++++- slime/backends/sglang_utils/sglang_engine.py | 11 +- slime/ray/rollout.py | 37 ++++- tests/test_weight_transfer_moe_multinode.py | 2 + 5 files changed, 212 insertions(+), 20 deletions(-) diff --git a/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py b/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py index 211653b657..dce3765938 100644 --- a/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py +++ b/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py @@ -85,12 +85,24 @@ def _get_parallelism(self, args: Namespace) -> None: self._rollout_tp_size = args.sglang_tp_size self._rollout_dp_size = args.sglang_dp_size self._rollout_ep_size = args.sglang_ep_size + self._rollout_attn_tp_size = self._rollout_tp_size // self._rollout_dp_size + self._rollout_moe_tp_size = self._rollout_tp_size // self._rollout_ep_size + # EP and PP sizes are not tested and likely miss functionalities. self._rollout_pp_size = args.sglang_pp_size if self._rollout_ep_size != 1 or self._rollout_pp_size != 1: raise NotImplementedError("Rollout expert and pipeline parallelisms are not supported yet.") - self._num_gpu_per_engine = min(args.rollout_num_gpus_per_engine, args.num_gpus_per_node) - self._rollout_engine_count = args.rollout_num_gpus // self._num_gpu_per_engine + # self._num_gpu_per_engine = min(args.rollout_num_gpus_per_engine, args.num_gpus_per_node) + + # NOTE: here we need to use the `args.rollout_num_gpus_per_engine` instead of + # `min(args.rollout_num_gpus_per_engine, args.num_gpus_per_node)` as the _num_gpu_per_engine + + # The reason is that for multi-node scenarios, the target ranks of nodes where the node_rank > 1 + # should be taken as the parts of one complete rollout engine, and the target_rank should be larger + # than ` args.num_gpus_per_node`. + + self._rollout_num_gpu_per_engine = args.rollout_num_gpus_per_engine + self._rollout_engine_count = args.rollout_num_gpus // self._rollout_num_gpu_per_engine self._rollout_num_gpus = args.rollout_num_gpus logger.info( f"RemoteTransferPlan initialized: mode={self.mode}, pp_rank={self._pp_rank}/{self._pp_size}, tp_rank={self._tp_rank}/{self._tp_size}, " @@ -148,7 +160,9 @@ def plan_p2p(self) -> list[TransferTaskP2PMeta]: """ all_targets = [ - (m_idx, k_idx) for m_idx in range(self._rollout_engine_count) for k_idx in range(self._num_gpu_per_engine) + (m_idx, k_idx) + for m_idx in range(self._rollout_engine_count) + for k_idx in range(self._rollout_num_gpu_per_engine) ] # Assignments: source_rank -> {engin_rank: [engine_indices]} assignements = defaultdict(lambda: defaultdict(list)) @@ -191,6 +205,20 @@ def count_engine_index_assignments(k_idx: int) -> int: ) return transfer_tasks + def tp_conversion(self, targeted_tp_rank: int) -> dict[str, int]: + """ + Given tp_rank, return the rank of attn_tp/dp/ep/moe-tp. + """ + parallel_rank_dict = {} + # attn_tp/dp + # NOTE: iiuc, in sglang, _num_gpu_per_engine == targeted_tp_size? + parallel_rank_dict["attn_tp_rank"] = targeted_tp_rank % self._rollout_attn_tp_size + parallel_rank_dict["dp_rank"] = targeted_tp_rank // self._rollout_attn_tp_size + # moe-tp/ep + parallel_rank_dict["moe_tp_rank"] = targeted_tp_rank % self._rollout_moe_tp_size + parallel_rank_dict["ep_rank"] = targeted_tp_rank // self._rollout_moe_tp_size + return parallel_rank_dict + def is_source(self) -> bool: """ Determine if the current rank needs to initiate weight transfer. diff --git a/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py b/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py index f76c1f19c6..01fa42eeef 100644 --- a/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py +++ b/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py @@ -4,6 +4,7 @@ from collections.abc import Callable, Mapping, Sequence import ray +import sglang.srt.distributed.parallel_state as sglang_parallel_state import sglang.srt.layers.dp_attention as sglang_dp_attention import sglang.srt.server_args as sglang_server_args import torch @@ -167,13 +168,29 @@ def connect_rollout_engines( session_id = targets_to_session_id[(target.engine_ind, target.engine_rank)] remote_info = RemoteWeightInfo(session_id, self.remote_weight_infos_by_session_id[session_id]) # Instantiate the local model replicas and a corresponding transfer engine with memory registry for each type of rollout shard. + # TODO verify: + # - if sglang dp is enabled, then attn_tp is equal to tp // dp + # - if sglang ep is enabled, then moe-tp is equal to tp // ep + # generally tp should be equal to the world_size if target.engine_rank not in self.engines: transfer_engine = self._create_transfer_engine() + parallel_rank_dict = self.transfer_plan.tp_conversion(target.engine_rank) + logger.info( + f"[RDMA] Creating model replica for engine rank {target.engine_rank} with rank dict {parallel_rank_dict}" + ) model_replica = self._create_inference_replica( self.args.hf_checkpoint, pp_shard=target.source_shard, - target_rank=target.engine_rank, + target_rank=target.engine_rank, # NOTE: here we assume that sglang_tp == world_size target_tp=self.args.rollout_num_gpus_per_engine, + dp_rank=parallel_rank_dict["dp_rank"], + dp_size=self.transfer_plan._rollout_dp_size, + attn_tp_rank=parallel_rank_dict["attn_tp_rank"], + attn_tp_size=self.transfer_plan._rollout_attn_tp_size, + ep_rank=parallel_rank_dict["ep_rank"], + ep_size=self.transfer_plan._rollout_ep_size, + moe_tp_rank=parallel_rank_dict["moe_tp_rank"], + moe_tp_size=self.transfer_plan._rollout_moe_tp_size, server_args=self.session_id_to_server_args[session_id], ) print_memory(f"[RDMA] After model replica at {target.engine_rank}") @@ -216,7 +233,20 @@ def _create_transfer_engine(self) -> TransferEngine: return transfer_engine def _create_inference_replica( - self, model_path: str, pp_shard: int, target_rank: int, target_tp: int, server_args: ServerArgs + self, + model_path: str, + pp_shard: int, + target_rank: int, + target_tp: int, + dp_rank: int, + dp_size: int, + attn_tp_rank: int, + attn_tp_size: int, + ep_rank: int, + ep_size: int, + moe_tp_rank: int, + moe_tp_size: int, + server_args: ServerArgs, ): """ Create model replica for target rank with correct tp settings. @@ -235,9 +265,24 @@ def _create_inference_replica( # Mock the distributed environment to get correct weight shapes logger.info( - f" Engine replica: {target_rank} tp {target_tp} pp_shard {pp_shard}, model pp sharding not implemented " + f" Engine replica: {target_rank} tp {target_tp} pp_shard {pp_shard}, model pp sharding not implemented, " + f" dp_rank {dp_rank}/{dp_size}, attn_tp_rank {attn_tp_rank}/{attn_tp_size}, " + f" ep_rank {ep_rank}/{ep_size}, moe_tp_rank {moe_tp_rank}/{moe_tp_size} " ) - with MockSglangDistributedContext(tp_size=target_tp, tp_rank=target_rank, server_args=server_args): + # TODO: should take attn_tp/ep/dp into account in the future. + with MockSglangDistributedContext( + tp_size=target_tp, + tp_rank=target_rank, + dp_rank=dp_rank, + dp_size=dp_size, + attn_tp_rank=attn_tp_rank, + attn_tp_size=attn_tp_size, + ep_rank=ep_rank, + ep_size=ep_size, + moe_tp_rank=moe_tp_rank, + moe_tp_size=moe_tp_size, + server_args=server_args, + ): model = get_model( model_config=model_config, load_config=load_config, @@ -297,7 +342,20 @@ def finish_transfer_task(self) -> None: class MockSglangDistributedContext: - def __init__(self, tp_size: int, tp_rank: int, server_args: ServerArgs): + def __init__( + self, + tp_size: int, + tp_rank: int, + dp_rank: int, + dp_size: int, + attn_tp_rank: int, + attn_tp_size: int, + ep_rank: int, + ep_size: int, + moe_tp_rank: int, + moe_tp_size: int, + server_args: ServerArgs, + ): """ TODO: Extend this to support ep, and dp attention? """ @@ -305,8 +363,14 @@ def __init__(self, tp_size: int, tp_rank: int, server_args: ServerArgs): self.tp_rank = tp_rank self.pp_size = 1 self.pp_rank = 0 - self.attn_tp_size = tp_size - self.attn_tp_rank = tp_rank + self.attn_tp_size = attn_tp_size + self.attn_tp_rank = attn_tp_rank + self.dp_rank = dp_rank + self.dp_size = dp_size + self.ep_rank = ep_rank + self.ep_size = ep_size + self.moe_tp_rank = moe_tp_rank + self.moe_tp_size = moe_tp_size self.server_args = server_args # Store active patches for cleanup self._patches = [] @@ -320,11 +384,29 @@ def __enter__(self): mock_group.world_size = self.tp_size mock_group.rank_in_group = self.tp_rank + # Mock Attn TP group + mock_attn_tp_group = MagicMock() + mock_attn_tp_group.world_size = self.attn_tp_size + mock_attn_tp_group.rank_in_group = self.attn_tp_rank + # Mock PP group with proper attributes mock_pp_group = MagicMock() mock_pp_group.rank_in_group = self.pp_rank mock_pp_group.world_size = self.pp_size + # Mock MoE EP group + mock_ep_group = MagicMock() + mock_ep_group.world_size = self.ep_size + mock_ep_group.rank_in_group = self.ep_rank + + # Mock Moe-tp group + mock_moe_tp_group = MagicMock() + mock_moe_tp_group.world_size = self.moe_tp_size + mock_moe_tp_group.rank_in_group = self.moe_tp_rank + + sglang_parallel_state._MOE_TP = mock_moe_tp_group + sglang_parallel_state._MOE_EP = mock_ep_group + # IMPORTANT: Set global variables FIRST, before any patches or model loading. # The get_attention_tp_rank() function reads from _ATTN_TP_RANK global variable. # Setting this BEFORE model loading ensures the correct value is used. @@ -339,9 +421,25 @@ def __enter__(self): # 1. Where they are defined (sglang.srt.layers.dp_attention) # 2. Where they are imported and used (sglang.srt.models.qwen3, etc.) # This is because Python's import creates a local reference in the importing module. + self._patches = [ patch("sglang.srt.distributed.parallel_state.get_tp_group", return_value=mock_group), + patch("sglang.srt.distributed.parallel_state.get_moe_expert_parallel_rank", return_value=self.ep_rank), + patch( + "sglang.srt.distributed.parallel_state.get_moe_expert_parallel_world_size", return_value=self.ep_size + ), + patch("sglang.srt.distributed.parallel_state.get_moe_tensor_parallel_rank", return_value=self.moe_tp_rank), + patch( + "sglang.srt.distributed.parallel_state.get_moe_tensor_parallel_world_size", + return_value=self.moe_tp_size, + ), patch("sglang.srt.distributed.get_pp_group", return_value=mock_pp_group), + patch("sglang.srt.distributed.get_moe_tp_group", return_value=mock_moe_tp_group), + patch("sglang.srt.distributed.get_tp_group", return_value=mock_group), + patch("sglang.srt.distributed.get_moe_expert_parallel_rank", return_value=self.ep_rank), + patch("sglang.srt.distributed.get_moe_expert_parallel_world_size", return_value=self.ep_size), + patch("sglang.srt.distributed.get_moe_tensor_parallel_rank", return_value=self.moe_tp_rank), + patch("sglang.srt.distributed.get_moe_tensor_parallel_world_size", return_value=self.moe_tp_size), patch( "sglang.srt.distributed.parallel_state.get_tensor_model_parallel_world_size", return_value=self.tp_size ), @@ -349,12 +447,48 @@ def __enter__(self): # Patch at definition location patch("sglang.srt.layers.dp_attention.get_attention_tp_rank", return_value=self.attn_tp_rank), patch("sglang.srt.layers.dp_attention.get_attention_tp_size", return_value=self.attn_tp_size), + patch("sglang.srt.layers.dp_attention.get_attention_tp_group", return_value=mock_attn_tp_group), # Patch at import locations in model files - these are critical! patch("sglang.srt.models.qwen3.get_attention_tp_rank", return_value=self.attn_tp_rank), patch("sglang.srt.models.qwen3.get_attention_tp_size", return_value=self.attn_tp_size), + patch("sglang.srt.models.qwen3.get_pp_group", return_value=mock_pp_group), + # Patch at import locations in DeepSeek V2 model + patch("sglang.srt.models.deepseek_v2.get_attention_tp_rank", return_value=self.attn_tp_rank), + patch("sglang.srt.models.deepseek_v2.get_attention_tp_size", return_value=self.attn_tp_size), + patch("sglang.srt.models.deepseek_v2.get_tensor_model_parallel_world_size", return_value=self.tp_size), + patch("sglang.srt.models.deepseek_v2.get_pp_group", return_value=mock_pp_group), + patch("sglang.srt.models.deepseek_v2.get_moe_expert_parallel_world_size", return_value=self.ep_size), + # Patch moe layers + patch( + "sglang.srt.layers.moe.fused_moe_triton.layer.get_moe_expert_parallel_rank", return_value=self.ep_rank + ), + patch( + "sglang.srt.layers.moe.fused_moe_triton.layer.get_moe_expert_parallel_world_size", + return_value=self.ep_size, + ), + patch("sglang.srt.layers.moe.fused_moe_triton.layer.get_tp_group", return_value=mock_group), + patch( + "sglang.srt.layers.moe.fused_moe_triton.layer.get_moe_tensor_parallel_rank", + return_value=self.moe_tp_rank, + ), + patch( + "sglang.srt.layers.moe.fused_moe_triton.layer.get_moe_tensor_parallel_world_size", + return_value=self.moe_tp_size, + ), + # Patch at import locations in MoE token dispatcher + patch( + "sglang.srt.layers.moe.token_dispatcher.standard.get_moe_expert_parallel_rank", + return_value=self.ep_rank, + ), + patch( + "sglang.srt.layers.moe.token_dispatcher.standard.get_moe_expert_parallel_world_size", + return_value=self.ep_size, + ), + patch("sglang.srt.layers.moe.token_dispatcher.standard.get_tp_group", return_value=mock_group), # Also patch in distributed module where get_tensor_model_parallel_rank may be imported patch("sglang.srt.distributed.get_tensor_model_parallel_rank", return_value=self.tp_rank), patch("sglang.srt.distributed.get_tensor_model_parallel_world_size", return_value=self.tp_size), + patch("sglang.srt.distributed.get_moe_expert_parallel_world_size", return_value=self.ep_size), ] # Start all patches diff --git a/slime/backends/sglang_utils/sglang_engine.py b/slime/backends/sglang_utils/sglang_engine.py index 202c3f1b02..72aa10332b 100644 --- a/slime/backends/sglang_utils/sglang_engine.py +++ b/slime/backends/sglang_utils/sglang_engine.py @@ -90,7 +90,7 @@ def __init__(self, args, rank: int, worker_type: str = "regular"): self.rank = rank self.worker_type = worker_type - def init(self, dist_init_addr, port, nccl_port, host=None): + def init(self, dist_init_addr, port, nccl_port, host=None, node_hosts=None): self.router_ip = self.args.sglang_router_ip self.router_port = self.args.sglang_router_port @@ -107,7 +107,7 @@ def init(self, dist_init_addr, port, nccl_port, host=None): dist_init_addr = f"[{ipv6_addr}]:{port_str}" server_args_dict, external_engine_need_check_fields = _compute_server_args( - self.args, self.rank, dist_init_addr, nccl_port, host, port, self.worker_type + self.args, self.rank, dist_init_addr, nccl_port, host, port, self.worker_type, node_hosts=node_hosts ) self.node_rank = server_args_dict["node_rank"] @@ -408,7 +408,9 @@ def stop_profile(self): return response -def _compute_server_args(args, rank, dist_init_addr, nccl_port, host, port, worker_type: str = "regular"): +def _compute_server_args( + args, rank, dist_init_addr, nccl_port, host, port, worker_type: str = "regular", node_hosts: str | None = None +): nnodes = max(1, args.rollout_num_gpus_per_engine // args.num_gpus_per_node) node_rank = rank % nnodes kwargs = { @@ -446,7 +448,8 @@ def _compute_server_args(args, rank, dist_init_addr, nccl_port, host, port, work kwargs["enable_return_routed_experts"] = True if args.fp16: kwargs["dtype"] = "float16" - + if node_hosts is not None and nnodes > 1: + kwargs["node_hosts"] = node_hosts external_engine_need_check_fields = [k for k in kwargs.keys() if k not in _EXTERNAL_ENGINE_SKIP_CHECK_FIELDS] unused_keys = set(kwargs.keys()) diff --git a/slime/ray/rollout.py b/slime/ray/rollout.py index 5a9deeebfc..e80b9af2d5 100644 --- a/slime/ray/rollout.py +++ b/slime/ray/rollout.py @@ -1,3 +1,4 @@ +import json import logging import multiprocessing import os @@ -398,6 +399,7 @@ def init_rollout_engines(args, pg, all_rollout_engines): def _allocate_rollout_engine_addr_and_ports_external(args, rollout_engines): + # TODO: add bonus address setting for rdma weight transfer addr_and_ports = [] for rank, _ in rollout_engines: [host, port] = args.rollout_external_engine_addrs[rank].split(":") @@ -422,16 +424,12 @@ def _allocate_rollout_engine_addr_and_ports_normal(*, args, num_engines, rollout num_engines_per_node = max( 1, min(args.num_gpus_per_node, args.rollout_num_gpus) // args.rollout_num_gpus_per_engine ) + nnodes = max(1, args.rollout_num_gpus_per_engine // args.num_gpus_per_node) addr_and_ports = [{} for _ in range(num_engines)] visited_nodes = set() + all_server_node_hosts = {} # {server_id: {node_rank:address} }, server_id = rank // num_engines_per_node for rank, engine in rollout_engines: - if rank // num_engines_per_node in visited_nodes: - continue - visited_nodes.add(rank // num_engines_per_node) - # TODO: currently when restarting engines, we will set port for all engines on this node starting with this rank. - # e.g. for 8 gpus, if we are restarting engine on gpu 3, we will set port for engine 3,4,5,6,7 on this node. - num_engines_on_this_node = num_engines_per_node - (rank % num_engines_per_node) def get_addr_and_ports(engine): # use small ports to prevent ephemeral port between 32768 and 65536. @@ -457,6 +455,24 @@ def addr(): get_addr, get_port = get_addr_and_ports(engine) + # add node_rank and address into the dict for multi-node scenarios + if nnodes > 1: + server_id = rank // nnodes + all_server_node_hosts[server_id] = all_server_node_hosts.get(server_id, {}) + node_rank = rank % nnodes + assert ( + node_rank not in all_server_node_hosts[server_id] + ), f"Duplicate node rank {node_rank} for server {server_id}" + all_server_node_hosts[server_id][node_rank] = get_addr() + + engine_id = rank // num_engines_per_node + if engine_id in visited_nodes: + continue + visited_nodes.add(engine_id) + # TODO: currently when restarting engines, we will set port for all engines on this node starting with this rank. + # e.g. for 8 gpus, if we are restarting engine on gpu 3, we will set port for engine 3,4,5,6,7 on this node. + num_engines_on_this_node = num_engines_per_node - (rank % num_engines_per_node) + for i in range(num_engines_on_this_node): addr_and_ports[rank + i]["host"] = get_addr() addr_and_ports[rank + i]["port"] = get_port() @@ -473,7 +489,16 @@ def addr(): for i in range(num_engines_on_this_node): addr_and_ports[rank + i]["dist_init_addr"] = f"{get_addr()}:{get_port(6 + args.sglang_dp_size)}" + node_host_addr_str = {} + if nnodes > 1: + for server_id, node_rank_addr_dict in all_server_node_hosts.items(): + assert len(node_rank_addr_dict) == nnodes, f"server {server_id} missing node address {node_rank_addr_dict}" + node_host_addr_str[server_id] = json.dumps(node_rank_addr_dict) + for i, _ in rollout_engines: + if nnodes > 1: + server_id = i // nnodes + addr_and_ports[i]["node_hosts"] = node_host_addr_str[server_id] for key in ["port", "nccl_port", "dist_init_addr"]: assert key in addr_and_ports[i], f"Engine {i} {key} is not set." logger.info(f"Ports for engine {i}: {addr_and_ports[i]}") diff --git a/tests/test_weight_transfer_moe_multinode.py b/tests/test_weight_transfer_moe_multinode.py index 99fd57b740..8e781d14ec 100644 --- a/tests/test_weight_transfer_moe_multinode.py +++ b/tests/test_weight_transfer_moe_multinode.py @@ -39,6 +39,7 @@ class ScriptArgs(U.ExecuteTrainConfig): head_node_ip: str | None = None node_rank: int = 0 nnodes: int = 1 + inter_node_transfer_engine_info_port: int = 15500 # TODO: initialize this port from ray. def validate(self): if self.multinode: @@ -165,6 +166,7 @@ def execute(args: ScriptArgs): f"--sglang-expert-parallel-size {args.sglang_ep} " f"--sglang-pipeline-parallel-size {args.sglang_pp} " "--sglang-mem-fraction-static 0.8 " + f"--sglang-inter-node-transfer-engine-info-port {args.inter_node_transfer_engine_info_port} " ) if args.mode == "rdma": sglang_args += "--sglang-remote-instance-weight-loader-start-seed-via-transfer-engine " From 9d2f5817f82bbf243bd6ff02b7d4882e585539f9 Mon Sep 17 00:00:00 2001 From: JensenFire Date: Sun, 11 Jan 2026 08:55:14 +0000 Subject: [PATCH 2/5] misc --- tests/test_weight_transfer_multinode_h100_80g.sh | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_weight_transfer_multinode_h100_80g.sh b/tests/test_weight_transfer_multinode_h100_80g.sh index 10c8a5cf08..7fcc78a3fc 100644 --- a/tests/test_weight_transfer_multinode_h100_80g.sh +++ b/tests/test_weight_transfer_multinode_h100_80g.sh @@ -3,26 +3,26 @@ # 1 training node, 1 rollout node # NODE 0: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --head-node-ip h100-069-001 --node-rank 0 --nnodes 2 2>&1 | tee temp2_moe_2node.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 0 --nnodes 2 2>&1 | tee temp2_moe_2node.log # NODE 1: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --head-node-ip h100-069-001 --node-rank 1 --nnodes 2 2>&1 | tee temp2_moe_2node.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 1 --nnodes 2 2>&1 | tee temp2_moe_2node.log # 2 training nodes, 1 rollout node # NODE 0: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --head-node-ip h100-069-001 --node-rank 0 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 0 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log # NODE 1: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --head-node-ip h100-069-001 --node-rank 1 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 1 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log # NODE 2: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --head-node-ip h100-069-001 --node-rank 2 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 2 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log # 1 training node, 2 rollout nodes # NODE 0: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --head-node-ip h100-069-001 --node-rank 0 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 0 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log # NODE 1: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --head-node-ip h100-069-001 --node-rank 1 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 1 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log # NODE 2: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --head-node-ip h100-069-001 --node-rank 2 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 2 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log From f035b317e826fd3630bbc74ca0b16b1e22b88cd8 Mon Sep 17 00:00:00 2001 From: JensenFire Date: Sun, 11 Jan 2026 19:11:19 +0800 Subject: [PATCH 3/5] [2/2] supports for multinode ep/dp/pp, both training and rollout (#1) * ep/dp/pp for multinode, maybe cp ok too --- .../update_weight/remote_transfer_plan.py | 32 +++++++++++++--- slime/utils/external_utils/command_utils.py | 6 +++ tests/test_weight_transfer_moe_multinode.py | 10 +++-- ...test_weight_transfer_multinode_h100_80g.sh | 37 ++++++++++++++++--- 4 files changed, 70 insertions(+), 15 deletions(-) diff --git a/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py b/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py index dce3765938..14bc953b4d 100644 --- a/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py +++ b/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py @@ -80,6 +80,7 @@ def _get_parallelism(self, args: Namespace) -> None: self._dp_rank, self._dp_size = mpu.get_data_parallel_rank( with_context_parallel=True ), mpu.get_data_parallel_world_size(with_context_parallel=True) + self._edp_rank, self._edp_size = mpu.get_expert_data_parallel_rank(), mpu.get_expert_data_parallel_world_size() # Gather the target (rollout engine count and parallelism) information. self._rollout_tp_size = args.sglang_tp_size @@ -90,7 +91,7 @@ def _get_parallelism(self, args: Namespace) -> None: # EP and PP sizes are not tested and likely miss functionalities. self._rollout_pp_size = args.sglang_pp_size - if self._rollout_ep_size != 1 or self._rollout_pp_size != 1: + if self._rollout_pp_size != 1: raise NotImplementedError("Rollout expert and pipeline parallelisms are not supported yet.") # self._num_gpu_per_engine = min(args.rollout_num_gpus_per_engine, args.num_gpus_per_node) @@ -112,13 +113,31 @@ def _get_parallelism(self, args: Namespace) -> None: f"Rollout engine count: {self._rollout_engine_count}, tp_size={self._rollout_tp_size}, ep_size={self._rollout_ep_size}, dp_size={self._rollout_dp_size}" ) + # Calculate the non-expert dp/ expert dp from training side + # Reference: `Megatron-LM/megatron/core/parallel_state.py` + + # NOTE: + # For Megatron (training_side), `world_size = non_expert_dp_size_with_cp * tp_size * pp_size ` + # and `world_size = expert_dp_size * ep_size * expert_tp_size * pp_size` + # Then for non_expert part and each pp_rank, + # the number of "tp_groups" are `non_expert_dp_size_with_cp` + # then after all-gather in tp dimension, each gpu of `non_expert_dp_size_with_cp` * `tp_size` will have the full weights of this pp_rank + + # Then for expert part and each pp_rank, + # after tp_all_gather and ep_all_gather, each gpu of `expert_dp_size` * `expert_tp_size` * `ep_size` will have the full weights of this pp_rank + + # Since `world_size // pp_size` = `non_expert_dp_size_with_cp` * `tp_size` = `expert_dp_size` * `expert_tp_size` * `ep_size` + # For each gpu of same pp_rank, it has the full weights of the whole model. + + # Non_expert part self._gathered_dp_size = self._dp_size * self._tp_size self._gathered_dp_rank = self._dp_rank * self._tp_size + self._tp_rank # TODO: If I understand correctly the final size should be same as we now only have pp - dp dimensions for both param groups? + expert_tp_size = self._ep_size * self._etp_size - self._gathered_expert_dp_size = self._dp_size * expert_tp_size + self._gathered_expert_dp_size = self._edp_size * expert_tp_size self._gathered_expert_dp_rank = ( - self._dp_rank * expert_tp_size + self._ep_rank * self._etp_size + self._etp_rank + self._edp_rank * expert_tp_size + self._ep_rank * self._etp_size + self._etp_rank ) logger.info( f"Gathered dp_size={self._gathered_dp_size}, gathered expert dp_size={self._gathered_expert_dp_size}" @@ -128,6 +147,7 @@ def _get_parallelism(self, args: Namespace) -> None: ) self._rank = self._gathered_dp_rank + self._size = self._gathered_dp_size def get_nccl_group(self) -> str: """ @@ -168,13 +188,13 @@ def plan_p2p(self) -> list[TransferTaskP2PMeta]: assignements = defaultdict(lambda: defaultdict(list)) # First round robin assignment i = -1 - for source_rank, (idx, target) in zip(range(self._gathered_dp_size), enumerate(all_targets), strict=False): + for source_rank, (idx, target) in zip(range(self._size), enumerate(all_targets), strict=False): i = idx m_idx, k_idx = target assignements[source_rank][k_idx].append(m_idx) def count_engine_index_assignments(k_idx: int) -> int: - return [len(assignements[source][k_idx]) for source in range(self._gathered_dp_size)] + return [len(assignements[source][k_idx]) for source in range(self._size)] # Reminder assignment by least_assigned_source cur_source_index = 0 @@ -188,7 +208,7 @@ def count_engine_index_assignments(k_idx: int) -> int: _, select_source = min((val, idx) for (idx, val) in enumerate(counted) if val > 0) # Else go back to round robin. else: - select_source = cur_source_index % self._gathered_dp_size + select_source = cur_source_index % self._size cur_source_index += 1 assignements[select_source][k_idx].append(m_idx) diff --git a/slime/utils/external_utils/command_utils.py b/slime/utils/external_utils/command_utils.py index 6e3762d39c..3855fddb02 100644 --- a/slime/utils/external_utils/command_utils.py +++ b/slime/utils/external_utils/command_utils.py @@ -31,6 +31,7 @@ def convert_checkpoint( master_addr: str | None = None, nnodes: int = 1, node_rank: int = 0, + decoder_last_pipeline_num_layers: int | None = None, ): hf_checkpoint = hf_checkpoint or f"/root/models/{model_name}" @@ -39,6 +40,11 @@ def convert_checkpoint( path_dst = ( f"{dir_dst}/{model_name}_torch_dist" if nnodes == 1 else f"{dir_dst}/{model_name}_torch_dist_nodes_{nnodes}" ) + path_dst = ( + f"{path_dst}_decoder_last_{decoder_last_pipeline_num_layers}" + if decoder_last_pipeline_num_layers is not None + else path_dst + ) if Path(path_dst).exists(): print(f"convert_checkpoint skip {path_dst} since exists") return diff --git a/tests/test_weight_transfer_moe_multinode.py b/tests/test_weight_transfer_moe_multinode.py index 8e781d14ec..ec92efb40c 100644 --- a/tests/test_weight_transfer_moe_multinode.py +++ b/tests/test_weight_transfer_moe_multinode.py @@ -22,8 +22,9 @@ class ScriptArgs(U.ExecuteTrainConfig): train_tp: int = 8 train_ep: int = 1 train_pp: int = 1 + train_cp: int = 1 train_etp: int = 8 - sglang_tp: int = 8 + sglang_tp: int = 8 # NOTE: for sglang, moe_tp_size = tp_size // ep_size sglang_dp: int = 1 sglang_ep: int = 1 sglang_pp: int = 1 @@ -40,6 +41,7 @@ class ScriptArgs(U.ExecuteTrainConfig): node_rank: int = 0 nnodes: int = 1 inter_node_transfer_engine_info_port: int = 15500 # TODO: initialize this port from ray. + decoder_last_pipeline_num_layers: int | None = None def validate(self): if self.multinode: @@ -73,6 +75,7 @@ def prepare(args: ScriptArgs): nnodes=args.nnodes, dir_dst="/root/multinode", node_rank=args.node_rank, + decoder_last_pipeline_num_layers=args.decoder_last_pipeline_num_layers, ) @@ -128,17 +131,18 @@ def execute(args: ScriptArgs): perf_args = ( f"--tensor-model-parallel-size {args.train_tp} " "--sequence-parallel " # NOTE: necessary: ```ValueError: During training, performance may degrade if MoE and tensor parallelismare enabled without also enabling sequence parallelism.``` - # f"--context-parallel-size {args.train_cp} " + f"--context-parallel-size {args.train_cp} " f"--pipeline-model-parallel-size {args.train_pp} " f"--expert-model-parallel-size {args.train_ep} " f"--expert-tensor-parallel-size {args.train_etp} " - "--context-parallel-size 1 " "--recompute-granularity full " "--recompute-method uniform " "--recompute-num-layers 1 " "--use-dynamic-batch-size " "--max-tokens-per-gpu 2048 " ) + if args.decoder_last_pipeline_num_layers is not None: + perf_args += f"--decoder-last-pipeline-num-layers {args.decoder_last_pipeline_num_layers} " grpo_args = ( "--advantage-estimator gspo " diff --git a/tests/test_weight_transfer_multinode_h100_80g.sh b/tests/test_weight_transfer_multinode_h100_80g.sh index 7fcc78a3fc..e83bdbeb71 100644 --- a/tests/test_weight_transfer_multinode_h100_80g.sh +++ b/tests/test_weight_transfer_multinode_h100_80g.sh @@ -7,22 +7,47 @@ MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multi # NODE 1: MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 1 --nnodes 2 2>&1 | tee temp2_moe_2node.log +# enable training dp = 2 +# NODE 0: +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 0 --nnodes 2 --train-tp 4 2>&1 | tee temp2_moe_2node.log +# NODE 1: +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 1 --nnodes 2 --train-tp 4 2>&1 | tee temp2_moe_2node.log + +# enable training ep = 8, dp = 2, attn-tp = 4 +# NODE 0: +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 0 --nnodes 2 --train-tp 4 --train-ep 8 --train-etp 1 2>&1 | tee temp2_moe_2node.log +# NODE 1: +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 1 --nnodes 2 --train-tp 4 --train-ep 8 --train-etp 1 2>&1 | tee temp2_moe_2node.log + +# enable training pp = 2 +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 0 --nnodes 2 --train-pp 2 --train-tp 4 --train-etp 4 --decoder-last-pipeline-num-layers 14 2>&1 | tee temp2_moe_2node.log +# NODE 1: +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 1 --nnodes 2 --train-pp 2 --train-tp 4 --train-etp 4 --decoder-last-pipeline-num-layers 14 2>&1 | tee temp2_moe_2node.log + + # 2 training nodes, 1 rollout node # NODE 0: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 0 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --mode rdma --head-node-ip h100-069-001 --node-rank 0 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log # NODE 1: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 1 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --mode rdma --head-node-ip h100-069-001 --node-rank 1 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log # NODE 2: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 2 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --mode rdma --head-node-ip h100-069-001 --node-rank 2 --num-train-gpus 16 --train-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_2training.log # 1 training node, 2 rollout nodes # NODE 0: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 0 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --mode rdma --head-node-ip h100-069-001 --node-rank 0 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log # NODE 1: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 1 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --mode rdma --head-node-ip h100-069-001 --node-rank 1 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log # NODE 2: -MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --head-node-ip h100-069-001 --node-rank 2 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --mode rdma --head-node-ip h100-069-001 --node-rank 2 --num-rollout-gpus 16 --sglang-tp 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log +# 1 training node, 2 rollout nodes, rollout_ep = 16 +# NODE 0: +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --mode rdma --head-node-ip h100-069-001 --node-rank 0 --num-rollout-gpus 16 --sglang-tp 16 --sglang-ep 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log +# NODE 1: +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --mode rdma --head-node-ip h100-069-001 --node-rank 1 --num-rollout-gpus 16 --sglang-tp 16 --sglang-ep 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log +# NODE 2: +MASTER_ADDR=h100-069-001 python /root/slime/tests/test_weight_transfer_moe_multinode.py --multinode --mode rdma --mode rdma --head-node-ip h100-069-001 --node-rank 2 --num-rollout-gpus 16 --sglang-tp 16 --sglang-ep 16 --nnodes 3 2>&1 | tee temp2_moe_3node_1training.log From 2597689fab01e5429b67312e07e1f4ec5ec962f5 Mon Sep 17 00:00:00 2001 From: JensenFire Date: Tue, 13 Jan 2026 06:30:20 +0000 Subject: [PATCH 4/5] fix comments --- .../update_weight/remote_transfer_plan.py | 26 ------------------- .../update_weight/update_weight_from_rdma.py | 8 +++--- 2 files changed, 5 insertions(+), 29 deletions(-) diff --git a/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py b/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py index 14bc953b4d..2e37fd5fb1 100644 --- a/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py +++ b/slime/backends/megatron_utils/update_weight/remote_transfer_plan.py @@ -93,15 +93,6 @@ def _get_parallelism(self, args: Namespace) -> None: self._rollout_pp_size = args.sglang_pp_size if self._rollout_pp_size != 1: raise NotImplementedError("Rollout expert and pipeline parallelisms are not supported yet.") - # self._num_gpu_per_engine = min(args.rollout_num_gpus_per_engine, args.num_gpus_per_node) - - # NOTE: here we need to use the `args.rollout_num_gpus_per_engine` instead of - # `min(args.rollout_num_gpus_per_engine, args.num_gpus_per_node)` as the _num_gpu_per_engine - - # The reason is that for multi-node scenarios, the target ranks of nodes where the node_rank > 1 - # should be taken as the parts of one complete rollout engine, and the target_rank should be larger - # than ` args.num_gpus_per_node`. - self._rollout_num_gpu_per_engine = args.rollout_num_gpus_per_engine self._rollout_engine_count = args.rollout_num_gpus // self._rollout_num_gpu_per_engine self._rollout_num_gpus = args.rollout_num_gpus @@ -112,28 +103,11 @@ def _get_parallelism(self, args: Namespace) -> None: logger.info( f"Rollout engine count: {self._rollout_engine_count}, tp_size={self._rollout_tp_size}, ep_size={self._rollout_ep_size}, dp_size={self._rollout_dp_size}" ) - # Calculate the non-expert dp/ expert dp from training side # Reference: `Megatron-LM/megatron/core/parallel_state.py` - # NOTE: - # For Megatron (training_side), `world_size = non_expert_dp_size_with_cp * tp_size * pp_size ` - # and `world_size = expert_dp_size * ep_size * expert_tp_size * pp_size` - # Then for non_expert part and each pp_rank, - # the number of "tp_groups" are `non_expert_dp_size_with_cp` - # then after all-gather in tp dimension, each gpu of `non_expert_dp_size_with_cp` * `tp_size` will have the full weights of this pp_rank - - # Then for expert part and each pp_rank, - # after tp_all_gather and ep_all_gather, each gpu of `expert_dp_size` * `expert_tp_size` * `ep_size` will have the full weights of this pp_rank - - # Since `world_size // pp_size` = `non_expert_dp_size_with_cp` * `tp_size` = `expert_dp_size` * `expert_tp_size` * `ep_size` - # For each gpu of same pp_rank, it has the full weights of the whole model. - - # Non_expert part self._gathered_dp_size = self._dp_size * self._tp_size self._gathered_dp_rank = self._dp_rank * self._tp_size + self._tp_rank - # TODO: If I understand correctly the final size should be same as we now only have pp - dp dimensions for both param groups? - expert_tp_size = self._ep_size * self._etp_size self._gathered_expert_dp_size = self._edp_size * expert_tp_size self._gathered_expert_dp_rank = ( diff --git a/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py b/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py index 01fa42eeef..81fe251b67 100644 --- a/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py +++ b/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py @@ -171,7 +171,7 @@ def connect_rollout_engines( # TODO verify: # - if sglang dp is enabled, then attn_tp is equal to tp // dp # - if sglang ep is enabled, then moe-tp is equal to tp // ep - # generally tp should be equal to the world_size + # generally tp * pp should be equal to the world_size if target.engine_rank not in self.engines: transfer_engine = self._create_transfer_engine() parallel_rank_dict = self.transfer_plan.tp_conversion(target.engine_rank) @@ -181,7 +181,7 @@ def connect_rollout_engines( model_replica = self._create_inference_replica( self.args.hf_checkpoint, pp_shard=target.source_shard, - target_rank=target.engine_rank, # NOTE: here we assume that sglang_tp == world_size + target_rank=target.engine_rank, # NOTE: here we assume that sglang_tp == world_size when pp_size == 1 target_tp=self.args.rollout_num_gpus_per_engine, dp_rank=parallel_rank_dict["dp_rank"], dp_size=self.transfer_plan._rollout_dp_size, @@ -433,7 +433,9 @@ def __enter__(self): "sglang.srt.distributed.parallel_state.get_moe_tensor_parallel_world_size", return_value=self.moe_tp_size, ), - patch("sglang.srt.distributed.get_pp_group", return_value=mock_pp_group), + patch( + "sglang.srt.distributed.get_pp_group", return_value=mock_pp_group + ), # TODO: redundant. Delete pp group setting in the future patch("sglang.srt.distributed.get_moe_tp_group", return_value=mock_moe_tp_group), patch("sglang.srt.distributed.get_tp_group", return_value=mock_group), patch("sglang.srt.distributed.get_moe_expert_parallel_rank", return_value=self.ep_rank), From d701c4df99f5775ae0f3e7b495013a979e16a2ff Mon Sep 17 00:00:00 2001 From: JensenFire Date: Tue, 13 Jan 2026 06:31:29 +0000 Subject: [PATCH 5/5] fix comments, dprank --- .../megatron_utils/update_weight/update_weight_from_rdma.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py b/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py index 81fe251b67..26cb9068a9 100644 --- a/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py +++ b/slime/backends/megatron_utils/update_weight/update_weight_from_rdma.py @@ -413,8 +413,8 @@ def __enter__(self): sglang_server_args._global_server_args = self.server_args sglang_dp_attention._ATTN_TP_RANK = self.attn_tp_rank sglang_dp_attention._ATTN_TP_SIZE = self.attn_tp_size - sglang_dp_attention._ATTN_DP_RANK = 0 - sglang_dp_attention._ATTN_DP_SIZE = 1 + sglang_dp_attention._ATTN_DP_RANK = self.dp_rank + sglang_dp_attention._ATTN_DP_SIZE = self.dp_size # Mock parallelism getters # IMPORTANT: We need to patch functions at BOTH locations: