Skip to content

Commit

Permalink
[SPARK-50686][SQL] memory optimization in the hash to sort aggregatio…
Browse files Browse the repository at this point in the history
…n fallback scenario

adjust expected spill values in tests
  • Loading branch information
akupchinskiy committed Dec 27, 2024
1 parent 7a4114c commit e6b8e8e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -964,8 +964,7 @@ public void reset() {
updatePeakMemoryUsed();
numKeys = 0;
numValues = 0;
freeArray(longArray);
longArray = null;
freeInternalArray();
while (dataPages.size() > 0) {
MemoryBlock dataPage = dataPages.removeLast();
freePage(dataPage);
Expand All @@ -976,6 +975,17 @@ public void reset() {
pageCursor = 0;
}

/**
* Free array memory to reduce the memory footprint in case of a fallback
* from a hash-based aggregation to the sort-based one.
*/
public void freeInternalArray() {
if (longArray != null) {
freeArray(longArray);
longArray = null;
}
}

/**
* Grows the size of the hash table and re-hash everything.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ public UnsafeKVExternalSorter(
// so that we can always reuse the pointer array.
if (map.numValues() > pointerArray.size() / 4) {
// Here we ask the map to allocate memory, so that the memory manager won't ask the map
// to spill, if the memory is not enough.
// to spill, if the memory is not enough. Also, we free the redundant internal map array
// to reduce the overall memory footprint.
map.freeInternalArray();
pointerArray = map.allocateArray(map.numValues() * 4L);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSparkSession
try {
val context = new TaskContextImpl(0, 0, 0, 0, 0, 1, taskMemoryManager, new Properties(), null)
TaskContext.setTaskContext(context)
val expectedSpillSize = map.getTotalMemoryConsumption
val expectedSpillSize = expectedSpillSizeForMapWithDuplicateKeys(map)
val sorter = new UnsafeKVExternalSorter(
schema,
schema,
Expand All @@ -267,7 +267,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSparkSession
try {
val context = new TaskContextImpl(0, 0, 0, 0, 0, 1, taskMemoryManager, new Properties(), null)
TaskContext.setTaskContext(context)
val expectedSpillSize = map1.getTotalMemoryConsumption + map2.getTotalMemoryConsumption
val expectedSpillSize = expectedSpillSizeForMapWithDuplicateKeys(map1) +
expectedSpillSizeForMapWithDuplicateKeys(map2)
val sorter1 = new UnsafeKVExternalSorter(
schema,
schema,
Expand Down Expand Up @@ -309,4 +310,10 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSparkSession
}
map
}

private def expectedSpillSizeForMapWithDuplicateKeys(map: BytesToBytesMap): Long = {
val internalArrayMemoryUsed = Option(map.getArray).map(_.memoryBlock().size()).getOrElse(0L)
map.getTotalMemoryConsumption - internalArrayMemoryUsed
}

}

0 comments on commit e6b8e8e

Please sign in to comment.