author     Reynold Xin <rxin@databricks.com>          2015-05-11 19:15:14 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-05-11 19:15:14 -0700
commit     b6bf4f76c78abfaafa99b3c3c08b498aa9644346 (patch)
tree       02578c66f048c2f8ada178e0f6fadbe9fc335210 /sql
parent     f9c7580adadce75a94bd2854cf4f743d8cbd1d23 (diff)
[SPARK-7324] [SQL] DataFrame.dropDuplicates
This should also close https://github.com/apache/spark/pull/5870

Author: Reynold Xin <rxin@databricks.com>

Closes #6066 from rxin/dropDups and squashes the following commits:

130692f [Reynold Xin] [SPARK-7324][SQL] DataFrame.dropDuplicates
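For orientation, a minimal usage sketch of the API this patch adds. The column names mirror the test below; the setup assumes a Spark 1.4-era spark-shell where sc and sqlContext are predefined:

    // In a Spark 1.4-era spark-shell, sc and sqlContext are already in scope.
    import sqlContext.implicits._

    val df = sc.parallelize(Seq(
      (1, 1, 1), (1, 1, 1), (1, 2, 1)
    )).toDF("key", "value1", "value2")

    df.dropDuplicates()                       // alias for distinct: drops exact-duplicate rows
    df.dropDuplicates(Seq("key"))             // keeps one row per distinct key
    df.dropDuplicates(Array("key", "value1")) // Array overload for Java callers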
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala      | 38
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 35
2 files changed, 71 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 2472999de3..265a61592b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql
import java.io.CharArrayWriter
import java.sql.DriverManager
-
import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.ClassTag
@@ -42,7 +41,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
import org.apache.spark.sql.jdbc.JDBCWriteDetails
-import org.apache.spark.sql.json.{JacksonGenerator, JsonRDD}
+import org.apache.spark.sql.json.JacksonGenerator
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
import org.apache.spark.util.Utils
@@ -933,6 +932,40 @@ class DataFrame private[sql](
}
/**
+ * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
+ * This is an alias for `distinct`.
+ * @group dfops
+ */
+ def dropDuplicates(): DataFrame = dropDuplicates(this.columns)
+
+ /**
+ * (Scala-specific) Returns a new [[DataFrame]] with duplicate rows removed, considering only
+ * the subset of columns.
+ *
+ * @group dfops
+ */
+ def dropDuplicates(colNames: Seq[String]): DataFrame = {
+ val groupCols = colNames.map(resolve)
+ val groupColExprIds = groupCols.map(_.exprId)
+ val aggCols = logicalPlan.output.map { attr =>
+ if (groupColExprIds.contains(attr.exprId)) {
+ attr
+ } else {
+ Alias(First(attr), attr.name)()
+ }
+ }
+ Aggregate(groupCols, aggCols, logicalPlan)
+ }
+
+ /**
+ * Returns a new [[DataFrame]] with duplicate rows removed, considering only
+ * the subset of columns.
+ *
+ * @group dfops
+ */
+ def dropDuplicates(colNames: Array[String]): DataFrame = dropDuplicates(colNames.toSeq)
+
+ /**
* Computes statistics for numeric columns, including count, mean, stddev, min, and max.
* If no columns are given, this function computes statistics for all numerical columns.
*
@@ -1089,6 +1122,7 @@ class DataFrame private[sql](
/**
* Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
+ * This is an alias for `dropDuplicates`.
* @group dfops
*/
override def distinct: DataFrame = Distinct(logicalPlan)
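For readers skimming the hunk above: dropDuplicates(colNames) resolves the requested columns, groups on them, and wraps every other output attribute in a First aggregate, so one matching row survives per group. Expressed through the public DataFrame API, dropDuplicates(Seq("key")) on the three-column frame from the sketch above corresponds roughly to the following (functions.first is the public counterpart of the internal First expression):

    import org.apache.spark.sql.functions.first

    // Equivalent in spirit to df.dropDuplicates(Seq("key")):
    // group on the subset, keep the first observed value of every other column.
    val deduped = df.groupBy(df("key"))
      .agg(first(df("value1")).as("value1"), first(df("value2")).as("value2"))

Note that First depends on encounter order, so when deduplicating on a subset of columns, which row's values survive per group is not guaranteed in general.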
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 7552c12881..2ade955864 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -457,4 +457,39 @@ class DataFrameSuite extends QueryTest {
assert(complexData.filter(complexData("m")("1") === 1).count() == 1)
assert(complexData.filter(complexData("s")("key") === 1).count() == 1)
}
+
+ test("SPARK-7324 dropDuplicates") {
+ val testData = TestSQLContext.sparkContext.parallelize(
+ (2, 1, 2) :: (1, 1, 1) ::
+ (1, 2, 1) :: (2, 1, 2) ::
+ (2, 2, 2) :: (2, 2, 1) ::
+ (2, 1, 1) :: (1, 1, 2) ::
+ (1, 2, 2) :: (1, 2, 1) :: Nil).toDF("key", "value1", "value2")
+
+ checkAnswer(
+ testData.dropDuplicates(),
+ Seq(Row(2, 1, 2), Row(1, 1, 1), Row(1, 2, 1),
+ Row(2, 2, 2), Row(2, 1, 1), Row(2, 2, 1),
+ Row(1, 1, 2), Row(1, 2, 2)))
+
+ checkAnswer(
+ testData.dropDuplicates(Seq("key", "value1")),
+ Seq(Row(2, 1, 2), Row(1, 2, 1), Row(1, 1, 1), Row(2, 2, 2)))
+
+ checkAnswer(
+ testData.dropDuplicates(Seq("value1", "value2")),
+ Seq(Row(2, 1, 2), Row(1, 2, 1), Row(1, 1, 1), Row(2, 2, 2)))
+
+ checkAnswer(
+ testData.dropDuplicates(Seq("key")),
+ Seq(Row(2, 1, 2), Row(1, 1, 1)))
+
+ checkAnswer(
+ testData.dropDuplicates(Seq("value1")),
+ Seq(Row(2, 1, 2), Row(1, 2, 1)))
+
+ checkAnswer(
+ testData.dropDuplicates(Seq("value2")),
+ Seq(Row(2, 1, 2), Row(1, 1, 1)))
+ }
}
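Since dropDuplicates() is documented above as an alias for distinct, a sanity check along these lines (a sketch reusing the suite's testData) should also hold:

    // Full-row deduplication and distinct must agree on the surviving set of rows.
    assert(testData.dropDuplicates().collect().toSet === testData.distinct.collect().toSet)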