author     Michael Armbrust <michael@databricks.com>   2014-08-20 15:51:14 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-08-20 15:51:14 -0700
commit     a2e658dcdab614058eefcf50ae2d419ece9b1fe7
tree       1fd8b0dd3cea6d9623c37f1aac5842a32114705b
parent     fb60bec34e0b20ae95b4b865a79744916e0a5737
[SPARK-2967][SQL] Fix sort based shuffle for spark sql.
Add explicit row copies when sort based shuffle is on.

Author: Michael Armbrust <michael@databricks.com>

Closes #2066 from marmbrus/sortShuffle and squashes the following commits:

fcd7bb2 [Michael Armbrust] Fix sort based shuffle for spark sql.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 30
1 file changed, 23 insertions(+), 7 deletions(-)
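Why the copy is needed: sort-based shuffle buffers records in memory before writing them out, while Exchange previously fed it a single reused MutablePair, so every buffered entry ended up referencing the last row produced. The following self-contained sketch (not Spark code; `Reused` and the array are stand-ins chosen for illustration) reproduces the hazard and the shape of the fix.

// Minimal, self-contained sketch of the reuse hazard this patch fixes.
// `Reused` stands in for the reused MutablePair / mutable row; the Array
// built by `toArray` stands in for sort-based shuffle's in-memory buffer.
object ReuseHazardSketch {
  final class Reused(var key: Int, var value: String) {
    def update(k: Int, v: String): Reused = { key = k; value = v; this }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(1 -> "a", 2 -> "b", 3 -> "c")
    val reused = new Reused(0, "")

    // Streaming consumer (hash-based shuffle writes each record out immediately):
    // reusing one object is safe because each record is observed before the next update.
    input.iterator
      .map { case (k, v) => reused.update(k, v) }
      .foreach(p => println(s"wrote ${p.key} -> ${p.value}"))            // 1 -> a, 2 -> b, 3 -> c

    // Buffering consumer (sort-based shuffle holds references until the partition
    // is sorted): every buffered entry points at the same object, so all records
    // collapse to the last update.
    val buffered = input.iterator
      .map { case (k, v) => reused.update(k, v) }
      .toArray
    println(buffered.map(p => s"${p.key} -> ${p.value}").mkString(", ")) // 3 -> c, 3 -> c, 3 -> c

    // The fix mirrors the patch: emit an immutable record per element
    // (Exchange now calls row.copy() instead of reusing the mutable pair).
    val copied = input.iterator.map { case (k, v) => (k, v) }.toArray
    println(copied.mkString(", "))                                       // (1,a), (2,b), (3,c)
  }
}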
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 77dc2ad733..09c34b7059 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf}
+import org.apache.spark.shuffle.sort.SortShuffleManager
+import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf}
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.catalyst.errors.attachTree
@@ -37,6 +38,9 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
def output = child.output
+ /** We must copy rows when sort based shuffle is on */
+ protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+
def execute() = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
@@ -45,8 +49,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
@transient val hashExpressions =
newMutableProjection(expressions, child.output)()
- val mutablePair = new MutablePair[Row, Row]()
- iter.map(r => mutablePair.update(hashExpressions(r), r))
+ if (sortBasedShuffleOn) {
+ iter.map(r => (hashExpressions(r), r.copy()))
+ } else {
+ val mutablePair = new MutablePair[Row, Row]()
+ iter.map(r => mutablePair.update(hashExpressions(r), r))
+ }
}
val part = new HashPartitioner(numPartitions)
val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
@@ -58,8 +66,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
implicit val ordering = new RowOrdering(sortingExpressions, child.output)
val rdd = child.execute().mapPartitions { iter =>
- val mutablePair = new MutablePair[Row, Null](null, null)
- iter.map(row => mutablePair.update(row, null))
+ if (sortBasedShuffleOn) {
+ iter.map(row => (row.copy(), null))
+ } else {
+ val mutablePair = new MutablePair[Row, Null](null, null)
+ iter.map(row => mutablePair.update(row, null))
+ }
}
val part = new RangePartitioner(numPartitions, rdd, ascending = true)
val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
@@ -69,8 +81,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
case SinglePartition =>
val rdd = child.execute().mapPartitions { iter =>
- val mutablePair = new MutablePair[Null, Row]()
- iter.map(r => mutablePair.update(null, r))
+ if (sortBasedShuffleOn) {
+ iter.map(r => (null, r.copy()))
+ } else {
+ val mutablePair = new MutablePair[Null, Row]()
+ iter.map(r => mutablePair.update(null, r))
+ }
}
val partitioner = new HashPartitioner(1)
val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
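To exercise the new branch, sort-based shuffle has to be selected explicitly in this release via `spark.shuffle.manager`. Below is a minimal sketch of such a job, assuming a Spark 1.1-era API; the `Record` case class, table name, and query are illustrative assumptions, not part of the patch. Any query that plans an Exchange, such as an aggregation, goes through the copying path when SortShuffleManager is the active shuffle manager.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Illustrative only: Record, the table name, and the query are assumptions.
case class Record(key: Int, value: String)

object SortShuffleSqlSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("sort-shuffle-sql")
      .setMaster("local[2]")
      .set("spark.shuffle.manager", "sort")   // selects SortShuffleManager (hash is the default here)

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext._                       // brings in createSchemaRDD and sql()

    val records = sc.parallelize(1 to 100).map(i => Record(i % 10, s"v$i"))
    records.registerTempTable("records")

    // The GROUP BY plans an Exchange with hash partitioning, which now copies
    // each row before handing it to the sort-based shuffle.
    sql("SELECT key, COUNT(*) FROM records GROUP BY key").collect().foreach(println)

    sc.stop()
  }
}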