diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d5719a35cb366..e7b68ba8cc215 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2928,6 +2928,18 @@ object SQLConf { .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2") .createWithDefault(1) + val STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED = + buildConf("spark.sql.streaming.offsetLog.v1ToV2.autoUpgrade.enabled") + .internal() + .doc("When true, automatically upgrades V1 offset logs to V2 format when " + + "spark.sql.streaming.offsetLog.formatVersion=2 and all sources are named. " + + "This is a one-way migration that cannot be rolled back. " + + "Users must ensure all batches are committed before enabling this config. " + + "Default: false.") + .version("4.2.0") + .booleanConf + .createWithDefault(false) + val STREAMING_MAX_NUM_STATE_SCHEMA_FILES = buildConf("spark.sql.streaming.stateStore.maxNumStateSchemaFiles") .internal() @@ -7342,6 +7354,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def streamingOffsetLogFormatVersion: Int = getConf(STREAMING_OFFSET_LOG_FORMAT_VERSION) + def streamingOffsetLogV1ToV2AutoUpgradeEnabled: Boolean = + getConf(STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED) + def stateStoreEncodingFormat: String = getConf(STREAMING_STATE_STORE_ENCODING_FORMAT) def streamingValueStateSchemaEvolutionThreshold: Int = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala index bf2278b814922..ca400d55a7fe5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala @@ -97,6 +97,32 @@ case class OffsetSeq( offsets: Seq[Option[OffsetV2]], metadataOpt: Option[OffsetSeqMetadata] = None) extends OffsetSeqBase { override def version: Int = OffsetSeqLog.VERSION_1 + + /** + * Converts this V1 OffsetSeq to a V2 OffsetMap using the provided ordered source names. + * This is used during V1 to V2 checkpoint upgrade to map positional offsets to named offsets. + * + * @param orderedSourceNames The source names in the same order as the positional offsets. + * @return An OffsetMap with offsets keyed by source name. + */ + def toOffsetMap(orderedSourceNames: Seq[String]): OffsetMap = { + require(orderedSourceNames.size == offsets.size, + s"orderedSourceNames.size (${orderedSourceNames.size}) != offsets.size (${offsets.size})") + + // Validate no duplicate source names - duplicates would cause silent data loss in toMap + val duplicates = orderedSourceNames.groupBy(identity).filter(_._2.size > 1).keys + require(duplicates.isEmpty, + s"Cannot convert V1 OffsetSeq to V2 OffsetMap: duplicate source names found: " + + s"${duplicates.mkString(", ")}") + + val offsetsMap = orderedSourceNames.zip(offsets).toMap + val v1Meta = metadataOpt.getOrElse(OffsetSeqMetadata()) + val v2Meta = OffsetSeqMetadataV2( + batchWatermarkMs = v1Meta.batchWatermarkMs, + batchTimestampMs = v1Meta.batchTimestampMs, + conf = v1Meta.conf) + OffsetMap(offsetsMap, v2Meta) + } } object OffsetSeq { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index 35a658d350ab5..ccb6a7bab7b59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, FileSourceMetadataAttribute, LocalTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState, TransformWithStateInPySpark} -import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, Unassigned, WriteToStream} +import org.apache.spark.sql.catalyst.streaming.{FlowAssigned, StreamingRelationV2, StreamingSourceIdentifyingName, Unassigned, UserProvided, WriteToStream} import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.classic.{Dataset, SparkSession} @@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper} -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetMap, OffsetSeq, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter} import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} @@ -104,6 +104,8 @@ class MicroBatchExecution( @volatile protected var sources: Seq[SparkDataStream] = Seq.empty + @volatile protected var sourceNames: Seq[StreamingSourceIdentifyingName] = Nil + // Source ID mapping for OffsetMap support // Using index as sourceId initially, can be extended to support user-provided names // This is initialized in the same path as the sources Seq (defined above) and is used @@ -269,6 +271,7 @@ class MicroBatchExecution( case r: StreamingDataSourceV2ScanRelation => (r.stream, r.relation.sourceIdentifyingName) } sources = sourcesWithNames.map(_._1) + sourceNames = sourcesWithNames.map(_._2) if (enforceNamed) { // When enforcement is enabled, all sources should be named after validation in analysis. @@ -287,6 +290,7 @@ class MicroBatchExecution( }.toMap } else { // When enforcement is disabled, use positional indices (backward compatibility) + // This will be rebuilt with name keys later if V1-to-V2 upgrade occurs sourceIdMap = sources.zipWithIndex.map { case (source, index) => index.toString -> source }.toMap @@ -470,16 +474,102 @@ class MicroBatchExecution( sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION) } + // Get the desired offset log format version from config (what user explicitly requested) + val desiredOffsetLogFormatVersion = + sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION) + + // --- V1 to V2 auto-upgrade --- + // Only upgrade if: + // 1. Current offset log is V1 (protects against re-upgrading V2 checkpoints) + // 2. User explicitly requests V2 via config + // 3. User has explicitly enabled the upgrade + // + // IMPORTANT: The check offsetLogFormatVersion == VERSION_1 ensures we never + // modify existing V2 checkpoints, even if upgrade config is enabled. + // This prevents inadvertent key changes (e.g., "0" -> "payments") when + // users add names to sources. + val finalOffsetLogFormatVersion = + if (offsetLogFormatVersion == OffsetSeqLog.VERSION_1 && + desiredOffsetLogFormatVersion == OffsetSeqLog.VERSION_2) { + + val upgradeEnabled = + sparkSessionForStream.conf.get( + SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED) + + if (upgradeEnabled) { + maybeUpgradeOffsetLogToV2(lastCommittedBatchId) match { + case Some(newBatchId) => + val keyType = if (allSourcesNamed(sourceNames)) "named" else "positional" + logInfo(s"Upgraded offset log from V1 to V2 format using $keyType keys " + + s"at batch $newBatchId.") + OffsetSeqLog.VERSION_2 + case None => + logWarning( + "V1 to V2 offset log upgrade skipped: uncommitted batch exists. " + + "Allow the query to finish one clean batch then restart to " + + "trigger the upgrade.") + offsetLogFormatVersion + } + } else { + // User wants V2 but hasn't enabled upgrade - give clear guidance + throw new IllegalStateException( + "Offset log is in V1 format but V2 format was requested via " + + "spark.sql.streaming.offsetLog.formatVersion=2. " + + "To migrate the offset log, set " + + "spark.sql.streaming.offsetLog.v1ToV2.autoUpgrade.enabled=true. " + + "Important: This is a one-way migration that cannot be rolled back. " + + "Ensure all batches are committed before enabling. " + + "See documentation for details.") + } + } else { + offsetLogFormatVersion + } + + // Rebuild sourceIdMap with V2 keys if the final format is V2 + if (finalOffsetLogFormatVersion == OffsetSeqLog.VERSION_2) { + val sourceIds = offsetLog.getLatest() match { + case Some((_, offsetMap: OffsetMap)) => + // Existing V2 checkpoint - use the key scheme that was originally chosen + // by checking if source evolution was enabled when checkpoint was created + val wasSourceEvolutionEnabled = offsetMap.metadata match { + case v2Meta: OffsetSeqMetadataV2 => + v2Meta.conf.get(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key) + .exists(_.toBoolean) + case _ => false + } + + if (wasSourceEvolutionEnabled) { + // Source evolution enabled - use named keys (allows add/remove) + generateSourceIds(sourceNames) + } else { + // Source evolution disabled - use positional keys (immutable) + sources.indices.map(_.toString) + } + case _ => + // Fresh V2 or just upgraded - generate based on current naming + generateSourceIds(sourceNames) + } + + val duplicateIds = sourceIds.groupBy(identity).filter(_._2.size > 1).keys + require(duplicateIds.isEmpty, + s"Cannot use V2 offset format: duplicate source IDs found: " + + s"${duplicateIds.mkString(", ")}") + + sourceIdMap = sources.zip(sourceIds).map { case (source, id) => + id -> source + }.toMap + } + // Set the offset log format version in the sparkSessionForStream conf sparkSessionForStream.conf.set( - SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion) + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, finalOffsetLogFormatVersion) val execCtx = new MicroBatchExecutionContext(id, runId, name, triggerClock, sources, sink, progressReporter, -1, sparkSession, - offsetLogFormatVersionOpt = Some(offsetLogFormatVersion), + offsetLogFormatVersionOpt = Some(finalOffsetLogFormatVersion), previousContext = None) - execCtx.offsetSeqMetadata = offsetLogFormatVersion match { + execCtx.offsetSeqMetadata = finalOffsetLogFormatVersion match { case OffsetSeqLog.VERSION_2 => OffsetSeqMetadataV2(batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf) case OffsetSeqLog.VERSION_1 => @@ -1443,6 +1533,115 @@ class MicroBatchExecution( awaitProgressLock.unlock() } } + + private def allSourcesNamed(names: Seq[StreamingSourceIdentifyingName]): Boolean = + names.nonEmpty && names.forall { + case UserProvided(_) | FlowAssigned(_) => + true + case Unassigned => + false + } + + /** + * Extracts the string key to use in the V2 OffsetMap for a named source. + */ + private def sourceIdentifyingNameToId(name: StreamingSourceIdentifyingName): String = + name match { + case UserProvided(n) => + n + case FlowAssigned(n) => + n + case Unassigned => + throw new IllegalStateException("Cannot derive source ID from Unassigned name") + } + + /** + * Generates source IDs for V2 offset log. + * - If all sources are named, use those names + * - Otherwise, use positional indices ("0", "1", "2", etc.) + */ + private def generateSourceIds(names: Seq[StreamingSourceIdentifyingName]): Seq[String] = { + if (allSourcesNamed(names)) { + names.map(sourceIdentifyingNameToId) + } else { + // Use positional indices - same assumption V1 makes about plan stability + names.indices.map(_.toString) + } + } + + /** + * Attempts to write a new V2 upgrade batch (lastBatchId + 1) by converting the last + * committed V1 OffsetSeq into an OffsetMap keyed by source IDs. + * + * Source IDs are either user-provided names (if all sources are named) or positional + * indices ("0", "1", "2"). Using positional indices makes the same plan stability + * assumption that V1 format already makes. + * + * Preconditions (caller is responsible): + * - offsetLogFormatVersion == VERSION_1 + * + * Returns Some(newBatchId) on success. + * Returns None (skips upgrade) if the query is not in a clean state, i.e. + * latestBatchId != lastCommittedBatchId. In that case the query will run one V1 batch + * to process/commit the outstanding batch, and the upgrade fires on the next clean + * restart. + * + * Throws if source count in the offset log doesn't match the plan, or on concurrent + * write. + */ + private def maybeUpgradeOffsetLogToV2(lastCommittedBatchId: Long): Option[Long] = { + // Fresh query (never committed a batch) - nothing to upgrade + if (lastCommittedBatchId < 0) return None + + val latestBatchId = offsetLog.getLatestBatchId().getOrElse(return None) + + // Only upgrade from a clean state (no outstanding uncommitted batch) + if (latestBatchId != lastCommittedBatchId) return None + + offsetLog.get(lastCommittedBatchId) match { + case Some(v1: OffsetSeq) => + val v1Offsets = v1.offsets // Seq[Option[OffsetV2]], positionally ordered + + // Sanity-check: plan must have the same number of sources as the offset log + if (v1Offsets.size != sources.size) { + throw new IllegalStateException( + s"Cannot upgrade offset log to V2: offset log has ${v1Offsets.size} " + + s"source(s) but the current query plan has ${sources.size} source(s). " + + s"Make sure the query has the same number of sources as when it " + + s"was last run.") + } + + val newBatchId = latestBatchId + 1 + + // Convert V1 OffsetSeq to V2 OffsetMap using source IDs in plan order. + // If sources are named, use those names. Otherwise, use positional indices. + // The plan-traversal order is the same order used to assign V1 positional + // indices, so this is the safest available mapping (same assumption V1 makes + // about plan stability). + val upgradeOffsetMap = v1.toOffsetMap(generateSourceIds(sourceNames)) + + // Write offset log entry - fails loudly on concurrent modification + if (!offsetLog.add(newBatchId, upgradeOffsetMap)) { + throw QueryExecutionErrors.concurrentStreamLogUpdate(newBatchId) + } + + // Write commit entry - upgrade batch is immediately committed (no data was processed) + val upgradeCommit = commitLog.get(lastCommittedBatchId) + .getOrElse(CommitMetadata()) + .copy(stateUniqueIds = None) + if (!commitLog.add(newBatchId, upgradeCommit)) { + throw QueryExecutionErrors.concurrentStreamLogUpdate(newBatchId) + } + + Some(newBatchId) + + case Some(_: OffsetMap) => + None // Already V2; no upgrade needed + + case _ => + None + } + } } object MicroBatchExecution { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetLogV1ToV2UpgradeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetLogV1ToV2UpgradeSuite.scala new file mode 100644 index 0000000000000..ccdf267c49103 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetLogV1ToV2UpgradeSuite.scala @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.File + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeq, OffsetSeqLog} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.{StreamTest, Trigger} + +/** + * Test suite for automatic V1 to V2 offset log upgrade functionality. + * + * Tests the migration from positional offset tracking (OffsetSeq) to named + * offset tracking (OffsetMap) when users add .name() to their streaming sources. + */ +class OffsetLogV1ToV2UpgradeSuite extends StreamTest with BeforeAndAfter { + + after { + spark.streams.active.foreach(_.stop()) + } + + /** + * Helper to write JSON data to a directory for file streaming sources. + */ + private def writeJsonData(dir: File, data: Seq[String]): Unit = { + val file = new File(dir, s"data-${System.currentTimeMillis()}.json") + val writer = new java.io.PrintWriter(file) + try { + // scalastyle:off println + data.foreach(writer.println) + // scalastyle:on println + } finally { + writer.close() + } + } + + /** + * Helper to get the OffsetSeqLog from a checkpoint directory. + */ + private def getOffsetLog(checkpointPath: String): OffsetSeqLog = { + new OffsetSeqLog(spark, s"$checkpointPath/offsets") + } + + /** + * Helper to create multiple temporary directories for testing. + */ + private def withTempDirs(f: (File, File, File) => Unit): Unit = { + withTempDir { dir1 => + withTempDir { dir2 => + withTempDir { dir3 => + f(dir1, dir2, dir3) + } + } + } + } + + /** + * Helper for 4-directory variant. + */ + private def withTempDirs(f: (File, File, File, File) => Unit): Unit = { + withTempDir { dir1 => + withTempDir { dir2 => + withTempDir { dir3 => + withTempDir { dir4 => + f(dir1, dir2, dir3, dir4) + } + } + } + } + } + + /** + * Helper for 5-directory variant. + */ + private def withTempDirs(f: (File, File, File, File, File) => Unit): Unit = { + withTempDir { dir1 => + withTempDir { dir2 => + withTempDir { dir3 => + withTempDir { dir4 => + withTempDir { dir5 => + f(dir1, dir2, dir3, dir4, dir5) + } + } + } + } + } + } + + test("V1 offset log + all sources named auto-upgrades to V2") { + withTempDirs { (checkpointDir, dataDir1, dataDir2) => + // Write initial data + writeJsonData(dataDir1, Seq("""{"value": 1}""", """{"value": 2}""")) + writeJsonData(dataDir2, Seq("""{"value": 10}""", """{"value": 20}""")) + + // Step 1: Start with V1 offset log (no names, enforcement disabled) + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .load(dataDir2.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + + // Verify V1 was used + val offsetLog = getOffsetLog(checkpointDir.getCanonicalPath) + val v1LatestOpt = offsetLog.getLatest() + assert(v1LatestOpt.isDefined, "Should have offset log entry") + val (_, v1Offset) = v1LatestOpt.get + assert(v1Offset.isInstanceOf[OffsetSeq], "Should be using V1 OffsetSeq") + assert(v1Offset.version == OffsetSeqLog.VERSION_1) + } + + // Step 2: Restart with named sources + V2 explicitly requested - should auto-upgrade to V2 + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", + SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED.key -> "true") { + val query2 = spark.readStream + .format("json") + .schema("value INT") + .name("source_a") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("source_b") + .load(dataDir2.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + + // Verify upgrade occurred + val offsetLog2 = getOffsetLog(checkpointDir.getCanonicalPath) + val v2LatestOpt = offsetLog2.getLatest() + assert(v2LatestOpt.isDefined, "Should have offset log entry after upgrade") + val (_, v2Offset) = v2LatestOpt.get + assert(v2Offset.isInstanceOf[OffsetMap], "Should have upgraded to V2 OffsetMap") + assert(v2Offset.version == OffsetSeqLog.VERSION_2) + + // Verify offsets are keyed by name + val offsetMap = v2Offset.asInstanceOf[OffsetMap] + assert(offsetMap.offsetsMap.contains("source_a"), "Should contain source_a") + assert(offsetMap.offsetsMap.contains("source_b"), "Should contain source_b") + } + } + } + + test("V1 offset log + no sources named continues with V1") { + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + withTempDir { checkpointDir => + withTempDir { dataDir => + writeJsonData(dataDir, Seq("""{"value": 1}""", """{"value": 2}""")) + + // Start with V1, no names + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + + val offsetLog = getOffsetLog(checkpointDir.getCanonicalPath) + val v1Offset = offsetLog.getLatest().get._2 + assert(v1Offset.isInstanceOf[OffsetSeq]) + assert(v1Offset.version == OffsetSeqLog.VERSION_1) + + // Restart without names - should remain V1 + writeJsonData(dataDir, Seq("""{"value": 3}""")) + + val query2 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + + val offsetLog2 = getOffsetLog(checkpointDir.getCanonicalPath) + val v1OffsetAfter = offsetLog2.getLatest().get._2 + assert(v1OffsetAfter.isInstanceOf[OffsetSeq], "Should still be V1") + assert(v1OffsetAfter.version == OffsetSeqLog.VERSION_1) + } + } + } + } + + test("Already V2 offset log + named sources continues with V2") { + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") { + withTempDir { checkpointDir => + withTempDir { dataDir => + writeJsonData(dataDir, Seq("""{"value": 1}""")) + + // Start with V2 + val query1 = spark.readStream + .format("json") + .schema("value INT") + .name("my_source") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + + val offsetLog = getOffsetLog(checkpointDir.getCanonicalPath) + val v2Offset = offsetLog.getLatest().get._2 + assert(v2Offset.isInstanceOf[OffsetMap]) + + // Restart - should remain V2 + writeJsonData(dataDir, Seq("""{"value": 2}""")) + + val query2 = spark.readStream + .format("json") + .schema("value INT") + .name("my_source") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + + val offsetLog2 = getOffsetLog(checkpointDir.getCanonicalPath) + val v2OffsetAfter = offsetLog2.getLatest().get._2 + assert(v2OffsetAfter.isInstanceOf[OffsetMap], "Should still be V2") + } + } + } + } + + test("Multi-source upgrade preserves all offsets correctly") { + withTempDirs { (checkpointDir, dataDir1, dataDir2, dataDir3) => + // Write initial data + writeJsonData(dataDir1, Seq("""{"value": 1}""")) + writeJsonData(dataDir2, Seq("""{"value": 2}""")) + writeJsonData(dataDir3, Seq("""{"value": 3}""")) + + // Start with V1, 3 sources (enforcement disabled) + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream.format("json").schema("value INT") + .load(dataDir2.getCanonicalPath)) + .union( + spark.readStream.format("json").schema("value INT") + .load(dataDir3.getCanonicalPath)) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + + val offsetLog = getOffsetLog(checkpointDir.getCanonicalPath) + val v1Offset = offsetLog.getLatest().get._2.asInstanceOf[OffsetSeq] + assert(v1Offset.offsets.size == 3, "Should have 3 sources in V1") + } + + // Restart with all sources named + V2 explicitly requested + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", + SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED.key -> "true") { + val query2 = spark.readStream + .format("json") + .schema("value INT") + .name("payments") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("refunds") + .load(dataDir2.getCanonicalPath) + ) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("adjustments") + .load(dataDir3.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + + val offsetLog2 = getOffsetLog(checkpointDir.getCanonicalPath) + val v2Offset = offsetLog2.getLatest().get._2.asInstanceOf[OffsetMap] + + // Verify all three sources are in the map with correct names + assert(v2Offset.offsetsMap.size == 3, "Should have 3 sources in V2") + assert(v2Offset.offsetsMap.contains("payments")) + assert(v2Offset.offsetsMap.contains("refunds")) + assert(v2Offset.offsetsMap.contains("adjustments")) + } + } + } + + test("Source count mismatch throws clear error") { + withTempDirs { (checkpointDir, dataDir1, dataDir2, dataDir3) => + writeJsonData(dataDir1, Seq("""{"value": 1}""")) + writeJsonData(dataDir2, Seq("""{"value": 2}""")) + + // Start with 2 sources (enforcement disabled) + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream.format("json").schema("value INT") + .load(dataDir2.getCanonicalPath)) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + } + + // Try to restart with 3 named sources + V2 requested - should fail + writeJsonData(dataDir3, Seq("""{"value": 3}""")) + + val e = intercept[Exception] { + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", + SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED.key -> "true") { + val query2 = spark.readStream + .format("json") + .schema("value INT") + .name("a") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("b") + .load(dataDir2.getCanonicalPath) + ) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("c") + .load(dataDir3.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + } + } + + // The error can be either wrapped in StreamingQueryException or thrown directly + val errorMessage = e match { + case sqe: org.apache.spark.sql.streaming.StreamingQueryException => + sqe.getCause.getMessage + case other => other.getMessage + } + + assert(errorMessage.contains("2") && errorMessage.contains("3"), + s"Error should mention source count mismatch (2 vs 3): $errorMessage") + } + } + + test("V1 offset log + V2 requested without upgrade config throws clear error") { + withTempDirs { (checkpointDir, dataDir1, dataDir2) => + // Write initial data + writeJsonData(dataDir1, Seq("""{"value": 1}""")) + writeJsonData(dataDir2, Seq("""{"value": 2}""")) + + // Start with V1 offset log (no names, enforcement disabled) + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .load(dataDir2.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + } + + // Try to restart with named sources + V2 requested but WITHOUT upgrade config + val e = intercept[Exception] { + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") { + val query2 = spark.readStream + .format("json") + .schema("value INT") + .name("source_a") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("source_b") + .load(dataDir2.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + } + } + + // The error can be either wrapped in StreamingQueryException or thrown directly + val errorMessage = e match { + case sqe: org.apache.spark.sql.streaming.StreamingQueryException => + sqe.getCause.getMessage + case other => other.getMessage + } + + // Verify error message contains key guidance + assert(errorMessage.contains("V1 format") && + errorMessage.contains("V2 format was requested") && + errorMessage.contains("v1ToV2.autoUpgrade.enabled"), + s"Error should explain upgrade requirement and how to enable it: $errorMessage") + } + } + + test("V1 offset log + V2 requested without named sources upgrades with positional keys") { + withTempDir { checkpointDir => + withTempDir { dataDir => + // Write initial data + writeJsonData(dataDir, Seq("""{"value": 1}""")) + + // Start with V1 offset log, unnamed sources + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + } + + // Restart requesting V2 WITHOUT naming sources - should upgrade with positional keys + writeJsonData(dataDir, Seq("""{"value": 2}""")) + + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false", + SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED.key -> "true") { + val query2 = spark.readStream + .format("json") + .schema("value INT") + // Note: NO .name() call - source is unnamed + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + + // Verify upgrade to V2 happened with positional keys + val offsetLog = getOffsetLog(checkpointDir.getCanonicalPath) + val latestOffset = offsetLog.getLatest().get._2 + assert(latestOffset.isInstanceOf[OffsetMap], + "Should have upgraded to V2 OffsetMap with positional keys") + assert(latestOffset.version == OffsetSeqLog.VERSION_2) + + // Verify offset is keyed by positional index "0" + val offsetMap = latestOffset.asInstanceOf[OffsetMap] + assert(offsetMap.offsetsMap.contains("0"), + "Should contain positional key '0'") + } + } + } + } + + test("V2 with positional keys continues using positional even if names added later") { + withTempDir { checkpointDir => + withTempDir { dataDir => + // Write initial data + writeJsonData(dataDir, Seq("""{"value": 1}""")) + + // Start with V1, source evolution disabled + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + } + + // Upgrade to V2 with source evolution disabled (creates positional keys) + writeJsonData(dataDir, Seq("""{"value": 2}""")) + + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false", + SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED.key -> "true") { + val query2 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + + // Verify V2 with positional key "0" + val offsetLog = getOffsetLog(checkpointDir.getCanonicalPath) + val offset1 = offsetLog.getLatest().get._2.asInstanceOf[OffsetMap] + assert(offset1.offsetsMap.contains("0")) + } + + // Now restart with source evolution ENABLED and name added + // Should still use positional keys because checkpoint metadata has it disabled + writeJsonData(dataDir, Seq("""{"value": 3}""")) + + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") { + val query3 = spark.readStream + .format("json") + .schema("value INT") + .name("my_source") // Added name! + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query3.awaitTermination() + + // Verify still using positional key "0" (not "my_source") + // Even though source evolution is enabled NOW, the checkpoint was created + // with it disabled, so we read that from metadata and continue with positional + val offsetLog2 = getOffsetLog(checkpointDir.getCanonicalPath) + val offset2 = offsetLog2.getLatest().get._2.asInstanceOf[OffsetMap] + assert(offset2.offsetsMap.contains("0"), + "Should still use positional key '0' even though name was added") + assert(!offset2.offsetsMap.contains("my_source"), + "Should not use name 'my_source' for checkpoint created with positional keys") + } + } + } + } +}