Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 28
1 file changed, 18 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index c3d2c7019a..3e46596ecf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -17,17 +17,18 @@
package org.apache.spark.sql.execution
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.serializer.Serializer
-import org.apache.spark.sql.{SQLContext, Row}
+import org.apache.spark.shuffle.sort.SortShuffleManager
+import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.util.MutablePair
object Exchange {
@@ -85,7 +86,9 @@ case class Exchange(
// corner-cases where a partitioner constructed with `numPartitions` partitions may output
// fewer partitions (like RangePartitioner, for example).
val conf = child.sqlContext.sparkContext.conf
- val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+ val shuffleManager = SparkEnv.get.shuffleManager
+ val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager] ||
+ shuffleManager.isInstanceOf[UnsafeShuffleManager]
val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
if (newOrdering.nonEmpty) {
@@ -93,11 +96,11 @@ case class Exchange(
// which requires a defensive copy.
true
} else if (sortBasedShuffleOn) {
- // Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory.
- // However, there are two special cases where we can avoid the copy, described below:
- if (partitioner.numPartitions <= bypassMergeThreshold) {
- // If the number of output partitions is sufficiently small, then Spark will fall back to
- // the old hash-based shuffle write path which doesn't buffer deserialized records.
+ val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+ if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
+ // If we're using the original SortShuffleManager and the number of output partitions is
+ // sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
+ // doesn't buffer deserialized records.
// Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
false
} else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
@@ -105,9 +108,14 @@ case class Exchange(
// them. This optimization is guarded by a feature-flag and is only applied in cases where
// shuffle dependency does not specify an ordering and the record serializer has certain
// properties. If this optimization is enabled, we can safely avoid the copy.
+ //
+ // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081).
false
} else {
- // None of the special cases held, so we must copy.
+ // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory. This code
+ // path is used both when SortShuffleManager is used and when UnsafeShuffleManager falls
+ // back to SortShuffleManager to perform a shuffle that the new fast path can't handle. In
+ // both cases, we must copy.
true
}
} else {
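
For readers skimming the diff, the following is a minimal, self-contained Scala sketch of the copy/no-copy decision that the patched code implements. Every name below (ShuffleManagerKind, needToCopy, and its parameters) is a simplified stand-in invented for illustration, not Spark's actual internals; only the branching mirrors the hunks above.

// Minimal sketch of the decision implemented by the patched code path in
// Exchange.scala. Everything here is a simplified stand-in written for
// illustration; it is not Spark's real API.
object NeedToCopySketch {

  sealed trait ShuffleManagerKind
  case object HashShuffle extends ShuffleManagerKind
  case object SortShuffle extends ShuffleManagerKind
  case object UnsafeShuffle extends ShuffleManagerKind // tungsten-sort, SPARK-7081

  /** Returns true when map-side rows must be defensively copied before shuffling. */
  def needToCopy(
      manager: ShuffleManagerKind,
      numPartitions: Int,
      bypassMergeThreshold: Int,       // spark.shuffle.sort.bypassMergeThreshold
      serializeMapOutputs: Boolean,    // spark.shuffle.sort.serializeMapOutputs
      serializerSupportsRelocation: Boolean,
      hasNewOrdering: Boolean): Boolean = {
    // Both the sort-based and the new unsafe shuffle count as "sort-based" here.
    val sortBasedShuffleOn = manager == SortShuffle || manager == UnsafeShuffle
    if (hasNewOrdering) {
      // Sorting buffers rows in ExternalSorter, so a defensive copy is required.
      true
    } else if (sortBasedShuffleOn) {
      // Only the original SortShuffleManager has the bypass-merge fast path.
      val bypassIsSupported = manager == SortShuffle
      if (bypassIsSupported && numPartitions <= bypassMergeThreshold) {
        // Hash-style write path: deserialized records are not buffered, so no copy.
        false
      } else if (serializeMapOutputs && serializerSupportsRelocation) {
        // Records are serialized up front, so buffering the bytes is safe: no copy.
        false
      } else {
        // ExternalSorter buffers deserialized records in memory (also the path taken
        // when UnsafeShuffleManager falls back to SortShuffleManager), so copy.
        true
      }
    } else {
      // Hash-based shuffle branch, outside this diff's hunks; assumed to need no copy.
      false
    }
  }

  def main(args: Array[String]): Unit = {
    // Example of the behavioral change: 100 output partitions (below the default
    // threshold of 200) with a serializer that does not support relocation.
    println(needToCopy(SortShuffle, 100, 200, true, false, false))   // false: bypass applies
    println(needToCopy(UnsafeShuffle, 100, 200, true, false, false)) // true: no bypass, must copy
  }
}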