From 9d1df953599d9ebdf2655956868b318e76b04c21 Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Tue, 17 Dec 2024 20:07:53 -0800 Subject: [PATCH] [SPARK-50639][SQL] Improve warning logging in CacheManager --- .../spark/sql/execution/CacheManager.scala | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index a3382c83e1f20..de003025152d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -126,7 +126,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { if (storageLevel == StorageLevel.NONE) { // Do nothing for StorageLevel.NONE since it will not actually cache any data. } else if (lookupCachedDataInternal(normalizedPlan).nonEmpty) { - logWarning("Asked to cache already cached data.") + logWarning(log"An attempt was made to cache data even though the data had already been " + + log"cached. Please un-cache data or clear cache first.\nLogical plan:\n" + + log"${MDC(QUERY_PLAN, normalizedPlan)}") } else { val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark) val inMemoryRelation = sessionWithConfigsOff.withActive { @@ -140,7 +142,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { this.synchronized { if (lookupCachedDataInternal(normalizedPlan).nonEmpty) { - logWarning("Data has already been cached.") + logWarning(log"Data has already been cached. No new data is cached.\nLogical plan:\n" + + log"${MDC(QUERY_PLAN, normalizedPlan)}") } else { // the cache key is the normalized plan val cd = CachedData(normalizedPlan, inMemoryRelation) @@ -206,7 +209,10 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { plan: LogicalPlan, cascade: Boolean, blocking: Boolean): Unit = { - uncacheByCondition(spark, _.sameResult(plan), cascade, blocking) + if (!uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)) { + logWarning(log"Data has not been previously cached or it was removed from the " + + log"cache already.\nLogical plan:\n${MDC(QUERY_PLAN, plan)}") + } } def uncacheTableOrView(spark: SparkSession, name: Seq[String], cascade: Boolean): Unit = { @@ -241,16 +247,19 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { spark: SparkSession, isMatchedPlan: LogicalPlan => Boolean, cascade: Boolean, - blocking: Boolean): Unit = { + blocking: Boolean): Boolean = { val shouldRemove: LogicalPlan => Boolean = if (cascade) { _.exists(isMatchedPlan) } else { isMatchedPlan } - val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan)) + var plansToUncache: IndexedSeq[CachedData] = null this.synchronized { - cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd)) + plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan)) + if (plansToUncache.nonEmpty) { + cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd)) + } } plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) } CacheManager.logCacheOperation(log"Removed ${MDC(SIZE, plansToUncache.size)} Dataframe " + @@ -276,6 +285,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { cd.plan.exists(isMatchedPlan) && !cacheAlreadyLoaded }) } + plansToUncache.nonEmpty } // Analyzes column statistics in the given cache data