aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala6
2 files changed, 6 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index 8c3a72644c..a0d8abc2ee 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -175,7 +175,9 @@ private[spark] object ShuffleMemoryManager {
val minPageSize = 1L * 1024 * 1024 // 1MB
val maxPageSize = 64L * minPageSize // 64MB
val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
- val safetyFactor = 8
+ // Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
+ val safetyFactor = 16
+ // TODO(davies): don't round to next power of 2
val size = ByteArrayMethods.nextPowerOf2(maxMemory / cores / safetyFactor)
val default = math.min(maxPageSize, math.max(minPageSize, size))
conf.getSizeAsBytes("spark.buffer.pageSize", default)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index e316930470..40ef7c3b53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -17,15 +17,15 @@
package org.apache.spark.sql.execution
-import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, OrderedDistribution, Distribution}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter
+import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines various sort operators.
@@ -122,7 +122,6 @@ case class TungstenSort(
protected override def doExecute(): RDD[InternalRow] = {
val schema = child.schema
val childOutput = child.output
- val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
/**
* Set up the sorter in each partition before computing the parent partition.
@@ -143,6 +142,7 @@ case class TungstenSort(
}
}
+ val pageSize = SparkEnv.get.shuffleMemoryManager.pageSizeBytes
val sorter = new UnsafeExternalRowSorter(
schema, ordering, prefixComparator, prefixComputer, pageSize)
if (testSpillFrequency > 0) {