about summary refs log tree commit diff
diff options
context:
space:
mode:
author	Reynold Xin <rxin@databricks.com>	2016-04-16 01:05:26 -0700
committer	Reynold Xin <rxin@databricks.com>	2016-04-16 01:05:26 -0700
commit527c780bb0d6cb074128448da00cb330e9049385 (patch)
treed4085adef6750a5315daa6fabef7f9c0218fca20
parent12854464c4fa30c4df3b5b17bd8914d048dbf4a9 (diff)
downloadspark-527c780bb0d6cb074128448da00cb330e9049385.tar.gz
spark-527c780bb0d6cb074128448da00cb330e9049385.tar.bz2
spark-527c780bb0d6cb074128448da00cb330e9049385.zip
Revert "[SPARK-13363][SQL] support Aggregator in RelationalGroupedDataset"
This reverts commit 12854464c4fa30c4df3b5b17bd8914d048dbf4a9.
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala	6
-rw-r--r--	sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala	14
2 files changed, 2 insertions, 18 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index deb2e82165..7dbf2e6c7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -208,11 +208,7 @@ class RelationalGroupedDataset protected[sql](
*/
@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = {
- toDF((expr +: exprs).map {
- case typed: TypedColumn[_, _] =>
- typed.withInputType(df.resolvedTEncoder, df.logicalPlan.output).expr
- case c => c.expr
- })
+ toDF((expr +: exprs).map(_.expr))
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 0d84a594f7..3a7215ee39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import scala.language.postfixOps
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.expressions.scala.typed
import org.apache.spark.sql.functions._
@@ -84,15 +85,6 @@ class ParameterizedTypeSum[IN, OUT : Numeric : Encoder](f: IN => OUT)
override def outputEncoder: Encoder[OUT] = implicitly[Encoder[OUT]]
}
-object RowAgg extends Aggregator[Row, Int, Int] {
- def zero: Int = 0
- def reduce(b: Int, a: Row): Int = a.getInt(0) + b
- def merge(b1: Int, b2: Int): Int = b1 + b2
- def finish(r: Int): Int = r
- override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
- override def outputEncoder: Encoder[Int] = Encoders.scalaInt
-}
-
class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
@@ -208,8 +200,4 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
(1279869254, "Some String"))
}
- test("aggregator in DataFrame/Dataset[Row]") {
- val df = Seq(1 -> "a", 2 -> "b", 3 -> "b").toDF("i", "j")
- checkAnswer(df.groupBy($"j").agg(RowAgg.toColumn), Row("a", 1) :: Row("b", 5) :: Nil)
- }
}