From cf4480aa1bd4feac286a7a22622dedb721e2ee0e Mon Sep 17 00:00:00 2001 From: Haichuan Hu Date: Thu, 28 May 2026 14:59:16 +0800 Subject: [PATCH 1/3] [feat] Enable openYuanrong RDMA support Signed-off-by: Haichuan Hu --- .../openyuanrong_datasystem.md | 59 ++++++++++++++++++- transfer_queue/config.yaml | 11 ++++ .../storage/bootstrap/yuanrong_bootstrap.py | 45 +++++++++++--- 3 files changed, 107 insertions(+), 8 deletions(-) diff --git a/docs/storage_backends/openyuanrong_datasystem.md b/docs/storage_backends/openyuanrong_datasystem.md index 1fbeba60..3155b47a 100644 --- a/docs/storage_backends/openyuanrong_datasystem.md +++ b/docs/storage_backends/openyuanrong_datasystem.md @@ -1,6 +1,6 @@ # OpenYuanrong-Datasystem Integration for TransferQueue -> Last updated: 05/18/2026 +> Last updated: 05/28/2026 ## Overview @@ -134,6 +134,8 @@ backend: worker_port: 31501 # Port for Yuanrong datasystem worker on each node metastore_port: 2379 # Port for metastore service on the head node enable_yr_npu_transport: true # Enable NPU transport for high-performance device-to-device transfer + enable_rdma: false # Enable host RDMA (H2H) transport via UCX + ucx_env_vars: {} # UCX env vars for dscli subprocess (e.g., {UCX_LOG_FILE: /tmp/ucx.log, UCX_LOG_LEVEL: ERROR}) worker_args: "--shared_memory_size_mb 8192 --remote_h2d_device_ids 0 --enable_huge_tlb true" ``` @@ -150,6 +152,17 @@ backend: - `worker_args` (**mandatory** when `enable_yr_npu_transport: true`): - `--remote_h2d_device_ids`: Enable RH2D (Remote Host-to-Device) for efficient cross-node NPU data transfer. Specify NPU device IDs as comma-separated values (e.g., `0,1,2,3`). Yuanrong manages all specified devices - to put/get tensors on NPU `X`, device ID `X` must be included in this argument. +**RDMA Options:** +- `enable_rdma`: Whether to enable host RDMA (H2H) transport via UCX. Requires RDMA-capable NIC hardware and `rdma-core` driver on all nodes. 
When enabled, TQ automatically adds `--enable_rdma true` to the dscli startup command and defaults `UCX_TLS=rc_x` in the subprocess environment. RDMA H2H and RH2D (NPU cross-node) can be enabled simultaneously — they are **not** mutually exclusive. +- `ucx_env_vars`: Dictionary of UCX environment variables passed to the dscli subprocess. These override parent process environment. Common variables: + - `UCX_TLS`: RDMA transport mode. Defaults to `rc_x` when `enable_rdma=true` and not specified here. Alternatives: `rc` (compatible), `ud` (low-latency), `dc` (large-scale). See [UCX environment parameters](https://github.com/openucx/ucx/wiki/UCX-environment-parameters). + - `UCX_LOG_FILE`: Path to UCX log file (e.g., `/tmp/ucx.log`). Requires `UCX_LOG_LEVEL` to be set. + - `UCX_LOG_LEVEL`: Log verbosity — `FATAL`, `ERROR`, `WARN`, `INFO`, `DEBUG`, `TRACE`. Use `DEBUG`/`TRACE` for troubleshooting. + - `UCX_NET_DEVICES`: RDMA device name (e.g., `mlx5_0:1`). Required in multi-NIC setups. + - `UCX_TCP_CM_ROUTE`: TCP control-flow interface for UCX connection setup. Must match the RDMA NIC's network plane. + +> For RDMA best practices, troubleshooting, and K8s deployment, see [openYuanrong RDMA Best Practices](https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/best_practices/best_practices_for_rdma.html). + > More configuration parameters for deploying the datasystem can refer to [dscli config](https://gitcode.com/openeuler/yuanrong-datasystem/blob/master/docs/source_zh_cn/deployment/dscli.md). 
### Multi-Node Deployment @@ -178,6 +191,8 @@ backend: worker_port: 31501 metastore_port: 2379 enable_yr_npu_transport: true + enable_rdma: false + ucx_env_vars: {} worker_args: "--shared_memory_size_mb 65536 --remote_h2d_device_ids 0 --enable_huge_tlb true" ``` @@ -290,6 +305,37 @@ dscli start -w --worker_address :31501 \ --shared_memory_size_mb 8192 ``` +### Start with RDMA + +To enable RDMA for host-to-host (H2H) transfer, add `--enable_rdma true` to the dscli command and set UCX environment variables: + +```bash +# Set UCX environment variables +export UCX_TLS=rc_x +# (Optional) Configure UCX logging for debugging +export UCX_LOG_FILE=/tmp/ucx.log +export UCX_LOG_LEVEL=ERROR + +# Head node +dscli start -w --worker_address :31501 \ + --metastore_address :2379 \ + --start_metastore_service true \ + --enable_rdma true \ + --arena_per_tenant 1 \ + --enable_worker_worker_batch_get true \ + --shared_memory_size_mb 8192 + +# Worker node +dscli start -w --worker_address :31501 \ + --metastore_address :2379 \ + --enable_rdma true \ + --arena_per_tenant 1 \ + --enable_worker_worker_batch_get true \ + --shared_memory_size_mb 8192 +``` + +> `UCX_TLS=rc_x` forces RDMA transport — if RDMA is unavailable, the system will error rather than fall back to TCP. For alternative transport modes, see [UCX environment parameters](https://github.com/openucx/ucx/wiki/UCX-environment-parameters). + ### Stop Worker ```bash @@ -388,6 +434,17 @@ Common errors and solutions: - `Device not found`: Check if device ID is included in `--remote_h2d_device_ids` - `CANN error`: Verify CANN installation path and environment variables +### RDMA Issues + +When using `enable_rdma: true`, ensure: +- RDMA NIC hardware and `rdma-core` driver are installed on all nodes. Verify with `ibv_devices`. +- `UCX_TLS=rc_x` is compatible with your NIC. If not, set alternative mode via `ucx_env_vars` (e.g., `{UCX_TLS: rc}`). 
+ +Common errors and solutions: +- **UCX endpoint timeout**: In multi-NIC setups, UCX may select an isolated network interface for control flow. Set `UCX_NET_DEVICES` and `UCX_TCP_CM_ROUTE` in `ucx_env_vars` to specify the correct RDMA device and its TCP interface. See [openYuanrong RDMA Best Practices](https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/best_practices/best_practices_for_rdma.html) for detailed troubleshooting. +- **RDMA verification**: Set `UCX_LOG_FILE` and `UCX_LOG_LEVEL` in `ucx_env_vars` (e.g., `{UCX_LOG_FILE: /tmp/ucx.log, UCX_LOG_LEVEL: INFO}`), then check logs for RC/RDMA entries to confirm RDMA is active. +- **Container environments**: Set `memlock` to `unlimited` in the container, otherwise RDMA memory registration may fail. + ### Out of Memory Error If Yuanrong throws an OOM error during operation: diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index faffbfaf..c277e44c 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -61,6 +61,17 @@ backend: metastore_port: 2379 # If enable npu transport enable_yr_npu_transport: false + # If enable host RDMA (H2H) transport via UCX. Requires RDMA NIC hardware and rdma-core driver. + # See https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/best_practices/best_practices_for_rdma.html + enable_rdma: false + # UCX env vars passed to dscli subprocess. Overrides parent env. TQ defaults UCX_TLS=rc_x when enable_rdma=true. + # UCX_TLS: RDMA transport mode. "rc_x" (default), "rc" (compatible), "ud" (low-latency), "dc" (large-scale). + # UCX_LOG_FILE: Path to UCX log file. Requires UCX_LOG_LEVEL to be set. + # UCX_LOG_LEVEL: FATAL, ERROR, WARN, INFO, DEBUG, TRACE. Use DEBUG/TRACE for troubleshooting. + # UCX_NET_DEVICES: RDMA device name (e.g., mlx5_0:1). Required in multi-NIC setups. + # UCX_TCP_CM_ROUTE: TCP control-flow interface for UCX connection setup. 
+ # Example: ucx_env_vars: { UCX_TLS: rc_x, UCX_LOG_FILE: /tmp/ucx.log, UCX_LOG_LEVEL: ERROR } + ucx_env_vars: {} # Additional config for yuanrong worker. # Recommended options for NPU environments: # --remote_h2d_device_ids Enable RH2D for efficient cross-node data transfer. Specify NPU device IDs (comma-separated). diff --git a/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py b/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py index e114f939..c155d025 100644 --- a/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py +++ b/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py @@ -78,6 +78,8 @@ def start_datasystem_worker( metastore_address: str, is_head: bool, worker_args: str = "", + enable_rdma: bool = False, + ucx_env_vars: dict | None = None, ) -> None: """Start Yuanrong datasystem worker in metastore mode. @@ -105,18 +107,33 @@ def start_datasystem_worker( if worker_args: cmd.extend(worker_args.split()) + # Append --enable_rdma if enabled + if enable_rdma: + cmd.extend(["--enable_rdma", "true"]) + node_type = "head node" if is_head else "worker node" logger.info(f"Starting Yuanrong datasystem ({node_type}) at {worker_address}, worker_args={worker_args}") # Build environment with ASCEND_RT_VISIBLE_DEVICES if specified env = None device_ids = _parse_remote_h2d_device_ids(worker_args) - if device_ids: + if device_ids or enable_rdma or ucx_env_vars: env = os.environ.copy() - env["ASCEND_RT_VISIBLE_DEVICES"] = device_ids - logger.info( - f"Setting ASCEND_RT_VISIBLE_DEVICES={device_ids} for dscli subprocess ({node_type} at {worker_address})" - ) + if device_ids: + env["ASCEND_RT_VISIBLE_DEVICES"] = device_ids + logger.info( + f"Setting ASCEND_RT_VISIBLE_DEVICES={device_ids} for dscli subprocess ({node_type} at {worker_address})" + ) + # ucx_env_vars overrides parent env (highest priority) + if ucx_env_vars: + for key, value in ucx_env_vars.items(): + env[key] = str(value) + # Default UCX_TLS=rc_x only when enable_rdma and UCX_TLS still absent + if 
enable_rdma and "UCX_TLS" not in env: + env["UCX_TLS"] = "rc_x" + logger.info( + f"Setting UCX_TLS=rc_x (default for RDMA) for dscli subprocess ({node_type} at {worker_address})" + ) try: ds_result = subprocess.run( @@ -179,7 +196,15 @@ class YuanrongWorkerActor: intersection of local IP addresses with the provided node IPs. """ - def __init__(self, node_ips: list[str], worker_port: int, metastore_port: int, worker_args: str = ""): + def __init__( + self, + node_ips: list[str], + worker_port: int, + metastore_port: int, + worker_args: str = "", + enable_rdma: bool = False, + ucx_env_vars: dict | None = None, + ): """Initialize the Yuanrong worker actor. Args: @@ -208,6 +233,8 @@ def __init__(self, node_ips: list[str], worker_port: int, metastore_port: int, w self.metastore_port = metastore_port self.worker_address = f"{self.my_ip}:{worker_port}" self.worker_args = worker_args + self.enable_rdma = enable_rdma + self.ucx_env_vars = ucx_env_vars # First node in the list is assumed to be the head node. # This assumption is based on how interface.py constructs node_ips from ray.nodes(). 
@@ -236,6 +263,8 @@ def start(self) -> str: metastore_address=self.metastore_address, is_head=self.is_head, worker_args=self.worker_args, + enable_rdma=self.enable_rdma, + ucx_env_vars=self.ucx_env_vars, ) logger.info(f"Datasystem worker started successfully at {self.worker_address}") return self.worker_address @@ -313,6 +342,8 @@ def initialize_yuanrong_storage(conf: DictConfig) -> dict[str, Any] | None: worker_port = conf.backend.Yuanrong.worker_port metastore_port = conf.backend.Yuanrong.metastore_port worker_args = conf.backend.Yuanrong.get("worker_args", "") + enable_rdma = conf.backend.Yuanrong.get("enable_rdma", False) + ucx_env_vars = dict(conf.backend.Yuanrong.get("ucx_env_vars", {})) logger.info(f"Found {len(ordered_nodes)} alive Ray nodes: {node_ips}") @@ -352,7 +383,7 @@ def initialize_yuanrong_storage(conf: DictConfig) -> dict[str, Any] | None: actor = YuanrongWorkerActor.options( # type: ignore[attr-defined] placement_group=pg, placement_group_bundle_index=rank, - ).remote(node_ips, worker_port, metastore_port, worker_args) + ).remote(node_ips, worker_port, metastore_port, worker_args, enable_rdma, ucx_env_vars) worker_actors.append(actor) logger.info(f"Created {len(worker_actors)} YuanrongWorkerActor instances") From 83f68ed4ed9f00a0a5e19c5f736f8267e76a6938 Mon Sep 17 00:00:00 2001 From: Haichuan Hu Date: Fri, 29 May 2026 15:08:18 +0800 Subject: [PATCH 2/3] [feat] Enable openYuanrong RDMA support Signed-off-by: Haichuan Hu --- .../storage_backends/openyuanrong_datasystem.md | 17 +++++++++++++---- transfer_queue/config.yaml | 7 +++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/docs/storage_backends/openyuanrong_datasystem.md b/docs/storage_backends/openyuanrong_datasystem.md index 3155b47a..a0e346a1 100644 --- a/docs/storage_backends/openyuanrong_datasystem.md +++ b/docs/storage_backends/openyuanrong_datasystem.md @@ -145,7 +145,7 @@ backend: - `metastore_port`: Port for metastore service on the head node. 
- `worker_args`: Additional arguments passed to `dscli start` command: - `--shared_memory_size_mb`: Shared memory size in MB for datasystem worker. - - `--enable_huge_tlb`: Configure huge page memory to reduce TLB misses and improve memory access efficiency. Note: may cause system memory shortage, kernel OOM, or system instability. **Please allocate huge pages before starting datasystem** - refer to [Huge Page Guide](https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/appendix/hugepage_guide.html). + - `--enable_huge_tlb`: Configure huge page memory to reduce TLB misses and improve memory access efficiency. Note: may cause system memory shortage, kernel OOM, or system instability. **Please allocate huge pages before starting datasystem** - refer to [Huge Page Guide](https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/appendix/hugepage_guide.html). Before enabling, the following OS configuration is required (root privileges): `sysctl -w vm.nr_hugepages=<N>` (N is the number of huge pages; each page is 2MB, e.g. 65536 for 128GB) and `ulimit -l unlimited` (allow pinning enough memory for RDMA/Ascend). **NPU Transfer Options:** - `enable_yr_npu_transport`: Enable NPU transport for high-performance device-to-device data transfer. Set to `true` when using NPU tensors. @@ -155,7 +155,7 @@ backend: **RDMA Options:** - `enable_rdma`: Whether to enable host RDMA (H2H) transport via UCX. Requires RDMA-capable NIC hardware and `rdma-core` driver on all nodes. When enabled, TQ automatically adds `--enable_rdma true` to the dscli startup command and defaults `UCX_TLS=rc_x` in the subprocess environment. RDMA H2H and RH2D (NPU cross-node) can be enabled simultaneously — they are **not** mutually exclusive. - `ucx_env_vars`: Dictionary of UCX environment variables passed to the dscli subprocess. These override parent process environment. Common variables: - - `UCX_TLS`: RDMA transport mode. Defaults to `rc_x` when `enable_rdma=true` and not specified here. 
Alternatives: `rc` (compatible), `ud` (low-latency), `dc` (large-scale). See [UCX environment parameters](https://github.com/openucx/ucx/wiki/UCX-environment-parameters). + - `UCX_TLS`: RDMA transport mode. Precedence: `ucx_env_vars` > parent env > fallback default `rc_x` (when `enable_rdma=true`). Alternatives: `rc` (compatible), `ud` (low-latency), `dc` (large-scale). See [UCX environment parameters](https://github.com/openucx/ucx/wiki/UCX-environment-parameters). - `UCX_LOG_FILE`: Path to UCX log file (e.g., `/tmp/ucx.log`). Requires `UCX_LOG_LEVEL` to be set. - `UCX_LOG_LEVEL`: Log verbosity — `FATAL`, `ERROR`, `WARN`, `INFO`, `DEBUG`, `TRACE`. Use `DEBUG`/`TRACE` for troubleshooting. - `UCX_NET_DEVICES`: RDMA device name (e.g., `mlx5_0:1`). Required in multi-NIC setups. @@ -323,7 +323,8 @@ dscli start -w --worker_address :31501 \ --enable_rdma true \ --arena_per_tenant 1 \ --enable_worker_worker_batch_get true \ - --shared_memory_size_mb 8192 + --shared_memory_size_mb 8192 \ + --enable_huge_tlb true # Worker node dscli start -w --worker_address :31501 \ @@ -331,9 +332,17 @@ dscli start -w --worker_address :31501 \ --enable_rdma true \ --arena_per_tenant 1 \ --enable_worker_worker_batch_get true \ - --shared_memory_size_mb 8192 + --shared_memory_size_mb 8192 \ + --enable_huge_tlb true ``` +Parameters: +- `--enable_rdma true`: Enable RDMA for H2H data transfer between workers. +- `--arena_per_tenant 1`: Number of shared memory arenas per tenant. Set to 1 for fastest startup; higher values improve first-allocation performance but increase fd usage. +- `--enable_worker_worker_batch_get true`: Enable batch get between workers for better cross-node transfer throughput. +- `--shared_memory_size_mb 8192`: Per-node shared memory size in MB. All clients on the same node share this shared memory space. +- `--enable_huge_tlb true`: Enable huge page memory to reduce TLB misses and accelerate startup/transfer. 
Before enabling, OS configuration is required (root privileges): `sysctl -w vm.nr_hugepages=<N>` (N is the number of huge pages; each page is 2MB) and `ulimit -l unlimited`. + > `UCX_TLS=rc_x` forces RDMA transport — if RDMA is unavailable, the system will error rather than fall back to TCP. For alternative transport modes, see [UCX environment parameters](https://github.com/openucx/ucx/wiki/UCX-environment-parameters). ### Stop Worker diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index c277e44c..9d3ff151 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -64,8 +64,8 @@ backend: # If enable host RDMA (H2H) transport via UCX. Requires RDMA NIC hardware and rdma-core driver. # See https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/best_practices/best_practices_for_rdma.html enable_rdma: false - # UCX env vars passed to dscli subprocess. Overrides parent env. TQ defaults UCX_TLS=rc_x when enable_rdma=true. - # UCX_TLS: RDMA transport mode. "rc_x" (default), "rc" (compatible), "ud" (low-latency), "dc" (large-scale). + # UCX env vars passed to dscli subprocess. Precedence: ucx_env_vars > parent env > TQ default (UCX_TLS=rc_x when enable_rdma=true). + # UCX_TLS: RDMA transport mode. "rc_x" (default when RDMA enabled and unset), "rc" (compatible), "ud" (low-latency), "dc" (large-scale). # UCX_LOG_FILE: Path to UCX log file. Requires UCX_LOG_LEVEL to be set. # UCX_LOG_LEVEL: FATAL, ERROR, WARN, INFO, DEBUG, TRACE. Use DEBUG/TRACE for troubleshooting. # UCX_NET_DEVICES: RDMA device name (e.g., mlx5_0:1). Required in multi-NIC setups. @@ -76,5 +76,8 @@ backend: # Recommended options for NPU environments: # --remote_h2d_device_ids Enable RH2D for efficient cross-node data transfer. Specify NPU device IDs (comma-separated). # --enable_huge_tlb Enable huge page memory to improve performance. Required for >21GB shared memory on 910B. + # Before enabling, OS config required (root privilege): + # sysctl -w vm.nr_hugepages=<N> (each page is 2MB, e.g. 
65536 for 128GB) + # ulimit -l unlimited (allow pinning enough memory for RDMA/Ascend) # Example: "--shared_memory_size_mb 16384 --remote_h2d_device_ids 0,1,2,3 --enable_huge_tlb true" worker_args: "--shared_memory_size_mb 8192" From 22903ab391fffb5400f28e620160ab514815b2fd Mon Sep 17 00:00:00 2001 From: Haichuan Hu Date: Fri, 29 May 2026 17:45:05 +0800 Subject: [PATCH 3/3] [feat] fix graceful stopping Signed-off-by: Haichuan Hu --- .../storage/bootstrap/yuanrong_bootstrap.py | 19 ++++++++----------- transfer_queue/utils/yuanrong_utils.py | 16 +++++++++------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py b/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py index c155d025..56825932 100644 --- a/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py +++ b/transfer_queue/storage/bootstrap/yuanrong_bootstrap.py @@ -390,7 +390,7 @@ def initialize_yuanrong_storage(conf: DictConfig) -> dict[str, Any] | None: # Find which actor is running on the head node (driver IP) # The head node actor needs to start first to initialize metastore service - head_actor_index = None + head_actor_index = 0 for idx, actor in enumerate(worker_actors): try: node_ip = ray.get(actor.get_node_ip.remote()) @@ -400,10 +400,6 @@ def initialize_yuanrong_storage(conf: DictConfig) -> dict[str, Any] | None: except Exception: pass - if head_actor_index is None: - logger.warning("Could not identify head node actor, using actor 0 as default") - head_actor_index = 0 - logger.info(f"Head node actor identified: actor {head_actor_index}") # Start head worker first to initialize metastore service @@ -426,26 +422,27 @@ def initialize_yuanrong_storage(conf: DictConfig) -> dict[str, Any] | None: "worker_actors": worker_actors, "metastore_address": metastore_address, "placement_group": pg, + "head_actor_index": head_actor_index, } except Exception as e: # Cleanup on initialization failure: attempt graceful stop of started workers first 
logger.error(f"Failed to start Yuanrong workers: {e}, cleaning up...") # Try to gracefully stop workers that may have already started + # Stop worker nodes (non-head) in parallel first, then head node (metastore service) if worker_actors: stop_exceptions = [] - # Stop worker nodes (all except head node 0) first - if len(worker_actors) > 1: - stop_refs = [actor.stop.remote() for actor in worker_actors[1:]] - for idx, stop_ref in enumerate(stop_refs, start=1): + other_indices = [i for i in range(len(worker_actors)) if i != head_actor_index] + if other_indices: + stop_refs = [worker_actors[idx].stop.remote() for idx in other_indices] + for idx, stop_ref in zip(other_indices, stop_refs, strict=False): try: ray.get(stop_ref, timeout=30) except Exception as stop_e: stop_exceptions.append(stop_e) logger.warning(f"Failed to stop worker node actor {idx}: {stop_e}") - # Stop head node (actor 0) try: - ray.get(worker_actors[0].stop.remote(), timeout=30) + ray.get(worker_actors[head_actor_index].stop.remote(), timeout=30) except Exception as stop_e: stop_exceptions.append(stop_e) logger.warning(f"Failed to stop head node actor: {stop_e}") diff --git a/transfer_queue/utils/yuanrong_utils.py b/transfer_queue/utils/yuanrong_utils.py index 5f8ddf6e..342cfad4 100644 --- a/transfer_queue/utils/yuanrong_utils.py +++ b/transfer_queue/utils/yuanrong_utils.py @@ -153,17 +153,19 @@ def cleanup_yuanrong_resources(storage_value: Any) -> None: worker_actors = storage_value.get("worker_actors", []) placement_group = storage_value.get("placement_group") + head_actor_index = storage_value.get("head_actor_index", 0) try: if worker_actors: logger.info(f"Cleaning up Yuanrong backend (stopping {len(worker_actors)} workers)...") - # Stop worker nodes (all except head node 0) in parallel first + # Stop worker nodes (non-head) in parallel first, then head node (metastore service) stop_exceptions = [] - if len(worker_actors) > 1: - logger.info(f"Stopping {len(worker_actors) - 1} worker nodes 
(excluding head) in parallel...") - stop_refs = [actor.stop.remote() for actor in worker_actors[1:]] - for idx, stop_ref in enumerate(stop_refs, start=1): + other_indices = [i for i in range(len(worker_actors)) if i != head_actor_index] + if other_indices: + logger.info(f"Stopping {len(other_indices)} worker nodes in parallel...") + stop_refs = [worker_actors[idx].stop.remote() for idx in other_indices] + for idx, stop_ref in zip(other_indices, stop_refs, strict=False): try: ray.get(stop_ref) except Exception as e: @@ -172,10 +174,10 @@ def cleanup_yuanrong_resources(storage_value: Any) -> None: if len(stop_exceptions) < len(stop_refs): logger.info("Completed stop requests for non-head worker nodes") - # Then stop head node (actor 0) which runs metastore service + # Then stop head node which runs metastore service logger.info("Stopping head node with metastore service...") try: - ray.get(worker_actors[0].stop.remote()) + ray.get(worker_actors[head_actor_index].stop.remote()) logger.info("Head node stopped successfully") except Exception as e: stop_exceptions.append(e)