[SPARK-55795][SS] Add automatic V1 to V2 offset log upgrade for streaming queries with named sources#54577
Open
ericm-db wants to merge 12 commits intoapache:masterfrom
Open
[SPARK-55795][SS] Add automatic V1 to V2 offset log upgrade for streaming queries with named sources#54577ericm-db wants to merge 12 commits intoapache:masterfrom
ericm-db wants to merge 12 commits intoapache:masterfrom
Conversation
fb20d1d to
6f02737
Compare
- Add validation in OffsetSeq.toOffsetMap to detect duplicate source names that would cause silent data loss - Add validation in MicroBatchExecution when building V2 sourceIdMap to prevent duplicate names - Remove unused test parameter (outputDir) - Fix unused variables in tests (v1BatchId, v2BatchId) These changes improve robustness by failing fast with clear error messages when duplicate source names are detected, preventing silent data loss during V1-to-V2 checkpoint upgrade or new V2 query initialization.
6f02737 to
9768076
Compare
Add spark.sql.streaming.checkpoint.v1ToV2.autoUpgrade.enabled config to require explicit user opt-in for V1 to V2 checkpoint migration. Previously, the upgrade would happen automatically when users set offsetLog.formatVersion=2 and added names to sources, which could be surprising for production systems. Changes: - Add STREAMING_CHECKPOINT_V1_TO_V2_AUTO_UPGRADE_ENABLED config (default: false) - Update MicroBatchExecution to check config and throw clear error if not enabled - Error message explains the migration requirement and consequences - Update all existing upgrade tests to set the new config - Add test to verify error message when config is not set This ensures users explicitly acknowledge the one-way, irreversible nature of the checkpoint migration before it occurs.
…inology Update all references from "checkpoint upgrade" to "offset log upgrade" to avoid confusion with checkpoint v2 (state store checkpointing). This change clarifies that we're specifically upgrading the offset log format, not the entire checkpoint structure. Changes: - Rename config: checkpoint.v1ToV2.autoUpgrade.enabled → offsetLog.v1ToV2.autoUpgrade.enabled - Rename function: maybeUpgradeCheckpointToV2 → maybeUpgradeOffsetLogToV2 - Rename test suite: CheckpointV1ToV2UpgradeSuite → OffsetLogV1ToV2UpgradeSuite - Update all error messages, comments, and test names to use "offset log" - Update PR title and description This is a terminology-only change with no functional modifications.
Previously, when a user requested V2 offset log format via spark.sql.streaming.offsetLog.formatVersion=2 but didn't name their sources with .name(), the system would silently ignore the V2 request and continue using V1 format. This commit adds a clear error message when V2 is requested but sources aren't named, explaining that V2 format requires all sources to have names. Changes: - Add validation in MicroBatchExecution to check if sources are named when V2 format is requested - Throw IllegalStateException with clear guidance when unnamed sources are detected with V2 request - Add test case to verify the error is thrown with correct message Error message guides users to either: 1. Add .name() to all sources, or 2. Set offsetLog.formatVersion=1 to continue with V1 format
Enable auto-upgrade from V1 to V2 offset log format even when sources
are not named, using positional indices ("0", "1", "2") as keys.
Rationale: V1 format already assumes plan stability between batches
(index 0 must always map to the same source). The V1→V2 upgrade makes
the exact same assumption, so requiring names was unnecessarily strict.
Changes:
- Add generateSourceIds() to produce either named or positional IDs
- Remove requirement for allSourcesNamed in upgrade path
- Use named keys when available, fall back to positional indices
- Update maybeUpgradeOffsetLogToV2 to handle both cases
- Rebuild sourceIdMap for V2 regardless of naming
- Log whether upgrade used "named" or "positional" keys
- Update test to verify positional key upgrade works
Benefits:
- Users can upgrade to V2 without naming all sources
- Named keys still preferred when available (better debugging)
- No stronger assumptions than V1 already makes
- Easier migration path for existing queries
Example upgrade with unnamed sources:
V1: [paymentOffset, refundOffset] (positional)
V2: {"0": paymentOffset, "1": refundOffset} (positional in map)
Fix bug where V2 checkpoints could have their key scheme inadvertently
changed on restart if users added/removed source names.
Problem:
1. Upgrade V1→V2 with source evolution disabled → Creates {"0", "1"}
2. User later adds .name("my_source") to source
3. Query restart regenerates sourceIdMap with key "my_source"
4. Offset log still has key "0" → mismatch! 💥
Solution:
Read the persisted ENABLE_STREAMING_SOURCE_EVOLUTION config from the
existing V2 checkpoint metadata to determine the original key scheme:
- If source evolution was ENABLED → use named keys (allows evolution)
- If source evolution was DISABLED → use positional keys forever
This ensures the key scheme is immutable once a V2 checkpoint is created,
preventing accidental breakage.
Changes:
- Check existing V2 checkpoint's source evolution config from metadata
- Use that to determine whether to generate positional or named keys
- Add comment explaining protection against re-upgrading V2 checkpoints
- Add test verifying positional keys persist even when names added later
Example:
V2 created with source evolution disabled + unnamed sources:
- Keys: {"0", "1"} → forever positional
- Adding .name() later → still uses {"0", "1"}
V2 created with source evolution enabled + named sources:
- Keys: {"payments", "refunds"} → can add/remove sources
- Renaming sources → uses new names (evolution allowed)
…l key persistence
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
This PR introduces an automatic offset log upgrade mechanism that allows streaming queries to migrate from V1 (positional) offset tracking to V2 (named) offset tracking when users add
.name()to their streaming sources.Key components:
OffsetSeq.toOffsetMap() - Converts V1 positional offsets to V2 named offsets using provided source names
MicroBatchExecution upgrade logic - Orchestrates the automatic offset log upgrade
spark.sql.streaming.offsetLog.formatVersion=2Unassigned)spark.sql.streaming.offsetLog.v1ToV2.autoUpgrade.enabled=trueSafety validations:
Comprehensive test suite - OffsetLogV1ToV2UpgradeSuite with tests for:
Why are the changes needed?
Currently, when users want to migrate from V1 (index-based) to V2 (name-based) offset tracking, they must:
This is problematic because:
With this change, users can safely migrate existing V1 offset logs to V2 format by:
.name()to all streaming sourcesspark.sql.streaming.offsetLog.formatVersion=2spark.sql.streaming.offsetLog.v1ToV2.autoUpgrade.enabled=trueThe upgrade preserves all state and offset positions, enabling seamless transition to the more flexible V2 format that supports source evolution (adding/removing sources by name).
Does this PR introduce any user-facing change?
Yes. This PR introduces two new behaviors:
1. New config (default: false)
When set to
true, enables automatic V1 to V2 offset log upgrade when conditions are met.2. New error message when upgrade needed but not enabled:
Previous behavior: Query would continue with V1 format (or fail with unclear error)
New behavior: Clear error message when V1 offset log exists, V2 requested, and upgrade config not set:
This is a backwards compatible change - existing V1 queries continue working unchanged unless users explicitly opt into the upgrade.
How was this patch tested?
Added comprehensive test suite
OffsetLogV1ToV2UpgradeSuitewith the following test cases:V1 offset log + all sources named auto-upgrades to V2
V1 offset log + no sources named continues with V1
Already V2 offset log + named sources continues with V2
Multi-source upgrade preserves all offsets correctly
Source count mismatch throws clear error
V1 offset log + V2 requested without upgrade config throws clear error
All tests use real file-based streaming sources to ensure end-to-end correctness.
Was this patch authored or co-authored using generative AI tooling?
No.