aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-11-21 00:54:18 -0800
committerReynold Xin <rxin@databricks.com>2015-11-21 00:54:18 -0800
commit596710268e29e8f624c3ba2fade08b66ec7084eb (patch)
treee0ede3d6ab287294cad415b21fa32a5af6bb59f3
parent54328b6d862fe62ae01bdd87df4798ceb9d506d6 (diff)
downloadspark-596710268e29e8f624c3ba2fade08b66ec7084eb.tar.gz
spark-596710268e29e8f624c3ba2fade08b66ec7084eb.tar.bz2
spark-596710268e29e8f624c3ba2fade08b66ec7084eb.zip
[SPARK-11901][SQL] API audit for Aggregator.
Author: Reynold Xin <rxin@databricks.com> Closes #9882 from rxin/SPARK-11901.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala39
2 files changed, 24 insertions, 16 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index bdcdc5d47c..0764750842 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.function._
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
index 72610e735f..b0cd32b5f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
@@ -17,11 +17,10 @@
package org.apache.spark.sql.expressions
-import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete}
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
-import org.apache.spark.sql.{Dataset, DataFrame, TypedColumn}
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, TypedColumn}
/**
* A base class for user-defined aggregations, which can be used in [[DataFrame]] and [[Dataset]]
@@ -32,55 +31,65 @@ import org.apache.spark.sql.{Dataset, DataFrame, TypedColumn}
* case class Data(i: Int)
*
* val customSummer = new Aggregator[Data, Int, Int] {
- * def zero = 0
- * def reduce(b: Int, a: Data) = b + a.i
- * def present(r: Int) = r
+ * def zero: Int = 0
+ * def reduce(b: Int, a: Data): Int = b + a.i
+ * def merge(b1: Int, b2: Int): Int = b1 + b2
+ * def present(r: Int): Int = r
* }.toColumn()
*
- * val ds: Dataset[Data]
+ * val ds: Dataset[Data] = ...
* val aggregated = ds.select(customSummer)
* }}}
*
* Based loosely on Aggregator from Algebird: https://github.com/twitter/algebird
*
- * @tparam A The input type for the aggregation.
+ * @tparam I The input type for the aggregation.
* @tparam B The type of the intermediate value of the reduction.
- * @tparam C The type of the final result.
+ * @tparam O The type of the final output result.
+ *
+ * @since 1.6.0
*/
-abstract class Aggregator[-A, B, C] extends Serializable {
+abstract class Aggregator[-I, B, O] extends Serializable {
- /** A zero value for this aggregation. Should satisfy the property that any b + zero = b */
+ /**
+ * A zero value for this aggregation. Should satisfy the property that any b + zero = b.
+ * @since 1.6.0
+ */
def zero: B
/**
* Combine two values to produce a new value. For performance, the function may modify `b` and
* return it instead of constructing new object for b.
+ * @since 1.6.0
*/
- def reduce(b: B, a: A): B
+ def reduce(b: B, a: I): B
/**
- * Merge two intermediate values
+ * Merge two intermediate values.
+ * @since 1.6.0
*/
def merge(b1: B, b2: B): B
/**
* Transform the output of the reduction.
+ * @since 1.6.0
*/
- def finish(reduction: B): C
+ def finish(reduction: B): O
/**
* Returns this `Aggregator` as a [[TypedColumn]] that can be used in [[Dataset]] or [[DataFrame]]
* operations.
+ * @since 1.6.0
*/
def toColumn(
implicit bEncoder: Encoder[B],
- cEncoder: Encoder[C]): TypedColumn[A, C] = {
+ cEncoder: Encoder[O]): TypedColumn[I, O] = {
val expr =
new AggregateExpression(
TypedAggregateExpression(this),
Complete,
false)
- new TypedColumn[A, C](expr, encoderFor[C])
+ new TypedColumn[I, O](expr, encoderFor[O])
}
}