author    Cheng Hao <hao.cheng@intel.com>    2014-12-18 18:58:29 -0800
committer Michael Armbrust <michael@databricks.com>    2014-12-18 18:58:29 -0800
commit    f728e0fe7e860fe6dd3437e248472a67a2d435f8 (patch)
tree      28a78bc4c5a9820d9558471882760c0134997c12 /sql/core
parent    9804a759b68f56eceb8a2f4ea90f76a92b5f9f67 (diff)
[SPARK-2663] [SQL] Support the Grouping Set
Add support for `GROUPING SETS`, `ROLLUP`, `CUBE` and the virtual column `GROUPING__ID`. More details on how to use `GROUPING SETS` can be found at:
https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation,+Cube,+Grouping+and+Rollup
https://issues.apache.org/jira/secure/attachment/12676811/grouping_set.pdf

The general idea of the implementation:

1. Replace `ROLLUP` and `CUBE` with `GROUPING SETS`.
2. Explode each input row, and then feed the results to `Aggregate`:
   * Each grouping set is represented as a bit mask over the GroupBy Expression List: for each bit, `1` means the expression is selected, otherwise `0` (the leftmost expression maps to the lowest bit, the rightmost to the highest). A worked sketch follows this message.
   * Several projections are constructed according to the grouping sets; within each projection (a `Seq[Expression]`), an expression is replaced with `Literal(null)` if it is not selected in the grouping set (based on the bit mask).
   * The output schema of `Expand` is `child.output :+ grouping__id`.
   * The GroupBy expressions of `Aggregate` are `GroupBy Expression List :+ grouping__id`.
   * The aggregation expressions of `Aggregate` are kept unchanged.

The expression substitutions happen during logical plan analysis, so we benefit from the logical plan optimizations (e.g. expression constant folding, map-side aggregation, etc.). Only an `Expand` operator is added to the physical plan, which explodes the rows according to the pre-built projections.

A known issue to be addressed in a follow-up PR:
* The `ColumnPruning` optimization is not yet supported for the `Expand` node.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1567 from chenghao-intel/grouping_sets and squashes the following commits:

fe65fcc [Cheng Hao] Remove the extra space
3547056 [Cheng Hao] Add more doc and Simplify the Expand
a7c869d [Cheng Hao] update code as feedbacks
d23c672 [Cheng Hao] Add GroupingExpression to replace the Seq[Expression]
414b165 [Cheng Hao] revert the unnecessary changes
ec276c6 [Cheng Hao] Support Rollup/Cube/GroupingSets
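To make the bit-mask encoding concrete, here is a minimal self-contained sketch (plain Scala, no Spark dependencies; names such as GroupingSetSketch are illustrative, not the actual Catalyst implementation) of how CUBE over (a, b) expands one input row into four projected rows:

    object GroupingSetSketch {
      def main(args: Array[String]): Unit = {
        val groupBy = Seq("a", "b")            // the GroupBy Expression List
        // CUBE over (a, b) covers the grouping sets (a, b), (a), (b) and ().
        // Bit i is 1 when groupBy(i) is selected; per the commit message the
        // leftmost expression maps to the lowest bit.
        val bitmasks = Seq(3, 1, 2, 0)         // binary 11, 01, 10, 00

        val input = Map("a" -> "x", "b" -> "y", "v" -> 10)

        // One projection per bitmask: unselected group expressions become null
        // (standing in for Literal(null)), and grouping__id is appended.
        val projected = bitmasks.map { mask =>
          val groups = groupBy.zipWithIndex.map { case (col, i) =>
            if ((mask & (1 << i)) != 0) input(col) else null
          }
          (groups :+ input("v")) :+ mask       // child.output :+ grouping__id
        }
        projected.foreach(println)
        // List(x, y, 10, 3), List(x, null, 10, 1),
        // List(null, y, 10, 2), List(null, null, 10, 0)
      }
    }

`Aggregate` then groups on (a, b, grouping__id), so each grouping set is aggregated independently in a single pass over the expanded rows.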
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala          79
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala  2
2 files changed, 81 insertions(+), 0 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
new file mode 100644
index 0000000000..9517242060
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{UnknownPartitioning, Partitioning}
+
+/**
+ * Applies all of the GroupExpressions to every input row, hence we will get
+ * multiple output rows for an input row.
+ * @param projections The groups of expressions; all of the group expressions should
+ * output the same schema specified by the parameter `output`
+ * @param output The output schema
+ * @param child Child operator
+ */
+@DeveloperApi
+case class Expand(
+ projections: Seq[GroupExpression],
+ output: Seq[Attribute],
+ child: SparkPlan)
+ extends UnaryNode {
+
+ // The GroupExpressions can output data with arbitrary partitioning, so set it
+ // as UNKNOWN partitioning
+ override def outputPartitioning: Partitioning = UnknownPartitioning(0)
+
+ override def execute() = attachTree(this, "execute") {
+ child.execute().mapPartitions { iter =>
+ // TODO: Move the projection object creation out and ship it to the
+ // workers via closure. However, we can't assume the Projection is
+ // serializable because of code generation, so we have to create
+ // the projections within each partition's processing.
+ val groups = projections.map(ee => newProjection(ee.children, child.output)).toArray
+
+ new Iterator[Row] {
+ private[this] var result: Row = _
+ private[this] var idx = -1 // -1 means the initial state
+ private[this] var input: Row = _
+
+ override final def hasNext = (-1 < idx && idx < groups.length) || iter.hasNext
+
+ override final def next(): Row = {
+ if (idx <= 0) {
+ // in the initial state (-1) or at the beginning (0) of a new input row, fetch the next input tuple
+ input = iter.next()
+ idx = 0
+ }
+
+ result = groups(idx)(input)
+ idx += 1
+
+ if (idx == groups.length && iter.hasNext) {
+ idx = 0
+ }
+
+ result
+ }
+ }
+ }
+ }
+}
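The iterator above replays every input row through each compiled projection, so N input rows become N * projections.length output rows without materializing the whole expansion. A minimal sketch of the same pattern in plain Scala (the projections here are illustrative stand-ins, not the compiled Catalyst projections):

    object ExpandIterationSketch {
      def main(args: Array[String]): Unit = {
        // Stand-ins for the compiled projections: each maps an input row to
        // one output row (here, (value, grouping__id) pairs).
        val projections: Array[Int => Seq[Any]] = Array(
          v => Seq(v, 1),
          v => Seq(null, 0)
        )
        val input = Iterator(10, 20)

        // Equivalent to the hand-rolled iterator in Expand.execute(): each
        // input row is emitted once per projection, lazily.
        val output = input.flatMap(row => projections.iterator.map(p => p(row)))
        output.foreach(println)
        // List(10, 1), List(null, 0), List(20, 1), List(null, 0)
      }
    }

The committed code hand-rolls this iterator instead of using flatMap, presumably to avoid allocating an inner iterator per input row; the semantics are the same.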
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 1225d18857..6e04f26c84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -270,6 +270,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
execution.Filter(condition, planLater(child)) :: Nil
+ case logical.Expand(projections, output, child) =>
+ execution.Expand(projections, output, planLater(child)) :: Nil
case logical.Aggregate(group, agg, child) =>
execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
case logical.Sample(fraction, withReplacement, seed, child) =>
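With the strategy case above, a grouping-set query is planned with an `Expand` feeding the `Aggregate`. A hedged usage sketch for the Spark version of this commit (the `HiveContext` setup and the table `t(k1, k2, v)` are assumptions for illustration, not part of the diff; ROLLUP/CUBE/GROUPING SETS were parsed through the Hive dialect at the time):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    object GroupingSetsUsage {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "grouping-sets-demo")
        val hiveContext = new HiveContext(sc)
        // Assumes a registered Hive table t(k1, k2, v).
        hiveContext.sql(
          """SELECT k1, k2, SUM(v), GROUPING__ID
            |FROM t
            |GROUP BY k1, k2
            |GROUPING SETS ((k1, k2), (k1), ())""".stripMargin
        ).collect().foreach(println)
      }
    }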