diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 16 |
1 files changed, 15 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index cff7a01269..d7c811ca89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -41,11 +41,21 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una /** We must copy rows when sort based shuffle is on */ protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] + private val bypassMergeThreshold = + child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) + override def execute() = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => // TODO: Eliminate redundant expressions in grouping key and value. - val rdd = if (sortBasedShuffleOn) { + // This is a workaround for SPARK-4479. When: + // 1. sort based shuffle is on, and + // 2. the partition number is under the merge threshold, and + // 3. no ordering is required + // we can avoid the defensive copies to improve performance. In the long run, we probably + // want to include information in shuffle dependencies to indicate whether elements in the + // source RDD should be copied. + val rdd = if (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) { child.execute().mapPartitions { iter => val hashExpressions = newMutableProjection(expressions, child.output)() iter.map(r => (hashExpressions(r).copy(), r.copy())) @@ -82,6 +92,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una shuffled.map(_._1) case SinglePartition => + // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since + // operators like `TakeOrdered` may require an ordering within the partition, and currently + // `SinglePartition` doesn't include ordering information. + // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered` val rdd = if (sortBasedShuffleOn) { child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) } } else { |