aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2014-11-24 12:43:45 -0800
committerMichael Armbrust <michael@databricks.com>2014-11-24 12:43:45 -0800
commita6d7b61f92dc7c1f9632cecb232afa8040ab2b4d (patch)
tree2320e56514ce984dacd929a8d3322f090f11c6d0 /sql
parent29372b63185a4a170178b6ec2362d7112f389852 (diff)
downloadspark-a6d7b61f92dc7c1f9632cecb232afa8040ab2b4d.tar.gz
spark-a6d7b61f92dc7c1f9632cecb232afa8040ab2b4d.tar.bz2
spark-a6d7b61f92dc7c1f9632cecb232afa8040ab2b4d.zip
[SPARK-4479][SQL] Avoids unnecessary defensive copies when sort based shuffle is on
This PR is a workaround for SPARK-4479. Two changes are introduced: when merge sort is bypassed in `ExternalSorter`, 1. also bypass RDD elements buffering as buffering is the reason that `MutableRow` backed row objects must be copied, and 2. avoids defensive copies in `Exchange` operator <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3422) <!-- Reviewable:end --> Author: Cheng Lian <lian@databricks.com> Closes #3422 from liancheng/avoids-defensive-copies and squashes the following commits: 591f2e9 [Cheng Lian] Passes all shuffle suites 0c3c91e [Cheng Lian] Fixes shuffle write metrics when merge sort is bypassed ed5df3c [Cheng Lian] Fixes styling changes f75089b [Cheng Lian] Avoids unnecessary defensive copies when sort based shuffle is on
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala16
1 files changed, 15 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index cff7a01269..d7c811ca89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -41,11 +41,21 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
/** We must copy rows when sort based shuffle is on */
protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+ private val bypassMergeThreshold =
+ child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+
override def execute() = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
- val rdd = if (sortBasedShuffleOn) {
+ // This is a workaround for SPARK-4479. When:
+ // 1. sort based shuffle is on, and
+ // 2. the partition number is under the merge threshold, and
+ // 3. no ordering is required
+ // we can avoid the defensive copies to improve performance. In the long run, we probably
+ // want to include information in shuffle dependencies to indicate whether elements in the
+ // source RDD should be copied.
+ val rdd = if (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) {
child.execute().mapPartitions { iter =>
val hashExpressions = newMutableProjection(expressions, child.output)()
iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -82,6 +92,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
shuffled.map(_._1)
case SinglePartition =>
+ // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
+ // operators like `TakeOrdered` may require an ordering within the partition, and currently
+ // `SinglePartition` doesn't include ordering information.
+ // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
val rdd = if (sortBasedShuffleOn) {
child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
} else {