aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala43
1 files changed, 28 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
index 844f3051fa..7da8379c9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
@@ -43,52 +43,65 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
*
* Based loosely on Aggregator from Algebird: https://github.com/twitter/algebird
*
- * @tparam I The input type for the aggregation.
- * @tparam B The type of the intermediate value of the reduction.
- * @tparam O The type of the final output result.
+ * @tparam IN The input type for the aggregation.
+ * @tparam BUF The type of the intermediate value of the reduction.
+ * @tparam OUT The type of the final output result.
* @since 1.6.0
*/
-abstract class Aggregator[-I, B, O] extends Serializable {
+abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
/**
* A zero value for this aggregation. Should satisfy the property that any b + zero = b.
* @since 1.6.0
*/
- def zero: B
+ def zero: BUF
/**
* Combine two values to produce a new value. For performance, the function may modify `b` and
* return it instead of constructing new object for b.
* @since 1.6.0
*/
- def reduce(b: B, a: I): B
+ def reduce(b: BUF, a: IN): BUF
/**
* Merge two intermediate values.
* @since 1.6.0
*/
- def merge(b1: B, b2: B): B
+ def merge(b1: BUF, b2: BUF): BUF
/**
* Transform the output of the reduction.
* @since 1.6.0
*/
- def finish(reduction: B): O
+ def finish(reduction: BUF): OUT
/**
- * Returns this `Aggregator` as a [[TypedColumn]] that can be used in [[Dataset]] or [[DataFrame]]
+ * Specifies the [[Encoder]] for the intermediate value type.
+ * @since 2.0.0
+ */
+ def bufferEncoder: Encoder[BUF]
+
+ /**
+ * Specifies the [[Encoder]] for the final ouput value type.
+ * @since 2.0.0
+ */
+ def outputEncoder: Encoder[OUT]
+
+ /**
+ * Returns this `Aggregator` as a [[TypedColumn]] that can be used in [[Dataset]].
* operations.
* @since 1.6.0
*/
- def toColumn(
- implicit bEncoder: Encoder[B],
- cEncoder: Encoder[O]): TypedColumn[I, O] = {
+ def toColumn: TypedColumn[IN, OUT] = {
+ implicit val bEncoder = bufferEncoder
+ implicit val cEncoder = outputEncoder
+
val expr =
- new AggregateExpression(
+ AggregateExpression(
TypedAggregateExpression(this),
Complete,
- false)
+ isDistinct = false)
- new TypedColumn[I, O](expr, encoderFor[O])
+ new TypedColumn[IN, OUT](expr, encoderFor[OUT])
}
}