diff options
author | Cheng Lian <lian@databricks.com> | 2014-11-24 12:43:45 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-11-24 12:43:45 -0800 |
commit | a6d7b61f92dc7c1f9632cecb232afa8040ab2b4d (patch) | |
tree | 2320e56514ce984dacd929a8d3322f090f11c6d0 /sql/core | |
parent | 29372b63185a4a170178b6ec2362d7112f389852 (diff) | |
download | spark-a6d7b61f92dc7c1f9632cecb232afa8040ab2b4d.tar.gz spark-a6d7b61f92dc7c1f9632cecb232afa8040ab2b4d.tar.bz2 spark-a6d7b61f92dc7c1f9632cecb232afa8040ab2b4d.zip |
[SPARK-4479][SQL] Avoids unnecessary defensive copies when sort based shuffle is on
This PR is a workaround for SPARK-4479. Two changes are introduced: when merge sort is bypassed in `ExternalSorter`,
1. also bypass RDD elements buffering as buffering is the reason that `MutableRow` backed row objects must be copied, and
2. avoids defensive copies in `Exchange` operator
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3422)
<!-- Reviewable:end -->
Author: Cheng Lian <lian@databricks.com>
Closes #3422 from liancheng/avoids-defensive-copies and squashes the following commits:
591f2e9 [Cheng Lian] Passes all shuffle suites
0c3c91e [Cheng Lian] Fixes shuffle write metrics when merge sort is bypassed
ed5df3c [Cheng Lian] Fixes styling changes
f75089b [Cheng Lian] Avoids unnecessary defensive copies when sort based shuffle is on
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 16 |
1 files changed, 15 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index cff7a01269..d7c811ca89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -41,11 +41,21 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una /** We must copy rows when sort based shuffle is on */ protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] + private val bypassMergeThreshold = + child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) + override def execute() = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => // TODO: Eliminate redundant expressions in grouping key and value. - val rdd = if (sortBasedShuffleOn) { + // This is a workaround for SPARK-4479. When: + // 1. sort based shuffle is on, and + // 2. the partition number is under the merge threshold, and + // 3. no ordering is required + // we can avoid the defensive copies to improve performance. In the long run, we probably + // want to include information in shuffle dependencies to indicate whether elements in the + // source RDD should be copied. + val rdd = if (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) { child.execute().mapPartitions { iter => val hashExpressions = newMutableProjection(expressions, child.output)() iter.map(r => (hashExpressions(r).copy(), r.copy())) @@ -82,6 +92,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una shuffled.map(_._1) case SinglePartition => + // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since + // operators like `TakeOrdered` may require an ordering within the partition, and currently + // `SinglePartition` doesn't include ordering information. + // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered` val rdd = if (sortBasedShuffleOn) { child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) } } else { |