From 910b000d47921f7a16630bc23fb1da4ce72b018c Mon Sep 17 00:00:00 2001 From: Gurpreet Nanda Date: Mon, 2 Mar 2026 04:35:18 +0000 Subject: [PATCH 1/3] [SPARK-55751][SS] Add metrics on state store loads from cloud storage --- .../execution/streaming/state/RocksDB.scala | 48 +++++++++++++------ .../state/RocksDBStateStoreProvider.scala | 7 ++- .../RocksDBStateStoreIntegrationSuite.scala | 3 +- .../streaming/state/RocksDBSuite.scala | 2 + 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index a17c788d32079..e0bb6e4b6719b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -244,6 +244,9 @@ class RocksDB( // Was snapshot auto repair performed when loading the current version @volatile private var performedSnapshotAutoRepair = false + // Was a DFS (cloud) fetch performed when loading the current version + @volatile private var loadedFromCloud = 0 + @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS // SPARK-46249 - Keep track of recorded metrics per version which can be used for querying later @@ -515,10 +518,7 @@ class RocksDB( MDC(LogKeys.SNAPSHOT_VERSION, latestSnapshotVersion)}, latestSnapshotUniqueId: ${ MDC(LogKeys.UUID, latestSnapshotUniqueId)}") - val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, - workingDir, rocksDBFileMapping, latestSnapshotUniqueId) - - loadedVersion = latestSnapshotVersion + val metadata = fetchCheckpointFromDfs(latestSnapshotVersion, latestSnapshotUniqueId) // reset the last snapshot version to the latest available snapshot version lastSnapshotVersion = latestSnapshotVersion @@ -601,12 +601,30 @@ class RocksDB( private def loadEmptyStore(version: Long): Unit = { // 
Use version 0 logic to create empty directory with no SST files - val metadata = fileManager.loadCheckpointFromDfs(0, workingDir, rocksDBFileMapping, None) + val metadata = fetchCheckpointFromDfs(0) + // No real snapshot exists at this version; advance loadedVersion to the target + // so the next commit produces version + 1 rather than 1. loadedVersion = version fileManager.setMaxSeenVersion(version) openLocalRocksDB(metadata) } + /** + * Fetches a snapshot from DFS, sets [[loadedVersion]] to the snapshot version, + * and increments [[loadedFromCloud]]. Returns the checkpoint metadata. + * Callers are responsible for calling [[openLocalRocksDB]], setting + * [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]] overrides. + */ + private def fetchCheckpointFromDfs( + snapshotVersion: Long, + uniqueId: Option[String] = None): RocksDBCheckpointMetadata = { + loadedFromCloud += 1 + val metadata = fileManager.loadCheckpointFromDfs( + snapshotVersion, workingDir, rocksDBFileMapping, uniqueId) + loadedVersion = snapshotVersion + metadata + } + private def loadWithoutCheckpointId( version: Long, readOnly: Boolean = false, @@ -680,17 +698,15 @@ class RocksDB( override protected def beforeLoad(): Unit = closeDB(ignoreException = false) override protected def loadSnapshotFromCheckpoint(snapshotVersion: Long): Unit = { - val remoteMetaData = fileManager.loadCheckpointFromDfs(snapshotVersion, - workingDir, rocksDBFileMapping) + val metadata = fetchCheckpointFromDfs(snapshotVersion) - loadedVersion = snapshotVersion // Initialize maxVersion upon successful load from DFS fileManager.setMaxSeenVersion(snapshotVersion) - openLocalRocksDB(remoteMetaData) + openLocalRocksDB(metadata) // By setting this to the snapshot version we successfully loaded, - // if auto snapshot repair is enabled, and we end up skipping the latest snapshot + // if auto snapshot repair is enabled and we end up skipping the latest snapshot // and used an older one, we will create a new snapshot 
at commit time // if the loaded one is old enough. lastSnapshotVersion = snapshotVersion @@ -782,6 +798,7 @@ class RocksDB( assert(version >= 0) recordedMetrics = None performedSnapshotAutoRepair = false + loadedFromCloud = 0 // Reset the load metrics before loading loadMetrics.clear() @@ -800,7 +817,8 @@ class RocksDB( // Record the metrics after loading val duration = System.currentTimeMillis() - startTime loadMetrics ++= Map( - "load" -> duration + "load" -> duration, + "loadedFromCloud" -> loadedFromCloud.toLong ) // Register with memory manager after successful load updateMemoryUsageIfNeeded() @@ -831,6 +849,7 @@ class RocksDB( assert(snapshotVersionStateStoreCkptId.isDefined == endVersionStateStoreCkptId.isDefined) assert(snapshotVersion >= 0 && endVersion >= snapshotVersion) recordedMetrics = None + loadedFromCloud = 0 loadMetrics.clear() logInfo( @@ -856,7 +875,8 @@ class RocksDB( // Record the metrics after loading loadMetrics ++= Map( - "loadFromSnapshot" -> (System.currentTimeMillis() - startTime) + "loadFromSnapshot" -> (System.currentTimeMillis() - startTime), + "loadedFromCloud" -> loadedFromCloud.toLong ) this @@ -880,9 +900,7 @@ class RocksDB( assert(snapshotVersionStateStoreCkptId.isDefined == endVersionStateStoreCkptId.isDefined) closeDB() - val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion, - workingDir, rocksDBFileMapping, snapshotVersionStateStoreCkptId) - loadedVersion = snapshotVersion + val metadata = fetchCheckpointFromDfs(snapshotVersion, snapshotVersionStateStoreCkptId) lastSnapshotVersion = snapshotVersion setInitialCFInfo() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 1dbf948e04345..c3105d8bc0867 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -659,6 +659,7 @@ private[sql] class RocksDBStateStoreProvider CUSTOM_METRIC_LOAD_TIME -> loadMetrics("load"), CUSTOM_METRIC_REPLAY_CHANGE_LOG -> loadMetrics("replayChangelog"), CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES -> loadMetrics("numReplayChangeLogFiles"), + CUSTOM_METRIC_LOADED_FROM_CLOUD -> loadMetrics("loadedFromCloud"), CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied, CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied, CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused, @@ -1367,6 +1368,10 @@ object RocksDBStateStoreProvider { "rocksdbNumReplayChangelogFiles", "RocksDB: load - number of change log files replayed") + val CUSTOM_METRIC_LOADED_FROM_CLOUD = StateStoreCustomSumMetric( + "rocksdbLoadedFromCloud", + "RocksDB: load - number of times state was loaded from cloud storage") + val CUSTOM_METRIC_BLOCK_CACHE_MISS = StateStoreCustomSumMetric( "rocksdbReadBlockCacheMissCount", "RocksDB: read - count of cache misses that required reading from local disk") @@ -1436,7 +1441,7 @@ object RocksDBStateStoreProvider { CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES, CUSTOM_METRIC_LOAD_FROM_SNAPSHOT_TIME, CUSTOM_METRIC_LOAD_TIME, CUSTOM_METRIC_REPLAY_CHANGE_LOG, CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES, CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED, - CUSTOM_METRIC_FORCE_SNAPSHOT) + CUSTOM_METRIC_FORCE_SNAPSHOT, CUSTOM_METRIC_LOADED_FROM_CLOUD) val CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED = StateStoreSnapshotLastUploadInstanceMetric() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala index 89a0d914127e5..019f6bd50d7d7 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala @@ -114,7 +114,8 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest "SnapshotLastUploaded.partition_0_default", "rocksdbChangeLogWriterCommitLatencyMs", "rocksdbSaveZipFilesLatencyMs", "rocksdbLoadFromSnapshotLatencyMs", "rocksdbLoadLatencyMs", "rocksdbReplayChangeLogLatencyMs", - "rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount")) + "rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount", + "rocksdbLoadedFromCloud")) assert(stateOperatorMetrics.customMetrics.get("rocksdbNumSnapshotsAutoRepaired") == 0, "Should be 0 since we didn't repair any snapshot") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index bb02dad76ac69..84e4299cc9a52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -2920,6 +2920,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession val m1 = db.metricsOpt.get assert(m1.loadMetrics("load") > 0) + assert(m1.loadMetrics("loadedFromCloud") === 1) // since we called load, loadFromSnapshot should not be populated assert(!m1.loadMetrics.contains("loadFromSnapshot")) @@ -2956,6 +2957,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.refreshRecordedMetricsForTest() val m1 = db.metricsOpt.get assert(m1.loadMetrics("loadFromSnapshot") > 0) + assert(m1.loadMetrics("loadedFromCloud") === 1) // since we called loadFromSnapshot, load should not be populated assert(!m1.loadMetrics.contains("load")) assert(m1.loadMetrics("replayChangelog") > 0) From 
22cd93daa4d3b887b91f729dc03839332c39cae8 Mon Sep 17 00:00:00 2001 From: Gurpreet Nanda Date: Mon, 2 Mar 2026 23:27:40 +0000 Subject: [PATCH 2/3] comments --- .../execution/streaming/state/RocksDB.scala | 18 +++++++++--------- .../state/RocksDBStateStoreProvider.scala | 2 +- .../streaming/state/RocksDBSuite.scala | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index e0bb6e4b6719b..1ebcc81a540fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -244,8 +244,8 @@ class RocksDB( // Was snapshot auto repair performed when loading the current version @volatile private var performedSnapshotAutoRepair = false - // Was a DFS (cloud) fetch performed when loading the current version - @volatile private var loadedFromCloud = 0 + // Number of DFS (cloud) fetches performed when loading the current version + @volatile private var numCloudLoads = 0L @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS @@ -611,14 +611,14 @@ class RocksDB( /** * Fetches a snapshot from DFS, sets [[loadedVersion]] to the snapshot version, - * and increments [[loadedFromCloud]]. Returns the checkpoint metadata. + * and increments [[numCloudLoads]]. Returns the checkpoint metadata. * Callers are responsible for calling [[openLocalRocksDB]], setting * [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]] overrides. 
*/ private def fetchCheckpointFromDfs( snapshotVersion: Long, uniqueId: Option[String] = None): RocksDBCheckpointMetadata = { - loadedFromCloud += 1 + numCloudLoads += 1 val metadata = fileManager.loadCheckpointFromDfs( snapshotVersion, workingDir, rocksDBFileMapping, uniqueId) loadedVersion = snapshotVersion @@ -706,7 +706,7 @@ class RocksDB( openLocalRocksDB(metadata) // By setting this to the snapshot version we successfully loaded, - // if auto snapshot repair is enabled and we end up skipping the latest snapshot + // if auto snapshot repair is enabled, and we end up skipping the latest snapshot // and used an older one, we will create a new snapshot at commit time // if the loaded one is old enough. lastSnapshotVersion = snapshotVersion @@ -798,7 +798,7 @@ class RocksDB( assert(version >= 0) recordedMetrics = None performedSnapshotAutoRepair = false - loadedFromCloud = 0 + numCloudLoads = 0L // Reset the load metrics before loading loadMetrics.clear() @@ -818,7 +818,7 @@ class RocksDB( val duration = System.currentTimeMillis() - startTime loadMetrics ++= Map( "load" -> duration, - "loadedFromCloud" -> loadedFromCloud.toLong + "numCloudLoads" -> numCloudLoads ) // Register with memory manager after successful load updateMemoryUsageIfNeeded() @@ -849,7 +849,7 @@ class RocksDB( assert(snapshotVersionStateStoreCkptId.isDefined == endVersionStateStoreCkptId.isDefined) assert(snapshotVersion >= 0 && endVersion >= snapshotVersion) recordedMetrics = None - loadedFromCloud = 0 + numCloudLoads = 0L loadMetrics.clear() logInfo( @@ -876,7 +876,7 @@ class RocksDB( // Record the metrics after loading loadMetrics ++= Map( "loadFromSnapshot" -> (System.currentTimeMillis() - startTime), - "loadedFromCloud" -> loadedFromCloud.toLong + "numCloudLoads" -> numCloudLoads ) this diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index c3105d8bc0867..f4ee82d63dc7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -659,7 +659,7 @@ private[sql] class RocksDBStateStoreProvider CUSTOM_METRIC_LOAD_TIME -> loadMetrics("load"), CUSTOM_METRIC_REPLAY_CHANGE_LOG -> loadMetrics("replayChangelog"), CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES -> loadMetrics("numReplayChangeLogFiles"), - CUSTOM_METRIC_LOADED_FROM_CLOUD -> loadMetrics("loadedFromCloud"), + CUSTOM_METRIC_LOADED_FROM_CLOUD -> loadMetrics("numCloudLoads"), CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied, CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied, CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 84e4299cc9a52..89481758ffdd1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -2920,7 +2920,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession val m1 = db.metricsOpt.get assert(m1.loadMetrics("load") > 0) - assert(m1.loadMetrics("loadedFromCloud") === 1) + assert(m1.loadMetrics("numCloudLoads") === 1) // since we called load, loadFromSnapshot should not be populated assert(!m1.loadMetrics.contains("loadFromSnapshot")) @@ -2957,7 +2957,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.refreshRecordedMetricsForTest() val m1 = db.metricsOpt.get assert(m1.loadMetrics("loadFromSnapshot") > 0) - 
assert(m1.loadMetrics("loadedFromCloud") === 1) + assert(m1.loadMetrics("numCloudLoads") === 1) // since we called loadFromSnapshot, load should not be populated assert(!m1.loadMetrics.contains("load")) assert(m1.loadMetrics("replayChangelog") > 0) From 73e68506ccc8e72ebf3c4558d3abde4b1a485ae8 Mon Sep 17 00:00:00 2001 From: Gurpreet Nanda Date: Tue, 3 Mar 2026 23:05:03 +0000 Subject: [PATCH 3/3] add auto repair test --- .../spark/sql/execution/streaming/state/RocksDBSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 89481758ffdd1..7f6df6e42c05e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -4075,6 +4075,8 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.put("e", "4") db.commit() // a new snapshot (5.zip) will be created since previous one is corrupt assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 1) + // 4.zip was tried and failed (1 load), then 2.zip succeeded (2 loads) + assert(db.metricsOpt.get.loadMetrics("numCloudLoads") === 2) db.doMaintenance() // upload snapshot 5.zip } @@ -4087,6 +4089,8 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession assert(toStr(db.get("b")) == "1") db.commit() assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 1) + // 5.zip failed (1), 4.zip failed (2), 2.zip failed (3), then version 0 succeeded (4) + assert(db.metricsOpt.get.loadMetrics("numCloudLoads") === 4) } } }