aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-08-23 16:21:08 -0700
committerMichael Armbrust <michael@databricks.com>2014-08-23 16:21:30 -0700
commite23f0bc0177a83dfee3f5579ae6eb12033ae5f90 (patch)
tree19de2291c1eb7f294a1a4878d117e020458708d8
parent7112da8fe8d382a1180118f206db78f8e610d83f (diff)
downloadspark-e23f0bc0177a83dfee3f5579ae6eb12033ae5f90.tar.gz
spark-e23f0bc0177a83dfee3f5579ae6eb12033ae5f90.tar.bz2
spark-e23f0bc0177a83dfee3f5579ae6eb12033ae5f90.zip
[SPARK-2967][SQL] Follow-up: Also copy hash expressions in sort based shuffle fix.
Follow-up to #2066 Author: Michael Armbrust <michael@databricks.com> Closes #2072 from marmbrus/sortShuffle and squashes the following commits: 2ff8114 [Michael Armbrust] Fix bug (cherry picked from commit 3519b5e8e55b4530d7f7c0bcab254f863dbfa814) Signed-off-by: Michael Armbrust <michael@databricks.com>
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala9
1 files changed, 6 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 09c34b7059..4802e40595 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -46,12 +46,15 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
val rdd = child.execute().mapPartitions { iter =>
- @transient val hashExpressions =
- newMutableProjection(expressions, child.output)()
-
if (sortBasedShuffleOn) {
+ @transient val hashExpressions =
+ newProjection(expressions, child.output)
+
iter.map(r => (hashExpressions(r), r.copy()))
} else {
+ @transient val hashExpressions =
+ newMutableProjection(expressions, child.output)()
+
val mutablePair = new MutablePair[Row, Row]()
iter.map(r => mutablePair.update(hashExpressions(r), r))
}