增加集体通信主机布局结构化输出#11
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a new Python utility tools/host_layout.py along with unit tests to parse MCCL cluster host specifications into a structured JSON layout. The review feedback suggests optimizing the host parsing function to avoid O(N^2) complexity when calculating rank boundaries by using a running counter. Additionally, it recommends using TypedDict to define a more precise schema for host entries, which improves type safety and removes the need for explicit type casting in the summary function.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| from pathlib import Path | ||
|
|
||
|
|
||
| def parse_host_spec(spec: str) -> list[dict[str, int | str]]: | ||
| hosts: list[dict[str, int | str]] = [] | ||
| for index, item in enumerate(part.strip() for part in spec.split(",") if part.strip()): | ||
| host, sep, process_count = item.rpartition(":") | ||
| if not sep or not host: | ||
| raise ValueError(f"invalid host entry: {item!r}") | ||
| ranks = int(process_count) | ||
| if ranks <= 0: | ||
| raise ValueError(f"process count must be positive: {item!r}") | ||
| hosts.append( | ||
| { | ||
| "host": host, | ||
| "process_count": ranks, | ||
| "rank_start": sum(entry["process_count"] for entry in hosts), | ||
| "rank_end": sum(entry["process_count"] for entry in hosts) + ranks - 1, | ||
| "node_index": index, | ||
| } | ||
| ) | ||
| if not hosts: | ||
| raise ValueError("host specification is empty") | ||
| return hosts |
There was a problem hiding this comment.
在 parse_host_spec 中,每次循环都通过 sum(entry["process_count"] for entry in hosts) 重新计算已处理的 rank 总数。这会导致
此外,使用 dict[str, int | str] 作为类型标注较为宽泛,导致在 summarize 中获取 process_count 时需要显式调用 int() 进行类型转换以满足类型检查。
建议引入 TypedDict 来定义结构化的主机信息类型,并使用一个累加变量 current_rank 来维护当前的 rank 起始位置,从而将复杂度降低到
from pathlib import Path
from typing import TypedDict
class HostEntry(TypedDict):
host: str
process_count: int
rank_start: int
rank_end: int
node_index: int
def parse_host_spec(spec: str) -> list[HostEntry]:
hosts: list[HostEntry] = []
current_rank = 0
for index, item in enumerate(part.strip() for part in spec.split(",") if part.strip()):
host, sep, process_count = item.rpartition(":")
if not sep or not host:
raise ValueError(f"invalid host entry: {item!r}")
try:
ranks = int(process_count)
except ValueError:
raise ValueError(f"invalid process count in entry: {item!r}")
if ranks <= 0:
raise ValueError(f"process count must be positive: {item!r}")
hosts.append(
{
"host": host,
"process_count": ranks,
"rank_start": current_rank,
"rank_end": current_rank + ranks - 1,
"node_index": index,
}
)
current_rank += ranks
if not hosts:
raise ValueError("host specification is empty")
return hosts| def summarize(spec: str) -> dict[str, object]: | ||
| hosts = parse_host_spec(spec) | ||
| return { | ||
| "node_count": len(hosts), | ||
| "total_processes": sum(int(entry["process_count"]) for entry in hosts), | ||
| "hosts": hosts, | ||
| } |
There was a problem hiding this comment.
在使用 TypedDict(HostEntry)后,entry["process_count"] 的类型已被明确限定为 int。因此,在计算 total_processes 时,无需再显式调用 int() 进行类型转换。
| def summarize(spec: str) -> dict[str, object]: | |
| hosts = parse_host_spec(spec) | |
| return { | |
| "node_count": len(hosts), | |
| "total_processes": sum(int(entry["process_count"]) for entry in hosts), | |
| "hosts": hosts, | |
| } | |
| def summarize(spec: str) -> dict[str, object]: | |
| hosts = parse_host_spec(spec) | |
| return { | |
| "node_count": len(hosts), | |
| "total_processes": sum(entry["process_count"] for entry in hosts), | |
| "hosts": hosts, | |
| } |
这次改动补上了集体通信主机布局结构化输出,主要是为了解决集体通信测试与结果整理流程里相关信息不够集中、人工整理成本较高的问题,让日常排查、验证和结果归档更直接。
实现上补充了对应工具或脚本逻辑,补上了对应测试,同时尽量保持现有用法不变,避免影响已有流程。
这一分支已经在沐曦算力环境完成实际验证,相关检查均已通过,现提交合入。