aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala15
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala11
3 files changed, 10 insertions, 20 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 29ae47e842..3f72e6e184 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -184,10 +184,10 @@ abstract class Expression extends TreeNode[Expression] {
*/
trait Unevaluable extends Expression {
- override def eval(input: InternalRow = null): Any =
+ final override def eval(input: InternalRow = null): Any =
throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
- override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String =
+ final override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String =
throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 577ede73cb..d3fee1ade0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -63,10 +63,6 @@ private[sql] case object Complete extends AggregateMode
*/
private[sql] case object NoOp extends Expression with Unevaluable {
override def nullable: Boolean = true
- override def eval(input: InternalRow): Any = {
- throw new TreeNodeException(
- this, s"No function to evaluate expression. type: ${this.nodeName}")
- }
override def dataType: DataType = NullType
override def children: Seq[Expression] = Nil
}
@@ -151,8 +147,7 @@ abstract class AggregateFunction2
/**
* A helper class for aggregate functions that can be implemented in terms of catalyst expressions.
*/
-abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable {
- self: Product =>
+abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable with Unevaluable {
val initialValues: Seq[Expression]
val updateExpressions: Seq[Expression]
@@ -188,19 +183,15 @@ abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable {
}
}
- override def update(buffer: MutableRow, input: InternalRow): Unit = {
+ override final def update(buffer: MutableRow, input: InternalRow): Unit = {
throw new UnsupportedOperationException(
"AlgebraicAggregate's update should not be called directly")
}
- override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+ override final def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
throw new UnsupportedOperationException(
"AlgebraicAggregate's merge should not be called directly")
}
- override def eval(buffer: InternalRow): Any = {
- throw new UnsupportedOperationException(
- "AlgebraicAggregate's eval should not be called directly")
- }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index e07c920a41..d3295b8baf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.OpenHashSet
@@ -71,8 +71,7 @@ trait PartialAggregate1 extends AggregateExpression1 {
* A specific implementation of an aggregate function. Used to wrap a generic
* [[AggregateExpression1]] with an algorithm that will be used to compute one specific result.
*/
-abstract class AggregateFunction1
- extends LeafExpression with AggregateExpression1 with Serializable {
+abstract class AggregateFunction1 extends LeafExpression with Serializable {
/** Base should return the generic aggregate expression that this function is computing */
val base: AggregateExpression1
@@ -82,9 +81,9 @@ abstract class AggregateFunction1
def update(input: InternalRow): Unit
- // Do we really need this?
- override def newInstance(): AggregateFunction1 = {
- makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
+ override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+ throw new UnsupportedOperationException(
+ "AggregateFunction1 should not be used for generated aggregates")
}
}