Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala')
 sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 23 ++++++++---------------
 1 file changed, 8 insertions(+), 15 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 1d3379a5e2..7f60c8f5ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
@@ -87,10 +86,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
// fewer partitions (like RangePartitioner, for example).
val conf = child.sqlContext.sparkContext.conf
val shuffleManager = SparkEnv.get.shuffleManager
- val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager] ||
- shuffleManager.isInstanceOf[UnsafeShuffleManager]
+ val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
- val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
if (sortBasedShuffleOn) {
val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
@@ -99,22 +96,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
// doesn't buffer deserialized records.
// Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
false
- } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
- // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
- // them. This optimization is guarded by a feature-flag and is only applied in cases where
- // shuffle dependency does not specify an aggregator or ordering and the record serializer
- // has certain properties. If this optimization is enabled, we can safely avoid the copy.
+ } else if (serializer.supportsRelocationOfSerializedObjects) {
+ // SPARK-4550 and SPARK-7081 extended sort-based shuffle to serialize individual records
+ // prior to sorting them. This optimization is only applied in cases where the shuffle
+ // dependency does not specify an aggregator or ordering and the record serializer has
+ // certain properties. If this optimization applies, we can safely avoid the copy.
//
// Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
// need to check whether the optimization is enabled and supported by our serializer.
- //
- // This optimization also applies to UnsafeShuffleManager (added in SPARK-7081).
false
} else {
- // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory. This code
- // path is used both when SortShuffleManager is used and when UnsafeShuffleManager falls
- // back to SortShuffleManager to perform a shuffle that the new fast path can't handle. In
- // both cases, we must copy.
+ // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory, so we must
+ // copy.
true
}
} else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
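
For context: the branch this hunk edits is Exchange's copy-before-shuffle decision. Below is a minimal, self-contained Scala sketch of that decision as it reads after the patch. The names (CopyDecisionSketch, ShuffleMode, needsToCopy) and the explicit parameters are illustrative stand-ins only, not Spark's API: the real method reads its inputs from SparkEnv and SparkConf, and the hash-based branch shown here corresponds to the HashShuffleManager case that follows this excerpt.

// A sketch, not Spark source. All identifiers here are hypothetical.
object CopyDecisionSketch {

  // Simplified stand-ins for the shuffle managers the real code tests with isInstanceOf.
  sealed trait ShuffleMode
  case object SortBased extends ShuffleMode // SortShuffleManager, now the only sort path
  case object HashBased extends ShuffleMode // HashShuffleManager

  // Mirrors the three sort-based branches in the patched code:
  //   1. numPartitions <= spark.shuffle.sort.bypassMergeThreshold (default 200):
  //      the bypass-merge path writes records out without buffering them, so no copy;
  //   2. the serializer supports relocation of serialized objects (SPARK-4550 /
  //      SPARK-7081): records are serialized before sorting, so no copy;
  //   3. otherwise ExternalSorter buffers deserialized rows in memory, so we must copy.
  // The real code also returns true for any unknown ShuffleManager as a safe catch-all.
  def needsToCopy(
      mode: ShuffleMode,
      numPartitions: Int,
      bypassMergeThreshold: Int,
      serializerSupportsRelocation: Boolean): Boolean = mode match {
    case SortBased if numPartitions <= bypassMergeThreshold => false
    case SortBased if serializerSupportsRelocation => false
    case SortBased => true
    case HashBased => false // hash-based shuffle never buffers records, so no copy is needed
  }

  def main(args: Array[String]): Unit = {
    println(needsToCopy(SortBased, 100, 200, serializerSupportsRelocation = false))  // false: bypass-merge path
    println(needsToCopy(SortBased, 5000, 200, serializerSupportsRelocation = true))  // false: relocatable records
    println(needsToCopy(SortBased, 5000, 200, serializerSupportsRelocation = false)) // true: ExternalSorter buffers
  }
}

The net effect of the patch shows up in branch 2: with UnsafeShuffleManager gone and the spark.shuffle.sort.serializeMapOutputs flag removed, the serialized-sort path is taken whenever the serializer supports relocation, with no feature flag left to consult.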