path: root/sql
author     Reynold Xin <rxin@databricks.com>    2015-08-06 23:18:29 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-08-06 23:18:29 -0700
commit     4309262ec9146d7158ee9957a128bb152289d557 (patch)
tree       47f6dabaaea5c5fcff48dcbdb114aede873ae320 /sql
parent     7aaed1b114751a24835204b8c588533d5c5ffaf0 (diff)
[SPARK-9700] Pick default page size more intelligently.
Previously, we used 64MB as the default page size, which was way too big for a lot of Spark applications (especially for single-node use). This patch changes it so that the default page size, if unset by the user, is determined by the number of cores available and the total execution memory available.

Author: Reynold Xin <rxin@databricks.com>

Closes #8012 from rxin/pagesize and squashes the following commits:

16f4756 [Reynold Xin] Fixed failing test.
5afd570 [Reynold Xin] private...
0d5fb98 [Reynold Xin] Update default value.
674a6cd [Reynold Xin] Address review feedback.
dc00e05 [Reynold Xin] Merge with master.
73ebdb6 [Reynold Xin] [SPARK-9700] Pick default page size more intelligently.
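The heuristic itself lives in ShuffleMemoryManager in core, outside this sql/ diff, so the sketch below only illustrates the approach the commit message describes: scale the page size with per-core execution memory and clamp it to a sane range. The object name PageSizeHeuristicSketch, the 1MB/64MB bounds, and the safety factor are assumptions for illustration, not the actual Spark code.

import java.lang.Long.highestOneBit

// Illustrative sketch only: NOT the actual Spark implementation.
object PageSizeHeuristicSketch {
  // Round n up to the next power of two (returns n if it already is one).
  private def nextPowerOf2(n: Long): Long = {
    val hi = highestOneBit(n)
    if (hi == n) n else hi << 1
  }

  // Derive a page size from execution memory and core count, clamped to [1MB, 64MB].
  def defaultPageSize(maxExecutionMemory: Long, numCores: Int): Long = {
    val minPageSize = 1L * 1024 * 1024    // assumed lower bound
    val maxPageSize = 64L * 1024 * 1024   // the old fixed default becomes the upper bound
    val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
    val safetyFactor = 16                 // assumed headroom so several pages fit per task
    val candidate = nextPowerOf2(maxExecutionMemory / cores / safetyFactor)
    math.min(maxPageSize, math.max(minPageSize, candidate))
  }
}

For example, under these assumed constants an executor with 512MB of execution memory and 8 cores gets 512MB / 8 / 16 = 4MB pages, while a large multi-gigabyte executor still caps out at the old 64MB value.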
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala) |  0
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala |  1
6 files changed, 11 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index b9d44aace1..4d5e98a3e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -342,7 +342,7 @@ class TungstenAggregationIterator(
TaskContext.get.taskMemoryManager(),
SparkEnv.get.shuffleMemoryManager,
1024 * 16, // initial capacity
- SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m"),
+ SparkEnv.get.shuffleMemoryManager.pageSizeBytes,
false // disable tracking of performance metrics
)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 3f257ecdd1..953abf409f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -282,17 +282,15 @@ private[joins] final class UnsafeHashedRelation(
// This is used in Broadcast, shared by multiple tasks, so we use on-heap memory
val taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
+ val pageSizeBytes = Option(SparkEnv.get).map(_.shuffleMemoryManager.pageSizeBytes)
+ .getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
+
// Dummy shuffle memory manager which always grants all memory allocation requests.
// We use this because it doesn't make sense to count shared broadcast variables' memory usage
// towards individual tasks' quotas. In the future, we should devise a better way of handling
// this.
- val shuffleMemoryManager = new ShuffleMemoryManager(new SparkConf()) {
- override def tryToAcquire(numBytes: Long): Long = numBytes
- override def release(numBytes: Long): Unit = {}
- }
-
- val pageSizeBytes = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
- .getSizeAsBytes("spark.buffer.pageSize", "64m")
+ val shuffleMemoryManager =
+ ShuffleMemoryManager.create(maxMemory = Long.MaxValue, pageSizeBytes = pageSizeBytes)
binaryMap = new BytesToBytesMap(
taskMemoryManager,
@@ -306,11 +304,11 @@ private[joins] final class UnsafeHashedRelation(
while (i < nKeys) {
val keySize = in.readInt()
val valuesSize = in.readInt()
- if (keySize > keyBuffer.size) {
+ if (keySize > keyBuffer.length) {
keyBuffer = new Array[Byte](keySize)
}
in.readFully(keyBuffer, 0, keySize)
- if (valuesSize > valuesBuffer.size) {
+ if (valuesSize > valuesBuffer.length) {
valuesBuffer = new Array[Byte](valuesSize)
}
in.readFully(valuesBuffer, 0, valuesSize)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 7f69cdb08a..e316930470 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution
-import org.apache.spark.{InternalAccumulator, TaskContext}
+import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
@@ -122,7 +122,7 @@ case class TungstenSort(
protected override def doExecute(): RDD[InternalRow] = {
val schema = child.schema
val childOutput = child.output
- val pageSize = sparkContext.conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
+ val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
/**
* Set up the sorter in each partition before computing the parent partition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
index 3854f5bd39..3854f5bd39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypesConverter.scala
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
index 53de2d0f07..48c3938ff8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
@@ -22,7 +22,7 @@ import org.apache.spark.shuffle.ShuffleMemoryManager
/**
* A [[ShuffleMemoryManager]] that can be controlled to run out of memory.
*/
-class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue) {
+class TestShuffleMemoryManager extends ShuffleMemoryManager(Long.MaxValue, 4 * 1024 * 1024) {
private var oom = false
override def tryToAcquire(numBytes: Long): Long = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 167086db5b..296cc5c5e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -52,7 +52,6 @@ object TestHive
.set("spark.sql.test", "")
.set("spark.sql.hive.metastore.barrierPrefixes",
"org.apache.spark.sql.hive.execution.PairSerDe")
- .set("spark.buffer.pageSize", "4m")
// SPARK-8910
.set("spark.ui.enabled", "false")))