diff --git a/miles/backends/megatron_utils/actor.py b/miles/backends/megatron_utils/actor.py index ce26c10109..94456c6ff4 100644 --- a/miles/backends/megatron_utils/actor.py +++ b/miles/backends/megatron_utils/actor.py @@ -131,7 +131,8 @@ def init( single_tag=None if args.enable_weights_backuper else "actor", ) self._active_model_tag: str | None = "actor" - self.weights_backuper.backup("actor") + if self._enable_weight_backup: + self.weights_backuper.backup("actor") if with_ref: self.load_other_checkpoint("ref", args.ref_load) @@ -208,7 +209,14 @@ def wake_up(self) -> None: reload_process_groups() print_memory("after wake_up model") + @property + def _enable_weight_backup(self) -> bool: + """Weight backup is needed only for CPU-side model switching paths.""" + return self.with_ref or self.args.keep_old_actor or self.args.colocate + def _switch_model(self, target_tag: str) -> None: + if not self._enable_weight_backup: + return if target_tag not in self.weights_backuper.backup_tags: raise ValueError(f"Cannot switch to unknown model tag: {target_tag}") self.weights_backuper.restore(target_tag) @@ -444,7 +452,10 @@ def train_actor(self, rollout_id: int, rollout_data: RolloutBatch) -> None: m.clear_all() # update the cpu actor weight to the latest model - self.weights_backuper.backup("actor") + if self._enable_weight_backup: + self.weights_backuper.backup("actor") + else: + torch.cuda.synchronize() # Update ref model if needed if ( diff --git a/miles/backends/megatron_utils/update_weight/update_weight_from_distributed/p2p_transfer_utils.py b/miles/backends/megatron_utils/update_weight/update_weight_from_distributed/p2p_transfer_utils.py index 804d73ce04..f2ac80b5b7 100644 --- a/miles/backends/megatron_utils/update_weight/update_weight_from_distributed/p2p_transfer_utils.py +++ b/miles/backends/megatron_utils/update_weight/update_weight_from_distributed/p2p_transfer_utils.py @@ -1,5 +1,7 @@ import dataclasses +import json import logging +import os from argparse import Namespace from collections import defaultdict from collections.abc import Callable, Sequence @@ -205,13 +207,39 @@ def register_cpu_memory(params_dict: dict, transfer_engine) -> dict: return weight_dict +def resolve_mooncake_ib_device() -> str: + config = os.environ.get("MILES_MOONCAKE_IB_DEVICE") or os.environ.get("SGLANG_MOONCAKE_IB_DEVICE") or "" + if not config: + return "" + + if os.path.isfile(config): + with open(config) as f: + config = json.load(f).get(str(int(os.environ.get("LOCAL_RANK", 0))), "") + return config or "" + + if "," in config: + devices = [device.strip() for device in config.split(",") if device.strip()] + if devices: + return devices[int(os.environ.get("LOCAL_RANK", 0)) % len(devices)] + return "" + + return config + + def create_transfer_engine(): transfer_engine = TransferEngine() local_ip = ray._private.services.get_node_ip_address() - transfer_engine.initialize(local_ip, "P2PHANDSHAKE", "rdma", "") + transfer_engine.initialize(local_ip, "P2PHANDSHAKE", "rdma", resolve_mooncake_ib_device()) return transfer_engine +def unpack_remote_transfer_engine_info(remote_info): + """Keep compatibility with sglang endpoints that append warmup metadata.""" + if len(remote_info) < 2: + raise ValueError("remote_instance_transfer_engine_info must contain at least session_id and weights_info") + return remote_info[0], remote_info[1] + + def query_remote_weight_infos( rollout_engines: Sequence[ActorHandle], targets, @@ -223,8 +251,10 @@ def query_remote_weight_infos( targets_to_query = set((target.engine_ind, target.engine_rank) for target in targets) for engine_ind, engine_rank in targets_to_query: - session_id, weights_info = ray.get( + session_id, weights_info = unpack_remote_transfer_engine_info( + ray.get( rollout_engines[engine_ind].get_remote_instance_transfer_engine_info.remote(rank=engine_rank) + ) ) parallelism_info = ray.get(rollout_engines[engine_ind].get_parallelism_info.remote(rank=engine_rank)) diff --git a/scripts/launch-p2p-rdma.sh b/scripts/launch-p2p-rdma.sh new file mode 100755 index 0000000000..8b62c57e36 --- /dev/null +++ b/scripts/launch-p2p-rdma.sh @@ -0,0 +1,235 @@ +#!/bin/bash +# Multi-node P2P RDMA launcher for miles +# +# Usage: +# TRAIN_SCRIPT=scripts/run-qwen3-30B-A3B-p2p.sh bash scripts/launch-p2p-rdma.sh start +# TRAIN_SCRIPT=scripts/run-qwen3-235B-A22B-Instruct-2507-p2p.sh bash scripts/launch-p2p-rdma.sh start +# +# Commands: +# start Full: containers + install + ray + submit +# stop Stop Ray cluster +# status Show cluster status +# submit Submit training job +# logs Tail training logs +# cleanup Stop + remove containers +# containers / install / ray Individual steps + +set -euo pipefail + +# ============== Configuration ============== +NODES=("mia1-p02-g23" "mia1-p02-g46" "mia1-p02-g05" "mia1-p02-g45") +HEAD_NODE="${NODES[0]}" + +DOCKER_IMAGE="rlsys/miles:MI350-355-latest" +CONTAINER_NAME="yuzhen_miles_0330" +HOST_DATA_DIR="/it-share-2/data/yuzhzhou" +AITER_CACHE_ROOT="/tmp/yuzhzhou_aiter_cache" +WORKSPACE="/workspace" +MILES_DIR="${WORKSPACE}/p2prdma/miles" +SGLANG_DIR="${WORKSPACE}/p2prdma/sglang" + +TRAIN_SCRIPT="${TRAIN_SCRIPT:-scripts/run-qwen3-235B-A22B-Instruct-2507-p2p.sh}" +NETWORK_IFNAME="${NETWORK_IFNAME:-${MILES_SOCKET_IFNAME:-eno1}}" +DEFAULT_MOONCAKE_IB_DEVICE="rdma0,rdma1,rdma2,rdma3,rdma4,rdma5,rdma6,rdma7" +MILES_MOONCAKE_IB_DEVICE="${MILES_MOONCAKE_IB_DEVICE:-${DEFAULT_MOONCAKE_IB_DEVICE}}" +TRAIN_ENV="MILES_SOCKET_IFNAME=${NETWORK_IFNAME} MILES_MOONCAKE_IB_DEVICE=${MILES_MOONCAKE_IB_DEVICE}" +SSH_OPTS="-F /dev/null -i ~/.ssh/cluster_id_ed25519 -o StrictHostKeyChecking=no -o ConnectTimeout=30" + +# ============== Helpers ============== +log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*"; } + +ssh_cmd() { + local host=$1; shift + ssh ${SSH_OPTS} "$host" "$@" +} + +docker_exec() { + local host=$1; shift + ssh_cmd "$host" "docker exec ${CONTAINER_NAME} bash -c '$*'" +} + +check_container() { + ssh_cmd "$1" "docker ps -q -f name=^${CONTAINER_NAME}\$" 2>/dev/null | grep -q . +} + +get_container_ip() { + docker_exec "$1" "cd ${MILES_DIR} && ${TRAIN_ENV} bash ${TRAIN_SCRIPT} print-ip" +} + +# ============== Actions ============== +start_containers() { + log "Starting containers on ${#NODES[@]} nodes..." + for host in "${NODES[@]}"; do + if check_container "$host"; then + log " $host: already running" + continue + fi + log " $host: creating..." + ssh_cmd "$host" "mkdir -p ${AITER_CACHE_ROOT}/${CONTAINER_NAME} && docker run -itd --network=host --privileged \ + --device=/dev/kfd --device=/dev/dri --device=/dev/infiniband \ + --ipc=host --shm-size 64G --group-add video \ + --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \ + -v ${HOST_DATA_DIR}:${WORKSPACE} \ + -w ${MILES_DIR} \ + -v ${AITER_CACHE_ROOT}/${CONTAINER_NAME}:/root/.aiter \ + -v /data/yuzhzhou/cache/huggingface:/root/.cache/huggingface \ + -v /data/yuzhzhou/cache/torch:/root/.cache/torch \ + -v /data/yuzhzhou/cache/pip:/root/.cache/pip \ + -v /usr/lib/x86_64-linux-gnu/libionic.so.1.0.54.0-149.g3304be71:/usr/lib/x86_64-linux-gnu/libionic.so.1.0.54.0-149.g3304be71:ro \ + -v /usr/lib/x86_64-linux-gnu/libibverbs/libionic-rdmav34.so:/usr/lib/x86_64-linux-gnu/libibverbs/libionic-rdmav34.so:ro \ + -v /etc/libibverbs.d/ionic.driver:/etc/libibverbs.d/ionic.driver:ro \ + -v ${HOST_DATA_DIR}/rccl-net-plugin:/opt/rocm/lib/rccl-net-plugin:ro \ + -e LD_LIBRARY_PATH=/opt/rocm/lib/rccl-net-plugin:/opt/rocm/lib \ + -e HSA_FORCE_FINE_GRAIN_PCIE=1 \ + -e HSA_NO_SCRATCH_RECLAIM=1 \ + -e HF_HOME=/root/.cache/huggingface \ + -e WANDB_KEY=cd411df8b73eb3f5c1ae1220cc1ec4e3c9d1f86e \ + --name ${CONTAINER_NAME} \ + ${DOCKER_IMAGE}" || true + done + log "All containers ready." +} + +install_packages() { + log "Installing sglang + miles + mooncake + ray patch on all nodes..." + for host in "${NODES[@]}"; do + docker_exec "$host" ' + pip uninstall sglang -y -q 2>/dev/null + cd '"${SGLANG_DIR}"'/python && pip install -e . --no-build-isolation --no-deps -q 2>&1 | tail -2 + cd '"${MILES_DIR}"' && pip install -e . -q 2>&1 | tail -2 + rm -rf /tmp/torch_memory_saver + git clone --depth 1 https://github.com/fzyzcjy/torch_memory_saver.git /tmp/torch_memory_saver >/tmp/torch_memory_saver.clone.log 2>&1 + cd /tmp/torch_memory_saver && pip install -e . -q 2>&1 | tail -2 + pip install mooncake-transfer-engine-non-cuda -q 2>&1 | tail -1 + + python3 -c " +path = \"/opt/venv/lib/python3.10/site-packages/ray/_private/accelerators/amd_gpu.py\" +with open(path) as f: + lines = f.readlines() +out = [] +i = 0 +while i < len(lines): + if \"ROCR_VISIBLE_DEVICES\" in lines[i] and \"if\" in lines[i] and \"os.environ\" in lines[i]: + out.append(lines[i]); i += 1 + while i < len(lines) and (\"raise\" in lines[i] or \"Please use\" in lines[i] or lines[i].strip() == \")\"): + i += 1 + out.append(\" os.environ.pop(\\\"ROCR_VISIBLE_DEVICES\\\", None)\n\") + out.append(\" return AMDGPUAcceleratorManager.get_current_process_visible_accelerator_ids()\n\") + else: + out.append(lines[i]); i += 1 +with open(path, \"w\") as f: + f.writelines(out) +print(\"ray patched\") +" + echo "=== done on $(hostname) ===" + ' & + done + wait + log "All packages installed." +} + +start_ray() { + log "Starting Ray cluster..." + local head_ip + head_ip=$(get_container_ip "${HEAD_NODE}") + log "Using interface ${NETWORK_IFNAME}; head IP=${head_ip}; Mooncake IB=${MILES_MOONCAKE_IB_DEVICE:-auto}" + + # Cleanup all nodes + for host in "${NODES[@]}"; do + docker_exec "$host" \ + "pkill -9 -f '${TRAIN_SCRIPT}' 2>/dev/null || true; pkill -9 sglang; ray stop --force; pkill -9 ray; pkill -9 python; rm -rf /tmp/ray/" \ + 2>/dev/null || true + done + sleep 3 + + # Start head + docker_exec "${HEAD_NODE}" "cd ${MILES_DIR} && ${TRAIN_ENV} bash ${TRAIN_SCRIPT} head" + local retries=0 + while ! docker_exec "${HEAD_NODE}" "ray status" &>/dev/null; do + retries=$((retries + 1)) + [ $retries -ge 6 ] && log "ERROR: head timeout" && exit 1 + sleep 5 + done + + # Start workers + for ((i = 1; i < ${#NODES[@]}; i++)); do + docker_exec "${NODES[$i]}" \ + "cd ${MILES_DIR} && ${TRAIN_ENV} MASTER_ADDR=${head_ip} bash ${TRAIN_SCRIPT} worker" & + done + sleep 15 + + docker_exec "${HEAD_NODE}" "ray status" || true + echo "" + log "Ray cluster ready: ${NODES[*]}" +} + +submit_job() { + log "Submitting: ${TRAIN_SCRIPT}" + docker_exec "${HEAD_NODE}" \ + "cd ${MILES_DIR} && nohup env ${TRAIN_ENV} bash ${TRAIN_SCRIPT} submit > nohup_p2p_rdma.out 2>&1 &" + log "Logs: ssh ${HEAD_NODE} 'docker exec ${CONTAINER_NAME} tail -f ${MILES_DIR}/nohup_p2p_rdma.out'" +} + +stop_cluster() { + log "Stopping cluster..." + for host in "${NODES[@]}"; do + docker_exec "$host" \ + "pkill -9 -f '${TRAIN_SCRIPT}' 2>/dev/null || true; pkill -9 sglang; ray stop --force; pkill -9 ray; pkill -9 python; rm -rf /tmp/ray/" \ + 2>/dev/null || true + done + log "Stopped." +} + +cleanup_all() { + stop_cluster + for host in "${NODES[@]}"; do + ssh_cmd "$host" "docker rm -f ${CONTAINER_NAME}" 2>/dev/null || true + done + log "Cleaned up." +} + +show_status() { + for host in "${NODES[@]}"; do + echo "=== $host ===" + if check_container "$host"; then + docker_exec "$host" "ray status 2>/dev/null | head -8" || echo " Ray not running" + else + echo " Container not running" + fi + done +} + +tail_logs() { + ssh_cmd "${HEAD_NODE}" \ + "docker exec -it ${CONTAINER_NAME} tail -f ${MILES_DIR}/nohup_p2p_rdma.out" +} + +full_start() { + start_containers + sleep 3 + install_packages + sleep 2 + start_ray + sleep 2 + submit_job +} + +# ============== Main ============== +echo "TRAIN_SCRIPT=${TRAIN_SCRIPT}" +if [ $# -lt 1 ]; then + echo "Usage: TRAIN_SCRIPT=scripts/run-xxx-p2p.sh $0 {start|stop|status|submit|logs|cleanup|containers|install|ray}" + exit 1 +fi + +case "$1" in + start) full_start ;; + stop) stop_cluster ;; + status) show_status ;; + submit) submit_job ;; + logs) tail_logs ;; + cleanup) cleanup_all ;; + containers) start_containers ;; + install) install_packages ;; + ray) start_ray ;; + *) echo "Unknown command: $1"; exit 1 ;; +esac diff --git a/scripts/run-qwen3-235B-A22B-Instruct-2507-p2p.sh b/scripts/run-qwen3-235B-A22B-Instruct-2507-p2p.sh new file mode 100755 index 0000000000..2a279f2247 --- /dev/null +++ b/scripts/run-qwen3-235B-A22B-Instruct-2507-p2p.sh @@ -0,0 +1,257 @@ +#!/bin/bash +# Qwen3-235B-A22B-Instruct-2507: P2P RDMA weight transfer, 2 train + 2 rollout +# +# Usage (4 nodes): +# Node 1 (head/train): bash scripts/run-qwen3-235B-A22B-Instruct-2507-p2p.sh head +# Node 2-4 (workers): MASTER_ADDR= bash scripts/run-qwen3-235B-A22B-Instruct-2507-p2p.sh worker +# Node 1 (submit): nohup bash scripts/run-qwen3-235B-A22B-Instruct-2507-p2p.sh submit > nohup_p2p_rdma.out + +set -euo pipefail + +# ============== Paths ============== +export MODEL_DIR="${MODEL_DIR:-/workspace}" +export DATA_DIR="${DATA_DIR:-/workspace}" +MODEL_NAME="Qwen3-235B-A22B-Instruct-2507" +HF_CHECKPOINT="${MODEL_DIR}/${MODEL_NAME}" +TORCH_DIST_PATH="${MODEL_DIR}/${MODEL_NAME}_torch_dist" +MILES_DIR="${MILES_DIR:-/workspace/p2prdma/miles}" +SGLANG_DIR="${SGLANG_DIR:-/workspace/p2prdma/sglang}" +export MODEL_ARGS_ROTARY_BASE=5000000 +export MILES_SOCKET_IFNAME="${MILES_SOCKET_IFNAME:-eno1}" +DEFAULT_MOONCAKE_IB_DEVICE="rdma0,rdma1,rdma2,rdma3,rdma4,rdma5,rdma6,rdma7" +export MILES_MOONCAKE_IB_DEVICE="${MILES_MOONCAKE_IB_DEVICE:-${DEFAULT_MOONCAKE_IB_DEVICE}}" + +# ============== Network Helpers ============== +resolve_ip_from_ifname() { + local ifname="${1:?interface name is required}" + python3 - "$ifname" <<'PY' +import fcntl +import socket +import struct +import sys + +ifname = sys.argv[1] +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +try: + result = fcntl.ioctl(sock.fileno(), 0x8915, struct.pack("256s", ifname[:15].encode())) +except OSError as exc: + raise SystemExit(f"Failed to resolve IPv4 for interface {ifname}: {exc}") +print(socket.inet_ntoa(result[20:24])) +PY +} + +resolve_local_ip() { + resolve_ip_from_ifname "${MILES_SOCKET_IFNAME}" +} + +# ============== Environment ============== +setup_env() { + export RAY_EXPERIMENTAL_NOSET_HIP_VISIBLE_DEVICES=1 + export HIP_VISIBLE_DEVICES=${HIP_VISIBLE_DEVICES:-"0,1,2,3,4,5,6,7"} + export PYTORCH_HIP_ALLOC_CONF=${PYTORCH_HIP_ALLOC_CONF:-"expandable_segments:True"} + export NUM_GPUS=$(echo ${HIP_VISIBLE_DEVICES} | tr ',' '\n' | wc -l) + export MILES_HOST_IP="$(resolve_local_ip)" + export SGLANG_LOCAL_IP_NIC="${SGLANG_LOCAL_IP_NIC:-${MILES_SOCKET_IFNAME}}" + export RAY_NODE_IP_ADDRESS="${RAY_NODE_IP_ADDRESS:-${MILES_HOST_IP}}" + export GLOO_SOCKET_IFNAME=${GLOO_SOCKET_IFNAME:-${MILES_SOCKET_IFNAME}} + export NCCL_SOCKET_IFNAME=${NCCL_SOCKET_IFNAME:-${MILES_SOCKET_IFNAME}} + export SGLANG_MOONCAKE_IB_DEVICE="${SGLANG_MOONCAKE_IB_DEVICE:-${MILES_MOONCAKE_IB_DEVICE}}" + export MOONCAKE_DEVICE="${MOONCAKE_DEVICE:-${SGLANG_MOONCAKE_IB_DEVICE}}" + export NCCL_DEBUG=${NCCL_DEBUG:-VERSION} +} + +cleanup() { + pkill -9 sglang 2>/dev/null || true; sleep 2 + ray stop --force 2>/dev/null || true + pkill -9 ray 2>/dev/null || true + pkill -9 python 2>/dev/null || true; sleep 2 +} + +# ============== Ray Head/Worker ============== +run_head() { + setup_env; cleanup + export MASTER_ADDR="${MILES_HOST_IP}" + echo "=== Ray Head (235B P2P) IFACE=${MILES_SOCKET_IFNAME} IP=${MASTER_ADDR} GPUs=${NUM_GPUS} MOONCAKE_IB=${SGLANG_MOONCAKE_IB_DEVICE:-auto} ===" + ray start --head \ + --node-ip-address ${MASTER_ADDR} \ + --num-gpus ${NUM_GPUS} \ + --disable-usage-stats \ + --dashboard-host=0.0.0.0 --dashboard-port=8265 + echo "Worker cmd: MASTER_ADDR=${MASTER_ADDR} bash $0 worker" +} + +run_worker() { + [ -z "${MASTER_ADDR:-}" ] && echo "Error: MASTER_ADDR not set" && exit 1 + setup_env; cleanup + local worker_ip="${MILES_HOST_IP}" + echo "=== Ray Worker (235B P2P) IFACE=${MILES_SOCKET_IFNAME} Head=${MASTER_ADDR} Worker=${worker_ip} MOONCAKE_IB=${SGLANG_MOONCAKE_IB_DEVICE:-auto} ===" + ray start \ + --address=${MASTER_ADDR}:6379 \ + --num-gpus ${NUM_GPUS} \ + --node-ip-address ${worker_ip} \ + --disable-usage-stats + while true; do sleep 60; echo "[$(date)] Worker alive"; done +} + +# ============== Submit ============== +run_submit() { + setup_env + ray status + + SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" + cd "${SCRIPT_DIR}/.." + + export MASTER_ADDR="${MILES_HOST_IP}" + export PYTHONUNBUFFERED=1 + export DEPRECATED_MEGATRON_COMPATIBLE=1 + + source "${SCRIPT_DIR}/models/qwen3-235B-A22B.sh" + + MEGATRON_LM_PATH=$(python3 -c \ + "import megatron, os; print(os.path.dirname(os.path.dirname(megatron.__file__)))" \ + 2>/dev/null || echo "/app/Megatron-LM") + + CKPT_ARGS=( + --hf-checkpoint "${HF_CHECKPOINT}" + --load "${TORCH_DIST_PATH}" + ) + + ROLLOUT_ARGS=( + --prompt-data "${DATA_DIR}/dapo-math-17k/dapo-math-17k.jsonl" + --input-key prompt + --label-key label + --apply-chat-template + --rollout-shuffle + --rm-type math + --num-rollout 3000 + --rollout-batch-size 32 + --n-samples-per-prompt 8 + --rollout-max-response-len 8192 + --rollout-temperature 1 + --global-batch-size 256 + --balance-data + ) + + PERF_ARGS=( + --tensor-model-parallel-size 4 + --sequence-parallel + --pipeline-model-parallel-size 2 + --context-parallel-size 1 + --expert-model-parallel-size 8 + --expert-tensor-parallel-size 1 + --recompute-granularity full + --recompute-method uniform + --recompute-num-layers 1 + --use-dynamic-batch-size + --max-tokens-per-gpu 9216 + ) + + GRPO_ARGS=( + --advantage-estimator grpo + --kl-loss-coef 0.00 + --kl-loss-type low_var_kl + --entropy-coef 0.00 + --eps-clip 0.2 + --eps-clip-high 0.28 + ) + + OPTIMIZER_ARGS=( + --optimizer adam + --lr 1e-6 + --lr-decay-style constant + --weight-decay 0.1 + --adam-beta1 0.9 + --adam-beta2 0.98 + --use-precision-aware-optimizer + ) + + SGLANG_ARGS=( + --rollout-num-gpus-per-engine 8 + --sglang-ep-size 8 + --sglang-mem-fraction-static 1.0 + --sglang-max-total-tokens 262144 + --sglang-cuda-graph-bs 1 2 4 8 $(seq 16 8 256) + --sglang-remote-instance-weight-loader-start-seed-via-transfer-engine + ) + + if [ -n "${SGLANG_MOONCAKE_IB_DEVICE}" ]; then + SGLANG_ARGS+=(--sglang-mooncake-ib-device "${SGLANG_MOONCAKE_IB_DEVICE}") + fi + + MISC_ARGS=( + --attention-dropout 0.0 + --hidden-dropout 0.0 + --accumulate-allreduce-grads-in-fp32 + --attention-softmax-in-fp32 + --attention-backend flash + ) + + P2P_ARGS=( + --update-weight-transfer-mode p2p + --update-weight-buffer-size "$((2 * 1024 * 1024 * 1024))" + --check-weight-update-equal + ) + + WANDB_ARGS=( + --use-wandb + --wandb-project miles-p2p-rdma + --wandb-group Qwen3-235B-A22B-Instruct-2507 + --wandb-key ${WANDB_KEY} + ) + + TRAIN_ARGS=( + train_async.py + --update-weights-interval 2 + --actor-num-nodes 2 + --actor-num-gpus-per-node 8 + --rollout-num-gpus 16 + "${MODEL_ARGS[@]}" + "${CKPT_ARGS[@]}" + "${ROLLOUT_ARGS[@]}" + "${PERF_ARGS[@]}" + "${GRPO_ARGS[@]}" + "${OPTIMIZER_ARGS[@]}" + "${SGLANG_ARGS[@]}" + "${MISC_ARGS[@]}" + "${P2P_ARGS[@]}" + "${WANDB_ARGS[@]}" + ) + + ray job submit --address="http://127.0.0.1:8265" \ + --runtime-env-json="{ + \"env_vars\": { + \"PYTHONPATH\": \"${MEGATRON_LM_PATH}/:${SGLANG_DIR}/python:${MILES_DIR}\", + \"CUDA_DEVICE_MAX_CONNECTIONS\": \"1\", + \"DEPRECATED_MEGATRON_COMPATIBLE\": \"1\", + \"RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES\": \"1\", + \"RAY_EXPERIMENTAL_NOSET_HIP_VISIBLE_DEVICES\": \"1\", + \"HIP_VISIBLE_DEVICES\": \"0,1,2,3,4,5,6,7\", + \"PYTORCH_HIP_ALLOC_CONF\": \"expandable_segments:True\", + \"MILES_SOCKET_IFNAME\": \"${MILES_SOCKET_IFNAME}\", + \"MILES_MOONCAKE_IB_DEVICE\": \"${MILES_MOONCAKE_IB_DEVICE}\", + \"MOONCAKE_DEVICE\": \"${MOONCAKE_DEVICE}\", + \"SGLANG_LOCAL_IP_NIC\": \"${SGLANG_LOCAL_IP_NIC}\", + \"SGLANG_MOONCAKE_IB_DEVICE\": \"${SGLANG_MOONCAKE_IB_DEVICE}\", + \"GLOO_SOCKET_IFNAME\": \"${GLOO_SOCKET_IFNAME}\", + \"NCCL_SOCKET_IFNAME\": \"${NCCL_SOCKET_IFNAME}\", + \"no_proxy\": \"${MASTER_ADDR},127.0.0.1\", + \"MASTER_ADDR\": \"${MASTER_ADDR}\" + } + }" \ + -- python3 "${TRAIN_ARGS[@]}" +} + +print_ip() { + setup_env + echo "${MILES_HOST_IP}" +} + +# ============== Main ============== +[ $# -lt 1 ] && echo "Usage: bash $0 {head|worker|submit|print-ip}" && exit 1 +case "$1" in + head) run_head ;; + worker) run_worker ;; + submit) run_submit ;; + print-ip) print_ip ;; + *) echo "Unknown command: $1"; exit 1 ;; +esac diff --git a/scripts/run-qwen3-30B-A3B-p2p.sh b/scripts/run-qwen3-30B-A3B-p2p.sh new file mode 100755 index 0000000000..270168e9fd --- /dev/null +++ b/scripts/run-qwen3-30B-A3B-p2p.sh @@ -0,0 +1,207 @@ +#!/bin/bash +# Qwen3-30B-A3B: P2P RDMA weight transfer, 1 train + 1 rollout +# +# Usage (2 nodes): +# Node 1 (head/train): bash scripts/run-qwen3-30B-A3B-p2p.sh head +# Node 2 (rollout): MASTER_ADDR= bash scripts/run-qwen3-30B-A3B-p2p.sh worker +# Node 1 (submit): nohup bash scripts/run-qwen3-30B-A3B-p2p.sh submit > nohup_p2p_rdma.out + +set -euo pipefail + +# ============== Paths ============== +export MODEL_DIR="${MODEL_DIR:-/workspace}" +export DATA_DIR="${DATA_DIR:-/workspace}" +MODEL_NAME="Qwen3-30B-A3B" +HF_CHECKPOINT="${MODEL_DIR}/${MODEL_NAME}" +TORCH_DIST_PATH="${MODEL_DIR}/${MODEL_NAME}_torch_dist" +MILES_DIR="${MILES_DIR:-/workspace/p2prdma/miles}" +SGLANG_DIR="${SGLANG_DIR:-/workspace/p2prdma/sglang}" + +# ============== Environment ============== +setup_env() { + export RAY_EXPERIMENTAL_NOSET_HIP_VISIBLE_DEVICES=1 + export HIP_VISIBLE_DEVICES=${HIP_VISIBLE_DEVICES:-"0,1,2,3,4,5,6,7"} + export PYTORCH_HIP_ALLOC_CONF=${PYTORCH_HIP_ALLOC_CONF:-"expandable_segments:True"} + export NUM_GPUS=$(echo ${HIP_VISIBLE_DEVICES} | tr ',' '\n' | wc -l) + export NCCL_DEBUG=${NCCL_DEBUG:-VERSION} +} + +cleanup() { + pkill -9 sglang 2>/dev/null || true; sleep 2 + ray stop --force 2>/dev/null || true + pkill -9 ray 2>/dev/null || true + pkill -9 python 2>/dev/null || true; sleep 2 +} + +# ============== Ray Head/Worker ============== +run_head() { + setup_env; cleanup + export MASTER_ADDR=$(hostname -I | awk '{print $1}') + echo "=== Ray Head (30B P2P) IP=${MASTER_ADDR} GPUs=${NUM_GPUS} ===" + ray start --head \ + --node-ip-address ${MASTER_ADDR} \ + --num-gpus ${NUM_GPUS} \ + --disable-usage-stats \ + --dashboard-host=0.0.0.0 --dashboard-port=8265 + echo "Worker cmd: MASTER_ADDR=${MASTER_ADDR} bash $0 worker" +} + +run_worker() { + [ -z "${MASTER_ADDR:-}" ] && echo "Error: MASTER_ADDR not set" && exit 1 + setup_env; cleanup + local worker_ip=$(hostname -I | awk '{print $1}') + echo "=== Ray Worker (30B P2P) Head=${MASTER_ADDR} Worker=${worker_ip} ===" + ray start \ + --address=${MASTER_ADDR}:6379 \ + --num-gpus ${NUM_GPUS} \ + --node-ip-address ${worker_ip} \ + --disable-usage-stats + while true; do sleep 60; echo "[$(date)] Worker alive"; done +} + +# ============== Submit ============== +run_submit() { + setup_env + ray status + + SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" + cd "${SCRIPT_DIR}/.." + + export MASTER_ADDR=$(hostname -I | awk '{print $1}') + export PYTHONUNBUFFERED=1 + export DEPRECATED_MEGATRON_COMPATIBLE=1 + + source "${SCRIPT_DIR}/models/qwen3-30B-A3B.sh" + + MEGATRON_LM_PATH=$(python3 -c \ + "import megatron, os; print(os.path.dirname(os.path.dirname(megatron.__file__)))" \ + 2>/dev/null || echo "/app/Megatron-LM") + + CKPT_ARGS=( + --hf-checkpoint "${HF_CHECKPOINT}" + --ref-load "${TORCH_DIST_PATH}" + ) + + ROLLOUT_ARGS=( + --prompt-data "${DATA_DIR}/dapo-math-17k/dapo-math-17k.jsonl" + --input-key prompt + --label-key label + --apply-chat-template + --rollout-shuffle + --rm-type deepscaler + --num-rollout 3000 + --rollout-batch-size 32 + --n-samples-per-prompt 8 + --rollout-max-response-len 8192 + --rollout-temperature 1 + --global-batch-size 256 + --balance-data + ) + + PERF_ARGS=( + --tensor-model-parallel-size 4 + --sequence-parallel + --pipeline-model-parallel-size 1 + --context-parallel-size 1 + --expert-model-parallel-size 8 + --expert-tensor-parallel-size 1 + --recompute-granularity full + --recompute-method uniform + --recompute-num-layers 1 + --use-dynamic-batch-size + --max-tokens-per-gpu 20480 + ) + + GRPO_ARGS=( + --advantage-estimator grpo + --kl-loss-coef 0.00 + --kl-loss-type low_var_kl + --entropy-coef 0.00 + --eps-clip 0.2 + --eps-clip-high 0.28 + ) + + OPTIMIZER_ARGS=( + --optimizer adam + --lr 1e-6 + --lr-decay-style constant + --weight-decay 0.1 + --adam-beta1 0.9 + --adam-beta2 0.98 + --optimizer-cpu-offload + --overlap-cpu-optimizer-d2h-h2d + --use-precision-aware-optimizer + ) + + SGLANG_ARGS=( + --rollout-num-gpus-per-engine 8 + --sglang-ep-size 8 + --sglang-mem-fraction-static 0.7 + --sglang-remote-instance-weight-loader-start-seed-via-transfer-engine + ) + + MISC_ARGS=( + --attention-dropout 0.0 + --hidden-dropout 0.0 + --accumulate-allreduce-grads-in-fp32 + --attention-softmax-in-fp32 + --attention-backend flash + ) + + P2P_ARGS=( + --update-weight-transfer-mode p2p + --update-weight-buffer-size "$((1024 * 1024 * 1024))" + --check-weight-update-equal + "${extra_mismatch_args[@]}" + ) + + WANDB_ARGS=( + --use-wandb + --wandb-project miles-p2p-rdma + --wandb-group Qwen3-30B-A3B + --wandb-key ${WANDB_KEY} + ) + + TRAIN_ARGS=( + train_async.py + --update-weights-interval 2 + --actor-num-nodes 1 + --actor-num-gpus-per-node 8 + --rollout-num-gpus 8 + "${MODEL_ARGS[@]}" + "${CKPT_ARGS[@]}" + "${ROLLOUT_ARGS[@]}" + "${PERF_ARGS[@]}" + "${GRPO_ARGS[@]}" + "${OPTIMIZER_ARGS[@]}" + "${SGLANG_ARGS[@]}" + "${MISC_ARGS[@]}" + "${P2P_ARGS[@]}" + "${WANDB_ARGS[@]}" + ) + + ray job submit --address="http://127.0.0.1:8265" \ + --runtime-env-json="{ + \"env_vars\": { + \"PYTHONPATH\": \"${MEGATRON_LM_PATH}/:${SGLANG_DIR}/python:${MILES_DIR}\", + \"CUDA_DEVICE_MAX_CONNECTIONS\": \"1\", + \"DEPRECATED_MEGATRON_COMPATIBLE\": \"1\", + \"RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES\": \"1\", + \"RAY_EXPERIMENTAL_NOSET_HIP_VISIBLE_DEVICES\": \"1\", + \"HIP_VISIBLE_DEVICES\": \"0,1,2,3,4,5,6,7\", + \"PYTORCH_HIP_ALLOC_CONF\": \"expandable_segments:True\", + \"no_proxy\": \"${MASTER_ADDR},127.0.0.1\", + \"MASTER_ADDR\": \"${MASTER_ADDR}\" + } + }" \ + -- python3 "${TRAIN_ARGS[@]}" +} + +# ============== Main ============== +[ $# -lt 1 ] && echo "Usage: bash $0 {head|worker|submit}" && exit 1 +case "$1" in + head) run_head ;; + worker) run_worker ;; + submit) run_submit ;; + *) echo "Unknown command: $1"; exit 1 ;; +esac