Skip to content

Issue 14:JoinOperator 缺少时间窗口语义,无界流中内存无界增长 #1498

@iliujunn

Description

@iliujunn

涉及文件: src/sage/stream/operators.pysrc/sage/stream/datastream.py

问题描述:

JoinOperator 的状态缓冲完全由用户传入的 JoinFunction 实例变量管理,没有任何框架层的窗口约束。这在无限流场景中会导致:

  • 如果 join 的两个流中有一侧数据速率较慢或某些 key 的数据稀少,对应 key 的等待缓冲会永久保留在内存中,直到进程 OOM 或重启。
  • 没有 watermark 机制,无法区分"数据尚未到达"和"数据已经迟到",也无法安全地触发窗口关闭和清理。
  • handle_stop_signal 的实现依赖 "Source" 字符串匹配来判断 stop signal 来源,这是脆弱的字符串耦合,当 source 名称不包含 "Source" 时会永远等待第二个 stop signal。

此外,JoinOperator 目前只支持 inner join 语义(双侧均有数据才输出),没有 left outer join 或窗口超时输出(如果某 key 在 N 秒内没有来自另一侧的数据则输出 null join 结果)。

需要明确并实现的内容:

  • 框架层是否提供 tumbling/sliding/session window 支持?如果是,JoinOperator 的窗口边界应由框架维护,而不是由 user function 自行管理缓冲。
  • 如果框架层暂不提供窗口支持,至少应在 JoinOperator 上增加 max_buffer_size_per_key(超出后驱逐最旧条目)和 join_timeout_sec(超时触发 null join 输出或丢弃)两个保护参数,防止无界积累。
  • handle_stop_signal 的两侧 stop signal 检测逻辑应基于注册的输入流数量,而不是字符串匹配。

任务目标:JoinOperator 在无限流场景中具备可控的内存占用,框架层承担窗口约束或至少提供 buffer bound 保护,使得生产环境中 Join pipeline 不会因数据倾斜或流速不均而 OOM。

完成标准: JoinOperator 在无限流下不会无界积累缓冲,具备 max_buffer_size_per_key 或窗口超时保护,handle_stop_signal 不依赖字符串匹配,而是基于注册的输入流数量判断终止条件。


父 Issue: #1484

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions