diff options
author | Michael Armbrust <michael@databricks.com> | 2014-08-20 15:51:14 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-08-20 15:51:14 -0700 |
commit | a2e658dcdab614058eefcf50ae2d419ece9b1fe7 (patch) | |
tree | 1fd8b0dd3cea6d9623c37f1aac5842a32114705b | |
parent | fb60bec34e0b20ae95b4b865a79744916e0a5737 (diff) | |
download | spark-a2e658dcdab614058eefcf50ae2d419ece9b1fe7.tar.gz spark-a2e658dcdab614058eefcf50ae2d419ece9b1fe7.tar.bz2 spark-a2e658dcdab614058eefcf50ae2d419ece9b1fe7.zip |
[SPARK-2967][SQL] Fix sort based shuffle for spark sql.
Add explicit row copies when sort based shuffle is on.
Author: Michael Armbrust <michael@databricks.com>
Closes #2066 from marmbrus/sortShuffle and squashes the following commits:
fcd7bb2 [Michael Armbrust] Fix sort based shuffle for spark sql.
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 77dc2ad733..09c34b7059 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf} +import org.apache.spark.shuffle.sort.SortShuffleManager +import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf} import org.apache.spark.rdd.ShuffledRDD import org.apache.spark.sql.{SQLContext, Row} import org.apache.spark.sql.catalyst.errors.attachTree @@ -37,6 +38,9 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una def output = child.output + /** We must copy rows when sort based shuffle is on */ + protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] + def execute() = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => @@ -45,8 +49,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una @transient val hashExpressions = newMutableProjection(expressions, child.output)() - val mutablePair = new MutablePair[Row, Row]() - iter.map(r => mutablePair.update(hashExpressions(r), r)) + if (sortBasedShuffleOn) { + iter.map(r => (hashExpressions(r), r.copy())) + } else { + val mutablePair = new MutablePair[Row, Row]() + iter.map(r => mutablePair.update(hashExpressions(r), r)) + } } val part = new HashPartitioner(numPartitions) val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part) @@ -58,8 +66,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una implicit val ordering = new RowOrdering(sortingExpressions, child.output) val rdd = child.execute().mapPartitions { iter => - val mutablePair = new MutablePair[Row, Null](null, null) - iter.map(row => mutablePair.update(row, null)) + if (sortBasedShuffleOn) { + iter.map(row => (row.copy(), null)) + } else { + val mutablePair = new MutablePair[Row, Null](null, null) + iter.map(row => mutablePair.update(row, null)) + } } val part = new RangePartitioner(numPartitions, rdd, ascending = true) val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part) @@ -69,8 +81,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una case SinglePartition => val rdd = child.execute().mapPartitions { iter => - val mutablePair = new MutablePair[Null, Row]() - iter.map(r => mutablePair.update(null, r)) + if (sortBasedShuffleOn) { + iter.map(r => (null, r.copy())) + } else { + val mutablePair = new MutablePair[Null, Row]() + iter.map(r => mutablePair.update(null, r)) + } } val partitioner = new HashPartitioner(1) val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner) |