Skip to content

Issue 6:明确恢复触发时机与入口,确保恢复逻辑在真实失败场景中被调用 #1490

@iliujunn

Description

@iliujunn

涉及文件: runtime.pyV1RuntimeHost)、dispatch.pyflowengine/node_runtime.py

问题描述:

即使 Issue 1 和 Issue 3 实现了 snapshot 接口和 CheckpointManager,如果没有明确的触发时机和统一的恢复入口,"有恢复代码但没有被调用"的问题依然会出现。当前代码库中缺少以下恢复触发场景的实现:

  • 启动时恢复(startup recovery)V1RuntimeHost 初始化时,检查是否存在未完成的 flow run 或已注册的 stateful operator,如果 CheckpointManager 中有对应的有效快照,则在 operator 第一次执行前自动调用 import_snapshot() 恢复状态。
  • worker 重建后的恢复(post-restart recovery):当 WorkerHeartbeatMonitor(Issue 4)或 exception handler 决定重建某个 operator replica 时,重建流程应在执行 operator.setup() 之后、开始处理第一个 packet 之前,查询 CheckpointManager 并在策略允许时执行状态恢复。
  • connector resume(connector 位点恢复):当 data connector 重新分配给新的 worker 时,应从 CheckpointManager 或 _CheckpointTracker 中读取上次的 resume_offset,而不是从头重新读取数据。
  • 恢复失败后的后续动作:当 import_snapshot() 因 checksum 错误或 schema 不兼容而返回 CheckpointLoadFailure 时,恢复入口需要按照 recovery_policy 决定下一步动作,而不是将异常直接向上抛给 user code。

所有恢复触发场景应统一通过一个恢复协调函数(可以是 FlowEngineV1 的方法或独立的 RecoveryCoordinator)进行,避免每个触发场景各自实现一套恢复逻辑。

任务目标: 让 SAGE 的恢复能力具备明确入口和完整触发时机,确保任何合理的失败场景都能找到对应的恢复路径,而不是仅在测试环境中手动调用恢复接口。

完成标准: 启动时恢复、worker 重建后恢复、connector resume 三类场景有明确的恢复触发入口,且恢复失败时的后续动作由 recovery_policy 决定而非直接抛出未捕获异常。


父 Issue: #1484

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions