aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/scala/org/apache
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-10-13 13:46:34 -0700
committerMichael Armbrust <michael@databricks.com>2014-10-13 13:46:34 -0700
commit371321cadee7df39258bd374eb59c1e32451d96b (patch)
tree6b5fae061f90f4acb94ed8e58554ec91cccc88a3 /sql/catalyst/src/main/scala/org/apache
parente10d71e7e58bf2ec0f1942cb2f0602396ab866b4 (diff)
downloadspark-371321cadee7df39258bd374eb59c1e32451d96b.tar.gz
spark-371321cadee7df39258bd374eb59c1e32451d96b.tar.bz2
spark-371321cadee7df39258bd374eb59c1e32451d96b.zip
[SQL] Add type checking debugging functions
Adds some functions that were very useful when trying to track down the bug from #2656. This change also changes the tree output for query plans to include the `'` prefix to unresolved nodes and `!` prefix to nodes that refer to non-existent attributes. Author: Michael Armbrust <michael@databricks.com> Closes #2657 from marmbrus/debugging and squashes the following commits: 654b926 [Michael Armbrust] Clean-up, add tests 763af15 [Michael Armbrust] Add typeChecking debugging functions 8c69303 [Michael Armbrust] Add inputSet, references to QueryPlan. Improve tree string with a prefix to denote invalid or unresolved nodes. fbeab54 [Michael Armbrust] Better toString, factories for AttributeSet.
Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala23
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala23
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala8
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala5
6 files changed, 45 insertions, 20 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
index c3a08bbdb6..2b4969b7cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
@@ -17,19 +17,26 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.analysis.Star
+
protected class AttributeEquals(val a: Attribute) {
override def hashCode() = a.exprId.hashCode()
- override def equals(other: Any) = other match {
- case otherReference: AttributeEquals => a.exprId == otherReference.a.exprId
- case otherAttribute => false
+ override def equals(other: Any) = (a, other.asInstanceOf[AttributeEquals].a) match {
+ case (a1: AttributeReference, a2: AttributeReference) => a1.exprId == a2.exprId
+ case (a1, a2) => a1 == a2
}
}
object AttributeSet {
- /** Constructs a new [[AttributeSet]] given a sequence of [[Attribute Attributes]]. */
- def apply(baseSet: Seq[Attribute]) = {
- new AttributeSet(baseSet.map(new AttributeEquals(_)).toSet)
- }
+ def apply(a: Attribute) =
+ new AttributeSet(Set(new AttributeEquals(a)))
+
+ /** Constructs a new [[AttributeSet]] given a sequence of [[Expression Expressions]]. */
+ def apply(baseSet: Seq[Expression]) =
+ new AttributeSet(
+ baseSet
+ .flatMap(_.references)
+ .map(new AttributeEquals(_)).toSet)
}
/**
@@ -103,4 +110,6 @@ class AttributeSet private (val baseSet: Set[AttributeEquals])
// We must force toSeq to not be strict otherwise we end up with a [[Stream]] that captures all
// sorts of things in its closure.
override def toSeq: Seq[Attribute] = baseSet.map(_.a).toArray.toSeq
+
+ override def toString = "{" + baseSet.map(_.a).mkString(", ") + "}"
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 204904ecf0..e7e81a21fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -39,6 +39,8 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
}
new GenericRow(outputArray)
}
+
+ override def toString = s"Row => [${exprArray.mkString(",")}]"
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index e5a958d599..d023db44d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -57,6 +57,8 @@ abstract class NamedExpression extends Expression {
abstract class Attribute extends NamedExpression {
self: Product =>
+ override def references = AttributeSet(this)
+
def withNullability(newNullability: Boolean): Attribute
def withQualifiers(newQualifiers: Seq[String]): Attribute
def withName(newName: String): Attribute
@@ -116,8 +118,6 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
(val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
extends Attribute with trees.LeafNode[Expression] {
- override def references = AttributeSet(this :: Nil)
-
override def equals(other: Any) = other match {
case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
case _ => false
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index af9e4d86e9..dcbbb62c0a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -32,6 +32,25 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
def outputSet: AttributeSet = AttributeSet(output)
/**
+ * All Attributes that appear in expressions from this operator. Note that this set does not
+ * include attributes that are implicitly referenced by being passed through to the output tuple.
+ */
+ def references: AttributeSet = AttributeSet(expressions.flatMap(_.references))
+
+ /**
+ * The set of all attributes that are input to this operator by its children.
+ */
+ def inputSet: AttributeSet =
+ AttributeSet(children.flatMap(_.asInstanceOf[QueryPlan[PlanType]].output))
+
+ /**
+ * Attributes that are referenced by expressions but not provided by this node's children.
+ * Subclasses should override this method if they produce attributes internally as it is used by
+ * assertions designed to prevent the construction of invalid plans.
+ */
+ def missingInput: AttributeSet = references -- inputSet
+
+ /**
* Runs [[transform]] with `rule` on all expressions present in this query operator.
* Users should not expect a specific directionality. If a specific directionality is needed,
* transformExpressionsDown or transformExpressionsUp should be used.
@@ -132,4 +151,8 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
/** Prints out the schema in the tree format */
def printSchema(): Unit = println(schemaString)
+
+ protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
+
+ override def simpleString = statePrefix + super.simpleString
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 4f8ad8a7e0..882e9c6110 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -54,12 +54,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
}
/**
- * Returns the set of attributes that this node takes as
- * input from its children.
- */
- lazy val inputSet: AttributeSet = AttributeSet(children.flatMap(_.output))
-
- /**
* Returns true if this expression and all its children have been resolved to a specific schema
* and false if it still contains any unresolved placeholders. Implementations of LogicalPlan
* can override this (e.g.
@@ -68,6 +62,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*/
lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved
+ override protected def statePrefix = if (!resolved) "'" else super.statePrefix
+
/**
* Returns true if all its children of this query plan have been resolved.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index f8e9930ac2..14b03c7445 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -138,11 +138,6 @@ case class Aggregate(
child: LogicalPlan)
extends UnaryNode {
- /** The set of all AttributeReferences required for this aggregation. */
- def references =
- AttributeSet(
- groupingExpressions.flatMap(_.references) ++ aggregateExpressions.flatMap(_.references))
-
override def output = aggregateExpressions.map(_.toAttribute)
}