author     Reynold Xin <rxin@databricks.com>    2016-04-09 00:00:39 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-09 00:00:39 -0700
commit     520dde48d0d52dbbbbe1710a3275fdd5355dd69d (patch)
tree       c6fdf831b43c044e894f5cf24af30650e6aa82c0 /repl/scala-2.11
parent     2f0b882e5c8787b09bedcc8208e6dcc5662dbbab (diff)
[SPARK-14451][SQL] Move encoder definition into Aggregator interface
## What changes were proposed in this pull request?

When we first introduced Aggregators, we required users of Aggregators to (implicitly) specify the encoders. It makes more sense for the encoders to be specified by the Aggregator implementation itself, since each implementation knows best how to encode its own data type. Note that this also simplifies the Java API, because Java users no longer need to explicitly specify encoders for aggregators.

## How was this patch tested?

Updated unit tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12231 from rxin/SPARK-14451.
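For reference, a minimal sketch of an Aggregator under the new contract, mirroring the ReplSuite change in the diff below (the usage lines assume a Spark shell or session with the implicits in scope):

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// After this change, each Aggregator implementation declares its own
// encoders instead of requiring them implicitly at the call site.
object SimpleSum extends Aggregator[Int, Int, Int] {
  def zero: Int = 0                            // Initial buffer value.
  def reduce(b: Int, a: Int): Int = b + a      // Fold one input element into the buffer.
  def merge(b1: Int, b2: Int): Int = b1 + b2   // Combine two partial buffers.
  def finish(b: Int): Int = b                  // Turn the final buffer into the output.
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt   // New: encoder for the buffer type.
  def outputEncoder: Encoder[Int] = Encoders.scalaInt   // New: encoder for the output type.
}

// Usage (in a Spark shell, where toDS() is available via the session implicits):
//   val ds = Seq(1, 2, 3, 4).toDS()
//   ds.select(SimpleSum.toColumn).collect()   // Array(10)
```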
Diffstat (limited to 'repl/scala-2.11')
-rw-r--r--  repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala  29
1 file changed, 3 insertions(+), 26 deletions(-)
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index dbfacba346..7e10f15226 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -267,7 +267,7 @@ class ReplSuite extends SparkFunSuite {
val output = runInterpreter("local",
"""
|import org.apache.spark.sql.functions._
- |import org.apache.spark.sql.Encoder
+ |import org.apache.spark.sql.{Encoder, Encoders}
|import org.apache.spark.sql.expressions.Aggregator
|import org.apache.spark.sql.TypedColumn
|val simpleSum = new Aggregator[Int, Int, Int] {
@@ -275,6 +275,8 @@ class ReplSuite extends SparkFunSuite {
| def reduce(b: Int, a: Int) = b + a // Add an element to the running total
| def merge(b1: Int, b2: Int) = b1 + b2 // Merge intermediate values.
| def finish(b: Int) = b // Return the final result.
+ | def bufferEncoder: Encoder[Int] = Encoders.scalaInt
+ | def outputEncoder: Encoder[Int] = Encoders.scalaInt
|}.toColumn
|
|val ds = Seq(1, 2, 3, 4).toDS()
@@ -321,31 +323,6 @@ class ReplSuite extends SparkFunSuite {
}
}
- test("Datasets agg type-inference") {
- val output = runInterpreter("local",
- """
- |import org.apache.spark.sql.functions._
- |import org.apache.spark.sql.Encoder
- |import org.apache.spark.sql.expressions.Aggregator
- |import org.apache.spark.sql.TypedColumn
- |/** An `Aggregator` that adds up any numeric type returned by the given function. */
- |class SumOf[I, N : Numeric](f: I => N) extends
- | org.apache.spark.sql.expressions.Aggregator[I, N, N] {
- | val numeric = implicitly[Numeric[N]]
- | override def zero: N = numeric.zero
- | override def reduce(b: N, a: I): N = numeric.plus(b, f(a))
- | override def merge(b1: N,b2: N): N = numeric.plus(b1, b2)
- | override def finish(reduction: N): N = reduction
- |}
- |
- |def sum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] = new SumOf(f).toColumn
- |val ds = Seq((1, 1, 2L), (1, 2, 3L), (1, 3, 4L), (2, 1, 5L)).toDS()
- |ds.groupByKey(_._1).agg(sum(_._2), sum(_._3)).collect()
- """.stripMargin)
- assertDoesNotContain("error:", output)
- assertDoesNotContain("Exception", output)
- }
-
test("collecting objects of class defined in repl") {
val output = runInterpreter("local[2]",
"""