aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-07-23 16:08:07 -0700
committerYin Huai <yhuai@databricks.com>2015-07-23 16:08:07 -0700
commitb2f3aca1e8c182b93e250f9d9c4aa69f97eaa11a (patch)
tree8050714cecdb5f5610705f260b366153b73d6e3a
parent662d60db3f4a758b6869de5bd971d23bd5962c3b (diff)
downloadspark-b2f3aca1e8c182b93e250f9d9c4aa69f97eaa11a.tar.gz
spark-b2f3aca1e8c182b93e250f9d9c4aa69f97eaa11a.tar.bz2
spark-b2f3aca1e8c182b93e250f9d9c4aa69f97eaa11a.zip
[SPARK-9286] [SQL] Methods in Unevaluable should be final and AlgebraicAggregate should extend Unevaluable.
This patch marks the Unevaluable.eval() and UnevaluablegenCode() methods as final and fixes two cases where they were overridden. It also updates AggregateFunction2 to extend Unevaluable. Author: Josh Rosen <joshrosen@databricks.com> Closes #7627 from JoshRosen/unevaluable-fix and squashes the following commits: 8d9ed22 [Josh Rosen] AlgebraicAggregate should extend Unevaluable 65329c2 [Josh Rosen] Do not have AggregateFunction1 inherit from AggregateExpression1 fa68a22 [Josh Rosen] Make eval() and genCode() final
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala15
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala11
3 files changed, 10 insertions, 20 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 29ae47e842..3f72e6e184 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -184,10 +184,10 @@ abstract class Expression extends TreeNode[Expression] {
*/
trait Unevaluable extends Expression {
- override def eval(input: InternalRow = null): Any =
+ final override def eval(input: InternalRow = null): Any =
throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
- override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String =
+ final override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String =
throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 577ede73cb..d3fee1ade0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -63,10 +63,6 @@ private[sql] case object Complete extends AggregateMode
*/
private[sql] case object NoOp extends Expression with Unevaluable {
override def nullable: Boolean = true
- override def eval(input: InternalRow): Any = {
- throw new TreeNodeException(
- this, s"No function to evaluate expression. type: ${this.nodeName}")
- }
override def dataType: DataType = NullType
override def children: Seq[Expression] = Nil
}
@@ -151,8 +147,7 @@ abstract class AggregateFunction2
/**
* A helper class for aggregate functions that can be implemented in terms of catalyst expressions.
*/
-abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable {
- self: Product =>
+abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable with Unevaluable {
val initialValues: Seq[Expression]
val updateExpressions: Seq[Expression]
@@ -188,19 +183,15 @@ abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable {
}
}
- override def update(buffer: MutableRow, input: InternalRow): Unit = {
+ override final def update(buffer: MutableRow, input: InternalRow): Unit = {
throw new UnsupportedOperationException(
"AlgebraicAggregate's update should not be called directly")
}
- override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+ override final def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
throw new UnsupportedOperationException(
"AlgebraicAggregate's merge should not be called directly")
}
- override def eval(buffer: InternalRow): Any = {
- throw new UnsupportedOperationException(
- "AlgebraicAggregate's eval should not be called directly")
- }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index e07c920a41..d3295b8baf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.OpenHashSet
@@ -71,8 +71,7 @@ trait PartialAggregate1 extends AggregateExpression1 {
* A specific implementation of an aggregate function. Used to wrap a generic
* [[AggregateExpression1]] with an algorithm that will be used to compute one specific result.
*/
-abstract class AggregateFunction1
- extends LeafExpression with AggregateExpression1 with Serializable {
+abstract class AggregateFunction1 extends LeafExpression with Serializable {
/** Base should return the generic aggregate expression that this function is computing */
val base: AggregateExpression1
@@ -82,9 +81,9 @@ abstract class AggregateFunction1
def update(input: InternalRow): Unit
- // Do we really need this?
- override def newInstance(): AggregateFunction1 = {
- makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
+ override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+ throw new UnsupportedOperationException(
+ "AggregateFunction1 should not be used for generated aggregates")
}
}