Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2928,6 +2928,18 @@ object SQLConf {
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
.createWithDefault(1)

// Opt-in gate for the one-way V1 -> V2 offset log migration. Per the doc string
// below, it only takes effect when spark.sql.streaming.offsetLog.formatVersion=2
// is also set; defaults to false so existing checkpoints are never rewritten
// implicitly. The migration cannot be rolled back once performed.
val STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED =
buildConf("spark.sql.streaming.offsetLog.v1ToV2.autoUpgrade.enabled")
.internal()
.doc("When true, automatically upgrades V1 offset logs to V2 format when " +
"spark.sql.streaming.offsetLog.formatVersion=2 and all sources are named. " +
"This is a one-way migration that cannot be rolled back. " +
"Users must ensure all batches are committed before enabling this config. " +
"Default: false.")
.version("4.2.0")
.booleanConf
.createWithDefault(false)

val STREAMING_MAX_NUM_STATE_SCHEMA_FILES =
buildConf("spark.sql.streaming.stateStore.maxNumStateSchemaFiles")
.internal()
Expand Down Expand Up @@ -7342,6 +7354,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def streamingOffsetLogFormatVersion: Int = getConf(STREAMING_OFFSET_LOG_FORMAT_VERSION)

/** Whether the one-way V1 -> V2 offset log auto-upgrade is enabled (default: false). */
def streamingOffsetLogV1ToV2AutoUpgradeEnabled: Boolean =
getConf(STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED)

def stateStoreEncodingFormat: String = getConf(STREAMING_STATE_STORE_ENCODING_FORMAT)

def streamingValueStateSchemaEvolutionThreshold: Int =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,32 @@ case class OffsetSeq(
offsets: Seq[Option[OffsetV2]],
metadataOpt: Option[OffsetSeqMetadata] = None) extends OffsetSeqBase {
override def version: Int = OffsetSeqLog.VERSION_1

/**
 * Builds a V2 OffsetMap from this V1 OffsetSeq by keying each positional offset with
 * the corresponding entry of `orderedSourceNames`. Used during the V1 -> V2 checkpoint
 * upgrade to turn positional offsets into named offsets.
 *
 * @param orderedSourceNames Source names, in the same order as the positional offsets.
 * @return An OffsetMap whose offsets are keyed by source name, carrying this
 *         sequence's metadata (or defaults if none was recorded).
 */
def toOffsetMap(orderedSourceNames: Seq[String]): OffsetMap = {
  require(orderedSourceNames.size == offsets.size,
    s"orderedSourceNames.size (${orderedSourceNames.size}) != offsets.size (${offsets.size})")

  // A repeated name would collapse entries when the map is built, silently losing
  // offsets, so reject duplicates before converting.
  val repeated = orderedSourceNames.groupBy(identity)
    .collect { case (name, occurrences) if occurrences.size > 1 => name }
  require(repeated.isEmpty,
    s"Cannot convert V1 OffsetSeq to V2 OffsetMap: duplicate source names found: " +
    s"${repeated.mkString(", ")}")

  // Carry the V1 metadata across (falling back to defaults when absent).
  val meta = metadataOpt.getOrElse(OffsetSeqMetadata())
  OffsetMap(
    orderedSourceNames.zip(offsets).toMap,
    OffsetSeqMetadataV2(
      batchWatermarkMs = meta.batchWatermarkMs,
      batchTimestampMs = meta.batchTimestampMs,
      conf = meta.conf))
}
}

object OffsetSeq {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, FileSourceMetadataAttribute, LocalTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState, TransformWithStateInPySpark}
import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, Unassigned, WriteToStream}
import org.apache.spark.sql.catalyst.streaming.{FlowAssigned, StreamingRelationV2, StreamingSourceIdentifyingName, Unassigned, UserProvided, WriteToStream}
import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.classic.{Dataset, SparkSession}
Expand All @@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetMap, OffsetSeq, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler
import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
Expand Down Expand Up @@ -104,6 +104,8 @@ class MicroBatchExecution(

@volatile protected var sources: Seq[SparkDataStream] = Seq.empty

@volatile protected var sourceNames: Seq[StreamingSourceIdentifyingName] = Nil

// Source ID mapping for OffsetMap support
// Using index as sourceId initially, can be extended to support user-provided names
// This is initialized in the same path as the sources Seq (defined above) and is used
Expand Down Expand Up @@ -269,6 +271,7 @@ class MicroBatchExecution(
case r: StreamingDataSourceV2ScanRelation => (r.stream, r.relation.sourceIdentifyingName)
}
sources = sourcesWithNames.map(_._1)
sourceNames = sourcesWithNames.map(_._2)

if (enforceNamed) {
// When enforcement is enabled, all sources should be named after validation in analysis.
Expand All @@ -287,6 +290,7 @@ class MicroBatchExecution(
}.toMap
} else {
// When enforcement is disabled, use positional indices (backward compatibility)
// This will be rebuilt with name keys later if V1-to-V2 upgrade occurs
sourceIdMap = sources.zipWithIndex.map {
case (source, index) => index.toString -> source
}.toMap
Expand Down Expand Up @@ -470,16 +474,102 @@ class MicroBatchExecution(
sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION)
}

// Get the desired offset log format version from config (what user explicitly requested)
val desiredOffsetLogFormatVersion =
sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION)

// --- V1 to V2 auto-upgrade ---
// Only upgrade if:
// 1. Current offset log is V1 (protects against re-upgrading V2 checkpoints)
// 2. User explicitly requests V2 via config
// 3. User has explicitly enabled the upgrade
//
// IMPORTANT: The check offsetLogFormatVersion == VERSION_1 ensures we never
// modify existing V2 checkpoints, even if upgrade config is enabled.
// This prevents inadvertent key changes (e.g., "0" -> "payments") when
// users add names to sources.
val finalOffsetLogFormatVersion =
if (offsetLogFormatVersion == OffsetSeqLog.VERSION_1 &&
desiredOffsetLogFormatVersion == OffsetSeqLog.VERSION_2) {

val upgradeEnabled =
sparkSessionForStream.conf.get(
SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED)

if (upgradeEnabled) {
maybeUpgradeOffsetLogToV2(lastCommittedBatchId) match {
case Some(newBatchId) =>
val keyType = if (allSourcesNamed(sourceNames)) "named" else "positional"
logInfo(s"Upgraded offset log from V1 to V2 format using $keyType keys " +
s"at batch $newBatchId.")
OffsetSeqLog.VERSION_2
case None =>
logWarning(
"V1 to V2 offset log upgrade skipped: uncommitted batch exists. " +
"Allow the query to finish one clean batch then restart to " +
"trigger the upgrade.")
offsetLogFormatVersion
}
} else {
// User wants V2 but hasn't enabled upgrade - give clear guidance
throw new IllegalStateException(
"Offset log is in V1 format but V2 format was requested via " +
"spark.sql.streaming.offsetLog.formatVersion=2. " +
"To migrate the offset log, set " +
"spark.sql.streaming.offsetLog.v1ToV2.autoUpgrade.enabled=true. " +
"Important: This is a one-way migration that cannot be rolled back. " +
"Ensure all batches are committed before enabling. " +
"See documentation for details.")
}
} else {
offsetLogFormatVersion
}

// Rebuild sourceIdMap with V2 keys if the final format is V2
if (finalOffsetLogFormatVersion == OffsetSeqLog.VERSION_2) {
val sourceIds = offsetLog.getLatest() match {
case Some((_, offsetMap: OffsetMap)) =>
// Existing V2 checkpoint - use the key scheme that was originally chosen
// by checking if source evolution was enabled when checkpoint was created
val wasSourceEvolutionEnabled = offsetMap.metadata match {
case v2Meta: OffsetSeqMetadataV2 =>
v2Meta.conf.get(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key)
.exists(_.toBoolean)
case _ => false
}

if (wasSourceEvolutionEnabled) {
// Source evolution enabled - use named keys (allows add/remove)
generateSourceIds(sourceNames)
} else {
// Source evolution disabled - use positional keys (immutable)
sources.indices.map(_.toString)
}
case _ =>
// Fresh V2 or just upgraded - generate based on current naming
generateSourceIds(sourceNames)
}

val duplicateIds = sourceIds.groupBy(identity).filter(_._2.size > 1).keys
require(duplicateIds.isEmpty,
s"Cannot use V2 offset format: duplicate source IDs found: " +
s"${duplicateIds.mkString(", ")}")

sourceIdMap = sources.zip(sourceIds).map { case (source, id) =>
id -> source
}.toMap
}

// Set the offset log format version in the sparkSessionForStream conf
sparkSessionForStream.conf.set(
SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion)
SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, finalOffsetLogFormatVersion)

val execCtx = new MicroBatchExecutionContext(id, runId, name, triggerClock, sources, sink,
progressReporter, -1, sparkSession,
offsetLogFormatVersionOpt = Some(offsetLogFormatVersion),
offsetLogFormatVersionOpt = Some(finalOffsetLogFormatVersion),
previousContext = None)

execCtx.offsetSeqMetadata = offsetLogFormatVersion match {
execCtx.offsetSeqMetadata = finalOffsetLogFormatVersion match {
case OffsetSeqLog.VERSION_2 =>
OffsetSeqMetadataV2(batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf)
case OffsetSeqLog.VERSION_1 =>
Expand Down Expand Up @@ -1443,6 +1533,115 @@ class MicroBatchExecution(
awaitProgressLock.unlock()
}
}

/**
 * Returns true iff `names` is non-empty and every entry carries an explicit name
 * (UserProvided or FlowAssigned); any Unassigned entry makes this false.
 */
private def allSourcesNamed(names: Seq[StreamingSourceIdentifyingName]): Boolean =
  names.nonEmpty && !names.exists {
    case Unassigned =>
      true
    case UserProvided(_) | FlowAssigned(_) =>
      false
  }

/**
 * Returns the string key used in the V2 OffsetMap for a named source.
 * Must only be called with a named source; an Unassigned name is a caller bug.
 */
private def sourceIdentifyingNameToId(name: StreamingSourceIdentifyingName): String =
  name match {
    case UserProvided(id) => id
    case FlowAssigned(id) => id
    case Unassigned =>
      throw new IllegalStateException("Cannot derive source ID from Unassigned name")
  }

/**
 * Generates source IDs for the V2 offset log:
 * - when every source is named, the names themselves are used;
 * - otherwise positional indices ("0", "1", ...) are used, relying on the same
 *   plan-stability assumption the V1 format already makes.
 */
private def generateSourceIds(names: Seq[StreamingSourceIdentifyingName]): Seq[String] =
  if (allSourcesNamed(names)) {
    names.map(sourceIdentifyingNameToId)
  } else {
    names.indices.map(idx => idx.toString)
  }

/**
 * Attempts to write a new V2 upgrade batch (lastBatchId + 1) by converting the last
 * committed V1 OffsetSeq into an OffsetMap keyed by source IDs.
 *
 * Source IDs are either user-provided names (if all sources are named) or positional
 * indices ("0", "1", "2"). Using positional indices makes the same plan stability
 * assumption that V1 format already makes.
 *
 * Preconditions (caller is responsible):
 * - offsetLogFormatVersion == VERSION_1
 *
 * @param lastCommittedBatchId the batch id of the last committed batch; negative
 *                             for a fresh query that has never committed.
 * @return Some(newBatchId) on success; None when there is nothing to upgrade or the
 *         query is not in a clean state (latestBatchId != lastCommittedBatchId) —
 *         in that case the query runs one V1 batch to commit the outstanding batch
 *         and the upgrade fires on the next clean restart.
 * @throws IllegalStateException if the source count in the offset log doesn't match
 *                               the current plan.
 * @throws Throwable (via QueryExecutionErrors.concurrentStreamLogUpdate) on a
 *                   concurrent offset/commit log write.
 */
private def maybeUpgradeOffsetLogToV2(lastCommittedBatchId: Long): Option[Long] = {
  // NOTE: the previous implementation used `.getOrElse(return None)` — a non-local
  // return from inside a lambda, which Scala implements by throwing
  // NonLocalReturnControl (deprecated in Scala 3 and swallowed by broad catches).
  // Rewritten as a pure expression with no `return`.
  if (lastCommittedBatchId < 0) {
    // Fresh query (never committed a batch) - nothing to upgrade
    None
  } else {
    offsetLog.getLatestBatchId() match {
      // Only upgrade from a clean state (no outstanding uncommitted batch)
      case Some(latestBatchId) if latestBatchId == lastCommittedBatchId =>
        offsetLog.get(lastCommittedBatchId) match {
          case Some(v1: OffsetSeq) =>
            val v1Offsets = v1.offsets // Seq[Option[OffsetV2]], positionally ordered

            // Sanity-check: plan must have the same number of sources as the offset log
            if (v1Offsets.size != sources.size) {
              throw new IllegalStateException(
                s"Cannot upgrade offset log to V2: offset log has ${v1Offsets.size} " +
                s"source(s) but the current query plan has ${sources.size} source(s). " +
                s"Make sure the query has the same number of sources as when it " +
                s"was last run.")
            }

            val newBatchId = latestBatchId + 1

            // Convert V1 OffsetSeq to V2 OffsetMap using source IDs in plan order.
            // If sources are named, use those names. Otherwise, use positional indices.
            // The plan-traversal order is the same order used to assign V1 positional
            // indices, so this is the safest available mapping (same assumption V1
            // makes about plan stability).
            val upgradeOffsetMap = v1.toOffsetMap(generateSourceIds(sourceNames))

            // Write offset log entry - fails loudly on concurrent modification
            if (!offsetLog.add(newBatchId, upgradeOffsetMap)) {
              throw QueryExecutionErrors.concurrentStreamLogUpdate(newBatchId)
            }

            // Write commit entry - upgrade batch is immediately committed (no data
            // was processed, so no state checkpoint ids are carried over)
            val upgradeCommit = commitLog.get(lastCommittedBatchId)
              .getOrElse(CommitMetadata())
              .copy(stateUniqueIds = None)
            if (!commitLog.add(newBatchId, upgradeCommit)) {
              throw QueryExecutionErrors.concurrentStreamLogUpdate(newBatchId)
            }

            Some(newBatchId)

          case Some(_: OffsetMap) =>
            None // Already V2; no upgrade needed

          case _ =>
            None // No readable entry for the batch
        }

      // No entries at all, or an uncommitted batch is outstanding
      case _ =>
        None
    }
  }
}
}

object MicroBatchExecution {
Expand Down
Loading