From 350581fff8044dc1a32e12730f63039ca81fee27 Mon Sep 17 00:00:00 2001 From: Gurpreet Nanda Date: Fri, 27 Feb 2026 02:13:59 +0000 Subject: [PATCH 1/4] [SPARK-55728][SS] Introduce RocksDB conf for file checksum threadpool size --- .../apache/spark/sql/internal/SQLConf.scala | 13 ++++ .../ChecksumCheckpointFileManager.scala | 13 ++-- .../execution/streaming/state/RocksDB.scala | 11 ++-- .../streaming/state/StateStoreConf.scala | 3 + .../ChecksumCheckpointFileManagerSuite.scala | 24 ++++++++ .../streaming/state/RocksDBSuite.scala | 59 ++++++++++++++++++- 6 files changed, 112 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b2a2b7027394f..efd74546d1a01 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3684,6 +3684,16 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE = + buildConf("spark.sql.streaming.stateStore.rocksdb.fileChecksumThreadPoolSize") + .internal() + .doc("Number of threads used to compute file checksums concurrently when uploading " + + "RocksDB state store checkpoints (e.g. main file and checksum file).") + .version("4.1.0") + .intConf + .checkValue(x => x == 1 || (x > 0 && x % 2 == 0), "Must be 1 or a positive even number") + .createWithDefault(4) + val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION = buildConf("spark.sql.statistics.parallelFileListingInStatsComputation.enabled") .internal() @@ -7173,6 +7183,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def checkpointFileChecksumSkipCreationIfFileMissingChecksum: Boolean = getConf(STREAMING_CHECKPOINT_FILE_CHECKSUM_SKIP_CREATION_IF_FILE_MISSING_CHECKSUM) + def stateStoreRocksDBFileChecksumThreadPoolSize: Int = + getConf(STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE) + def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED) def useDeprecatedKafkaOffsetFetching: Boolean = getConf(USE_DEPRECATED_KAFKA_OFFSET_FETCHING) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala index fcfad636ab776..cf0647cf6c1c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala @@ -127,9 +127,11 @@ case class ChecksumFile(path: Path) { * orphan checksum files. If using this, it is your responsibility * to clean up the potential orphan checksum files. * @param numThreads This is the number of threads to use for the thread pool, for reading/writing - * files. To avoid blocking, if the file manager instance is being used by a - * single thread, then you can set this to 2 (one thread for main file, another - * for checksum file). + * files. Must be 1 or a positive even number. + * Setting this to 1 means operations are performed sequentially (no concurrency + * between main file and checksum file). + * To avoid blocking with concurrent callers, set this to 2 (one thread for main + * file, another for checksum file) per caller thread. * If file manager is shared by multiple threads, you can set it to * number of threads using file manager * 2. * Setting this differently can lead to file operation being blocked waiting for @@ -150,12 +152,13 @@ class ChecksumCheckpointFileManager( val numThreads: Int, val skipCreationIfFileMissingChecksum: Boolean) extends CheckpointFileManager with Logging { - assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 for the main file" + + assert(numThreads == 1 || numThreads % 2 == 0, + "numThreads must be 1 or a multiple of 2, we need 1 for the main file" + "and another for the checksum file") import ChecksumCheckpointFileManager._ - // This allows us to concurrently read/write the main file and checksum file + // This allows us to read/write the main file and checksum file (concurrently if numThreads > 1) private val threadPool = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonFixedThreadPool(numThreads, s"${this.getClass.getSimpleName}-Thread")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 69a7e9618bb3d..28f1fe3b5cf42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -152,11 +152,10 @@ class RocksDB( private val workingDir = createTempDir("workingDir") - // We need 2 threads per fm caller to avoid blocking - // (one for main file and another for checksum file). - // Since this fm is used by both query task and maintenance thread, - // then we need 2 * 2 = 4 threads. - protected val fileChecksumThreadPoolSize: Option[Int] = Some(4) + // To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum file). + // Since this fm is used by both query task and maintenance thread, the recommended default is + // 2 * 2 = 4 threads. A value of 1 disables concurrency (sequential execution). + protected val fileChecksumThreadPoolSize: Option[Int] = Some(conf.fileChecksumThreadPoolSize) protected def createFileManager( dfsRootDir: String, @@ -2404,6 +2403,7 @@ case class RocksDBConf( reportSnapshotUploadLag: Boolean, maxVersionsToDeletePerMaintenance: Int, fileChecksumEnabled: Boolean, + fileChecksumThreadPoolSize: Int, rowChecksumEnabled: Boolean, rowChecksumReadVerificationRatio: Long, mergeOperatorVersion: Int, @@ -2619,6 +2619,7 @@ object RocksDBConf { storeConf.reportSnapshotUploadLag, storeConf.maxVersionsToDeletePerMaintenance, storeConf.checkpointFileChecksumEnabled, + storeConf.fileChecksumThreadPoolSize, storeConf.rowChecksumEnabled, storeConf.rowChecksumReadVerificationRatio, getPositiveIntConf(MERGE_OPERATOR_VERSION_CONF), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala index f3bbc0ea24069..5aa2a0ffc850a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala @@ -116,6 +116,9 @@ class StateStoreConf( /** Whether file checksum generation and verification is enabled. */ val checkpointFileChecksumEnabled: Boolean = sqlConf.checkpointFileChecksumEnabled + /** Number of threads used to compute file checksums concurrently for RocksDB state store. */ + val fileChecksumThreadPoolSize: Int = sqlConf.stateStoreRocksDBFileChecksumThreadPoolSize + /** whether to validate state schema during query run. */ val stateSchemaCheckEnabled = sqlConf.isStateSchemaCheckEnabled diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala index b15e8f167db58..a1aadc3c95b11 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala @@ -250,6 +250,30 @@ abstract class ChecksumCheckpointFileManagerSuite extends CheckpointFileManagerT checksumFmWithoutFallback.close() } } + + test("numThreads = 1 is valid") { + withTempHadoopPath { basePath => + val fm = new ChecksumCheckpointFileManager( + createNoChecksumManager(basePath), + allowConcurrentDelete = true, + numThreads = 1, + skipCreationIfFileMissingChecksum = false) + fm.close() + } + } + + test("odd numThreads > 1 is invalid") { + withTempHadoopPath { basePath => + val ex = intercept[AssertionError] { + new ChecksumCheckpointFileManager( + createNoChecksumManager(basePath), + allowConcurrentDelete = true, + numThreads = 3, + skipCreationIfFileMissingChecksum = false) + } + assert(ex.getMessage.contains("numThreads must be 1 or a multiple of 2")) + } + } } class FileContextChecksumCheckpointFileManagerSuite extends ChecksumCheckpointFileManagerSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index bb02dad76ac69..689378ca5d9f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, FileContextBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, ChecksumCheckpointFileManager, FileContextBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager} import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream} import org.apache.spark.sql.execution.streaming.runtime.StreamExecution import org.apache.spark.sql.internal.SQLConf @@ -2995,6 +2995,63 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } + test("RocksDB: fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") { + withTempDir { dir => + val sqlConf = SQLConf.get.clone() + sqlConf.setConfString( + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true") + sqlConf.setConfString( + SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "6") + + val dbConf = RocksDBConf(StateStoreConf(sqlConf)) + assert(dbConf.fileChecksumThreadPoolSize === 6) + + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = dbConf) { db => + // Access the fm (lazy val) on fileManager via PrivateMethodTester + val fileManagerMethod = PrivateMethod[CheckpointFileManager](Symbol("fm")) + val fm = db.fileManager invokePrivate fileManagerMethod() + + // Verify it's a ChecksumCheckpointFileManager with the right thread count + assert(fm.isInstanceOf[ChecksumCheckpointFileManager]) + assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === 6) + } + } + } + + test("RocksDB: fileChecksumThreadPoolSize of 1 is valid and propagates correctly") { + withTempDir { dir => + val sqlConf = SQLConf.get.clone() + sqlConf.setConfString( + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true") + sqlConf.setConfString( + SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "1") + + val dbConf = RocksDBConf(StateStoreConf(sqlConf)) + assert(dbConf.fileChecksumThreadPoolSize === 1) + + val remoteDir = dir.getCanonicalPath + withDB(remoteDir, conf = dbConf) { db => + val fileManagerMethod = PrivateMethod[CheckpointFileManager](Symbol("fm")) + val fm = db.fileManager invokePrivate fileManagerMethod() + + assert(fm.isInstanceOf[ChecksumCheckpointFileManager]) + assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === 1) + } + } + } + + Seq("0", "3").foreach { invalidValue => + test(s"STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE: invalid value $invalidValue") { + val sqlConf = SQLConf.get.clone() + val ex = intercept[IllegalArgumentException] { + sqlConf.setConfString(SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, + invalidValue) + } + assert(ex.getMessage.contains("Must be 1 or a positive even number")) + } + } + // Add tests to check valid and invalid values for max_open_files passed to the underlying // RocksDB instance. Seq("-1", "100", "1000").foreach { maxOpenFiles => From 8167c56893eb027a6b26aeb88fd7c20d9c49cf87 Mon Sep 17 00:00:00 2001 From: Gurpreet Nanda Date: Mon, 2 Mar 2026 05:15:21 +0000 Subject: [PATCH 2/4] comments --- .../apache/spark/sql/internal/SQLConf.scala | 16 +-- .../ChecksumCheckpointFileManager.scala | 103 +++++++++++------- .../state/HDFSBackedStateStoreProvider.scala | 9 +- .../execution/streaming/state/RocksDB.scala | 11 +- .../streaming/state/StateStoreConf.scala | 4 +- .../ChecksumCheckpointFileManagerSuite.scala | 10 +- .../streaming/state/RocksDBSuite.scala | 59 +--------- .../streaming/state/StateStoreSuite.scala | 29 +++++ 8 files changed, 121 insertions(+), 120 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index efd74546d1a01..9c44c7b10b2b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3684,14 +3684,16 @@ object SQLConf { .booleanConf .createWithDefault(true) - val STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE = - buildConf("spark.sql.streaming.stateStore.rocksdb.fileChecksumThreadPoolSize") + val STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE = + buildConf("spark.sql.streaming.stateStore.fileChecksumThreadPoolSize") .internal() .doc("Number of threads used to compute file checksums concurrently when uploading " + - "RocksDB state store checkpoints (e.g. main file and checksum file).") - .version("4.1.0") + "state store checkpoints (e.g. main file and checksum file). " + + "Set to 0 to disable the thread pool and run operations sequentially on the calling " + + "thread. WARNING: Reducing below the default value of 4 may have performance impact.") + .version("4.2.0") .intConf - .checkValue(x => x == 1 || (x > 0 && x % 2 == 0), "Must be 1 or a positive even number") + .checkValue(x => x >= 0, "Must be a non-negative integer (0 to disable thread pool)") .createWithDefault(4) val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION = @@ -7183,8 +7185,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def checkpointFileChecksumSkipCreationIfFileMissingChecksum: Boolean = getConf(STREAMING_CHECKPOINT_FILE_CHECKSUM_SKIP_CREATION_IF_FILE_MISSING_CHECKSUM) - def stateStoreRocksDBFileChecksumThreadPoolSize: Int = - getConf(STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE) + def stateStoreFileChecksumThreadPoolSize: Int = + getConf(STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE) def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala index cf0647cf6c1c8..8f2f4bd520513 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala @@ -21,9 +21,10 @@ import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import java.util.zip.{CheckedInputStream, CheckedOutputStream, CRC32C} -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} import scala.concurrent.duration.Duration import scala.io.Source +import scala.util.control.NonFatal import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} @@ -127,14 +128,14 @@ case class ChecksumFile(path: Path) { * orphan checksum files. If using this, it is your responsibility * to clean up the potential orphan checksum files. * @param numThreads This is the number of threads to use for the thread pool, for reading/writing - * files. Must be 1 or a positive even number. - * Setting this to 1 means operations are performed sequentially (no concurrency - * between main file and checksum file). - * To avoid blocking with concurrent callers, set this to 2 (one thread for main - * file, another for checksum file) per caller thread. - * If file manager is shared by multiple threads, you can set it to - * number of threads using file manager * 2. - * Setting this differently can lead to file operation being blocked waiting for + * files. Must be a non-negative integer. + * Setting this to 0 disables the thread pool and runs all operations + * sequentially on the calling thread (no concurrency). + * To avoid blocking with a single concurrent caller, set this to 2 (one thread + * for main file, another for checksum file). + * If file manager is shared by multiple threads, set it to + * number of concurrent callers * 2. + * Setting this too low can lead to file operations being blocked waiting for * a free thread. * @param skipCreationIfFileMissingChecksum (ES-1629547): If true, when a file already exists * but its checksum file does not exist, fall back to using the underlying @@ -152,15 +153,33 @@ class ChecksumCheckpointFileManager( val numThreads: Int, val skipCreationIfFileMissingChecksum: Boolean) extends CheckpointFileManager with Logging { - assert(numThreads == 1 || numThreads % 2 == 0, - "numThreads must be 1 or a multiple of 2, we need 1 for the main file" + - "and another for the checksum file") + assert(numThreads >= 0, "numThreads must be a non-negative integer") import ChecksumCheckpointFileManager._ - // This allows us to read/write the main file and checksum file (concurrently if numThreads > 1) - private val threadPool = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonFixedThreadPool(numThreads, s"${this.getClass.getSimpleName}-Thread")) + // Thread pool for concurrent execution, or None when numThreads == 0 (sequential/inline mode). + private val threadPool: Option[ExecutionContextExecutorService] = + if (numThreads == 0) None + else Some(ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonFixedThreadPool(numThreads, s"${this.getClass.getSimpleName}-Thread"))) + + // ExecutionContext to pass to stream classes: uses the thread pool or a caller-runs context. + private val streamContext: ExecutionContext = threadPool.getOrElse(new ExecutionContext { + override def execute(runnable: Runnable): Unit = runnable.run() + override def reportFailure(cause: Throwable): Unit = throw cause + }) + + /** + * Schedules a computation on the thread pool, or runs it directly on the calling thread + * if numThreads == 0 (sequential mode). + */ + private def scheduleOrRun[T](f: => T): Future[T] = threadPool match { + case None => + try Future.successful(f) + catch { case NonFatal(t) => Future.failed(t) } + case Some(pool) => + Future { f }(pool) + } override def list(path: Path, filter: PathFilter): Array[FileStatus] = { underlyingFileMgr.list(path, filter) @@ -192,26 +211,26 @@ class ChecksumCheckpointFileManager( createFunc: Path => CancellableFSDataOutputStream): ChecksumCancellableFSDataOutputStream = { assert(!isChecksumFile(path), "Cannot directly create a checksum file") - val mainFileFuture = Future { + val mainFileFuture = scheduleOrRun { createFunc(path) - }(threadPool) + } - val checksumFileFuture = Future { + val checksumFileFuture = scheduleOrRun { createFunc(getChecksumPath(path)) - }(threadPool) + } new ChecksumCancellableFSDataOutputStream( awaitResult(mainFileFuture, Duration.Inf), path, awaitResult(checksumFileFuture, Duration.Inf), - threadPool + streamContext ) } override def open(path: Path): FSDataInputStream = { assert(!isChecksumFile(path), "Cannot directly open a checksum file") - val checksumInputStreamFuture = Future { + val checksumInputStreamFuture = scheduleOrRun { try { Some(underlyingFileMgr.open(getChecksumPath(path))) } catch { @@ -222,17 +241,17 @@ class ChecksumCheckpointFileManager( log"hence no checksum verification.") None } - }(threadPool) + } - val mainInputStreamFuture = Future { + val mainInputStreamFuture = scheduleOrRun { underlyingFileMgr.open(path) - }(threadPool) + } val mainStream = awaitResult(mainInputStreamFuture, Duration.Inf) val checksumStream = awaitResult(checksumInputStreamFuture, Duration.Inf) checksumStream.map { chkStream => - new ChecksumFSDataInputStream(mainStream, path, chkStream, threadPool) + new ChecksumFSDataInputStream(mainStream, path, chkStream, streamContext) }.getOrElse(mainStream) } @@ -250,13 +269,13 @@ class ChecksumCheckpointFileManager( // But if allowConcurrentDelete is enabled, then we can do it concurrently for perf. // But the client would be responsible for cleaning up potential orphan checksum files // if it happens. - val checksumInputStreamFuture = Future { + val checksumInputStreamFuture = scheduleOrRun { deleteChecksumFile(getChecksumPath(path)) - }(threadPool) + } - val mainInputStreamFuture = Future { + val mainInputStreamFuture = scheduleOrRun { underlyingFileMgr.delete(path) - }(threadPool) + } awaitResult(mainInputStreamFuture, Duration.Inf) awaitResult(checksumInputStreamFuture, Duration.Inf) @@ -282,18 +301,20 @@ class ChecksumCheckpointFileManager( } override def close(): Unit = { - threadPool.shutdown() - // Wait a bit for it to finish up in case there is any ongoing work - // Can consider making this timeout configurable, if needed - val timeoutMs = 500 - if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) { - logWarning(log"Thread pool did not shutdown after ${MDC(TIMEOUT, timeoutMs)} ms," + - log" forcing shutdown") - threadPool.shutdownNow() // stop the executing tasks - - // Wait a bit for the threads to respond - if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) { - logError(log"Thread pool did not terminate") + threadPool.foreach { pool => + pool.shutdown() + // Wait a bit for it to finish up in case there is any ongoing work + // Can consider making this timeout configurable, if needed + val timeoutMs = 500 + if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) { + logWarning(log"Thread pool did not shutdown after ${MDC(TIMEOUT, timeoutMs)} ms," + + log" forcing shutdown") + pool.shutdownNow() // stop the executing tasks + + // Wait a bit for the threads to respond + if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) { + logError(log"Thread pool did not terminate") + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 3303b414ccd35..52ad8d8c23792 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -533,11 +533,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with mgr, // Allowing this for perf, since we do orphan checksum file cleanup in maintenance anyway allowConcurrentDelete = true, - // We need 2 threads per fm caller to avoid blocking - // (one for main file and another for checksum file). - // Since this fm is used by both query task and maintenance thread, - // then we need 2 * 2 = 4 threads. - numThreads = 4, + // To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum + // file). Since this fm is used by both query task and maintenance thread, the recommended + // default is 2 * 2 = 4 threads. A value of 0 disables the thread pool (sequential mode). + numThreads = storeConf.fileChecksumThreadPoolSize, skipCreationIfFileMissingChecksum = storeConf.checkpointFileChecksumSkipCreationIfFileMissingChecksum) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 28f1fe3b5cf42..8f60e04d33844 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -154,8 +154,15 @@ class RocksDB( // To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum file). // Since this fm is used by both query task and maintenance thread, the recommended default is - // 2 * 2 = 4 threads. A value of 1 disables concurrency (sequential execution). - protected val fileChecksumThreadPoolSize: Option[Int] = Some(conf.fileChecksumThreadPoolSize) + // 2 * 2 = 4 threads. A value of 0 disables the thread pool (sequential execution). + protected val fileChecksumThreadPoolSize: Option[Int] = { + val size = conf.fileChecksumThreadPoolSize + if (size != 4) { + logWarning(s"fileChecksumThreadPoolSize is set to $size, which differs from the " + + "recommended default of 4. Reducing below the default may have performance impact.") + } + Some(size) + } protected def createFileManager( dfsRootDir: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala index 5aa2a0ffc850a..5a3e875541d02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala @@ -116,8 +116,8 @@ class StateStoreConf( /** Whether file checksum generation and verification is enabled. */ val checkpointFileChecksumEnabled: Boolean = sqlConf.checkpointFileChecksumEnabled - /** Number of threads used to compute file checksums concurrently for RocksDB state store. */ - val fileChecksumThreadPoolSize: Int = sqlConf.stateStoreRocksDBFileChecksumThreadPoolSize + /** Number of threads for the file checksum thread pool (0 to disable). */ + val fileChecksumThreadPoolSize: Int = sqlConf.stateStoreFileChecksumThreadPoolSize /** whether to validate state schema during query run. */ val stateSchemaCheckEnabled = sqlConf.isStateSchemaCheckEnabled diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala index a1aadc3c95b11..be23edc1a97c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala @@ -251,27 +251,27 @@ abstract class ChecksumCheckpointFileManagerSuite extends CheckpointFileManagerT } } - test("numThreads = 1 is valid") { + test("numThreads = 0 disables thread pool") { withTempHadoopPath { basePath => val fm = new ChecksumCheckpointFileManager( createNoChecksumManager(basePath), allowConcurrentDelete = true, - numThreads = 1, + numThreads = 0, skipCreationIfFileMissingChecksum = false) fm.close() } } - test("odd numThreads > 1 is invalid") { + test("negative numThreads is invalid") { withTempHadoopPath { basePath => val ex = intercept[AssertionError] { new ChecksumCheckpointFileManager( createNoChecksumManager(basePath), allowConcurrentDelete = true, - numThreads = 3, + numThreads = -1, skipCreationIfFileMissingChecksum = false) } - assert(ex.getMessage.contains("numThreads must be 1 or a multiple of 2")) + assert(ex.getMessage.contains("numThreads must be a non-negative integer")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 689378ca5d9f9..bb02dad76ac69 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, ChecksumCheckpointFileManager, FileContextBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, FileContextBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager} import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream} import org.apache.spark.sql.execution.streaming.runtime.StreamExecution import org.apache.spark.sql.internal.SQLConf @@ -2995,63 +2995,6 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } - test("RocksDB: fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") { - withTempDir { dir => - val sqlConf = SQLConf.get.clone() - sqlConf.setConfString( - SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true") - sqlConf.setConfString( - SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "6") - - val dbConf = RocksDBConf(StateStoreConf(sqlConf)) - assert(dbConf.fileChecksumThreadPoolSize === 6) - - val remoteDir = dir.getCanonicalPath - withDB(remoteDir, conf = dbConf) { db => - // Access the fm (lazy val) on fileManager via PrivateMethodTester - val fileManagerMethod = PrivateMethod[CheckpointFileManager](Symbol("fm")) - val fm = db.fileManager invokePrivate fileManagerMethod() - - // Verify it's a ChecksumCheckpointFileManager with the right thread count - assert(fm.isInstanceOf[ChecksumCheckpointFileManager]) - assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === 6) - } - } - } - - test("RocksDB: fileChecksumThreadPoolSize of 1 is valid and propagates correctly") { - withTempDir { dir => - val sqlConf = SQLConf.get.clone() - sqlConf.setConfString( - SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true") - sqlConf.setConfString( - SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "1") - - val dbConf = RocksDBConf(StateStoreConf(sqlConf)) - assert(dbConf.fileChecksumThreadPoolSize === 1) - - val remoteDir = dir.getCanonicalPath - withDB(remoteDir, conf = dbConf) { db => - val fileManagerMethod = PrivateMethod[CheckpointFileManager](Symbol("fm")) - val fm = db.fileManager invokePrivate fileManagerMethod() - - assert(fm.isInstanceOf[ChecksumCheckpointFileManager]) - assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === 1) - } - } - } - - Seq("0", "3").foreach { invalidValue => - test(s"STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE: invalid value $invalidValue") { - val sqlConf = SQLConf.get.clone() - val ex = intercept[IllegalArgumentException] { - sqlConf.setConfString(SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, - invalidValue) - } - assert(ex.getMessage.contains("Must be 1 or a positive even number")) - } - } - // Add tests to check valid and invalid values for max_open_files passed to the underlying // RocksDB instance. Seq("-1", "100", "1000").foreach { maxOpenFiles => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index d35263b655d2f..ba9b2dcb27a40 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -1547,6 +1547,35 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } } + test("fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") { + Seq(0, 1, 6).foreach { numThreads => + withTempDir { dir => + val sqlConf = SQLConf.get.clone() + sqlConf.setConfString(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true") + sqlConf.setConfString( + SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, numThreads.toString) + + val provider = newStoreProvider( + opId = Random.nextInt(), partition = 0, dir = dir.getCanonicalPath, + sqlConfOpt = Some(sqlConf)) + val fileManagerMethod = PrivateMethod[CheckpointFileManager](Symbol("fm")) + val fm = provider invokePrivate fileManagerMethod() + + assert(fm.isInstanceOf[ChecksumCheckpointFileManager]) + assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === numThreads) + provider.close() + } + } + } + + test("STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE: invalid negative value is rejected") { + val sqlConf = SQLConf.get.clone() + val ex = intercept[IllegalArgumentException] { + sqlConf.setConfString(SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "-1") + } + assert(ex.getMessage.contains("Must be a non-negative integer")) + } + override def newStoreProvider(): HDFSBackedStateStoreProvider = { newStoreProvider(opId = Random.nextInt(), partition = 0) } From 14ca56194454c1a3f3502c397a82b30db98632cd Mon Sep 17 00:00:00 2001 From: Gurpreet Nanda Date: Mon, 2 Mar 2026 22:59:43 +0000 Subject: [PATCH 3/4] comment --- .../spark/sql/execution/streaming/state/RocksDB.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 8f60e04d33844..4b2967aee7d3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -157,9 +157,9 @@ class RocksDB( // 2 * 2 = 4 threads. A value of 0 disables the thread pool (sequential execution). protected val fileChecksumThreadPoolSize: Option[Int] = { val size = conf.fileChecksumThreadPoolSize - if (size != 4) { - logWarning(s"fileChecksumThreadPoolSize is set to $size, which differs from the " + - "recommended default of 4. Reducing below the default may have performance impact.") + if (size < 4) { + logWarning(s"fileChecksumThreadPoolSize is set to $size, which is below the " + + "recommended default of 4. This may have performance impact.") } Some(size) } From e4fbe752ddadb90692f3f59d2293c209e1589820 Mon Sep 17 00:00:00 2001 From: Gurpreet Nanda Date: Tue, 3 Mar 2026 21:54:55 +0000 Subject: [PATCH 4/4] comments --- .../apache/spark/sql/internal/SQLConf.scala | 8 +- .../ChecksumCheckpointFileManager.scala | 78 +++++------ .../state/HDFSBackedStateStoreProvider.scala | 7 +- .../ChecksumCheckpointFileManagerSuite.scala | 11 +- .../streaming/state/StateStoreSuite.scala | 130 ++++++++++++++---- 5 files changed, 156 insertions(+), 78 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9c44c7b10b2b2..99bcc5b804c46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3687,10 +3687,10 @@ object SQLConf { val STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE = buildConf("spark.sql.streaming.stateStore.fileChecksumThreadPoolSize") .internal() - .doc("Number of threads used to compute file checksums concurrently when uploading " + - "state store checkpoints (e.g. main file and checksum file). " + - "Set to 0 to disable the thread pool and run operations sequentially on the calling " + - "thread. WARNING: Reducing below the default value of 4 may have performance impact.") + .doc("Number of threads used to read/write files and their corresponding checksum files " + + "concurrently. Set to 0 to disable the thread pool and run operations sequentially on " + + "the calling thread. WARNING: Reducing below the default value of 4 may have " + + "performance impact.") .version("4.2.0") .intConf .checkValue(x => x >= 0, "Must be a non-negative integer (0 to disable thread pool)") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala index 8f2f4bd520513..d5fbc563a3aba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala @@ -24,7 +24,6 @@ import java.util.zip.{CheckedInputStream, CheckedOutputStream, CRC32C} import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} import scala.concurrent.duration.Duration import scala.io.Source -import scala.util.control.NonFatal import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} @@ -128,14 +127,14 @@ case class ChecksumFile(path: Path) { * orphan checksum files. If using this, it is your responsibility * to clean up the potential orphan checksum files. * @param numThreads This is the number of threads to use for the thread pool, for reading/writing - * files. Must be a non-negative integer. - * Setting this to 0 disables the thread pool and runs all operations - * sequentially on the calling thread (no concurrency). - * To avoid blocking with a single concurrent caller, set this to 2 (one thread - * for main file, another for checksum file). - * If file manager is shared by multiple threads, set it to - * number of concurrent callers * 2. - * Setting this too low can lead to file operations being blocked waiting for + * files. Must be a non-negative integer. Setting this to 0 disables the thread + * pool and runs all operations sequentially on the calling thread. + * To avoid blocking, if the file manager instance is being used by a + * single thread, then you can set this to 2 (one thread for main file, another + * for checksum file). + * If file manager is shared by multiple threads, you can set it to + * number of threads using file manager * 2. + * Setting this differently can lead to file operation being blocked waiting for * a free thread. * @param skipCreationIfFileMissingChecksum (ES-1629547): If true, when a file already exists * but its checksum file does not exist, fall back to using the underlying @@ -157,29 +156,22 @@ class ChecksumCheckpointFileManager( import ChecksumCheckpointFileManager._ - // Thread pool for concurrent execution, or None when numThreads == 0 (sequential/inline mode). - private val threadPool: Option[ExecutionContextExecutorService] = + // This allows us to concurrently read/write the main file and checksum file + private val threadPoolOpt: Option[ExecutionContextExecutorService] = if (numThreads == 0) None else Some(ExecutionContext.fromExecutorService( ThreadUtils.newDaemonFixedThreadPool(numThreads, s"${this.getClass.getSimpleName}-Thread"))) - // ExecutionContext to pass to stream classes: uses the thread pool or a caller-runs context. - private val streamContext: ExecutionContext = threadPool.getOrElse(new ExecutionContext { - override def execute(runnable: Runnable): Unit = runnable.run() - override def reportFailure(cause: Throwable): Unit = throw cause - }) - - /** - * Schedules a computation on the thread pool, or runs it directly on the calling thread - * if numThreads == 0 (sequential mode). - */ - private def scheduleOrRun[T](f: => T): Future[T] = threadPool match { - case None => - try Future.successful(f) - catch { case NonFatal(t) => Future.failed(t) } - case Some(pool) => - Future { f }(pool) - } + // ExecutionContext used for I/O operations on ChecksumFSDataInputStream and + // ChecksumCancellableFSDataOutputStream: uses the thread pool when numThreads > 0, or + // runs operations synchronously on the calling thread when numThreads == 0. + private val executionContext: ExecutionContext = threadPoolOpt.getOrElse( + // This will execute the runnable synchronously on the calling thread + new ExecutionContext { + override def execute(runnable: Runnable): Unit = runnable.run() + override def reportFailure(cause: Throwable): Unit = throw cause + } + ) override def list(path: Path, filter: PathFilter): Array[FileStatus] = { underlyingFileMgr.list(path, filter) @@ -211,26 +203,26 @@ class ChecksumCheckpointFileManager( createFunc: Path => CancellableFSDataOutputStream): ChecksumCancellableFSDataOutputStream = { assert(!isChecksumFile(path), "Cannot directly create a checksum file") - val mainFileFuture = scheduleOrRun { + val mainFileFuture = Future { createFunc(path) - } + }(executionContext) - val checksumFileFuture = scheduleOrRun { + val checksumFileFuture = Future { createFunc(getChecksumPath(path)) - } + }(executionContext) new ChecksumCancellableFSDataOutputStream( awaitResult(mainFileFuture, Duration.Inf), path, awaitResult(checksumFileFuture, Duration.Inf), - streamContext + executionContext ) } override def open(path: Path): FSDataInputStream = { assert(!isChecksumFile(path), "Cannot directly open a checksum file") - val checksumInputStreamFuture = scheduleOrRun { + val checksumInputStreamFuture = Future { try { Some(underlyingFileMgr.open(getChecksumPath(path))) } catch { @@ -241,17 +233,17 @@ class ChecksumCheckpointFileManager( log"hence no checksum verification.") None } - } + }(executionContext) - val mainInputStreamFuture = scheduleOrRun { + val mainInputStreamFuture = Future { underlyingFileMgr.open(path) - } + }(executionContext) val mainStream = awaitResult(mainInputStreamFuture, Duration.Inf) val checksumStream = awaitResult(checksumInputStreamFuture, Duration.Inf) checksumStream.map { chkStream => - new ChecksumFSDataInputStream(mainStream, path, chkStream, streamContext) + new ChecksumFSDataInputStream(mainStream, path, chkStream, executionContext) }.getOrElse(mainStream) } @@ -269,13 +261,13 @@ class ChecksumCheckpointFileManager( // But if allowConcurrentDelete is enabled, then we can do it concurrently for perf. // But the client would be responsible for cleaning up potential orphan checksum files // if it happens. - val checksumInputStreamFuture = scheduleOrRun { + val checksumInputStreamFuture = Future { deleteChecksumFile(getChecksumPath(path)) - } + }(executionContext) - val mainInputStreamFuture = scheduleOrRun { + val mainInputStreamFuture = Future { underlyingFileMgr.delete(path) - } + }(executionContext) awaitResult(mainInputStreamFuture, Duration.Inf) awaitResult(checksumInputStreamFuture, Duration.Inf) @@ -301,7 +293,7 @@ class ChecksumCheckpointFileManager( } override def close(): Unit = { - threadPool.foreach { pool => + threadPoolOpt.foreach { pool => pool.shutdown() // Wait a bit for it to finish up in case there is any ongoing work // Can consider making this timeout configurable, if needed diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 52ad8d8c23792..33539d0d74b40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -529,6 +529,11 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with private[state] lazy val fm = { val mgr = CheckpointFileManager.create(baseDir, hadoopConf) if (storeConf.checkpointFileChecksumEnabled) { + val threadPoolSize = storeConf.fileChecksumThreadPoolSize + if (threadPoolSize < 4) { + logWarning(s"fileChecksumThreadPoolSize is set to $threadPoolSize, which is below the " + + "recommended default of 4. This may have performance impact.") + } new ChecksumCheckpointFileManager( mgr, // Allowing this for perf, since we do orphan checksum file cleanup in maintenance anyway @@ -536,7 +541,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with // To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum // file). Since this fm is used by both query task and maintenance thread, the recommended // default is 2 * 2 = 4 threads. A value of 0 disables the thread pool (sequential mode). - numThreads = storeConf.fileChecksumThreadPoolSize, + numThreads = threadPoolSize, skipCreationIfFileMissingChecksum = storeConf.checkpointFileChecksumSkipCreationIfFileMissingChecksum) } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala index be23edc1a97c5..f283cfeecf335 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManagerSuite.scala @@ -251,13 +251,22 @@ abstract class ChecksumCheckpointFileManagerSuite extends CheckpointFileManagerT } } - test("numThreads = 0 disables thread pool") { + test("numThreads = 0 disables thread pool (sequential mode)") { withTempHadoopPath { basePath => val fm = new ChecksumCheckpointFileManager( createNoChecksumManager(basePath), allowConcurrentDelete = true, numThreads = 0, skipCreationIfFileMissingChecksum = false) + val path = new Path(basePath, "testfile") + val checksumPath = getChecksumPath(path) + // Write a file (main + checksum) in sequential mode + fm.createAtomic(path, overwriteIfPossible = false).writeContent(42).close() + // Verify both the main file and checksum file were written to disk + assert(fm.exists(path), "Main file should exist after write") + assert(fm.exists(checksumPath), "Checksum file should exist after write") + // Read it back - readContent() closes the stream, which triggers checksum verification + assert(fm.open(path).readContent() == 42) fm.close() } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index ba9b2dcb27a40..cb5d5b0a651e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -1547,35 +1547,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] } } - test("fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") { - Seq(0, 1, 6).foreach { numThreads => - withTempDir { dir => - val sqlConf = SQLConf.get.clone() - sqlConf.setConfString(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true") - sqlConf.setConfString( - SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, numThreads.toString) - - val provider = newStoreProvider( - opId = Random.nextInt(), partition = 0, dir = dir.getCanonicalPath, - sqlConfOpt = Some(sqlConf)) - val fileManagerMethod = PrivateMethod[CheckpointFileManager](Symbol("fm")) - val fm = provider invokePrivate fileManagerMethod() - - assert(fm.isInstanceOf[ChecksumCheckpointFileManager]) - assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === numThreads) - provider.close() - } - } - } - - test("STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE: invalid negative value is rejected") { - val sqlConf = SQLConf.get.clone() - val ex = intercept[IllegalArgumentException] { - sqlConf.setConfString(SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "-1") - } - assert(ex.getMessage.contains("Must be a non-negative integer")) - } - override def newStoreProvider(): HDFSBackedStateStoreProvider = { newStoreProvider(opId = Random.nextInt(), partition = 0) } @@ -2494,6 +2465,107 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] } } + test("fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") { + Seq(0, 1, 6).foreach { numThreads => + val storeId = StateStoreId(newDir(), 0L, 0) + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true", + SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> numThreads.toString) { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + val fmMethod = PrivateMethod[CheckpointFileManager](Symbol("fm")) + val fm = provider match { + case hdfs: HDFSBackedStateStoreProvider => + hdfs.fm + case rocksdb: RocksDBStateStoreProvider => + rocksdb.rocksDB.fileManager invokePrivate fmMethod() + case _ => + fail(s"Unexpected provider type: ${provider.getClass.getName}") + } + assert(fm.isInstanceOf[ChecksumCheckpointFileManager]) + assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === numThreads) + } + } + } + } + + test("STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE: invalid negative value is rejected") { + val sqlConf = SQLConf.get.clone() + val ex = intercept[IllegalArgumentException] { + sqlConf.setConfString(SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "-1") + } + assert(ex.getMessage.contains("Must be a non-negative integer")) + } + + test("fileChecksumThreadPoolSize = 0 supports sequential I/O (load, write, commit, reload)") { + val storeId = StateStoreId(newDir(), 0L, 0) + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true", + SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> "0") { + // Write some state with sequential mode enabled + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + val store = provider.getStore(0) + put(store, "a", 0, 1) + put(store, "b", 0, 2) + store.commit() + provider.doMaintenance() + } + + // Reload and verify state is intact + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + val store = provider.getStore(1) + assert(get(store, "a", 0) === Some(1)) + assert(get(store, "b", 0) === Some(2)) + store.abort() + } + } + } + + test("fileChecksumThreadPoolSize = 0: concurrent store commit and maintenance both complete") { + val storeId = StateStoreId(newDir(), 0L, 0) + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key -> "true", + SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key -> "0") { + tryWithProviderResource(newStoreProviderWithClonedConf(storeId)) { provider => + // Build up a few versions so maintenance has something to work with + (0L until 3L).foreach { version => + putAndCommitStore(provider, version, doMaintenance = false) + } + + // Load the store and prepare the write before maintenance starts, so that + // store.commit() (the actual file I/O) is what overlaps with doMaintenance(). + val store = provider.getStore(3) + put(store, "3", 3, 300) + + val errors = new ConcurrentLinkedQueue[Throwable]() + val maintenanceStartedLatch = new CountDownLatch(1) + val maintenanceDoneLatch = new CountDownLatch(1) + + val maintenanceThread = new Thread(() => { + try { + maintenanceStartedLatch.countDown() + provider.doMaintenance() + } catch { + case t: Throwable => errors.add(t) + } finally { + maintenanceDoneLatch.countDown() + } + }) + maintenanceThread.setDaemon(true) + maintenanceThread.start() + + // Wait until maintenance is running, then commit to simulate concurrency. + assert(maintenanceStartedLatch.await(30, TimeUnit.SECONDS), + "Maintenance thread did not start within 30 seconds") + store.commit() + + assert(maintenanceDoneLatch.await(30, TimeUnit.SECONDS), + "Maintenance did not complete within 30 seconds") + assert(errors.isEmpty, + s"Maintenance failed with: ${Option(errors.peek()).map(_.getMessage).orNull}") + } + } + } + private def verifyChecksumFiles( dir: String, expectedNumFiles: Int, expectedNumChecksumFiles: Int): Unit = { val allFiles = new File(dir)