aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala16
1 files changed, 15 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index cff7a01269..d7c811ca89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -41,11 +41,21 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
/** We must copy rows when sort based shuffle is on */
protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+ private val bypassMergeThreshold =
+ child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+
override def execute() = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
- val rdd = if (sortBasedShuffleOn) {
+ // This is a workaround for SPARK-4479. When:
+ // 1. sort based shuffle is on, and
+ // 2. the partition number is under the merge threshold, and
+ // 3. no ordering is required
+ // we can avoid the defensive copies to improve performance. In the long run, we probably
+ // want to include information in shuffle dependencies to indicate whether elements in the
+ // source RDD should be copied.
+ val rdd = if (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) {
child.execute().mapPartitions { iter =>
val hashExpressions = newMutableProjection(expressions, child.output)()
iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -82,6 +92,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
shuffled.map(_._1)
case SinglePartition =>
+ // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
+ // operators like `TakeOrdered` may require an ordering within the partition, and currently
+ // `SinglePartition` doesn't include ordering information.
+ // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
val rdd = if (sortBasedShuffleOn) {
child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
} else {