[SPARK-50655][SS] Move virtual col family related mapping into db layer instead of encoder #49304

Status: Closed. 41 commits, all by anishshri-db.

Commits:
6ee1ad5  [SPARK-50655] Move virt col family related mapping into db layer inst… (Dec 27, 2024)
1eea64a  Misc fix (Dec 27, 2024)
3f341fb  Misc change (Dec 27, 2024)
f7dba53  Merge branch 'master' into task/SPARK-50655 (Dec 27, 2024)
30c2bd7  Misc fix (Dec 27, 2024)
1c07d73  Misc fix (Dec 27, 2024)
a156092  Misc fix (Dec 27, 2024)
2f026d0  Fix test (Dec 28, 2024)
09f0b6c  Fix test (Dec 28, 2024)
4deab17  Add comment (Dec 28, 2024)
4b93a40  Misc fix (Dec 28, 2024)
0027123  Misc fix (Dec 30, 2024)
c82b370  Fix (Dec 30, 2024)
cb32482  Fix (Dec 30, 2024)
1c427c1  Fix CI (Dec 30, 2024)
f9439a2  Fix test (Dec 31, 2024)
0ee0796  Update (Dec 31, 2024)
d5c6111  Fix test (Jan 1, 2025)
e2a2121  Fix test (Jan 2, 2025)
600389e  Add comments (Jan 2, 2025)
ddf1770  Address Eric's comments (Jan 2, 2025)
7d21451  Small fix (Jan 2, 2025)
8b7eadc  Fix test (Jan 3, 2025)
5a4ae97  Merge branch 'master' into task/SPARK-50655 (Jan 21, 2025)
8ffb78b  Fix test (Jan 21, 2025)
6f0eac0  Fix test (Jan 21, 2025)
f6b5c0c  Address review comments (Jan 21, 2025)
039d09d  Merge branch 'master' into task/SPARK-50655 (Feb 4, 2025)
a0f599b  Misc fix (Feb 4, 2025)
509ab21  Merge branch 'master' into task/SPARK-50655 (Feb 7, 2025)
c0057b9  Fix comment (Feb 7, 2025)
fa95eee  Fix issue (Feb 8, 2025)
72bb09f  Address comments (Feb 14, 2025)
b576cdc  Add test (Feb 14, 2025)
cea9c74  Address comments (Feb 18, 2025)
0880fda  Address comments (Feb 18, 2025)
e51d37d  Misc fix (Feb 18, 2025)
732e13c  Fix issue (Feb 18, 2025)
e55a99d  Fix test (Feb 18, 2025)
09d0ee0  Fix issue (Feb 19, 2025)
9088b36  Fix test (Feb 19, 2025)
@@ -248,17 +248,37 @@ class RocksDBFileManager(
       checkpointDir: File,
       version: Long,
       numKeys: Long,
+      numInternalKeys: Long,
       fileMapping: Map[String, RocksDBSnapshotFile],
-      columnFamilyMapping: Option[Map[String, Short]] = None,
+      columnFamilyMapping: Option[Map[String, ColumnFamilyInfo]] = None,
       maxColumnFamilyId: Option[Short] = None,
       checkpointUniqueId: Option[String] = None): Unit = {
     logFilesInDir(checkpointDir, log"Saving checkpoint files " +
       log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
     val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
     val rocksDBFiles = saveImmutableFilesToDfs(
       version, localImmutableFiles, fileMapping, checkpointUniqueId)
-    val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys, columnFamilyMapping,
-      maxColumnFamilyId)
+
+    val colFamilyIdMapping: Option[Map[String, Short]] = if (columnFamilyMapping.isDefined) {
+      Some(columnFamilyMapping.get.map {
+        case (cfName, cfInfo) =>
+          cfName -> cfInfo.cfId
+      })
+    } else {
+      None
+    }
+
+    val colFamilyTypeMapping: Option[Map[String, Boolean]] = if (columnFamilyMapping.isDefined) {
+      Some(columnFamilyMapping.get.map {
+        case (cfName, cfInfo) =>
+          cfName -> cfInfo.isInternal
+      })
+    } else {
+      None
+    }
+
+    val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys, numInternalKeys,
+      colFamilyIdMapping, colFamilyTypeMapping, maxColumnFamilyId)
     val metadataFile = localMetadataFile(checkpointDir)
     metadata.writeToFile(metadataFile)
     logInfo(log"Written metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
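For orientation, the hunk above replaces the old single-map construction with a projection step: one name -> ColumnFamilyInfo map is split into an id map and an internal-flag map before the metadata is written. Below is a minimal, self-contained sketch of that projection; the sample column family names and the wrapper object are invented for illustration, and Option.map stands in for the isDefined/if-else checks in the diff:

// Mirrors the case class added later in this diff.
case class ColumnFamilyInfo(cfId: Short, isInternal: Boolean)

object ProjectionSketch {
  def main(args: Array[String]): Unit = {
    // Invented sample data: one user-facing and one internal column family.
    val columnFamilyMapping: Option[Map[String, ColumnFamilyInfo]] = Some(Map(
      "countState" -> ColumnFamilyInfo(1.toShort, isInternal = false),
      "$rowCounter" -> ColumnFamilyInfo(2.toShort, isInternal = true)))

    // Project the combined map into the two flat maps the metadata stores.
    val colFamilyIdMapping: Option[Map[String, Short]] =
      columnFamilyMapping.map(_.map { case (name, info) => name -> info.cfId })
    val colFamilyTypeMapping: Option[Map[String, Boolean]] =
      columnFamilyMapping.map(_.map { case (name, info) => name -> info.isInternal })

    println(colFamilyIdMapping)   // Some(Map(countState -> 1, $rowCounter -> 2))
    println(colFamilyTypeMapping) // Some(Map(countState -> false, $rowCounter -> true))
  }
}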
@@ -923,6 +943,15 @@ object RocksDBFileManagerMetrics {
   val EMPTY_METRICS = RocksDBFileManagerMetrics(0L, 0L, 0L, None)
 }
 
+/**
+ * Case class to keep track of column family info within checkpoint metadata.
+ * @param cfId - virtual column family id
+ * @param isInternal - whether the column family is internal or not
+ */
+case class ColumnFamilyInfo(
+    cfId: Short,
+    isInternal: Boolean)
+
 /**
  * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
  * changes to this MUST be backward-compatible.
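To make the new case class concrete: under this PR the db layer, rather than the encoder, owns the name-to-id bookkeeping. The sketch below is a hypothetical registry (class and method names invented, not taken from this PR) showing the kind of state that would produce the columnFamilyMapping and maxColumnFamilyId handed to the file manager:

case class ColumnFamilyInfo(cfId: Short, isInternal: Boolean)

// Hypothetical db-layer registry: assigns increasing virtual ids and
// remembers whether each column family is internal.
class ColumnFamilyRegistry {
  private var maxId: Short = 0
  private var mapping: Map[String, ColumnFamilyInfo] = Map.empty

  def getOrCreate(name: String, isInternal: Boolean): ColumnFamilyInfo =
    mapping.get(name) match {
      case Some(info) => info
      case None =>
        maxId = (maxId + 1).toShort
        val info = ColumnFamilyInfo(maxId, isInternal)
        mapping += name -> info
        info
    }

  // What a checkpoint save path would consume.
  def currentMapping: Map[String, ColumnFamilyInfo] = mapping
  def currentMaxId: Short = maxId
}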
@@ -931,7 +960,9 @@ case class RocksDBCheckpointMetadata(
     sstFiles: Seq[RocksDBSstFile],
     logFiles: Seq[RocksDBLogFile],
     numKeys: Long,
+    numInternalKeys: Long,
     columnFamilyMapping: Option[Map[String, Short]] = None,
+    columnFamilyTypeMap: Option[Map[String, Boolean]] = None,
     maxColumnFamilyId: Option[Short] = None) {
 
   require(columnFamilyMapping.isDefined == maxColumnFamilyId.isDefined,
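The require that closes the visible part of this hunk ties columnFamilyMapping and maxColumnFamilyId together: either both are present or neither is. A trimmed mirror of the class (sketch only; the field subset and the require message are paraphrased, since the real message is collapsed in the diff) makes the valid and invalid construction shapes explicit:

// Sketch: trimmed mirror of RocksDBCheckpointMetadata, only for the invariant.
case class MetadataSketch(
    numKeys: Long,
    numInternalKeys: Long,
    columnFamilyMapping: Option[Map[String, Short]] = None,
    columnFamilyTypeMap: Option[Map[String, Boolean]] = None,
    maxColumnFamilyId: Option[Short] = None) {
  // Paraphrased message; the real one is hidden by the collapsed region above.
  require(columnFamilyMapping.isDefined == maxColumnFamilyId.isDefined,
    "column family mapping and max column family id must be defined together")
}

object MetadataSketchCheck {
  def main(args: Array[String]): Unit = {
    // Legacy checkpoint: no column family info, zero internal keys.
    MetadataSketch(numKeys = 100L, numInternalKeys = 0L)
    // New checkpoint: id map, type map, and max id all present.
    MetadataSketch(100L, 4L,
      Some(Map("default" -> 1.toShort)), Some(Map("default" -> false)),
      Some(1.toShort))
    // Fails the require: a mapping without a max id.
    // MetadataSketch(100L, 0L, columnFamilyMapping = Some(Map("default" -> 1.toShort)))
    println("constructed both valid shapes")
  }
}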
@@ -997,6 +1028,7 @@ object RocksDBCheckpointMetadata {
       sstFiles.map(_.asInstanceOf[RocksDBSstFile]),
       logFiles.map(_.asInstanceOf[RocksDBLogFile]),
       numKeys,
+      0,
       None,
       None
     )
@@ -1005,14 +1037,18 @@
   def apply(
       rocksDBFiles: Seq[RocksDBImmutableFile],
       numKeys: Long,
+      numInternalKeys: Long,
       columnFamilyMapping: Option[Map[String, Short]],
+      columnFamilyTypeMap: Option[Map[String, Boolean]],
       maxColumnFamilyId: Option[Short]): RocksDBCheckpointMetadata = {
     val (sstFiles, logFiles) = rocksDBFiles.partition(_.isInstanceOf[RocksDBSstFile])
     new RocksDBCheckpointMetadata(
       sstFiles.map(_.asInstanceOf[RocksDBSstFile]),
       logFiles.map(_.asInstanceOf[RocksDBLogFile]),
       numKeys,
+      numInternalKeys,
       columnFamilyMapping,
+      columnFamilyTypeMap,
       maxColumnFamilyId
     )
   }
@@ -1022,21 +1058,25 @@ object RocksDBCheckpointMetadata {
       sstFiles: Seq[RocksDBSstFile],
       logFiles: Seq[RocksDBLogFile],
       numKeys: Long): RocksDBCheckpointMetadata = {
-    new RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys, None, None)
+    new RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys, 0, None, None)
   }
 
   // Apply method for cases with column family information
   def apply(
       rocksDBFiles: Seq[RocksDBImmutableFile],
       numKeys: Long,
+      numInternalKeys: Long,
       columnFamilyMapping: Map[String, Short],
+      columnFamilyTypeMap: Map[String, Boolean],
       maxColumnFamilyId: Short): RocksDBCheckpointMetadata = {
     val (sstFiles, logFiles) = rocksDBFiles.partition(_.isInstanceOf[RocksDBSstFile])
     new RocksDBCheckpointMetadata(
       sstFiles.map(_.asInstanceOf[RocksDBSstFile]),
       logFiles.map(_.asInstanceOf[RocksDBLogFile]),
       numKeys,
+      numInternalKeys,
       Some(columnFamilyMapping),
+      Some(columnFamilyTypeMap),
       Some(maxColumnFamilyId)
     )
   }
@@ -1046,13 +1086,17 @@
       sstFiles: Seq[RocksDBSstFile],
       logFiles: Seq[RocksDBLogFile],
       numKeys: Long,
+      numInternalKeys: Long,
       columnFamilyMapping: Map[String, Short],
+      columnFamilyTypeMap: Map[String, Boolean],
       maxColumnFamilyId: Short): RocksDBCheckpointMetadata = {
     new RocksDBCheckpointMetadata(
       sstFiles,
       logFiles,
       numKeys,
+      numInternalKeys,
       Some(columnFamilyMapping),
+      Some(columnFamilyTypeMap),
       Some(maxColumnFamilyId)
     )
   }
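Stepping back, the overload set in the hunks above gives two construction paths: legacy callers with no column family tracking (numInternalKeys pinned to 0, both maps absent) and the new path where plain maps get wrapped in Some(...). A simplified stand-in (type and object names invented; the real overloads also take the SST and log file sequences shown above) demonstrates the two call shapes:

// Sketch with a trimmed field set, just to show the two construction paths.
case class MetadataLite(
    numKeys: Long,
    numInternalKeys: Long,
    columnFamilyMapping: Option[Map[String, Short]],
    columnFamilyTypeMap: Option[Map[String, Boolean]],
    maxColumnFamilyId: Option[Short])

object MetadataLite {
  // Legacy shape: no column family info; numInternalKeys pinned to 0 as in the diff.
  def apply(numKeys: Long): MetadataLite =
    MetadataLite(numKeys, 0, None, None, None)

  // New shape: plain maps are wrapped in Some(...), mirroring the overloads above.
  def apply(
      numKeys: Long,
      numInternalKeys: Long,
      columnFamilyMapping: Map[String, Short],
      columnFamilyTypeMap: Map[String, Boolean],
      maxColumnFamilyId: Short): MetadataLite =
    MetadataLite(numKeys, numInternalKeys,
      Some(columnFamilyMapping), Some(columnFamilyTypeMap), Some(maxColumnFamilyId))
}

object MetadataLiteUsage {
  def main(args: Array[String]): Unit = {
    val legacy = MetadataLite(100L)
    val withCfs = MetadataLite(100L, 4L,
      Map("default" -> 1.toShort), Map("default" -> false), 1.toShort)
    println(legacy)
    println(withCfs)
  }
}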