aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xR/run-tests.sh2
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java3
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java8
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java1
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala53
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java5
-rw-r--r--core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java6
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java4
-rw-r--r--core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala14
-rw-r--r--python/pyspark/java_gateway.py1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala16
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala)0
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala1
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java6
20 files changed, 93 insertions, 46 deletions
diff --git a/R/run-tests.sh b/R/run-tests.sh
index 18a1e13bdc..e82ad0ba2c 100755
--- a/R/run-tests.sh
+++ b/R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE
-SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))
if [[ $FAILED != 0 ]]; then
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
index bf4eaa59ff..f6e0913a7a 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -115,8 +115,7 @@ final class UnsafeShuffleExternalSorter {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.pageSizeBytes = (int) Math.min(
- PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
- conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
+ PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, shuffleMemoryManager.pageSizeBytes());
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 5ac3736ac6..0636ae7c8d 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -642,7 +642,7 @@ public final class BytesToBytesMap {
private void allocate(int capacity) {
assert (capacity >= 0);
// The capacity needs to be divisible by 64 so that our bit set can be sized properly
- capacity = Math.max((int) Math.min(MAX_CAPACITY, nextPowerOf2(capacity)), 64);
+ capacity = Math.max((int) Math.min(MAX_CAPACITY, ByteArrayMethods.nextPowerOf2(capacity)), 64);
assert (capacity <= MAX_CAPACITY);
longArray = new LongArray(MemoryBlock.fromLongArray(new long[capacity * 2]));
bitset = new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
@@ -770,10 +770,4 @@ public final class BytesToBytesMap {
timeSpentResizingNs += System.nanoTime() - resizeStartTime;
}
}
-
- /** Returns the next number greater or equal num that is power of 2. */
- private static long nextPowerOf2(long num) {
- final long highBit = Long.highestOneBit(num);
- return (highBit == num) ? num : highBit << 1;
- }
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 4c54ba4bce..5ebbf9b068 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -127,7 +127,6 @@ public final class UnsafeExternalSorter {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = 32 * 1024;
- // this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
this.pageSizeBytes = pageSizeBytes;
this.writeMetrics = new ShuffleWriteMetrics();
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 08bab4bf27..8ff154fb5e 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -250,6 +250,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
/**
+ * Get a size parameter as bytes, falling back to a default if not set.
+ */
+ def getSizeAsBytes(key: String, defaultValue: Long): Long = {
+ Utils.byteStringAsBytes(get(key, defaultValue + "B"))
+ }
+
+ /**
* Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
* suffix is provided then Kibibytes are assumed.
* @throws NoSuchElementException
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0c0705325b..5662686436 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -629,7 +629,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* [[org.apache.spark.SparkContext.setLocalProperty]].
*/
def getLocalProperty(key: String): String =
- Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
+ Option(localProperties.get).map(_.getProperty(key)).orNull
/** Set a human readable description of the current job. */
def setJobDescription(value: String) {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index adfece4d6e..a796e72850 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -324,7 +324,7 @@ object SparkEnv extends Logging {
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
- val shuffleMemoryManager = new ShuffleMemoryManager(conf)
+ val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)
val blockTransferService =
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index e3d229cc99..8c3a72644c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -19,6 +19,9 @@ package org.apache.spark.shuffle
import scala.collection.mutable
+import com.google.common.annotations.VisibleForTesting
+
+import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext}
/**
@@ -34,11 +37,19 @@ import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext}
* set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever
* this set changes. This is all done by synchronizing access on "this" to mutate state and using
* wait() and notifyAll() to signal changes.
+ *
+ * Use `ShuffleMemoryManager.create()` factory method to create a new instance.
+ *
+ * @param maxMemory total amount of memory available for execution, in bytes.
+ * @param pageSizeBytes number of bytes for each page, by default.
*/
-private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
- private val taskMemory = new mutable.HashMap[Long, Long]() // taskAttemptId -> memory bytes
+private[spark]
+class ShuffleMemoryManager protected (
+ val maxMemory: Long,
+ val pageSizeBytes: Long)
+ extends Logging {
- def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))
+ private val taskMemory = new mutable.HashMap[Long, Long]() // taskAttemptId -> memory bytes
private def currentTaskAttemptId(): Long = {
// In case this is called on the driver, return an invalid task attempt id.
@@ -124,15 +135,49 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
}
}
+
private[spark] object ShuffleMemoryManager {
+
+ def create(conf: SparkConf, numCores: Int): ShuffleMemoryManager = {
+ val maxMemory = ShuffleMemoryManager.getMaxMemory(conf)
+ val pageSize = ShuffleMemoryManager.getPageSize(conf, maxMemory, numCores)
+ new ShuffleMemoryManager(maxMemory, pageSize)
+ }
+
+ def create(maxMemory: Long, pageSizeBytes: Long): ShuffleMemoryManager = {
+ new ShuffleMemoryManager(maxMemory, pageSizeBytes)
+ }
+
+ @VisibleForTesting
+ def createForTesting(maxMemory: Long): ShuffleMemoryManager = {
+ new ShuffleMemoryManager(maxMemory, 4 * 1024 * 1024)
+ }
+
/**
* Figure out the shuffle memory limit from a SparkConf. We currently have both a fraction
* of the memory pool and a safety factor since collections can sometimes grow bigger than
* the size we target before we estimate their sizes again.
*/
- def getMaxMemory(conf: SparkConf): Long = {
+ private def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
+
+ /**
+ * Sets the page size, in bytes.
+ *
+ * If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
+ * by looking at the number of cores available to the process, and the total amount of memory,
+ * and then divide it by a factor of safety.
+ */
+ private def getPageSize(conf: SparkConf, maxMemory: Long, numCores: Int): Long = {
+ val minPageSize = 1L * 1024 * 1024 // 1MB
+ val maxPageSize = 64L * minPageSize // 64MB
+ val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
+ val safetyFactor = 8
+ val size = ByteArrayMethods.nextPowerOf2(maxMemory / cores / safetyFactor)
+ val default = math.min(maxPageSize, math.max(minPageSize, size))
+ conf.getSizeAsBytes("spark.buffer.pageSize", default)
+ }
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 98c32bbc29..c68354ba49 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -115,6 +115,7 @@ public class UnsafeShuffleWriterSuite {
taskMetrics = new TaskMetrics();
when(shuffleMemoryManager.tryToAcquire(anyLong())).then(returnsFirstArg());
+ when(shuffleMemoryManager.pageSizeBytes()).thenReturn(128L * 1024 * 1024);
when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
when(blockManager.getDiskWriter(
@@ -549,14 +550,14 @@ public class UnsafeShuffleWriterSuite {
final long recordLengthBytes = 8;
final long pageSizeBytes = 256;
final long numRecordsPerPage = pageSizeBytes / recordLengthBytes;
- final SparkConf conf = new SparkConf().set("spark.buffer.pageSize", pageSizeBytes + "b");
+ when(shuffleMemoryManager.pageSizeBytes()).thenReturn(pageSizeBytes);
final UnsafeShuffleWriter<Object, Object> writer =
new UnsafeShuffleWriter<Object, Object>(
blockManager,
shuffleBlockResolver,
taskMemoryManager,
shuffleMemoryManager,
- new UnsafeShuffleHandle<Object, Object>(0, 1, shuffleDep),
+ new UnsafeShuffleHandle<>(0, 1, shuffleDep),
0, // map id
taskContext,
conf);
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 3c50033801..0b11562980 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -48,7 +48,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Before
public void setup() {
- shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
+ shuffleMemoryManager = ShuffleMemoryManager.create(Long.MAX_VALUE, PAGE_SIZE_BYTES);
taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(getMemoryAllocator()));
// Mocked memory manager for tests that check the maximum array size, since actually allocating
// such large arrays will cause us to run out of memory in our tests.
@@ -441,7 +441,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void failureToAllocateFirstPage() {
- shuffleMemoryManager = new ShuffleMemoryManager(1024);
+ shuffleMemoryManager = ShuffleMemoryManager.createForTesting(1024);
BytesToBytesMap map =
new BytesToBytesMap(taskMemoryManager, shuffleMemoryManager, 1, PAGE_SIZE_BYTES);
try {
@@ -461,7 +461,7 @@ public abstract class AbstractBytesToBytesMapSuite {
@Test
public void failureToGrow() {
- shuffleMemoryManager = new ShuffleMemoryManager(1024 * 10);
+ shuffleMemoryManager = ShuffleMemoryManager.createForTesting(1024 * 10);
BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager, shuffleMemoryManager, 1, 1024);
try {
boolean success = true;
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index f5300373d8..83049b8a21 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -102,7 +102,7 @@ public class UnsafeExternalSorterSuite {
MockitoAnnotations.initMocks(this);
sparkConf = new SparkConf();
tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
- shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
+ shuffleMemoryManager = ShuffleMemoryManager.create(Long.MAX_VALUE, pageSizeBytes);
spillFilesCreated.clear();
taskContext = mock(TaskContext.class);
when(taskContext.taskMetrics()).thenReturn(new TaskMetrics());
@@ -237,7 +237,7 @@ public class UnsafeExternalSorterSuite {
@Test
public void spillingOccursInResponseToMemoryPressure() throws Exception {
- shuffleMemoryManager = new ShuffleMemoryManager(pageSizeBytes * 2);
+ shuffleMemoryManager = ShuffleMemoryManager.create(pageSizeBytes * 2, pageSizeBytes);
final UnsafeExternalSorter sorter = newSorter();
final int numRecords = (int) pageSizeBytes / 4;
for (int i = 0; i <= numRecords; i++) {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
index f495b6a037..6d45b1a101 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
@@ -24,7 +24,7 @@ import org.mockito.Mockito._
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{SparkFunSuite, TaskContext}
+import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
@@ -50,7 +50,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
}
test("single task requesting memory") {
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
assert(manager.tryToAcquire(100L) === 100L)
assert(manager.tryToAcquire(400L) === 400L)
@@ -72,7 +72,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// Two threads request 500 bytes first, wait for each other to get it, and then request
// 500 more; we should immediately return 0 as both are now at 1 / N
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
class State {
var t1Result1 = -1L
@@ -124,7 +124,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// Two tasks request 250 bytes first, wait for each other to get it, and then request
// 500 more; we should only grant 250 bytes to each of them on this second request
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
class State {
var t1Result1 = -1L
@@ -176,7 +176,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// for a bit and releases 250 bytes, which should then be granted to t2. Further requests
// by t2 will return false right away because it now has 1 / 2N of the memory.
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
class State {
var t1Requested = false
@@ -241,7 +241,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
// t1 grabs 1000 bytes and then waits until t2 is ready to make a request. It sleeps
// for a bit and releases all its memory. t2 should now be able to grab all the memory.
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
class State {
var t1Requested = false
@@ -307,7 +307,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
}
test("tasks should not be granted a negative size") {
- val manager = new ShuffleMemoryManager(1000L)
+ val manager = ShuffleMemoryManager.createForTesting(maxMemory = 1000L)
manager.tryToAcquire(700L)
val latch = new CountDownLatch(1)
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 60be85e53e..cd4c55f79f 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -54,7 +54,6 @@ def launch_gateway():
if os.environ.get("SPARK_TESTING"):
submit_args = ' '.join([
"--conf spark.ui.enabled=false",
- "--conf spark.buffer.pageSize=4mb",
submit_args
])
command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index b9d44aace1..4d5e98a3e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -342,7 +342,7 @@ class TungstenAggregationIterator(
TaskContext.get.taskMemoryManager(),
SparkEnv.get.shuffleMemoryManager,
1024 * 16, // initial capacity
- SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m"),
+ SparkEnv.get.shuffleMemoryManager.pageSizeBytes,
false // disable tracking of performance metrics
)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 3f257ecdd1..953abf409f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -282,17 +282,15 @@ private[joins] final class UnsafeHashedRelation(
// This is used in Broadcast, shared by multiple tasks, so we use on-heap memory
val taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
+ val pageSizeBytes = Option(SparkEnv.get).map(_.shuffleMemoryManager.pageSizeBytes)
+ .getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
+
// Dummy shuffle memory manager which always grants all memory allocation requests.
// We use this because it doesn't make sense count shared broadcast variables' memory usage
// towards individual tasks' quotas. In the future, we should devise a better way of handling
// this.
- val shuffleMemoryManager = new ShuffleMemoryManager(new SparkConf()) {
- override def tryToAcquire(numBytes: Long): Long = numBytes
- override def release(numBytes: Long): Unit = {}
- }
-
- val pageSizeBytes = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
- .getSizeAsBytes("spark.buffer.pageSize", "64m")
+ val shuffleMemoryManager =
+ ShuffleMemoryManager.create(maxMemory = Long.MaxValue, pageSizeBytes = pageSizeBytes)
binaryMap = new BytesToBytesMap(
taskMemoryManager,
@@ -306,11 +304,11 @@ private[joins] final class UnsafeHashedRelation(
while (i < nKeys) {
val keySize = in.readInt()
val valuesSize = in.readInt()
- if (keySize > keyBuffer.size) {
+ if (keySize > keyBuffer.length) {
keyBuffer = new Array[Byte](keySize)
}
in.readFully(keyBuffer, 0, keySize)
- if (valuesSize > valuesBuffer.size) {
+ if (valuesSize > valuesBuffer.length) {
valuesBuffer = new Array[Byte](valuesSize)
}
in.readFully(valuesBuffer, 0, valuesSize)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 7f69cdb08a..e316930470 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution
-import org.apache.spark.{InternalAccumulator, TaskContext}
+import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
@@ -122,7 +122,7 @@ case class TungstenSort(
protected override def doExecute(): RDD[InternalRow] = {
val schema = child.schema
val childOutput = child.output
- val pageSize = sparkContext.conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
+ val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
/**
* Set up the sorter in each partition before computing the parent partition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
index 3854f5bd39..3854f5bd39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
index 53de2d0f07..48c3938ff8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
@@ -22,7 +22,7 @@ import org.apache.spark.shuffle.ShuffleMemoryManager
/**
* A [[ShuffleMemoryManager]] that can be controlled to run out of memory.
*/
-class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue) {
+class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue, 4 * 1024 * 1024) {
private var oom = false
override def tryToAcquire(numBytes: Long): Long = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 167086db5b..296cc5c5e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -52,7 +52,6 @@ object TestHive
.set("spark.sql.test", "")
.set("spark.sql.hive.metastore.barrierPrefixes",
"org.apache.spark.sql.hive.execution.PairSerDe")
- .set("spark.buffer.pageSize", "4m")
// SPARK-8910
.set("spark.ui.enabled", "false")))
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java b/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
index cf693d01a4..70b81ce015 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
@@ -25,6 +25,12 @@ public class ByteArrayMethods {
// Private constructor, since this class only contains static methods.
}
+ /** Returns the next number greater or equal num that is power of 2. */
+ public static long nextPowerOf2(long num) {
+ final long highBit = Long.highestOneBit(num);
+ return (highBit == num) ? num : highBit << 1;
+ }
+
public static int roundNumberOfBytesToNearestWord(int numBytes) {
int remainder = numBytes & 0x07; // This is equivalent to `numBytes % 8`
if (remainder == 0) {