Skip to content

Issue 15:BaseOperator.restore_state() 是死代码,FutureOperator 是不可用的占位符 #1499

@iliujunn

Description

@iliujunn

涉及文件: src/sage/stream/operators.pysrc/sage/stream/datastream.pysrc/sage/runtime/pipeline_compiler.py

问题描述:

BaseOperator.restore_state(state: dict) 方法定义在算子基类中,看起来是状态恢复的入口,但:

  • 在整个代码库中没有任何地方调用它,与 FlowNet 的 StatefulProcessRuntimeCheckpointManager(未实现)等均无连接,是完全孤立的死代码。
  • 方法实现本身只做 self.function.restore_state() 的试探性调用(有该方法则调用,失败仅 warning),不返回任何状态来表明恢复是否成功,上层无法感知恢复失败。

FutureOperator 的全部三个可调用方法(process()emit()process_packet())均直接 raise RuntimeError,属于一个永远不能被执行的占位符:

  • FutureTransformation 被正常注册进 DSL pipeline 并送入 pipeline_compiler.py,但 _classify() 函数没有针对 FutureTransformation 的分支,导致 FutureOperator 被分类为 ServiceActorWrapper,若下游有 packet 触达,会直接崩溃。
  • 没有任何文档说明 FutureOperator 的预期使用方式和正确接入路径,用户无法正确使用。

需要做的工作:

  • 决定 BaseOperator.restore_state() 的设计意图:如果它将被接入 FlowNet 的 checkpoint 恢复路径(见 Issue 1/6),则应在 Issue 6 中统一处理;如果不会被接入,应删除以避免误导。
  • FutureOperator 应有明确定义的使用路径:要么实现其正确的 emit 机制(例如与 FlowNet 的 future/promise topic 集成),要么在 DSL 层阻止用户误用(如在 DataStream.future() 方法中添加不完整功能的警告),而不是让它以一个可以被加入 pipeline 但执行即 crash 的状态存在。

任务目标: 消除代码库中的误导性接口,使每个公开方法要么有完整的调用路径,要么有清晰的"不可用"声明,避免用户在使用过程中遇到无法预见的崩溃。

完成标准: BaseOperator.restore_state() 要么有完整调用路径,要么被删除;FutureOperator 要么有可用的 emit 机制,要么在 DSL 层阻止用户将其加入 pipeline,不能以"可加入但执行即 crash"的状态存在。


父 Issue: #1484

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions