diff options
author | Patrick Wendell <patrick@databricks.com> | 2015-05-20 13:39:04 -0700 |
---|---|---|
committer | Patrick Wendell <patrick@databricks.com> | 2015-05-20 13:39:04 -0700 |
commit | 6338c40da61de045485c51aa11a5b1e425d22144 (patch) | |
tree | 405740ab6ec1f9c7ebfcba73912c8ee042593b9c /sql/core | |
parent | 829f1d95bac9153e7b646fbc0d55566ecf896200 (diff) | |
download | spark-6338c40da61de045485c51aa11a5b1e425d22144.tar.gz spark-6338c40da61de045485c51aa11a5b1e425d22144.tar.bz2 spark-6338c40da61de045485c51aa11a5b1e425d22144.zip |
Revert "[SPARK-7320] [SQL] Add Cube / Rollup for dataframe"
This reverts commit 10698e1131f665addb454cd498669920699a91b2.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 104 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala | 92 |
2 files changed, 28 insertions, 168 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index d78b4c2f89..adad85806d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -685,53 +685,7 @@ class DataFrame private[sql]( * @since 1.3.0 */ @scala.annotation.varargs - def groupBy(cols: Column*): GroupedData = { - GroupedData(this, cols.map(_.expr), GroupedData.GroupByType) - } - - /** - * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns, - * so we can run aggregation on them. - * See [[GroupedData]] for all the available aggregate functions. - * - * {{{ - * // Compute the average for all numeric columns rolluped by department and group. - * df.rollup($"department", $"group").avg() - * - * // Compute the max age and average salary, rolluped by department and gender. - * df.rollup($"department", $"gender").agg(Map( - * "salary" -> "avg", - * "age" -> "max" - * )) - * }}} - * @group dfops - * @since 1.4.0 - */ - @scala.annotation.varargs - def rollup(cols: Column*): GroupedData = { - GroupedData(this, cols.map(_.expr), GroupedData.RollupType) - } - - /** - * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns, - * so we can run aggregation on them. - * See [[GroupedData]] for all the available aggregate functions. - * - * {{{ - * // Compute the average for all numeric columns cubed by department and group. - * df.cube($"department", $"group").avg() - * - * // Compute the max age and average salary, cubed by department and gender. - * df.cube($"department", $"gender").agg(Map( - * "salary" -> "avg", - * "age" -> "max" - * )) - * }}} - * @group dfops - * @since 1.4.0 - */ - @scala.annotation.varargs - def cube(cols: Column*): GroupedData = GroupedData(this, cols.map(_.expr), GroupedData.CubeType) + def groupBy(cols: Column*): GroupedData = new GroupedData(this, cols.map(_.expr)) /** * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them. @@ -756,61 +710,7 @@ class DataFrame private[sql]( @scala.annotation.varargs def groupBy(col1: String, cols: String*): GroupedData = { val colNames: Seq[String] = col1 +: cols - GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.GroupByType) - } - - /** - * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns, - * so we can run aggregation on them. - * See [[GroupedData]] for all the available aggregate functions. - * - * This is a variant of rollup that can only group by existing columns using column names - * (i.e. cannot construct expressions). - * - * {{{ - * // Compute the average for all numeric columns rolluped by department and group. - * df.rollup("department", "group").avg() - * - * // Compute the max age and average salary, rolluped by department and gender. - * df.rollup($"department", $"gender").agg(Map( - * "salary" -> "avg", - * "age" -> "max" - * )) - * }}} - * @group dfops - * @since 1.4.0 - */ - @scala.annotation.varargs - def rollup(col1: String, cols: String*): GroupedData = { - val colNames: Seq[String] = col1 +: cols - GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.RollupType) - } - - /** - * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns, - * so we can run aggregation on them. - * See [[GroupedData]] for all the available aggregate functions. - * - * This is a variant of cube that can only group by existing columns using column names - * (i.e. cannot construct expressions). - * - * {{{ - * // Compute the average for all numeric columns cubed by department and group. - * df.cube("department", "group").avg() - * - * // Compute the max age and average salary, cubed by department and gender. - * df.cube($"department", $"gender").agg(Map( - * "salary" -> "avg", - * "age" -> "max" - * )) - * }}} - * @group dfops - * @since 1.4.0 - */ - @scala.annotation.varargs - def cube(col1: String, cols: String*): GroupedData = { - val colNames: Seq[String] = col1 +: cols - GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.CubeType) + new GroupedData(this, colNames.map(colName => resolve(colName))) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala index f730e4ae00..1381b9f1a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala @@ -23,40 +23,9 @@ import scala.language.implicitConversions import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.Star import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{Rollup, Cube, Aggregate} +import org.apache.spark.sql.catalyst.plans.logical.Aggregate import org.apache.spark.sql.types.NumericType -/** - * Companion object for GroupedData - */ -private[sql] object GroupedData { - def apply( - df: DataFrame, - groupingExprs: Seq[Expression], - groupType: GroupType): GroupedData = { - new GroupedData(df, groupingExprs, groupType: GroupType) - } - - /** - * The Grouping Type - */ - trait GroupType - - /** - * To indicate it's the GroupBy - */ - object GroupByType extends GroupType - - /** - * To indicate it's the CUBE - */ - object CubeType extends GroupType - - /** - * To indicate it's the ROLLUP - */ - object RollupType extends GroupType -} /** * :: Experimental :: @@ -65,37 +34,19 @@ private[sql] object GroupedData { * @since 1.3.0 */ @Experimental -class GroupedData protected[sql]( - df: DataFrame, - groupingExprs: Seq[Expression], - private val groupType: GroupedData.GroupType) { +class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) { - private[this] def toDF(aggExprs: Seq[NamedExpression]): DataFrame = { - val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) { - val retainedExprs = groupingExprs.map { - case expr: NamedExpression => expr - case expr: Expression => Alias(expr, expr.prettyString)() - } - retainedExprs ++ aggExprs - } else { - aggExprs - } - - groupType match { - case GroupedData.GroupByType => - DataFrame( - df.sqlContext, Aggregate(groupingExprs, aggregates, df.logicalPlan)) - case GroupedData.RollupType => - DataFrame( - df.sqlContext, Rollup(groupingExprs, df.logicalPlan, aggregates)) - case GroupedData.CubeType => - DataFrame( - df.sqlContext, Cube(groupingExprs, df.logicalPlan, aggregates)) + private[sql] implicit def toDF(aggExprs: Seq[NamedExpression]): DataFrame = { + val namedGroupingExprs = groupingExprs.map { + case expr: NamedExpression => expr + case expr: Expression => Alias(expr, expr.prettyString)() } + DataFrame( + df.sqlContext, Aggregate(groupingExprs, namedGroupingExprs ++ aggExprs, df.logicalPlan)) } private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => Expression) - : DataFrame = { + : Seq[NamedExpression] = { val columnExprs = if (colNames.isEmpty) { // No columns specified. Use all numeric columns. @@ -112,10 +63,10 @@ class GroupedData protected[sql]( namedExpr } } - toDF(columnExprs.map { c => + columnExprs.map { c => val a = f(c) Alias(a, a.prettyString)() - }) + } } private[this] def strToExpr(expr: String): (Expression => Expression) = { @@ -168,10 +119,10 @@ class GroupedData protected[sql]( * @since 1.3.0 */ def agg(exprs: Map[String, String]): DataFrame = { - toDF(exprs.map { case (colName, expr) => + exprs.map { case (colName, expr) => val a = strToExpr(expr)(df(colName).expr) Alias(a, a.prettyString)() - }.toSeq) + }.toSeq } /** @@ -224,10 +175,19 @@ class GroupedData protected[sql]( */ @scala.annotation.varargs def agg(expr: Column, exprs: Column*): DataFrame = { - toDF((expr +: exprs).map(_.expr).map { + val aggExprs = (expr +: exprs).map(_.expr).map { case expr: NamedExpression => expr case expr: Expression => Alias(expr, expr.prettyString)() - }) + } + if (df.sqlContext.conf.dataFrameRetainGroupColumns) { + val retainedExprs = groupingExprs.map { + case expr: NamedExpression => expr + case expr: Expression => Alias(expr, expr.prettyString)() + } + DataFrame(df.sqlContext, Aggregate(groupingExprs, retainedExprs ++ aggExprs, df.logicalPlan)) + } else { + DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, df.logicalPlan)) + } } /** @@ -236,7 +196,7 @@ class GroupedData protected[sql]( * * @since 1.3.0 */ - def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)), "count")())) + def count(): DataFrame = Seq(Alias(Count(Literal(1)), "count")()) /** * Compute the average value for each numeric columns for each group. This is an alias for `avg`. @@ -296,5 +256,5 @@ class GroupedData protected[sql]( @scala.annotation.varargs def sum(colNames: String*): DataFrame = { aggregateNumericColumns(colNames:_*)(Sum) - } + } } |