From b2f3aca1e8c182b93e250f9d9c4aa69f97eaa11a Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 23 Jul 2015 16:08:07 -0700 Subject: [SPARK-9286] [SQL] Methods in Unevaluable should be final and AlgebraicAggregate should extend Unevaluable. This patch marks the Unevaluable.eval() and UnevaluablegenCode() methods as final and fixes two cases where they were overridden. It also updates AggregateFunction2 to extend Unevaluable. Author: Josh Rosen Closes #7627 from JoshRosen/unevaluable-fix and squashes the following commits: 8d9ed22 [Josh Rosen] AlgebraicAggregate should extend Unevaluable 65329c2 [Josh Rosen] Do not have AggregateFunction1 inherit from AggregateExpression1 fa68a22 [Josh Rosen] Make eval() and genCode() final --- .../spark/sql/catalyst/expressions/Expression.scala | 4 ++-- .../sql/catalyst/expressions/aggregate/interfaces.scala | 15 +++------------ .../spark/sql/catalyst/expressions/aggregates.scala | 11 +++++------ 3 files changed, 10 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 29ae47e842..3f72e6e184 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -184,10 +184,10 @@ abstract class Expression extends TreeNode[Expression] { */ trait Unevaluable extends Expression { - override def eval(input: InternalRow = null): Any = + final override def eval(input: InternalRow = null): Any = throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") - override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = + final override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala index 577ede73cb..d3fee1ade0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala @@ -63,10 +63,6 @@ private[sql] case object Complete extends AggregateMode */ private[sql] case object NoOp extends Expression with Unevaluable { override def nullable: Boolean = true - override def eval(input: InternalRow): Any = { - throw new TreeNodeException( - this, s"No function to evaluate expression. type: ${this.nodeName}") - } override def dataType: DataType = NullType override def children: Seq[Expression] = Nil } @@ -151,8 +147,7 @@ abstract class AggregateFunction2 /** * A helper class for aggregate functions that can be implemented in terms of catalyst expressions. */ -abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable { - self: Product => +abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable with Unevaluable { val initialValues: Seq[Expression] val updateExpressions: Seq[Expression] @@ -188,19 +183,15 @@ abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable { } } - override def update(buffer: MutableRow, input: InternalRow): Unit = { + override final def update(buffer: MutableRow, input: InternalRow): Unit = { throw new UnsupportedOperationException( "AlgebraicAggregate's update should not be called directly") } - override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = { + override final def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = { throw new UnsupportedOperationException( "AlgebraicAggregate's merge should not be called directly") } - override def eval(buffer: InternalRow): Any = { - throw new UnsupportedOperationException( - "AlgebraicAggregate's eval should not be called directly") - } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index e07c920a41..d3295b8baf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions import com.clearspring.analytics.stream.cardinality.HyperLogLog import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode} import org.apache.spark.sql.catalyst.util.TypeUtils import org.apache.spark.sql.types._ import org.apache.spark.util.collection.OpenHashSet @@ -71,8 +71,7 @@ trait PartialAggregate1 extends AggregateExpression1 { * A specific implementation of an aggregate function. Used to wrap a generic * [[AggregateExpression1]] with an algorithm that will be used to compute one specific result. */ -abstract class AggregateFunction1 - extends LeafExpression with AggregateExpression1 with Serializable { +abstract class AggregateFunction1 extends LeafExpression with Serializable { /** Base should return the generic aggregate expression that this function is computing */ val base: AggregateExpression1 @@ -82,9 +81,9 @@ abstract class AggregateFunction1 def update(input: InternalRow): Unit - // Do we really need this? - override def newInstance(): AggregateFunction1 = { - makeCopy(productIterator.map { case a: AnyRef => a }.toArray) + override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + throw new UnsupportedOperationException( + "AggregateFunction1 should not be used for generated aggregates") } } -- cgit v1.2.3