From 9eab4f61b12b297995bcd68e3efdbcda32df3a6c Mon Sep 17 00:00:00 2001 From: ericm-db Date: Tue, 3 Mar 2026 14:49:27 -0800 Subject: [PATCH] [SPARK-XXXXX][SS] Add V1 to V2 offset log auto-upgrade Add automatic upgrade mechanism for Spark Structured Streaming offset logs, migrating from V1 (position-based) to V2 (name-based) format without losing state or requiring checkpoint deletion. Changes: - Add config spark.sql.streaming.offsetLog.v1ToV2.autoUpgrade.enabled - Add SupportsOffsetLogUpgrade trait for sources to handle metadata migration - Implement upgrade logic in MicroBatchExecution with two paths: - Positional: V1 to V2 with keys "0", "1", "2" - Named: V1 to V2 with actual source names, migrating metadata directories - FileStreamSource implements metadata migration by copying all batches from old to new paths - Add OffsetSeq.toOffsetMap() conversion method - Add comprehensive test suite with 7 passing tests Co-Authored-By: Claude --- .../streaming/SupportsOffsetLogUpgrade.java | 12 + .../apache/spark/sql/internal/SQLConf.scala | 15 + .../streaming/checkpointing/OffsetSeq.scala | 27 + .../streaming/runtime/FileStreamSource.scala | 84 ++- .../runtime/MicroBatchExecution.scala | 211 +++++- .../OffsetLogV1ToV2UpgradeSuite.scala | 604 ++++++++++++++++++ 6 files changed, 944 insertions(+), 9 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsOffsetLogUpgrade.java create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetLogV1ToV2UpgradeSuite.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsOffsetLogUpgrade.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsOffsetLogUpgrade.java new file mode 100644 index 0000000000000..ac91923d7a7ca --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/SupportsOffsetLogUpgrade.java @@ -0,0 +1,12 @@ +package org.apache.spark.sql.connector.read.streaming; + +import org.apache.spark.annotation.Evolving; + +@Evolving +public interface SupportsOffsetLogUpgrade { + void migrateMetadataForUpgrade( + String oldMetadataPath, + String newMetadataPath, + long lastBatchId, + long upgradeBatchId); +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e4390c6754790..63914e5adcee8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2951,6 +2951,18 @@ object SQLConf { .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2") .createWithDefault(1) + val STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED = + buildConf("spark.sql.streaming.offsetLog.v1ToV2.autoUpgrade.enabled") + .internal() + .doc("When true, automatically upgrades V1 offset logs to V2 format when " + + "spark.sql.streaming.offsetLog.formatVersion=2 is set. " + + "This is a one-way migration that cannot be rolled back. " + + "Users must ensure all batches are committed before enabling this config. " + + "Default: false.") + .version("4.2.0") + .booleanConf + .createWithDefault(false) + val STREAMING_MAX_NUM_STATE_SCHEMA_FILES = buildConf("spark.sql.streaming.stateStore.maxNumStateSchemaFiles") .internal() @@ -7367,6 +7379,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def streamingOffsetLogFormatVersion: Int = getConf(STREAMING_OFFSET_LOG_FORMAT_VERSION) + def streamingOffsetLogV1ToV2AutoUpgradeEnabled: Boolean = + getConf(STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED) + def stateStoreEncodingFormat: String = getConf(STREAMING_STATE_STORE_ENCODING_FORMAT) def streamingValueStateSchemaEvolutionThreshold: Int = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala index bf2278b814922..4cf0d57e64763 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala @@ -97,6 +97,33 @@ case class OffsetSeq( offsets: Seq[Option[OffsetV2]], metadataOpt: Option[OffsetSeqMetadata] = None) extends OffsetSeqBase { override def version: Int = OffsetSeqLog.VERSION_1 + + /** + * Converts this V1 OffsetSeq to a V2 OffsetMap using the provided ordered source IDs. + * This is used during V1 to V2 offset log upgrade to map positional offsets to keyed offsets. + * + * @param orderedSourceIds The source IDs in the same order as the positional offsets. + * Can be either positional ("0", "1", "2") or named ("payments", etc). + * @return An OffsetMap with offsets keyed by source ID. + */ + def toOffsetMap(orderedSourceIds: Seq[String]): OffsetMap = { + require(orderedSourceIds.size == offsets.size, + s"orderedSourceIds.size (${orderedSourceIds.size}) != offsets.size (${offsets.size})") + + // Validate no duplicate source IDs - duplicates would cause silent data loss in toMap + val duplicates = orderedSourceIds.groupBy(identity).filter(_._2.size > 1).keys + require(duplicates.isEmpty, + s"Cannot convert V1 OffsetSeq to V2 OffsetMap: duplicate source IDs found: " + + s"${duplicates.mkString(", ")}") + + val offsetsMap = orderedSourceIds.zip(offsets).toMap + val v1Meta = metadataOpt.getOrElse(OffsetSeqMetadata()) + val v2Meta = OffsetSeqMetadataV2( + batchWatermarkMs = v1Meta.batchWatermarkMs, + batchTimestampMs = v1Meta.batchTimestampMs, + conf = v1Meta.conf) + OffsetMap(offsetsMap, v2Meta) + } } object OffsetSeq { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala index 9847bd9d76448..3fe5bd86342fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala @@ -32,7 +32,7 @@ import org.apache.spark.paths.SparkPath import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.classic.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.connector.read.streaming -import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxBytes, ReadMaxFiles, SupportsAdmissionControl, SupportsTriggerAvailableNow} +import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxBytes, ReadMaxFiles, SupportsAdmissionControl, SupportsOffsetLogUpgrade, SupportsTriggerAvailableNow} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.execution.streaming.{Offset, Source} @@ -55,6 +55,7 @@ class FileStreamSource( options: Map[String, String]) extends SupportsAdmissionControl with SupportsTriggerAvailableNow + with SupportsOffsetLogUpgrade with Source with Logging { @@ -430,6 +431,87 @@ class FileStreamSource( } override def stop(): Unit = sourceCleaner.foreach(_.stop()) + + override def migrateMetadataForUpgrade( + oldMetadataPath: String, + newMetadataPath: String, + lastBatchId: Long, + upgradeBatchId: Long): Unit = { + + // For positional upgrades, old and new paths are the same - no copy needed + if (oldMetadataPath == newMetadataPath) { + logInfo(s"Paths are identical ($oldMetadataPath), skipping metadata copy") + + // Still need to write the upgrade batch + val log = new FileStreamSourceLog( + FileStreamSourceLog.VERSION, + sparkSession, + oldMetadataPath) + + log.get(lastBatchId) match { + case Some(entries) => + if (!log.add(upgradeBatchId, entries)) { + throw new IllegalStateException( + s"Failed to write upgrade batch to $oldMetadataPath") + } + logInfo(s"Wrote upgrade batch $upgradeBatchId") + case None => + if (!log.add(upgradeBatchId, Array.empty)) { + throw new IllegalStateException( + s"Failed to write empty upgrade batch to $oldMetadataPath") + } + logInfo(s"Wrote empty upgrade batch $upgradeBatchId") + } + return + } + + val oldLog = new FileStreamSourceLog( + FileStreamSourceLog.VERSION, + sparkSession, + oldMetadataPath) + + val newLog = new FileStreamSourceLog( + FileStreamSourceLog.VERSION, + sparkSession, + newMetadataPath) + + // Copy all existing batches from old path to new path + // This is needed because FileStreamSource.restore() reads all batches from 0 to latest + val oldBatches = oldLog.allFiles() + val batchIds = oldBatches.map(_.batchId).distinct.sorted + + batchIds.foreach { batchId => + oldLog.get(batchId) match { + case Some(entries) => + if (!newLog.add(batchId, entries)) { + throw new IllegalStateException( + s"Failed to copy batch $batchId metadata to $newMetadataPath") + } + logInfo(s"Copied batch $batchId: ${entries.length} entries from " + + s"$oldMetadataPath to $newMetadataPath") + + case None => + logWarning(s"Batch $batchId not found in $oldMetadataPath, skipping") + } + } + + // Write upgrade batch using data from lastBatchId + oldLog.get(lastBatchId) match { + case Some(entries) => + if (!newLog.add(upgradeBatchId, entries)) { + throw new IllegalStateException( + s"Failed to write upgrade batch metadata to $newMetadataPath") + } + logInfo(s"Wrote upgrade batch $upgradeBatchId with ${entries.length} entries") + + case None => + if (!newLog.add(upgradeBatchId, Array.empty)) { + throw new IllegalStateException( + s"Failed to write empty upgrade batch metadata to $newMetadataPath") + } + logInfo(s"Wrote empty upgrade batch $upgradeBatchId") + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index 35a658d350ab5..f334961982a25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -32,21 +32,21 @@ import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, FileSourceMetadataAttribute, LocalTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState, TransformWithStateInPySpark} -import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, Unassigned, WriteToStream} +import org.apache.spark.sql.catalyst.streaming.{FlowAssigned, StreamingRelationV2, StreamingSourceIdentifyingName, Unassigned, UserProvided, WriteToStream} import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.classic.{Dataset, SparkSession} import org.apache.spark.sql.classic.ClassicConversions.castToImpl import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability} -import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl, SupportsRealTimeMode, SupportsTriggerAvailableNow} +import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl, SupportsOffsetLogUpgrade, SupportsRealTimeMode, SupportsTriggerAvailableNow} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper} -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetMap, OffsetSeq, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter} -import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetHandler +import org.apache.spark.sql.execution.streaming.runtime.{AcceptsLatestSeenOffsetHandler, StreamingExecutionRelation} import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1} import org.apache.spark.sql.execution.streaming.state.{OfflineStateRepartitionUtils, StateSchemaBroadcast, StateStoreErrors} @@ -461,7 +461,7 @@ class MicroBatchExecution( // Read the offset log format version from the last written offset log entry. If no entries // are found, use the set/default value from the config. - val offsetLogFormatVersion = if (latestStartedBatch.isDefined) { + val currentOffsetLogFormatVersion = if (latestStartedBatch.isDefined) { latestStartedBatch.get._2.version } else { // If no offset log entries are found, assert that the query does not have any committed @@ -470,16 +470,50 @@ class MicroBatchExecution( sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION) } + // Get the desired offset log format version from config (what user explicitly requested) + val desiredOffsetLogFormatVersion = + sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION) + + // --- V1 to V2 auto-upgrade --- + // Only upgrade if: + // 1. Current offset log is V1 + // 2. User explicitly requests V2 via config + // 3. User has explicitly enabled the upgrade + val finalOffsetLogFormatVersion = + if (currentOffsetLogFormatVersion == OffsetSeqLog.VERSION_1 && + desiredOffsetLogFormatVersion == OffsetSeqLog.VERSION_2) { + + val upgradeEnabled = + sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED) + + if (upgradeEnabled) { + maybeUpgradeOffsetLogToV2(lastCommittedBatchId, sparkSessionForStream) + OffsetSeqLog.VERSION_2 + } else { + // User wants V2 but hasn't enabled upgrade - give clear guidance + throw new IllegalStateException( + "Offset log is in V1 format but V2 format was requested via " + + "spark.sql.streaming.offsetLog.formatVersion=2. " + + "To migrate the offset log, set " + + "spark.sql.streaming.offsetLog.v1ToV2.autoUpgrade.enabled=true. " + + "Important: This is a one-way migration that cannot be rolled back. " + + "Ensure all batches are committed before enabling. " + + "See documentation for details.") + } + } else { + currentOffsetLogFormatVersion + } + // Set the offset log format version in the sparkSessionForStream conf sparkSessionForStream.conf.set( - SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion) + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, finalOffsetLogFormatVersion) val execCtx = new MicroBatchExecutionContext(id, runId, name, triggerClock, sources, sink, progressReporter, -1, sparkSession, - offsetLogFormatVersionOpt = Some(offsetLogFormatVersion), + offsetLogFormatVersionOpt = Some(finalOffsetLogFormatVersion), previousContext = None) - execCtx.offsetSeqMetadata = offsetLogFormatVersion match { + execCtx.offsetSeqMetadata = finalOffsetLogFormatVersion match { case OffsetSeqLog.VERSION_2 => OffsetSeqMetadataV2(batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf) case OffsetSeqLog.VERSION_1 => @@ -1443,6 +1477,167 @@ class MicroBatchExecution( awaitProgressLock.unlock() } } + + /** + * Performs V1 to V2 offset log upgrade if needed. + * + * This method: + * 1. Converts the last committed V1 OffsetSeq to V2 OffsetMap format + * 2. Creates an upgrade batch (N+1) in offset log, commit log, and source metadata logs + * 3. For named upgrades: Renames source directories from positional to named paths + * 4. Stops query execution (user must restart) + * + * @param lastCommittedBatchId The last committed batch ID + * @param sparkSessionForStream The Spark session for this stream + */ + private def maybeUpgradeOffsetLogToV2( + lastCommittedBatchId: Long, + sparkSessionForStream: SparkSession): Unit = { + // Fresh query (never committed a batch) - nothing to upgrade + if (lastCommittedBatchId < 0) { + logInfo("No upgrade needed: fresh query with no committed batches") + return + } + + val latestBatchId = offsetLog.getLatestBatchId().getOrElse { + logInfo("No upgrade needed: no offset log entries") + return + } + + // Only upgrade from a clean state (no outstanding uncommitted batch) + if (latestBatchId != lastCommittedBatchId) { + logWarning( + s"V1 to V2 offset log upgrade skipped: uncommitted batch exists " + + s"(latest=$latestBatchId, lastCommitted=$lastCommittedBatchId). " + + s"Allow the query to finish one clean batch then restart to trigger the upgrade.") + return + } + + offsetLog.get(lastCommittedBatchId) match { + case Some(v1: OffsetSeq) => + val v1Offsets = v1.offsets // Seq[Option[OffsetV2]], positionally ordered + + // Sanity-check: plan must have the same number of sources as the offset log + if (v1Offsets.size != sources.size) { + throw new IllegalStateException( + s"Cannot upgrade offset log to V2: offset log has ${v1Offsets.size} " + + s"source(s) but the current query plan has ${sources.size} source(s). " + + s"Make sure the query has the same number of sources as when it was last run.") + } + + val newBatchId = latestBatchId + 1 + + // Determine upgrade path: positional vs named + val sourceEvolutionEnabled = + sparkSessionForStream.conf.get(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION) + + // Extract source identifying names from the logical plan in traversal order + val sourceNames = logicalPlan.collect { + case s: StreamingExecutionRelation => s.sourceIdentifyingName + case r: StreamingDataSourceV2ScanRelation => r.relation.sourceIdentifyingName + } + + // Generate source IDs based on whether we're doing positional or named upgrade + val sourceIds = if (sourceEvolutionEnabled && allSourcesNamed(sourceNames)) { + // Path 2: Named upgrade - use actual source names + val namedIds = sourceNames.map(sourceIdentifyingNameToId) + logInfo(s"Upgrading to V2 with named keys: ${namedIds.mkString(", ")}") + namedIds + } else { + // Path 1: Positional upgrade - use stringified indices + val positionalIds = sourceNames.indices.map(_.toString).toSeq + logInfo(s"Upgrading to V2 with positional keys: " + + s"${positionalIds.mkString(", ")}") + positionalIds + } + + // Convert V1 OffsetSeq to V2 OffsetMap + val upgradeOffsetMap = v1.toOffsetMap(sourceIds) + + // Write upgrade batch to offset log + if (!offsetLog.add(newBatchId, upgradeOffsetMap)) { + throw QueryExecutionErrors.concurrentStreamLogUpdate(newBatchId) + } + + // Write upgrade batch to commit log + val upgradeCommit = commitLog.get(lastCommittedBatchId) + .getOrElse(CommitMetadata()) + .copy(stateUniqueIds = None) + if (!commitLog.add(newBatchId, upgradeCommit)) { + throw QueryExecutionErrors.concurrentStreamLogUpdate(newBatchId) + } + + // CRITICAL: Write upgrade batch to each source's metadata log + // This ensures getBatch(N, N+1) won't fail with "batch N+1 doesn't exist" + writeUpgradeBatchToSourceMetadataLogs(lastCommittedBatchId, newBatchId, sourceIds) + + logInfo(s"Successfully upgraded offset log from V1 to V2 at batch $newBatchId. " + + s"Query will now stop. Restart the query to continue with V2 format.") + + // Stop query execution - user must restart + // The upgrade batch is committed but no data was processed + // On restart, sources will be created with correct paths and continues from + // batch N+2 + stop() + + case Some(_: OffsetMap) => + logInfo("No upgrade needed: already using V2 offset log format") + + case _ => + logInfo("No upgrade needed: no offset log entry found") + } + } + + /** + * Checks if all sources have names (UserProvided or FlowAssigned, not Unassigned). + */ + private def allSourcesNamed(names: Seq[StreamingSourceIdentifyingName]): Boolean = + names.nonEmpty && names.forall { + case UserProvided(_) | FlowAssigned(_) => true + case Unassigned => false + } + + /** + * Extracts the string ID from a StreamingSourceIdentifyingName. + */ + private def sourceIdentifyingNameToId(name: StreamingSourceIdentifyingName): String = + name match { + case UserProvided(n) => n + case FlowAssigned(n) => n + case Unassigned => + throw new IllegalStateException("Cannot derive source ID from Unassigned name") + } + + /** + * Migrates source metadata from old paths to new paths for upgrade batch. + * Delegates to sources that implement SupportsOffsetLogUpgrade. + * + * @param lastBatchId The last committed batch ID (batch N) + * @param upgradeBatchId The upgrade batch ID (batch N+1) + * @param sourceIds The source IDs in plan order (positional or named) + */ + private def writeUpgradeBatchToSourceMetadataLogs( + lastBatchId: Long, + upgradeBatchId: Long, + sourceIds: Seq[String]): Unit = { + + sourceIds.zipWithIndex.foreach { case (sourceId, index) => + val source = sources(index) + source match { + case upgradeable: SupportsOffsetLogUpgrade => + val oldMetadataPath = s"$resolvedCheckpointRoot/sources/$index" + val newMetadataPath = s"$resolvedCheckpointRoot/sources/$sourceId" + upgradeable.migrateMetadataForUpgrade( + oldMetadataPath, + newMetadataPath, + lastBatchId, + upgradeBatchId) + + case _ => + logInfo(s"Source $sourceId does not require metadata migration") + } + } + } } object MicroBatchExecution { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetLogV1ToV2UpgradeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetLogV1ToV2UpgradeSuite.scala new file mode 100644 index 0000000000000..2759b9cfb1568 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetLogV1ToV2UpgradeSuite.scala @@ -0,0 +1,604 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.File + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeq, OffsetSeqLog} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.{StreamTest, Trigger} + +/** + * Test suite for automatic V1 to V2 offset log upgrade functionality. + * + * Tests the migration from positional offset tracking (OffsetSeq) to named + * offset tracking (OffsetMap) when users add .name() to their streaming sources. + */ +class OffsetLogV1ToV2UpgradeSuite extends StreamTest with BeforeAndAfter { + + after { + spark.streams.active.foreach(_.stop()) + } + + /** + * Helper to write JSON data to a directory for file streaming sources. + */ + private def writeJsonData(dir: File, data: Seq[String]): Unit = { + val file = new File(dir, s"data-${System.currentTimeMillis()}.json") + val writer = new java.io.PrintWriter(file) + try { + // scalastyle:off println + data.foreach(writer.println) + // scalastyle:on println + } finally { + writer.close() + } + } + + /** + * Helper to get the OffsetSeqLog from a checkpoint directory. + */ + private def getOffsetLog(checkpointPath: String): OffsetSeqLog = { + new OffsetSeqLog(spark, s"$checkpointPath/offsets") + } + + /** + * Helper to create multiple temporary directories for testing. + */ + private def withTempDirs(f: (File, File, File) => Unit): Unit = { + withTempDir { dir1 => + withTempDir { dir2 => + withTempDir { dir3 => + f(dir1, dir2, dir3) + } + } + } + } + + /** + * Helper for 4-directory variant. + */ + private def withTempDirs(f: (File, File, File, File) => Unit): Unit = { + withTempDir { dir1 => + withTempDir { dir2 => + withTempDir { dir3 => + withTempDir { dir4 => + f(dir1, dir2, dir3, dir4) + } + } + } + } + } + + /** + * Helper for 5-directory variant. + */ + private def withTempDirs(f: (File, File, File, File, File) => Unit): Unit = { + withTempDir { dir1 => + withTempDir { dir2 => + withTempDir { dir3 => + withTempDir { dir4 => + withTempDir { dir5 => + f(dir1, dir2, dir3, dir4, dir5) + } + } + } + } + } + } + + test("V1 offset log + all sources named auto-upgrades to V2") { + withTempDirs { (checkpointDir, dataDir1, dataDir2) => + // Write initial data + writeJsonData(dataDir1, Seq("""{"value": 1}""", """{"value": 2}""")) + writeJsonData(dataDir2, Seq("""{"value": 10}""", """{"value": 20}""")) + + // Step 1: Start with V1 offset log (no names, enforcement disabled) + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .load(dataDir2.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + + // Verify V1 was used + val offsetLog = getOffsetLog(checkpointDir.getCanonicalPath) + val v1LatestOpt = offsetLog.getLatest() + assert(v1LatestOpt.isDefined, "Should have offset log entry") + val (_, v1Offset) = v1LatestOpt.get + assert(v1Offset.isInstanceOf[OffsetSeq], "Should be using V1 OffsetSeq") + assert(v1Offset.version == OffsetSeqLog.VERSION_1) + } + + // Step 2: Restart with named sources + V2 explicitly requested - should auto-upgrade to V2 + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", + SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED.key -> "true") { + val query2 = spark.readStream + .format("json") + .schema("value INT") + .name("source_a") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("source_b") + .load(dataDir2.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + + // Verify upgrade occurred + val offsetLog2 = getOffsetLog(checkpointDir.getCanonicalPath) + val v2LatestOpt = offsetLog2.getLatest() + assert(v2LatestOpt.isDefined, "Should have offset log entry after upgrade") + val (_, v2Offset) = v2LatestOpt.get + assert(v2Offset.isInstanceOf[OffsetMap], "Should have upgraded to V2 OffsetMap") + assert(v2Offset.version == OffsetSeqLog.VERSION_2) + + // Verify offsets are keyed by name + val offsetMap = v2Offset.asInstanceOf[OffsetMap] + assert(offsetMap.offsetsMap.contains("source_a"), "Should contain source_a") + assert(offsetMap.offsetsMap.contains("source_b"), "Should contain source_b") + } + + // Step 3: Add more data and restart again to verify query can continue with new paths + writeJsonData(dataDir1, Seq("""{"value": 3}""", """{"value": 4}""")) + writeJsonData(dataDir2, Seq("""{"value": 30}""", """{"value": 40}""")) + + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", + SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED.key -> "true") { + val query3 = spark.readStream + .format("json") + .schema("value INT") + .name("source_a") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("source_b") + .load(dataDir2.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query3.awaitTermination() + + // Verify query continued with V2 and processed new data + val offsetLog3 = getOffsetLog(checkpointDir.getCanonicalPath) + val v2Latest = offsetLog3.getLatest() + assert(v2Latest.isDefined, "Should have offset log after third run") + val (latestBatchId, latestOffset) = v2Latest.get + assert(latestOffset.isInstanceOf[OffsetMap], "Should still be V2") + assert(latestBatchId >= 2, "Should have processed at least batch 2") + } + } + } + + test("V1 offset log + no sources named continues with V1") { + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + withTempDir { checkpointDir => + withTempDir { dataDir => + writeJsonData(dataDir, Seq("""{"value": 1}""", """{"value": 2}""")) + + // Start with V1, no names + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + + val offsetLog = getOffsetLog(checkpointDir.getCanonicalPath) + val v1Offset = offsetLog.getLatest().get._2 + assert(v1Offset.isInstanceOf[OffsetSeq]) + assert(v1Offset.version == OffsetSeqLog.VERSION_1) + + // Restart without names - should remain V1 + writeJsonData(dataDir, Seq("""{"value": 3}""")) + + val query2 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + + val offsetLog2 = getOffsetLog(checkpointDir.getCanonicalPath) + val v1OffsetAfter = offsetLog2.getLatest().get._2 + assert(v1OffsetAfter.isInstanceOf[OffsetSeq], "Should still be V1") + assert(v1OffsetAfter.version == OffsetSeqLog.VERSION_1) + } + } + } + } + + test("Already V2 offset log + named sources continues with V2") { + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") { + withTempDir { checkpointDir => + withTempDir { dataDir => + writeJsonData(dataDir, Seq("""{"value": 1}""")) + + // Start with V2 + val query1 = spark.readStream + .format("json") + .schema("value INT") + .name("my_source") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + + val offsetLog = getOffsetLog(checkpointDir.getCanonicalPath) + val v2Offset = offsetLog.getLatest().get._2 + assert(v2Offset.isInstanceOf[OffsetMap]) + + // Restart - should remain V2 + writeJsonData(dataDir, Seq("""{"value": 2}""")) + + val query2 = spark.readStream + .format("json") + .schema("value INT") + .name("my_source") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + + val offsetLog2 = getOffsetLog(checkpointDir.getCanonicalPath) + val v2OffsetAfter = offsetLog2.getLatest().get._2 + assert(v2OffsetAfter.isInstanceOf[OffsetMap], "Should still be V2") + } + } + } + } + + test("Multi-source upgrade preserves all offsets correctly") { + withTempDirs { (checkpointDir, dataDir1, dataDir2, dataDir3) => + // Write initial data + writeJsonData(dataDir1, Seq("""{"value": 1}""")) + writeJsonData(dataDir2, Seq("""{"value": 2}""")) + writeJsonData(dataDir3, Seq("""{"value": 3}""")) + + // Start with V1, 3 sources (enforcement disabled) + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream.format("json").schema("value INT") + .load(dataDir2.getCanonicalPath)) + .union( + spark.readStream.format("json").schema("value INT") + .load(dataDir3.getCanonicalPath)) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + + val offsetLog = getOffsetLog(checkpointDir.getCanonicalPath) + val v1Offset = offsetLog.getLatest().get._2.asInstanceOf[OffsetSeq] + assert(v1Offset.offsets.size == 3, "Should have 3 sources in V1") + } + + // Restart with all sources named + V2 explicitly requested + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", + SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED.key -> "true") { + val query2 = spark.readStream + .format("json") + .schema("value INT") + .name("payments") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("refunds") + .load(dataDir2.getCanonicalPath) + ) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("adjustments") + .load(dataDir3.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + + val offsetLog2 = getOffsetLog(checkpointDir.getCanonicalPath) + val v2Offset = offsetLog2.getLatest().get._2.asInstanceOf[OffsetMap] + + // Verify all three sources are in the map with correct names + assert(v2Offset.offsetsMap.size == 3, "Should have 3 sources in V2") + assert(v2Offset.offsetsMap.contains("payments")) + assert(v2Offset.offsetsMap.contains("refunds")) + assert(v2Offset.offsetsMap.contains("adjustments")) + } + } + } + + test("Source count mismatch throws clear error") { + withTempDirs { (checkpointDir, dataDir1, dataDir2, dataDir3) => + writeJsonData(dataDir1, Seq("""{"value": 1}""")) + writeJsonData(dataDir2, Seq("""{"value": 2}""")) + + // Start with 2 sources (enforcement disabled) + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream.format("json").schema("value INT") + .load(dataDir2.getCanonicalPath)) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + } + + // Try to restart with 3 named sources + V2 requested - should fail + writeJsonData(dataDir3, Seq("""{"value": 3}""")) + + val e = intercept[Exception] { + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", + SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED.key -> "true") { + val query2 = spark.readStream + .format("json") + .schema("value INT") + .name("a") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("b") + .load(dataDir2.getCanonicalPath) + ) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("c") + .load(dataDir3.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + } + } + + // The error can be either wrapped in StreamingQueryException or thrown directly + val errorMessage = e match { + case sqe: org.apache.spark.sql.streaming.StreamingQueryException => + sqe.getCause.getMessage + case other => other.getMessage + } + + assert(errorMessage.contains("2") && errorMessage.contains("3"), + s"Error should mention source count mismatch (2 vs 3): $errorMessage") + } + } + + test("V1 offset log + V2 requested without upgrade config throws clear error") { + withTempDirs { (checkpointDir, dataDir1, dataDir2) => + // Write initial data + writeJsonData(dataDir1, Seq("""{"value": 1}""")) + writeJsonData(dataDir2, Seq("""{"value": 2}""")) + + // Start with V1 offset log (no names, enforcement disabled) + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .load(dataDir2.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + } + + // Try to restart with named sources + V2 requested but WITHOUT upgrade config + val e = intercept[Exception] { + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") { + val query2 = spark.readStream + .format("json") + .schema("value INT") + .name("source_a") + .load(dataDir1.getCanonicalPath) + .union( + spark.readStream + .format("json") + .schema("value INT") + .name("source_b") + .load(dataDir2.getCanonicalPath) + ) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + } + } + + // The error can be either wrapped in StreamingQueryException or thrown directly + val errorMessage = e match { + case sqe: org.apache.spark.sql.streaming.StreamingQueryException => + sqe.getCause.getMessage + case other => other.getMessage + } + + // Verify error message contains key guidance + assert(errorMessage.contains("V1 format") && + errorMessage.contains("V2 format was requested") && + errorMessage.contains("v1ToV2.autoUpgrade.enabled"), + s"Error should explain upgrade requirement and how to enable it: $errorMessage") + } + } + + test("V1 offset log + V2 requested without named sources upgrades with positional keys") { + withTempDir { checkpointDir => + withTempDir { dataDir => + // Write initial data + writeJsonData(dataDir, Seq("""{"value": 1}""")) + + // Start with V1 offset log, unnamed sources + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "1", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + val query1 = spark.readStream + .format("json") + .schema("value INT") + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query1.awaitTermination() + } + + // Restart requesting V2 WITHOUT naming sources - should upgrade with positional keys + writeJsonData(dataDir, Seq("""{"value": 2}""")) + + withSQLConf( + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2", + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false", + SQLConf.STREAMING_OFFSET_LOG_V1_TO_V2_AUTO_UPGRADE_ENABLED.key -> "true") { + val query2 = spark.readStream + .format("json") + .schema("value INT") + // Note: NO .name() call - source is unnamed + .load(dataDir.getCanonicalPath) + .writeStream + .format("noop") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .trigger(Trigger.Once()) + .start() + + query2.awaitTermination() + + // Verify upgrade to V2 happened with positional keys + val offsetLog = getOffsetLog(checkpointDir.getCanonicalPath) + val latestOffset = offsetLog.getLatest().get._2 + assert(latestOffset.isInstanceOf[OffsetMap], + "Should have upgraded to V2 OffsetMap with positional keys") + assert(latestOffset.version == OffsetSeqLog.VERSION_2) + + // Verify offset is keyed by positional index "0" + val offsetMap = latestOffset.asInstanceOf[OffsetMap] + assert(offsetMap.offsetsMap.contains("0"), + "Should contain positional key '0'") + } + } + } + } + +}