diff options
author | Cheng Hao <hao.cheng@intel.com> | 2015-05-20 19:09:47 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-05-20 19:12:12 +0800 |
commit | 10698e1131f665addb454cd498669920699a91b2 (patch) | |
tree | a1cc3ae945af25f69909e9c69e9e3a9d83d148cc /sql/core | |
parent | 86893390cfd31d36ff03c2e062a13196a1f7a6fa (diff) | |
download | spark-10698e1131f665addb454cd498669920699a91b2.tar.gz spark-10698e1131f665addb454cd498669920699a91b2.tar.bz2 spark-10698e1131f665addb454cd498669920699a91b2.zip |
[SPARK-7320] [SQL] Add Cube / Rollup for dataframe
Add `cube` & `rollup` for DataFrame
For example:
```scala
testData.rollup($"a" + $"b", $"b").agg(sum($"a" - $"b"))
testData.cube($"a" + $"b", $"b").agg(sum($"a" - $"b"))
```
Author: Cheng Hao <hao.cheng@intel.com>
Closes #6257 from chenghao-intel/rollup and squashes the following commits:
7302319 [Cheng Hao] cancel the implicit keyword
a66e38f [Cheng Hao] remove the unnecessary code changes
a2869d4 [Cheng Hao] update the code as comments
c441777 [Cheng Hao] update the code as suggested
84c9564 [Cheng Hao] Remove the CubedData & RollupedData
279584c [Cheng Hao] hiden the CubedData & RollupedData
ef357e1 [Cheng Hao] Add Cube / Rollup for dataframe
(cherry picked from commit 09265ad7c85c6de6b568ec329daad632d4a79fa3)
Signed-off-by: Cheng Lian <lian@databricks.com>
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 104 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala | 92 |
2 files changed, 168 insertions, 28 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index adad85806d..d78b4c2f89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -685,7 +685,53 @@ class DataFrame private[sql]( * @since 1.3.0 */ @scala.annotation.varargs - def groupBy(cols: Column*): GroupedData = new GroupedData(this, cols.map(_.expr)) + def groupBy(cols: Column*): GroupedData = { + GroupedData(this, cols.map(_.expr), GroupedData.GroupByType) + } + + /** + * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns, + * so we can run aggregation on them. + * See [[GroupedData]] for all the available aggregate functions. + * + * {{{ + * // Compute the average for all numeric columns rolluped by department and group. + * df.rollup($"department", $"group").avg() + * + * // Compute the max age and average salary, rolluped by department and gender. + * df.rollup($"department", $"gender").agg(Map( + * "salary" -> "avg", + * "age" -> "max" + * )) + * }}} + * @group dfops + * @since 1.4.0 + */ + @scala.annotation.varargs + def rollup(cols: Column*): GroupedData = { + GroupedData(this, cols.map(_.expr), GroupedData.RollupType) + } + + /** + * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns, + * so we can run aggregation on them. + * See [[GroupedData]] for all the available aggregate functions. + * + * {{{ + * // Compute the average for all numeric columns cubed by department and group. + * df.cube($"department", $"group").avg() + * + * // Compute the max age and average salary, cubed by department and gender. + * df.cube($"department", $"gender").agg(Map( + * "salary" -> "avg", + * "age" -> "max" + * )) + * }}} + * @group dfops + * @since 1.4.0 + */ + @scala.annotation.varargs + def cube(cols: Column*): GroupedData = GroupedData(this, cols.map(_.expr), GroupedData.CubeType) /** * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them. @@ -710,7 +756,61 @@ class DataFrame private[sql]( @scala.annotation.varargs def groupBy(col1: String, cols: String*): GroupedData = { val colNames: Seq[String] = col1 +: cols - new GroupedData(this, colNames.map(colName => resolve(colName))) + GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.GroupByType) + } + + /** + * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns, + * so we can run aggregation on them. + * See [[GroupedData]] for all the available aggregate functions. + * + * This is a variant of rollup that can only group by existing columns using column names + * (i.e. cannot construct expressions). + * + * {{{ + * // Compute the average for all numeric columns rolluped by department and group. + * df.rollup("department", "group").avg() + * + * // Compute the max age and average salary, rolluped by department and gender. + * df.rollup($"department", $"gender").agg(Map( + * "salary" -> "avg", + * "age" -> "max" + * )) + * }}} + * @group dfops + * @since 1.4.0 + */ + @scala.annotation.varargs + def rollup(col1: String, cols: String*): GroupedData = { + val colNames: Seq[String] = col1 +: cols + GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.RollupType) + } + + /** + * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns, + * so we can run aggregation on them. + * See [[GroupedData]] for all the available aggregate functions. + * + * This is a variant of cube that can only group by existing columns using column names + * (i.e. cannot construct expressions). + * + * {{{ + * // Compute the average for all numeric columns cubed by department and group. + * df.cube("department", "group").avg() + * + * // Compute the max age and average salary, cubed by department and gender. + * df.cube($"department", $"gender").agg(Map( + * "salary" -> "avg", + * "age" -> "max" + * )) + * }}} + * @group dfops + * @since 1.4.0 + */ + @scala.annotation.varargs + def cube(col1: String, cols: String*): GroupedData = { + val colNames: Seq[String] = col1 +: cols + GroupedData(this, colNames.map(colName => resolve(colName)), GroupedData.CubeType) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala index 1381b9f1a6..f730e4ae00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala @@ -23,9 +23,40 @@ import scala.language.implicitConversions import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.Star import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.Aggregate +import org.apache.spark.sql.catalyst.plans.logical.{Rollup, Cube, Aggregate} import org.apache.spark.sql.types.NumericType +/** + * Companion object for GroupedData + */ +private[sql] object GroupedData { + def apply( + df: DataFrame, + groupingExprs: Seq[Expression], + groupType: GroupType): GroupedData = { + new GroupedData(df, groupingExprs, groupType: GroupType) + } + + /** + * The Grouping Type + */ + trait GroupType + + /** + * To indicate it's the GroupBy + */ + object GroupByType extends GroupType + + /** + * To indicate it's the CUBE + */ + object CubeType extends GroupType + + /** + * To indicate it's the ROLLUP + */ + object RollupType extends GroupType +} /** * :: Experimental :: @@ -34,19 +65,37 @@ import org.apache.spark.sql.types.NumericType * @since 1.3.0 */ @Experimental -class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) { +class GroupedData protected[sql]( + df: DataFrame, + groupingExprs: Seq[Expression], + private val groupType: GroupedData.GroupType) { - private[sql] implicit def toDF(aggExprs: Seq[NamedExpression]): DataFrame = { - val namedGroupingExprs = groupingExprs.map { - case expr: NamedExpression => expr - case expr: Expression => Alias(expr, expr.prettyString)() + private[this] def toDF(aggExprs: Seq[NamedExpression]): DataFrame = { + val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) { + val retainedExprs = groupingExprs.map { + case expr: NamedExpression => expr + case expr: Expression => Alias(expr, expr.prettyString)() + } + retainedExprs ++ aggExprs + } else { + aggExprs + } + + groupType match { + case GroupedData.GroupByType => + DataFrame( + df.sqlContext, Aggregate(groupingExprs, aggregates, df.logicalPlan)) + case GroupedData.RollupType => + DataFrame( + df.sqlContext, Rollup(groupingExprs, df.logicalPlan, aggregates)) + case GroupedData.CubeType => + DataFrame( + df.sqlContext, Cube(groupingExprs, df.logicalPlan, aggregates)) } - DataFrame( - df.sqlContext, Aggregate(groupingExprs, namedGroupingExprs ++ aggExprs, df.logicalPlan)) } private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => Expression) - : Seq[NamedExpression] = { + : DataFrame = { val columnExprs = if (colNames.isEmpty) { // No columns specified. Use all numeric columns. @@ -63,10 +112,10 @@ class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) namedExpr } } - columnExprs.map { c => + toDF(columnExprs.map { c => val a = f(c) Alias(a, a.prettyString)() - } + }) } private[this] def strToExpr(expr: String): (Expression => Expression) = { @@ -119,10 +168,10 @@ class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) * @since 1.3.0 */ def agg(exprs: Map[String, String]): DataFrame = { - exprs.map { case (colName, expr) => + toDF(exprs.map { case (colName, expr) => val a = strToExpr(expr)(df(colName).expr) Alias(a, a.prettyString)() - }.toSeq + }.toSeq) } /** @@ -175,19 +224,10 @@ class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) */ @scala.annotation.varargs def agg(expr: Column, exprs: Column*): DataFrame = { - val aggExprs = (expr +: exprs).map(_.expr).map { + toDF((expr +: exprs).map(_.expr).map { case expr: NamedExpression => expr case expr: Expression => Alias(expr, expr.prettyString)() - } - if (df.sqlContext.conf.dataFrameRetainGroupColumns) { - val retainedExprs = groupingExprs.map { - case expr: NamedExpression => expr - case expr: Expression => Alias(expr, expr.prettyString)() - } - DataFrame(df.sqlContext, Aggregate(groupingExprs, retainedExprs ++ aggExprs, df.logicalPlan)) - } else { - DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, df.logicalPlan)) - } + }) } /** @@ -196,7 +236,7 @@ class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) * * @since 1.3.0 */ - def count(): DataFrame = Seq(Alias(Count(Literal(1)), "count")()) + def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)), "count")())) /** * Compute the average value for each numeric columns for each group. This is an alias for `avg`. @@ -256,5 +296,5 @@ class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) @scala.annotation.varargs def sum(colNames: String*): DataFrame = { aggregateNumericColumns(colNames:_*)(Sum) - } + } } |