diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index a17c788d32079..1ebcc81a540fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -244,6 +244,9 @@ class RocksDB( // Was snapshot auto repair performed when loading the current version @volatile private var performedSnapshotAutoRepair = false + // Number of DFS (cloud) fetches performed when loading the current version + @volatile private var numCloudLoads = 0L + @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS // SPARK-46249 - Keep track of recorded metrics per version which can be used for querying later @@ -515,10 +518,7 @@ class RocksDB( MDC(LogKeys.SNAPSHOT_VERSION, latestSnapshotVersion)}, latestSnapshotUniqueId: ${ MDC(LogKeys.UUID, latestSnapshotUniqueId)}") - val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, - workingDir, rocksDBFileMapping, latestSnapshotUniqueId) - - loadedVersion = latestSnapshotVersion + val metadata = fetchCheckpointFromDfs(latestSnapshotVersion, latestSnapshotUniqueId) // reset the last snapshot version to the latest available snapshot version lastSnapshotVersion = latestSnapshotVersion @@ -601,12 +601,30 @@ class RocksDB( private def loadEmptyStore(version: Long): Unit = { // Use version 0 logic to create empty directory with no SST files - val metadata = fileManager.loadCheckpointFromDfs(0, workingDir, rocksDBFileMapping, None) + val metadata = fetchCheckpointFromDfs(0) + // No real snapshot exists at this version; advance loadedVersion to the target + // so the next commit produces version + 1 rather than 1. loadedVersion = version fileManager.setMaxSeenVersion(version) openLocalRocksDB(metadata) } + /** + * Fetches a snapshot from DFS, sets [[loadedVersion]] to the snapshot version, + * and increments [[numCloudLoads]]. Returns the checkpoint metadata. + * Callers are responsible for calling [[openLocalRocksDB]], setting + * [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]] overrides. + */ + private def fetchCheckpointFromDfs( + snapshotVersion: Long, + uniqueId: Option[String] = None): RocksDBCheckpointMetadata = { + numCloudLoads += 1 + val metadata = fileManager.loadCheckpointFromDfs( + snapshotVersion, workingDir, rocksDBFileMapping, uniqueId) + loadedVersion = snapshotVersion + metadata + } + private def loadWithoutCheckpointId( version: Long, readOnly: Boolean = false, @@ -680,14 +698,12 @@ class RocksDB( override protected def beforeLoad(): Unit = closeDB(ignoreException = false) override protected def loadSnapshotFromCheckpoint(snapshotVersion: Long): Unit = { - val remoteMetaData = fileManager.loadCheckpointFromDfs(snapshotVersion, - workingDir, rocksDBFileMapping) + val metadata = fetchCheckpointFromDfs(snapshotVersion) - loadedVersion = snapshotVersion // Initialize maxVersion upon successful load from DFS fileManager.setMaxSeenVersion(snapshotVersion) - openLocalRocksDB(remoteMetaData) + openLocalRocksDB(metadata) // By setting this to the snapshot version we successfully loaded, // if auto snapshot repair is enabled, and we end up skipping the latest snapshot @@ -782,6 +798,7 @@ class RocksDB( assert(version >= 0) recordedMetrics = None performedSnapshotAutoRepair = false + numCloudLoads = 0L // Reset the load metrics before loading loadMetrics.clear() @@ -800,7 +817,8 @@ class RocksDB( // Record the metrics after loading val duration = System.currentTimeMillis() - startTime loadMetrics ++= Map( - "load" -> duration + "load" -> duration, + "numCloudLoads" -> numCloudLoads ) // Register with memory manager after successful load updateMemoryUsageIfNeeded() @@ -831,6 +849,7 @@ class RocksDB( assert(snapshotVersionStateStoreCkptId.isDefined == endVersionStateStoreCkptId.isDefined) assert(snapshotVersion >= 0 && endVersion >= snapshotVersion) recordedMetrics = None + numCloudLoads = 0L loadMetrics.clear() logInfo( @@ -856,7 +875,8 @@ class RocksDB( // Record the metrics after loading loadMetrics ++= Map( - "loadFromSnapshot" -> (System.currentTimeMillis() - startTime) + "loadFromSnapshot" -> (System.currentTimeMillis() - startTime), + "numCloudLoads" -> numCloudLoads ) this @@ -880,9 +900,7 @@ class RocksDB( assert(snapshotVersionStateStoreCkptId.isDefined == endVersionStateStoreCkptId.isDefined) closeDB() - val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion, - workingDir, rocksDBFileMapping, snapshotVersionStateStoreCkptId) - loadedVersion = snapshotVersion + val metadata = fetchCheckpointFromDfs(snapshotVersion, snapshotVersionStateStoreCkptId) lastSnapshotVersion = snapshotVersion setInitialCFInfo() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 1dbf948e04345..f4ee82d63dc7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -659,6 +659,7 @@ private[sql] class RocksDBStateStoreProvider CUSTOM_METRIC_LOAD_TIME -> loadMetrics("load"), CUSTOM_METRIC_REPLAY_CHANGE_LOG -> loadMetrics("replayChangelog"), CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES -> loadMetrics("numReplayChangeLogFiles"), + CUSTOM_METRIC_LOADED_FROM_CLOUD -> loadMetrics("numCloudLoads"), CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied, CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied, CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused, @@ -1367,6 +1368,10 @@ object RocksDBStateStoreProvider { "rocksdbNumReplayChangelogFiles", "RocksDB: load - number of change log files replayed") + val CUSTOM_METRIC_LOADED_FROM_CLOUD = StateStoreCustomSumMetric( + "rocksdbLoadedFromCloud", + "RocksDB: load - number of times state was loaded from cloud storage") + val CUSTOM_METRIC_BLOCK_CACHE_MISS = StateStoreCustomSumMetric( "rocksdbReadBlockCacheMissCount", "RocksDB: read - count of cache misses that required reading from local disk") @@ -1436,7 +1441,7 @@ object RocksDBStateStoreProvider { CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES, CUSTOM_METRIC_LOAD_FROM_SNAPSHOT_TIME, CUSTOM_METRIC_LOAD_TIME, CUSTOM_METRIC_REPLAY_CHANGE_LOG, CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES, CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED, - CUSTOM_METRIC_FORCE_SNAPSHOT) + CUSTOM_METRIC_FORCE_SNAPSHOT, CUSTOM_METRIC_LOADED_FROM_CLOUD) val CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED = StateStoreSnapshotLastUploadInstanceMetric() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala index 89a0d914127e5..019f6bd50d7d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala @@ -114,7 +114,8 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest "SnapshotLastUploaded.partition_0_default", "rocksdbChangeLogWriterCommitLatencyMs", "rocksdbSaveZipFilesLatencyMs", "rocksdbLoadFromSnapshotLatencyMs", "rocksdbLoadLatencyMs", "rocksdbReplayChangeLogLatencyMs", - "rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount")) + "rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount", + "rocksdbLoadedFromCloud")) assert(stateOperatorMetrics.customMetrics.get("rocksdbNumSnapshotsAutoRepaired") == 0, "Should be 0 since we didn't repair any snapshot") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index bb02dad76ac69..89481758ffdd1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -2920,6 +2920,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession val m1 = db.metricsOpt.get assert(m1.loadMetrics("load") > 0) + assert(m1.loadMetrics("numCloudLoads") === 1) // since we called load, loadFromSnapshot should not be populated assert(!m1.loadMetrics.contains("loadFromSnapshot")) @@ -2956,6 +2957,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.refreshRecordedMetricsForTest() val m1 = db.metricsOpt.get assert(m1.loadMetrics("loadFromSnapshot") > 0) + assert(m1.loadMetrics("numCloudLoads") === 1) // since we called loadFromSnapshot, load should not be populated assert(!m1.loadMetrics.contains("load")) assert(m1.loadMetrics("replayChangelog") > 0)