涉及文件: connectors/core.py、flowengine/、可能涉及 CheckpointManager(Issue 3)
问题描述:
connectors/core.py 中的 _CheckpointTracker 已经实现了 offset 级别的 checkpoint 机制(按行数计数、周期性调用 on_checkpoint 回调、finalize 时保存最终位点)。但这套机制目前是一个独立的辅助工具层,存在以下集成缺失:
on_checkpoint 回调在当前使用场景中是可选参数,没有任何地方将其与 FlowNet 的执行路径或 CheckpointManager 对接。connector 产生的 checkpoint 数据只会在调用方自己传入回调时才被保存,不会主动持久化。
- 当 connector 被重新分配给新的 worker(例如节点失败后重建)时,没有从 CheckpointManager 中读取上次
resume_offset 的逻辑,导致重建后必须从头重新读取数据。
- connector 的错误传播(
ConnectorError 分类、ConnectorQualityStats 的统计)与 FlowNet operator 层的故障分类(Issue 5)没有对接路径,connector 读取失败时无法被 recovery_policy 感知和处理。
DEFAULT_MAX_JSONL_LINE_BYTES 和 DEFAULT_MAX_PANDAS_FALLBACK_BYTES 等参数目前是模块级常量,无法通过 FlowNet 的 operator config 进行配置。
任务目标: 让 data connector 的 checkpoint 和恢复行为成为 FlowNet 执行路径的一部分,而不是只能在 connector 层内部使用的独立机制,从而支持 connector 数据源的可靠重放和断点续读。
完成标准: ConnectorCheckpoint 的 on_checkpoint 在 FlowNet 运行时内有默认持久化路径,connector 重建后能从 CheckpointManager 读取上次 resume_offset 而不必从头重读数据。
父 Issue: #1484
涉及文件:
connectors/core.py、flowengine/、可能涉及CheckpointManager(Issue 3)问题描述:
connectors/core.py中的_CheckpointTracker已经实现了 offset 级别的 checkpoint 机制(按行数计数、周期性调用on_checkpoint回调、finalize时保存最终位点)。但这套机制目前是一个独立的辅助工具层,存在以下集成缺失:on_checkpoint回调在当前使用场景中是可选参数,没有任何地方将其与 FlowNet 的执行路径或CheckpointManager对接。connector 产生的 checkpoint 数据只会在调用方自己传入回调时才被保存,不会主动持久化。resume_offset的逻辑,导致重建后必须从头重新读取数据。ConnectorError分类、ConnectorQualityStats的统计)与 FlowNet operator 层的故障分类(Issue 5)没有对接路径,connector 读取失败时无法被recovery_policy感知和处理。DEFAULT_MAX_JSONL_LINE_BYTES和DEFAULT_MAX_PANDAS_FALLBACK_BYTES等参数目前是模块级常量,无法通过 FlowNet 的 operator config 进行配置。任务目标: 让 data connector 的 checkpoint 和恢复行为成为 FlowNet 执行路径的一部分,而不是只能在 connector 层内部使用的独立机制,从而支持 connector 数据源的可靠重放和断点续读。
完成标准:
ConnectorCheckpoint的on_checkpoint在 FlowNet 运行时内有默认持久化路径,connector 重建后能从 CheckpointManager 读取上次resume_offset而不必从头重读数据。父 Issue: #1484