diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 34a1d7f9699..f2615a952fe 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -166,6 +166,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
     val snapshot = table.update()
     deltaLog.protocolWrite(snapshot.protocol)
+    val tableId = snapshot.metadata.id
+    val truncatedTableId = tableId.split("-").head
 
     // VACUUM can break clones by removing files that clones still references for managed tables.
     // Eventually the catalog should track this dependency to avoid breaking clones,
@@ -185,6 +187,10 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
     val isLiteVacuumEnabled = spark.sessionState.conf.getConf(DeltaSQLConf.LITE_VACUUM_ENABLED)
     val defaultType = if (isLiteVacuumEnabled) VacuumType.LITE else VacuumType.FULL
     val vacuumType = vacuumTypeOpt.map(VacuumType.withName).getOrElse(defaultType)
+    val vacuumTypeTag = vacuumType match {
+      case VacuumType.FULL => log"[VACUUM_FULL] "
+      case VacuumType.LITE => log"[VACUUM_LITE] "
+    }
     val snapshotTombstoneRetentionMillis = DeltaLog.tombstoneRetentionMillis(snapshot.metadata)
     val retentionMillis = retentionHours.flatMap { h =>
@@ -205,11 +211,14 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
       case Some(millis) => clock.getTimeMillis() - millis
       case _ => snapshot.minFileRetentionTimestamp
     }
-    logInfo(log"Starting garbage collection (dryRun = " +
-      log"${MDC(DeltaLogKeys.IS_DRY_RUN, dryRun)}) of untracked " +
-      log"files older than ${MDC(DeltaLogKeys.DATE,
-        new Date(deleteBeforeTimestamp).toGMTString)} in " +
-      log"${MDC(DeltaLogKeys.PATH, path)}")
+    logInfo(
+      log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
+        vacuumTypeTag +
+        log"Starting garbage collection (dryRun = " +
+        log"${MDC(DeltaLogKeys.IS_DRY_RUN, dryRun)}) of untracked " +
+        log"files older than ${MDC(DeltaLogKeys.DATE,
+          new Date(deleteBeforeTimestamp).toGMTString)} in " +
+        log"${MDC(DeltaLogKeys.PATH, path)}")
     val hadoopConf = spark.sparkContext.broadcast(
       new SerializableConfiguration(deltaHadoopConf))
     val basePath = fs.makeQualified(path).toString
@@ -356,10 +365,14 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         )
 
         recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
-        logInfo(log"Found ${MDC(DeltaLogKeys.NUM_FILES, numFiles.toLong)} files " +
-          log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " +
-          log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories " +
-          log"that are safe to delete. Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")
+        logInfo(
+          log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
+            vacuumTypeTag +
+            log"Found ${MDC(DeltaLogKeys.NUM_FILES, numFiles.toLong)} files " +
+            log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and " +
+            log"directories in a total of " +
+            log"${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories that are safe " +
+            log"to delete. Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")
         return diffFiles.map(f => urlEncodedStringToPath(f).toString).toDF("path")
       }
 
@@ -369,7 +382,9 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         diffFiles,
         sizeOfDataToDelete,
         retentionMillis,
-        snapshotTombstoneRetentionMillis)
+        snapshotTombstoneRetentionMillis,
+        truncatedTableId,
+        vacuumTypeTag)
 
       val deleteStartTime = System.currentTimeMillis()
       val filesDeleted = try {
@@ -410,10 +425,13 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
 
       LastVacuumInfo.persistLastVacuumInfo(
         LastVacuumInfo(latestCommitVersionOutsideOfRetentionWindowOpt), deltaLog)
-      logInfo(log"Deleted ${MDC(DeltaLogKeys.NUM_FILES, filesDeleted)} files " +
-        log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " +
-        log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories. " +
-        log"Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")
+      logInfo(
+        log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
+          vacuumTypeTag +
+          log"Deleted ${MDC(DeltaLogKeys.NUM_FILES, filesDeleted)} files " +
+          log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " +
+          log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories. " +
+          log"Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")
 
       spark.createDataset(Seq(basePath)).toDF("path")
 
@@ -539,13 +557,17 @@ trait VacuumCommandImpl extends DeltaCommand {
       diff: Dataset[String],
       sizeOfDataToDelete: Long,
       specifiedRetentionMillis: Option[Long],
-      defaultRetentionMillis: Long): Unit = {
+      defaultRetentionMillis: Long,
+      truncatedTableId: String,
+      vacuumTypeLogPrefix: org.apache.spark.internal.MessageWithContext): Unit = {
     val deltaLog = table.deltaLog
     logInfo(
-      log"Deleting untracked files and empty directories in " +
-        log"${MDC(DeltaLogKeys.PATH, deltaLog.dataPath)}. The amount " +
-        log"of data to be deleted is ${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} (in bytes)"
-    )
+      log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
+        vacuumTypeLogPrefix +
+        log"Deleting untracked files and empty directories in " +
+        log"${MDC(DeltaLogKeys.PATH, deltaLog.dataPath)}. The amount " +
+        log"of data to be deleted is " +
+        log"${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} (in bytes)")
 
     // We perform an empty commit in order to record information about the Vacuum
     if (shouldLogVacuum(spark, table, deltaLog.newDeltaHadoopConf(), deltaLog.dataPath)) {
@@ -939,7 +961,6 @@ trait VacuumCommandImpl extends DeltaCommand {
     files
   }
 
-
  /**
   * Helper to compute all valid files based on basePath and Snapshot provided.
   * Returns a DataFrame with a single column "path" containing all files that should be
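For context on the logging pattern the patch introduces (this note is not part of the patch): vacuumTypeTag is a MessageWithContext produced by Spark's structured log"..." interpolator, and such values can be concatenated with +, which is what lets the [tableId=...] and [VACUUM_*] prefixes be prepended to every message and threaded into logVacuumStart as vacuumTypeLogPrefix. Below is a minimal, self-contained sketch of that pattern, assuming Spark 4.x structured logging (Logging, LogKey, MDC, MessageWithContext in org.apache.spark.internal) with the log interpolator in scope via the Logging mix-in, as it is for VacuumCommand; ExampleLogKeys.TABLE_ID is a hypothetical stand-in for Delta's DeltaLogKeys.TABLE_ID.

    // Illustrative sketch only -- not part of the patch above.
    import org.apache.spark.internal.{LogKey, Logging, MDC, MessageWithContext}

    object ExampleLogKeys {
      // Delta defines its own keys in DeltaLogKeys the same way; this key is a local stand-in.
      case object TABLE_ID extends LogKey
    }

    object VacuumLoggingSketch extends Logging {
      // A reusable prefix, mirroring how the patch builds vacuumTypeTag.
      def vacuumTypeTag(isLite: Boolean): MessageWithContext =
        if (isLite) log"[VACUUM_LITE] " else log"[VACUUM_FULL] "

      def logStart(tableId: String, isLite: Boolean): Unit = {
        // Keep only the first segment of the table UUID, as the patch does.
        val truncatedTableId = tableId.split("-").head
        // MessageWithContext values concatenate with `+`, so both prefixes are
        // simply prepended to the structured message.
        logInfo(
          log"[tableId=${MDC(ExampleLogKeys.TABLE_ID, truncatedTableId)}] " +
            vacuumTypeTag(isLite) +
            log"Starting garbage collection")
      }
    }

Building the tag once and passing it around as a value (rather than re-matching on the vacuum type at every call site) keeps the prefix consistent across the start, dry-run, and completion messages.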