about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-09-06 00:33:00 -0700
committerReynold Xin <rxin@apache.org>2014-09-06 00:33:00 -0700
commit1b9001f78d96faefff02b846b169c249d9e4d612 (patch)
treed77ac6d67e09b8f893eb34459a5cfa028eac6c0f /sql
parent9422c4ee0eaf4a32d2ed7c96799feac2f5f79d40 (diff)
downloadspark-1b9001f78d96faefff02b846b169c249d9e4d612.tar.gz
spark-1b9001f78d96faefff02b846b169c249d9e4d612.tar.bz2
spark-1b9001f78d96faefff02b846b169c249d9e4d612.zip
[SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.
This is a tiny teeny optimization to move the if check of sortBasedShuffleOn to outside the closures so the closures don't need to pull in the entire Exchange operator object. Author: Reynold Xin <rxin@apache.org> Closes #2282 from rxin/SPARK-3409 and squashes the following commits: 1de3f88 [Reynold Xin] [SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 43
1 file changed, 21 insertions, 22 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 4802e40595..927f40063e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -36,25 +36,23 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
override def outputPartitioning = newPartitioning
- def output = child.output
+ override def output = child.output
/** We must copy rows when sort based shuffle is on */
protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
- def execute() = attachTree(this , "execute") {
+ override def execute() = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
- val rdd = child.execute().mapPartitions { iter =>
- if (sortBasedShuffleOn) {
- @transient val hashExpressions =
- newProjection(expressions, child.output)
-
+ val rdd = if (sortBasedShuffleOn) {
+ child.execute().mapPartitions { iter =>
+ val hashExpressions = newProjection(expressions, child.output)
iter.map(r => (hashExpressions(r), r.copy()))
- } else {
- @transient val hashExpressions =
- newMutableProjection(expressions, child.output)()
-
+ }
+ } else {
+ child.execute().mapPartitions { iter =>
+ val hashExpressions = newMutableProjection(expressions, child.output)()
val mutablePair = new MutablePair[Row, Row]()
iter.map(r => mutablePair.update(hashExpressions(r), r))
}
@@ -65,17 +63,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
shuffled.map(_._2)
case RangePartitioning(sortingExpressions, numPartitions) =>
- // TODO: RangePartitioner should take an Ordering.
- implicit val ordering = new RowOrdering(sortingExpressions, child.output)
-
- val rdd = child.execute().mapPartitions { iter =>
- if (sortBasedShuffleOn) {
- iter.map(row => (row.copy(), null))
- } else {
+ val rdd = if (sortBasedShuffleOn) {
+ child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))}
+ } else {
+ child.execute().mapPartitions { iter =>
val mutablePair = new MutablePair[Row, Null](null, null)
iter.map(row => mutablePair.update(row, null))
}
}
+
+ // TODO: RangePartitioner should take an Ordering.
+ implicit val ordering = new RowOrdering(sortingExpressions, child.output)
+
val part = new RangePartitioner(numPartitions, rdd, ascending = true)
val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
@@ -83,10 +82,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
shuffled.map(_._1)
case SinglePartition =>
- val rdd = child.execute().mapPartitions { iter =>
- if (sortBasedShuffleOn) {
- iter.map(r => (null, r.copy()))
- } else {
+ val rdd = if (sortBasedShuffleOn) {
+ child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
+ } else {
+ child.execute().mapPartitions { iter =>
val mutablePair = new MutablePair[Null, Row]()
iter.map(r => mutablePair.update(null, r))
}