commit     f728e0fe7e860fe6dd3437e248472a67a2d435f8 (patch)
tree       28a78bc4c5a9820d9558471882760c0134997c12 /sql/core
parent     9804a759b68f56eceb8a2f4ea90f76a92b5f9f67 (diff)
author     Cheng Hao <hao.cheng@intel.com>             2014-12-18 18:58:29 -0800
committer  Michael Armbrust <michael@databricks.com>   2014-12-18 18:58:29 -0800
[SPARK-2663] [SQL] Support the Grouping Set
Add support for `GROUPING SETS`, `ROLLUP`, `CUBE` and the virtual column `GROUPING__ID`.
More details on how to use `GROUPING SETS` can be found at: https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation,+Cube,+Grouping+and+Rollup
https://issues.apache.org/jira/secure/attachment/12676811/grouping_set.pdf
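For illustration, a hypothetical query (not part of this patch) against the classic Hive test table `src(key, value)`, assuming a `HiveContext` and an existing SparkContext `sc`, showing the new syntax and the virtual `GROUPING__ID` column:

```scala
// Hypothetical usage sketch: each grouping set is aggregated in a single pass,
// and GROUPING__ID tells which grouping set produced a given output row.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("""
  SELECT key, value, GROUPING__ID, count(*)
  FROM src
  GROUP BY key, value
  GROUPING SETS ((key, value), (key))
""").collect().foreach(println)
```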
The general idea of the implementation is:
1. Replace `ROLLUP` and `CUBE` with the equivalent `GROUPING SETS`
2. Expand each input row into multiple rows, and then feed them to `Aggregate`:
* Each grouping set is represented as a bit mask over the `GroupBy Expression List`: for each bit, `1` means the corresponding expression is selected and `0` means it is not (the leftmost expression maps to the lowest bit, the rightmost to the highest bit)
* Several projections are constructed according to the grouping sets; within each projection (a `Seq[Expression]`), expressions that are not selected in the grouping set (based on the bit mask) are replaced with `Literal(null)` (see the sketch after this list)
* Output schema of `Expand` is `child.output :+ grouping__id`
* GroupBy expressions of `Aggregate` are `GroupBy Expression List :+ grouping__id`
* Keep the `Aggregation expressions` the same for the `Aggregate`
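A minimal, self-contained sketch (plain Scala, not the actual analyzer code; names are illustrative) of how a bit mask selects the surviving GroupBy expressions in each projection:

```scala
// Bit i of the mask decides whether the i-th GroupBy expression is kept (1) or
// replaced by a null literal (0); the mask value itself is the GROUPING__ID that
// gets appended to each expanded row (left = lowest bit, per the description above).
def maskedProjection(groupByExprs: Seq[String], bitmask: Int): Seq[String] =
  groupByExprs.zipWithIndex.map { case (expr, i) =>
    if ((bitmask & (1 << i)) != 0) expr else "Literal(null)"
  }

// GROUP BY a, b WITH ROLLUP expands to the grouping sets (a, b), (a) and (),
// i.e. the masks 3, 1 and 0:
Seq(3, 1, 0).foreach { mask =>
  println(s"GROUPING__ID = $mask -> " + maskedProjection(Seq("a", "b"), mask).mkString(", "))
}
```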
The expression substitutions happen during logical plan analysis, so we benefit from the logical plan optimizations (e.g. expression constant folding, map-side aggregation, etc.). Only an `Expand` operator is added to the physical plan, which expands each row according to the pre-built projections.
A known issue will be addressed in a follow-up PR:
* The `ColumnPruning` optimization is not yet supported for the `Expand` node.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #1567 from chenghao-intel/grouping_sets and squashes the following commits:
fe65fcc [Cheng Hao] Remove the extra space
3547056 [Cheng Hao] Add more doc and Simplify the Expand
a7c869d [Cheng Hao] update code as feedbacks
d23c672 [Cheng Hao] Add GroupingExpression to replace the Seq[Expression]
414b165 [Cheng Hao] revert the unnecessary changes
ec276c6 [Cheng Hao] Support Rollup/Cube/GroupingSets
Diffstat (limited to 'sql/core')
 sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala          | 79 +++++++++
 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala |  2 +
 2 files changed, 81 insertions(+), 0 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
new file mode 100644
index 0000000000..9517242060
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{UnknownPartitioning, Partitioning}
+
+/**
+ * Apply all of the GroupExpressions to every input row, hence we will get
+ * multiple output rows for an input row.
+ * @param projections The group of expressions, all of the group expressions should
+ *                    output the same schema specified by the parameter `output`
+ * @param output The output Schema
+ * @param child Child operator
+ */
+@DeveloperApi
+case class Expand(
+    projections: Seq[GroupExpression],
+    output: Seq[Attribute],
+    child: SparkPlan)
+  extends UnaryNode {
+
+  // The GroupExpressions can output data with arbitrary partitioning, so set it
+  // as UNKNOWN partitioning
+  override def outputPartitioning: Partitioning = UnknownPartitioning(0)
+
+  override def execute() = attachTree(this, "execute") {
+    child.execute().mapPartitions { iter =>
+      // TODO Move out projection objects creation and transfer to
+      // workers via closure. However we can't assume the Projection
+      // is serializable because of the code gen, so we have to
+      // create the projections within each of the partition processing.
+      val groups = projections.map(ee => newProjection(ee.children, child.output)).toArray
+
+      new Iterator[Row] {
+        private[this] var result: Row = _
+        private[this] var idx = -1  // -1 means the initial state
+        private[this] var input: Row = _
+
+        override final def hasNext = (-1 < idx && idx < groups.length) || iter.hasNext
+
+        override final def next(): Row = {
+          if (idx <= 0) {
+            // in the initial (-1) or beginning(0) of a new input row, fetch the next input tuple
+            input = iter.next()
+            idx = 0
+          }
+
+          result = groups(idx)(input)
+          idx += 1
+
+          if (idx == groups.length && iter.hasNext) {
+            idx = 0
+          }
+
+          result
+        }
+      }
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 1225d18857..6e04f26c84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -270,6 +270,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Project(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
         execution.Filter(condition, planLater(child)) :: Nil
+      case logical.Expand(projections, output, child) =>
+        execution.Expand(projections, output, planLater(child)) :: Nil
       case logical.Aggregate(group, agg, child) =>
         execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
       case logical.Sample(fraction, withReplacement, seed, child) =>
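As a closing illustration (not part of the patch), the iteration pattern used by `Expand` can be mimicked in plain Scala: every input row is replayed through each pre-built projection before the next row is fetched.

```scala
// Self-contained mimic of the Expand iteration semantics (the real operator avoids
// flatMap and reuses mutable state, but the output is the same): each input row
// yields one output row per projection.
def expand[A, B](rows: Iterator[A], projections: Seq[A => B]): Iterator[B] =
  rows.flatMap(row => projections.iterator.map(p => p(row)))

// For ROLLUP(a, b) and an input row (a = 1, b = 2), the three projections would emit
// (1, 2, 3), (1, null, 1) and (null, null, 0), where the last field is GROUPING__ID.
```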