aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-03-08 11:45:08 -0800
committerReynold Xin <rxin@databricks.com>2016-03-08 11:45:08 -0800
commit46881b4ea229aecbb481626d8b9fbca24c0df075 (patch)
treeb87768d6d17123041029896afa0ab7b1d6ef06b7 /sql/catalyst
parentad3c9a9730535faa6d270ee83412f79ec3db8333 (diff)
downloadspark-46881b4ea229aecbb481626d8b9fbca24c0df075.tar.gz
spark-46881b4ea229aecbb481626d8b9fbca24c0df075.tar.bz2
spark-46881b4ea229aecbb481626d8b9fbca24c0df075.zip
[SPARK-12727][SQL] support SQL generation for aggregate with multi-distinct
## What changes were proposed in this pull request? This PR add SQL generation support for aggregate with multi-distinct, by simply moving the `DistinctAggregationRewriter` rule to optimizer. More discussions are needed as this breaks an import contract: analyzed plan should be able to run without optimization. However, the `ComputeCurrentTime` rule has kind of broken it already, and I think maybe we should add a new phase for this kind of rules, because strictly speaking they don't belong to analysis and is coupled with the physical plan implementation. ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #11579 from cloud-fan/distinct.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala1
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala8
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala5
3 files changed, 5 insertions, 9 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 268d7f21e6..9ab0a20a52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -91,7 +91,6 @@ class Analyzer(
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
- DistinctAggregationRewriter(conf) ::
HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Nondeterministic", Once,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
index 7518946a94..38c1641f73 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DistinctAggregationRewriter.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Complete}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan}
@@ -100,13 +99,10 @@ import org.apache.spark.sql.types.IntegerType
* we could improve this in the current rule by applying more advanced expression cannocalization
* techniques.
*/
-case class DistinctAggregationRewriter(conf: CatalystConf) extends Rule[LogicalPlan] {
+object DistinctAggregationRewriter extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case p if !p.resolved => p
- // We need to wait until this Aggregate operator is resolved.
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case a: Aggregate => rewrite(a)
- case p => p
}
def rewrite(a: Aggregate): Aggregate = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index deea7238f5..7455e68ee8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import scala.collection.immutable.HashSet
-import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubqueryAliases}
+import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
@@ -42,7 +42,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateSubqueryAliases,
- ComputeCurrentTime) ::
+ ComputeCurrentTime,
+ DistinctAggregationRewriter) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
//////////////////////////////////////////////////////////////////////////////////////////