diff options
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala | 43 |
1 files changed, 28 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala index 844f3051fa..7da8379c9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala @@ -43,52 +43,65 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression * * Based loosely on Aggregator from Algebird: https://github.com/twitter/algebird * - * @tparam I The input type for the aggregation. - * @tparam B The type of the intermediate value of the reduction. - * @tparam O The type of the final output result. + * @tparam IN The input type for the aggregation. + * @tparam BUF The type of the intermediate value of the reduction. + * @tparam OUT The type of the final output result. * @since 1.6.0 */ -abstract class Aggregator[-I, B, O] extends Serializable { +abstract class Aggregator[-IN, BUF, OUT] extends Serializable { /** * A zero value for this aggregation. Should satisfy the property that any b + zero = b. * @since 1.6.0 */ - def zero: B + def zero: BUF /** * Combine two values to produce a new value. For performance, the function may modify `b` and * return it instead of constructing new object for b. * @since 1.6.0 */ - def reduce(b: B, a: I): B + def reduce(b: BUF, a: IN): BUF /** * Merge two intermediate values. * @since 1.6.0 */ - def merge(b1: B, b2: B): B + def merge(b1: BUF, b2: BUF): BUF /** * Transform the output of the reduction. * @since 1.6.0 */ - def finish(reduction: B): O + def finish(reduction: BUF): OUT /** - * Returns this `Aggregator` as a [[TypedColumn]] that can be used in [[Dataset]] or [[DataFrame]] + * Specifies the [[Encoder]] for the intermediate value type. + * @since 2.0.0 + */ + def bufferEncoder: Encoder[BUF] + + /** + * Specifies the [[Encoder]] for the final ouput value type. + * @since 2.0.0 + */ + def outputEncoder: Encoder[OUT] + + /** + * Returns this `Aggregator` as a [[TypedColumn]] that can be used in [[Dataset]]. * operations. * @since 1.6.0 */ - def toColumn( - implicit bEncoder: Encoder[B], - cEncoder: Encoder[O]): TypedColumn[I, O] = { + def toColumn: TypedColumn[IN, OUT] = { + implicit val bEncoder = bufferEncoder + implicit val cEncoder = outputEncoder + val expr = - new AggregateExpression( + AggregateExpression( TypedAggregateExpression(this), Complete, - false) + isDistinct = false) - new TypedColumn[I, O](expr, encoderFor[O]) + new TypedColumn[IN, OUT](expr, encoderFor[OUT]) } } |