From 5ff75c748a27bcfae71759d0e509218f0c5d0200 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Wed, 18 Jun 2014 17:52:42 -0700 Subject: [SPARK-2184][SQL] AddExchange isn't idempotent ...redPartitioning. Author: Michael Armbrust Closes #1122 from marmbrus/fixAddExchange and squashes the following commits: 3417537 [Michael Armbrust] Don't bind partitioning expressions as that breaks comparison with requiredPartitioning. --- .../apache/spark/sql/catalyst/expressions/BoundAttribute.scala | 4 ++-- .../scala/org/apache/spark/sql/catalyst/expressions/Row.scala | 3 +++ .../src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 8 ++++---- 3 files changed, 9 insertions(+), 6 deletions(-) (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 4ebf6c4584..655d4a08fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -68,7 +68,7 @@ class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] { } object BindReferences extends Logging { - def bindReference(expression: Expression, input: Seq[Attribute]): Expression = { + def bindReference[A <: Expression](expression: A, input: Seq[Attribute]): A = { expression.transform { case a: AttributeReference => attachTree(a, "Binding attribute") { val ordinal = input.indexWhere(_.exprId == a.exprId) @@ -83,6 +83,6 @@ object BindReferences extends Logging { BoundReference(ordinal, a) } } - } + }.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible. } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala index 77b5429bad..74ae723686 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala @@ -208,6 +208,9 @@ class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow { class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] { + def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = + this(ordering.map(BindReferences.bindReference(_, inputSchema))) + def compare(a: Row, b: Row): Int = { var i = 0 while (i < ordering.size) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index cef294167f..05dfb85b38 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -22,7 +22,7 @@ import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf} import org.apache.spark.rdd.ShuffledRDD import org.apache.spark.sql.{SQLConf, SQLContext, Row} import org.apache.spark.sql.catalyst.errors.attachTree -import org.apache.spark.sql.catalyst.expressions.{MutableProjection, RowOrdering} +import org.apache.spark.sql.catalyst.expressions.{NoBind, MutableProjection, RowOrdering} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.util.MutablePair @@ -31,7 +31,7 @@ import org.apache.spark.util.MutablePair * :: DeveloperApi :: */ @DeveloperApi -case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode { +case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode with NoBind { override def outputPartitioning = newPartitioning @@ -42,7 +42,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una case HashPartitioning(expressions, numPartitions) => // TODO: Eliminate redundant expressions in grouping key and value. val rdd = child.execute().mapPartitions { iter => - val hashExpressions = new MutableProjection(expressions) + val hashExpressions = new MutableProjection(expressions, child.output) val mutablePair = new MutablePair[Row, Row]() iter.map(r => mutablePair.update(hashExpressions(r), r)) } @@ -53,7 +53,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una case RangePartitioning(sortingExpressions, numPartitions) => // TODO: RangePartitioner should take an Ordering. - implicit val ordering = new RowOrdering(sortingExpressions) + implicit val ordering = new RowOrdering(sortingExpressions, child.output) val rdd = child.execute().mapPartitions { iter => val mutablePair = new MutablePair[Row, Null](null, null) -- cgit v1.2.3