 project/MimaExcludes.scala                                                                                  |  4
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala                         | 14
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala                |  3
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceDistinctWithAggregateSuite.scala | 42
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                                                |  2
 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala                                |  4
 sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala                                 | 31
 7 files changed, 65 insertions(+), 35 deletions(-)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 34371c9659..73e4bfd78e 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -46,7 +46,9 @@ object MimaExcludes {
"org.apache.spark.api.java.JavaRDDLike.partitioner"),
// Mima false positive (was a private[spark] class)
ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.util.collection.PairIterator")
+ "org.apache.spark.util.collection.PairIterator"),
+ // SQL execution is considered private.
+ excludePackage("org.apache.spark.sql.execution")
)
case v if v.startsWith("1.4") =>
Seq(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5c6379b8d4..0a17b10c52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -36,6 +36,8 @@ object DefaultOptimizer extends Optimizer {
// SubQueries are only needed for analysis and can be removed before execution.
Batch("Remove SubQueries", FixedPoint(100),
EliminateSubQueries) ::
+ Batch("Distinct", FixedPoint(100),
+ ReplaceDistinctWithAggregate) ::
Batch("Operator Reordering", FixedPoint(100),
UnionPushdown,
CombineFilters,
@@ -696,3 +698,15 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
LocalRelation(projectList.map(_.toAttribute), data.map(projection))
}
}
+
+/**
+ * Replaces logical [[Distinct]] operator with an [[Aggregate]] operator.
+ * {{{
+ * SELECT DISTINCT f1, f2 FROM t ==> SELECT f1, f2 FROM t GROUP BY f1, f2
+ * }}}
+ */
+object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case Distinct(child) => Aggregate(child.output, child.output, child)
+ }
+}
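
As a quick illustration (a minimal sketch reusing the Catalyst test DSL exercised by the new suite below; not part of the patch itself), applying the rule turns a Distinct node into an Aggregate that both groups by and projects every output attribute of its child:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Distinct, LocalRelation}

    val rel = LocalRelation('a.int, 'b.int)                      // relation with columns a, b
    val rewritten = ReplaceDistinctWithAggregate(Distinct(rel))  // Rule[LogicalPlan].apply
    // rewritten == Aggregate(rel.output, rel.output, rel)
    // i.e. SELECT DISTINCT a, b FROM rel  ==>  SELECT a, b FROM rel GROUP BY a, b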
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 33a9e55a47..e77e5c27b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -339,6 +339,9 @@ case class Sample(
override def output: Seq[Attribute] = child.output
}

+/**
+ * Returns a new logical plan that dedups input rows.
+ */
case class Distinct(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceDistinctWithAggregateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceDistinctWithAggregateSuite.scala
new file mode 100644
index 0000000000..df29a62ff0
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceDistinctWithAggregateSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Distinct, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ReplaceDistinctWithAggregateSuite extends PlanTest {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches = Batch("ReplaceDistinctWithAggregate", Once, ReplaceDistinctWithAggregate) :: Nil
+ }
+
+ test("replace distinct with aggregate") {
+ val input = LocalRelation('a.int, 'b.int)
+
+ val query = Distinct(input)
+ val optimized = Optimize.execute(query.analyze)
+
+ val correctAnswer = Aggregate(input.output, input.output, input)
+
+ comparePlans(optimized, correctAnswer)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d1a54ada7b..4a224153e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1311,7 +1311,7 @@ class DataFrame private[sql](
* @group dfops
* @since 1.3.0
*/
- override def distinct: DataFrame = Distinct(logicalPlan)
+ override def distinct: DataFrame = dropDuplicates()

/**
* @group basic
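
After this change, distinct is simply an alias for dropDuplicates() over all columns, so both calls go through the same Aggregate-based plan. A hedged usage sketch (assumes a Spark 1.4-era SQLContext named sqlContext):

    val df = sqlContext.createDataFrame(Seq((1, "a"), (1, "a"), (2, "b"))).toDF("id", "v")
    df.distinct.count()          // 2
    df.dropDuplicates().count()  // 2, same plan as distinct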
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d0a1ad0056..7a1331a391 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -284,8 +284,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case r: RunnableCommand => ExecutedCommand(r) :: Nil

case logical.Distinct(child) =>
- execution.Distinct(partial = false,
- execution.Distinct(partial = true, planLater(child))) :: Nil
+ throw new IllegalStateException(
+ "logical distinct operator should have been replaced by aggregate in the optimizer")
case logical.Repartition(numPartitions, shuffle, child) =>
execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
case logical.SortPartitions(sortExprs, child) =>
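
The Distinct case above is now purely defensive: once the new "Distinct" optimizer batch has run, no logical.Distinct should ever reach physical planning. A minimal sketch of that invariant (hypothetical helper, not in the patch; the exception message is the one added above):

    import org.apache.spark.sql.catalyst.plans.logical.{Distinct, LogicalPlan}

    def assertNoDistinct(plan: LogicalPlan): Unit = plan.foreach {
      case _: Distinct =>
        throw new IllegalStateException(
          "logical distinct operator should have been replaced by aggregate in the optimizer")
      case _ => // every other operator is fine at this point
    }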
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a30ade8644..fb42072f9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -232,37 +232,6 @@ case class ExternalSort(

/**
* :: DeveloperApi ::
- * Computes the set of distinct input rows using a HashSet.
- * @param partial when true the distinct operation is performed partially, per partition, without
- * shuffling the data.
- * @param child the input query plan.
- */
-@DeveloperApi
-case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-
- override def requiredChildDistribution: Seq[Distribution] =
- if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
-
- protected override def doExecute(): RDD[Row] = {
- child.execute().mapPartitions { iter =>
- val hashSet = new scala.collection.mutable.HashSet[Row]()
-
- var currentRow: Row = null
- while (iter.hasNext) {
- currentRow = iter.next()
- if (!hashSet.contains(currentRow)) {
- hashSet.add(currentRow.copy())
- }
- }
-
- hashSet.iterator
- }
- }
-}
-
-/**
- * :: DeveloperApi ::
* Return a new RDD that has exactly `numPartitions` partitions.
*/
@DeveloperApi
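
With execution.Distinct removed, DISTINCT queries now flow through the existing aggregation strategy, which already provides partial (map-side) and final aggregation around a shuffle. A hedged sketch of what to expect (assumes a Spark 1.4-era SQLContext with a table t registered; exact operator names in the output may vary):

    sqlContext.sql("SELECT DISTINCT a, b FROM t").explain()
    // Expect a final Aggregate over an Exchange over a partial Aggregate,
    // replacing the removed Distinct(partial = false) / Distinct(partial = true) pair.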