From a6d7b61f92dc7c1f9632cecb232afa8040ab2b4d Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Mon, 24 Nov 2014 12:43:45 -0800
Subject: [SPARK-4479][SQL] Avoids unnecessary defensive copies when sort based shuffle is on

This PR is a workaround for SPARK-4479. Two changes are introduced: when merge sort is bypassed in `ExternalSorter`,

1. also bypass the buffering of RDD elements, since buffering is the reason that `MutableRow`-backed row objects must be copied, and
2. avoid defensive copies in the `Exchange` operator.

[Review on Reviewable](https://reviewable.io/reviews/apache/spark/3422)

Author: Cheng Lian

Closes #3422 from liancheng/avoids-defensive-copies and squashes the following commits:

591f2e9 [Cheng Lian] Passes all shuffle suites
0c3c91e [Cheng Lian] Fixes shuffle write metrics when merge sort is bypassed
ed5df3c [Cheng Lian] Fixes styling changes
f75089b [Cheng Lian] Avoids unnecessary defensive copies when sort based shuffle is on
---
 .../scala/org/apache/spark/sql/execution/Exchange.scala | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)
(limited to 'sql')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index cff7a01269..d7c811ca89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -41,11 +41,21 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   /** We must copy rows when sort based shuffle is on */
   protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
 
+  private val bypassMergeThreshold =
+    child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+
   override def execute() = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
-        val rdd = if (sortBasedShuffleOn) {
+        // This is a workaround for SPARK-4479. When:
+        //  1. sort based shuffle is on, and
+        //  2. the partition number is under the merge threshold, and
+        //  3. no ordering is required
+        // we can avoid the defensive copies to improve performance. In the long run, we probably
+        // want to include information in shuffle dependencies to indicate whether elements in the
+        // source RDD should be copied.
+        val rdd = if (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
             iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -82,6 +92,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         shuffled.map(_._1)
 
       case SinglePartition =>
+        // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
+        // operators like `TakeOrdered` may require an ordering within the partition, and currently
+        // `SinglePartition` doesn't include ordering information.
+        // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
         val rdd = if (sortBasedShuffleOn) {
           child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
         } else {
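
For readers following along, here is a minimal, self-contained sketch of the problem this patch works around. It is not Spark code: `MutableRec`, the sample iterator, and the configuration values are made up for illustration, standing in for Spark's reused `MutableRow` objects and the copy decision made in `Exchange`. It shows why rows must be defensively copied whenever the shuffle path buffers records, and why the copy can be skipped when merge sort is bypassed and records are written out directly.

```scala
// Illustrative sketch only: `MutableRec` and the sample values below are invented for this
// example; they stand in for Spark's reused MutableRow and the Exchange operator's settings.
object DefensiveCopySketch {
  // A tiny stand-in for a MutableRow: a single object whose contents are overwritten per record.
  final class MutableRec(var value: Int) {
    def copy(): MutableRec = new MutableRec(value)
    override def toString: String = s"MutableRec($value)"
  }

  def main(args: Array[String]): Unit = {
    val reused = new MutableRec(0)
    // Each call yields a fresh iterator that keeps returning the same mutated object.
    def records: Iterator[MutableRec] =
      (1 to 3).iterator.map { i => reused.value = i; reused }

    // Buffering without copying: every buffered element aliases the same object,
    // so all of them end up holding the last value written.
    val buffered = records.toArray
    println(buffered.mkString(", "))   // MutableRec(3), MutableRec(3), MutableRec(3)

    // Defensive copies preserve each record's value at the moment it was produced.
    val copied = records.map(_.copy()).toArray
    println(copied.mkString(", "))     // MutableRec(1), MutableRec(2), MutableRec(3)

    // The patch only pays for the copy when the sort-based shuffle will actually buffer,
    // i.e. when merge sort is NOT bypassed (partition count above the bypass threshold),
    // mirroring the `sortBasedShuffleOn && numPartitions > bypassMergeThreshold` check above.
    val sortBasedShuffleOn = true      // assumed setting, for illustration
    val numPartitions = 100            // assumed partition count, for illustration
    val bypassMergeThreshold = 200     // default of spark.shuffle.sort.bypassMergeThreshold
    val copyNeeded = sortBasedShuffleOn && numPartitions > bypassMergeThreshold
    println(s"defensive copy needed: $copyNeeded")   // false: merge sort is bypassed
  }
}
```

Running the sketch prints three aliased records holding the same final value for the buffered path, three distinct values for the copied path, and `false` for the copy decision, since the assumed 100 partitions sit under the default bypass threshold of 200.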