diff options
author | Michael Armbrust <michael@databricks.com> | 2014-08-23 16:21:08 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-08-23 16:21:08 -0700 |
commit | 3519b5e8e55b4530d7f7c0bcab254f863dbfa814 (patch) | |
tree | d8491731bf9614a680a8525e84881430396b4983 /sql | |
parent | 7e191fe29bb09a8560cd75d453c4f7f662dff406 (diff) | |
download | spark-3519b5e8e55b4530d7f7c0bcab254f863dbfa814.tar.gz spark-3519b5e8e55b4530d7f7c0bcab254f863dbfa814.tar.bz2 spark-3519b5e8e55b4530d7f7c0bcab254f863dbfa814.zip |
[SPARK-2967][SQL] Follow-up: Also copy hash expressions in sort based shuffle fix.
Follow-up to #2066
Author: Michael Armbrust <michael@databricks.com>
Closes #2072 from marmbrus/sortShuffle and squashes the following commits:
2ff8114 [Michael Armbrust] Fix bug
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 9 |
1 file changed, 6 insertions, 3 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 09c34b7059..4802e40595 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -46,12 +46,15 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
         val rdd = child.execute().mapPartitions { iter =>
-          @transient val hashExpressions =
-            newMutableProjection(expressions, child.output)()
-
           if (sortBasedShuffleOn) {
+            @transient val hashExpressions =
+              newProjection(expressions, child.output)
+
             iter.map(r => (hashExpressions(r), r.copy()))
           } else {
+            @transient val hashExpressions =
+              newMutableProjection(expressions, child.output)()
+
             val mutablePair = new MutablePair[Row, Row]()
             iter.map(r => mutablePair.update(hashExpressions(r), r))
           }
```