author	Michael Armbrust <michael@databricks.com>	2014-07-12 12:07:27 -0700
committer	Reynold Xin <rxin@apache.org>	2014-07-12 12:07:27 -0700
commit	7e26b57615f6c1d3f9058f9c19c05ec91f017f4c (patch)
tree	861bb60b7a1b84fe1fc2b943e5b6c4e663ad69f0 /sql
parent	7a0135293192aaefc6ae20b57e15a90945bd8a4e (diff)
[SPARK-2441][SQL] Add more efficient distinct operator.
Author: Michael Armbrust <michael@databricks.com>

Closes #1366 from marmbrus/partialDistinct and squashes the following commits:

12a31ab [Michael Armbrust] Add more efficient distinct operator.
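The patch replaces the single post-shuffle Aggregate previously used for DISTINCT with a two-phase operator: a partial distinct that dedupes each partition map-side (shrinking the data before it crosses the wire), then a shuffle that clusters equal rows, then a final distinct per reducer. A minimal RDD-level sketch of the map-side phase (illustrative only; partialDistinct is a hypothetical helper, not part of the patch):

import scala.collection.mutable.HashSet
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD

// Phase 1: drop duplicates within each partition, before any shuffle.
// Phase 2 of the patch repeats the same dedup after rows have been
// hash-partitioned so that equal rows land in the same partition.
def partialDistinct[T: ClassTag](rdd: RDD[T]): RDD[T] =
  rdd.mapPartitions { iter =>
    val seen = new HashSet[T]()
    iter.filter(seen.add) // mutable HashSet.add returns false for duplicates
  }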
Diffstat (limited to 'sql')
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala	4
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala	33
2 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7080074a69..c078e71fe0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -247,8 +247,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Distinct(child) =>
-        execution.Aggregate(
-          partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil
+        execution.Distinct(partial = false,
+          execution.Distinct(partial = true, planLater(child))) :: Nil
       case logical.Sort(sortExprs, child) =>
         // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
         execution.Sort(sortExprs, global = true, planLater(child)):: Nil
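The inner Distinct(partial = true) runs in place under UnspecifiedDistribution, while the outer Distinct(partial = false) requires ClusteredDistribution(child.output), which leads the planner to insert a hash-partitioning exchange between the two. One way to observe the resulting plan, assuming a SQLContext named sqlContext with some registered table t (illustrative usage, not from the patch):

// Expect roughly: Distinct(false) over an exchange over Distinct(true).
val plan = sqlContext.sql("SELECT DISTINCT a FROM t").queryExecution.executedPlan
println(plan)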
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 97abd636ab..966d8f95fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair
 
 /**
@@ -248,6 +248,37 @@ object ExistingRdd {
 case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
   override def execute() = rdd
 }
 
+/**
+ * :: DeveloperApi ::
+ * Computes the set of distinct input rows using a HashSet.
+ * @param partial when true the distinct operation is performed partially, per partition, without
+ *                shuffling the data.
+ * @param child the input query plan.
+ */
+@DeveloperApi
+case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
+  override def output = child.output
+
+  override def requiredChildDistribution =
+    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
+
+  override def execute() = {
+    child.execute().mapPartitions { iter =>
+      val hashSet = new scala.collection.mutable.HashSet[Row]()
+
+      var currentRow: Row = null
+      while (iter.hasNext) {
+        currentRow = iter.next()
+        if (!hashSet.contains(currentRow)) {
+          hashSet.add(currentRow.copy())
+        }
+      }
+
+      hashSet.iterator
+    }
+  }
+}
+
 /**
  * :: DeveloperApi ::
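Note the currentRow.copy() before insertion: Spark SQL operators can reuse a single mutable row object across an iterator, so storing the bare reference would let the next row mutate an entry already in the HashSet. A minimal Scala sketch of that aliasing, using plain collections rather than Spark code:

import scala.collection.mutable.ArrayBuffer

val reused = ArrayBuffer(1, 2)       // stands in for a reused, mutable Row
val stored = reused                  // inserted without copy(): same object
reused(0) = 9                        // operator overwrites the buffer for the next row
println(stored)                      // ArrayBuffer(9, 2) -- the stored "row" changed too
val safe = ArrayBuffer(1, 2).clone() // copying first, like currentRow.copy(), avoids this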