
Commit 01f6986

[Feature][Spark] Add tableId and vacuum type prefixes to VACUUM logs
Signed-off-by: AnudeepKonaboina <[email protected]>
Parent: 9357c6f
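
Illustratively (ids and values made up), a VACUUM log line that previously began

    Starting garbage collection (dryRun = true) of untracked files older than ...

now begins

    [tableId=3fa85f64] [VACUUM_FULL] Starting garbage collection (dryRun = true) of untracked files older than ...

so lines from concurrent vacuums on different tables can be told apart by table id and vacuum mode.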

1 file changed: +41 −20 lines


spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala

Lines changed: 41 additions & 20 deletions
@@ -166,6 +166,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
 
     val snapshot = table.update()
     deltaLog.protocolWrite(snapshot.protocol)
+    val tableId = snapshot.metadata.id
+    val truncatedTableId = tableId.split("-").head
 
     // VACUUM can break clones by removing files that clones still reference for managed tables.
     // Eventually the catalog should track this dependency to avoid breaking clones,
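
For context: snapshot.metadata.id is a UUID string, so split("-").head keeps only its first segment as a compact, still-searchable log prefix. A minimal sketch of the truncation, with a made-up id:

    // Minimal sketch of the truncation above; the sample id is made up.
    object TruncateTableIdExample extends App {
      val tableId = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
      // Keep only the first UUID segment so log prefixes stay short.
      val truncatedTableId = tableId.split("-").head
      println(truncatedTableId) // prints: 3fa85f64
    }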
@@ -185,6 +187,10 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
     val isLiteVacuumEnabled = spark.sessionState.conf.getConf(DeltaSQLConf.LITE_VACUUM_ENABLED)
     val defaultType = if (isLiteVacuumEnabled) VacuumType.LITE else VacuumType.FULL
     val vacuumType = vacuumTypeOpt.map(VacuumType.withName).getOrElse(defaultType)
+    val vacuumTypeTag = vacuumType match {
+      case VacuumType.FULL => log"[VACUUM_FULL] "
+      case VacuumType.LITE => log"[VACUUM_LITE] "
+    }
 
     val snapshotTombstoneRetentionMillis = DeltaLog.tombstoneRetentionMillis(snapshot.metadata)
     val retentionMillis = retentionHours.flatMap { h =>
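
The tag is computed once per command and then prepended to every VACUUM log line. A simplified, self-contained stand-in (a local Enumeration instead of Delta's VacuumType, and plain strings instead of the MessageWithContext returned by Spark's log interpolator):

    // Simplified stand-in: local Enumeration instead of Delta's VacuumType,
    // plain String instead of the MessageWithContext returned by log"...".
    object VacuumTypeTagExample extends App {
      object VacuumType extends Enumeration {
        val FULL, LITE = Value
      }

      val vacuumType: VacuumType.Value = VacuumType.LITE
      // Exhaustive match: each mode maps to a bracketed, greppable prefix.
      val vacuumTypeTag = vacuumType match {
        case VacuumType.FULL => "[VACUUM_FULL] "
        case VacuumType.LITE => "[VACUUM_LITE] "
      }
      println(vacuumTypeTag + "Starting garbage collection ...")
    }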
@@ -205,11 +211,14 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
       case Some(millis) => clock.getTimeMillis() - millis
       case _ => snapshot.minFileRetentionTimestamp
     }
-    logInfo(log"Starting garbage collection (dryRun = " +
-      log"${MDC(DeltaLogKeys.IS_DRY_RUN, dryRun)}) of untracked " +
-      log"files older than ${MDC(DeltaLogKeys.DATE,
-        new Date(deleteBeforeTimestamp).toGMTString)} in " +
-      log"${MDC(DeltaLogKeys.PATH, path)}")
+    logInfo(
+      log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
+        vacuumTypeTag +
+        log"Starting garbage collection (dryRun = " +
+        log"${MDC(DeltaLogKeys.IS_DRY_RUN, dryRun)}) of untracked " +
+        log"files older than ${MDC(DeltaLogKeys.DATE,
+          new Date(deleteBeforeTimestamp).toGMTString)} in " +
+        log"${MDC(DeltaLogKeys.PATH, path)}")
     val hadoopConf = spark.sparkContext.broadcast(
       new SerializableConfiguration(deltaHadoopConf))
     val basePath = fs.makeQualified(path).toString
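
Spark's log"..." interpolator returns a MessageWithContext: the rendered text plus the MDC key/value pairs (for example DeltaLogKeys.TABLE_ID -> "3fa85f64") for structured log output, which is why the concatenation above can mix log"..." fragments with the prebuilt vacuumTypeTag. A rough stand-in that reproduces only the rendered text, not the structured context:

    // Rough stand-in: reproduces only the rendered text of the call above.
    // Spark's real MDC additionally attaches (key -> value) pairs to the line.
    object MdcRenderingExample extends App {
      def mdc(key: String, value: Any): String = value.toString

      val truncatedTableId = "3fa85f64"      // made-up id
      val vacuumTypeTag = "[VACUUM_FULL] "
      val dryRun = true
      val path = "s3://bucket/tables/events" // made-up path

      val line =
        s"[tableId=${mdc("TABLE_ID", truncatedTableId)}] " +
          vacuumTypeTag +
          s"Starting garbage collection (dryRun = ${mdc("IS_DRY_RUN", dryRun)}) " +
          s"of untracked files in ${mdc("PATH", path)}"
      println(line)
    }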
@@ -356,10 +365,14 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
       )
 
       recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
-      logInfo(log"Found ${MDC(DeltaLogKeys.NUM_FILES, numFiles.toLong)} files " +
-        log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " +
-        log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories " +
-        log"that are safe to delete. Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")
+      logInfo(
+        log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
+          vacuumTypeTag +
+          log"Found ${MDC(DeltaLogKeys.NUM_FILES, numFiles.toLong)} files " +
+          log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and " +
+          log"directories in a total of " +
+          log"${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories that are safe " +
+          log"to delete. Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")
 
       return diffFiles.map(f => urlEncodedStringToPath(f).toString).toDF("path")
     }
@@ -369,7 +382,9 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         diffFiles,
         sizeOfDataToDelete,
         retentionMillis,
-        snapshotTombstoneRetentionMillis)
+        snapshotTombstoneRetentionMillis,
+        truncatedTableId,
+        vacuumTypeTag)
 
       val deleteStartTime = System.currentTimeMillis()
       val filesDeleted = try {
@@ -410,10 +425,13 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
       LastVacuumInfo.persistLastVacuumInfo(
         LastVacuumInfo(latestCommitVersionOutsideOfRetentionWindowOpt), deltaLog)
 
-      logInfo(log"Deleted ${MDC(DeltaLogKeys.NUM_FILES, filesDeleted)} files " +
-        log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " +
-        log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories. " +
-        log"Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")
+      logInfo(
+        log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
+          vacuumTypeTag +
+          log"Deleted ${MDC(DeltaLogKeys.NUM_FILES, filesDeleted)} files " +
+          log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " +
+          log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories. " +
+          log"Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")
 
 
       spark.createDataset(Seq(basePath)).toDF("path")
@@ -539,13 +557,17 @@ trait VacuumCommandImpl extends DeltaCommand {
       diff: Dataset[String],
       sizeOfDataToDelete: Long,
       specifiedRetentionMillis: Option[Long],
-      defaultRetentionMillis: Long): Unit = {
+      defaultRetentionMillis: Long,
+      truncatedTableId: String,
+      vacuumTypeLogPrefix: org.apache.spark.internal.MessageWithContext): Unit = {
     val deltaLog = table.deltaLog
     logInfo(
-      log"Deleting untracked files and empty directories in " +
-        log"${MDC(DeltaLogKeys.PATH, deltaLog.dataPath)}. The amount " +
-        log"of data to be deleted is ${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} (in bytes)"
-    )
+      log"[tableId=${MDC(DeltaLogKeys.TABLE_ID, truncatedTableId)}] " +
+        vacuumTypeLogPrefix +
+        log"Deleting untracked files and empty directories in " +
+        log"${MDC(DeltaLogKeys.PATH, deltaLog.dataPath)}. The amount " +
+        log"of data to be deleted is " +
+        log"${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} (in bytes)")
 
     // We perform an empty commit in order to record information about the Vacuum
     if (shouldLogVacuum(spark, table, deltaLog.newDeltaHadoopConf(), deltaLog.dataPath)) {
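
Since log"..." evaluates to an org.apache.spark.internal.MessageWithContext, the prebuilt tag can be passed into this helper and concatenated there, and the helper never needs to re-derive the vacuum type. A simplified sketch of that pattern (the helper name is illustrative, plain String stands in for MessageWithContext, and the values are made up):

    // Simplified sketch of passing a prebuilt prefix into the helper above;
    // the helper name is illustrative, and plain String stands in for
    // org.apache.spark.internal.MessageWithContext.
    object PrefixedHelperExample extends App {
      def logVacuumStartSketch(
          sizeOfDataToDelete: Long,
          truncatedTableId: String,
          vacuumTypeLogPrefix: String): Unit = {
        println(
          s"[tableId=$truncatedTableId] " +
            vacuumTypeLogPrefix +
            "Deleting untracked files and empty directories. The amount " +
            s"of data to be deleted is $sizeOfDataToDelete (in bytes)")
      }

      logVacuumStartSketch(1048576L, "3fa85f64", "[VACUUM_LITE] ") // made-up values
    }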
@@ -939,7 +961,6 @@ trait VacuumCommandImpl extends DeltaCommand {
     files
   }
 
-
   /**
    * Helper to compute all valid files based on basePath and Snapshot provided.
    * Returns a DataFrame with a single column "path" containing all files that should be
