feat: add v1 parallel serving strategy support #37
Code Review
This pull request introduces a Parallel Strategy V1 supporting a DP x TP device topology, adding ParallelConfig and DataParallelAsyncLLMEngine to route requests across independent data-parallel replicas using a least-pending-tokens policy. It also updates CLI arguments, configuration schemas, and the Qwen3-14B runner to support multi-device groups. The review feedback highlights two key improvement opportunities: resolving a double-counting issue in the load estimation of DataParallelAsyncLLMEngine by decrementing the extra load as soon as a request is registered in the scheduler, and starting DP replica engines in parallel using asyncio.gather to optimize server startup time.
    async def add_request(
        self,
        request_id: str,
        prompt: str,
        config,
    ) -> AsyncGenerator[TokenOutput, None]:
        replica_idx = self._select_replica()
        request_load = self._estimate_request_load(prompt, config)
        self._route_extra_load[replica_idx] += request_load
        self._request_to_replica[request_id] = replica_idx
        try:
            engine = self._engines[replica_idx]
            async for output in engine.add_request(request_id, prompt, config):
                yield output
        finally:
            self._request_to_replica.pop(request_id, None)
            self._route_extra_load[replica_idx] = max(
                0,
                self._route_extra_load[replica_idx] - request_load,
            )
Active requests are double-counted in the load estimation. Once engine.add_request is called, the request is synchronously added to the scheduler, meaning engine.pending_token_load() immediately includes its remaining tokens. However, self._route_extra_load is not decremented until the request completely finishes. This causes the request's load to be counted twice for its entire duration, leading to suboptimal routing decisions.
We should decrement the extra load as soon as the request is registered in the scheduler (i.e., on the first yield of the generator).
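To make the double-counting concrete, here is a toy calculation. It assumes, as the comment implies, that the router ranks replicas by `pending_token_load()` plus `_route_extra_load`; the `routing_score` helper and all token counts are made up for illustration and are not taken from the PR.

```python
# Toy illustration of the double-counting described in the review comment.
# Assumption: a replica's routing score is pending_token_load() plus
# _route_extra_load[replica]; lower scores are preferred.


def routing_score(pending_tokens: int, route_extra_load: int) -> int:
    """Score used to pick the least-loaded replica (lower is better)."""
    return pending_tokens + route_extra_load


request_load = 100  # hypothetical estimate for one in-flight request

# Replica 0 holds that single request. The scheduler already reports it,
# and the router still carries the same 100 tokens as extra load.
score_before_fix = routing_score(pending_tokens=request_load, route_extra_load=request_load)

# Same replica once the extra load is released after the scheduler owns the request.
score_after_fix = routing_score(pending_tokens=request_load, route_extra_load=0)

# Replica 1 genuinely has 150 pending tokens and nothing in flight at the router.
replica1_score = routing_score(pending_tokens=150, route_extra_load=0)

print(score_before_fix, score_after_fix, replica1_score)  # 200 100 150
```

With the double count, replica 0 (score 200) looks busier than replica 1 (score 150) even though it holds less work, so the next request would be routed to the more loaded replica.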
    async def add_request(
        self,
        request_id: str,
        prompt: str,
        config,
    ) -> AsyncGenerator[TokenOutput, None]:
        replica_idx = self._select_replica()
        request_load = self._estimate_request_load(prompt, config)
        self._route_extra_load[replica_idx] += request_load
        self._request_to_replica[request_id] = replica_idx
        decremented = False
        try:
            engine = self._engines[replica_idx]
            async for output in engine.add_request(request_id, prompt, config):
                if not decremented:
                    self._route_extra_load[replica_idx] = max(
                        0,
                        self._route_extra_load[replica_idx] - request_load,
                    )
                    decremented = True
                yield output
        finally:
            self._request_to_replica.pop(request_id, None)
            if not decremented:
                self._route_extra_load[replica_idx] = max(
                    0,
                    self._route_extra_load[replica_idx] - request_load,
                )

    async def start(self) -> None:
        """Start all DP replica engines."""
        started: list[Any] = []
        try:
            for engine in self._engines:
                await engine.start()
                started.append(engine)
        except Exception:
            for engine in reversed(started):
                await engine.stop()
            raise
Starting DP replica engines sequentially can significantly increase the server startup time, especially since model loading and kernel compilation are time-consuming operations. Starting them in parallel using asyncio.gather allows overlapping these operations and speeds up startup.
    -    async def start(self) -> None:
    -        """Start all DP replica engines."""
    -        started: list[Any] = []
    -        try:
    -            for engine in self._engines:
    -                await engine.start()
    -                started.append(engine)
    -        except Exception:
    -            for engine in reversed(started):
    -                await engine.stop()
    -            raise
    +    async def start(self) -> None:
    +        """Start all DP replica engines in parallel."""
    +        try:
    +            await asyncio.gather(*(engine.start() for engine in self._engines))
    +        except Exception:
    +            await self.stop()
    +            raise
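One detail worth checking when adopting the parallel start: by default `asyncio.gather` raises the first exception while the other `start()` coroutines keep running, so cleanup can overlap with replicas that are still starting. The sketch below is one possible way to handle that, not the PR's code. `FakeEngine` and `start_all` are hypothetical stand-ins, and `return_exceptions=True` is used so every start settles before anything is stopped.

```python
import asyncio


class FakeEngine:
    """Stand-in for a DP replica engine; hypothetical, for illustration only."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self.fail = fail
        self.running = False

    async def start(self) -> None:
        await asyncio.sleep(0.01)  # pretend to load a model
        if self.fail:
            raise RuntimeError(f"{self.name} failed to start")
        self.running = True

    async def stop(self) -> None:
        self.running = False


async def start_all(engines: list[FakeEngine]) -> None:
    """Start every engine concurrently; on any failure, stop them all and re-raise."""
    # return_exceptions=True waits for every start() to settle before cleanup,
    # so stop() never overlaps with a start() that is still in progress.
    results = await asyncio.gather(*(e.start() for e in engines), return_exceptions=True)
    errors = [r for r in results if isinstance(r, BaseException)]
    if errors:
        await asyncio.gather(*(e.stop() for e in engines), return_exceptions=True)
        raise errors[0]


async def demo() -> tuple[bool, list[bool]]:
    engines = [FakeEngine("replica-0"), FakeEngine("replica-1", fail=True)]
    try:
        await start_all(engines)
        failed = False
    except RuntimeError:
        failed = True
    return failed, [e.running for e in engines]


failed, running = asyncio.run(demo())
print(failed, running)  # True [False, False]
```

The trade-off is that a fast failure in one replica is only reported after the slowest `start()` finishes; whether that is acceptable depends on how long model loading takes.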
📝 Walkthrough
Introduces Parallel Strategy V1 supporting DP x TP topology. A new
Changes
Parallel Strategy V1: DP x TP Serving Topology
Sequence Diagram(s)
sequenceDiagram
participant Client as HTTP Client
participant AsyncLLMEngineClient
participant DPEngineCore_0 as DPEngineCore (replica 0)
participant DPEngineCore_1 as DPEngineCore (replica 1)
participant WorkerProcess
Client->>AsyncLLMEngineClient: add_request(prompt, generate_config)
AsyncLLMEngineClient->>DPEngineCore_0: pending_token_load()
AsyncLLMEngineClient->>DPEngineCore_1: pending_token_load()
note over AsyncLLMEngineClient: select replica with min load + _route_extra_load
AsyncLLMEngineClient->>DPEngineCore_1: add_request(prompt, generate_config)
DPEngineCore_1->>WorkerProcess: schedule & execute on TP device group
WorkerProcess-->>DPEngineCore_1: TokenOutput chunks
DPEngineCore_1-->>AsyncLLMEngineClient: AsyncGenerator[TokenOutput]
AsyncLLMEngineClient-->>Client: stream TokenOutput
AsyncLLMEngineClient->>AsyncLLMEngineClient: finally: remove _request_to_replica entry
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
python/core/async_engine.py (1)
132-145: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Ensure spawned worker is cleaned up on startup failure.
If `ready_event.wait(...)` times out or raises (Line 141), `DPEngineCore.start()` exits without tearing down the already spawned worker process. This can leave orphan processes and block subsequent starts.
💡 Proposed fix

    @@
     import asyncio
    +import contextlib
     import logging
    @@
         async def start(self) -> None:
             """Start worker process and engine loop."""
             with profile_span("DPEngineCore.start", cat="serving"):
                 process, input_q, output_q, ready_event = spawn_worker(self.config)
                 self._worker_process = process
                 self._input_queue = input_q
                 self._output_queue = output_q
                 logger.info("Waiting for worker to initialize model...")
    -            await asyncio.to_thread(ready_event.wait, timeout=600)
    -            if not ready_event.is_set():
    -                raise RuntimeError("Worker failed to initialize within timeout")
    +            try:
    +                ready = await asyncio.to_thread(ready_event.wait, timeout=600)
    +                if not ready:
    +                    raise RuntimeError("Worker failed to initialize within timeout")
    +            except Exception:
    +                with contextlib.suppress(Exception):
    +                    input_q.put(WorkerCommand(type="shutdown"))
    +                with contextlib.suppress(Exception):
    +                    process.join(timeout=5)
    +                if process.is_alive():
    +                    process.terminate()
    +                self._worker_process = None
    +                self._input_queue = None
    +                self._output_queue = None
    +                raise
                 logger.info("Worker ready")

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/core/async_engine.py` around lines 132 - 145, The start method in DPEngineCore spawns a worker process but does not clean it up if the ready_event timeout occurs or an exception is raised. Wrap the asyncio.to_thread call in a try-except block (or similar error handling) to ensure that if ready_event.wait() times out or raises an exception, the worker process stored in self._worker_process is properly terminated before the RuntimeError is raised. This prevents orphan processes from being left behind when startup fails.
📒 Files selected for processing (16)
- README.md
- examples/model/qwen3_14b/npu_generate.py
- examples/model/qwen3_14b/npu_serving.json
- examples/model/qwen3_14b/runner/npu_executor.py
- python/cli/main.py
- python/core/__init__.py
- python/core/api.py
- python/core/async_engine.py
- python/core/parallel.py
- python/core/pypto_executor.py
- python/core/server.py
- python/core/serving_worker.py
- tests/test_batching.py
- tests/test_cli.py
- tests/test_npu_prefix_chunk.py
- tests/test_parallel.py
For the DP=2 validation above, keep `PTO2_RING_TASK_WINDOW` and
`PTO2_RING_DEP_POOL` at `131072`. Setting both to `1048576` with a 4 GiB heap can
reserve about 19 GiB of runtime arena per replica and fail with
`rtMalloc failed: 207001`.
Clarify the concurrent-serving ring-size guidance by topology.
The new DP=2 note (131072) conflicts with the earlier concurrent-serving default (1048576) in the same section, which can lead to wrong runtime tuning choices. Please split guidance explicitly by topology (single-replica vs DP=2+) so operators have one unambiguous setting path.
Suggested doc adjustment
-Single-request HTTP serving does not require the larger PTO2 ring settings. For
-concurrent NPU serving, start the server with the larger PTO2 ring settings:
+Single-request HTTP serving does not require the larger PTO2 ring settings.
+For concurrent NPU serving, use topology-specific ring settings:
+
+- Single-replica serving (no DP): `PTO2_RING_TASK_WINDOW=1048576` and `PTO2_RING_DEP_POOL=1048576`.
+- DP=2 serving: keep `PTO2_RING_TASK_WINDOW=131072` and `PTO2_RING_DEP_POOL=131072`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@README.md` around lines 170 - 173, The current documentation in the README.md
section (around lines 170-173) presents conflicting ring-size values for
`PTO2_RING_TASK_WINDOW` and `PTO2_RING_DEP_POOL` without clearly distinguishing
which setting applies to which topology. Restructure this section to explicitly
separate guidance by topology: create distinct subsections for single-replica
configurations and DP=2+ (distributed/multi-replica) configurations, clearly
stating that single-replica uses `1048576` while DP=2+ uses `131072`. This will
eliminate ambiguity and provide operators with one clear setting path for their
specific topology.
Validation after renaming the serving architecture classes:
Local checks:
NPU serving checks:
Command shape:
    task-submit --device auto --max-time 1200 --run \
      "bash -lc 'python /tmp/pypto-serving-dp-check.py dp1 19221'"
Result:
Command shape:
    task-submit --device 12,13 --max-time 1800 --run \
      "bash -lc 'export PTO2_RING_HEAP=4294967296 PTO2_RING_TASK_WINDOW=131072 PTO2_RING_DEP_POOL=131072; python /tmp/pypto-serving-dp-check.py dp2 19225'"
Result:
Note: the same DP=2 check on auto-selected devices
ecb8e9a to 6904dc6
Direct python/cli/main.py validation after the CLI import fix:
Notes:
d6415e8 to 2ceb391
Addressed the latest review feedback:
Validation:
Note: full |
        type=int,
        default=1,
        help="Offline generation does not launch DP replicas; values > 1 fail fast.",
    )

The argument types and line-wrapping styles of these args are inconsistent; please unify them where possible (adding `dest` is redundant when it matches the full option name).
        devices=parse_device_ids(args.devices, default_device=args.device_id),
    )
    if parallel_config.data_parallel_size != 1:
        raise ValueError("offline npu_generate.py supports tensor parallelism only; data_parallel_size must be 1")

The offline engine and the online engine are currently not the same one (Qwen currently has a CPU version that does not use the serving engine), so for now DP is implemented only for online serving.
         kv_cache_manager,
         platform=args.platform,
    -    device_id=args.device_id,
    +    device_id=device_ids[0],

Could `device_ids` also cover what `device_id` does?
    )
    parser.add_argument("--dp", "--data-parallel-size", dest="data_parallel_size", type=int, default=1, help="Data-parallel replica count.")
    parser.add_argument("--tp", "--tensor-parallel-size", dest="tensor_parallel_size", type=int, default=1, help="Tensor-parallel group size.")
    parser.add_argument("--pp", "--pipeline-parallel-size", dest="pipeline_parallel_size", type=int, default=1, help="Pipeline parallel size.")

Let's hold off on adding the user-facing options from `--pp` through `all2all-backend` until the flow actually supports them; anything not provided should fall back to the ParallelConfig defaults. Two reasons: 1. Enabling them at runtime is either a no-op or an error, so they serve no purpose. 2. The option design is incomplete: options that need restricted `choices` do not have any.
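Both argparse points raised in this review (redundant `dest`, missing `choices`) can be sketched briefly. argparse derives `dest` from the first long option string, so listing the full name first makes an explicit `dest` unnecessary. The `--all2all-backend` values below are placeholders invented for this example, not options the project defines.

```python
import argparse

parser = argparse.ArgumentParser(prog="serve-sketch")

# argparse derives dest from the first long option string, so listing the
# full name first yields dest="data_parallel_size" without spelling it out.
parser.add_argument("--data-parallel-size", "--dp", type=int, default=1,
                    help="Data-parallel replica count.")
parser.add_argument("--tensor-parallel-size", "--tp", type=int, default=1,
                    help="Tensor-parallel group size.")

# Hypothetical option values, shown only to illustrate `choices`:
# argparse rejects anything outside the tuple before the program runs.
parser.add_argument("--all2all-backend", choices=("backend_a", "backend_b"),
                    default="backend_a", help="Hypothetical backend selector.")

args = parser.parse_args(["--dp", "2", "--tp", "4"])
print(args.data_parallel_size, args.tensor_parallel_size, args.all2all_backend)
# 2 4 backend_a
```

In the PR's current ordering (`"--dp"` first), the derived `dest` would be `dp`, which is why the explicit `dest` is needed there; swapping the order is one way to drop it.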
    # -----------------------------------------------------------------------------------------------------------

    from .async_engine import AsyncLLMEngineClient, DPEngineCore, EngineConfig
    from .engine import LLMEngine

Are api.py and `__init__` completely identical? I don't see api.py being used anywhere.

The idea is to expose serving as a package for other software to use; I'm not sure yet whether it should be removed.
    )
    device_groups = parallel_config.replica_device_groups
    first_group = device_groups[0]
    worker_device_ids = first_group if parallel_config.data_parallel_size == 1 else devices

How does the computation here relate to `EngineConfig.worker_device_ids()`?

The CLI manually computes `first_group`, `worker_device_ids` (`first_group if DP==1 else devices`), and `device_id`, and then `EngineConfig.worker_device_ids()` re-derives the same thing with a three-branch resolver. As a result, the top-level `EngineConfig.device_ids` means "this replica's TP group" when DP=1 but "all devices of all replicas" when DP>1 (a value the client then immediately overwrites with `replace()` and that must never be read). The deeper fix is a single source of truth: derive device ownership entirely from `parallel_config` and stop setting `device_ids` at the top level. That also removes the branch at line 141.
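A minimal sketch of the single-source-of-truth idea follows. `ParallelConfigSketch` is a hypothetical stand-in, not the PR's `ParallelConfig`; it borrows the field names seen in the diff and assumes that each replica takes a contiguous slice of the device list.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ParallelConfigSketch:
    """Hypothetical stand-in; not the PR's actual ParallelConfig."""

    devices: tuple[int, ...]
    data_parallel_size: int = 1
    tensor_parallel_size: int = 1

    def __post_init__(self) -> None:
        expected = self.data_parallel_size * self.tensor_parallel_size
        if len(self.devices) != expected:
            raise ValueError(
                f"need {expected} devices for DP={self.data_parallel_size} x "
                f"TP={self.tensor_parallel_size}, got {len(self.devices)}"
            )

    @property
    def replica_device_groups(self) -> list[tuple[int, ...]]:
        """One contiguous TP group per DP replica (assumed layout)."""
        tp = self.tensor_parallel_size
        return [self.devices[i * tp:(i + 1) * tp] for i in range(self.data_parallel_size)]

    def worker_device_ids(self, replica_idx: int) -> tuple[int, ...]:
        """Devices owned by one replica; the only place ownership is decided."""
        return self.replica_device_groups[replica_idx]


cfg = ParallelConfigSketch(devices=(12, 13, 14, 15), data_parallel_size=2, tensor_parallel_size=2)
print(cfg.replica_device_groups)  # [(12, 13), (14, 15)]
print(cfg.worker_device_ids(1))   # (14, 15)
```

With ownership answered per replica, neither the CLI nor the engine config needs a DP=1 versus DP>1 branch; DP=1 is simply the case with one group.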
    task-submit --device auto --run \
    "python -m python.cli.main \
    --model /path/to/Qwen3-14B \
    task-submit --device auto --max-time 1200 --run \

`task-submit` is environment-specific and should not be written into the README; changing the port number also seems pointless?

This is meant for developers; how about moving it into docs/dev?

Yes, put it in a usage guide or similar (strictly speaking it belongs in the rules on the machine...). The settings for the PTO2_xxx variables are not a good fit for the README either.
    def _estimate_request_load(self, prompt: str, config) -> int:
        prompt_tokens = 0
        if self.tokenizer is not None:
            prompt_tokens = len(self.tokenizer.encode(prompt))

`encode` runs twice, once here and once in `add_request`; this needs to be simplified.
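One way to avoid the second `encode` is to tokenize once at the routing layer and reuse the result for both the load estimate and the request itself. The sketch below uses invented names (`CountingTokenizer`, `route_request`) and assumes the load is prompt tokens plus the generation budget, which the PR may define differently.

```python
class CountingTokenizer:
    """Toy tokenizer that records how often encode() runs; hypothetical."""

    def __init__(self) -> None:
        self.calls = 0

    def encode(self, text: str) -> list[int]:
        self.calls += 1
        return [len(word) for word in text.split()]


def estimate_request_load(prompt_token_count: int, max_new_tokens: int) -> int:
    """Assumed load model: prompt tokens plus the generation budget."""
    return prompt_token_count + max_new_tokens


def route_request(
    tokenizer: CountingTokenizer, prompt: str, max_new_tokens: int
) -> tuple[list[int], int]:
    token_ids = tokenizer.encode(prompt)  # the only encode() call
    load = estimate_request_load(len(token_ids), max_new_tokens)
    # token_ids would be handed to the selected replica instead of the raw prompt.
    return token_ids, load


tok = CountingTokenizer()
ids, load = route_request(tok, "hello parallel serving", max_new_tokens=16)
print(len(ids), load, tok.calls)  # 3 19 1
```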
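            self.scheduler.abort_request(sr.request.request_id)


    class AsyncLLMEngineClient:

These are the parts of the DPEngineCore and AsyncLLMEngine design that are unsound or carry debt. I suggest fixing them before adding a new routing policy or a third parallel dimension:
1. Routing hooks leak backwards from the orchestration layer into the leaf core. This is the biggest design problem.
DPEngineCore.pending_token_load() (async_engine.py:175) and add_request(*, on_queued=...) (async_engine.py:186-192) exist solely to serve the client's least-pending-tokens routing. With DP=1 (the default path) the core runs standalone and both are pure dead weight. The dependency direction is inverted: the leaf serves the orchestrator's bookkeeping needs. As a consequence, once the routing policy changes to round-robin or queue depth, DPEngineCore has to change with it. The root cause is that DPEngineCore plays a dual role: with DP=1 it is used directly as a standalone engine (main.py:206-208), and with DP>1 it is composed by the client as a child core. The two usages have opposite needs for those two hooks, so the leaf is forced to accommodate both.
2. There is no shared interface or Protocol; consistency rests on a duck-typed union.
DPEngineCore and AsyncLLMEngineClient merely happen to share method names; there is no common base class or Protocol. AsyncLLMEngineClient also hard-codes core_factory: Callable[..., DPEngineCore] = DPEngineCore (async_engine.py:370) and reaches directly into the core's internals (pending_token_load, and the mapping maintained through _request_to_replica) instead of programming against an abstraction. Consequence: once the two signatures drift (for example, the core's add_request gains on_queued and the client's does not), type checking will not flag it and the problem only surfaces at runtime.
3. Misleading names.
- DPEngineCore: this leaf is the single-replica / TP-only core and does no data parallelism itself; DP is handled by the client layer above it. The "DP" in the name conflates the layer (core vs. routing) with the topology dimension managed by the routing layer. EngineCore or ReplicaEngineCore would be more accurate.
- AsyncLLMEngineClient: it is not a client of any remote service but an in-process routing/fan-out layer. "Client" implies an external dependency, whereas it only composes several local DPEngineCore instances. The original name AsyncLLMEngine actually fits this layer better (an "engine" to the outside, composing cores on the inside).
4. (Minor) Double stop on the failure path. When DPEngineCore.start() fails, it first runs await self.stop() itself (async_engine.py:150-152), and then AsyncLLMEngineClient.start() calls stop() on all cores again (async_engine.py:416). The second stop is essentially idempotent (_loop_task is None and gets skipped, and joining an already terminated process is safe), but it is sloppy.
Suggested ways to tighten this up (ordered by cost-effectiveness):
- Always go through the client to remove DPEngineCore's dual role: run_serve should use AsyncLLMEngineClient for DP=1 as well (it already supports a single core). DPEngineCore then only ever exists as a composed child core, pending_token_load/on_queued become legitimate, and the dual-role and leakage problems disappear together. As a bonus, the default DP=1 path exercises the routing logic, which is currently covered only by fake tests.
- Extract an EngineCore Protocol (start/stop/add_request/abort_request/pending_token_load/generate_request_id), have DPEngineCore implement it, and have the client program against the Protocol instead of the concrete DPEngineCore. That corrects the dependency direction and leaves room to swap the core implementation later.
- Rename: DPEngineCore → EngineCore/ReplicaEngineCore, AsyncLLMEngineClient → AsyncLLMEngine (or DPRouter), so the names reflect the real split between the routing layer and the single-replica core.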
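The Protocol suggestion can be sketched as follows. This is an illustration under assumptions, not the PR's code: the method list comes from the review, but the signatures (which methods are `async`, the `str` output type) and the `FakeCore` and `Router` classes are invented for the example.

```python
import asyncio
from typing import AsyncIterator, Protocol, Sequence, runtime_checkable


@runtime_checkable
class EngineCore(Protocol):
    """Hypothetical interface listing the methods named in the review."""

    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    def add_request(self, request_id: str, prompt: str, config: object) -> AsyncIterator[str]: ...
    async def abort_request(self, request_id: str) -> None: ...
    def pending_token_load(self) -> int: ...
    def generate_request_id(self) -> str: ...


class FakeCore:
    """Toy core that satisfies the Protocol structurally (no inheritance)."""

    def __init__(self, name: str, load: int) -> None:
        self.name = name
        self.load = load

    async def start(self) -> None:
        pass

    async def stop(self) -> None:
        pass

    async def add_request(self, request_id: str, prompt: str, config: object):
        for word in prompt.split():
            yield f"{self.name}:{word}"

    async def abort_request(self, request_id: str) -> None:
        pass

    def pending_token_load(self) -> int:
        return self.load

    def generate_request_id(self) -> str:
        return f"{self.name}-req"


class Router:
    """Routing layer that depends only on the EngineCore Protocol."""

    def __init__(self, cores: Sequence[EngineCore]) -> None:
        self._cores = list(cores)

    def select(self) -> EngineCore:
        # Least-pending-tokens policy; swapping it does not touch the cores.
        return min(self._cores, key=lambda core: core.pending_token_load())

    async def add_request(self, request_id: str, prompt: str, config: object = None):
        async for out in self.select().add_request(request_id, prompt, config):
            yield out


async def demo() -> list[str]:
    router = Router([FakeCore("core0", load=300), FakeCore("core1", load=40)])
    return [out async for out in router.add_request("r1", "hi there")]


outputs = asyncio.run(demo())
print(outputs)  # ['core1:hi', 'core1:there']
```

A static type checker would then flag a core whose `add_request` signature drifts from the Protocol, which is the failure mode described in point 2.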
Summary
- `ParallelConfig` contract for `DP x TP` serving topology and validation.
- `DataParallelAsyncLLMEngine` to route requests across independent DP replicas by pending token load.
- `DistributedConfig`.

Validation
- `python -m pytest tests/test_cli.py tests/test_batching.py tests/test_parallel.py -q`
- `python tests/lint/check_headers.py`
- `python tests/lint/check_english_only.py`
- `ruff check --config ruff.toml python examples/model/qwen3_14b tests`

Runtime check:
- `--devices "$TASK_DEVICE" --data-parallel-size 2 --tensor-parallel-size 1`
- `PTO2_RING_HEAP=4294967296 PTO2_RING_TASK_WINDOW=131072 PTO2_RING_DEP_POOL=131072`
- `Successful: 2/2`, `Tokens total: 16`, throughput about `6.1 tok/s`
- `Huawei is a Chinese multinational technology company headquartered in Sh`

Notes
- `pipeline_parallel_size > 1` and expert parallelism until kernels support those modes.
- Setting `PTO2_RING_TASK_WINDOW` and `PTO2_RING_DEP_POOL` to `1048576` with a 4 GiB heap can reserve about 19 GiB of runtime arena per replica and fail with `rtMalloc failed: 207001`.

Related Issue
Refs #36