[SPARK-9700] Pick default page size more intelligently.
Previously, we used 64MB as the default page size, which was far too large for many Spark applications (especially single-node ones).

This patch changes it so that the default page size, if not set by the user, is determined by the number of available cores and the total execution memory.
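
For reference, a minimal sketch of the heuristic this patch introduces (the real logic lives in ShuffleMemoryManager.getPageSize in the diff below; the standalone object here is illustrative only):

```scala
// Illustrative only: mirrors ShuffleMemoryManager.getPageSize from this patch.
object PageSizeSketch {
  /** Smallest power of 2 >= num (same trick as ByteArrayMethods.nextPowerOf2). */
  def nextPowerOf2(num: Long): Long = {
    val highBit = java.lang.Long.highestOneBit(num)
    if (highBit == num) num else highBit << 1
  }

  /** Default page size given total execution memory and core count. */
  def defaultPageSize(maxExecutionMemory: Long, numCores: Int): Long = {
    val minPageSize = 1L * 1024 * 1024    // 1 MB floor
    val maxPageSize = 64L * minPageSize   // 64 MB ceiling (the old fixed default)
    val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
    val safetyFactor = 8                  // leave room for several pages per core
    val size = nextPowerOf2(maxExecutionMemory / cores / safetyFactor)
    math.min(maxPageSize, math.max(minPageSize, size))
  }
}

// e.g. 512 MB of execution memory on 4 cores: 512 / 4 / 8 = 16 MB pages
// PageSizeSketch.defaultPageSize(512L * 1024 * 1024, 4) == 16L * 1024 * 1024
```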

Author: Reynold Xin <[email protected]>

Closes apache#8012 from rxin/pagesize and squashes the following commits:

16f4756 [Reynold Xin] Fixed failing test.
5afd570 [Reynold Xin] private...
0d5fb98 [Reynold Xin] Update default value.
674a6cd [Reynold Xin] Address review feedback.
dc00e05 [Reynold Xin] Merge with master.
73ebdb6 [Reynold Xin] [SPARK-9700] Pick default page size more intelligently.
rxin committed Aug 7, 2015
1 parent 7aaed1b commit 4309262
Showing 20 changed files with 93 additions and 46 deletions.
2 changes: 1 addition & 1 deletion R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

if [[ $FAILED != 0 ]]; then
@@ -115,8 +115,7 @@ public UnsafeShuffleExternalSorter(
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.pageSizeBytes = (int) Math.min(
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, shuffleMemoryManager.pageSizeBytes());
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();
@@ -642,7 +642,7 @@ public boolean putNewKey(
private void allocate(int capacity) {
assert (capacity >= 0);
// The capacity needs to be divisible by 64 so that our bit set can be sized properly
capacity = Math.max((int) Math.min(MAX_CAPACITY, nextPowerOf2(capacity)), 64);
capacity = Math.max((int) Math.min(MAX_CAPACITY, ByteArrayMethods.nextPowerOf2(capacity)), 64);
assert (capacity <= MAX_CAPACITY);
longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2]));
bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
@@ -770,10 +770,4 @@ void growAndRehash() {
timeSpentResizingNs += System.nanoTime() - resizeStartTime;
}
}

/** Returns the next number greater or equal num that is power of 2. */
private static long nextPowerOf2(long num) {
final long highBit = Long.highestOneBit(num);
return (highBit == num) ? num : highBit << 1;
}
}
@@ -127,7 +127,6 @@ private UnsafeExternalSorter(
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
// this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
this.pageSizeBytes = pageSizeBytes;
this.writeMetrics = new ShuffleWriteMetrics();

7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -249,6 +249,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
Utils.byteStringAsBytes(get(key, defaultValue))
}

/**
* Get a size parameter as bytes, falling back to a default if not set.
*/
def getSizeAsBytes(key: String, defaultValue: Long): Long = {
Utils.byteStringAsBytes(get(key, defaultValue + "B"))
}

/**
* Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Kibibytes are assumed.
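
The new getSizeAsBytes(key, defaultValue: Long) overload above lets callers pass a byte count computed at runtime instead of a string literal; the Long default is rendered as "<n>B" and parsed back by Utils.byteStringAsBytes. A hypothetical call site, for illustration:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
val computedDefault = 8L * 1024 * 1024  // e.g. a page size derived elsewhere
// Honors a user-set "spark.buffer.pageSize" (with units) if present, otherwise returns 8388608.
val pageSizeBytes: Long = conf.getSizeAsBytes("spark.buffer.pageSize", computedDefault)
```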
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -629,7 +629,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* [[org.apache.spark.SparkContext.setLocalProperty]].
*/
def getLocalProperty(key: String): String =
Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
Option(localProperties.get).map(_.getProperty(key)).orNull

/** Set a human readable description of the current job. */
def setJobDescription(value: String) {
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -324,7 +324,7 @@ object SparkEnv extends Logging {
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = new ShuffleMemoryManager(conf)
val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)

val blockTransferService =
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
@@ -19,6 +19,9 @@ package org.apache.spark.shuffle

import scala.collection.mutable

import com.google.common.annotations.VisibleForTesting

import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext}

/**
@@ -34,11 +37,19 @@ import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext}
* set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever
* this set changes. This is all done by synchronizing access on "this" to mutate state and using
* wait() and notifyAll() to signal changes.
*
* Use `ShuffleMemoryManager.create()` factory method to create a new instance.
*
* @param maxMemory total amount of memory available for execution, in bytes.
* @param pageSizeBytes number of bytes for each page, by default.
*/
private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
private val taskMemory = new mutable.HashMap[Long, Long]() // taskAttemptId -> memory bytes
private[spark]
class ShuffleMemoryManager protected (
val maxMemory: Long,
val pageSizeBytes: Long)
extends Logging {

def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))
private val taskMemory = new mutable.HashMap[Long, Long]() // taskAttemptId -> memory bytes

private def currentTaskAttemptId(): Long = {
// In case this is called on the driver, return an invalid task attempt id.
@@ -124,15 +135,49 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
}
}


private[spark] object ShuffleMemoryManager {

def create(conf: SparkConf, numCores: Int): ShuffleMemoryManager = {
val maxMemory = ShuffleMemoryManager.getMaxMemory(conf)
val pageSize = ShuffleMemoryManager.getPageSize(conf, maxMemory, numCores)
new ShuffleMemoryManager(maxMemory, pageSize)
}

def create(maxMemory: Long, pageSizeBytes: Long): ShuffleMemoryManager = {
new ShuffleMemoryManager(maxMemory, pageSizeBytes)
}

@VisibleForTesting
def createForTesting(maxMemory: Long): ShuffleMemoryManager = {
new ShuffleMemoryManager(maxMemory, 4 * 1024 * 1024)
}

/**
* Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction
* of the memory pool and a safety factor since collections can sometimes grow bigger than
* the size we target before we estimate their sizes again.
*/
def getMaxMemory(conf: SparkConf): Long = {
private def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}

/**
* Sets the page size, in bytes.
*
* If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
* by looking at the number of cores available to the process, and the total amount of memory,
* and then divide it by a factor of safety.
*/
private def getPageSize(conf: SparkConf, maxMemory: Long, numCores: Int): Long = {
val minPageSize = 1L * 1024 * 1024 // 1MB
val maxPageSize = 64L * minPageSize // 64MB
val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
val safetyFactor = 8
val size = ByteArrayMethods.nextPowerOf2(maxMemory / cores / safetyFactor)
val default = math.min(maxPageSize, math.max(minPageSize, size))
conf.getSizeAsBytes("spark.buffer.pageSize", default)
}
}
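
To make the new defaults concrete, here is a sketch of how an executor now obtains its page size through the factory added above (ShuffleMemoryManager is private[spark], so this only compiles from Spark-internal code; the numbers are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.shuffle.ShuffleMemoryManager

val conf = new SparkConf()
// SparkEnv now calls the factory with the number of usable cores.
val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numCores = 8)

// With roughly 1 GB of execution memory and 8 cores:
//   1 GB / 8 cores / safetyFactor 8 = 16 MB, already a power of 2,
//   clamped to [1 MB, 64 MB] -> 16 MB pages instead of the old fixed 64 MB.
val pageSize = shuffleMemoryManager.pageSizeBytes
```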
@@ -115,6 +115,7 @@ public void setUp() throws IOException {
taskMetrics = new TaskMetrics();

when(shuffleMemoryManager.tryToAcquire(anyLong())).then(returnsFirstArg());
when(shuffleMemoryManager.pageSizeBytes()).thenReturn(128L * 1024 * 1024);

when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
when(blockManager.getDiskWriter(
@@ -549,14 +550,14 @@ public void testPeakMemoryUsed() throws Exception {
final long recordLengthBytes = 8;
final long pageSizeBytes = 256;
final long numRecordsPerPage = pageSizeBytes / recordLengthBytes;
final SparkConf conf = new SparkConf().set("spark.buffer.pageSize", pageSizeBytes + "b");
when(shuffleMemoryManager.pageSizeBytes()).thenReturn(pageSizeBytes);
final UnsafeShuffleWriter<Object, Object> writer =
new UnsafeShuffleWriter<Object, Object>(
blockManager,
shuffleBlockResolver,
taskMemoryManager,
shuffleMemoryManager,
new UnsafeShuffleHandle<Object, Object>(0, 1, shuffleDep),
new UnsafeShuffleHandle<>(0, 1, shuffleDep),
0, // map id
taskContext,
conf);
@@ -48,7 +48,7 @@ public abstract class AbstractBytesToBytesMapSuite {

@Before
public void setup() {
shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
shuffleMemoryManager = ShuffleMemoryManager.create(Long.MAX_VALUE, PAGE_SIZE_BYTES);
taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(getMemoryAllocator()));
// Mocked memory manager for tests that check the maximum array size, since actually allocating
// such large arrays will cause us to run out of memory in our tests.
@@ -441,7 +441,7 @@ public void randomizedTestWithRecordsLargerThanPageSize() {

@Test
public void failureToAllocateFirstPage() {
shuffleMemoryManager = new ShuffleMemoryManager(1024);
shuffleMemoryManager = ShuffleMemoryManager.createForTesting(1024);
BytesToBytesMap map =
new BytesToBytesMap(taskMemoryManager, shuffleMemoryManager, 1, PAGE_SIZE_BYTES);
try {
@@ -461,7 +461,7 @@ public void failureToAllocateFirstPage() {

@Test
public void failureToGrow() {
shuffleMemoryManager = new ShuffleMemoryManager(1024 * 10);
shuffleMemoryManager = ShuffleMemoryManager.createForTesting(1024 * 10);
BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, shuffleMemoryManager, 1, 1024);
try {
boolean success = true;
@@ -102,7 +102,7 @@ public void setUp() {
MockitoAnnotations.initMocks(this);
sparkConf = new SparkConf();
tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
shuffleMemoryManager = ShuffleMemoryManager.create(Long.MAX_VALUE, pageSizeBytes);
spillFilesCreated.clear();
taskContext = mock(TaskContext.class);
when(taskContext.taskMetrics()).thenReturn(new TaskMetrics());
@@ -237,7 +237,7 @@ public void testSortingEmptyArrays() throws Exception {

@Test
public void spillingOccursInResponseToMemoryPressure() throws Exception {
shuffleMemoryManager = new ShuffleMemoryManager(pageSizeBytes * 2);
shuffleMemoryManager = ShuffleMemoryManager.create(pageSizeBytes * 2, pageSizeBytes);
final UnsafeExternalSorter sorter = newSorter();
final int numRecords = (int) pageSizeBytes / 4;
for (int i = 0; i <= numRecords; i++) {
@@ -24,7 +24,7 @@ import org.mockito.Mockito._
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

import org.apache.spark.{SparkFunSuite, TaskContext}
import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}

class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {

@@ -50,7 +50,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
}

test("single task requesting memory") {
val manager = new ShuffleMemoryManager(1000L)
val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)

assert(manager.tryToAcquire(100L) === 100L)
assert(manager.tryToAcquire(400L) === 400L)
@@ -72,7 +72,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// Two threads request 500 bytes first, wait for each other to get it, and then request
// 500 more; we should immediately return 0 as both are now at 1 / N

val manager = new ShuffleMemoryManager(1000L)
val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)

class State {
var t1Result1 = -1L
@@ -124,7 +124,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// Two tasks request 250 bytes first, wait for each other to get it, and then request
// 500 more; we should only grant 250 bytes to each of them on this second request

val manager = new ShuffleMemoryManager(1000L)
val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)

class State {
var t1Result1 = -1L
@@ -176,7 +176,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// for a bit and releases 250 bytes, which should then be granted to t2. Further requests
// by t2 will return false right away because it now has 1 / 2N of the memory.

val manager = new ShuffleMemoryManager(1000L)
val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)

class State {
var t1Requested = false
@@ -241,7 +241,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// t1 grabs 1000 bytes and then waits until t2 is ready to make a request. It sleeps
// for a bit and releases all its memory. t2 should now be able to grab all the memory.

val manager = new ShuffleMemoryManager(1000L)
val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)

class State {
var t1Requested = false
@@ -307,7 +307,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
}

test("tasks should not be granted a negative size") {
val manager = new ShuffleMemoryManager(1000L)
val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
manager.tryToAcquire(700L)

val latch = new CountDownLatch(1)
1 change: 0 additions & 1 deletion python/pyspark/java_gateway.py
@@ -54,7 +54,6 @@ def launch_gateway():
if os.environ.get("SPARK_TESTING"):
submit_args = ' '.join([
"--conf spark.ui.enabled=false",
"--conf spark.buffer.pageSize=4mb",
submit_args
])
command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)
@@ -342,7 +342,7 @@ class TungstenAggregationIterator(
TaskContext.get.taskMemoryManager(),
SparkEnv.get.shuffleMemoryManager,
1024 * 16, // initial capacity
SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m"),
SparkEnv.get.shuffleMemoryManager.pageSizeBytes,
false // disable tracking of performance metrics
)

@@ -282,17 +282,15 @@ private[joins] final class UnsafeHashedRelation(
// This is used in Broadcast, shared by multiple tasks, so we use on-heap memory
val taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))

val pageSizeBytes = Option(SparkEnv.get).map(_.shuffleMemoryManager.pageSizeBytes)
.getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))

// Dummy shuffle memory manager which always grants all memory allocation requests.
// We use this because it doesn't make sense count shared broadcast variables' memory usage
// towards individual tasks' quotas. In the future, we should devise a better way of handling
// this.
val shuffleMemoryManager = new ShuffleMemoryManager(new SparkConf()) {
override def tryToAcquire(numBytes: Long): Long = numBytes
override def release(numBytes: Long): Unit = {}
}

val pageSizeBytes = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
.getSizeAsBytes("spark.buffer.pageSize", "64m")
val shuffleMemoryManager =
ShuffleMemoryManager.create(maxMemory = Long.MaxValue, pageSizeBytes = pageSizeBytes)

binaryMap = new BytesToBytesMap(
taskMemoryManager,
@@ -306,11 +304,11 @@ private[joins] final class UnsafeHashedRelation(
while (i < nKeys) {
val keySize = in.readInt()
val valuesSize = in.readInt()
if (keySize > keyBuffer.size) {
if (keySize > keyBuffer.length) {
keyBuffer = new Array[Byte](keySize)
}
in.readFully(keyBuffer, 0, keySize)
if (valuesSize > valuesBuffer.size) {
if (valuesSize > valuesBuffer.length) {
valuesBuffer = new Array[Byte](valuesSize)
}
in.readFully(valuesBuffer, 0, valuesSize)
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.spark.{InternalAccumulator, TaskContext}
import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
@@ -122,7 +122,7 @@ case class TungstenSort(
protected override def doExecute(): RDD[InternalRow] = {
val schema = child.schema
val childOutput = child.output
val pageSize = sparkContext.conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes

/**
* Set up the sorter in each partition before computing the parent partition.
@@ -22,7 +22,7 @@ import org.apache.spark.shuffle.ShuffleMemoryManager
/**
* A [[ShuffleMemoryManager]] that can be controlled to run out of memory.
*/
class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue) {
class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue, 4 * 1024 * 1024) {
private var oom = false

override def tryToAcquire(numBytes: Long): Long = {
@@ -52,7 +52,6 @@ object TestHive
.set("spark.sql.test", "")
.set("spark.sql.hive.metastore.barrierPrefixes",
"org.apache.spark.sql.hive.execution.PairSerDe")
.set("spark.buffer.pageSize", "4m")
// SPARK-8910
.set("spark.ui.enabled", "false")))

@@ -25,6 +25,12 @@ private ByteArrayMethods() {
// Private constructor, since this class only contains static methods.
}

/** Returns the next number greater or equal num that is power of 2. */
public static long nextPowerOf2(long num) {
final long highBit = Long.highestOneBit(num);
return (highBit == num) ? num : highBit << 1;
}

public static int roundNumberOfBytesToNearestWord(int numBytes) {
int remainder = numBytes & 0x07; // This is equivalent to `numBytes % 8`
if (remainder == 0) {
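
The public nextPowerOf2 helper added above replaces the private copy removed earlier in this commit, so the page-size heuristic can reuse it. A quick illustration of its behavior for positive inputs:

```scala
import org.apache.spark.unsafe.array.ByteArrayMethods

ByteArrayMethods.nextPowerOf2(32L)                 // 32 (powers of 2 pass through unchanged)
ByteArrayMethods.nextPowerOf2(33L)                 // 64
ByteArrayMethods.nextPowerOf2(12L * 1024 * 1024)   // 12 MB rounds up to 16 MB
```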
