author    Reynold Xin <rxin@databricks.com>  2015-08-06 23:18:29 -0700
committer Reynold Xin <rxin@databricks.com>  2015-08-06 23:18:29 -0700
commit    4309262ec9146d7158ee9957a128bb152289d557 (patch)
tree      47f6dabaaea5c5fcff48dcbdb114aede873ae320
parent    7aaed1b114751a24835204b8c588533d5c5ffaf0 (diff)
download  spark-4309262ec9146d7158ee9957a128bb152289d557.tar.gz
          spark-4309262ec9146d7158ee9957a128bb152289d557.tar.bz2
          spark-4309262ec9146d7158ee9957a128bb152289d557.zip
[SPARK-9700] Pick default page size more intelligently.
Previously, we used 64MB as the default page size, which was far too big for many Spark applications (especially on a single node). This patch changes it so that the default page size, if unset by the user, is determined by the number of cores available and the total execution memory available.

Author: Reynold Xin <rxin@databricks.com>

Closes #8012 from rxin/pagesize and squashes the following commits:

16f4756 [Reynold Xin] Fixed failing test.
5afd570 [Reynold Xin] private...
0d5fb98 [Reynold Xin] Update default value.
674a6cd [Reynold Xin] Address review feedback.
dc00e05 [Reynold Xin] Merge with master.
73ebdb6 [Reynold Xin] [SPARK-9700] Pick default page size more intelligently.
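For orientation, a minimal standalone Scala sketch of the heuristic this patch introduces (it mirrors ShuffleMemoryManager.getPageSize in the diff below; the PageSizeSketch and defaultPageSize names are illustrative only, and the real method also honors an explicit spark.buffer.pageSize setting):

object PageSizeSketch {
  // Smallest power of 2 greater than or equal to num (same logic as
  // ByteArrayMethods.nextPowerOf2 added in this patch).
  def nextPowerOf2(num: Long): Long = {
    val highBit = java.lang.Long.highestOneBit(num)
    if (highBit == num) num else highBit << 1
  }

  def defaultPageSize(maxExecutionMemory: Long, numCores: Int): Long = {
    val minPageSize = 1L * 1024 * 1024        // 1MB floor
    val maxPageSize = 64L * minPageSize       // 64MB ceiling (the old fixed default)
    val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
    val safetyFactor = 8                      // leave room for several pages per core
    val size = nextPowerOf2(maxExecutionMemory / cores / safetyFactor)
    math.min(maxPageSize, math.max(minPageSize, size))
  }

  def main(args: Array[String]): Unit = {
    // e.g. 512MB of execution memory on 8 cores => 512MB / 8 / 8 = 8MB pages
    println(defaultPageSize(512L * 1024 * 1024, 8))   // prints 8388608
  }
}

So on a small single-node setup the default drops to a few megabytes per page instead of the old fixed 64MB.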
-rwxr-xr-x  R/run-tests.sh  2
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java  3
-rw-r--r--  core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java  8
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java  1
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala  7
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala  53
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java  5
-rw-r--r--  core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java  6
-rw-r--r--  core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java  4
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala  14
-rw-r--r--  python/pyspark/java_gateway.py  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala  16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala)  0
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala  1
-rw-r--r--  unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java  6
20 files changed, 93 insertions, 46 deletions
diff --git a/R/run-tests.sh b/R/run-tests.sh
index 18a1e13bdc..e82ad0ba2c 100755
--- a/R/run-tests.sh
+++ b/R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE
-SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))
if [[ $FAILED != 0 ]]; then
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
index bf4eaa59ff..f6e0913a7a 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -115,8 +115,7 @@ final class UnsafeShuffleExternalSorter {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.pageSizeBytes = (int) Math.min(
- PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
- conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
+ PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, shuffleMemoryManager.pageSizeBytes());
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ac3736ac6..0636ae7c8d 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -642,7 +642,7 @@ public final class BytesToBytesMap {
private void allocate(int capacity) {
assert (capacity >= 0);
// The capacity needs to be divisible by 64 so that our bit set can be sized properly
- capacity = Math.max((int) Math.min(MAX_CAPACITY, nextPowerOf2(capacity)), 64);
+ capacity = Math.max((int) Math.min(MAX_CAPACITY, ByteArrayMethods.nextPowerOf2(capacity)), 64);
assert (capacity <= MAX_CAPACITY);
longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2]));
bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
@@ -770,10 +770,4 @@ public final class BytesToBytesMap {
timeSpentResizingNs += System.nanoTime() - resizeStartTime;
}
}
-
- /** Returns the next number greater or equal num that is power of 2. */
- private static long nextPowerOf2(long num) {
- final long highBit = Long.highestOneBit(num);
- return (highBit == num) ? num : highBit << 1;
- }
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 4c54ba4bce..5ebbf9b068 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -127,7 +127,6 @@ public final class UnsafeExternalSorter {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
- // this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
this.pageSizeBytes = pageSizeBytes;
this.writeMetrics = new ShuffleWriteMetrics();
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 08bab4bf27..8ff154fb5e 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -250,6 +250,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
/**
+ * Get a size parameter as bytes, falling back to a default if not set.
+ */
+ def getSizeAsBytes(key: String, defaultValue: Long): Long = {
+ Utils.byteStringAsBytes(get(key, defaultValue + "B"))
+ }
+
+ /**
* Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Kibibytes are assumed.
* @throws NoSuchElementException
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0c0705325b..5662686436 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -629,7 +629,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* [[org.apache.spark.SparkContext.setLocalProperty]].
*/
def getLocalProperty(key: String): String =
- Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
+ Option(localProperties.get).map(_.getProperty(key)).orNull
/** Set a human readable description of the current job. */
def setJobDescription(value: String) {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index adfece4d6e..a796e72850 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -324,7 +324,7 @@ object SparkEnv extends Logging {
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
- val shuffleMemoryManager = new ShuffleMemoryManager(conf)
+ val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)
val blockTransferService =
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index e3d229cc99..8c3a72644c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -19,6 +19,9 @@ package org.apache.spark.shuffle
import scala.collection.mutable
+import com.google.common.annotations.VisibleForTesting
+
+import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext}
/**
@@ -34,11 +37,19 @@ import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext}
* set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever
* this set changes. This is all done by synchronizing access on "this" to mutate state and using
* wait() and notifyAll() to signal changes.
+ *
+ * Use `ShuffleMemoryManager.create()` factory method to create a new instance.
+ *
+ * @param maxMemory total amount of memory available for execution, in bytes.
+ * @param pageSizeBytes number of bytes for each page, by default.
*/
-private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
- private val taskMemory = new mutable.HashMap[Long, Long]() // taskAttemptId -> memory bytes
+private[spark]
+class ShuffleMemoryManager protected (
+ val maxMemory: Long,
+ val pageSizeBytes: Long)
+ extends Logging {
- def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))
+ private val taskMemory = new mutable.HashMap[Long, Long]() // taskAttemptId -> memory bytes
private def currentTaskAttemptId(): Long = {
// In case this is called on the driver, return an invalid task attempt id.
@@ -124,15 +135,49 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
}
}
+
private[spark] object ShuffleMemoryManager {
+
+ def create(conf: SparkConf, numCores: Int): ShuffleMemoryManager = {
+ val maxMemory = ShuffleMemoryManager.getMaxMemory(conf)
+ val pageSize = ShuffleMemoryManager.getPageSize(conf, maxMemory, numCores)
+ new ShuffleMemoryManager(maxMemory, pageSize)
+ }
+
+ def create(maxMemory: Long, pageSizeBytes: Long): ShuffleMemoryManager = {
+ new ShuffleMemoryManager(maxMemory, pageSizeBytes)
+ }
+
+ @VisibleForTesting
+ def createForTesting(maxMemory: Long): ShuffleMemoryManager = {
+ new ShuffleMemoryManager(maxMemory, 4 * 1024 * 1024)
+ }
+
/**
* Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction
* of the memory pool and a safety factor since collections can sometimes grow bigger than
* the size we target before we estimate their sizes again.
*/
- def getMaxMemory(conf: SparkConf): Long = {
+ private def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
+
+ /**
+ * Sets the page size, in bytes.
+ *
+ * If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
+ * by looking at the number of cores available to the process, and the total amount of memory,
+ * and then divide it by a factor of safety.
+ */
+ private def getPageSize(conf: SparkConf, maxMemory: Long, numCores: Int): Long = {
+ val minPageSize = 1L * 1024 * 1024 // 1MB
+ val maxPageSize = 64L * minPageSize // 64MB
+ val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
+ val safetyFactor = 8
+ val size = ByteArrayMethods.nextPowerOf2(maxMemory / cores / safetyFactor)
+ val default = math.min(maxPageSize, math.max(minPageSize, size))
+ conf.getSizeAsBytes("spark.buffer.pageSize", default)
+ }
}
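Aside: a toy, non-Spark sketch of the 1/2N .. 1/N sharing policy that the ShuffleMemoryManager Scaladoc above describes (simplified to ignore the wait()/notifyAll() signalling; ToyFairShare is purely illustrative and not part of this patch):

import scala.collection.mutable

// Toy illustration only: with N active tasks, each task may hold at most 1/N of
// maxMemory. A task that cannot even reach 1/2N should block until memory frees
// up (the real manager does this with wait()/notifyAll()); here we simply grant
// whatever fits and let 0 signal the caller to spill.
class ToyFairShare(maxMemory: Long) {
  private val granted = mutable.HashMap[Long, Long]()

  def tryToAcquire(taskId: Long, numBytes: Long): Long = synchronized {
    granted.getOrElseUpdate(taskId, 0L)
    val n = granted.size
    val cap = maxMemory / n                       // 1/N upper bound per task
    val free = maxMemory - granted.values.sum
    val toGrant = math.max(0L, math.min(numBytes, math.min(free, cap - granted(taskId))))
    granted(taskId) += toGrant
    toGrant                                       // 0 means the caller should spill
  }
}

// With maxMemory = 1000 and 4 active tasks: at most 250 bytes per task, and a
// task holding under 125 bytes (1/2N) would wait rather than spill in the real class.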
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 98c32bbc29..c68354ba49 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -115,6 +115,7 @@ public class UnsafeShuffleWriterSuite {
taskMetrics = new TaskMetrics();
when(shuffleMemoryManager.tryToAcquire(anyLong())).then(returnsFirstArg());
+ when(shuffleMemoryManager.pageSizeBytes()).thenReturn(128L * 1024 * 1024);
when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
when(blockManager.getDiskWriter(
@@ -549,14 +550,14 @@ public class UnsafeShuffleWriterSuite {
final long recordLengthBytes = 8;
final long pageSizeBytes = 256;
final long numRecordsPerPage = pageSizeBytes / recordLengthBytes;
- final SparkConf conf = new SparkConf().set("spark.buffer.pageSize", pageSizeBytes + "b");
+ when(shuffleMemoryManager.pageSizeBytes()).thenReturn(pageSizeBytes);
final UnsafeShuffleWriter<Object, Object> writer =
new UnsafeShuffleWriter<Object, Object>(
blockManager,
shuffleBlockResolver,
taskMemoryManager,
shuffleMemoryManager,
- new UnsafeShuffleHandle<Object, Object>(0, 1, shuffleDep),
+ new UnsafeShuffleHandle<>(0, 1, shuffleDep),
0, // map id
taskContext,
conf);
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 3c50033801..0b11562980 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -48,7 +48,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Before
public void setup() {
- shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
+ shuffleMemoryManager = ShuffleMemoryManager.create(Long.MAX_VALUE, PAGE_SIZE_BYTES);
taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(getMemoryAllocator()));
// Mocked memory manager for tests that check the maximum array size, since actually allocating
// such large arrays will cause us to run out of memory in our tests.
@@ -441,7 +441,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void failureToAllocateFirstPage() {
- shuffleMemoryManager = new ShuffleMemoryManager(1024);
+ shuffleMemoryManager = ShuffleMemoryManager.createForTesting(1024);
BytesToBytesMap map =
new BytesToBytesMap(taskMemoryManager, shuffleMemoryManager, 1, PAGE_SIZE_BYTES);
try {
@@ -461,7 +461,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void failureToGrow() {
- shuffleMemoryManager = new ShuffleMemoryManager(1024 * 10);
+ shuffleMemoryManager = ShuffleMemoryManager.createForTesting(1024 * 10);
BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, shuffleMemoryManager, 1, 1024);
try {
boolean success = true;
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index f5300373d8..83049b8a21 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -102,7 +102,7 @@ public class UnsafeExternalSorterSuite {
MockitoAnnotations.initMocks(this);
sparkConf = new SparkConf();
tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
- shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
+ shuffleMemoryManager = ShuffleMemoryManager.create(Long.MAX_VALUE, pageSizeBytes);
spillFilesCreated.clear();
taskContext = mock(TaskContext.class);
when(taskContext.taskMetrics()).thenReturn(new TaskMetrics());
@@ -237,7 +237,7 @@ public class UnsafeExternalSorterSuite {
@Test
public void spillingOccursInResponseToMemoryPressure() throws Exception {
- shuffleMemoryManager = new ShuffleMemoryManager(pageSizeBytes * 2);
+ shuffleMemoryManager = ShuffleMemoryManager.create(pageSizeBytes * 2, pageSizeBytes);
final UnsafeExternalSorter sorter = newSorter();
final int numRecords = (int) pageSizeBytes / 4;
for (int i = 0; i <= numRecords; i++) {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
index f495b6a037..6d45b1a101 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
@@ -24,7 +24,7 @@ import org.mockito.Mockito._
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{SparkFunSuite, TaskContext}
+import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
@@ -50,7 +50,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
}
test("single task requesting memory") {
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
assert(manager.tryToAcquire(100L) === 100L)
assert(manager.tryToAcquire(400L) === 400L)
@@ -72,7 +72,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// Two threads request 500 bytes first, wait for each other to get it, and then request
// 500 more; we should immediately return 0 as both are now at 1 / N
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
class State {
var t1Result1 = -1L
@@ -124,7 +124,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// Two tasks request 250 bytes first, wait for each other to get it, and then request
// 500 more; we should only grant 250 bytes to each of them on this second request
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
class State {
var t1Result1 = -1L
@@ -176,7 +176,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// for a bit and releases 250 bytes, which should then be granted to t2. Further requests
// by t2 will return false right away because it now has 1 / 2N of the memory.
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
class State {
var t1Requested = false
@@ -241,7 +241,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// t1 grabs 1000 bytes and then waits until t2 is ready to make a request. It sleeps
// for a bit and releases all its memory. t2 should now be able to grab all the memory.
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
class State {
var t1Requested = false
@@ -307,7 +307,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
}
test("tasks should not be granted a negative size") {
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
manager.tryToAcquire(700L)
val latch = new CountDownLatch(1)
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 60be85e53e..cd4c55f79f 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -54,7 +54,6 @@ def launch_gateway():
if os.environ.get("SPARK_TESTING"):
submit_args = ' '.join([
"--conf spark.ui.enabled=false",
- "--conf spark.buffer.pageSize=4mb",
submit_args
])
command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index b9d44aace1..4d5e98a3e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -342,7 +342,7 @@ class TungstenAggregationIterator(
TaskContext.get.taskMemoryManager(),
SparkEnv.get.shuffleMemoryManager,
1024 * 16, // initial capacity
- SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m"),
+ SparkEnv.get.shuffleMemoryManager.pageSizeBytes,
false // disable tracking of performance metrics
)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 3f257ecdd1..953abf409f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -282,17 +282,15 @@ private[joins] final class UnsafeHashedRelation(
// This is used in Broadcast, shared by multiple tasks, so we use on-heap memory
val taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
+ val pageSizeBytes = Option(SparkEnv.get).map(_.shuffleMemoryManager.pageSizeBytes)
+ .getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
+
// Dummy shuffle memory manager which always grants all memory allocation requests.
// We use this because it doesn't make sense count shared broadcast variables' memory usage
// towards individual tasks' quotas. In the future, we should devise a better way of handling
// this.
- val shuffleMemoryManager = new ShuffleMemoryManager(new SparkConf()) {
- override def tryToAcquire(numBytes: Long): Long = numBytes
- override def release(numBytes: Long): Unit = {}
- }
-
- val pageSizeBytes = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
- .getSizeAsBytes("spark.buffer.pageSize", "64m")
+ val shuffleMemoryManager =
+ ShuffleMemoryManager.create(maxMemory = Long.MaxValue, pageSizeBytes = pageSizeBytes)
binaryMap = new BytesToBytesMap(
taskMemoryManager,
@@ -306,11 +304,11 @@ private[joins] final class UnsafeHashedRelation(
while (i < nKeys) {
val keySize = in.readInt()
val valuesSize = in.readInt()
- if (keySize > keyBuffer.size) {
+ if (keySize > keyBuffer.length) {
keyBuffer = new Array[Byte](keySize)
}
in.readFully(keyBuffer, 0, keySize)
- if (valuesSize > valuesBuffer.size) {
+ if (valuesSize > valuesBuffer.length) {
valuesBuffer = new Array[Byte](valuesSize)
}
in.readFully(valuesBuffer, 0, valuesSize)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 7f69cdb08a..e316930470 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution
-import org.apache.spark.{InternalAccumulator, TaskContext}
+import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
@@ -122,7 +122,7 @@ case class TungstenSort(
protected override def doExecute(): RDD[InternalRow] = {
val schema = child.schema
val childOutput = child.output
- val pageSize = sparkContext.conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
+ val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
/**
* Set up the sorter in each partition before computing the parent partition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
index 3854f5bd39..3854f5bd39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
index 53de2d0f07..48c3938ff8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
@@ -22,7 +22,7 @@ import org.apache.spark.shuffle.ShuffleMemoryManager
/**
* A [[ShuffleMemoryManager]] that can be controlled to run out of memory.
*/
-class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue) {
+class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue, 4 * 1024 * 1024) {
private var oom = false
override def tryToAcquire(numBytes: Long): Long = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 167086db5b..296cc5c5e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -52,7 +52,6 @@ object TestHive
.set("spark.sql.test", "")
.set("spark.sql.hive.metastore.barrierPrefixes",
"org.apache.spark.sql.hive.execution.PairSerDe")
- .set("spark.buffer.pageSize", "4m")
// SPARK-8910
.set("spark.ui.enabled", "false")))
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java b/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
index cf693d01a4..70b81ce015 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
@@ -25,6 +25,12 @@ public class ByteArrayMethods {
// Private constructor, since this class only contains static methods.
}
+ /** Returns the next number greater or equal num that is power of 2. */
+ public static long nextPowerOf2(long num) {
+ final long highBit = Long.highestOneBit(num);
+ return (highBit == num) ? num : highBit << 1;
+ }
+
public static int roundNumberOfBytesToNearestWord(int numBytes) {
int remainder = numBytes & 0x07; // This is equivalent to `numBytes % 8`
if (remainder == 0) {
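Finally, a hedged sketch of how the new factory methods are wired up after this patch (it mirrors the SparkEnv change above; ShuffleMemoryManager is private[spark], so this only compiles inside Spark's own source tree, and PageSizeWiring is an illustrative name):

package org.apache.spark.shuffle

import org.apache.spark.SparkConf

object PageSizeWiring {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    // Production path (as in SparkEnv): page size is derived from the number of
    // usable cores and the execution memory, unless spark.buffer.pageSize is set.
    val manager = ShuffleMemoryManager.create(conf, numCores = 8)
    println(s"default page size = ${manager.pageSizeBytes} bytes")

    // Test path used by the updated suites: fixed 4MB pages.
    val testManager = ShuffleMemoryManager.createForTesting(maxMemory = 1L << 30)
    println(s"test page size = ${testManager.pageSizeBytes} bytes")
  }
}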