@@ -3684,6 +3684,18 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

+ val STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE =
+ buildConf("spark.sql.streaming.stateStore.fileChecksumThreadPoolSize")
+ .internal()
+ .doc("Number of threads used to compute file checksums concurrently when uploading " +
+ "state store checkpoints (e.g. main file and checksum file). " +
Contributor commented:

nit: Number of threads used to read/write files and their corresponding checksum files concurrently

+ "Set to 0 to disable the thread pool and run operations sequentially on the calling " +
+ "thread. WARNING: Reducing below the default value of 4 may have performance impact.")
+ .version("4.2.0")
+ .intConf
+ .checkValue(x => x >= 0, "Must be a non-negative integer (0 to disable thread pool)")
+ .createWithDefault(4)
+
val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION =
buildConf("spark.sql.statistics.parallelFileListingInStatsComputation.enabled")
.internal()
@@ -7173,6 +7185,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def checkpointFileChecksumSkipCreationIfFileMissingChecksum: Boolean =
getConf(STREAMING_CHECKPOINT_FILE_CHECKSUM_SKIP_CREATION_IF_FILE_MISSING_CHECKSUM)

+ def stateStoreFileChecksumThreadPoolSize: Int =
+ getConf(STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE)
+
def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)

def useDeprecatedKafkaOffsetFetching: Boolean = getConf(USE_DEPRECATED_KAFKA_OFFSET_FETCHING)
@@ -21,9 +21,10 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.zip.{CheckedInputStream, CheckedOutputStream, CRC32C}

- import scala.concurrent.{ExecutionContext, Future}
+ import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.Duration
import scala.io.Source
+ import scala.util.control.NonFatal

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
@@ -127,12 +128,14 @@ case class ChecksumFile(path: Path) {
* orphan checksum files. If using this, it is your responsibility
* to clean up the potential orphan checksum files.
* @param numThreads This is the number of threads to use for the thread pool, for reading/writing
- * files. To avoid blocking, if the file manager instance is being used by a
- * single thread, then you can set this to 2 (one thread for main file, another
- * for checksum file).
- * If file manager is shared by multiple threads, you can set it to
- * number of threads using file manager * 2.
- * Setting this differently can lead to file operation being blocked waiting for
+ * files. Must be a non-negative integer.
+ * Setting this to 0 disables the thread pool and runs all operations
+ * sequentially on the calling thread (no concurrency).
+ * To avoid blocking with a single concurrent caller, set this to 2 (one thread
Contributor micheal-o commented on Mar 3, 2026:

"To avoid blocking"

For these remaining statements, let's use the original wording. The new one is not very clear.

+ * for main file, another for checksum file).
+ * If file manager is shared by multiple threads, set it to
+ * number of concurrent callers * 2.
Comment on lines +136 to +137
Contributor commented:

Should this go in the config doc instead? Also, how would a user figure out how many threads are going to be accessing it?

Contributor commented:

@holdenk No, this is an internal detail. The config doc already has the user-friendly info. Ideally the user shouldn't be changing this conf. It is just there if they want to disable the thread pool or reduce its default size for some reason (e.g. resource constraints).

+ * Setting this too low can lead to file operations being blocked waiting for
* a free thread.
* @param skipCreationIfFileMissingChecksum (ES-1629547): If true, when a file already exists
* but its checksum file does not exist, fall back to using the underlying
@@ -150,14 +153,33 @@ class ChecksumCheckpointFileManager(
val numThreads: Int,
val skipCreationIfFileMissingChecksum: Boolean)
extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 for the main file" +
- "and another for the checksum file")
+ assert(numThreads >= 0, "numThreads must be a non-negative integer")

import ChecksumCheckpointFileManager._

- // This allows us to concurrently read/write the main file and checksum file
- private val threadPool = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonFixedThreadPool(numThreads, s"${this.getClass.getSimpleName}-Thread"))
+ // Thread pool for concurrent execution, or None when numThreads == 0 (sequential/inline mode).
Contributor commented:

Please, let's also retain the original comment here.

+ private val threadPool: Option[ExecutionContextExecutorService] =
Contributor commented:

nit: rename to threadPoolOpt to make it obvious

+ if (numThreads == 0) None
+ else Some(ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonFixedThreadPool(numThreads, s"${this.getClass.getSimpleName}-Thread")))
+
+ // ExecutionContext to pass to stream classes: uses the thread pool or a caller-runs context.
Contributor commented:

nit: do you mean the input/output streams? Let's make it clear.

+ private val streamContext: ExecutionContext = threadPool.getOrElse(new ExecutionContext {
Contributor commented:

why is this named streamContext?

+ override def execute(runnable: Runnable): Unit = runnable.run()
Contributor commented:

nit: comment: This will execute the runnable synchronously

+ override def reportFailure(cause: Throwable): Unit = throw cause
+ })
+
+ /**
+ * Schedules a computation on the thread pool, or runs it directly on the calling thread
+ * if numThreads == 0 (sequential mode).
+ */
+ private def scheduleOrRun[T](f: => T): Future[T] = threadPool match {
Contributor commented:

Why have both scheduleOrRun and the synchronous ExecutionContext above? Why not use only the synchronous ExecutionContext? I'm not sure I get why we need both.

+ case None =>
+ try Future.successful(f)
Contributor commented:

nit: try {}

+ catch { case NonFatal(t) => Future.failed(t) }
+ case Some(pool) =>
+ Future { f }(pool)
+ }

override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
underlyingFileMgr.list(path, filter)
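
Editorial sketch (not part of the PR): the hunk above combines an optional thread pool with a caller-runs ExecutionContext. The standalone example below reproduces that pattern using only the Scala and Java standard libraries. The names `ScheduleOrRunSketch`, `makePool`, `callerRuns`, and `runningThread` are hypothetical, and `Executors.newFixedThreadPool` stands in for Spark's `ThreadUtils.newDaemonFixedThreadPool`, so treat it as an approximation under those assumptions rather than the PR's implementation.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

// Hypothetical stand-in for the file manager's pool handling; this is not Spark code.
object ScheduleOrRunSketch {
  // None means sequential mode, mirroring numThreads == 0 in the diff.
  def makePool(numThreads: Int): Option[ExecutionContextExecutorService] = {
    require(numThreads >= 0, "numThreads must be a non-negative integer")
    if (numThreads == 0) None
    else Some(ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads)))
  }

  // Caller-runs context: execute() runs the task synchronously on the calling thread.
  val callerRuns: ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()
    override def reportFailure(cause: Throwable): Unit = throw cause
  }

  // Runs f inline when there is no pool, otherwise submits it to the pool.
  def scheduleOrRun[T](pool: Option[ExecutionContextExecutorService])(f: => T): Future[T] =
    pool match {
      case None =>
        try {
          Future.successful(f)
        } catch {
          case NonFatal(t) => Future.failed(t)
        }
      case Some(p) => Future(f)(p)
    }

  // Returns the name of the thread that actually ran the task.
  def runningThread(pool: Option[ExecutionContextExecutorService]): String =
    Await.result(scheduleOrRun(pool)(Thread.currentThread().getName), Duration.Inf)
}
```

With no pool, `runningThread(makePool(0))` reports the calling thread, while a pool of size 2 reports a worker thread. Passing `callerRuns` directly to `Future { ... }` also runs the body on the calling thread, which bears on the reviewer's question about whether both mechanisms are needed.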
@@ -189,26 +211,26 @@ class ChecksumCheckpointFileManager(
createFunc: Path => CancellableFSDataOutputStream): ChecksumCancellableFSDataOutputStream = {
assert(!isChecksumFile(path), "Cannot directly create a checksum file")

- val mainFileFuture = Future {
+ val mainFileFuture = scheduleOrRun {
createFunc(path)
- }(threadPool)
+ }

- val checksumFileFuture = Future {
+ val checksumFileFuture = scheduleOrRun {
createFunc(getChecksumPath(path))
- }(threadPool)
+ }

new ChecksumCancellableFSDataOutputStream(
awaitResult(mainFileFuture, Duration.Inf),
path,
awaitResult(checksumFileFuture, Duration.Inf),
- threadPool
+ streamContext
)
}

override def open(path: Path): FSDataInputStream = {
assert(!isChecksumFile(path), "Cannot directly open a checksum file")

- val checksumInputStreamFuture = Future {
+ val checksumInputStreamFuture = scheduleOrRun {
try {
Some(underlyingFileMgr.open(getChecksumPath(path)))
} catch {
@@ -219,17 +241,17 @@ class ChecksumCheckpointFileManager(
log"hence no checksum verification.")
None
}
- }(threadPool)
+ }

- val mainInputStreamFuture = Future {
+ val mainInputStreamFuture = scheduleOrRun {
underlyingFileMgr.open(path)
- }(threadPool)
+ }

val mainStream = awaitResult(mainInputStreamFuture, Duration.Inf)
val checksumStream = awaitResult(checksumInputStreamFuture, Duration.Inf)

checksumStream.map { chkStream =>
- new ChecksumFSDataInputStream(mainStream, path, chkStream, threadPool)
+ new ChecksumFSDataInputStream(mainStream, path, chkStream, streamContext)
}.getOrElse(mainStream)
}

@@ -247,13 +269,13 @@ class ChecksumCheckpointFileManager(
// But if allowConcurrentDelete is enabled, then we can do it concurrently for perf.
// But the client would be responsible for cleaning up potential orphan checksum files
// if it happens.
- val checksumInputStreamFuture = Future {
+ val checksumInputStreamFuture = scheduleOrRun {
deleteChecksumFile(getChecksumPath(path))
- }(threadPool)
+ }

- val mainInputStreamFuture = Future {
+ val mainInputStreamFuture = scheduleOrRun {
underlyingFileMgr.delete(path)
- }(threadPool)
+ }

awaitResult(mainInputStreamFuture, Duration.Inf)
awaitResult(checksumInputStreamFuture, Duration.Inf)
@@ -279,18 +301,20 @@ class ChecksumCheckpointFileManager(
}

override def close(): Unit = {
- threadPool.shutdown()
- // Wait a bit for it to finish up in case there is any ongoing work
- // Can consider making this timeout configurable, if needed
- val timeoutMs = 500
- if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
- logWarning(log"Thread pool did not shutdown after ${MDC(TIMEOUT, timeoutMs)} ms," +
- log" forcing shutdown")
- threadPool.shutdownNow() // stop the executing tasks
-
- // Wait a bit for the threads to respond
- if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
- logError(log"Thread pool did not terminate")
+ threadPool.foreach { pool =>
+ pool.shutdown()
Comment on lines +304 to +305
Contributor commented:

Maybe silly but would it make sense to do this first foreach and then the subsequent blocking checkin? It's only during shutdown so not a big deal.

Contributor commented:

@holdenk the threadPool here is an Option, hence at most one value

+ // Wait a bit for it to finish up in case there is any ongoing work
+ // Can consider making this timeout configurable, if needed
+ val timeoutMs = 500
+ if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+ logWarning(log"Thread pool did not shutdown after ${MDC(TIMEOUT, timeoutMs)} ms," +
+ log" forcing shutdown")
+ pool.shutdownNow() // stop the executing tasks
+
+ // Wait a bit for the threads to respond
+ if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+ logError(log"Thread pool did not terminate")
+ }
}
}
}
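
Editorial sketch (not part of the PR): the `close()` change above applies a two-phase shutdown (graceful `shutdown()`, then `shutdownNow()` after a timeout) to an optional pool. A minimal standalone version of that pattern follows, assuming a plain `java.util.concurrent.ExecutorService`. The names `ShutdownSketch` and `closePool` are hypothetical, and returning a Boolean instead of logging is a simplification made for illustration.

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}

// Hypothetical helper illustrating the shutdown pattern; this is not Spark code.
object ShutdownSketch {
  // Returns true if the pool terminated, or if there was no pool to stop.
  def closePool(poolOpt: Option[ExecutorService], timeoutMs: Long = 500L): Boolean =
    poolOpt.forall { pool =>
      pool.shutdown() // stop accepting new tasks, let submitted ones finish
      if (pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
        true
      } else {
        pool.shutdownNow() // interrupt tasks that are still running
        pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)
      }
    }
}
```

Because `Option.forall` is vacuously true for `None`, the sequential mode (no pool) needs no special case here, which matches the reviewer's remark that the Option holds at most one value.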
@@ -533,11 +533,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
mgr,
// Allowing this for perf, since we do orphan checksum file cleanup in maintenance anyway
allowConcurrentDelete = true,
- // We need 2 threads per fm caller to avoid blocking
- // (one for main file and another for checksum file).
- // Since this fm is used by both query task and maintenance thread,
- // then we need 2 * 2 = 4 threads.
- numThreads = 4,
+ // To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum
+ // file). Since this fm is used by both query task and maintenance thread, the recommended
+ // default is 2 * 2 = 4 threads. A value of 0 disables the thread pool (sequential mode).
+ numThreads = storeConf.fileChecksumThreadPoolSize,
Contributor commented:

Let's also do the logging for this too, right?

skipCreationIfFileMissingChecksum =
storeConf.checkpointFileChecksumSkipCreationIfFileMissingChecksum)
} else {
@@ -152,11 +152,17 @@ class RocksDB(

private val workingDir = createTempDir("workingDir")

- // We need 2 threads per fm caller to avoid blocking
- // (one for main file and another for checksum file).
- // Since this fm is used by both query task and maintenance thread,
- // then we need 2 * 2 = 4 threads.
- protected val fileChecksumThreadPoolSize: Option[Int] = Some(4)
+ // To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum file).
+ // Since this fm is used by both query task and maintenance thread, the recommended default is
+ // 2 * 2 = 4 threads. A value of 0 disables the thread pool (sequential execution).
+ protected val fileChecksumThreadPoolSize: Option[Int] = {
+ val size = conf.fileChecksumThreadPoolSize
+ if (size < 4) {
+ logWarning(s"fileChecksumThreadPoolSize is set to $size, which is below the " +
+ "recommended default of 4. This may have performance impact.")
+ }
+ Some(size)
+ }

protected def createFileManager(
dfsRootDir: String,
@@ -2404,6 +2410,7 @@ case class RocksDBConf(
reportSnapshotUploadLag: Boolean,
maxVersionsToDeletePerMaintenance: Int,
fileChecksumEnabled: Boolean,
+ fileChecksumThreadPoolSize: Int,
rowChecksumEnabled: Boolean,
rowChecksumReadVerificationRatio: Long,
mergeOperatorVersion: Int,
@@ -2619,6 +2626,7 @@ object RocksDBConf {
storeConf.reportSnapshotUploadLag,
storeConf.maxVersionsToDeletePerMaintenance,
storeConf.checkpointFileChecksumEnabled,
+ storeConf.fileChecksumThreadPoolSize,
storeConf.rowChecksumEnabled,
storeConf.rowChecksumReadVerificationRatio,
getPositiveIntConf(MERGE_OPERATOR_VERSION_CONF),
@@ -116,6 +116,9 @@ class StateStoreConf(
/** Whether file checksum generation and verification is enabled. */
val checkpointFileChecksumEnabled: Boolean = sqlConf.checkpointFileChecksumEnabled

+ /** Number of threads for the file checksum thread pool (0 to disable). */
+ val fileChecksumThreadPoolSize: Int = sqlConf.stateStoreFileChecksumThreadPoolSize
+
/** whether to validate state schema during query run. */
val stateSchemaCheckEnabled = sqlConf.isStateSchemaCheckEnabled

@@ -250,6 +250,30 @@ abstract class ChecksumCheckpointFileManagerSuite extends CheckpointFileManagerT
checksumFmWithoutFallback.close()
}
}
+
+ test("numThreads = 0 disables thread pool") {
+ withTempHadoopPath { basePath =>
+ val fm = new ChecksumCheckpointFileManager(
+ createNoChecksumManager(basePath),
+ allowConcurrentDelete = true,
+ numThreads = 0,
+ skipCreationIfFileMissingChecksum = false)
+ fm.close()
Contributor commented:

This isn't testing much. We should write and read a file to confirm that main and checksum files are written and read

+ }
+ }
+
+ test("negative numThreads is invalid") {
+ withTempHadoopPath { basePath =>
+ val ex = intercept[AssertionError] {
+ new ChecksumCheckpointFileManager(
+ createNoChecksumManager(basePath),
+ allowConcurrentDelete = true,
+ numThreads = -1,
+ skipCreationIfFileMissingChecksum = false)
+ }
+ assert(ex.getMessage.contains("numThreads must be a non-negative integer"))
+ }
+ }
}

class FileContextChecksumCheckpointFileManagerSuite extends ChecksumCheckpointFileManagerSuite {
@@ -1547,6 +1547,35 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
}

+ test("fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") {
Contributor commented:

The test cases are currently under StateStoreSuite. If you look at the class definition, it is for the HDFS store only. This should be under StateStoreSuiteBase so that it runs for RocksDB too.

+ Seq(0, 1, 6).foreach { numThreads =>
+ withTempDir { dir =>
+ val sqlConf = SQLConf.get.clone()
+ sqlConf.setConfString(SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true")
+ sqlConf.setConfString(
+ SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, numThreads.toString)
+
+ val provider = newStoreProvider(
+ opId = Random.nextInt(), partition = 0, dir = dir.getCanonicalPath,
+ sqlConfOpt = Some(sqlConf))
+ val fileManagerMethod = PrivateMethod[CheckpointFileManager](Symbol("fm"))
+ val fm = provider invokePrivate fileManagerMethod()
+
+ assert(fm.isInstanceOf[ChecksumCheckpointFileManager])
+ assert(fm.asInstanceOf[ChecksumCheckpointFileManager].numThreads === numThreads)
+ provider.close()
+ }
+ }
+ }
Contributor commented:

Let's also add a test case with the thread pool disabled to make sure using the state store works, i.e. load, write, get, commit, reload, etc. You can also run store.commit and maintenance concurrently to make sure they both finish.


+ test("STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE: invalid negative value is rejected") {
+ val sqlConf = SQLConf.get.clone()
+ val ex = intercept[IllegalArgumentException] {
+ sqlConf.setConfString(SQLConf.STATE_STORE_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "-1")
+ }
+ assert(ex.getMessage.contains("Must be a non-negative integer"))
+ }
+
override def newStoreProvider(): HDFSBackedStateStoreProvider = {
newStoreProvider(opId = Random.nextInt(), partition = 0)
}