commit 5ff75c748a27bcfae71759d0e509218f0c5d0200
tree   3ac6911fbb07ab801940f1dbc5a1f0e11bf93c59
parent 45a95f82caea55a8616141444285faf58fef128b
Author:    Michael Armbrust <michael@databricks.com>  2014-06-18 17:52:42 -0700
Committer: Reynold Xin <rxin@apache.org>              2014-06-18 17:52:42 -0700
[SPARK-2184][SQL] AddExchange isn't idempotent

Don't bind partitioning expressions as that breaks comparison with
requiredPartitioning.

Author: Michael Armbrust <michael@databricks.com>

Closes #1122 from marmbrus/fixAddExchange and squashes the following commits:

3417537 [Michael Armbrust] Don't bind partitioning expressions as that breaks comparison with requiredPartitioning.
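For context, a minimal sketch of the non-idempotence being fixed (the values below are illustrative, not taken from the planner). Once Exchange eagerly bound its partitioning expressions, its outputPartitioning held BoundReference nodes while the required partitioning computed from the parent still held AttributeReference nodes, so structural equality between the two never held and the AddExchange rule inserted a fresh Exchange on every pass:

// Hypothetical values; `a` and `boundA` are made up for illustration.
val a      = AttributeReference("key", IntegerType, nullable = true)()
val boundA = BoundReference(0, a)                   // what eager binding produced
val required = HashPartitioning(Seq(a), 200)        // what the parent requires
val actual   = HashPartitioning(Seq(boundA), 200)   // what a bound Exchange reported
// required == actual is false purely because of binding, so the rule fired again.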
Diffstat (limited to 'sql')
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala | 4 ++--
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala            | 3 +++
 sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala                      | 8 ++++----
 3 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index 4ebf6c4584..655d4a08fe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -68,7 +68,7 @@ class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] {
}
object BindReferences extends Logging {
- def bindReference(expression: Expression, input: Seq[Attribute]): Expression = {
+ def bindReference[A <: Expression](expression: A, input: Seq[Attribute]): A = {
expression.transform { case a: AttributeReference =>
attachTree(a, "Binding attribute") {
val ordinal = input.indexWhere(_.exprId == a.exprId)
@@ -83,6 +83,6 @@ object BindReferences extends Logging {
BoundReference(ordinal, a)
}
}
- }
+ }.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible.
}
}
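A short sketch of what the generic signature buys at a call site (the val below is illustrative; `sortOrder` and `child` are assumed to be in scope). Previously the result was typed Expression, so callers that needed the concrete subtype had to cast; now the compiler preserves the static type, which is exactly what the new RowOrdering constructor below relies on:

// Illustrative call site: binding a SortOrder now yields a SortOrder,
// with no cast needed on the caller's side.
val bound: SortOrder = BindReferences.bindReference(sortOrder, child.output)
// The asInstanceOf[A] inside bindReference is the acknowledged hack: transform
// only replaces AttributeReference leaves, so the root node's class is unchanged.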
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
index 77b5429bad..74ae723686 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
@@ -208,6 +208,9 @@ class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow {
class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
+ def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
+ this(ordering.map(BindReferences.bindReference(_, inputSchema)))
+
def compare(a: Row, b: Row): Int = {
var i = 0
while (i < ordering.size) {
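The auxiliary constructor simply folds the binding step into RowOrdering itself; a hedged sketch of the equivalence (call site illustrative):

// These two construct the same ordering; the second keeps binding out of the caller:
new RowOrdering(sortingExpressions.map(BindReferences.bindReference(_, child.output)))
new RowOrdering(sortingExpressions, child.output)
// Binding at the point of use lets the operator's stored expressions (and
// therefore its partitioning) stay unbound for plan comparison.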
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index cef294167f..05dfb85b38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -22,7 +22,7 @@ import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf}
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.sql.{SQLConf, SQLContext, Row}
import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions.{MutableProjection, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{NoBind, MutableProjection, RowOrdering}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.util.MutablePair
@@ -31,7 +31,7 @@ import org.apache.spark.util.MutablePair
* :: DeveloperApi ::
*/
@DeveloperApi
-case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
+case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode with NoBind {
override def outputPartitioning = newPartitioning
@@ -42,7 +42,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
val rdd = child.execute().mapPartitions { iter =>
- val hashExpressions = new MutableProjection(expressions)
+ val hashExpressions = new MutableProjection(expressions, child.output)
val mutablePair = new MutablePair[Row, Row]()
iter.map(r => mutablePair.update(hashExpressions(r), r))
}
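The same pattern as RowOrdering: the two-argument MutableProjection binds against the child's output schema itself, inside mapPartitions, so Exchange's own newPartitioning stays unbound. Roughly (a simplified sketch, assuming the one-argument constructor takes pre-bound expressions):

// Approximately what the two-argument form amounts to:
new MutableProjection(expressions.map(BindReferences.bindReference(_, child.output)))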
@@ -53,7 +53,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
case RangePartitioning(sortingExpressions, numPartitions) =>
// TODO: RangePartitioner should take an Ordering.
- implicit val ordering = new RowOrdering(sortingExpressions)
+ implicit val ordering = new RowOrdering(sortingExpressions, child.output)
val rdd = child.execute().mapPartitions { iter =>
val mutablePair = new MutablePair[Row, Null](null, null)
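Net effect: Exchange mixes in NoBind so the generic binding rule leaves newPartitioning untouched, and binding now happens lazily where the expressions are evaluated. A test-style sketch of the idempotence this restores (`addExchange` and `plan` are assumed values, not from the patch):

// Applying the rule twice should change nothing the second time: the unbound
// newPartitioning now compares equal to the required partitioning, so the
// rule sees the once-transformed plan as already satisfied.
val once  = addExchange(plan)
val twice = addExchange(once)
assert(once == twice)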