From 1b9001f78d96faefff02b846b169c249d9e4d612 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 6 Sep 2014 00:33:00 -0700 Subject: [SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures. This is a tiny teeny optimization to move the if check of sortBasedShuffledOn to outside the closures so the closures don't need to pull in the entire Exchange operator object. Author: Reynold Xin Closes #2282 from rxin/SPARK-3409 and squashes the following commits: 1de3f88 [Reynold Xin] [SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures. --- .../org/apache/spark/sql/execution/Exchange.scala | 43 +++++++++++----------- 1 file changed, 21 insertions(+), 22 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 4802e40595..927f40063e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -36,25 +36,23 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una override def outputPartitioning = newPartitioning - def output = child.output + override def output = child.output /** We must copy rows when sort based shuffle is on */ protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] - def execute() = attachTree(this , "execute") { + override def execute() = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => // TODO: Eliminate redundant expressions in grouping key and value. - val rdd = child.execute().mapPartitions { iter => - if (sortBasedShuffleOn) { - @transient val hashExpressions = - newProjection(expressions, child.output) - + val rdd = if (sortBasedShuffleOn) { + child.execute().mapPartitions { iter => + val hashExpressions = newProjection(expressions, child.output) iter.map(r => (hashExpressions(r), r.copy())) - } else { - @transient val hashExpressions = - newMutableProjection(expressions, child.output)() - + } + } else { + child.execute().mapPartitions { iter => + val hashExpressions = newMutableProjection(expressions, child.output)() val mutablePair = new MutablePair[Row, Row]() iter.map(r => mutablePair.update(hashExpressions(r), r)) } @@ -65,17 +63,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una shuffled.map(_._2) case RangePartitioning(sortingExpressions, numPartitions) => - // TODO: RangePartitioner should take an Ordering. - implicit val ordering = new RowOrdering(sortingExpressions, child.output) - - val rdd = child.execute().mapPartitions { iter => - if (sortBasedShuffleOn) { - iter.map(row => (row.copy(), null)) - } else { + val rdd = if (sortBasedShuffleOn) { + child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))} + } else { + child.execute().mapPartitions { iter => val mutablePair = new MutablePair[Row, Null](null, null) iter.map(row => mutablePair.update(row, null)) } } + + // TODO: RangePartitioner should take an Ordering. + implicit val ordering = new RowOrdering(sortingExpressions, child.output) + val part = new RangePartitioner(numPartitions, rdd, ascending = true) val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part) shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false))) @@ -83,10 +82,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una shuffled.map(_._1) case SinglePartition => - val rdd = child.execute().mapPartitions { iter => - if (sortBasedShuffleOn) { - iter.map(r => (null, r.copy())) - } else { + val rdd = if (sortBasedShuffleOn) { + child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) } + } else { + child.execute().mapPartitions { iter => val mutablePair = new MutablePair[Null, Row]() iter.map(r => mutablePair.update(null, r)) } -- cgit v1.2.3