[SPARK-50639][SQL] Improve warning logging in CacheManager
vrozov committed Dec 23, 2024
1 parent 7cd5c4a commit 9d1df95
Showing 1 changed file with 16 additions and 6 deletions.
@@ -126,7 +126,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     if (storageLevel == StorageLevel.NONE) {
       // Do nothing for StorageLevel.NONE since it will not actually cache any data.
     } else if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
-      logWarning("Asked to cache already cached data.")
+      logWarning(log"An attempt was made to cache data even though the data had already been " +
+        log"cached. Please un-cache data or clear cache first.\nLogical plan:\n" +
+        log"${MDC(QUERY_PLAN, normalizedPlan)}")
     } else {
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
       val inMemoryRelation = sessionWithConfigsOff.withActive {
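
Note on the first hunk: the warning fires when a plan that is already cached is cached again. A minimal way to hit it from the Dataset API, assuming a running SparkSession named spark (illustrative sketch, not part of the change):

    val df = spark.range(10).toDF("id")
    df.cache()       // first call registers the plan with the CacheManager
    df.cache()       // second call finds the plan already cached and logs the warning above
    df.unpersist()
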
@@ -140,7 +142,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
 
       this.synchronized {
         if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
-          logWarning("Data has already been cached.")
+          logWarning(log"Data has already been cached. No new data is cached.\nLogical plan:\n" +
+            log"${MDC(QUERY_PLAN, normalizedPlan)}")
         } else {
           // the cache key is the normalized plan
           val cd = CachedData(normalizedPlan, inMemoryRelation)
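
The second hunk covers the race where the same plan gets cached by another caller between the first, unsynchronized lookup and this synchronized block: the lookup is repeated under the lock and, if it now succeeds, the existing entry is kept and only a warning is logged. The shape is ordinary check-then-act under a lock, roughly (simplified sketch with illustrative helper names, not the actual CacheManager code):

    this.synchronized {
      if (alreadyCached(normalizedPlan)) {
        // another caller won the race; keep its entry and only warn
      } else {
        register(normalizedPlan, inMemoryRelation)
      }
    }
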
@@ -206,7 +209,10 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       plan: LogicalPlan,
       cascade: Boolean,
       blocking: Boolean): Unit = {
-    uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)
+    if (!uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)) {
+      logWarning(log"Data has not been previously cached or it was removed from the " +
+        log"cache already.\nLogical plan:\n${MDC(QUERY_PLAN, plan)}")
+    }
   }
 
   def uncacheTableOrView(spark: SparkSession, name: Seq[String], cascade: Boolean): Unit = {
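
From the caller's side, the new warning in uncacheQuery corresponds to un-persisting data that was never cached or has already been removed from the cache, e.g. (illustrative sketch, assuming a SparkSession named spark):

    val df = spark.range(10).toDF("id")
    df.unpersist()   // no cache entry matches this plan, so the new warning is logged
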
@@ -241,16 +247,19 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       spark: SparkSession,
       isMatchedPlan: LogicalPlan => Boolean,
       cascade: Boolean,
-      blocking: Boolean): Unit = {
+      blocking: Boolean): Boolean = {
     val shouldRemove: LogicalPlan => Boolean =
       if (cascade) {
         _.exists(isMatchedPlan)
       } else {
         isMatchedPlan
       }
-    val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
+    var plansToUncache: IndexedSeq[CachedData] = null
     this.synchronized {
-      cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
+      plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
+      if (plansToUncache.nonEmpty) {
+        cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
+      }
     }
     plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) }
     CacheManager.logCacheOperation(log"Removed ${MDC(SIZE, plansToUncache.size)} Dataframe " +
@@ -276,6 +285,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
         cd.plan.exists(isMatchedPlan) && !cacheAlreadyLoaded
       })
     }
+    plansToUncache.nonEmpty
   }
 
   // Analyzes column statistics in the given cache data
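
Taken together, uncacheByCondition now selects and removes the matching entries inside a single synchronized block, so two concurrent un-cache calls cannot both claim the same entries, and it returns whether anything was removed, which uncacheQuery above uses to decide whether to warn. A minimal model of that select-and-remove-under-one-lock shape, with illustrative names and no Spark dependencies:

    class Registry[A <: AnyRef] {
      private var entries: IndexedSeq[A] = IndexedSeq.empty

      def add(a: A): Unit = this.synchronized { entries = entries :+ a }

      // Select and remove matching entries under one lock; report whether anything was removed.
      def removeMatching(p: A => Boolean): Boolean = {
        var matched: IndexedSeq[A] = IndexedSeq.empty
        this.synchronized {
          matched = entries.filter(p)
          if (matched.nonEmpty) {
            entries = entries.filterNot(e => matched.exists(_ eq e))
          }
        }
        matched.nonEmpty
      }
    }
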