@@ -166,6 +166,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {

val snapshot = table.update()
deltaLog.protocolWrite(snapshot.protocol)
val tableId = snapshot.metadata.id
val truncatedTableId = tableId.split("-").head

// VACUUM can break clones by removing files that clones still reference for managed tables.
// Eventually the catalog should track this dependency to avoid breaking clones,
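The truncated ID keeps only the first segment of the table identifier so the log prefix stays short while still distinguishing tables. A minimal sketch of the effect, assuming a UUID-formatted snapshot.metadata.id (the value is illustrative, not taken from the PR):

    // Sketch only: Delta table IDs are typically UUID strings.
    val exampleId = "5fba94ed-9794-4965-ba6e-6ee3c05d2bd8"   // hypothetical value
    val exampleTruncatedId = exampleId.split("-").head        // "5fba94ed"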
@@ -185,6 +187,10 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
val isLiteVacuumEnabled = spark.sessionState.conf.getConf(DeltaSQLConf.LITE_VACUUM_ENABLED)
val defaultType = if (isLiteVacuumEnabled) VacuumType.LITE else VacuumType.FULL
val vacuumType = vacuumTypeOpt.map(VacuumType.withName).getOrElse(defaultType)
val vacuumTypeTag = vacuumType match {
case VacuumType.FULL => log"[VACUUM_FULL] "
case VacuumType.LITE => log"[VACUUM_LITE] "
}

val snapshotTombstoneRetentionMillis = DeltaLog.tombstoneRetentionMillis(snapshot.metadata)
val retentionMillis = retentionHours.flatMap { h =>
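Because vacuumTypeTag is a Spark MessageWithContext built once with the log"..." interpolator, it can be prepended with + to each structured log message below instead of repeating the literal in every call. A rough sketch of the pattern, assuming it runs inside a class that mixes in Spark's Logging trait with the same structured-logging imports this file already has in scope:

    // Sketch only; mirrors the pattern used below, values are illustrative.
    val tag: org.apache.spark.internal.MessageWithContext = log"[VACUUM_LITE] "
    logInfo(tag + log"Starting garbage collection")  // the prefix is carried by every message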
@@ -205,11 +211,14 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
case Some(millis) => clock.getTimeMillis() - millis
case _ => snapshot.minFileRetentionTimestamp
}
logInfo(log"Starting garbage collection (dryRun = " +
log"${MDC(DeltaLogKeys.IS_DRY_RUN, dryRun)}) of untracked " +
log"files older than ${MDC(DeltaLogKeys.DATE,
new Date(deleteBeforeTimestamp).toGMTString)} in " +
log"${MDC(DeltaLogKeys.PATH, path)}")
logInfo(
log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
vacuumTypeTag +
log"Starting garbage collection (dryRun = " +
log"${MDC(DeltaLogKeys.IS_DRY_RUN, dryRun)}) of untracked " +
log"files older than ${MDC(DeltaLogKeys.DATE,
new Date(deleteBeforeTimestamp).toGMTString)} in " +
log"${MDC(DeltaLogKeys.PATH, path)}")
val hadoopConf = spark.sparkContext.broadcast(
new SerializableConfiguration(deltaHadoopConf))
val basePath = fs.makeQualified(path).toString
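With both prefixes in place, the emitted message reads roughly as [tableId=5fba94ed] [VACUUM_FULL] Starting garbage collection (dryRun = false) of untracked files older than <date> in <path> (values illustrative), so a single table's vacuum activity can be grepped out of interleaved driver logs.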
@@ -356,10 +365,14 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
)

recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
logInfo(log"Found ${MDC(DeltaLogKeys.NUM_FILES, numFiles.toLong)} files " +
log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " +
log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories " +
log"that are safe to delete. Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")
logInfo(
log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
vacuumTypeTag +
log"Found ${MDC(DeltaLogKeys.NUM_FILES, numFiles.toLong)} files " +
log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and " +
log"directories in a total of " +
log"${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories that are safe " +
log"to delete. Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")

return diffFiles.map(f => urlEncodedStringToPath(f).toString).toDF("path")
}
@@ -369,7 +382,9 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
diffFiles,
sizeOfDataToDelete,
retentionMillis,
snapshotTombstoneRetentionMillis)
snapshotTombstoneRetentionMillis,
truncatedTableId,
vacuumTypeTag)

val deleteStartTime = System.currentTimeMillis()
val filesDeleted = try {
@@ -410,10 +425,13 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
LastVacuumInfo.persistLastVacuumInfo(
LastVacuumInfo(latestCommitVersionOutsideOfRetentionWindowOpt), deltaLog)

logInfo(log"Deleted ${MDC(DeltaLogKeys.NUM_FILES, filesDeleted)} files " +
log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " +
log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories. " +
log"Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")
logInfo(
log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
vacuumTypeTag +
log"Deleted ${MDC(DeltaLogKeys.NUM_FILES, filesDeleted)} files " +
log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " +
log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories. " +
log"Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")


spark.createDataset(Seq(basePath)).toDF("path")
@@ -539,13 +557,17 @@ trait VacuumCommandImpl extends DeltaCommand {
diff: Dataset[String],
sizeOfDataToDelete: Long,
specifiedRetentionMillis: Option[Long],
defaultRetentionMillis: Long): Unit = {
defaultRetentionMillis: Long,
truncatedTableId: String,
vacuumTypeLogPrefix: org.apache.spark.internal.MessageWithContext): Unit = {
val deltaLog = table.deltaLog
logInfo(
log"Deleting untracked files and empty directories in " +
log"${MDC(DeltaLogKeys.PATH, deltaLog.dataPath)}. The amount " +
log"of data to be deleted is ${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} (in bytes)"
)
log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
vacuumTypeLogPrefix +
log"Deleting untracked files and empty directories in " +
log"${MDC(DeltaLogKeys.PATH, deltaLog.dataPath)}. The amount " +
log"of data to be deleted is " +
log"${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} (in bytes)")

// We perform an empty commit in order to record information about the Vacuum
if (shouldLogVacuum(spark, table, deltaLog.newDeltaHadoopConf(), deltaLog.dataPath)) {
@@ -939,7 +961,6 @@ trait VacuumCommandImpl extends DeltaCommand {
files
}


/**
* Helper to compute all valid files based on basePath and Snapshot provided.
* Returns a DataFrame with a single column "path" containing all files that should be