Skip to content

Issue 13:修复 KeyBy 算子与 stateful.py 的状态隔离脱钩问题 #1497

@iliujunn

Description

@iliujunn

涉及文件: src/sage/stream/operators.pysrc/sage/stream/datastream.pysrc/sage/runtime/flownet/runtime/operator_runtime/stateful.py

问题描述:

KeyByOperator 将 packet 的 partition_key 字段更新后路由出去,按文档设计,下游 stateful operator 应当基于这个 key 实现 per-key 的状态隔离。但检查代码可以发现,partition_key 从未被传递或注入到 StatefulProcessRuntime 的状态读写路径中。当前 stateful.py 的状态操作只按 namespace + key 寻址,没有感知 packet 携带的 partition_key,也没有在 worker 维度做隔离。这意味着:

  • 对"user_A"和"user_B"的 packet 调用 process_with_state,得到的是完全相同的状态 namespace,而不是各自独立的 per-key 状态空间。
  • keyed_state_example.py 注释声称实现了"Per-user feature aggregation"和"Automatic state persistence and recovery",但实际上状态是保存在 MapFunction 实例变量里的,重启即丢失、跨 worker 无隔离。
  • KeyByTransformationpipeline_compiler.py 中被分类为 SourceActorWrapper,而不是一个独立的路由阶段,这可能导致 key 路由在某些 pipeline 拓扑下不生效。

需要修复的内容:

  • FlowEngineV1(或 operator 的执行上下文)中,将 packet 的 partition_key 注入到 StatefulProcessRuntime 的 namespace 构造逻辑中,使得 per-key 状态在存储层真正隔离。
  • 明确 keyed state 的语义:key_by 后的 process_with_state 是否自动获得 per-key 隔离,还是需要用户在 state namespace 中手动嵌入 key?目前这个语义在文档和代码中均未定义。
  • 修复 keyed_state_example.py 中描述与实现不符的问题,或补充实现使其真正支持 per-key 隔离。

任务目标:key_by 操作产生真实的 per-key 状态隔离,而不只是一个修改 packet 字段但对 stateful operator 无效的路由暗示。

完成标准: key_by 后的 process_with_state 能获得真实的 per-key 状态隔离,partition_key 被传递到 StatefulProcessRuntime 的 namespace 寻址中,不同 key 的状态互不干扰,keyed_state_example.py 的行为与注释描述一致。


父 Issue: #1484

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions