Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ class RocksDB(
// Was snapshot auto repair performed when loading the current version
@volatile private var performedSnapshotAutoRepair = false

// Number of DFS (cloud) fetches performed when loading the current version
@volatile private var numCloudLoads = 0L

@volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS

// SPARK-46249 - Keep track of recorded metrics per version which can be used for querying later
Expand Down Expand Up @@ -515,10 +518,7 @@ class RocksDB(
MDC(LogKeys.SNAPSHOT_VERSION, latestSnapshotVersion)}, latestSnapshotUniqueId: ${
MDC(LogKeys.UUID, latestSnapshotUniqueId)}")

val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion,
workingDir, rocksDBFileMapping, latestSnapshotUniqueId)

loadedVersion = latestSnapshotVersion
val metadata = fetchCheckpointFromDfs(latestSnapshotVersion, latestSnapshotUniqueId)

// reset the last snapshot version to the latest available snapshot version
lastSnapshotVersion = latestSnapshotVersion
Expand Down Expand Up @@ -601,12 +601,30 @@ class RocksDB(

private def loadEmptyStore(version: Long): Unit = {
// Use version 0 logic to create empty directory with no SST files
val metadata = fileManager.loadCheckpointFromDfs(0, workingDir, rocksDBFileMapping, None)
val metadata = fetchCheckpointFromDfs(0)
// No real snapshot exists at this version; advance loadedVersion to the target
// so the next commit produces version + 1 rather than 1.
loadedVersion = version
fileManager.setMaxSeenVersion(version)
openLocalRocksDB(metadata)
}

/**
* Fetches a snapshot from DFS, sets [[loadedVersion]] to the snapshot version,
* and increments [[numCloudLoads]]. Returns the checkpoint metadata.
* Callers are responsible for calling [[openLocalRocksDB]], setting
* [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]] overrides.
*/
private def fetchCheckpointFromDfs(
snapshotVersion: Long,
uniqueId: Option[String] = None): RocksDBCheckpointMetadata = {
numCloudLoads += 1
val metadata = fileManager.loadCheckpointFromDfs(
snapshotVersion, workingDir, rocksDBFileMapping, uniqueId)
loadedVersion = snapshotVersion
metadata
}

private def loadWithoutCheckpointId(
version: Long,
readOnly: Boolean = false,
Expand Down Expand Up @@ -680,14 +698,12 @@ class RocksDB(
override protected def beforeLoad(): Unit = closeDB(ignoreException = false)

override protected def loadSnapshotFromCheckpoint(snapshotVersion: Long): Unit = {
val remoteMetaData = fileManager.loadCheckpointFromDfs(snapshotVersion,
workingDir, rocksDBFileMapping)
val metadata = fetchCheckpointFromDfs(snapshotVersion)

loadedVersion = snapshotVersion
// Initialize maxVersion upon successful load from DFS
fileManager.setMaxSeenVersion(snapshotVersion)

openLocalRocksDB(remoteMetaData)
openLocalRocksDB(metadata)

// By setting this to the snapshot version we successfully loaded,
// if auto snapshot repair is enabled, and we end up skipping the latest snapshot
Expand Down Expand Up @@ -782,6 +798,7 @@ class RocksDB(
assert(version >= 0)
recordedMetrics = None
performedSnapshotAutoRepair = false
numCloudLoads = 0L
// Reset the load metrics before loading
loadMetrics.clear()

Expand All @@ -800,7 +817,8 @@ class RocksDB(
// Record the metrics after loading
val duration = System.currentTimeMillis() - startTime
loadMetrics ++= Map(
"load" -> duration
"load" -> duration,
"numCloudLoads" -> numCloudLoads
)
// Register with memory manager after successful load
updateMemoryUsageIfNeeded()
Expand Down Expand Up @@ -831,6 +849,7 @@ class RocksDB(
assert(snapshotVersionStateStoreCkptId.isDefined == endVersionStateStoreCkptId.isDefined)
assert(snapshotVersion >= 0 && endVersion >= snapshotVersion)
recordedMetrics = None
numCloudLoads = 0L
loadMetrics.clear()

logInfo(
Expand All @@ -856,7 +875,8 @@ class RocksDB(

// Record the metrics after loading
loadMetrics ++= Map(
"loadFromSnapshot" -> (System.currentTimeMillis() - startTime)
"loadFromSnapshot" -> (System.currentTimeMillis() - startTime),
"numCloudLoads" -> numCloudLoads
)

this
Expand All @@ -880,9 +900,7 @@ class RocksDB(
assert(snapshotVersionStateStoreCkptId.isDefined == endVersionStateStoreCkptId.isDefined)

closeDB()
val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion,
workingDir, rocksDBFileMapping, snapshotVersionStateStoreCkptId)
loadedVersion = snapshotVersion
val metadata = fetchCheckpointFromDfs(snapshotVersion, snapshotVersionStateStoreCkptId)
lastSnapshotVersion = snapshotVersion

setInitialCFInfo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ private[sql] class RocksDBStateStoreProvider
CUSTOM_METRIC_LOAD_TIME -> loadMetrics("load"),
CUSTOM_METRIC_REPLAY_CHANGE_LOG -> loadMetrics("replayChangelog"),
CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES -> loadMetrics("numReplayChangeLogFiles"),
CUSTOM_METRIC_LOADED_FROM_CLOUD -> loadMetrics("numCloudLoads"),
CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied,
CUSTOM_METRIC_FILES_COPIED -> rocksDBMetrics.filesCopied,
CUSTOM_METRIC_FILES_REUSED -> rocksDBMetrics.filesReused,
Expand Down Expand Up @@ -1367,6 +1368,10 @@ object RocksDBStateStoreProvider {
"rocksdbNumReplayChangelogFiles",
"RocksDB: load - number of change log files replayed")

val CUSTOM_METRIC_LOADED_FROM_CLOUD = StateStoreCustomSumMetric(
"rocksdbLoadedFromCloud",
"RocksDB: load - number of times state was loaded from cloud storage")

val CUSTOM_METRIC_BLOCK_CACHE_MISS = StateStoreCustomSumMetric(
"rocksdbReadBlockCacheMissCount",
"RocksDB: read - count of cache misses that required reading from local disk")
Expand Down Expand Up @@ -1436,7 +1441,7 @@ object RocksDBStateStoreProvider {
CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES, CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES,
CUSTOM_METRIC_LOAD_FROM_SNAPSHOT_TIME, CUSTOM_METRIC_LOAD_TIME, CUSTOM_METRIC_REPLAY_CHANGE_LOG,
CUSTOM_METRIC_NUM_REPLAY_CHANGE_LOG_FILES, CUSTOM_METRIC_NUM_SNAPSHOTS_AUTO_REPAIRED,
CUSTOM_METRIC_FORCE_SNAPSHOT)
CUSTOM_METRIC_FORCE_SNAPSHOT, CUSTOM_METRIC_LOADED_FROM_CLOUD)

val CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED = StateStoreSnapshotLastUploadInstanceMetric()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
"SnapshotLastUploaded.partition_0_default", "rocksdbChangeLogWriterCommitLatencyMs",
"rocksdbSaveZipFilesLatencyMs", "rocksdbLoadFromSnapshotLatencyMs",
"rocksdbLoadLatencyMs", "rocksdbReplayChangeLogLatencyMs",
"rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount"))
"rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount",
"rocksdbLoadedFromCloud"))
assert(stateOperatorMetrics.customMetrics.get("rocksdbNumSnapshotsAutoRepaired") == 0,
"Should be 0 since we didn't repair any snapshot")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2920,6 +2920,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession

val m1 = db.metricsOpt.get
assert(m1.loadMetrics("load") > 0)
assert(m1.loadMetrics("numCloudLoads") === 1)
// since we called load, loadFromSnapshot should not be populated
assert(!m1.loadMetrics.contains("loadFromSnapshot"))

Expand Down Expand Up @@ -2956,6 +2957,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
db.refreshRecordedMetricsForTest()
val m1 = db.metricsOpt.get
assert(m1.loadMetrics("loadFromSnapshot") > 0)
assert(m1.loadMetrics("numCloudLoads") === 1)
// since we called loadFromSnapshot, load should not be populated
assert(!m1.loadMetrics.contains("load"))
assert(m1.loadMetrics("replayChangelog") > 0)
Expand Down