diff options
author | Reynold Xin <rxin@apache.org> | 2014-09-06 00:33:00 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-09-06 00:33:00 -0700 |
commit | 1b9001f78d96faefff02b846b169c249d9e4d612 (patch) | |
tree | d77ac6d67e09b8f893eb34459a5cfa028eac6c0f /sql | |
parent | 9422c4ee0eaf4a32d2ed7c96799feac2f5f79d40 (diff) | |
download | spark-1b9001f78d96faefff02b846b169c249d9e4d612.tar.gz spark-1b9001f78d96faefff02b846b169c249d9e4d612.tar.bz2 spark-1b9001f78d96faefff02b846b169c249d9e4d612.zip |
[SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.
This is a tiny teeny optimization to move the if check of sortBasedShuffledOn to outside the closures so the closures don't need to pull in the entire Exchange operator object.
Author: Reynold Xin <rxin@apache.org>
Closes #2282 from rxin/SPARK-3409 and squashes the following commits:
1de3f88 [Reynold Xin] [SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 43 |
1 files changed, 21 insertions, 22 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 4802e40595..927f40063e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -36,25 +36,23 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una override def outputPartitioning = newPartitioning - def output = child.output + override def output = child.output /** We must copy rows when sort based shuffle is on */ protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] - def execute() = attachTree(this , "execute") { + override def execute() = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => // TODO: Eliminate redundant expressions in grouping key and value. - val rdd = child.execute().mapPartitions { iter => - if (sortBasedShuffleOn) { - @transient val hashExpressions = - newProjection(expressions, child.output) - + val rdd = if (sortBasedShuffleOn) { + child.execute().mapPartitions { iter => + val hashExpressions = newProjection(expressions, child.output) iter.map(r => (hashExpressions(r), r.copy())) - } else { - @transient val hashExpressions = - newMutableProjection(expressions, child.output)() - + } + } else { + child.execute().mapPartitions { iter => + val hashExpressions = newMutableProjection(expressions, child.output)() val mutablePair = new MutablePair[Row, Row]() iter.map(r => mutablePair.update(hashExpressions(r), r)) } @@ -65,17 +63,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una shuffled.map(_._2) case RangePartitioning(sortingExpressions, numPartitions) => - // TODO: RangePartitioner should take an Ordering. - implicit val ordering = new RowOrdering(sortingExpressions, child.output) - - val rdd = child.execute().mapPartitions { iter => - if (sortBasedShuffleOn) { - iter.map(row => (row.copy(), null)) - } else { + val rdd = if (sortBasedShuffleOn) { + child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))} + } else { + child.execute().mapPartitions { iter => val mutablePair = new MutablePair[Row, Null](null, null) iter.map(row => mutablePair.update(row, null)) } } + + // TODO: RangePartitioner should take an Ordering. + implicit val ordering = new RowOrdering(sortingExpressions, child.output) + val part = new RangePartitioner(numPartitions, rdd, ascending = true) val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part) shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false))) @@ -83,10 +82,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una shuffled.map(_._1) case SinglePartition => - val rdd = child.execute().mapPartitions { iter => - if (sortBasedShuffleOn) { - iter.map(r => (null, r.copy())) - } else { + val rdd = if (sortBasedShuffleOn) { + child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) } + } else { + child.execute().mapPartitions { iter => val mutablePair = new MutablePair[Null, Row]() iter.map(r => mutablePair.update(null, r)) } |