aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-04-07 18:39:18 -0700
committerReynold Xin <rxin@apache.org>2014-04-07 18:39:18 -0700
commit55dfd5dcdbf3a9bfddb2108c8325bda3100eb33d (patch)
tree080b57149fd8e6ea2ee69a2b308d8a817776c697 /sql/catalyst
parent14c9238aa7173ba663a999ef320d8cffb73306c4 (diff)
downloadspark-55dfd5dcdbf3a9bfddb2108c8325bda3100eb33d.tar.gz
spark-55dfd5dcdbf3a9bfddb2108c8325bda3100eb33d.tar.bz2
spark-55dfd5dcdbf3a9bfddb2108c8325bda3100eb33d.zip
Removed the default eval implementation from Expression, and added a bunch of override's in classes I touched.
It is more robust to not provide a default implementation for Expression's. Author: Reynold Xin <rxin@apache.org> Closes #350 from rxin/eval-default and squashes the following commits: 0a83b8f [Reynold Xin] Removed the default eval implementation from Expression, and added a bunch of override's in classes I touched.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala52
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala3
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala11
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala8
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala21
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala32
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala5
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala2
8 files changed, 89 insertions, 45 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 41e9bcef3c..d629172a74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.{errors, trees}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -36,7 +37,7 @@ case class UnresolvedRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None) extends BaseRelation {
- def output = Nil
+ override def output = Nil
override lazy val resolved = false
}
@@ -44,26 +45,33 @@ case class UnresolvedRelation(
* Holds the name of an attribute that has yet to be resolved.
*/
case class UnresolvedAttribute(name: String) extends Attribute with trees.LeafNode[Expression] {
- def exprId = throw new UnresolvedException(this, "exprId")
- def dataType = throw new UnresolvedException(this, "dataType")
- def nullable = throw new UnresolvedException(this, "nullable")
- def qualifiers = throw new UnresolvedException(this, "qualifiers")
+ override def exprId = throw new UnresolvedException(this, "exprId")
+ override def dataType = throw new UnresolvedException(this, "dataType")
+ override def nullable = throw new UnresolvedException(this, "nullable")
+ override def qualifiers = throw new UnresolvedException(this, "qualifiers")
override lazy val resolved = false
- def newInstance = this
- def withQualifiers(newQualifiers: Seq[String]) = this
+ override def newInstance = this
+ override def withQualifiers(newQualifiers: Seq[String]) = this
+
+ // Unresolved attributes are transient at compile time and don't get evaluated during execution.
+ override def eval(input: Row = null): EvaluatedType =
+ throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def toString: String = s"'$name"
}
case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {
- def exprId = throw new UnresolvedException(this, "exprId")
- def dataType = throw new UnresolvedException(this, "dataType")
+ override def dataType = throw new UnresolvedException(this, "dataType")
override def foldable = throw new UnresolvedException(this, "foldable")
- def nullable = throw new UnresolvedException(this, "nullable")
- def qualifiers = throw new UnresolvedException(this, "qualifiers")
- def references = children.flatMap(_.references).toSet
+ override def nullable = throw new UnresolvedException(this, "nullable")
+ override def references = children.flatMap(_.references).toSet
override lazy val resolved = false
+
+ // Unresolved functions are transient at compile time and don't get evaluated during execution.
+ override def eval(input: Row = null): EvaluatedType =
+ throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+
override def toString = s"'$name(${children.mkString(",")})"
}
@@ -79,15 +87,15 @@ case class Star(
mapFunction: Attribute => Expression = identity[Attribute])
extends Attribute with trees.LeafNode[Expression] {
- def name = throw new UnresolvedException(this, "exprId")
- def exprId = throw new UnresolvedException(this, "exprId")
- def dataType = throw new UnresolvedException(this, "dataType")
- def nullable = throw new UnresolvedException(this, "nullable")
- def qualifiers = throw new UnresolvedException(this, "qualifiers")
+ override def name = throw new UnresolvedException(this, "exprId")
+ override def exprId = throw new UnresolvedException(this, "exprId")
+ override def dataType = throw new UnresolvedException(this, "dataType")
+ override def nullable = throw new UnresolvedException(this, "nullable")
+ override def qualifiers = throw new UnresolvedException(this, "qualifiers")
override lazy val resolved = false
- def newInstance = this
- def withQualifiers(newQualifiers: Seq[String]) = this
+ override def newInstance = this
+ override def withQualifiers(newQualifiers: Seq[String]) = this
def expand(input: Seq[Attribute]): Seq[NamedExpression] = {
val expandedAttributes: Seq[Attribute] = table match {
@@ -104,5 +112,9 @@ case class Star(
mappedAttributes
}
+ // Star gets expanded at runtime so we never evaluate a Star.
+ override def eval(input: Row = null): EvaluatedType =
+ throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+
override def toString = table.map(_ + ".").getOrElse("") + "*"
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index f190bd0cca..8a1db8e796 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -50,8 +50,7 @@ abstract class Expression extends TreeNode[Expression] {
def references: Set[Attribute]
/** Returns the result of evaluating this expression on a given input Row */
- def eval(input: Row = null): EvaluatedType =
- throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+ def eval(input: Row = null): EvaluatedType
/**
* Returns `true` if this expression and all its children have been resolved to a specific schema
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index d5d93778f4..08b2f11d20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
+
abstract sealed class SortDirection
case object Ascending extends SortDirection
case object Descending extends SortDirection
@@ -26,7 +28,12 @@ case object Descending extends SortDirection
* transformations over expression will descend into its child.
*/
case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression {
- def dataType = child.dataType
- def nullable = child.nullable
+ override def dataType = child.dataType
+ override def nullable = child.nullable
+
+ // SortOrder itself is never evaluated.
+ override def eval(input: Row = null): EvaluatedType =
+ throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+
override def toString = s"$child ${if (direction == Ascending) "ASC" else "DESC"}"
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 5edcea1427..b152f95f96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
abstract class AggregateExpression extends Expression {
self: Product =>
@@ -28,6 +29,13 @@ abstract class AggregateExpression extends Expression {
* of input rows/
*/
def newInstance(): AggregateFunction
+
+ /**
+ * [[AggregateExpression.eval]] should never be invoked because [[AggregateExpression]]'s are
+ * replaced with a physical aggregate operator at runtime.
+ */
+ override def eval(input: Row = null): EvaluatedType =
+ throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index eb4bc8e755..a8145c37c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.types._
object NamedExpression {
@@ -58,9 +59,9 @@ abstract class Attribute extends NamedExpression {
def withQualifiers(newQualifiers: Seq[String]): Attribute
- def references = Set(this)
def toAttribute = this
def newInstance: Attribute
+ override def references = Set(this)
}
/**
@@ -77,15 +78,15 @@ case class Alias(child: Expression, name: String)
(val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
extends NamedExpression with trees.UnaryNode[Expression] {
- type EvaluatedType = Any
+ override type EvaluatedType = Any
override def eval(input: Row) = child.eval(input)
- def dataType = child.dataType
- def nullable = child.nullable
- def references = child.references
+ override def dataType = child.dataType
+ override def nullable = child.nullable
+ override def references = child.references
- def toAttribute = {
+ override def toAttribute = {
if (resolved) {
AttributeReference(name, child.dataType, child.nullable)(exprId, qualifiers)
} else {
@@ -127,7 +128,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
h
}
- def newInstance = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
+ override def newInstance = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
/**
* Returns a copy of this [[AttributeReference]] with changed nullability.
@@ -143,7 +144,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
/**
* Returns a copy of this [[AttributeReference]] with new qualifiers.
*/
- def withQualifiers(newQualifiers: Seq[String]) = {
+ override def withQualifiers(newQualifiers: Seq[String]) = {
if (newQualifiers == qualifiers) {
this
} else {
@@ -151,5 +152,9 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
}
}
+ // Unresolved attributes are transient at compile time and don't get evaluated during execution.
+ override def eval(input: Row = null): EvaluatedType =
+ throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+
override def toString: String = s"$name#${exprId.id}$typeSuffix"
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 8893744eb2..ffb3a92f8f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.expressions.{Expression, Row, SortOrder}
import org.apache.spark.sql.catalyst.types.IntegerType
/**
@@ -139,12 +140,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
extends Expression
with Partitioning {
- def children = expressions
- def references = expressions.flatMap(_.references).toSet
- def nullable = false
- def dataType = IntegerType
+ override def children = expressions
+ override def references = expressions.flatMap(_.references).toSet
+ override def nullable = false
+ override def dataType = IntegerType
- lazy val clusteringSet = expressions.toSet
+ private[this] lazy val clusteringSet = expressions.toSet
override def satisfies(required: Distribution): Boolean = required match {
case UnspecifiedDistribution => true
@@ -158,6 +159,9 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
case h: HashPartitioning if h == this => true
case _ => false
}
+
+ override def eval(input: Row = null): EvaluatedType =
+ throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
/**
@@ -168,17 +172,20 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
* partition.
* - Each partition will have a `min` and `max` row, relative to the given ordering. All rows
* that are in between `min` and `max` in this `ordering` will reside in this partition.
+ *
+ * This class extends expression primarily so that transformations over expression will descend
+ * into its child.
*/
case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
extends Expression
with Partitioning {
- def children = ordering
- def references = ordering.flatMap(_.references).toSet
- def nullable = false
- def dataType = IntegerType
+ override def children = ordering
+ override def references = ordering.flatMap(_.references).toSet
+ override def nullable = false
+ override def dataType = IntegerType
- lazy val clusteringSet = ordering.map(_.child).toSet
+ private[this] lazy val clusteringSet = ordering.map(_.child).toSet
override def satisfies(required: Distribution): Boolean = required match {
case UnspecifiedDistribution => true
@@ -195,4 +202,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
case r: RangePartitioning if r == this => true
case _ => false
}
+
+ override def eval(input: Row): EvaluatedType =
+ throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
index 92987405aa..31be6c4ef1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
@@ -100,7 +100,10 @@ class ExpressionEvaluationSuite extends FunSuite {
(null, false, null) ::
(null, null, null) :: Nil)
- def booleanLogicTest(name: String, op: (Expression, Expression) => Expression, truthTable: Seq[(Any, Any, Any)]) {
+ def booleanLogicTest(
+ name: String,
+ op: (Expression, Expression) => Expression,
+ truthTable: Seq[(Any, Any, Any)]) {
test(s"3VL $name") {
truthTable.foreach {
case (l,r,answer) =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 2ab14f48cc..20dfba8477 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.catalyst.types.IntegerType
+import org.apache.spark.sql.catalyst.types.{DoubleType, IntegerType}
// For implicit conversions
import org.apache.spark.sql.catalyst.dsl.plans._