author     Liang-Chi Hsieh <viirya@gmail.com>    2016-10-13 13:27:57 +0800
committer  Wenchen Fan <wenchen@databricks.com>  2016-10-13 13:27:57 +0800
commit     064d6650e93ed6515a1309079c361e20404843cc (patch)
tree       18c3fae7018c447592b88fa7265a0bcc031d8000
parent     edeb51a39d76d64196d7635f52be1b42c7ec4341 (diff)
[SPARK-17866][SPARK-17867][SQL] Fix Dataset.dropDuplicates
## What changes were proposed in this pull request?

Two issues regarding `Dataset.dropDuplicates`:

1. `Dataset.dropDuplicates` should consider columns with the same column name.

   `Dataset.dropDuplicates` currently finds and uses only the first resolved attribute in the output that matches a given column name. When more than one column has that name, the remaining columns end up as aggregation columns instead of grouping columns.

2. `Dataset.dropDuplicates` should not change the output of the child plan.

   We currently create a new `Alias` with a new exprId in `Dataset.dropDuplicates`. However, this causes a problem when we then want to select columns from the original Dataset:

      val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
      // ds("_2") will cause an analysis exception
      ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int])

Because both issues relate to `Dataset.dropDuplicates` and the code changes are small, they are submitted together as one PR.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #15427 from viirya/fix-dropduplicates.
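Spelling out both behaviors at the API level (a minimal sketch, assuming a spark-shell session with `spark.implicits._` in scope; the value names are illustrative, not from the patch itself):

    val ds1 = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
    val ds2 = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()

    // Issue 1: the join output carries two columns named "_2". Before this
    // patch, dropDuplicates("_2") grouped on only the first of them; with the
    // patch, every column named "_2" becomes a grouping column.
    val joined = ds1.join(ds2, "_1")
    joined.dropDuplicates("_2")

    // Issue 2: before this patch, the non-grouping columns were re-aliased
    // with fresh exprIds, so ds1("_2") no longer resolved against the result
    // and this select threw an AnalysisException; with the patch it succeeds.
    ds1.dropDuplicates("_1").select(ds1("_1").as[String], ds1("_2").as[Int])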
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala      | 16 ++++++++++++----
 sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 17 +++++++++++++++++
 2 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a7a84730a6..e59a483075 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1892,17 +1892,25 @@ class Dataset[T] private[sql](
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
     val resolver = sparkSession.sessionState.analyzer.resolver
     val allColumns = queryExecution.analyzed.output
-    val groupCols = colNames.map { colName =>
-      allColumns.find(col => resolver(col.name, colName)).getOrElse(
+    val groupCols = colNames.flatMap { colName =>
+      // It is possible to have more than one column with the same name,
+      // so we call filter instead of find.
+      val cols = allColumns.filter(col => resolver(col.name, colName))
+      if (cols.isEmpty) {
         throw new AnalysisException(
-          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})"""))
+          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
+      }
+      cols
     }
     val groupColExprIds = groupCols.map(_.exprId)
     val aggCols = logicalPlan.output.map { attr =>
       if (groupColExprIds.contains(attr.exprId)) {
         attr
       } else {
-        Alias(new First(attr).toAggregateExpression(), attr.name)()
+        // Removing duplicate rows should not change output attributes, so we keep
+        // the original exprId of the attribute. Otherwise, selecting a column from
+        // the original Dataset would fail analysis due to an unresolved attribute.
+        Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
       }
     }
     Aggregate(groupCols, aggCols, logicalPlan)
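The heart of the first fix is the switch from `find` to `filter`. A simplified model of the difference, in plain Scala rather than Catalyst (`Attr` and `resolver` here are hypothetical stand-ins for Spark's `Attribute` and name resolver, not real Spark API):

    // Stand-in for Catalyst attributes: two columns may share a name but
    // always carry distinct exprIds (e.g. after a join of two Datasets).
    case class Attr(name: String, exprId: Long)
    val resolver: (String, String) => Boolean = _ equalsIgnoreCase _
    val allColumns = Seq(Attr("_1", 1L), Attr("_2", 2L), Attr("_2", 3L))

    // find: only the first "_2" (exprId 2) becomes a grouping column; the
    // second one silently falls through to the aggregation columns.
    allColumns.find(col => resolver(col.name, "_2"))    // Some(Attr(_2,2))

    // filter: every attribute named "_2" becomes a grouping column, which is
    // what dropDuplicates("_2") should mean when column names are duplicated.
    allColumns.filter(col => resolver(col.name, "_2"))  // List(Attr(_2,2), Attr(_2,3))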
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 3243f352a5..5fce9b4fe9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -872,6 +872,23 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       ("a", 1), ("a", 2), ("b", 1))
   }
 
+  test("dropDuplicates: columns with same column name") {
+    val ds1 = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
+    val ds2 = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
+    // The joined dataset has two columns with the same name "_2".
+    val joined = ds1.join(ds2, "_1").select(ds1("_2").as[Int], ds2("_2").as[Int])
+    checkDataset(
+      joined.dropDuplicates(),
+      (1, 2), (1, 1), (2, 1), (2, 2))
+  }
+
+  test("dropDuplicates should not change child plan output") {
+    val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
+    checkDataset(
+      ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int]),
+      ("a", 1), ("b", 1))
+  }
+
   test("SPARK-16097: Encoders.tuple should handle null object correctly") {
     val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING)
     val data = Seq((("a", "b"), "c"), (null, "d"))
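As a plain-Scala sanity check of the first test's expected rows (not Spark API; just the same join-then-deduplicate arithmetic applied to the literal data, assuming an inner join on the first field):

    val left  = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1))
    val right = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1))

    // Inner join on the first field, keep both second fields, then dedupe
    // across the whole row -- the same shape as joined.dropDuplicates() above.
    val joined = for {
      (k1, v1) <- left
      (k2, v2) <- right
      if k1 == k2
    } yield (v1, v2)

    joined.distinct  // List((1,1), (1,2), (2,1), (2,2))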