author     Patrick Wendell <patrick@databricks.com>    2015-05-20 13:39:04 -0700
committer  Patrick Wendell <patrick@databricks.com>    2015-05-20 13:39:04 -0700
commit     6338c40da61de045485c51aa11a5b1e425d22144 (patch)
tree       405740ab6ec1f9c7ebfcba73912c8ee042593b9c /sql
parent     829f1d95bac9153e7b646fbc0d55566ecf896200 (diff)
Revert "[SPARK-7320] [SQL] Add Cube / Rollup for dataframe"
This reverts commit 10698e1131f665addb454cd498669920699a91b2.
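For context, a minimal sketch of the DataFrame-side API this revert removes, drawn from the Scaladoc in the diff below (assumes import sqlContext.implicits._ for the $ column syntax and a DataFrame df with these columns):

    // Multi-dimensional rollup over department and group, averaging all numeric columns
    df.rollup($"department", $"group").avg()

    // Multi-dimensional cube with per-column aggregates
    df.cube($"department", $"gender").agg(Map(
      "salary" -> "avg",
      "age" -> "max"
    ))

After the revert, only groupBy remains available as a DataFrame grouping operation in this branch.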
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                          104
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala                         92
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala    62
3 files changed, 28 insertions, 230 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d78b4c2f89..adad85806d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -685,53 +685,7 @@ class DataFrame private[sql](
* @since 1.3.0
*/
@scala.annotation.varargs
- def groupBy(cols: Column*): GroupedData = {
- GroupedData(this, cols.map(_.expr), GroupedData.GroupByType)
- }
-
- /**
- * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
- * so we can run aggregation on them.
- * See [[GroupedData]] for all the available aggregate functions.
- *
- * {{{
- * // Compute the average for all numeric columns rolluped by department and group.
- * df.rollup($"department", $"group").avg()
- *
- * // Compute the max age and average salary, rolluped by department and gender.
- * df.rollup($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- * @group dfops
- * @since 1.4.0
- */
- @scala.annotation.varargs
- def rollup(cols: Column*): GroupedData = {
- GroupedData(this, cols.map(_.expr), GroupedData.RollupType)
- }
-
- /**
- * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
- * so we can run aggregation on them.
- * See [[GroupedData]] for all the available aggregate functions.
- *
- * {{{
- * // Compute the average for all numeric columns cubed by department and group.
- * df.cube($"department", $"group").avg()
- *
- * // Compute the max age and average salary, cubed by department and gender.
- * df.cube($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- * @group dfops
- * @since 1.4.0
- */
- @scala.annotation.varargs
- def cube(cols: Column*): GroupedData = GroupedData(this, cols.map(_.expr), GroupedData.CubeType)
+ def groupBy(cols: Column*): GroupedData = new GroupedData(this, cols.map(_.expr))
/**
* Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
@@ -756,61 +710,7 @@ class DataFrame private[sql](
@scala.annotation.varargs
def groupBy(col1: String, cols: String*): GroupedData = {
val colNames: Seq[String] = col1 +: cols
- GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.GroupByType)
- }
-
- /**
- * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
- * so we can run aggregation on them.
- * See [[GroupedData]] for all the available aggregate functions.
- *
- * This is a variant of rollup that can only group by existing columns using column names
- * (i.e. cannot construct expressions).
- *
- * {{{
- * // Compute the average for all numeric columns rolluped by department and group.
- * df.rollup("department", "group").avg()
- *
- * // Compute the max age and average salary, rolluped by department and gender.
- * df.rollup($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- * @group dfops
- * @since 1.4.0
- */
- @scala.annotation.varargs
- def rollup(col1: String, cols: String*): GroupedData = {
- val colNames: Seq[String] = col1 +: cols
- GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.RollupType)
- }
-
- /**
- * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
- * so we can run aggregation on them.
- * See [[GroupedData]] for all the available aggregate functions.
- *
- * This is a variant of cube that can only group by existing columns using column names
- * (i.e. cannot construct expressions).
- *
- * {{{
- * // Compute the average for all numeric columns cubed by department and group.
- * df.cube("department", "group").avg()
- *
- * // Compute the max age and average salary, cubed by department and gender.
- * df.cube($"department", $"gender").agg(Map(
- * "salary" -> "avg",
- * "age" -> "max"
- * ))
- * }}}
- * @group dfops
- * @since 1.4.0
- */
- @scala.annotation.varargs
- def cube(col1: String, cols: String*): GroupedData = {
- val colNames: Seq[String] = col1 +: cols
- GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.CubeType)
+ new GroupedData(this, colNames.map(colName => resolve(colName)))
}
/**
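With this hunk applied, both groupBy overloads construct GroupedData directly instead of going through the removed GroupedData.apply with a GroupType. A hedged sketch of the grouping that is kept after the revert (column names are assumptions; max and avg come from org.apache.spark.sql.functions):

    import org.apache.spark.sql.functions._

    df.groupBy($"department", $"gender").agg(max($"age"), avg($"salary"))
    df.groupBy("department").count()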
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index f730e4ae00..1381b9f1a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -23,40 +23,9 @@ import scala.language.implicitConversions
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.Star
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Rollup, Cube, Aggregate}
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.types.NumericType
-/**
- * Companion object for GroupedData
- */
-private[sql] object GroupedData {
- def apply(
- df: DataFrame,
- groupingExprs: Seq[Expression],
- groupType: GroupType): GroupedData = {
- new GroupedData(df, groupingExprs, groupType: GroupType)
- }
-
- /**
- * The Grouping Type
- */
- trait GroupType
-
- /**
- * To indicate it's the GroupBy
- */
- object GroupByType extends GroupType
-
- /**
- * To indicate it's the CUBE
- */
- object CubeType extends GroupType
-
- /**
- * To indicate it's the ROLLUP
- */
- object RollupType extends GroupType
-}
/**
* :: Experimental ::
@@ -65,37 +34,19 @@ private[sql] object GroupedData {
* @since 1.3.0
*/
@Experimental
-class GroupedData protected[sql](
- df: DataFrame,
- groupingExprs: Seq[Expression],
- private val groupType: GroupedData.GroupType) {
+class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) {
- private[this] def toDF(aggExprs: Seq[NamedExpression]): DataFrame = {
- val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
- val retainedExprs = groupingExprs.map {
- case expr: NamedExpression => expr
- case expr: Expression => Alias(expr, expr.prettyString)()
- }
- retainedExprs ++ aggExprs
- } else {
- aggExprs
- }
-
- groupType match {
- case GroupedData.GroupByType =>
- DataFrame(
- df.sqlContext, Aggregate(groupingExprs, aggregates, df.logicalPlan))
- case GroupedData.RollupType =>
- DataFrame(
- df.sqlContext, Rollup(groupingExprs, df.logicalPlan, aggregates))
- case GroupedData.CubeType =>
- DataFrame(
- df.sqlContext, Cube(groupingExprs, df.logicalPlan, aggregates))
+ private[sql] implicit def toDF(aggExprs: Seq[NamedExpression]): DataFrame = {
+ val namedGroupingExprs = groupingExprs.map {
+ case expr: NamedExpression => expr
+ case expr: Expression => Alias(expr, expr.prettyString)()
}
+ DataFrame(
+ df.sqlContext, Aggregate(groupingExprs, namedGroupingExprs ++ aggExprs, df.logicalPlan))
}
private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => Expression)
- : DataFrame = {
+ : Seq[NamedExpression] = {
val columnExprs = if (colNames.isEmpty) {
// No columns specified. Use all numeric columns.
@@ -112,10 +63,10 @@ class GroupedData protected[sql](
namedExpr
}
}
- toDF(columnExprs.map { c =>
+ columnExprs.map { c =>
val a = f(c)
Alias(a, a.prettyString)()
- })
+ }
}
private[this] def strToExpr(expr: String): (Expression => Expression) = {
@@ -168,10 +119,10 @@ class GroupedData protected[sql](
* @since 1.3.0
*/
def agg(exprs: Map[String, String]): DataFrame = {
- toDF(exprs.map { case (colName, expr) =>
+ exprs.map { case (colName, expr) =>
val a = strToExpr(expr)(df(colName).expr)
Alias(a, a.prettyString)()
- }.toSeq)
+ }.toSeq
}
/**
@@ -224,10 +175,19 @@ class GroupedData protected[sql](
*/
@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = {
- toDF((expr +: exprs).map(_.expr).map {
+ val aggExprs = (expr +: exprs).map(_.expr).map {
case expr: NamedExpression => expr
case expr: Expression => Alias(expr, expr.prettyString)()
- })
+ }
+ if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
+ val retainedExprs = groupingExprs.map {
+ case expr: NamedExpression => expr
+ case expr: Expression => Alias(expr, expr.prettyString)()
+ }
+ DataFrame(df.sqlContext, Aggregate(groupingExprs, retainedExprs ++ aggExprs, df.logicalPlan))
+ } else {
+ DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, df.logicalPlan))
+ }
}
/**
@@ -236,7 +196,7 @@ class GroupedData protected[sql](
*
* @since 1.3.0
*/
- def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)), "count")()))
+ def count(): DataFrame = Seq(Alias(Count(Literal(1)), "count")())
/**
* Compute the average value for each numeric columns for each group. This is an alias for `avg`.
@@ -296,5 +256,5 @@ class GroupedData protected[sql](
@scala.annotation.varargs
def sum(colNames: String*): DataFrame = {
aggregateNumericColumns(colNames:_*)(Sum)
- }
+ }
}
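In the restored GroupedData, helpers such as count() and agg(Map(...)) no longer build a DataFrame themselves: they return Seq[NamedExpression], and the private implicit toDF wraps the result in an Aggregate plan that prepends the aliased grouping expressions, while the Column-based agg builds its own Aggregate and consults dataFrameRetainGroupColumns. A hedged usage sketch (column names are assumptions):

    df.groupBy($"department").count()              // Seq(Alias(Count(1), "count")) converted via the implicit toDF
    df.groupBy($"department").agg(avg($"salary"))  // Column-based agg, honors dataFrameRetainGroupColumns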
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
deleted file mode 100644
index 3ad05f4825..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.hive.test.TestHive.implicits._
-
-case class TestData2Int(a: Int, b: Int)
-
-// TODO ideally we should put the test suite into the package `sql`, as
-// `hive` package is optional in compiling, however, `SQLContext.sql` doesn't
-// support the `cube` or `rollup` yet.
-class HiveDataFrameAnalyticsSuite extends QueryTest {
- val testData =
- TestHive.sparkContext.parallelize(
- TestData2Int(1, 2) ::
- TestData2Int(2, 4) :: Nil).toDF()
-
- testData.registerTempTable("mytable")
-
- test("rollup") {
- checkAnswer(
- testData.rollup($"a" + $"b", $"b").agg(sum($"a" - $"b")),
- sql("select a + b, b, sum(a - b) from mytable group by a + b, b with rollup").collect()
- )
-
- checkAnswer(
- testData.rollup("a", "b").agg(sum("b")),
- sql("select a, b, sum(b) from mytable group by a, b with rollup").collect()
- )
- }
-
- test("cube") {
- checkAnswer(
- testData.cube($"a" + $"b", $"b").agg(sum($"a" - $"b")),
- sql("select a + b, b, sum(a - b) from mytable group by a + b, b with cube").collect()
- )
-
- checkAnswer(
- testData.cube("a", "b").agg(sum("b")),
- sql("select a, b, sum(b) from mytable group by a, b with cube").collect()
- )
- }
-}
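The deleted suite validated DataFrame rollup/cube against HiveQL; after the revert, the same analytics stay reachable only through the HiveQL forms the suite compared against. A sketch, assuming the TestHive context and the mytable temp table registered above:

    sql("select a, b, sum(b) from mytable group by a, b with rollup").show()
    sql("select a + b, b, sum(a - b) from mytable group by a + b, b with cube").show()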